123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- #! /bin/python3
- import argparse as argp
- import datetime as dt
- import hashlib
- import json
- import os
- import threading as th
- import time
- from xml.etree import ElementTree as ET
- import pandas as pd
- import requests
- from pcaspy import Driver, PVInfo, SimpleServer, cas
- from pcaspy.tools import ServerThread
- from sqlalchemy import MetaData, Table, create_engine
# Seconds every polling loop sleeps between two reads (SLEEP_TIME, default 1).
# Environment variables are always strings and time.sleep() rejects strings,
# so the value is converted to a number here, once.
sleep_time = float(os.environ.get('SLEEP_TIME') or 1)

# ADEI service endpoints. The `%s` / `%d` placeholders are filled in by the
# functions below, in the order they appear in each URL.
groups_url = 'http://adei-katrin.kaas.kit.edu/adei/services/list.php?target=groups'
data_url_unformatted = 'http://adei-katrin.kaas.kit.edu/adei/services/getdata.php?format=csv&db_server=%s&db_name=%s&db_group=%s&db_mask=%s&window=%d,-1&rt=full'
items_url_unformatted = 'http://adei-katrin.kaas.kit.edu/adei/services/list.php?target=items&db_server=%s&db_name=%s&db_group=%s'

# strptime() format used to parse the first field of a getdata CSV response.
adei_dateformat = '%d-%b-%y %H:%M:%S.%f'

# Item uids that are never turned into PVs.
skipped = [
    '110-VAO-8-0101-ZUST_Q', '110-VAO-8-0101-ZUST_TS', '110-VAO-8-0101-ZUST_Val',
    '320-VAO-3-0004-ZUST_Q', '320-VAO-3-0004-ZUST_TS', '320-VAO-3-0004-ZUST_Val',
    '610-VAO-3-0001-ZUST_Q', '610-VAO-3-0001-ZUST_TS', '610-VAO-3-0001-ZUST_Val',
    '620-RPH-8-5420-IST_Q', '620-RPH-8-5420-IST_TS', '620-RPH-8-5420-IST_Val',
    '620-RPP-8-5430-IST_Q', '620-RPP-8-5430-IST_TS', '620-RPP-8-5430-IST_Val',
    '620-VAO-3-0002-ZUST_Q', '620-VAO-3-0002-ZUST_TS', '620-VAO-3-0002-ZUST_Val',
    '630-VAO-8-6100-ZUST_Q', '630-VAO-8-6100-ZUST_TS', '630-VAO-8-6100-ZUST_Val',
    '630-VAO-8-6300-ZUST_Q', '630-VAO-8-6300-ZUST_TS', '630-VAO-8-6300-ZUST_Val',
    'MoSVoltage',
]

# Databases whose items are never turned into PVs.
skipped_databases = ['katrinpse', 'BakeOut2013', 'mos0']
def write_record(record):
    """Serialise one item record as a single CSV row (no line terminator).

    The columns are, in order: ``db_server``, ``db_name``, ``db_group``,
    ``db_mask`` and ``pvName``. Each value is rendered with ``str()``.
    A field containing a comma, a double quote or a line break is quoted by
    the ``csv`` module, so it can no longer shift the columns of the row.
    Plain values come out exactly as a comma-joined string.

    Args:
        record: mapping that provides the five keys listed above.

    Returns:
        The CSV row as a string.

    Raises:
        KeyError: if ``record`` lacks one of the five keys.
    """
    # Local imports keep this function self-contained.
    import csv
    import io

    columns = ('db_server', 'db_name', 'db_group', 'db_mask', 'pvName')
    buffer = io.StringIO()
    # lineterminator='' because the caller joins the rows with '\n' itself.
    csv.writer(buffer, lineterminator='').writerow(
        [str(record[column]) for column in columns])
    return buffer.getvalue()
def write_records(records, filename='adei-replay'):
    """Write ``records`` to ``<filename>.csv``, replacing any existing file.

    The file starts with the header ``db_server,db_name,db_group,chid,uid``,
    followed by one ``write_record`` row per record. Rows are separated by
    newlines and the last row has no trailing newline.

    Args:
        records: iterable of item records accepted by ``write_record``.
        filename: output path without the ``.csv`` extension.
    """
    # Render every row before opening the file, so that a bad record does not
    # leave a truncated file behind.
    rows = '\n'.join(write_record(entry) for entry in records)
    header = 'db_server,db_name,db_group,chid,uid\n'
    with open(f'{filename}.csv', 'w') as out:
        out.write(header + rows)
def get_groups(user, password):
    """Download the ADEI group listing and return it as a parsed XML element.

    Args:
        user: user name for HTTP authentication.
        password: password for HTTP authentication.

    Returns:
        The root ``xml.etree.ElementTree.Element`` of the response. Callers
        iterate over it and read ``db_server``, ``db_name`` and ``db_group``
        from each child's attributes.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        xml.etree.ElementTree.ParseError: if the body is not well-formed XML.
    """
    response = requests.get(groups_url, auth=(user, password))
    # Report the actual HTTP failure instead of trying to parse an error page
    # as XML.
    response.raise_for_status()
    return ET.fromstring(response.text)
def each_group_one_pv(user, password):
    """Build one record per ADEI group, in the order the server lists them.

    Each record is a dict with the keys ``pvName`` (the group's ``value``
    attribute, or ``None`` when absent), ``nelm`` (the number of items that
    ``get_items`` returns for the group), ``db_group``, ``db_name`` and
    ``db_server``.

    Args:
        user: user name for HTTP authentication.
        password: password for HTTP authentication.

    Returns:
        A list of group records.
    """
    def describe(group):
        attrs = group.attrib
        server = attrs['db_server']
        name = attrs['db_name']
        grp = attrs['db_group']
        # One extra request per group, needed only to count its items.
        channels = get_items(user, password, server, name, grp)
        return {
            'pvName': attrs.get('value'),
            'nelm': len(channels),
            'db_group': grp,
            'db_name': name,
            'db_server': server,
        }

    return [describe(group) for group in get_groups(user, password)]
def get_items(user, password, db_server, db_name, db_group):
    """Download the item listing of one ADEI group.

    Args:
        user: user name for HTTP authentication.
        password: password for HTTP authentication.
        db_server: ADEI server identifier inserted into the request URL.
        db_name: ADEI database identifier inserted into the request URL.
        db_group: ADEI group identifier inserted into the request URL.

    Returns:
        The parsed XML root element of the response, or an empty list when
        the response body is empty.
    """
    url = items_url_unformatted % (db_server, db_name, db_group)
    body = requests.get(url, auth=(user, password)).text
    if not body:
        # Nothing to parse; callers only iterate over or count the result.
        return []
    return ET.fromstring(body)
def get_item_record(pvName, db_server, db_name, db_group, db_mask, desc):
    """Bundle the description of one ADEI item into a record dict.

    Returns:
        A dict with the keys ``pvName``, ``db_group``, ``db_name``,
        ``db_server``, ``db_mask`` and ``desc`` (in that order), each mapped
        to the argument of the same name.
    """
    return dict(
        pvName=pvName,
        db_group=db_group,
        db_name=db_name,
        db_server=db_server,
        db_mask=db_mask,
        desc=desc,
    )
def get_group_item_records(user, password, db_server, db_name, db_group):
    """Return one item record per usable item of an ADEI group.

    Items whose ``uid`` is listed in ``skipped`` are left out, and groups of
    a database listed in ``skipped_databases`` yield no records at all.

    The PV name is the part of the item's ``uid`` before the first space.
    Items without a ``uid`` are named
    ``<db_server>-<db_name>-<db_group>-<channel id>``, which is also stored
    as the record's ``desc``.

    Args:
        user: user name for HTTP authentication.
        password: password for HTTP authentication.
        db_server: ADEI server identifier of the group.
        db_name: ADEI database identifier of the group.
        db_group: ADEI group identifier.

    Returns:
        A list of dicts as built by ``get_item_record``.

    Raises:
        KeyError: if an item element has no ``value`` attribute.
    """
    # Every item of a skipped database would be discarded anyway, so do not
    # query the server for them at all.
    if db_name in skipped_databases:
        return []

    records = []
    for item in get_items(user, password, db_server, db_name, db_group):
        channel_id = item.attrib['value']
        uid = item.attrib.get('uid')
        if uid in skipped:
            continue
        pvRaw = f'{db_server}-{db_name}-{db_group}-{channel_id}'
        pvName = uid.split(' ')[0] if uid else pvRaw
        records.append(get_item_record(
            pvName, db_server, db_name, db_group, channel_id, pvRaw))
    return records
def each_item_one_pv(user, password,):
    """Collect the item records of every ADEI group into one flat list.

    Args:
        user: user name for HTTP authentication.
        password: password for HTTP authentication.

    Returns:
        The records from ``get_group_item_records`` for each group returned
        by ``get_groups``, concatenated in group order.
    """
    records = []
    for group in get_groups(user, password):
        attrs = group.attrib
        # extend() grows the list in place; `records = records + ...` copied
        # everything collected so far once per group.
        records.extend(get_group_item_records(
            user, password,
            attrs['db_server'], attrs['db_name'], attrs['db_group']))
    return records
def read_adei_group(user, password, db_server=None, db_name=None, db_group=None, pvName=None, driver=None, **kwargs):
    """Poll one ADEI group over HTTP forever and publish it as one array PV.

    Each pass requests the group's data as CSV. The first field is parsed
    as the timestamp and all remaining fields as floats. The result is
    handed to ``driver.write(pvName, {'value': [...], 'ts': <epoch s>})``,
    and the loop then sleeps ``sleep_time`` seconds.

    An empty response is skipped. A response that cannot be parsed is
    printed and skipped, so it does not end the polling thread.

    Args:
        user: user name for HTTP authentication.
        password: password for HTTP authentication.
        db_server: ADEI server identifier of the group.
        db_name: ADEI database identifier of the group.
        db_group: ADEI group identifier.
        pvName: name of the PV to publish under.
        driver: object with a ``write(reason, value)`` method.
        **kwargs: ignored; lets callers pass a whole group record.

    Returns:
        Never returns normally.
    """
    last = None
    while True:
        now = dt.datetime.now()
        # NOTE(review): `last - now` is negative, so from the second pass on
        # the window argument is <= 0. Confirm whether `now - last` was meant.
        window = int((last - now).total_seconds()) if last is not None else 1
        data_url = data_url_unformatted % (
            db_server, db_name, db_group, '', window)
        resp = requests.get(data_url, auth=(user, password))
        if resp.text:
            last = now
            csv_values = resp.text.split(',')
            try:
                ts = dt.datetime.strptime(csv_values[0], adei_dateformat)
                values = [float(str(value).strip())
                          for value in csv_values[1:]]
            except ValueError:
                # Not a data row (for example an error page): show it and
                # try again on the next pass.
                print(resp.text)
            else:
                driver.write(pvName, {
                    'value': values,
                    'ts': ts.timestamp()
                })
        time.sleep(sleep_time)
def read_adei_item_http(user, password, db_server=None, db_name=None, db_group=None, driver=None, **kwargs):
    """Poll one ADEI group over HTTP forever and publish each item as a PV.

    The item records of the group are fetched once. Each pass then requests
    the group's data as CSV. The first field is parsed as the timestamp and
    the n-th remaining field is the value of the n-th item. Every field that
    parses as a float is handed to
    ``driver.write(<item pvName>, {'value': <float>, 'ts': <epoch s>})``.
    Fields that do not parse are skipped.

    An empty response is skipped. A response with an unparsable timestamp,
    or with fewer values than items, is printed.

    Args:
        user: user name for HTTP authentication.
        password: password for HTTP authentication.
        db_server: ADEI server identifier of the group.
        db_name: ADEI database identifier of the group.
        db_group: ADEI group identifier.
        driver: object with a ``write(reason, value)`` method.
        **kwargs: ignored; lets callers pass a whole group record.

    Returns:
        Never returns normally.
    """
    items = get_group_item_records(
        user, password, db_server, db_name, db_group)
    latest = None
    while True:
        now = dt.datetime.now()
        # NOTE(review): `latest - now` is negative, so from the second pass on
        # the window argument is <= 0. Confirm whether `now - latest` was
        # meant.
        window = int((latest - now).total_seconds()) if latest is not None else 1
        data_url = data_url_unformatted % (
            db_server, db_name, db_group, '', window)
        resp = requests.get(data_url, auth=(user, password))
        if resp.text:
            latest = now
            csv_values = resp.text.split(',')
            values = csv_values[1:]
            try:
                # One timestamp applies to the whole row; parse it once.
                ts = dt.datetime.strptime(
                    csv_values[0], adei_dateformat).timestamp()
            except ValueError:
                # Not a data row (for example an error page).
                print(resp.text)
            else:
                if len(values) < len(items):
                    # Some items got no value in this response.
                    print(resp.text)
                for record, raw in zip(items, values):
                    try:
                        # float() accepts decimals, signs and exponents;
                        # str.isnumeric() would only let bare digits through.
                        record_value = float(str(raw).strip())
                    except ValueError:
                        continue
                    driver.write(record['pvName'], {
                        'value': record_value,
                        'ts': ts
                    })
        time.sleep(sleep_time)
def md5it(it):
    """Return the hexadecimal MD5 digest of the string ``it``.

    The string is encoded with ``str.encode()``'s default encoding before
    hashing.
    """
    digest = hashlib.md5()
    digest.update(it.encode())
    return digest.hexdigest()
def read_adei_item(user, password, db_server=None, db_name=None, db_group=None, driver=None, engine=None, meta=None, records=None, **kwargs):
    """Poll a group's cache table forever and publish each item as a PV.

    The table is looked up as ``cache0__<db_server>__<db_name>__<db_group>``.
    If that name is longer than 64 characters or no such table exists, the
    name ``cache0__md5_<md5 of '__<db_server>__<db_name>__<db_group>'>`` is
    tried instead. If that table does not exist either, a message is printed
    and the function returns.

    Each pass fetches the newest row, restricted to rows newer than the last
    one published. Column ``v<n>`` of the row is the value of the n-th item.
    Every non-empty value is handed to
    ``driver.write(<item pvName>, {'value': <float>, 'ts': <epoch s>})``.
    The loop then sleeps ``sleep_time`` seconds.

    Args:
        user: user name for HTTP authentication (used only when ``records``
            is empty and the items have to be fetched).
        password: password for HTTP authentication.
        db_server: ADEI server identifier of the group.
        db_name: ADEI database identifier of the group.
        db_group: ADEI group identifier.
        driver: object with a ``write(reason, value)`` method.
        engine: SQLAlchemy engine connected to the cache database.
        meta: SQLAlchemy ``MetaData`` the reflected table is attached to.
        records: item records of the group; fetched over HTTP when falsy.
        **kwargs: ignored; lets callers pass a whole group record.

    Returns:
        ``None`` if no cache table exists; otherwise never returns normally.
    """
    items = records if records else get_group_item_records(
        user, password, db_server, db_name, db_group)
    table_name = f'cache0__{db_server}__{db_name}__{db_group}'
    # 64 is presumably the database's identifier length limit (TODO confirm);
    # longer names are stored under an MD5-based name instead.
    if len(table_name) > 64 or not engine.dialect.has_table(engine, table_name):
        md5hash = md5it(f'__{db_server}__{db_name}__{db_group}')
        table_name = f'cache0__md5_{md5hash}'
        if not engine.dialect.has_table(engine, table_name):
            print(
                f'Table for (db_server={db_server}, db_name={db_name}, db_group={db_group},) does not exist. Skipping...')
            return
    table = Table(table_name, meta, autoload=True, autoload_with=engine)
    last_recorded = None
    while True:
        query = table.select().order_by(table.c['time'].desc()).limit(1)
        if last_recorded is not None:
            query = query.where(table.c['time'] > last_recorded)
        result = query.execute().fetchone()
        if result:
            # The row's time applies to every column; read it once.
            last_recorded = result['time']
            ts = last_recorded.timestamp()
            for idx, record in enumerate(items):
                raw = result[f'v{idx}']
                # Skip only missing values. A plain truthiness test would
                # also drop a legitimate reading of 0.
                if raw is None or raw == '':
                    continue
                driver.write(record['pvName'], {
                    'value': float(raw),
                    'ts': ts
                })
        time.sleep(sleep_time)
class AdeiDriver(Driver):
    """pcaspy driver that publishes readings with their source timestamp."""

    # 1990-01-01 00:00:00 expressed in Unix seconds; subtracted to obtain
    # `secPastEpoch`.
    _EPOCH_OFFSET = 631152000

    def __init__(self, server):
        """Keep a reference to the ``SimpleServer`` this driver belongs to."""
        super().__init__()
        self.server = server

    def setParam(self, reason, value):
        """Store a new value for PV ``reason`` and post the update.

        Args:
            reason: PV name (without the server prefix).
            value: either a dict ``{'value': <payload>, 'ts': <Unix seconds>}``
                as sent by the reader threads, or a plain payload. A plain
                payload is what pcaspy's default ``Driver.write`` passes on
                a client put, and indexing it as a dict would raise
                ``TypeError``. It keeps the timestamp assigned by the base
                class.
        """
        if isinstance(value, dict):
            payload, ts = value['value'], value['ts']
        else:
            payload, ts = value, None
        super().setParam(reason, payload)
        pv = self.pvDB[reason]
        if ts is not None:
            # Replace the base-class timestamp with the time of the reading.
            pv.time.secPastEpoch = int(ts - self._EPOCH_OFFSET)
            pv.time.nsec = int((ts - int(ts)) * 1e9)
        pv.mask = (cas.DBE_LOG | cas.DBE_VALUE)
        self.updatePV(reason)
- if __name__ == '__main__':
- user = os.environ.get('ADEI_USER')
- password = os.environ.get('ADEI_PASSWORD')
- if user is None or password is None:
- print('Bad ADEI authentication properties. Aborting...')
- exit(255)
- parser = argp.ArgumentParser()
- parser.add_argument('generate', default=False, nargs='?')
- parser.add_argument('run', default=True, nargs='?')
- parser.add_argument('--item', action='store_true', default=True)
- parser.add_argument('--group', action='store_true')
- parser.add_argument('--non-cached', action='store_true', default=False)
- args = parser.parse_args()
- print('Bootstrapping...')
- bootstrap_start = dt.datetime.now()
- if args.non_cached:
- groups = each_group_one_pv(user, password)
- print('Total groups', len(groups))
- if args.item:
- print('Each ADEI Item is a PV')
- records = each_item_one_pv(user, password,)
- elif args.group:
- print('Each ADEI Group is a PV')
- records = groups
- else:
- df = pd.read_csv('./adei-replay.csv')
- dfg = df.groupby(['db_server', 'db_name', 'db_group'])
- groups = []
- records = []
- for key, items in dfg:
- group_records = [
- get_item_record(record[1]['uid'], record[1]['db_server'], record[1]['db_name'],
- record[1]['db_group'], record[1]['chid'], f'{record[1]["db_server"]}, {record[1]["db_name"]}, {record[1]["db_group"]}, {record[1]["chid"]}') for record in items.iterrows()
- ]
- groups.append({'pvName': '_'.join(key), 'nelm': len(
- items), 'db_group': key[2], 'db_name': key[1], 'db_server': key[0], 'records': group_records})
- records = records + group_records
- print('Total Records', len(records))
- if args.generate:
- write_records(records)
- exit(0)
- adei_db_host = os.environ.get('ADEI_DB_HOST')
- adei_db_user = os.environ.get('ADEI_DB_USER')
- adei_db_password = os.environ.get('ADEI_DB_PASSWORD')
- if adei_db_host is None or adei_db_user is None or adei_db_password is None:
- print('Bad ADEI DB properties. Aborting...')
- exit(255)
- pvdb = {}
- if args.item or not args.non_cached:
- for record in records:
- pvName = record['pvName']
- desc = record['desc']
- pvdb[pvName] = {
- 'type': 'float',
- 'desc': desc
- }
- elif args.group:
- for record in records:
- pvName = record['pvName']
- desc = record['desc']
- pvdb[pvName] = {
- 'type': 'float',
- 'desc': desc,
- 'count': record['nelm']
- }
- server = SimpleServer()
- server.createPV('adei:', pvdb)
- driver = AdeiDriver(server)
- server_thread = ServerThread(server)
- server_thread.start()
- engine = create_engine(
- f'mysql+pymysql://{adei_db_user}:{adei_db_password}@{adei_db_host}/adei_katrin', pool_size=30)
- katrin = MetaData(bind=engine)
- bootstrap_end = dt.datetime.now()
- print('Boostrapped in', (bootstrap_end -
- bootstrap_start).total_seconds(), 'seconds')
- threads = []
- now = dt.datetime.now()
- print(f'[{now}] Starting Threads...')
- if args.group:
- threads = [th.Thread(target=read_adei_group, args=(
- user, password,), kwargs={**record, 'driver': driver}) for record in groups]
- elif args.item or not args.non_cached:
- threads = [th.Thread(target=read_adei_item, args=(
- user, password,), kwargs={**group, 'driver': driver, 'engine': engine, 'meta': katrin}) for group in groups]
- threads_started = 0
- for thread in threads:
- thread.start()
- threads_started += 1
- if threads_started == 10:
- threads_started = 0
- time.sleep(5)
- now = dt.datetime.now()
- print(f'[{now}] Started All Threads')
- for thread in threads:
- thread.join()
- server_thread.stop()
- print('Server Stopped')
|