import os
import logging

import numpy as np

from heb import DataSet, BUNCHES_PER_TURN, HEADER_SIZE_BYTES


def is_data_consistent(dataset):
    """Return True if the bunch-number column follows the expected repeating pattern."""
    bunch_numbers = dataset.array[:, -1]
    expected = np.tile(np.arange(0, BUNCHES_PER_TURN),
                       bunch_numbers.shape[0] // BUNCHES_PER_TURN)
    wrong_indices = np.argwhere(bunch_numbers != expected)
    if wrong_indices.shape[0] == 0:
        return True

    # Assumes the inconsistent samples form a contiguous tail, so this is the
    # index of the first bad sample.
    first_error = bunch_numbers.shape[0] - wrong_indices.shape[0]
    logging.info('Data inconsistent at offset %i' % first_error)
    np.savetxt('wrongdump', dataset.array[first_error - 3: first_error + 3])

    # After the first error we expect the alternating filling pattern 222, 223, ...
    filling = bunch_numbers[wrong_indices[0][0]:]
    filling = filling[:(filling.shape[0] // 2) * 2]  # compare an even number of samples
    expected_filling = np.tile([222, 223], filling.shape[0] // 2)
    wrong_filling_indices = np.argwhere(filling != expected_filling)
    # Sometimes the filling does not start immediately... Why? I have NO IDEA!
    return wrong_filling_indices.shape[0] <= 2


def _cached_exist(filename):
    return os.path.exists(os.path.abspath('{}.npy'.format(filename)))


def decode_data(data):
    """Decode raw 32-bit words into an (N, 5) array: four samples per row plus a bunch number."""
    # Drop words matching the 0x01234567 pattern.
    data = data[np.where(data != 0x01234567)]
    # Keep only a whole number of 4-word groups.
    data = data[:(data.size // 4) * 4]
    # Each word carries two 12-bit samples and a counter in the upper byte.
    bunch_low = data & 0xfff
    bunch_high = np.right_shift(data, 12) & 0xfff
    bunch_number = np.right_shift(data, 24) & 0xfff
    bunch_low = bunch_low.reshape(-1, 4)
    bunch_high = bunch_high.reshape(-1, 4)
    # Interleave low/high rows: four samples per row plus the bunch number.
    result = np.empty((bunch_low.shape[0] + bunch_high.shape[0], 5), dtype=float)
    result[0::2, :4] = bunch_low
    result[1::2, :4] = bunch_high
    result[0::2, 4] = bunch_number[::4]
    result[1::2, 4] = bunch_number[::4] + 1
    # Truncate to whole blocks of 184 rows.
    result = result[:(result.shape[0] // 184) * 184, :]
    return result
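
# Worked example (illustrative value, not from the original source): following the
# bit masks above, the 32-bit word 0x05ABC123 splits into
#   low sample  = 0x05ABC123 & 0xfff         -> 0x123
#   high sample = (0x05ABC123 >> 12) & 0xfff -> 0xABC
#   counter     = (0x05ABC123 >> 24) & 0xfff -> 0x05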


def read_from_file(filename, force=False, header=False, cache=False):
    """Read a raw dump from filename and return a DataSet, using a cached .npy copy if available."""
    if _cached_exist(filename) and not force:
        cached_filename = '{}.npy'.format(filename)
        logging.info("Read pre-computed data from {}".format(cached_filename))
        return DataSet(np.load(cached_filename), filename)
    with open(filename, 'rb') as f:
        logging.info("Read data from {}".format(filename))
        data = np.fromfile(f, dtype=np.uint32)
    # If the header was sent with the data, truncate it
    if header:
        # We read words of 4 bytes each
        splice_words = HEADER_SIZE_BYTES // 4
        data = data[splice_words:]
    result = decode_data(data)
    dataset = DataSet(result, filename)
    if cache:
        logging.info('Saving pre-computed data')
        np.save('{}.npy'.format(filename), result)
    return dataset


def read_from_string(raw_data, force=False, header=False, cache=False, cache_filename="_heb_data_cache"):
    """Decode raw bytes read directly from the device and return a DataSet, with optional .npy caching."""
    if _cached_exist(cache_filename) and not force:
        cache_file = '{}.npy'.format(cache_filename)
        logging.info("Read pre-computed data from {}".format(cache_file))
        return DataSet(np.load(cache_file), cache_filename)
    logging.info("Read data directly from device.")
    logging.info("Read %i bytes of data" % len(raw_data))
    # np.fromstring is deprecated for binary input; frombuffer reads the raw bytes.
    data = np.frombuffer(raw_data, dtype=np.uint32)
    # If the header was sent with the data, truncate it
    if header:
        # We read words of 4 bytes each
        splice_words = HEADER_SIZE_BYTES // 4
        data = data[splice_words:]
    result = decode_data(data)
    dataset = DataSet(result, "HEB Live Data")
    if cache:
        logging.info('Saving pre-computed data')
        np.save('{}.npy'.format(cache_filename), result)
    return dataset
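

# Minimal usage sketch (illustrative, not part of the original module): the file
# name and flags below are assumptions and should be adapted to the actual
# acquisition setup.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    dataset = read_from_file('capture.bin', header=True, cache=True)
    if not is_data_consistent(dataset):
        logging.warning('Decoded data failed the bunch-number consistency check')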