123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- import threading
- import pcilib
- import random
- import os
- import json
- import requests
- import time
- from optparse import OptionParser, OptionGroup
- class test_pcilib():
- def __init__(self,
- device,
- model,
- num_threads = 150,
- write_percentage = 0.1,
- register = 'reg1',
- prop = '/test/prop1',
- branch = '/test',
- server_host = 'http://localhost',
- server_port = 12412,
- server_message_delay = 0):
- #initialize enviroment variables
- if not 'APP_PATH' in os.environ:
- APP_PATH = ''
- file_dir = os.path.dirname(os.path.abspath(__file__))
- APP_PATH = str(os.path.abspath(file_dir + '/../..'))
- os.environ["APP_PATH"] = APP_PATH
-
- if not 'PCILIB_MODEL_DIR' in os.environ:
- os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml"
- if not 'LD_LIBRARY_PATH' in os.environ:
- os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib"
-
- random.seed()
- #create pcilib_instance
- self.device = device
- self.model = model
- self.pcilib = pcilib.pcilib(device, model)
- self.num_threads = num_threads
- self.write_percentage = write_percentage
- self.register = register
- self.prop = prop
- self.branch = branch
- self.server_message_delay = server_message_delay
- self.server_port = server_port
- self.server_host = server_host
- def testLocking(self, message, lock_id = None):
- url = str(self.server_host + ':' + str(self.server_port))
- headers = {'content-type': 'application/json'}
- payload =[{'command': 'lock', 'lock_id': lock_id},
- {'command': 'try_lock', 'lock_id': lock_id},
- {'command': 'unlock', 'lock_id': lock_id},
- {'command': 'lock_global'},
- {'command': 'unlock_global'},
- {'command': 'help'}]
- r = requests.get(url, data=json.dumps(payload[message]), headers=headers)
- print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
-
- def testThreadSafeReadWrite(self):
- def threadFunc():
- if random.randint(0, 100) >= (self.write_percentage * 100):
- ret = self.pcilib.get_property(self.prop)
- print(self.register, ':', ret)
- del ret
- else:
- val = random.randint(0, 65536)
- print('set value:', val)
- self.pcilib.set_property(val, self.prop)
- try:
- while(1):
- thread_list = [threading.Thread(target=threadFunc) for i in range(0, self.num_threads)]
- for i in range(0, self.num_threads):
- thread_list[i].start()
- for i in range(0, self.num_threads):
- thread_list[i].join()
- print('cycle done')
- except KeyboardInterrupt:
- print('testing done')
- pass
-
- def testMemoryLeak(self):
- try:
- while(1):
- val = random.randint(0, 8096)
- self.pcilib = pcilib.pcilib(self.device, self.model)
- print(self.pcilib.get_property_list(self.branch))
- print(self.pcilib.get_register_info(self.register))
- print(self.pcilib.get_registers_list())
- print(self.pcilib.write_register(val, self.register))
- print(self.pcilib.read_register(self.register))
- print(self.pcilib.set_property(val, self.prop))
- print(self.pcilib.get_property(self.prop))
- except KeyboardInterrupt:
- print('testing done')
- pass
- def testServer(self):
- url = str(self.server_host + ':' + str(self.server_port))
- headers = {'content-type': 'application/json'}
- payload =[{'com': 'open', 'data2' : '12341'},
- #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'},
- {'command': 'help'},
- {'command': 'get_registers_list'},
- {'command': 'get_register_info', 'reg': self.register},
- {'command': 'get_property_list', 'branch': self.branch},
- {'command': 'read_register', 'reg': self.register},
- {'command': 'write_register', 'reg': self.register},
- {'command': 'get_property', 'prop': self.prop},
- {'command': 'set_property', 'prop': self.prop}]
-
- def sendRandomMessage():
- message_number = random.randint(1, len(payload) - 1)
- print('message number: ', message_number)
- payload[message_number]['value'] = random.randint(0, 8096)
- r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers)
- if(r.headers['content-type'] == 'application/json'):
- print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
- else:
- print(r.content)
- try:
- r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
- if(r.headers['content-type'] == 'application/json'):
- print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
- else:
- print(r.content)
-
- while(1):
- time.sleep(self.server_message_delay)
- thread_list = [threading.Thread(target=sendRandomMessage) for i in range(0, self.num_threads)]
- for i in range(0, self.num_threads):
- thread_list[i].start()
- for i in range(0, self.num_threads):
- thread_list[i].join()
- print('cycle done')
-
- except KeyboardInterrupt:
- print('testing done')
- pass
- if __name__ == '__main__':
- #parce command line options
- parser = OptionParser()
- parser.add_option("-d", "--device", action="store",
- type="string", dest="device", default=str('/dev/fpga0'),
- help="FPGA device (/dev/fpga0)")
- parser.add_option("-m", "--model", action="store",
- type="string", dest="model", default=str('test'),
- help="Memory model (autodetected)")
- parser.add_option("-t", "--threads", action="store",
- type="int", dest="threads", default=150,
- help="Threads number (150)")
-
- server_group = OptionGroup(parser, "Server Options",
- "Options for testing server.")
- server_group.add_option("-p", "--port", action="store",
- type="int", dest="port", default=9000,
- help="Set testing server port (9000)")
- server_group.add_option("--host", action="store",
- type="string", dest="host", default='http://localhost',
- help="Set testing server host (http://localhost)")
- server_group.add_option("--delay", action="store",
- type="float", dest="delay", default=0.0,
- help="Set delay in seconds between sending messages to setver (0.0)")
- parser.add_option_group(server_group)
-
- rw_group = OptionGroup(parser, "Registers/properties Options",
- "Group stores testing register and property options")
- rw_group.add_option("--write_percentage", action="store",
- type="float", dest="write_percentage", default=0.5,
- help="Set percentage (0.0 - 1.0) of write commands in multithread (0.5)"
- "read/write test")
- rw_group.add_option("-r", "--register", action="store",
- type="string", dest="register", default='reg1',
- help="Set register name (reg1)")
- rw_group.add_option("--property", action="store",
- type="string", dest="prop", default='/test/prop1',
- help="Set property name (/test/prop1)")
- rw_group.add_option("-b", "--branch", action="store",
- type="string", dest="branch", default='/test',
- help="Set property branch (/test)")
-
- parser.add_option_group(rw_group)
-
- test_group = OptionGroup(parser, "Test commands group",
- "This group conatains aviable commands for testing. "
- "If user add more than one command, they will process"
- "sequientally. To stop test, press Ctrl-C."
- )
- test_group.add_option("--test_mt_rw", action="store_true",
- dest="test_mt_rw", default=False,
- help="Multithread read/write test. This test will execute "
- "get_property/set_property commands with random values in multi-thread mode")
- test_group.add_option("--test_memory_leak", action="store_true",
- dest="test_memory_leak", default=False,
- help="Python wrap memory leak test. This test will execute all"
- "Python wrap memory leak test. This test will execute all"
- )
- test_group.add_option("--test_server", action="store_true",
- dest="test_server", default=False,
- help="Python server test. This test will send "
- "random commands to server in multi-thread mode"
- )
- parser.add_option_group(test_group)
-
- lock_group = OptionGroup(parser, "Locking test group")
-
- lock_group.add_option("--lock", action="store",
- dest="lock_id", default=None, type="string",
- help="Lock id."
- )
- lock_group.add_option("--try_lock", action="store",
- dest="try_lock_id", default=None, type="string",
- help="Try to lock id."
- )
- lock_group.add_option("--unlock", action="store",
- dest="unlock_id", default=None, type="string",
- help="Unlock lock with id."
- )
- lock_group.add_option("--lock_global", action="store_true",
- dest="lock_global", default=False,
- help="Global pcilib lock."
- )
- lock_group.add_option("--unlock_global", action="store_true",
- dest="unlock_global", default=False,
- help="Global pcilib unlock."
- )
-
- lock_group.add_option("--get_server_message", action="store_true",
- dest="get_server_message", default=False,
- help="Global pcilib unlock."
- )
-
- parser.add_option_group(lock_group)
-
- opts = parser.parse_args()[0]
- #create pcilib test instance
- lib = test_pcilib(opts.device,
- opts.model,
- num_threads = opts.threads,
- write_percentage = opts.write_percentage,
- register = opts.register,
- prop = opts.prop,
- branch = opts.branch,
- server_host = opts.host,
- server_port = opts.port,
- server_message_delay = opts.delay)
-
- #make some tests
- if opts.test_mt_rw:
- lib.testThreadSafeReadWrite()
- if opts.test_memory_leak:
- lib.testMemoryLeak()
- if opts.test_server:
- lib.testServer()
- if opts.lock_id:
- lib.testLocking(0, opts.lock_id)
- if opts.try_lock_id:
- lib.testLocking(1, opts.try_lock_id)
- if opts.unlock_id:
- lib.testLocking(2, opts.unlock_id)
- if opts.lock_global:
- lib.testLocking(3)
- if opts.unlock_global:
- lib.testLocking(4)
- if(opts.get_server_message):
- lib.testLocking(5)
|