123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- #!/usr/bin/env python
- import os
- import re
- import json
- import shlex
- import collections
- import datetime
- import logging
- import random
- import argparse
- import subprocess
- import curses
- def read_info_file(path):
- result = {}
- with open(path) as f:
- for line in f:
- key, value = line.split('=')
- result[key] = value.strip()
- return result
- def read_edf_id19_header(filename):
- result = {}
- with open(filename) as f:
- data = f.read(1024)
- pattern = re.compile(r'(.*) = (.*) ;')
- for line in data.split('\n'):
- m = pattern.match(line)
- if m:
- result[m.group(1).strip()] = m.group(2).strip()
- return result
- def extract_motor_positions(id19_header):
- names = id19_header['motor_mne'].split(' ')
- values = id19_header['motor_pos'].split(' ')
- return {k: float(v) for (k, v) in zip(names, values)}
- class Configuration(object):
- def __init__(self, source, destination):
- self.source = os.path.abspath(source)
- self.destination = os.path.abspath(destination)
- class LineList(object):
- def __init__(self, window):
- self.window = window
- self.height, self.width = window.getmaxyx()
- self.current = 0
- self.lines = []
- def redraw(self):
- for y in range(len(self.lines)):
- line, attr = self.lines[y]
- self.window.addnstr(y, 0, line, self.width, attr)
- self.window.clrtoeol()
- self.window.refresh()
- def add_line(self, s, attr=0):
- self.lines.append((s, attr))
- if len(self.lines) > self.height - 1:
- self.lines = self.lines[1:]
- self.redraw()
- def update_last(self, line=None, attr=None):
- if not self.lines:
- return
- old_line, old_attr = self.lines[-1]
- self.lines[-1] = (line or old_line, attr or old_attr)
- self.redraw()
- class Colors(object):
- NORMAL = 1
- SUCCESS = 2
- WARN = 3
- ERROR = 4
- STATUS_BAR = 5
- HIGHLIGHT = 6
- def __init__(self):
- curses.init_pair(Colors.NORMAL, curses.COLOR_WHITE, curses.COLOR_BLACK)
- curses.init_pair(Colors.SUCCESS, curses.COLOR_GREEN, curses.COLOR_BLACK)
- curses.init_pair(Colors.WARN, curses.COLOR_YELLOW, curses.COLOR_BLACK)
- curses.init_pair(Colors.ERROR, curses.COLOR_RED, curses.COLOR_BLACK)
- curses.init_pair(Colors.STATUS_BAR, curses.COLOR_BLACK, curses.COLOR_YELLOW)
- curses.init_pair(Colors.HIGHLIGHT, curses.COLOR_WHITE, curses.COLOR_BLACK)
- self.attrs = {
- Colors.NORMAL: curses.A_NORMAL,
- Colors.SUCCESS: curses.A_NORMAL,
- Colors.WARN: curses.A_BOLD,
- Colors.ERROR: curses.A_BOLD,
- Colors.STATUS_BAR: curses.A_NORMAL,
- Colors.HIGHLIGHT: curses.A_BOLD,
- }
- def get(self, code):
- return curses.color_pair(code) | self.attrs[code]
- class StatusBar(object):
- def __init__(self, window, colors):
- self.window = window
- self.c = colors
- def update(self, s):
- self.window.bkgd(' ', self.c.get(Colors.STATUS_BAR))
- self.window.addstr(s, self.c.get(Colors.STATUS_BAR))
- class LogList(LineList):
- def __init__(self, window, colors):
- self.line_list = LineList(window)
- self.c = colors
- self.log_file = open('cockpit.log', 'a')
- def _log_time(self, s, attr):
- timestamp = datetime.datetime.now().strftime('%H:%M:%S')
- log = '[{}] {}'.format(timestamp, s)
- self.line_list.add_line(log, attr)
- self.log_file.write(log)
- if not log.endswith('\n'):
- self.log_file.write('\n')
- self.log_file.flush()
- os.fsync(self.log_file.fileno())
- def info(self, s):
- self._log_time(s, self.c.get(Colors.NORMAL))
- def highlight(self, s):
- self._log_time(s, self.c.get(Colors.HIGHLIGHT))
- def success(self, s):
- self._log_time(s, self.c.get(Colors.SUCCESS))
- def warn(self, s):
- self._log_time(s, self.c.get(Colors.WARN))
- def error(self, s):
- self._log_time(s, self.c.get(Colors.ERROR))
- class CommandList(LineList):
- def __init__(self, window, colors):
- self.line_list = LineList(window)
- self.normal = colors.get(Colors.NORMAL)
- self.highlight = colors.get(Colors.HIGHLIGHT)
- self.current = ''
- def add_character(self, c):
- self.current += c
- self.line_list.update_last(line='> {}'.format(self.current))
- def backspace(self):
- if self.current:
- self.current = self.current[:len(self.current) - 1]
- self.add_character('')
- def set_actions(self, actions):
- self.line_list.update_last(attr=self.normal)
- self.current = ''
- message = ' | '.join(('[{}] {}'.format(a.key, a.note) for a in actions.values()))
- self.line_list.add_line(message, self.highlight)
- class Action(object):
- def __init__(self, key, note, func, next_state):
- self.key = key
- self.note = note
- self.run = func
- self.next_state = next_state
- def __repr__(self):
- return '<Action:key={}>'.format(self.key)
- class StateMachine(object):
- START = 0
- QUIT = 1
- SYNC = 2
- CLEAN = 3
- FLATCORRECT = 4
- OPTIMIZE = 5
- def __init__(self):
- self.current = StateMachine.START
- self.transitions = {
- StateMachine.START: collections.OrderedDict(),
- StateMachine.CLEAN: collections.OrderedDict(),
- StateMachine.FLATCORRECT: collections.OrderedDict(),
- StateMachine.QUIT: collections.OrderedDict(),
- StateMachine.SYNC: collections.OrderedDict(),
- StateMachine.OPTIMIZE: collections.OrderedDict(),
- }
- def add_action(self, from_state, action):
- self.transitions[from_state][action.key] = action
- def transition(self, action):
- self.current = self.transitions[self.current][action.key].next_state
- @property
- def actions(self):
- return self.transitions[self.current]
- def is_valid_key(self, key):
- return key in (k for k in self.transitions[self.current].keys())
- class Application(object):
- def __init__(self, config):
- self.config = config
- self.running = True
- def run_command(self, cmd):
- try:
- p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- for line in iter(p.stdout.readline, ''):
- self.log.info(line)
- for line in iter(p.stderr.readline, ''):
- self.log.error(line)
- while p.poll() is None:
- pass
- if p.returncode == 0:
- self.log.success("done")
- return True
- self.log.error("Command returned {}".format(p.returncode))
- return False
- except Exception as e:
- self.log.error("{}".format(e))
- return False
- @property
- def prefix(self):
- return os.path.basename(self.config.destination)
- def read_info(self):
- info_file = os.path.join(self.config.destination, '{}.info'.format(self.prefix))
- return read_info_file(info_file)
- def on_quit(self):
- self.running = False
- return True
- def on_sync(self):
- self.log.highlight("Syncing data ...")
- cmd = 'bash sync.sh {} {}'.format(self.config.source, self.config.destination)
- return self.run_command(cmd)
- def on_clean(self):
- self.log.highlight("Cleaning {}".format(self.config.destination))
- return True
- def on_flat_correct(self):
- self.log.highlight("Flat field correction ...")
- info = self.read_info()
- data = dict(path=self.config.destination, prefix=self.prefix, num=info['TOMO_N'], step=1)
- cmd = ('tofu flatcorrect --verbose'
- ' --reduction-mode median'
- ' --projections "{path}/{prefix}*.edf"'
- ' --darks {path}/darkend0000.edf'
- ' --flats {path}/ref*_0000.edf'
- ' --flats2 {path}/ref*_{num}.edf'
- ' --output {path}/fc/fc-%04i.tif'
- ' --number {num}'
- ' --step {step}'
- ' --absorptivity'
- ' --fix-nan-and-inf'.format(**data))
- return self.run_command(cmd)
- def on_optimize(self):
- self.log.highlight("Optimizing ...")
- slices_per_device = 100
- half_range = 1.0
- info = self.read_info()
- axis = (float(info['Col_end']) + 1) / 2.0
- axis_step = 0.25
- axis_start = axis - slices_per_device * axis_step
- axis_stop = axis + slices_per_device * axis_step
- fname = os.path.join(self.config.destination, '{}0000.edf'.format(self.prefix))
- header = read_edf_id19_header(fname)
- motor_pos = extract_motor_positions(header)
- inclination_angle = motor_pos['rytot']
- theta = 90.0 - inclination_angle
- angle_step = 0.025
- angle_start = theta - half_range
- angle_stop = theta + half_range
- self.log.info(" Using theta = {}, inclination angle = {}".format(theta, inclination_angle))
- self.log.info(" Scanning angle within [{}:{}:{}]".format(angle_start, angle_stop, angle_step))
- self.log.info(" Scanning axis within [{}:{}:{}]".format(axis_start, axis_stop, axis_step))
- opt_params = ('--num-iterations 2'
- ' --axis-range={ax_start},{ax_stop},{ax_step}'
- ' --lamino-angle-range={an_start},{an_stop},{an_step}'
- ' --metric kurtosis --z-metric kurtosis'
- .format(ax_start=axis_start, ax_stop=axis_stop, ax_step=axis_step,
- an_start=angle_start, an_stop=angle_stop, an_step=angle_step))
- params = ('--x-region=-960,960,1'
- ' --y-region=-960,960,1'
- ' --overall-angle -360'
- ' --pixel-size {pixel_size}e-6'
- ' --roll-angle 0'
- ' --slices-per-device 100'
- .format(pixel_size=info['PixelSize']))
- cmd = ('optimize-parameters --verbose'
- ' {prefix}/fc'
- ' {opt_params}'
- ' --reco-params "{params}"'
- ' --params-filename params.json'
- .format(opt_params=opt_params, params=params, prefix=self.prefix))
- result = self.run_command(cmd)
- if result == 0:
- with open('params.json') as f:
- opt = json.load(f)
- self.log.highlight(" Optimal axis: {}".format(opt['lamino-angle']['value']))
- self.log.highlight(" Optimal center: {}".format(opt['x-center']['value']))
- return result
- def do_nothing(self):
- return True
- def _run(self, screen):
- curses.curs_set(False)
- height, width = screen.getmaxyx()
- colors = Colors()
- top_pane = screen.subwin(1, width, 0, 0)
- bottom_pane = screen.subwin(height - 1, width, 1, 0)
- left_pane = bottom_pane.subwin(height - 1, width / 3, 1, 0)
- right_pane = bottom_pane.subwin(height - 1, 2 * width / 3, 1, width / 3)
- status_bar = StatusBar(top_pane, colors)
- status_bar.update('Cockpit')
- cmd_window = CommandList(left_pane, colors)
- machine = StateMachine()
- quit = Action('q', 'Quit', self.on_quit, machine.QUIT)
- sync = Action('s', 'Sync', self.on_sync, machine.SYNC)
- flatcorrect = Action('f', 'Flat correct', self.on_flat_correct, machine.FLATCORRECT)
- clean = Action('c', 'Clean', self.on_clean, machine.CLEAN)
- optimize = Action('o', 'Optimize', self.on_optimize, machine.OPTIMIZE)
- machine.add_action(machine.START, sync)
- machine.add_action(machine.START, flatcorrect)
- machine.add_action(machine.START, optimize)
- machine.add_action(machine.START, clean)
- machine.add_action(machine.START, quit)
- machine.add_action(machine.SYNC, flatcorrect)
- machine.add_action(machine.SYNC, clean)
- machine.add_action(machine.SYNC, quit)
- machine.add_action(machine.FLATCORRECT, optimize)
- machine.add_action(machine.FLATCORRECT, quit)
- machine.add_action(machine.OPTIMIZE, sync)
- machine.add_action(machine.OPTIMIZE, clean)
- machine.add_action(machine.OPTIMIZE, quit)
- machine.add_action(machine.CLEAN, sync)
- machine.add_action(machine.CLEAN, quit)
- cmd_window.set_actions(machine.actions)
- self.log = LogList(right_pane, colors)
- self.log.info('Source dir set to {}'.format(self.config.source))
- self.log.info('Destination dir set to {}'.format(self.config.destination))
- while self.running:
- ci = screen.getch()
- cc = chr(ci) if ci < 256 else ''
- if machine.is_valid_key(cc):
- cmd_window.add_character(cc)
- action = machine.actions[cc]
- if action.run():
- machine.transition(action)
- cmd_window.set_actions(machine.actions)
- elif ci == curses.KEY_BACKSPACE:
- cmd_window.backspace()
- screen.refresh()
- def run(self):
- curses.wrapper(self._run)
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--source', help='Source directory', default='.', metavar='DIR')
- parser.add_argument('--destination', help='Destination directory', default='.', metavar='DIR')
- args = parser.parse_args()
- config = Configuration(args.source, args.destination)
- app = Application(config)
- app.run()
|