123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- #!/usr/bin/env python
- import os
- import shlex
- import collections
- import datetime
- import logging
- import random
- import argparse
- import subprocess
- import curses
- class Configuration(object):
- def __init__(self, source, destination):
- self.source = os.path.abspath(source)
- self.destination = os.path.abspath(destination)
- class LineList(object):
- def __init__(self, window):
- self.window = window
- self.height, self.width = window.getmaxyx()
- self.current = 0
- self.lines = []
- def redraw(self):
- for y in range(len(self.lines)):
- line, attr = self.lines[y]
- self.window.addnstr(y, 0, line, self.width, attr)
- self.window.clrtoeol()
- self.window.refresh()
- def add_line(self, s, attr=0):
- self.lines.append((s, attr))
- if len(self.lines) > self.height:
- self.lines = self.lines[1:]
- self.redraw()
- def update_last(self, line=None, attr=None):
- if not self.lines:
- return
- old_line, old_attr = self.lines[-1]
- self.lines[-1] = (line or old_line, attr or old_attr)
- self.redraw()
- class Colors(object):
- NORMAL = 1
- SUCCESS = 2
- WARN = 3
- ERROR = 4
- STATUS_BAR = 5
- HIGHLIGHT = 6
- def __init__(self):
- curses.init_pair(Colors.NORMAL, curses.COLOR_WHITE, curses.COLOR_BLACK)
- curses.init_pair(Colors.SUCCESS, curses.COLOR_GREEN, curses.COLOR_BLACK)
- curses.init_pair(Colors.WARN, curses.COLOR_YELLOW, curses.COLOR_BLACK)
- curses.init_pair(Colors.ERROR, curses.COLOR_RED, curses.COLOR_BLACK)
- curses.init_pair(Colors.STATUS_BAR, curses.COLOR_BLACK, curses.COLOR_YELLOW)
- curses.init_pair(Colors.HIGHLIGHT, curses.COLOR_WHITE, curses.COLOR_BLACK)
- self.attrs = {
- Colors.NORMAL: curses.A_NORMAL,
- Colors.SUCCESS: curses.A_NORMAL,
- Colors.WARN: curses.A_BOLD,
- Colors.ERROR: curses.A_BOLD,
- Colors.STATUS_BAR: curses.A_NORMAL,
- Colors.HIGHLIGHT: curses.A_BOLD,
- }
- def get(self, code):
- return curses.color_pair(code) | self.attrs[code]
- class StatusBar(object):
- def __init__(self, window, colors):
- self.window = window
- self.c = colors
- def update(self, s):
- self.window.bkgd(' ', self.c.get(Colors.STATUS_BAR))
- self.window.addstr(s, self.c.get(Colors.STATUS_BAR))
- class LogList(LineList):
- def __init__(self, window, colors):
- self.line_list = LineList(window)
- self.c = colors
- def _log_time(self, s, attr):
- timestamp = datetime.datetime.now().strftime('%H:%M:%S')
- self.line_list.add_line('[{}] {}'.format(timestamp, s), attr)
- def info(self, s):
- self._log_time(s, self.c.get(Colors.NORMAL))
- def success(self, s):
- self._log_time(s, self.c.get(Colors.SUCCESS))
- def warn(self, s):
- self._log_time(s, self.c.get(Colors.WARN))
- def error(self, s):
- self._log_time(s, self.c.get(Colors.ERROR))
- class CommandList(LineList):
-
- def __init__(self, window, colors):
- self.line_list = LineList(window)
- self.normal = colors.get(Colors.NORMAL)
- self.highlight = colors.get(Colors.HIGHLIGHT)
- self.current = ''
- def add_character(self, c):
- self.current += c
- self.line_list.update_last(line='> {}'.format(self.current))
- def backspace(self):
- if self.current:
- self.current = self.current[:len(self.current) - 1]
- self.add_character('')
- def set_actions(self, actions):
- self.line_list.update_last(attr=self.normal)
- self.current = ''
- message = ' | '.join(('[{}] {}'.format(a.key, a.note) for a in actions.values()))
- self.line_list.add_line(message, self.highlight)
- class Action(object):
- def __init__(self, key, note, func, next_state):
- self.key = key
- self.note = note
- self.run = func
- self.next_state = next_state
- def __repr__(self):
- return '<Action:key={}>'.format(self.key)
- class StateMachine(object):
- START = 0
- QUIT = 1
- SYNC = 2
- CLEAN = 3
- FLATCORRECT = 4
- OPTIMIZE = 5
- def __init__(self):
- self.current = StateMachine.START
- self.transitions = {
- StateMachine.START: collections.OrderedDict(),
- StateMachine.CLEAN: collections.OrderedDict(),
- StateMachine.FLATCORRECT: collections.OrderedDict(),
- StateMachine.QUIT: collections.OrderedDict(),
- StateMachine.SYNC: collections.OrderedDict(),
- StateMachine.OPTIMIZE: collections.OrderedDict(),
- }
- def add_action(self, from_state, action):
- self.transitions[from_state][action.key] = action
- def transition(self, action):
- self.current = self.transitions[self.current][action.key].next_state
- @property
- def actions(self):
- return self.transitions[self.current]
- def is_valid_key(self, key):
- return key in (k for k in self.transitions[self.current].keys())
- class Application(object):
- def __init__(self, config):
- self.config = config
- self.running = True
- def run_command(self, cmd):
- try:
- p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- def output_lines(pipe, log_func):
- for line in pipe.split(os.linesep):
- if line:
- log_func(line)
- output_lines(stdout, self.log.info)
- output_lines(stderr, self.log.error)
- if p.returncode == 0:
- self.log.success("done")
- return True
- self.log.error("Command returned {}".format(p.returncode))
- return False
- except OSError as e:
- self.log.error("{}".format(e))
- return False
- def on_quit(self):
- self.running = False
- return True
- def on_sync(self):
- self.log.info("Syncing data ...")
- cmd = 'bash sync.sh {} {}'.format(self.config.source, self.config.destination)
- return self.run_command(cmd)
- def on_clean(self):
- self.log.info("Cleaning {}".format(self.config.destination))
- return True
- def on_flat_correct(self):
- num = 0
- data = dict(p=self.config.destination, num=num, step=123))
- cmd = ('tofu flatcorrect --verbose'
- ' --reduction-mode median'
- ' --projections "{p}/foo*.edf"'
- ' --darks {p}/darkend0000.edf'
- ' --flats {p}/ref*_0000.edf'
- ' --flats2 {p}/ref*_{num}.edf'
- ' --output {p}/fc/fc-%04i.tif'
- ' --number {num}'
- ' --step {step}'
- ' --absorptivity'
- ' --fix-nan-and-inf'.format(**data))
- return self.run_command(cmd)
- def on_optimize(self):
- self.log.info("Optimizing ...")
- return True
- def do_nothing(self):
- return True
- def _run(self, screen):
- curses.curs_set(False)
- height, width = screen.getmaxyx()
- colors = Colors()
- top_pane = screen.subwin(1, width, 0, 0)
- bottom_pane = screen.subwin(height - 1, width, 1, 0)
- left_pane = bottom_pane.subwin(height - 1, width / 2, 1, 0)
- right_pane = bottom_pane.subwin(height - 1, width / 2, 1, width / 2)
- status_bar = StatusBar(top_pane, colors)
- status_bar.update('Cockpit')
- cmd_window = CommandList(left_pane, colors)
- machine = StateMachine()
- quit = Action('q', 'Quit', self.on_quit, machine.QUIT)
- sync = Action('s', 'Sync data', self.on_sync, machine.SYNC)
- flatcorrect = Action('f', 'Flat correct', self.on_flat_correct, machine.FLATCORRECT)
- clean = Action('c', 'Clean', self.on_clean, machine.CLEAN)
- optimize = Action('o', 'Optimize', self.on_optimize, machine.OPTIMIZE)
- machine.add_action(machine.START, sync)
- machine.add_action(machine.START, clean)
- machine.add_action(machine.START, quit)
- machine.add_action(machine.SYNC, flatcorrect)
- machine.add_action(machine.SYNC, clean)
- machine.add_action(machine.SYNC, quit)
- machine.add_action(machine.FLATCORRECT, optimize)
- machine.add_action(machine.FLATCORRECT, quit)
- machine.add_action(machine.OPTIMIZE, sync)
- machine.add_action(machine.OPTIMIZE, clean)
- machine.add_action(machine.OPTIMIZE, quit)
- machine.add_action(machine.CLEAN, sync)
- machine.add_action(machine.CLEAN, quit)
- cmd_window.set_actions(machine.actions)
- self.log = LogList(right_pane, colors)
- self.log.info('Source dir set to {}'.format(self.config.source))
- self.log.info('Destination dir set to {}'.format(self.config.destination))
- while self.running:
- ci = screen.getch()
- cc = chr(ci) if ci < 256 else ''
- if machine.is_valid_key(cc):
- cmd_window.add_character(cc)
- action = machine.actions[cc]
- if action.run():
- machine.transition(action)
- cmd_window.set_actions(machine.actions)
- elif ci == curses.KEY_BACKSPACE:
- cmd_window.backspace()
- screen.refresh()
- def run(self):
- curses.wrapper(self._run)
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--source', help='Source directory', default='.', metavar='DIR')
- parser.add_argument('--destination', help='Destination directory', default='.', metavar='DIR')
- args = parser.parse_args()
- config = Configuration(args.source, args.destination)
- app = Application(config)
- app.run()
|