test_pcilib.py 12 KB


  1. import threading
  2. import pcilib
  3. import random
  4. import os
  5. import json
  6. import requests
  7. import time
  8. from optparse import OptionParser, OptionGroup
  9. class test_pcilib():
  10. def __init__(self,
  11. device,
  12. model,
  13. num_threads = 150,
  14. write_percentage = 0.1,
  15. register = 'reg1',
  16. prop = '/test/prop1',
  17. branch = '/test',
  18. server_host = 'http://localhost',
  19. server_port = 12412,
  20. server_message_delay = 0):
  21. #initialize enviroment variables
  22. if not 'APP_PATH' in os.environ:
  23. APP_PATH = ''
  24. file_dir = os.path.dirname(os.path.abspath(__file__))
  25. APP_PATH = str(os.path.abspath(file_dir + '/../..'))
  26. os.environ["APP_PATH"] = APP_PATH
  27. if not 'PCILIB_MODEL_DIR' in os.environ:
  28. os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml"
  29. if not 'LD_LIBRARY_PATH' in os.environ:
  30. os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib"
  31. random.seed()
  32. #create pcilib_instance
  33. self.device = device
  34. self.model = model
  35. self.pcilib = pcilib.pcilib(device, model)
  36. self.num_threads = num_threads
  37. self.write_percentage = write_percentage
  38. self.register = register
  39. self.prop = prop
  40. self.branch = branch
  41. self.server_message_delay = server_message_delay
  42. self.server_port = server_port
  43. self.server_host = server_host
  44. def testLocking(self, message, lock_id = None):
  45. url = str(self.server_host + ':' + str(self.server_port))
  46. headers = {'content-type': 'application/json'}
  47. payload =[{'command': 'lock', 'lock_id': lock_id},
  48. {'command': 'try_lock', 'lock_id': lock_id},
  49. {'command': 'unlock', 'lock_id': lock_id},
  50. {'command': 'lock_global'},
  51. {'command': 'unlock_global'},
  52. {'command': 'help'}]
  53. r = requests.get(url, data=json.dumps(payload[message]), headers=headers)
  54. print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
  55. def testThreadSafeReadWrite(self):
  56. def threadFunc():
  57. if random.randint(0, 100) >= (self.write_percentage * 100):
  58. ret = self.pcilib.get_property(self.prop)
  59. print(self.register, ':', ret)
  60. del ret
  61. else:
  62. val = random.randint(0, 65536)
  63. print('set value:', val)
  64. self.pcilib.set_property(val, self.prop)
  65. try:
  66. while(1):
  67. thread_list = [threading.Thread(target=threadFunc) for i in range(0, self.num_threads)]
  68. for i in range(0, self.num_threads):
  69. thread_list[i].start()
  70. for i in range(0, self.num_threads):
  71. thread_list[i].join()
  72. print('cycle done')
  73. except KeyboardInterrupt:
  74. print('testing done')
  75. pass
  76. def testMemoryLeak(self):
  77. try:
  78. while(1):
  79. val = random.randint(0, 8096)
  80. self.pcilib = pcilib.pcilib(self.device, self.model)
  81. print(self.pcilib.get_property_list(self.branch))
  82. print(self.pcilib.get_register_info(self.register))
  83. print(self.pcilib.get_registers_list())
  84. print(self.pcilib.write_register(val, self.register))
  85. print(self.pcilib.read_register(self.register))
  86. print(self.pcilib.set_property(val, self.prop))
  87. print(self.pcilib.get_property(self.prop))
  88. except KeyboardInterrupt:
  89. print('testing done')
  90. pass
  91. def testServer(self):
  92. url = str(self.server_host + ':' + str(self.server_port))
  93. headers = {'content-type': 'application/json'}
  94. payload =[{'com': 'open', 'data2' : '12341'},
  95. #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'},
  96. {'command': 'help'},
  97. {'command': 'get_registers_list'},
  98. {'command': 'get_register_info', 'reg': self.register},
  99. {'command': 'get_property_list', 'branch': self.branch},
  100. {'command': 'read_register', 'reg': self.register},
  101. {'command': 'write_register', 'reg': self.register},
  102. {'command': 'get_property', 'prop': self.prop},
  103. {'command': 'set_property', 'prop': self.prop}]
  104. def sendRandomMessage():
  105. message_number = random.randint(1, len(payload) - 1)
  106. print('message number: ', message_number)
  107. payload[message_number]['value'] = random.randint(0, 8096)
  108. r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers)
  109. if(r.headers['content-type'] == 'application/json'):
  110. print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
  111. else:
  112. print(r.content)
  113. try:
  114. r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
  115. if(r.headers['content-type'] == 'application/json'):
  116. print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))
  117. else:
  118. print(r.content)
  119. while(1):
  120. time.sleep(self.server_message_delay)
  121. thread_list = [threading.Thread(target=sendRandomMessage) for i in range(0, self.num_threads)]
  122. for i in range(0, self.num_threads):
  123. thread_list[i].start()
  124. for i in range(0, self.num_threads):
  125. thread_list[i].join()
  126. print('cycle done')
  127. except KeyboardInterrupt:
  128. print('testing done')
  129. pass
  130. if __name__ == '__main__':
  131. #parce command line options
  132. parser = OptionParser()
  133. parser.add_option("-d", "--device", action="store",
  134. type="string", dest="device", default=str('/dev/fpga0'),
  135. help="FPGA device (/dev/fpga0)")
  136. parser.add_option("-m", "--model", action="store",
  137. type="string", dest="model", default=str('test'),
  138. help="Memory model (autodetected)")
  139. parser.add_option("-t", "--threads", action="store",
  140. type="int", dest="threads", default=150,
  141. help="Threads number (150)")
  142. server_group = OptionGroup(parser, "Server Options",
  143. "Options for testing server.")
  144. server_group.add_option("-p", "--port", action="store",
  145. type="int", dest="port", default=9000,
  146. help="Set testing server port (9000)")
  147. server_group.add_option("--host", action="store",
  148. type="string", dest="host", default='http://localhost',
  149. help="Set testing server host (http://localhost)")
  150. server_group.add_option("--delay", action="store",
  151. type="float", dest="delay", default=0.0,
  152. help="Set delay in seconds between sending messages to setver (0.0)")
  153. parser.add_option_group(server_group)
  154. rw_group = OptionGroup(parser, "Registers/properties Options",
  155. "Group stores testing register and property options")
  156. rw_group.add_option("--write_percentage", action="store",
  157. type="float", dest="write_percentage", default=0.5,
  158. help="Set percentage (0.0 - 1.0) of write commands in multithread (0.5)"
  159. "read/write test")
  160. rw_group.add_option("-r", "--register", action="store",
  161. type="string", dest="register", default='reg1',
  162. help="Set register name (reg1)")
  163. rw_group.add_option("--property", action="store",
  164. type="string", dest="prop", default='/test/prop1',
  165. help="Set property name (/test/prop1)")
  166. rw_group.add_option("-b", "--branch", action="store",
  167. type="string", dest="branch", default='/test',
  168. help="Set property branch (/test)")
  169. parser.add_option_group(rw_group)
  170. test_group = OptionGroup(parser, "Test commands group",
  171. "This group conatains aviable commands for testing. "
  172. "If user add more than one command, they will process"
  173. "sequientally. To stop test, press Ctrl-C."
  174. )
  175. test_group.add_option("--test_mt_rw", action="store_true",
  176. dest="test_mt_rw", default=False,
  177. help="Multithread read/write test. This test will execute "
  178. "get_property/set_property commands with random values in multi-thread mode")
  179. test_group.add_option("--test_memory_leak", action="store_true",
  180. dest="test_memory_leak", default=False,
  181. help="Python wrap memory leak test. This test will execute all"
  182. "Python wrap memory leak test. This test will execute all"
  183. )
  184. test_group.add_option("--test_server", action="store_true",
  185. dest="test_server", default=False,
  186. help="Python server test. This test will send "
  187. "random commands to server in multi-thread mode"
  188. )
  189. parser.add_option_group(test_group)
  190. lock_group = OptionGroup(parser, "Locking test group")
  191. lock_group.add_option("--lock", action="store",
  192. dest="lock_id", default=None, type="string",
  193. help="Lock id."
  194. )
  195. lock_group.add_option("--try_lock", action="store",
  196. dest="try_lock_id", default=None, type="string",
  197. help="Try to lock id."
  198. )
  199. lock_group.add_option("--unlock", action="store",
  200. dest="unlock_id", default=None, type="string",
  201. help="Unlock lock with id."
  202. )
  203. lock_group.add_option("--lock_global", action="store_true",
  204. dest="lock_global", default=False,
  205. help="Global pcilib lock."
  206. )
  207. lock_group.add_option("--unlock_global", action="store_true",
  208. dest="unlock_global", default=False,
  209. help="Global pcilib unlock."
  210. )
  211. lock_group.add_option("--get_server_message", action="store_true",
  212. dest="get_server_message", default=False,
  213. help="Global pcilib unlock."
  214. )
  215. parser.add_option_group(lock_group)
  216. opts = parser.parse_args()[0]
  217. #create pcilib test instance
  218. lib = test_pcilib(opts.device,
  219. opts.model,
  220. num_threads = opts.threads,
  221. write_percentage = opts.write_percentage,
  222. register = opts.register,
  223. prop = opts.prop,
  224. branch = opts.branch,
  225. server_host = opts.host,
  226. server_port = opts.port,
  227. server_message_delay = opts.delay)
  228. #make some tests
  229. if opts.test_mt_rw:
  230. lib.testThreadSafeReadWrite()
  231. if opts.test_memory_leak:
  232. lib.testMemoryLeak()
  233. if opts.test_server:
  234. lib.testServer()
  235. if opts.lock_id:
  236. lib.testLocking(0, opts.lock_id)
  237. if opts.try_lock_id:
  238. lib.testLocking(1, opts.try_lock_id)
  239. if opts.unlock_id:
  240. lib.testLocking(2, opts.unlock_id)
  241. if opts.lock_global:
  242. lib.testLocking(3)
  243. if opts.unlock_global:
  244. lib.testLocking(4)
  245. if(opts.get_server_message):
  246. lib.testLocking(5)