#!/usr/bin/env python import os import re import json import glob import shlex import collections import datetime import logging import random import argparse import subprocess import curses def read_info_file(path): result = {} with open(path) as f: for line in f: key, value = line.split('=') result[key] = value.strip() return result def read_edf_id19_header(filename): result = {} with open(filename) as f: data = f.read(1024) pattern = re.compile(r'(.*) = (.*) ;') for line in data.split('\n'): m = pattern.match(line) if m: result[m.group(1).strip()] = m.group(2).strip() return result def extract_motor_positions(id19_header): names = id19_header['motor_mne'].split(' ') values = id19_header['motor_pos'].split(' ') return {k: float(v) for (k, v) in zip(names, values)} def have_data(path): return os.path.exists(path) and glob.glob(os.path.join(path, '*.tif')) class Configuration(object): def __init__(self, source, destination): self.source = os.path.abspath(source) self.destination = os.path.abspath(destination) class LineList(object): def __init__(self, window): self.window = window self.height, self.width = window.getmaxyx() self.current = 0 self.lines = [] def redraw(self): for y in range(len(self.lines)): line, attr = self.lines[y] self.window.addnstr(y, 0, line, self.width, attr) self.window.clrtoeol() self.window.refresh() def add_line(self, s, attr=0): self.lines.append((s, attr)) if len(self.lines) > self.height - 1: self.lines = self.lines[1:] self.redraw() def update_last(self, line=None, attr=None): if not self.lines: return old_line, old_attr = self.lines[-1] self.lines[-1] = (line or old_line, attr or old_attr) self.redraw() class Colors(object): NORMAL = 1 SUCCESS = 2 WARN = 3 ERROR = 4 STATUS_BAR = 5 HIGHLIGHT = 6 def __init__(self): curses.init_pair(Colors.NORMAL, curses.COLOR_WHITE, curses.COLOR_BLACK) curses.init_pair(Colors.SUCCESS, curses.COLOR_GREEN, curses.COLOR_BLACK) curses.init_pair(Colors.WARN, curses.COLOR_YELLOW, curses.COLOR_BLACK) curses.init_pair(Colors.ERROR, curses.COLOR_RED, curses.COLOR_BLACK) curses.init_pair(Colors.STATUS_BAR, curses.COLOR_BLACK, curses.COLOR_YELLOW) curses.init_pair(Colors.HIGHLIGHT, curses.COLOR_WHITE, curses.COLOR_BLACK) self.attrs = { Colors.NORMAL: curses.A_NORMAL, Colors.SUCCESS: curses.A_NORMAL, Colors.WARN: curses.A_BOLD, Colors.ERROR: curses.A_BOLD, Colors.STATUS_BAR: curses.A_NORMAL, Colors.HIGHLIGHT: curses.A_BOLD, } def get(self, code): return curses.color_pair(code) | self.attrs[code] class StatusBar(object): def __init__(self, window, colors): self.window = window self.c = colors def update(self, s): self.window.clear() self.window.bkgd(' ', self.c.get(Colors.STATUS_BAR)) self.window.addstr(s, self.c.get(Colors.STATUS_BAR)) class LogList(LineList): def __init__(self, window, colors): self.line_list = LineList(window) self.c = colors self.log_file = open('cockpit.log', 'a') def _log_time(self, s, attr): timestamp = datetime.datetime.now().strftime('%H:%M:%S') log = '[{}] {}'.format(timestamp, s) self.line_list.add_line(log, attr) self.log_file.write(log) if not log.endswith('\n'): self.log_file.write('\n') self.log_file.flush() os.fsync(self.log_file.fileno()) def info(self, s): self._log_time(s, self.c.get(Colors.NORMAL)) def highlight(self, s): self._log_time(s, self.c.get(Colors.HIGHLIGHT)) def success(self, s): self._log_time(s, self.c.get(Colors.SUCCESS)) def warn(self, s): self._log_time(s, self.c.get(Colors.WARN)) def error(self, s): self._log_time(s, self.c.get(Colors.ERROR)) class CommandList(LineList): def __init__(self, window, colors): self.line_list = LineList(window) self.normal = colors.get(Colors.NORMAL) self.highlight = colors.get(Colors.HIGHLIGHT) self.current = '' def add_character(self, c): self.current += c self.line_list.update_last(line='> {}'.format(self.current)) def backspace(self): if self.current: self.current = self.current[:len(self.current) - 1] self.add_character('') def set_actions(self, actions): self.current = '' for a in actions.values(): self.line_list.add_line('[{}] {}'.format(a.key, a.note), self.highlight) class Action(object): def __init__(self, key, note, func, next_state): self.key = key self.note = note self.run = func self.next_state = next_state def __repr__(self): return ''.format(self.key) class StateMachine(object): START = 0 QUIT = 1 SYNC = 2 CLEAN = 3 OPTIMIZE = 5 QUICK_OPTIMIZE = 6 RECONSTRUCT = 7 def __init__(self): self.current = StateMachine.START self.transitions = { StateMachine.START: collections.OrderedDict(), StateMachine.CLEAN: collections.OrderedDict(), StateMachine.QUIT: collections.OrderedDict(), StateMachine.SYNC: collections.OrderedDict(), StateMachine.OPTIMIZE: collections.OrderedDict(), StateMachine.QUICK_OPTIMIZE: collections.OrderedDict(), StateMachine.RECONSTRUCT: collections.OrderedDict(), } def add_action(self, from_state, action): self.transitions[from_state][action.key] = action def transition(self, action): self.current = self.transitions[self.current][action.key].next_state @property def actions(self): return self.transitions[self.current] def is_valid_key(self, key): return key in (k for k in self.transitions[self.current].keys()) class Parameters(object): def __init__(self, axis, angle): self.axis = axis self.angle = angle class Dataset(object): def __init__(self, path): self.path = path self.prefix = os.path.basename(path) def __repr__(self): return ''.format(self.prefix) def join_path(self, child_path): return os.path.join(self.path, child_path) class Application(object): def __init__(self, config): self.config = config self.running = True self.datasets = [] self.known = set() self.current = None self.index = -1 def scan(self): new = [] for p in os.listdir(self.config.destination): path = os.path.join(self.config.destination, p) if os.path.isdir(path) and p not in self.known: new.append(Dataset(path)) self.known.add(p) if new: for d in new: self.log.highlight("Found new dataset {}".format(d.prefix)) self.datasets.append(d) def run_command(self, cmd): try: self.log.info("Executing `{}`".format(cmd)) p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for line in iter(p.stdout.readline, ''): self.log.info(line) for line in iter(p.stderr.readline, ''): self.log.error(line) while p.poll() is None: pass if p.returncode == 0: self.log.success("done") return True self.log.error("Command returned {}".format(p.returncode)) return False except Exception as e: self.log.error("{}".format(e)) return False @property def params_name(self): return '{}params.json'.format(self.current.prefix) def read_info(self): info_file = os.path.join(self.current.path, '{}.info'.format(self.current.prefix)) return read_info_file(info_file) def read_optimal_params(self, scale=1.0): with open(self.params_name) as f: opt = json.load(f) return Parameters(opt['x-center']['value'], opt['lamino-angle']['value'] * scale) def optimize(self, resize): slices_per_device = 100 half_range = 1.0 info = self.read_info() axis = (float(info['Col_end']) + 1) / 2.0 axis_step = 0.25 fname = os.path.join(self.current.path, '{}0000.edf'.format(self.current.prefix)) header = read_edf_id19_header(fname) motor_pos = extract_motor_positions(header) inclination_angle = motor_pos['rytot'] theta = 90.0 - inclination_angle angle_step = 0.025 angle_start = theta - half_range angle_stop = theta + half_range x_region = 960 y_region = 960 fc_path = '{path}/fc'.format(path=self.current.path) if resize: x_region /= 2 y_region /= 2 axis /= 2 axis_step /= 2 fc_path = '{path}/fc-small'.format(path=self.current.path) axis_start = axis - slices_per_device * axis_step axis_stop = axis + slices_per_device * axis_step self.log.info(" Using theta = {}, inclination angle = {}".format(theta, inclination_angle)) self.log.info(" Scanning lamino angle within [{}:{}:{}]".format(angle_start, angle_stop, angle_step)) self.log.info(" Scanning x axis within [{}:{}:{}]".format(axis_start, axis_stop, axis_step)) opt_params = ('--num-iterations 2' ' --axis-range={ax_start},{ax_stop},{ax_step}' ' --lamino-angle-range={an_start},{an_stop},{an_step}' ' --metric kurtosis --z-metric kurtosis' ' --tmp-output {path}/tmp' .format(ax_start=axis_start, ax_stop=axis_stop, ax_step=axis_step, an_start=angle_start, an_stop=angle_stop, an_step=angle_step, path=self.current.path)) params = ('--x-region="-{x_region},{x_region},1"' ' --y-region="-{y_region},{y_region},1"' ' --overall-angle -360' ' --pixel-size {pixel_size}e-6' ' --roll-angle 0' ' --slices-per-device 100' .format(pixel_size=info['PixelSize'], x_region=x_region, y_region=y_region)) cmd = ('optimize-parameters --verbose' ' {fc_path}' ' {opt_params}' ' --reco-params "{params}"' ' --params-filename {params_name}' .format(opt_params=opt_params, params=params, params_name=self.params_name, fc_path=fc_path)) return self.run_command(cmd) def update_statusbar(self): self.log.info("Current dataset: {}".format(self.current.prefix)) if self.current: self.status_bar.update("Current: {} [{}/{}]".format(self.current.prefix, self.index + 1, len(self.datasets))) def on_reconstruct(self): self.log.highlight("Reconstructing ...") path = self.current.join_path('fc') if not have_data(path): if not self.flat_correct(path): return False params = self.read_optimal_params() info = self.read_info() x_region = 960 y_region = 960 cmd = ('tofu lamino --verbose' ' --projections {prefix}/fc' ' --x-region="-{x_region},{x_region},1"' ' --y-region="-{y_region},{y_region},1"' ' --lamino-angle {lamino_angle}' ' --axis {x_axis},{y_axis}' ' --overall-angle -360' ' --pixel-size {pixel_size}e-6' ' --roll-angle 0' ' --slices-per-device 300' ' --output {prefix}/slices/slice' .format(pixel_size=info['PixelSize'], x_region=x_region, y_region=y_region, lamino_angle=params.angle, x_axis=params.axis, y_axis=float(info['Dim_2']) / 2, prefix=self.current.prefix)) return self.run_command(cmd) def on_quit(self): self.running = False return True def on_sync(self): self.log.highlight("Syncing data ...") cmd = 'bash sync.sh {} {}'.format(self.config.source, self.config.destination) return self.run_command(cmd) def on_clean(self): self.log.highlight("Cleaning {}".format(self.config.destination)) return True def flat_correct(self, output_path, append=''): self.log.highlight("Generate flat field corrected projections ...") try: info = self.read_info() path = self.current.path data = dict(path=path, prefix=self.current.prefix, num=info['TOMO_N'], step=1, output=output_path) cmd = ('tofu flatcorrect --verbose' ' --reduction-mode median' ' --projections "{path}/{prefix}*.edf"' ' --darks {path}/darkend0000.edf' ' --flats {path}/ref*_0000.edf' ' --flats2 {path}/ref*_{num}.edf' ' --output {output}/fc-%04i.tif' ' --number {num}' ' --step {step}' ' --absorptivity' ' --fix-nan-and-inf '.format(**data)) cmd += append return self.run_command(cmd) except Exception as e: self.log.error("Error: {}".format(e)) return False def on_quick_optimize(self): self.log.highlight("Quick optimization ...") path = self.current.join_path('fc-small') if not have_data(path): if not self.flat_correct(path, append='--resize 2'): return False try: if self.optimize(True): with open(self.params_name) as f: opt = json.load(f) opt['x-center']['value'] *= 2.0 with open(self.params_name, 'w') as f: json.dump(opt, f) params = self.read_optimal_params() self.log.highlight(" Optimal axis: {}".format(params.angle)) self.log.highlight(" Optimal center: {}".format(params.axis)) return True except Exception as e: self.log.error("Error: {}".format(e)) return False def on_optimize(self): self.log.highlight("Optimizing ...") path = self.current.join_path('fc') if not have_data(path): if not self.flat_correct(path): return False try: if self.optimize(False): params = self.read_optimal_params() self.log.highlight(" Optimal axis: {}".format(params.angle)) self.log.highlight(" Optimal center: {}".format(params.axis)) return True except Exception as e: self.log.error("Could not optimize: {}".format(e)) return False def on_next_dataset(self): self.index = (self.index + 1) % len(self.datasets) self.current = self.datasets[self.index] self.update_statusbar() def on_previous_dataset(self): self.index = (self.index - 1) % len(self.datasets) self.current = self.datasets[self.index] self.update_statusbar() def do_nothing(self): return True def _run(self, screen): curses.curs_set(False) height, width = screen.getmaxyx() colors = Colors() top_pane = screen.subwin(1, width, 0, 0) bottom_pane = screen.subwin(height - 1, width, 1, 0) left_pane = bottom_pane.subwin(height - 1, width / 4, 1, 0) right_pane = bottom_pane.subwin(height - 1, 3 * width / 4, 1, width / 4) self.status_bar = StatusBar(top_pane, colors) self.status_bar.update('Cockpit') cmd_window = CommandList(left_pane, colors) machine = StateMachine() quit = Action('e', 'Exit', self.on_quit, machine.QUIT) sync = Action('s', 'Sync', self.on_sync, machine.SYNC) clean = Action('c', 'Clean', self.on_clean, machine.CLEAN) quick_optimize = Action('q', 'Quick optimization', self.on_quick_optimize, machine.QUICK_OPTIMIZE) optimize = Action('o', 'Optimization', self.on_optimize, machine.OPTIMIZE) reconstruct = Action('r', 'Reconstruct', self.on_reconstruct, machine.RECONSTRUCT) next_dataset = Action('n', 'Next dataset', self.on_next_dataset, machine.START) previous_dataset = Action('p', 'Previous dataset', self.on_previous_dataset, machine.START) machine.add_action(machine.START, sync) machine.add_action(machine.START, quick_optimize) machine.add_action(machine.START, optimize) machine.add_action(machine.START, next_dataset) machine.add_action(machine.START, previous_dataset) machine.add_action(machine.START, quit) machine.add_action(machine.SYNC, quit) machine.add_action(machine.QUICK_OPTIMIZE, reconstruct) machine.add_action(machine.QUICK_OPTIMIZE, optimize) machine.add_action(machine.QUICK_OPTIMIZE, next_dataset) machine.add_action(machine.QUICK_OPTIMIZE, previous_dataset) machine.add_action(machine.QUICK_OPTIMIZE, quit) machine.add_action(machine.OPTIMIZE, quick_optimize) machine.add_action(machine.OPTIMIZE, reconstruct) machine.add_action(machine.OPTIMIZE, next_dataset) machine.add_action(machine.OPTIMIZE, previous_dataset) machine.add_action(machine.OPTIMIZE, quit) machine.add_action(machine.CLEAN, sync) machine.add_action(machine.CLEAN, quit) self.log = LogList(right_pane, colors) self.log.info('Source dir set to {}'.format(self.config.source)) self.log.info('Destination dir set to {}'.format(self.config.destination)) self.scan() if self.datasets: self.index = 0 self.current = self.datasets[0] if os.path.exists(self.params_name): machine.add_action(machine.START, reconstruct) self.update_statusbar() cmd_window.set_actions(machine.actions) while self.running: ci = screen.getch() cc = chr(ci) if ci < 256 else '' if machine.is_valid_key(cc): cmd_window.add_character(cc) action = machine.actions[cc] if action.run(): machine.transition(action) cmd_window.set_actions(machine.actions) elif ci == curses.KEY_BACKSPACE: cmd_window.backspace() self.scan() screen.refresh() def run(self): curses.wrapper(self._run) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--source', help='Source directory', default='.', metavar='DIR') parser.add_argument('--destination', help='Destination directory', default='.', metavar='DIR') args = parser.parse_args() config = Configuration(args.source, args.destination) app = Application(config) app.run()