import logging import time import os import csv import math import numpy as np import h5py import traceback try: #Compatibility Python3 and Python 2 from .CalibrationHandle import theCalibration from .constants import KARA except: from CalibrationHandle import theCalibration from constants import KARA """ ScanInfo has these fields: Nr Info Threshold ADC to T/H T/H 25ps fine Step4 Mode Fast Mode Calibration """ class TimeScan(object): """ Class to read Timescan files. ! Be aware: parameter ADC is in range of 0 to 7 """ def __init__(self, filename=None, calibrationFile=""): """ Initialise the TimeScan :param filename: string of the rawfile to open. If available it reads automatically the meta information from the Logfile :param calibrationFile: define the to use calibration. by default it looks for "calibration.hdf" in the same dic as the filename :return: - """ self.fileName = filename self.scanData = None self.adcNumber = 0 self.scanInfo = None self.appending = False self.scanx = None self.scany = None self.scane = None self.calibrateY = False self.calibrateX = False self.multibunch = False self.bunch = 0 self.multibunchData = None self.multibunchStd = None self.calibId = "current" if calibrationFile != "": self.calibId = theCalibration.openFile(calibrationFile) else: if self.fileName is not None: path = os.path.dirname(os.path.dirname(self.fileName)) self.calibId = theCalibration.openFile(os.path.join(path,'calibration.hdf')) else: print('calibrationFile not defined - no calibration possible') if self.fileName is not None: if os.path.isfile(self.fileName): self._loadTimeScan() self.scanInfo = self._loadScanInfo() else: print('File does not exist', self.file) ################################################################################################### def getScanInfo(self): """ :return the logfile entry as a dict """ return self.scanInfo def getScanOverTime(self, adc, calibrateX=True, calibrateY=True, force=False, bunch=0): """ get The Timescan with x-Axis in ps :param adc: Channel to be returned :return scanx, scany, scane """ while self.appending: pass if (self.scanx is None) or (self.calibrateX != calibrateX) or (self.calibrateY != calibrateY) or (self.bunch != bunch) or force: self._prepareScan(calibrateX, calibrateY, bunch, force) return self.scanx[adc], self.scany[adc], self.scane[adc] def getScanOverDelay(self, adc, bunch=0): """ Returns the Scan where the returned x values are the Delays :return scanx, scany, scane; where scanx is a array of arrays containing [330ps, 25ps, 3ps, 2. 25ps] """ while self.appending: pass dat = np.array(self.scanData[adc]) scanx = np.array([[val[2], val[3], val[4], val[6]] for val in dat]) if self.multibunch: scany = np.array(self.multibunchData[adc])[:,bunch] scane = np.array(self.multibunchStd[adc])[:,bunch] else: scany = dat.T[0] scane = dat.T[1] return scanx, scany, scane def _loadTimeScan(self): data = [[],[],[],[], [],[],[],[]] multibunchData = [[],[],[],[], [],[],[],[]] multibunchStd = [[],[],[],[], [],[],[],[]] with open(self.fileName) as f: reader = csv.reader(f, delimiter=';') adc = -1 calib = False for row in reader: #print(row) if row == []: pass elif '#ADC' in row[0]: adc += 1 elif '#' in row[0]: pass else: if len(row) < 4: data[adc].append([float(row[2]), float(0), int(row[0]), int(0), int(row[1]),0,4]) elif len(row) <= 5: data[adc].append([float(row[3]), float(row[4]), int(row[0]), int(row[1]), int(row[2]),0,4]) else: if len(row[3]) > 10: y = np.fromstring(row[3], sep=',') e = np.fromstring(row[4], sep=',') multibunchData[adc].append(y) multibunchStd[adc].append(e) data[adc].append([y[0], e[0], int(row[0]), int(row[1]), int(row[2]), int(row[5]), int(row[6])]) self.multibunch = True else: data[adc].append([float(row[3]), float(row[4]), int(row[0]), int(row[1]), int(row[2]), int(row[5]), int(row[6])]) calib=True self.scanData = np.array(data[:adc+1]) self.adcNumber = adc+1 if self.multibunch is not None: self.multibunchData = np.array(multibunchData) self.multibunchStd = np.array(multibunchStd) print('adcNumber', self.adcNumber) def _loadScanInfo(self): file = os.path.join(os.path.dirname(self.fileName), 'scan.info') basename = os.path.basename(self.fileName) with open(file, 'r') as f: scaninfo = f.read().split('Scan\n')[1:] for item in scaninfo: tmp = {} tmp['File'] = "" for v in item.split('\n'): if '#' in v: continue t = v.split(':') if len(t)>1: if 'T/H' in str(t[0]) and 'ADC' not in str(t[0]): tmp['T/H'] = np.array(t[1].split('-'), dtype=np.int) elif 'fine' in str(t[0]): tmp['fine'] = np.array(t[1].split('-'), dtype=np.int) elif '25ps' in str(t[0]): tmp['25ps'] = np.array(t[1].split('-'), dtype=np.int) else: tmp[str(t[0])] = t[1].strip(' ') if tmp['File'] == basename: return tmp print('no ScanInfo') return None def _prepareScan(self, calibrateX, calibrateY, bunch=0, force=True): #print('_prepareScan') if (self.calibrateX != calibrateX) or force or (self.scanx is None): self.calibrateX = calibrateX self.scanx = [] for adc in range(self.adcNumber): dat = np.array(self.scanData[adc]) if calibrateX: scanx = np.array([theCalibration.calibrateX(adc, val[2], val[3], val[4], id=self.calibId, c25b=val[6]) for val in dat]) else: scanx = np.array([val[2]*330 + val[3]*25 + val[4]*3 + (4-val[6])*25 for val in dat])*1e-12 self.scanx.append(scanx) if (self.calibrateY != calibrateY) or force or (self.scany is None) or (self.bunch != bunch): self.calibrateY = calibrateY self.scany = [] self.scane = [] self.bunch = bunch for adc in range(self.adcNumber): dat = np.array(self.scanData[adc]) if self.multibunch: scany = np.array(self.multibunchData[adc])[:,bunch] if calibrateY: scany = theCalibration.calibrateY(scany, adc, id=self.calibId) scane = np.array(self.multibunchStd[adc])[:,bunch] else: scany = dat.T[0] if calibrateY: scany = theCalibration.calibrateY(scany, adc, id=self.calibId) scane = dat.T[1] self.scany.append(scany) self.scane.append(scane) #print('_prepareScan --') #scanx = scanx[np.where(data[0,:,6] == 4)] ################################################################################################### # .d8888b. 888 # d88P Y88b 888 # 888 888 888 # 888 .d88b. 88888b. .d88b. 888d888 8888b. 888888 .d88b. # 888 88888 d8P Y8b 888 "88b d8P Y8b 888P" "88b 888 d8P Y8b # 888 888 88888888 888 888 88888888 888 .d888888 888 88888888 # Y88b d88P Y8b. 888 888 Y8b. 888 888 888 Y88b. Y8b. # "Y8888P88 "Y8888 888 888 "Y8888 888 "Y888888 "Y888 "Y8888 # # # def prepare(self, adcNumber, multibunch): self.scanData = [[],[],[],[], [],[],[],[]] self.adcNumber = adcNumber self.scanInfo = None self.scanx = None self.scany = None self.scane = None self.calibrateY = False self.calibrateX = False self.multibunch = multibunch self.bunch = 0 self.multibunchData = [[],[],[],[], [],[],[],[]] self.multibunchStd = [[],[],[],[], [],[],[],[]] self.calibId = "current" pass def appendData(self, value, error, adc, c330, c25, c3, c25b, c330b=0): self.appending = True if self.multibunch: self.scanData[adc].append([value[0], error[0], c330, c25, c3, c330b, c25b]) self.multibunchData[adc].append(value) self.multibunchStd[adc].append(error) else: self.scanData[adc].append([value, error, c330, c25, c3, c330b, c25b]) self.appending = False def saveScan(self, filename, index, info, threshold, c_min, c_max, c25_min, c25_max, f_min, f_max, stop, c_step, c25_step, f_step, step4, mode, fastMode, calibrationScan ): fileinfo = os.path.join(os.path.dirname(filename), 'scan.info') f = open(filename, 'w') self.fileName = filename path = os.path.dirname(os.path.dirname(self.fileName)) self.calibId = theCalibration.openFile(os.path.join(path,'calibration.hdf')) if fastMode: f.write('#FastMode\n') for adc in range(self.adcNumber): f.write("#ADC_%s\n" % adc) f.write("#330ps;25ps;3ps;mean;std;330psB;25psB\n") for i, item in enumerate(self.scanData[adc]): if self.multibunch: f.write("{:02d};{:02d};{:02d};{};{};{:02d};{:02d}\n".format(item[2], item[3], item[4], np.array2string(np.array(self.multibunchData[adc][i]), precision=2, separator=',', threshold=2000, max_line_width=10000000)[1:-1], np.array2string(np.array(self.multibunchStd[adc][i]), precision=2, separator=',', threshold=2000, max_line_width=10000000)[1:-1], item[5], item[6])) else: f.write("{:02d};{:02d};{:02d};{:0.3f};{:0.3f};{:02d};{:02d}\n".format(item[2], item[3], item[4], item[0], item[1], item[5], item[6])) f.write('\n') f.close() f = open(fileinfo, 'a') f.write('Scan\n') f.write('Nr: {}\n'.format(index)) f.write('File: {}\n'.format(os.path.basename(filename))) f.write('Info: {}\n'.format(info)) f.write('Threshold: {}\n'.format(threshold)) #f.write('ADC to T/H: {} + {}\n'.format(bif.bk_get_config(self.board_id, 'delay_330_adc'), bif.bk_get_config(self.board_id, 'delay_25_adc'))) f.write('Range\n') f.write('\t T/H: {}-{}\n'.format(c_min, c_max)) f.write('\t 25ps: {}-{}\n'.format(c25_min, c25_max)) f.write('\t fine: {}-{}\n'.format(f_min, f_max)) f.write('Step4: {}\n'.format(step4)) if stop: f.write('Scan Stopped at: {},{},{}\n'.format(c_step, c25_step, f_step)) f.write('Mode: {}\n'.format(mode)) f.write('Fast Mode: {}\n'.format(fastMode)) f.write('Calibration: {}\n'.format(calibrationScan)) f.write('\n') f.close() ################################################################################################### ###################################################################################################