opa.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. import os
  2. import sys
  3. import yaml
  4. import requests
  5. import MySQLdb
  6. import shutil
  7. from datetime import date
  8. import csv
  9. import urllib2
  10. import tornado.escape
  11. import tornado.ioloop
  12. import tornado.web
  13. import tornado.autoreload
  14. from tornado.escape import json_decode
  15. from tornado.escape import json_encode
  16. from threading import Timer
  17. import collections
  18. root = os.path.dirname(__file__)
  19. with open("config.yaml", 'r') as stream:
  20. try:
  21. #print(yaml.load(stream))
  22. config = yaml.load(stream)
  23. except yaml.YAMLError as exc:
  24. print(exc)
  25. if config == None:
  26. print("Error: Empty configuration file.")
  27. sys.exit(1)
  28. class RepeatedTimer(object):
  29. def __init__(self, interval, function, *args, **kwargs):
  30. self._timer = None
  31. self.interval = interval
  32. self.function = function
  33. self.args = args
  34. self.kwargs = kwargs
  35. self.is_running = False
  36. self.start()
  37. def _run(self):
  38. self.is_running = False
  39. self.start()
  40. self.function(*self.args, **self.kwargs)
  41. def start(self):
  42. if not self.is_running:
  43. self._timer = Timer(self.interval, self._run)
  44. self._timer.start()
  45. self.is_running = True
  46. def stop(self):
  47. self._timer.cancel()
  48. self.is_running = False
  49. def setInterval(self, interval):
  50. self.interval = interval
  51. def fetchDataMySQL(sort_name):
  52. # config contains db setup
  53. # varname consist column name
  54. with open("varname.yaml", 'r') as stream:
  55. try:
  56. #print(yaml.load(stream))
  57. varname = yaml.load(stream)
  58. except yaml.YAMLError as exc:
  59. print(exc)
  60. if varname == None:
  61. print("Error: Empty varname file.")
  62. return
  63. # first write to tmp.yml
  64. db = MySQLdb.connect(config["server"],
  65. config["user"],
  66. config["password"],
  67. config["database"])
  68. # prepare a cursor object using cursor() method
  69. cursor = db.cursor()
  70. # execute SQL query using execute() method.
  71. sql = "SELECT "
  72. for table_name in varname:
  73. column_names = varname[table_name]
  74. if isinstance(column_names, list):
  75. sql += ",".join(column_names)
  76. else:
  77. sql += column_names
  78. sql += " FROM "
  79. sql += table_name
  80. sql += " ORDER BY "
  81. sql += sort_name
  82. sql += " DESC LIMIT 1"
  83. print sql
  84. cursor.execute(sql)
  85. # Fetch a single row using fetchone() method.
  86. data = cursor.fetchone()
  87. print data
  88. # when ready then copy to cache.yml
  89. print config
  90. print varname
  91. cache_data = {}
  92. if isinstance(varname[table_name], list):
  93. for i, item in enumerate(varname[table_name]):
  94. cache_data[item] = float(data[i])
  95. else:
  96. cache_data[varname[table_name]] = float(data[0])
  97. print cache_data
  98. with open(".tmp.yaml", 'w') as stream_tmp:
  99. stream_tmp.write(yaml.dump(cache_data, default_flow_style=False))
  100. src_file = config["path"] + ".tmp.yaml"
  101. dst_file = config["path"] + "cache.yaml"
  102. shutil.copy(src_file, dst_file)
  103. def fetchDataADEI():
  104. """
  105. with open("config.yaml", 'r') as stream:
  106. try:
  107. #print(yaml.load(stream))
  108. config = yaml.load(stream)
  109. except yaml.YAMLError as exc:
  110. print(exc)
  111. if config == None:
  112. print("Error: Empty configuration file.")
  113. return
  114. """
  115. if os.path.isfile(config["path"]+".mutex"):
  116. print("Process running...")
  117. return
  118. else:
  119. print("Created mutex")
  120. file = open(config["path"]+'.mutex', 'w+')
  121. with open("varname.yaml", 'r') as stream:
  122. try:
  123. #print(yaml.load(stream))
  124. varname = yaml.load(stream)
  125. except yaml.YAMLError as exc:
  126. print(exc)
  127. if varname == None:
  128. print("Error: Empty varname file.")
  129. return
  130. cache_data = {}
  131. for param in varname:
  132. print param
  133. dest = config['server'] + config['script']
  134. url = dest + "?" + varname[param]
  135. print url
  136. data = requests.get(url,
  137. auth=(config['username'],
  138. config['password'])).content
  139. #tmp_data = data.content
  140. last_value = data.split(",")[-1].strip()
  141. print last_value
  142. cache_data[param] = last_value
  143. with open(".tmp.yaml", 'w') as stream_tmp:
  144. stream_tmp.write(yaml.dump(cache_data, default_flow_style=False))
  145. src_file = config["path"] + ".tmp.yaml"
  146. dst_file = config["path"] + "cache.yaml"
  147. shutil.copy(src_file, dst_file)
  148. os.remove(config["path"]+".mutex")
  149. """
  150. with open("config.yaml", 'r') as stream:
  151. try:
  152. #print(yaml.load(stream))
  153. config = yaml.load(stream)
  154. except yaml.YAMLError as exc:
  155. print(exc)
  156. if config == None:
  157. print("Error: Empty configuration file.")
  158. sys.exit(1)
  159. """
  160. print "Start torrenting..."
  161. # it auto-starts, no need of rt.start()
  162. print "Debugging..."
  163. # TODO: Turn off for debug
  164. rt = RepeatedTimer(config["polling"], fetchDataADEI)
  165. class ListHandler(tornado.web.RequestHandler):
  166. def get(self):
  167. with open("cache.yaml", 'r') as stream:
  168. try:
  169. #print(yaml.load(stream))
  170. response = yaml.load(stream)
  171. except yaml.YAMLError as exc:
  172. print(exc)
  173. if response == None:
  174. response = {"error": "No data entry."}
  175. print response
  176. self.write(response)
  177. class StartHandler(tornado.web.RequestHandler):
  178. def get(self):
  179. print "Start fetchData"
  180. rt.start()
  181. class StopHandler(tornado.web.RequestHandler):
  182. def get(self):
  183. print "Stop fetchData"
  184. rt.stop()
  185. if os.path.isfile(config["path"]+".mutex"):
  186. os.remove(config["path"]+".mutex")
  187. class SetTimerHandler(tornado.web.RequestHandler):
  188. def get(self, duration):
  189. print "Set interval"
  190. rt.setInterval(float(duration))
  191. class AddHandler(tornado.web.RequestHandler):
  192. def get(self, **params):
  193. print params
  194. table_name = str(params["table_name"])
  195. column_name = str(params["column_name"])
  196. response = {}
  197. """
  198. with open("config.yaml", 'r') as stream:
  199. try:
  200. #print(yaml.load(stream))
  201. config = yaml.load(stream)
  202. except yaml.YAMLError as exc:
  203. print(exc)
  204. if config == None:
  205. print("Error: Empty configuration file.")
  206. return
  207. """
  208. if config["type"] == "mysql":
  209. print("Inside MySQL block:")
  210. print config
  211. # Open database connection
  212. db = MySQLdb.connect(config["server"],
  213. config["user"],
  214. config["password"],
  215. config["database"])
  216. # prepare a cursor object using cursor() method
  217. cursor = db.cursor()
  218. # execute SQL query using execute() method.
  219. sql = "SHOW COLUMNS FROM `" + params["table_name"] + "` LIKE '" + params["column_name"] + "'"
  220. #print sql
  221. cursor.execute(sql)
  222. # Fetch a single row using fetchone() method.
  223. data = cursor.fetchone()
  224. db.close()
  225. if data == None:
  226. response = {"error": "Data name not valid."}
  227. else:
  228. # column name available
  229. # store in yaml file
  230. with open("varname.yaml", 'r') as stream:
  231. try:
  232. #print(yaml.load(stream))
  233. cache_data = yaml.load(stream)
  234. except yaml.YAMLError as exc:
  235. print(exc)
  236. if cache_data == None:
  237. cache_data = {table_name: column_name}
  238. else:
  239. if table_name in cache_data:
  240. tmp = cache_data[table_name]
  241. print tmp
  242. if isinstance(tmp, list):
  243. tmp_lst = tmp
  244. else:
  245. tmp_lst = [tmp]
  246. tmp_lst.append(column_name)
  247. # remove redundant in list
  248. tmp_lst = list(set(tmp_lst))
  249. cache_data[table_name] = tmp_lst
  250. else:
  251. cache_data[table_name] = column_name
  252. with open("varname.yaml", 'w') as output:
  253. output.write(yaml.dump(cache_data, default_flow_style=False))
  254. response = {"success": "Data entry inserted."}
  255. self.write(response)
  256. class DesignerHandler(tornado.web.RequestHandler):
  257. # TODO: Need to load and pass style to client
  258. # If user did nothing, save should actually save the
  259. # same content.
  260. def get(self):
  261. print "In designer mode."
  262. with open("cache.yaml", 'r') as stream:
  263. try:
  264. #print(yaml.load(stream))
  265. cache_data = yaml.load(stream)
  266. except yaml.YAMLError as exc:
  267. print(exc)
  268. if cache_data == None:
  269. print("Error: Empty cache data file.")
  270. return
  271. print cache_data
  272. with open("style.yaml", 'r') as stream:
  273. try:
  274. #print(yaml.load(stream))
  275. style_data = yaml.load(stream)
  276. except yaml.YAMLError as exc:
  277. print(exc)
  278. data = {
  279. "cache": cache_data,
  280. "style": style_data
  281. }
  282. self.render('designer.html', data=data)
  283. class VersionHandler(tornado.web.RequestHandler):
  284. def get(self):
  285. response = {'version': '0.0.1',
  286. 'last_build': date.today().isoformat()}
  287. self.write(response)
  288. class SaveHandler(tornado.web.RequestHandler):
  289. def post(self):
  290. print self.request.body
  291. json_obj = json_decode(self.request.body)
  292. print('Post data received')
  293. with open("style.yaml", 'w') as output:
  294. output.write(yaml.safe_dump(json_obj, encoding='utf-8', allow_unicode=True, default_flow_style=False))
  295. response = {"success": "Data entry inserted."}
  296. #self.write(json.dumps(response))
  297. class StatusHandler(tornado.web.RequestHandler):
  298. def get(self):
  299. print "In status mode."
  300. with open("style.yaml", 'r') as stream:
  301. try:
  302. #print(yaml.load(stream))
  303. style_data = yaml.load(stream)
  304. except yaml.YAMLError as exc:
  305. print(exc)
  306. if style_data == None:
  307. print("Error: Empty style data file.")
  308. return
  309. data = {
  310. "style": style_data
  311. }
  312. self.render('status.html', data=data)
  313. class AdeiKatrinHandler(tornado.web.RequestHandler):
  314. def get(self, **params):
  315. print params
  316. sensor_name = str(params["sensor_name"])
  317. """
  318. {'db_group': u'320_KRY_Kryo_4K_CurLead',
  319. 'db_name': u'ControlSystem_CPS',
  320. 'sensor_name': u'320-RTP-8-1103',
  321. 'db_server': u'cscps',
  322. 'control_group': u'320_KRY_Kryo_3K'}
  323. """
  324. """
  325. with open("config.yaml", 'r') as stream:
  326. try:
  327. #print(yaml.load(stream))
  328. config = yaml.load(stream)
  329. except yaml.YAMLError as exc:
  330. print(exc)
  331. if config == None:
  332. print("Error: Empty configuration file.")
  333. return
  334. """
  335. if config["type"] != "adei":
  336. print("Error: Wrong handler.")
  337. return
  338. """
  339. requests.get('http://katrin.kit.edu/adei-katrin/services/getdata.php?db_server=cscps&db_name=ControlSystem_CPS&db_group=320_KRY_Kryo_4K_CurLead&control_group=320_KRY_Kryo_3K&db_mask=33&window=-1')
  340. requests.get(url, auth=(username, password))
  341. """
  342. print config
  343. dest = config['server'] + config['script']
  344. query_cmds = []
  345. query_cmds.append("db_server="+str(params['db_server']))
  346. query_cmds.append("db_name="+str(params['db_name']))
  347. query_cmds.append("db_group="+str(params['db_group']))
  348. query_cmds.append("db_mask=all")
  349. query_cmds.append("window=-1")
  350. query = "&".join(query_cmds)
  351. url = dest + "?" + query
  352. print url
  353. # get the db_masks
  354. # store the query command in varname
  355. data = requests.get(url, auth=(config['username'], config['password']))
  356. cr = data.content
  357. cr = cr.split(",")
  358. print cr, len(cr)
  359. # parameter name stored in ADEI with '-IST_Val' suffix
  360. match_token = params['sensor_name'] + "-IST_Val"
  361. db_mask = None
  362. for i, item in enumerate(cr):
  363. if item.strip() == match_token:
  364. db_mask = i - 1
  365. if db_mask == None:
  366. response = {"Error": "Cannot find variable on ADEI server."}
  367. self.write(response)
  368. return
  369. query_cmds.append("db_mask="+str(db_mask))
  370. query = "&".join(query_cmds)
  371. # column name available
  372. # store in yaml file
  373. with open("varname.yaml", 'r') as stream:
  374. try:
  375. #print(yaml.load(stream))
  376. cache_data = yaml.load(stream)
  377. except yaml.YAMLError as exc:
  378. print(exc)
  379. print "CHECK THIS"
  380. print sensor_name, query
  381. print cache_data
  382. if cache_data == None:
  383. cache_data = {}
  384. cache_data[sensor_name] = query
  385. else:
  386. if sensor_name not in cache_data:
  387. cache_data[sensor_name] = query
  388. else:
  389. response = {"Error": "Variable already available in varname file."}
  390. self.write(response)
  391. return
  392. with open("varname.yaml", 'w') as output:
  393. output.write(yaml.dump(cache_data, default_flow_style=False))
  394. response = {"success": "Data entry inserted."}
  395. print match_token, db_mask
  396. self.write(response)
  397. class GetDataHandler(tornado.web.RequestHandler):
  398. def get(self):
  399. with open("cache.yaml", 'r') as stream:
  400. try:
  401. #print(yaml.load(stream))
  402. cache_data = yaml.load(stream)
  403. except yaml.YAMLError as exc:
  404. print(exc)
  405. print("GetData:")
  406. print cache_data
  407. self.write(cache_data)
  408. application = tornado.web.Application([
  409. (r"/version", VersionHandler),
  410. (r"/list", ListHandler),
  411. (r"/start", StartHandler),
  412. (r"/stop", StopHandler),
  413. (r"/designer", DesignerHandler),
  414. (r"/status", StatusHandler),
  415. (r"/save/", SaveHandler),
  416. (r"/getdata/", GetDataHandler),
  417. (r"/timer/(?P<duration>[^\/]+)/?", SetTimerHandler),
  418. (r"/mysql/add/(?P<table_name>[^\/]+)/?(?P<column_name>[^\/]+)?", AddHandler),
  419. (r"/add/(?P<db_server>[^\/]+)/?(?P<db_name>[^\/]+)/?(?P<db_group>[^\/]+)/?(?P<sensor_name>[^\/]+)?", AdeiKatrinHandler)
  420. ], debug=True, static_path=os.path.join(root, 'static'), js_path=os.path.join(root, 'js'))
  421. if __name__ == "__main__":
  422. application.listen(8888)
  423. tornado.autoreload.start()
  424. #tornado.autoreload.watch('myfile')
  425. tornado.ioloop.IOLoop.instance().start()