adeireplay.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. #! /bin/python3
  2. import argparse as argp
  3. import datetime as dt
  4. import hashlib
  5. import json
  6. import os
  7. import threading as th
  8. import time
  9. from xml.etree import ElementTree as ET
  10. import pandas as pd
  11. import requests
  12. from pcaspy import Driver, PVInfo, SimpleServer, cas
  13. from pcaspy.tools import ServerThread
  14. from sqlalchemy import MetaData, Table, create_engine
  15. sleep_time = os.environ.get('SLEEP_TIME') or 1
  16. groups_url = 'http://adei-katrin.kaas.kit.edu/adei/services/list.php?target=groups'
  17. data_url_unformatted = 'http://adei-katrin.kaas.kit.edu/adei/services/getdata.php?format=csv&db_server=%s&db_name=%s&db_group=%s&db_mask=%s&window=%d,-1&rt=full'
  18. items_url_unformatted = 'http://adei-katrin.kaas.kit.edu/adei/services/list.php?target=items&db_server=%s&db_name=%s&db_group=%s'
  19. adei_dateformat = '%d-%b-%y %H:%M:%S.%f'
  20. skipped = ['110-VAO-8-0101-ZUST_Q', '110-VAO-8-0101-ZUST_TS', '110-VAO-8-0101-ZUST_Val', '320-VAO-3-0004-ZUST_Q',
  21. '320-VAO-3-0004-ZUST_TS', '320-VAO-3-0004-ZUST_Val', '610-VAO-3-0001-ZUST_Q', '610-VAO-3-0001-ZUST_TS',
  22. '610-VAO-3-0001-ZUST_Val', '620-RPH-8-5420-IST_Q', '620-RPH-8-5420-IST_TS', '620-RPH-8-5420-IST_Val',
  23. '620-RPP-8-5430-IST_Q', '620-RPP-8-5430-IST_TS', '620-RPP-8-5430-IST_Val', '620-VAO-3-0002-ZUST_Q',
  24. '620-VAO-3-0002-ZUST_TS', '620-VAO-3-0002-ZUST_Val', '630-VAO-8-6100-ZUST_Q', '630-VAO-8-6100-ZUST_TS',
  25. '630-VAO-8-6100-ZUST_Val', '630-VAO-8-6300-ZUST_Q', '630-VAO-8-6300-ZUST_TS', '630-VAO-8-6300-ZUST_Val',
  26. 'MoSVoltage']
  27. skipped_databases = ['katrinpse', 'BakeOut2013', 'mos0']
  28. def write_record(record):
  29. return f'{record["db_server"]},{record["db_name"]},{record["db_group"]},{record["db_mask"]},{record["pvName"]}'
  30. def write_records(records, filename='adei-replay', ):
  31. names = '\n'.join([write_record(record) for record in records])
  32. with open(f'{filename}.csv', 'w') as f:
  33. f.write('db_server,db_name,db_group,chid,uid\n')
  34. f.write(names)
  35. def get_groups(user, password):
  36. response = requests.get(groups_url, auth=(user, password))
  37. return ET.fromstring(response.text)
  38. def each_group_one_pv(user, password):
  39. records = []
  40. groups = get_groups(user, password)
  41. for group in groups:
  42. value = group.attrib.get('value', None)
  43. db_server = group.attrib['db_server']
  44. db_name = group.attrib['db_name']
  45. db_group = group.attrib['db_group']
  46. items = get_items(user, password, db_server, db_name, db_group)
  47. record = {'pvName': value, 'nelm': len(
  48. items), 'db_group': db_group, 'db_name': db_name, 'db_server': db_server, }
  49. records.append(record)
  50. return records
  51. def get_items(user, password, db_server, db_name, db_group):
  52. response = requests.get(items_url_unformatted % (
  53. db_server, db_name, db_group), auth=(user, password))
  54. return ET.fromstring(response.text) if response.text else []
  55. def get_item_record(pvName, db_server, db_name, db_group, db_mask, desc):
  56. return {'pvName': pvName, 'db_group': db_group,
  57. 'db_name': db_name, 'db_server': db_server, 'db_mask': db_mask, 'desc': desc}
  58. def get_group_item_records(user, password, db_server, db_name, db_group):
  59. records = []
  60. items = get_items(user, password, db_server, db_name, db_group)
  61. for item in items:
  62. channel_id = item.attrib['value']
  63. uid = item.attrib.get('uid')
  64. if uid not in skipped and db_name not in skipped_databases:
  65. pvRaw = f'{db_server}-{db_name}-{db_group}-{channel_id}'
  66. pvName = uid.split(' ')[0] if uid else pvRaw
  67. record = get_item_record(
  68. pvName, db_server, db_name, db_group, channel_id, pvRaw)
  69. records.append(record)
  70. return records
  71. def each_item_one_pv(user, password,):
  72. records = []
  73. groups = get_groups(user, password)
  74. for group in groups:
  75. db_server = group.attrib['db_server']
  76. db_name = group.attrib['db_name']
  77. db_group = group.attrib['db_group']
  78. group_records = get_group_item_records(
  79. user, password, db_server, db_name, db_group)
  80. records = records + group_records
  81. return records
  82. def read_adei_group(user, password, db_server=None, db_name=None, db_group=None, pvName=None, driver=None, **kwargs):
  83. last = None
  84. while True:
  85. now = dt.datetime.now()
  86. data_url = data_url_unformatted % (db_server, db_name, db_group, '', int(
  87. (last - now).total_seconds()) if last is not None else 1)
  88. resp = requests.get(data_url, auth=(user, password))
  89. last = now
  90. csv_values = resp.text.split(',')
  91. ts = dt.datetime.strptime(csv_values[0], adei_dateformat)
  92. driver.write(pvName, {
  93. 'value': [float(str(value).strip()) for value in csv_values[1:]],
  94. 'ts': ts.timestamp()
  95. })
  96. time.sleep(sleep_time)
  97. def read_adei_item_http(user, password, db_server=None, db_name=None, db_group=None, driver=None, **kwargs):
  98. items = get_group_item_records(
  99. user, password, db_server, db_name, db_group)
  100. latest = None
  101. while True:
  102. now = dt.datetime.now()
  103. data_url = data_url_unformatted % (db_server, db_name, db_group, '', int(
  104. (latest - now).total_seconds()) if latest is not None else 1)
  105. resp = requests.get(data_url, auth=(user, password))
  106. if resp.text:
  107. # if 'html' not in resp.text and 'ERROR' not in resp.text or 'Error' not in resp.text:
  108. latest = now
  109. csv_values = resp.text.split(',')
  110. values = csv_values[1:]
  111. for idx, record in enumerate(items):
  112. record_name = record['pvName']
  113. try:
  114. value_str = str(values[idx]).strip()
  115. if value_str and value_str.isnumeric():
  116. record_value = float(value_str)
  117. ts = dt.datetime.strptime(
  118. csv_values[0], adei_dateformat)
  119. driver.write(record_name, {
  120. 'value': record_value,
  121. 'ts': ts.timestamp()
  122. })
  123. except IndexError:
  124. print(resp.text)
  125. time.sleep(sleep_time)
  126. def md5it(it):
  127. return hashlib.md5(it.encode()).hexdigest()
  128. def read_adei_item(user, password, db_server=None, db_name=None, db_group=None, driver=None, engine=None, meta=None, records=None, **kwargs):
  129. items = records if records else get_group_item_records(
  130. user, password, db_server, db_name, db_group)
  131. table_name = f'cache0__{db_server}__{db_name}__{db_group}'
  132. if len(table_name) > 64 or not engine.dialect.has_table(engine, table_name):
  133. md5hash = md5it(f'__{db_server}__{db_name}__{db_group}')
  134. table_name = f'cache0__md5_{md5hash}'
  135. if not engine.dialect.has_table(engine, table_name):
  136. print(
  137. f'Table for (db_server={db_server}, db_name={db_name}, db_group={db_group},) does not exist. Skipping...')
  138. return
  139. table = Table(table_name, meta, autoload=True, autoload_with=engine)
  140. last_recorded = None
  141. while True:
  142. query = table.select().order_by(table.c['time'].desc()).limit(1)
  143. if last_recorded is not None:
  144. query = query.where(table.c['time'] > last_recorded)
  145. result = query.execute().fetchone()
  146. if result:
  147. for idx, record in enumerate(items):
  148. record_name = record['pvName']
  149. value_str = result[f'v{idx}']
  150. last_recorded = result['time']
  151. if value_str:
  152. record_value = float(value_str)
  153. driver.write(record_name, {
  154. 'value': record_value,
  155. 'ts': last_recorded.timestamp()
  156. })
  157. time.sleep(sleep_time)
  158. class AdeiDriver(Driver):
  159. def __init__(self, server):
  160. super(AdeiDriver, self).__init__()
  161. self.server = server
  162. def setParam(self, reason, value):
  163. super().setParam(reason, value['value'])
  164. ts = value['ts']
  165. self.pvDB[reason].time.secPastEpoch = int(ts - 631152000)
  166. self.pvDB[reason].time.nsec = int((ts - int(ts)) * 1e9)
  167. self.pvDB[reason].mask = (cas.DBE_LOG | cas.DBE_VALUE)
  168. self.updatePV(reason)
  169. if __name__ == '__main__':
  170. user = os.environ.get('ADEI_USER')
  171. password = os.environ.get('ADEI_PASSWORD')
  172. if user is None or password is None:
  173. print('Bad ADEI authentication properties. Aborting...')
  174. exit(255)
  175. parser = argp.ArgumentParser()
  176. parser.add_argument('generate', default=False, nargs='?')
  177. parser.add_argument('run', default=True, nargs='?')
  178. parser.add_argument('--item', action='store_true', default=True)
  179. parser.add_argument('--group', action='store_true')
  180. parser.add_argument('--non-cached', action='store_true', default=False)
  181. args = parser.parse_args()
  182. print('Bootstrapping...')
  183. bootstrap_start = dt.datetime.now()
  184. if args.non_cached:
  185. groups = each_group_one_pv(user, password)
  186. print('Total groups', len(groups))
  187. if args.item:
  188. print('Each ADEI Item is a PV')
  189. records = each_item_one_pv(user, password,)
  190. elif args.group:
  191. print('Each ADEI Group is a PV')
  192. records = groups
  193. else:
  194. df = pd.read_csv('./adei-replay.csv')
  195. dfg = df.groupby(['db_server', 'db_name', 'db_group'])
  196. groups = []
  197. records = []
  198. for key, items in dfg:
  199. group_records = [
  200. get_item_record(record[1]['uid'], record[1]['db_server'], record[1]['db_name'],
  201. record[1]['db_group'], record[1]['chid'], f'{record[1]["db_server"]}, {record[1]["db_name"]}, {record[1]["db_group"]}, {record[1]["chid"]}') for record in items.iterrows()
  202. ]
  203. groups.append({'pvName': '_'.join(key), 'nelm': len(
  204. items), 'db_group': key[2], 'db_name': key[1], 'db_server': key[0], 'records': group_records})
  205. records = records + group_records
  206. print('Total Records', len(records))
  207. if args.generate:
  208. write_records(records)
  209. exit(0)
  210. adei_db_host = os.environ.get('ADEI_DB_HOST')
  211. adei_db_user = os.environ.get('ADEI_DB_USER')
  212. adei_db_password = os.environ.get('ADEI_DB_PASSWORD')
  213. if adei_db_host is None or adei_db_user is None or adei_db_password is None:
  214. print('Bad ADEI DB properties. Aborting...')
  215. exit(255)
  216. pvdb = {}
  217. if args.item or not args.non_cached:
  218. for record in records:
  219. pvName = record['pvName']
  220. desc = record['desc']
  221. pvdb[pvName] = {
  222. 'type': 'float',
  223. 'desc': desc
  224. }
  225. elif args.group:
  226. for record in records:
  227. pvName = record['pvName']
  228. desc = record['desc']
  229. pvdb[pvName] = {
  230. 'type': 'float',
  231. 'desc': desc,
  232. 'count': record['nelm']
  233. }
  234. server = SimpleServer()
  235. server.createPV('adei:', pvdb)
  236. driver = AdeiDriver(server)
  237. server_thread = ServerThread(server)
  238. server_thread.start()
  239. engine = create_engine(
  240. f'mysql+pymysql://{adei_db_user}:{adei_db_password}@{adei_db_host}/adei_katrin', pool_size=30)
  241. katrin = MetaData(bind=engine)
  242. bootstrap_end = dt.datetime.now()
  243. print('Boostrapped in', (bootstrap_end -
  244. bootstrap_start).total_seconds(), 'seconds')
  245. threads = []
  246. now = dt.datetime.now()
  247. print(f'[{now}] Starting Threads...')
  248. if args.group:
  249. threads = [th.Thread(target=read_adei_group, args=(
  250. user, password,), kwargs={**record, 'driver': driver}) for record in groups]
  251. elif args.item or not args.non_cached:
  252. threads = [th.Thread(target=read_adei_item, args=(
  253. user, password,), kwargs={**group, 'driver': driver, 'engine': engine, 'meta': katrin}) for group in groups]
  254. threads_started = 0
  255. for thread in threads:
  256. thread.start()
  257. threads_started += 1
  258. if threads_started == 10:
  259. threads_started = 0
  260. time.sleep(5)
  261. now = dt.datetime.now()
  262. print(f'[{now}] Started All Threads')
  263. for thread in threads:
  264. thread.join()
  265. server_thread.stop()
  266. print('Server Stopped')