123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
- from PyQt4 import QtGui
- from ..base import kcgwidget as kcgw
- from ..base.globals import glob as global_objects
- from ..base import log
- from .. import config
- import sys
- if sys.version_info[:3] < (3,0):
- import ConfigParser as configparser
- else:
- import configparser
- import os
- import logging
- from functools import partial
- import numpy as np
# Id of the currently open EpicsWidget, or None while no widget is open.
__widget_id__ = None

# pyepics is optional: ``no_epics`` records whether the import failed so the
# rest of this module can skip every EPICS call.
try:
    import epics
except ImportError:
    no_epics = True
    logging.error("Python Epics not found")
else:
    no_epics = False
def caget(pvname):
    """
    Wrapper for epics.caget to handle no_epics

    :param pvname: name of the PV to read
    :return: the value returned by epics.caget, or None when EPICS is not
        available or reading the PV raised an error (the error is logged)
    """
    if no_epics:
        return None
    try:
        return epics.caget(pvname)
    except Exception:
        # Best effort: a failed read is reported and mapped to None. Only
        # Exception is caught so KeyboardInterrupt/SystemExit still propagate.
        logging.error("Epics PV is not accessible (possible Timeout)")
        return None
def caCheck(pvname):
    """
    check if PV is readable

    :param pvname: name of the PV to test
    :return: True if epics.caget returned a value for the PV, False if it
        returned None, raised an error, or EPICS is not available
    """
    if no_epics:
        return False
    try:
        # Identity test on purpose: pyepics may return a numpy array for
        # array-valued PVs, and ``array == None`` has no single truth value.
        return epics.caget(pvname) is not None
    except Exception:
        return False
class EpicsConfig(object):
    """
    Access to the EPICS settings stored in the ``epics.cfg`` config file.

    The settings live in the ``EPICS`` section of the file; every value is
    stored as a literal that ``config.leval`` turns back into an object.

    need one instance at app startup
    is done by kcgGUI via
        from ..widgets import epics_widget
        epics_widget.epicsConfig = epics_widget.EpicsConfig()
    """

    def __init__(self):
        """
        Load the configuration and test whether EPICS is usable.

        Sets the module-level ``no_epics`` flag when the test PV cannot be
        read or the Channel Access library is not found. If ``no_epics`` is
        already set, nothing is loaded (``self.parser`` is not created).
        """
        global no_epics
        if no_epics:
            logging.error("Epics package not found!")
            return
        self.loadConfig(False)

        os.environ["EPICS_BASE"] = self.getKey('epics_base_path')
        try:
            # the import succeeded: look for libca and test the connection
            # by reading epics_test_pv
            epics.ca.find_libca()
            if epics.caget(self.getKey('epics_test_pv')) is None:
                no_epics = True
                logging.error("Epics is not accessible (possible Timeout)")
        except epics.ca.ChannelAccessException as e:
            if str(e) == "cannot find Epics CA DLL":
                no_epics = True
                logging.error("Epics CA DLL not found - check epics config")
            else:
                # keep going as before, but do not hide the problem
                logging.error("Epics channel access error: %s", e)
        self.setLogger()

    def setLogger(self):
        """
        Replace the logger's EPICS parameters with the configured PVs.

        Entries using epics.caget are removed from
        ``log.logger.predefined_parameters`` and from the registered
        parameters, then every configured PV is added as a predefined
        parameter. PVs flagged "True" for logging are registered as well,
        provided they are readable. Does nothing when EPICS is unavailable.
        """
        if no_epics:
            return
        # first remove all epics entries
        params = log.logger.predefined_parameters
        log.logger.predefined_parameters = [
            item for item in params if item[1][0] != epics.caget
        ]
        params = log.logger.parameter_functions
        log.logger.reset_parameters()
        for item in params:
            if item[0] != epics.caget:
                log.logger.register_parameter(item[2], item[0], item[1])
        # add epics entries
        for entry in self.getKey('epics_log_entry_pvs'):
            log.logger.predefined_parameters.append([entry[0], [epics.caget, entry[1]]])
            # only register PVs that are reachable
            if entry[2] == "True" and caCheck(entry[1]):
                log.logger.register_parameter(entry[0], epics.caget, entry[1])

    def loadConfig(self, updateLogger=True):
        """
        (Re)read ``epics.cfg``; the file is created with defaults if missing.

        :param updateLogger: call setLogger() after loading
        """
        self.parser = configparser.ConfigParser()
        self.parser.optionxform = str  # keep option names case sensitive
        cfg_file = config.config_path("epics.cfg")
        if not os.path.isfile(cfg_file):
            # add_section()/set() exist in both the Python 2 and Python 3
            # parser; the defaults are built before the file is opened so a
            # failure cannot leave an empty config file behind.
            self.parser.add_section('EPICS')
            self.parser.set('EPICS', 'epics_log_entry_pvs',
                            '[("Beam Energy (GeV)", "A:SR:BeamInfo:01:Energy", "False", "True"),("Beam Current (mA)", "A:SR:BeamInfo:01:Current", "True", "True")]')
            self.parser.set('EPICS', 'epics_test_pv', '"A:SR:BeamInfo:01:Current"')
            self.parser.set('EPICS', 'epics_base_path', '"/opt/epics/base/"')
            with open(cfg_file, 'w') as f:
                self.parser.write(f)

        self.parser.read(cfg_file)

        if updateLogger:
            self.setLogger()

    def saveConfig(self):
        """Write the current settings to ``epics.cfg`` and update the logger."""
        with open(config.config_path("epics.cfg"), 'w') as f:
            self.parser.write(f)
        self.setLogger()

    def getKey(self, key):
        """
        :param key: option name in the EPICS section
        :return: the stored value, evaluated with config.leval
        """
        return config.leval(self.parser.get('EPICS', key))

    def setKey(self, key, value):
        """
        Store a string option; the value is wrapped in single quotes unless
        it already starts with one.

        :param key: option name in the EPICS section
        :param value: string value (may be empty)
        """
        # startswith() instead of value[0] so an empty string does not raise
        if not value.startswith("'"):
            value = "'" + value + "'"
        self.parser.set('EPICS', key, value)

    def getPVList(self):
        """:return: the configured PV entries (epics_log_entry_pvs)"""
        return self.getKey('epics_log_entry_pvs')

    def setPVList(self, pvList):
        """
        :param pvList: iterable of (name, pv, in_log, monitor) entries that
            replaces the configured PV list
        """
        self.parser.set('EPICS', 'epics_log_entry_pvs', self.generatePVKey(pvList))

    def addPV(self, name, pv, default, monitor=False):
        """
        Append one PV entry to the configured list.

        :param name: display name of the PV
        :param pv: EPICS PV name
        :param default: whether the PV is written to the log
        :param monitor: whether the PV is shown in the monitor
        """
        entries = list(self.getKey('epics_log_entry_pvs'))
        entries.append((name, pv, default, monitor))
        self.parser.set('EPICS', 'epics_log_entry_pvs', self.generatePVKey(entries))

    def removePV(self, index):
        """
        :param index: position of the entry to remove (converted with int())
        """
        n = int(index)
        self.setPVList([
            (item[0], item[1], item[2], item[3])
            for i, item in enumerate(self.getPVList())
            if i != n
        ])

    def generatePVKey(self, pvList):
        """
        Serialise PV entries into the text stored under epics_log_entry_pvs.

        :param pvList: iterable of entries with at least four fields
        :return: a list literal with one tuple of four strings per entry,
            one entry per line
        """
        rows = []
        for item in pvList:
            # repr() quotes and escapes each field, so names containing
            # quotes or backslashes still yield a valid literal; for plain
            # text the result is the same 'text' form as before.
            fields = (item[0], item[1], item[2], item[3])
            rows.append("(" + ", ".join(repr(str(field)) for field in fields) + ")")
        return "[\n" + ",\n".join(rows) + "\n]"
-
- # 8888888888 d8b 888 888d8b 888 888
- # 888 Y8P 888 o 888Y8P 888 888
- # 888 888 d8b 888 888 888
- # 8888888 88888b. 888 .d8888b.d8888b 888 d888b 888888 .d88888 .d88b. .d88b. 888888
- # 888 888 "88b888d88P" 88K 888d88888b888888d88" 888d88P"88bd8P Y8b888
- # 888 888 888888888 "Y8888b.88888P Y88888888888 888888 88888888888888
- # 888 888 d88P888Y88b. X888888P Y8888888Y88b 888Y88b 888Y8b. Y88b.
- # 888888888888888P" 888 "Y8888P 88888P'888P Y888888 "Y88888 "Y88888 "Y8888 "Y888
- # 888 888
- # 888 Y8b d88P
- # 888 "Y88P"
- #
class EpicsWidget(kcgw.KCGWidgets):
    """
    Widget showing monitored EPICS PVs and an editor for the PV list.

    The PV list and the settings are read from and written to the
    module-level ``epicsConfig`` object.
    """

    def __init__(self, unique_id, parent):
        """
        :param unique_id: id under which this widget is stored in
            ``parent.widgets``
        :param parent: object holding the ``widgets`` mapping
        """
        super(EpicsWidget, self).__init__()
        self.id = unique_id
        self.par = parent
        self.monitorPVList = {}   # str(pvname) -> [display name, value]
        self.configList = []      # inputs of the config editor (if open)
        self.pvList = []          # PV names with an active camonitor
        self.pvInputList = []     # one list of widgets per PV row
        self.pvInputAdd = []      # widgets of the trailing "add" row
        self.editHide = True
        self.setWindowTitle("Epics Widget")
        self.layout = QtGui.QGridLayout()
        self.outerLayout = QtGui.QVBoxLayout()
        self.outerLayout.addLayout(self.layout)
        self.setLayout(self.outerLayout)

        if no_epics:
            self.layout.addWidget(self.createLabel('no Epics available'))
            return

        self.editInnerGroupBox = QtGui.QGroupBox("")
        self.editInnerGroupBoxLayout = QtGui.QGridLayout()
        self.editScrollArea = QtGui.QScrollArea()
        headers = ("Name", "EPICS PV", "in Log", "Monitoring", "Value")
        for column, title in enumerate(headers):
            self.editInnerGroupBoxLayout.addWidget(self.createLabel(title), 0, column)
        self.generateList(False)

        self.editInnerGroupBox.setLayout(self.editInnerGroupBoxLayout)
        self.editScrollArea.setWidget(self.editInnerGroupBox)
        self.editOuterGroupBox = QtGui.QGroupBox("Epics PV List")
        self.editOuterGroupBoxLayout = QtGui.QGridLayout()
        self.editOuterGroupBoxLayout.addWidget(self.editScrollArea, 0, 0, 1, 5)
        self.readValues()
        self.button1 = self.createButton("refresh values", connect=self.readValues)
        self.button2 = self.createButton("apply changes", connect=self.apply)
        self.button3 = self.createButton("restore changes", connect=self.restore)
        self.button4 = self.createButton("apply + save", connect=self.save)
        self.button5 = self.createButton("load", connect=self.load)
        self.button6 = self.createButton("Edit Configfile", connect=self.editConfigs)

        self.editOuterGroupBoxLayout.addWidget(self.button1, 1, 0)
        self.editOuterGroupBoxLayout.addWidget(self.button2, 1, 1)
        self.editOuterGroupBoxLayout.addWidget(self.button3, 1, 2)
        self.editOuterGroupBoxLayout.addWidget(self.button4, 1, 3)
        self.editOuterGroupBoxLayout.addWidget(self.button5, 1, 4)
        self.editOuterGroupBoxLayout.addWidget(self.button6, 2, 0, 1, 5)
        self.editOuterGroupBox.setLayout(self.editOuterGroupBoxLayout)

        self.monitorLabel = self.createLabel("Monitor\n")
        self.layout.addWidget(self.monitorLabel)
        self.layout.addWidget(self.createCheckbox("Open PV List", connect=self.toggleEdit))
        self.editOuterGroupBox.hide()
        self.layout.addWidget(self.editOuterGroupBox)

        self.startMonitor()
        self.outerLayout.addStretch(1)
        self.adjustSize()

    def closeEvent(self, event):
        """Stop monitoring and unregister the widget when it is closed."""
        global __widget_id__
        __widget_id__ = None
        if not no_epics:
            self.stopMonitor()
        del self.par.widgets[self.id]

    def generateList(self, updateMonitor=True):
        """
        Rebuild the PV rows and the trailing "add" row from epicsConfig.

        :param updateMonitor: stop the monitor before and restart it after
            rebuilding
        """
        if updateMonitor:
            self.stopMonitor()
        grid = self.editInnerGroupBoxLayout

        # Drop every widget of the previous build. The "add" row is removed
        # even when there were no PV rows, otherwise it would be duplicated.
        for row in self.pvInputList:
            for widget in row:
                grid.removeWidget(widget)
                widget.deleteLater()
        for widget in self.pvInputAdd:
            grid.removeWidget(widget)
            widget.deleteLater()
        self.pvInputList = []
        self.pvInputAdd = []

        for i, item in enumerate(epicsConfig.getKey('epics_log_entry_pvs')):
            row = [
                self.createInput(item[0]),
                self.createInput(item[1], width=300),
                self.createSwitch(item[2] == "True"),
                self.createSwitch(item[3] == "True"),
                self.createLabel("value"),
                self.createLabel(image=QtGui.QPixmap(config.icon_path('open-trash-can.png')),
                                 click=True, connect=partial(self.remove, i)),
            ]
            self.pvInputList.append(row)
            for column, widget in enumerate(row):
                grid.addWidget(widget, i + 1, column)  # row 0 holds the headers

        add_row = len(self.pvInputList) + 1
        self.pvInputAdd = [
            self.createInput('name?'),
            self.createInput("?", width=300),
            self.createSwitch(True),
            self.createSwitch(False),
            self.createButton("add", connect=self.add,
                              tooltip="If PV name is wrong, it can take a bit"),
        ]
        for column, widget in enumerate(self.pvInputAdd):
            grid.addWidget(widget, add_row, column)
        if updateMonitor:
            self.startMonitor()

    def toggleEdit(self):
        """Show or hide the PV list editor; showing it discards unapplied edits."""
        if self.editHide:
            self.editOuterGroupBox.show()
            self.restore()
        else:
            self.editOuterGroupBox.hide()
        self.editHide = not self.editHide

    def editConfigs(self):
        """Replace the "Edit Configfile" button with inputs for the settings."""
        if self.configList:
            return  # editor is already open
        self.button6.hide()
        self.configBox = QtGui.QGroupBox("Edit Configfile")
        self.configBoxLayout = QtGui.QGridLayout()
        self.configList = []
        self.configBoxLayout.addWidget(self.createLabel("epics_test_pv"), 0, 0)
        self.configList.append(self.createInput(epicsConfig.getKey('epics_test_pv')))
        self.configBoxLayout.addWidget(self.configList[-1], 0, 1)
        self.configBoxLayout.addWidget(self.createLabel("epics_base_path"), 1, 0)
        self.configList.append(self.createInput(epicsConfig.getKey('epics_base_path')))
        self.configBoxLayout.addWidget(self.configList[-1], 1, 1)
        self.configBoxLayout.addWidget(self.createLabel("after saving a newstart is needed"), 2, 0, 1, 2)

        self.configBox.setLayout(self.configBoxLayout)
        self.editOuterGroupBoxLayout.addWidget(self.configBox, 2, 0, 1, 5)

    def saveConfigs(self):
        """Copy the config editor's inputs into epicsConfig and close the editor."""
        if self.configList:
            epicsConfig.setKey('epics_test_pv', str(self.configList[0].text()))
            epicsConfig.setKey('epics_base_path', str(self.configList[1].text()))

        self.abortConfigs()

    def abortConfigs(self):
        """Close the config editor (if open) and show its button again."""
        if not self.configList:
            return
        while self.configBoxLayout.count() > 0:
            item = self.configBoxLayout.takeAt(0)
            if not item:
                continue
            w = item.widget()
            if w:
                w.deleteLater()
        self.configBox.deleteLater()
        self.configList = []
        self.button6.show()

    def readValues(self):
        """Read every listed PV once and show the result in its value label."""
        for item in self.pvInputList:
            try:
                val = str(caget(str(item[1].text())))
            except Exception:
                val = "ERROR"
            try:
                # numeric values are shown with five decimals
                val = "{0:.5f}".format(float(val))
            except ValueError:
                pass  # not a number: show the text as it is
            item[4].setText(val)

    def add(self):
        """Add the PV of the "add" row to the config if it is readable."""
        # str() as in readValues, so caCheck gets the same kind of PV name
        pv = str(self.pvInputAdd[1].text())
        if caCheck(pv):
            epicsConfig.addPV(self.pvInputAdd[0].text(), pv,
                              self.pvInputAdd[2].state, self.pvInputAdd[3].state)
            self.generateList()
            self.readValues()
        else:
            pop = kcgw.PopupDialog('{} not readable'.format(pv), okonly=True)
            pop.exec_()
            pop.deleteLater()

    def remove(self, n):
        """
        :param n: index of the PV row to remove from the config
        """
        epicsConfig.removePV(n)
        self.generateList()
        self.readValues()

    def restore(self):
        """Rebuild the editor from epicsConfig, dropping unapplied edits."""
        self.generateList()
        self.readValues()
        self.abortConfigs()

    def load(self):
        """Reload the config file and rebuild the editor from it."""
        epicsConfig.loadConfig()
        self.generateList()
        self.readValues()
        self.abortConfigs()

    def save(self):
        """Apply the edited PV list and settings and write the config file."""
        self.apply()
        self.saveConfigs()
        epicsConfig.saveConfig()

    def apply(self):
        """Store the edited PV rows in epicsConfig (without writing the file)."""
        pvList = [
            (item[0].text(), item[1].text(), item[2].state, item[3].state)
            for item in self.pvInputList
        ]
        epicsConfig.setPVList(pvList)
        self.generateList()
        self.readValues()

    def monitorPV(self, pvname=None, value=None, char_value=None, clearname="", **kw):
        """
        Record a new value for a monitored PV and refresh the monitor label.

        Used as the epics.camonitor callback and called directly by
        startMonitor to add a PV.

        :param pvname: PV name, used as key of the monitor table
        :param value: unused
        :param char_value: value to display
        :param clearname: display name; when non-empty the PV is (re)added
        :param kw: further callback arguments, ignored
        """
        # NOTE(review): pyepics may invoke this from a non-GUI thread -
        # confirm that updating the label from here is safe.
        key = str(pvname)
        if clearname != "":
            self.monitorPVList[key] = [clearname, char_value]
        elif key in self.monitorPVList:
            self.monitorPVList[key][1] = char_value
        else:
            # update for a PV that is not tracked (e.g. the monitor was
            # stopped in the meantime): nothing to show
            return
        self.updateMonitor()

    def updateMonitor(self):
        """Write all monitored PVs as "name = value" lines into the label."""
        lines = ["Monitor\n"]
        for name, val in self.monitorPVList.values():
            lines.append(str(name) + " = " + str(val) + "\n")
        self.monitorLabel.setText("".join(lines))

    def startMonitor(self):
        """Start monitoring every configured PV flagged for monitoring."""
        if self.pvList:
            self.stopMonitor()
        for item in epicsConfig.getKey('epics_log_entry_pvs'):
            if item[3] != 'True':
                continue
            try:
                # read once: the value both decides whether the PV is
                # reachable and is the first value shown
                current = caget(item[1])
                if current is not None:
                    self.pvList.append(item[1])
                    self.monitorPV(item[1], char_value=current, clearname=item[0])
                    epics.camonitor(item[1], callback=self.monitorPV)
            except Exception:
                logging.info('epics add Failed %s', item[1])

    def stopMonitor(self):
        """Clear all active monitors and empty the monitor table."""
        for item in self.pvList:
            try:
                epics.camonitor_clear(item)
            except Exception:
                logging.info('epics remove Failed %s', item)
        self.pvList = []
        self.monitorPVList = {}
def addEpicsWidget():
    """
    Open the Epics widget in the global 'area'.

    If a widget id is already stored in ``__widget_id__``, that widget gets
    the focus instead of a second widget being created.
    """
    global __widget_id__
    if not __widget_id__:
        new_id = kcgw.idg.genid()
        __widget_id__ = new_id
        widget = EpicsWidget(new_id, global_objects.get_global('area'))
        global_objects.get_global('area').newWidget(widget, "Epics Widget", new_id, widget_type=4)
        return
    global_objects.get_global('area').widgets[__widget_id__].setFocus()
# Register the widget with kcgw: icon, title, the function that opens it and
# (presumably) its keyboard shortcut.
kcgw.register_widget(
    QtGui.QIcon(config.icon_path("EPICS_Logo.png")),
    "Epics Widget",
    addEpicsWidget,
    "Ctrl+E",
)
|