test_pcipywrap.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import threading
  2. import pcipywrap
  3. import random
  4. import os
  5. import json
  6. import requests
  7. import time
  8. class test_pcipywrap():
  9. def __init__(self, device, model, num_threads = 150,
  10. write_percentage = 0.1, register = 'test_prop2',
  11. server_host = 'http://localhost', server_port = 12412,
  12. server_message_delay = 0):
  13. #initialize enviroment variables
  14. if not 'APP_PATH' in os.environ:
  15. APP_PATH = ''
  16. file_dir = os.path.dirname(os.path.abspath(__file__))
  17. APP_PATH = str(os.path.abspath(file_dir + '/../..'))
  18. os.environ["APP_PATH"] = APP_PATH
  19. if not 'PCILIB_MODEL_DIR' in os.environ:
  20. os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml"
  21. if not 'LD_LIBRARY_PATH' in os.environ:
  22. os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib"
  23. random.seed()
  24. #create pcilib_instance
  25. self.pcilib = pcipywrap.Pcipywrap(device, model)
  26. self.num_threads = num_threads
  27. self.write_percentage = write_percentage
  28. self.register = register
  29. self.server_message_delay = server_message_delay
  30. self.server_port = server_port
  31. self.server_host = server_host
  32. def testThreadSafeReadWrite(self):
  33. def threadFunc():
  34. if random.randint(0, 100) >= (self.write_percentage * 100):
  35. ret = self.pcilib.get_property('/test/prop2')
  36. print self.register, ':', ret
  37. del ret
  38. else:
  39. val = random.randint(0, 65536)
  40. print 'set value:', val
  41. self.pcilib.write_register(val, self.register)
  42. try:
  43. while(1):
  44. thread_list = [threading.Thread(target=threadFunc) for i in range(0, self.num_threads)]
  45. for i in range(0, self.num_threads):
  46. thread_list[i].start()
  47. for i in range(0, self.num_threads):
  48. thread_list[i].join()
  49. print 'cycle done'
  50. except KeyboardInterrupt:
  51. print 'testing done'
  52. pass
  53. def testMemoryLeak(self):
  54. try:
  55. while(1):
  56. #print self.pcilib.create_pcilib_instance('/dev/fpga0','test_pywrap')
  57. print self.pcilib.get_property_list('/test')
  58. print self.pcilib.get_register_info('test_prop1')
  59. #print self.pcilib.get_registers_list();
  60. #print self.pcilib.read_register('reg1')
  61. #print self.pcilib.write_register(12, 'reg1')
  62. #print self.pcilib.get_property('/test/prop2')
  63. #print self.pcilib.set_property(12, '/test/prop2')
  64. except KeyboardInterrupt:
  65. print 'testing done'
  66. pass
  67. def testServer(self):
  68. url = str(self.server_host + ':' + str(self.server_port))
  69. headers = {'content-type': 'application/json'}
  70. payload =[{'com': 'open', 'data2' : '12341'},
  71. #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'},
  72. {'command': 'help'},
  73. {'command': 'get_registers_list'},
  74. {'command': 'get_register_info', 'reg': 'reg1'},
  75. {'command': 'get_property_list'},
  76. {'command': 'read_register', 'reg': 'reg1'},
  77. {'command': 'write_register', 'reg': 'reg1'},
  78. {'command': 'get_property', 'prop': '/test/prop2'},
  79. {'command': 'set_property', 'prop': '/test/prop2'}]
  80. def sendRandomMessage():
  81. message_number = random.randint(1, len(payload) - 1)
  82. print 'message number: ', message_number
  83. payload[message_number]['value'] = random.randint(0, 65535)
  84. r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers)
  85. print json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': '))
  86. try:
  87. r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
  88. print json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))
  89. while(1):
  90. time.sleep(self.server_message_delay)
  91. thread_list = [threading.Thread(target=sendRandomMessage) for i in range(0, self.num_threads)]
  92. for i in range(0, self.num_threads):
  93. thread_list[i].start()
  94. for i in range(0, self.num_threads):
  95. thread_list[i].join()
  96. print 'cycle done'
  97. except KeyboardInterrupt:
  98. print 'testing done'
  99. pass
  100. if __name__ == '__main__':
  101. lib = test_pcipywrap('/dev/fpga0','test_pywrap', num_threads = 150,
  102. write_percentage = 0.1, register = 'test_prop2',server_host = 'http://localhost', server_port = 12412,
  103. server_message_delay = 0)
  104. lib.testThreadSafeReadWrite()