123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- import logging
- import time
- import os
- import csv
- import math
- import numpy as np
- import h5py
- import traceback
- try:
- #Compatibility Python3 and Python 2
- from .CalibrationHandle import theCalibration
- from .constants import KARA
- except:
- from CalibrationHandle import theCalibration
- from constants import KARA
- """
- ScanInfo has these fields:
- Nr
- Info
- Threshold
- ADC to T/H
- T/H
- 25ps
- fine
- Step4
- Mode
- Fast Mode
- Calibration
- """
class TimeScan(object):
    """
    Read, accumulate and write time-scan files.

    A scan is stored per ADC as a list of rows
    ``[value, error, c330, c25, c3, c330b, c25b]``.  In multibunch mode the
    full per-bunch value/error vectors of every row are kept in
    ``multibunchData`` / ``multibunchStd`` and the row itself carries only
    the first bunch.

    ! Be aware: parameter ADC is in range of 0 to 7
    """

    # Number of per-ADC slots allocated while loading or recording a scan.
    _MAX_ADCS = 8

    def __init__(self, filename=None, calibrationFile=""):
        """
        Initialise the TimeScan.

        :param filename: path of the raw scan file to open. If the file
            exists it is loaded and the matching entry of ``scan.info``
            (same directory as the file) is read into ``scanInfo``.
        :param calibrationFile: calibration file to use. If empty,
            ``calibration.hdf`` is looked up in the parent directory of the
            directory containing ``filename``.
        :return: -
        """
        self.fileName = filename
        self.scanData = None
        self.adcNumber = 0
        self.scanInfo = None
        self.appending = False
        self.scanx = None
        self.scany = None
        self.scane = None
        self.calibrateY = False
        self.calibrateX = False
        self.multibunch = False
        self.bunch = 0
        self.multibunchData = None
        self.multibunchStd = None

        self.calibId = "current"
        if calibrationFile != "":
            self.calibId = theCalibration.openFile(calibrationFile)
        elif self.fileName is not None:
            path = os.path.dirname(os.path.dirname(self.fileName))
            self.calibId = theCalibration.openFile(os.path.join(path, 'calibration.hdf'))
        else:
            print('calibrationFile not defined - no calibration possible')

        if self.fileName is not None:
            if os.path.isfile(self.fileName):
                self._loadTimeScan()
                self.scanInfo = self._loadScanInfo()
            else:
                # Report the missing path (previously referenced a
                # non-existent attribute and raised AttributeError).
                print('File does not exist', self.fileName)

    ###################################################################################################

    def getScanInfo(self):
        """
        :return: the scan.info entry of this scan as a dict, or None if no
            entry was loaded
        """
        return self.scanInfo

    def getScanOverTime(self, adc, calibrateX=True, calibrateY=True, force=False, bunch=0):
        """
        Get the scan of one ADC with the delay settings combined into a
        single x value.

        The prepared arrays are cached; they are rebuilt only when the
        calibration flags or the bunch differ from the cached ones, when
        nothing is cached yet, or when ``force`` is set.

        :param adc: channel to be returned
        :param calibrateX: pass the delay settings through
            ``theCalibration.calibrateX`` instead of the nominal step sizes
        :param calibrateY: pass the values through ``theCalibration.calibrateY``
        :param force: rebuild the cached arrays unconditionally
        :param bunch: bunch index to use for multibunch scans
        :return: scanx, scany, scane
        """
        self._waitForAppend()
        needsPrepare = (
            self.scanx is None
            or self.calibrateX != calibrateX
            or self.calibrateY != calibrateY
            or self.bunch != bunch
            or force
        )
        if needsPrepare:
            self._prepareScan(calibrateX, calibrateY, bunch, force)
        return self.scanx[adc], self.scany[adc], self.scane[adc]

    def getScanOverDelay(self, adc, bunch=0):
        """
        Return the scan of one ADC with the raw delay settings as x values.

        :param adc: channel to be returned
        :param bunch: bunch index to use for multibunch scans
        :return: scanx, scany, scane; where scanx is an array of rows
            [330ps, 25ps, 3ps, 2. 25ps] (row columns 2, 3, 4 and 6)
        """
        self._waitForAppend()
        dat = np.array(self.scanData[adc])

        scanx = np.array([[val[2], val[3], val[4], val[6]] for val in dat])
        if self.multibunch:
            scany = np.array(self.multibunchData[adc])[:, bunch]
            scane = np.array(self.multibunchStd[adc])[:, bunch]
        else:
            scany = dat.T[0]
            scane = dat.T[1]
        return scanx, scany, scane

    def _waitForAppend(self):
        """Block until no appendData call is in progress (polls ``self.appending``)."""
        while self.appending:
            # Yield the CPU instead of spinning at full speed.
            time.sleep(0.001)

    @staticmethod
    def _asArray(perAdc):
        """
        Convert per-ADC lists to an array.

        Falls back to a 1-D object array (one entry per ADC) when the ADCs
        hold different numbers of points, which recent NumPy versions refuse
        to convert implicitly.
        """
        try:
            return np.array(perAdc)
        except ValueError:
            return np.array(perAdc, dtype=object)

    def _loadTimeScan(self):
        """
        Parse ``self.fileName`` (';'-separated) into ``scanData``.

        A line whose first field contains '#ADC' starts the next ADC; any
        other line containing '#' in its first field and empty lines are
        skipped.  Data rows are accepted in three layouts:

        * fewer than 4 fields: ``c330;c3;value``
        * up to 5 fields:      ``c330;c25;c3;value;error``
        * otherwise:           ``c330;c25;c3;value;error;c330b;c25b`` where
          value/error may be ','-separated per-bunch lists (detected by the
          value field being longer than 10 characters), which switches the
          scan to multibunch mode.
        """
        data = [[] for _ in range(self._MAX_ADCS)]
        multibunchData = [[] for _ in range(self._MAX_ADCS)]
        multibunchStd = [[] for _ in range(self._MAX_ADCS)]
        adc = -1
        with open(self.fileName) as f:
            for row in csv.reader(f, delimiter=';'):
                if not row:
                    continue
                if '#ADC' in row[0]:
                    adc += 1
                    continue
                if '#' in row[0]:
                    continue

                if len(row) < 4:
                    data[adc].append([float(row[2]), float(0), int(row[0]), int(0), int(row[1]), 0, 4])
                elif len(row) <= 5:
                    data[adc].append([float(row[3]), float(row[4]), int(row[0]), int(row[1]), int(row[2]), 0, 4])
                elif len(row[3]) > 10:
                    y = np.fromstring(row[3], sep=',')
                    e = np.fromstring(row[4], sep=',')
                    multibunchData[adc].append(y)
                    multibunchStd[adc].append(e)
                    data[adc].append([y[0], e[0], int(row[0]), int(row[1]), int(row[2]), int(row[5]), int(row[6])])
                    self.multibunch = True
                else:
                    data[adc].append([float(row[3]), float(row[4]), int(row[0]), int(row[1]), int(row[2]), int(row[5]), int(row[6])])

        self.adcNumber = adc + 1
        self.scanData = self._asArray(data[:self.adcNumber])
        # Only keep the ADCs that were actually seen: including the unused,
        # empty slots makes the nested lists ragged and breaks the conversion.
        self.multibunchData = self._asArray(multibunchData[:self.adcNumber])
        self.multibunchStd = self._asArray(multibunchStd[:self.adcNumber])
        print('adcNumber', self.adcNumber)

    def _loadScanInfo(self):
        """
        Look up the entry for this scan in ``scan.info`` next to the file.

        Entries are separated by a 'Scan' line and consist of 'key: value'
        lines; lines containing '#' are ignored.  The 'T/H', 'fine' and
        '25ps' ranges ('min-max') are returned as integer arrays, all other
        values as strings.

        :return: dict of the entry whose 'File' equals the basename of
            ``self.fileName``, or None if there is no such entry or no
            ``scan.info`` file
        """
        infoFile = os.path.join(os.path.dirname(self.fileName), 'scan.info')
        basename = os.path.basename(self.fileName)
        if not os.path.isfile(infoFile):
            # The info file is optional; a scan without it is still usable.
            print('no ScanInfo')
            return None

        with open(infoFile, 'r') as f:
            scaninfo = f.read().split('Scan\n')[1:]

        for item in scaninfo:
            tmp = {'File': ""}
            for line in item.split('\n'):
                if '#' in line:
                    continue
                t = line.split(':')
                if len(t) <= 1:
                    continue
                key = str(t[0])
                if 'T/H' in key and 'ADC' not in key:
                    tmp['T/H'] = self._parseRange(t[1])
                elif 'fine' in key:
                    tmp['fine'] = self._parseRange(t[1])
                elif '25ps' in key:
                    tmp['25ps'] = self._parseRange(t[1])
                else:
                    tmp[key] = t[1].strip(' ')
            if tmp['File'] == basename:
                return tmp
        print('no ScanInfo')
        return None

    @staticmethod
    def _parseRange(text):
        """Turn a 'min-max' string into an integer array (``np.int`` no longer exists)."""
        return np.array([int(part) for part in text.split('-')], dtype=int)

    def _prepareScan(self, calibrateX, calibrateY, bunch=0, force=True):
        """
        (Re)build the cached ``scanx`` / ``scany`` / ``scane`` lists.

        The x part is rebuilt when ``calibrateX`` changed, nothing is cached
        or ``force`` is set; the y/error part additionally when ``bunch``
        changed.

        :param calibrateX: use ``theCalibration.calibrateX`` for the x values;
            otherwise the delay settings are weighted with the nominal steps
            330/25/3/25 and scaled by 1e-12
        :param calibrateY: pass the y values through ``theCalibration.calibrateY``
        :param bunch: bunch index to use for multibunch scans
        :param force: rebuild both parts unconditionally
        """
        if (self.calibrateX != calibrateX) or force or (self.scanx is None):
            self.calibrateX = calibrateX
            self.scanx = []
            for adc in range(self.adcNumber):
                dat = np.array(self.scanData[adc])
                if calibrateX:
                    scanx = np.array([theCalibration.calibrateX(adc, val[2], val[3], val[4], id=self.calibId, c25b=val[6]) for val in dat])
                else:
                    scanx = np.array([val[2]*330 + val[3]*25 + val[4]*3 + (4-val[6])*25 for val in dat])*1e-12
                self.scanx.append(scanx)

        if (self.calibrateY != calibrateY) or force or (self.scany is None) or (self.bunch != bunch):
            self.calibrateY = calibrateY
            self.scany = []
            self.scane = []
            self.bunch = bunch
            for adc in range(self.adcNumber):
                if self.multibunch:
                    scany = np.array(self.multibunchData[adc])[:, bunch]
                    scane = np.array(self.multibunchStd[adc])[:, bunch]
                else:
                    dat = np.array(self.scanData[adc])
                    scany = dat.T[0]
                    scane = dat.T[1]
                if calibrateY:
                    scany = theCalibration.calibrateY(scany, adc, id=self.calibId)
                self.scany.append(scany)
                self.scane.append(scane)

    ###################################################################################################
    #  Generate
    ###################################################################################################

    def prepare(self, adcNumber, multibunch):
        """
        Reset the object so a new scan can be recorded with appendData.

        :param adcNumber: number of ADCs that will be written by saveScan and
            prepared by getScanOverTime
        :param multibunch: True if appended values/errors are per-bunch sequences
        """
        self.scanData = [[] for _ in range(self._MAX_ADCS)]
        self.adcNumber = adcNumber
        self.scanInfo = None
        self.scanx = None
        self.scany = None
        self.scane = None
        self.calibrateY = False
        self.calibrateX = False
        self.multibunch = multibunch
        self.bunch = 0
        self.multibunchData = [[] for _ in range(self._MAX_ADCS)]
        self.multibunchStd = [[] for _ in range(self._MAX_ADCS)]

        self.calibId = "current"

    def appendData(self, value, error, adc, c330, c25, c3, c25b, c330b=0):
        """
        Append one measured point to the scan of ``adc``.

        In multibunch mode ``value`` and ``error`` are sequences; the row
        stores their first element and the full sequences are kept
        separately.  ``self.appending`` is raised for the duration of the
        call so that the getters wait for a consistent state.

        :param value: measured value (sequence in multibunch mode)
        :param error: error of the value (sequence in multibunch mode)
        :param adc: channel the point belongs to
        :param c330, c25, c3, c25b, c330b: delay settings of the point
        """
        self.appending = True
        try:
            if self.multibunch:
                self.scanData[adc].append([value[0], error[0], c330, c25, c3, c330b, c25b])

                self.multibunchData[adc].append(value)
                self.multibunchStd[adc].append(error)
            else:
                self.scanData[adc].append([value, error, c330, c25, c3, c330b, c25b])
        finally:
            # Always release the flag, otherwise a failed append would make
            # every getter wait forever.
            self.appending = False

    @staticmethod
    def _formatBunches(values):
        """Format a per-bunch sequence as a ','-separated list without brackets."""
        text = np.array2string(np.array(values), precision=2, separator=',',
                               threshold=2000, max_line_width=10000000)
        return text[1:-1]

    def saveScan(self, filename, index, info, threshold,
                 c_min, c_max, c25_min, c25_max,
                 f_min, f_max, stop, c_step, c25_step, f_step,
                 step4, mode, fastMode, calibrationScan
                 ):
        """
        Write the scan to ``filename`` and append its description to
        ``scan.info`` in the same directory.

        Also sets ``self.fileName`` to ``filename`` and opens
        ``calibration.hdf`` from the parent directory of the file's
        directory through ``theCalibration``.

        :param filename: path of the data file to (over)write
        :param index: scan number written as 'Nr'
        :param info: free text written as 'Info'
        :param threshold: value written as 'Threshold'
        :param c_min, c_max: range written as 'T/H'
        :param c25_min, c25_max: range written as '25ps'
        :param f_min, f_max: range written as 'fine'
        :param stop: if true, the position (c_step, c25_step, f_step) is
            recorded as 'Scan Stopped at'
        :param step4: value written as 'Step4'
        :param mode: value written as 'Mode'
        :param fastMode: value written as 'Fast Mode'; if true the data file
            starts with a '#FastMode' line
        :param calibrationScan: value written as 'Calibration'
        """
        fileinfo = os.path.join(os.path.dirname(filename), 'scan.info')
        with open(filename, 'w') as f:
            self.fileName = filename
            path = os.path.dirname(os.path.dirname(self.fileName))
            self.calibId = theCalibration.openFile(os.path.join(path, 'calibration.hdf'))
            if fastMode:
                f.write('#FastMode\n')
            for adc in range(self.adcNumber):
                f.write("#ADC_%s\n" % adc)
                f.write("#330ps;25ps;3ps;mean;std;330psB;25psB\n")

                for i, item in enumerate(self.scanData[adc]):
                    # Rows loaded from a file are float arrays; cast so the
                    # integer format codes accept them as well.
                    c330, c25, c3 = int(item[2]), int(item[3]), int(item[4])
                    c330b, c25b = int(item[5]), int(item[6])
                    if self.multibunch:
                        f.write("{:02d};{:02d};{:02d};{};{};{:02d};{:02d}\n".format(
                            c330, c25, c3,
                            self._formatBunches(self.multibunchData[adc][i]),
                            self._formatBunches(self.multibunchStd[adc][i]),
                            c330b, c25b))
                    else:
                        f.write("{:02d};{:02d};{:02d};{:0.3f};{:0.3f};{:02d};{:02d}\n".format(
                            c330, c25, c3, float(item[0]), float(item[1]), c330b, c25b))

                f.write('\n')

        with open(fileinfo, 'a') as f:
            f.write('Scan\n')
            f.write('Nr: {}\n'.format(index))
            f.write('File: {}\n'.format(os.path.basename(filename)))
            f.write('Info: {}\n'.format(info))
            f.write('Threshold: {}\n'.format(threshold))

            f.write('Range\n')
            f.write('\t T/H: {}-{}\n'.format(c_min, c_max))
            f.write('\t 25ps: {}-{}\n'.format(c25_min, c25_max))
            f.write('\t fine: {}-{}\n'.format(f_min, f_max))
            f.write('Step4: {}\n'.format(step4))
            if stop:
                f.write('Scan Stopped at: {},{},{}\n'.format(c_step, c25_step, f_step))
            f.write('Mode: {}\n'.format(mode))
            f.write('Fast Mode: {}\n'.format(fastMode))
            f.write('Calibration: {}\n'.format(calibrationScan))
            f.write('\n')
- ###################################################################################################
- ###################################################################################################
|