"""
Communication part of the board package
"""
import errno
import logging as log
import re
import subprocess
import time

from errors import *
from ...kcgwidget import error
class PCI(object):
    """
    Wrapper around the ``pci`` command line tool.

    Every public method builds an argument list for ``pci``, runs it through
    :meth:`_safe_call` and, where needed, post-processes the output.
    The device file for a board is looked up via
    ``available_boards.get_device_file(board_id)``.
    """

    # Matches one group of eight hex digits in the output of a register read.
    _WORD_PATTERN = re.compile(r'[\da-fA-F]{8}')

    # Pause (in seconds) after a start-dma / stop-dma command was issued.
    _DMA_COMMAND_DELAY = 0.05

    def _safe_call(self, cmd):
        """
        Run an external command and return what it wrote to stdout.

        :param cmd: the command and its arguments as a list of strings
        :return: the output of the command
        :raises BoardError: if the command exits with a non-zero status or
                            could not be started for any reason other than a
                            missing executable
        :raises InterfaceNotFoundError: if the executable was not found
        """
        log.debug(cmd)
        try:
            return subprocess.check_output(cmd)
        except subprocess.CalledProcessError as e:
            raise BoardError(e.output)
        except OSError as e:
            # Compare the error number instead of the message text: the
            # wording of str(e) differs between interpreter versions.
            if e.errno == errno.ENOENT:
                error(0x003, "Pci command not found. Exiting.")
                raise InterfaceNotFoundError("pci command was not found in your searchpath")
            raise BoardError('{}: {}'.format(' '.join(cmd), str(e)))

    def _format(self, output):
        """
        Extract the hex words from the textual output of a register read.

        Every line is split at ``": "`` and only the part between the first
        and the second separator is searched for groups of eight hex digits.
        Empty lines and lines without a separator are skipped.

        :param output: output of the pci command as string
        :return: list of 8-digit hex strings in order of appearance
        """
        words = []
        for line in output.split("\n"):
            fields = line.split(": ")
            if len(fields) < 2:
                # empty line or a line that carries no "<prefix>: <data>" pair
                continue
            words.extend(self._WORD_PATTERN.findall(fields[1]))
        return words

    @staticmethod
    def _apply_mask(prev_val, value, hex_mask):
        """
        Merge value into prev_val at the bit positions selected by hex_mask.

        :param prev_val: current content as hex string
        :param value: new value as hex string
        :param hex_mask: hex string, bits that are 1 are taken from value,
                         all other bits are taken from prev_val
        :return: the merged value as lower case hex string with '0x' prefix
        """
        mask = int(hex_mask, 16)
        merged = (int(prev_val, 16) & ~mask) | (int(value, 16) & mask)
        # format() instead of hex(): hex() appends 'L' to Python 2 longs
        return '0x{:x}'.format(merged)

    def read(self, board_id, amount=1, reg=None, dma=None, destination=None, decimal=False, timeout=None):
        """
        Read from boards
        :param board_id: id of the board to read from (mandatory)
        :param amount: number of blocks to read, passed to pci as -s<amount> (default is 1) [1]
        :param reg: register to read from [1]
        :param dma: the dma to read from [1]
        :param destination: the destination to write the retrieved data to ('memory' to return the data) [2]
        :param decimal: whether to return register values as decimal numbers (default is False)
        :param timeout: the timeout for the read (only used when reading data from dma)
        :return: register read without destination: list of values (hex strings, or ints if decimal is set)
                 dma read with destination 'memory': the output of pci up to the first 'Writting'
                 otherwise: None
        [1]: If neither reg nor dma are given reg will default to 0x9000 and data from registers will be read
             If reg is given, data from registers will be read
             If dma is given, data from dma will be read and amount is ignored
             If both reg and dma are given, an error will be raised
        [2]: For register reads with a destination the string representation of the value list is
             written to that file. The file is opened with mode 'r+' and therefore has to exist.
        """
        # either reg or dma have to be None
        assert reg is None or dma is None, "read was called with reg and dma arguments."
        if not reg and not dma:
            reg = '0x9000'

        cmd = ['pci', '-d', available_boards.get_device_file(board_id), '-r', reg if reg else dma]
        dst = '/dev/stdout' if destination == 'memory' else destination
        if reg:
            cmd.append("-s%i" % amount)
        else:
            # NOTE(review): dst is None here if no destination was given - confirm
            # that dma reads are always called with a destination.
            cmd.extend(['-o', dst, '--multipacket'])
            if timeout:
                cmd.extend(['--timeout', str(timeout)])
            log.vinfo('Write data to {}'.format(dst))
        output = self._safe_call(cmd)

        if reg:
            words = self._format(output)
            result = [int(word, 16) for word in words] if decimal else words
            if not dst:
                return result
            with open(dst, 'r+') as f:  # to be consistent with use of destination
                f.write(str(result))
            return None

        if dst == '/dev/stdout':
            return output.split('Writting')[0]
        # NOTE(review): the purpose of this masked register write after a dma
        # read to a file is not documented - confirm with the board documentation.
        self.write(board_id, '003f0', hex_mask='CF0')

    def write(self, board_id, value, reg='0x9040', hex_mask='FFFFFFFF'):
        """
        Write to boards

        If hex_mask is not 'FFFFFFFF' the register is read first and only the
        bits selected by the mask are replaced by the bits of value.

        :param board_id: id of the board to write to (mandatory)
        :param value: value to write as hex string (mandatory)
        :param reg: register to write to (optional, default is '0x9040')
        :param hex_mask: hex mask to apply to value before writing (optional)
        :return:
        """
        assert len(hex_mask) <= 8, "Hex Mask has more than 32 bit."
        if hex_mask != 'FFFFFFFF':
            prev_val = self.read(board_id, 1, reg)[0]
            value = self._apply_mask(prev_val, value, hex_mask)
        cmd = ['pci', '-d', available_boards.get_device_file(board_id), '-w', reg, value]
        self._safe_call(cmd)
        log.debug('Written %s to register %s', value, reg)

    def read_data_to_file(self, board_id, filename, dma='dma0', timeout=None):
        """
        Read data from board and write to file
        :param board_id: the board to read from
        :param filename: the filename to write to
        :param dma: the dma to read from
        :param timeout: if not None: the timeout for the underlying pci command
        :return: the return value of read() for a dma read to a file (None)
        """
        return self.read(board_id, dma=dma, destination=filename, timeout=timeout)

    def read_data_to_variable(self, board_id, dma='dma0', timeout=None):
        """
        Read data and return it.
        :param board_id: the board to read from
        :param dma: the dma to read from
        :param timeout: if not None: the timeout for the underlying pci command
        :return: string with data read from board
        """
        return self.read(board_id, dma=dma, timeout=timeout, destination='memory')

    def start_dma(self, board_id, dma='dma0r'):
        """
        Start dma engine.
        :param board_id: the board to start the dma engine for
        :param dma: the dma to use
        :return:
        """
        # TODO: implement identifier usage
        log.vinfo('Start DMA')
        cmd = ['pci', '-d', available_boards.get_device_file(board_id), '--start-dma', dma]
        self._safe_call(cmd)
        time.sleep(self._DMA_COMMAND_DELAY)

    def stop_dma(self, board_id, dma='dma0r'):
        """
        Stop dma engine.
        :param board_id: the board to stop the dma engine for
        :param dma: the dma to use
        :return:
        """
        # TODO: implement identifier usage
        log.vinfo('Stop DMA')
        cmd = ['pci', '-d', available_boards.get_device_file(board_id), '--stop-dma', dma]
        self._safe_call(cmd)
        time.sleep(self._DMA_COMMAND_DELAY)

    def info(self, board_id=None, dev_file=None):
        """
        Get Device info (output of pci -i)

        The check below fails if board_id and dev_file compare equal, which
        includes the case that both are None. If both are given and differ,
        board_id takes precedence.

        :param board_id: id of the board to query (the device file is looked up)
        :param dev_file: device file to query directly
        :return: Information string returned by pci -i
        """
        assert board_id != dev_file, "info got both board_id and dev_file or got none of both"
        if board_id is not None:
            device = available_boards.get_device_file(board_id)
        else:
            device = dev_file
        return self._safe_call(['pci', '-d', device, '-i'])
# Module-level PCI instance.
pci = PCI()
# The methods of PCI look up available_boards only when they are called, so
# the name may be bound after the class definition.
from boards_connected import available_boards  # this import has to be here as boards_connected imports pci
|