123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import threading
- import pcipywrap
- import random
- import os
- import json
- import requests
- import time
- class test_pcipywrap():
- def __init__(self, device, model, num_threads = 150,
- write_percentage = 0.1, register = 'test_prop2',
- server_host = 'http://localhost', server_port = 12412,
- server_message_delay = 0):
- #initialize enviroment variables
- if not 'APP_PATH' in os.environ:
- APP_PATH = ''
- file_dir = os.path.dirname(os.path.abspath(__file__))
- APP_PATH = str(os.path.abspath(file_dir + '/../..'))
- os.environ["APP_PATH"] = APP_PATH
-
- if not 'PCILIB_MODEL_DIR' in os.environ:
- os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml"
- if not 'LD_LIBRARY_PATH' in os.environ:
- os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib"
-
- random.seed()
- #create pcilib_instance
- self.pcilib = pcipywrap.Pcipywrap(device, model)
- self.num_threads = num_threads
- self.write_percentage = write_percentage
- self.register = register
- self.server_message_delay = server_message_delay
- self.server_port = server_port
- self.server_host = server_host
-
- def testThreadSafeReadWrite(self):
- def threadFunc():
- if random.randint(0, 100) >= (self.write_percentage * 100):
- ret = self.pcilib.get_property('/test/prop2')
- print self.register, ':', ret
- del ret
- else:
- val = random.randint(0, 65536)
- print 'set value:', val
- self.pcilib.write_register(val, self.register)
- try:
- while(1):
- thread_list = [threading.Thread(target=threadFunc) for i in range(0, self.num_threads)]
- for i in range(0, self.num_threads):
- thread_list[i].start()
- for i in range(0, self.num_threads):
- thread_list[i].join()
- print 'cycle done'
- except KeyboardInterrupt:
- print 'testing done'
- pass
-
- def testMemoryLeak(self):
- try:
- while(1):
- #print self.pcilib.create_pcilib_instance('/dev/fpga0','test_pywrap')
-
- print self.pcilib.get_property_list('/test')
- print self.pcilib.get_register_info('test_prop1')
- #print self.pcilib.get_registers_list();
-
- #print self.pcilib.read_register('reg1')
- #print self.pcilib.write_register(12, 'reg1')
-
- #print self.pcilib.get_property('/test/prop2')
- #print self.pcilib.set_property(12, '/test/prop2')
- except KeyboardInterrupt:
- print 'testing done'
- pass
- def testServer(self):
- url = str(self.server_host + ':' + str(self.server_port))
- headers = {'content-type': 'application/json'}
- payload =[{'com': 'open', 'data2' : '12341'},
- #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'},
- {'command': 'help'},
- {'command': 'get_registers_list'},
- {'command': 'get_register_info', 'reg': 'reg1'},
- {'command': 'get_property_list'},
- {'command': 'read_register', 'reg': 'reg1'},
- {'command': 'write_register', 'reg': 'reg1'},
- {'command': 'get_property', 'prop': '/test/prop2'},
- {'command': 'set_property', 'prop': '/test/prop2'}]
-
- def sendRandomMessage():
- message_number = random.randint(1, len(payload) - 1)
- print 'message number: ', message_number
- payload[message_number]['value'] = random.randint(0, 65535)
- r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers)
- print json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': '))
-
- try:
- r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
- print json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))
-
- while(1):
- time.sleep(self.server_message_delay)
- thread_list = [threading.Thread(target=sendRandomMessage) for i in range(0, self.num_threads)]
- for i in range(0, self.num_threads):
- thread_list[i].start()
- for i in range(0, self.num_threads):
- thread_list[i].join()
- print 'cycle done'
-
- except KeyboardInterrupt:
- print 'testing done'
- pass
- if __name__ == '__main__':
- lib = test_pcipywrap('/dev/fpga0','test_pywrap', num_threads = 150,
- write_percentage = 0.1, register = 'test_prop2',server_host = 'http://localhost', server_port = 12412,
- server_message_delay = 0)
- lib.testThreadSafeReadWrite()
|