olctest based on pytest driver

秒速五厘米 2021-12-01 15:16 438阅读 0赞

MyDemo for pytest driver

  1. #!/usr/bin/env python3
  2. import json
  3. import binascii
  4. import asn1tools
  5. import paho.mqtt.client as mqtt
  6. import requests
  7. import logging
  8. import serial
  9. import threading
  10. import time
  11. import datetime
  12. import random
  13. from enum import Enum
  14. from queue import Queue
  15. class ResponseCode(Enum):
  16. CREATED = 2 << 5 | 1
  17. DELETED = 2 << 5 | 2
  18. VALID = 2 << 5 | 3
  19. CHANGED = 2 << 5 | 4
  20. CONTENT = 2 << 5 | 5
  21. CONTINUE = 2 << 5 | 31
  22. BAD_REQUEST = 4 << 5 | 0
  23. UNAUTHORIZED = 4 << 5 | 1
  24. BAD_OPTION = 4 << 5 | 2
  25. FORBIDDEN = 4 << 5 | 3
  26. NOT_FOUND = 4 << 5 | 4
  27. METHOD_NOT_ALLOWED = 4 << 5 | 5
  28. NOT_ACCEPTABLE = 4 << 5 | 6
  29. REQUEST_ENTITY_INCOMPLETE = 4 << 5 | 8
  30. CONFLICT = 4 << 5 | 9
  31. PRECONDITION_FAILED = 4 << 5 | 12
  32. REQUEST_ENTITY_TOO_LARGE = 4 << 5 | 13
  33. UNSUPPORTED_CONTENT_FORMAT = 4 << 5 | 15
  34. UNPROCESSABLE_ENTITY = 4 << 5 | 22
  35. INTERNAL_SERVER_ERROR = 5 << 5 | 0
  36. NOT_IMPLEMENTED = 5 << 5 | 1
  37. BAD_GATEWAY = 5 << 5 | 2
  38. SERVICE_UNAVAILABLE = 5 << 5 | 3
  39. GATEWAY_TIMEOUT = 5 << 5 | 4
  40. PROXY_NOT_SUPPORTED = 5 << 5 | 5
  41. class CoapMessage:
  42. def load(self, dict_obj):
  43. for p in self.__dict__:
  44. if p == 'block2' and dict_obj[p] is not None:
  45. bob = BlockOptionBean()
  46. bob.load(dict_obj[p])
  47. self.__dict__[p] = bob
  48. else:
  49. self.__dict__[p] = dict_obj[p]
  50. def __str__(self):
  51. attrs = ", ".join("{}={}".format(k, getattr(self, k)) for k in self.__dict__.keys())
  52. return "[{}: {}]".format(self.__class__.__name__, attrs)
  53. class RequestBean(CoapMessage):
  54. def __init__(self):
  55. self.type = None
  56. self.code = None
  57. self.url = None
  58. self.contentFormat = None
  59. self.payload = None
  60. self.block2 = None
  61. class ResponseBean(CoapMessage):
  62. def __init__(self):
  63. self.code = None
  64. self.contentFormat = None
  65. self.payload = None
  66. self.block2 = None
  67. class BlockOptionBean(CoapMessage):
  68. def __init__(self):
  69. self.szx = None
  70. self.m = None
  71. self.num = None
  72. class TriggerTool:
  73. def trigger(self):
  74. pass
  75. class UartTriggerTool:
  76. def __init__(self, uart):
  77. self.uart = uart
  78. def trigger(self):
  79. logging.debug('uart trigger sync')
  80. self.uart.send('')
  81. time.sleep(1)
  82. self.uart.send('sync')
  83. class HttpTriggerTool:
  84. def trigger(self):
  85. logging.debug('trigger sync')
  86. url = 'http://127.0.0.1:18000/api/v1/simulator/sms'
  87. requests.post(url, 'test sms tirggered.')
  88. class ManualTriggerTool:
  89. def __init__(self, timeout):
  90. self.timeout = timeout
  91. def trigger(self):
  92. logging.info('please trigger the sync within %d seconds.' % self.timeout)
  93. time.sleep(self.timeout + 2)
  94. class SerialTool:
  95. encoding = 'ascii'
  96. error_policy = 'ignore'
  97. def __init__(self, port, baud, logfile):
  98. self.port = port
  99. self.baud = baud
  100. self.logfile = logfile
  101. self.terminated = None
  102. self.serial = None
  103. self._thread = None
  104. def loop(self):
  105. self.serial = serial.Serial(self.port, self.baud)
  106. logging.info('serial open status is %s' % self.serial.is_open)
  107. f = open(self.logfile, 'a')
  108. while not self.terminated:
  109. try:
  110. text = self.serial.readline()
  111. text = str(text, encoding=self.encoding, errors=self.error_policy)
  112. text = text.replace("\r", '')
  113. text = text.replace("\n", '')
  114. text = '%s -- %s\n' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f'), text)
  115. f.write(text)
  116. f.flush()
  117. except BaseException as e:
  118. logging.error('serial thread: %s' % e)
  119. f.close()
  120. self.serial.close()
  121. self.serial = None
  122. def loop_start(self):
  123. if self._thread is None:
  124. self.terminated = False
  125. self._thread = threading.Thread(target=self.loop)
  126. self._thread.daemon = True
  127. self._thread.start()
  128. def loop_stop(self):
  129. if self._thread is not None:
  130. self.terminated = True
  131. self.serial.cancel_read()
  132. if threading.current_thread() != self._thread:
  133. self._thread.join()
  134. self._thread = None
  135. def send(self, text):
  136. if self.serial is not None and self.serial.is_open:
  137. text = text + "\r"
  138. text = text.encode(encoding=self.encoding, errors=self.error_policy)
  139. self.serial.write(text)
  140. else:
  141. logging.error('serial port is not opened.')
  142. class MqttTool:
  143. def __init__(self, host, port, qos, timeout):
  144. self.host = host
  145. self.port = port
  146. self.qos = qos
  147. self.queue = Queue()
  148. self.mqtt_client = None
  149. self.timeout = timeout
  150. def on_message(self, client, userdata, msg):
  151. logging.debug('get a message: %s' % msg)
  152. self.queue.put(msg)
  153. def subscribe(self, topic):
  154. self.mqtt_client.subscribe(topic, self.qos)
  155. logging.debug('subscribe to %s' % topic)
  156. def unsubscribe(self, topic):
  157. self.mqtt_client.unsubscribe(topic)
  158. logging.debug('unsubscribe %s' % topic)
  159. def receive_msg(self, timeout=None):
  160. logging.debug('waiting for message.')
  161. if timeout is None:
  162. timeout = self.timeout
  163. return self.queue.get(timeout=timeout)
  164. def publish(self, topic, blob):
  165. self.mqtt_client.publish(topic, blob)
  166. def loop_start(self):
  167. if self.mqtt_client is None:
  168. self.mqtt_client = mqtt.Client()
  169. self.mqtt_client.on_message = self.on_message
  170. self.mqtt_client.connect(self.host, self.port, self.timeout)
  171. self.mqtt_client.loop_start()
  172. def loop_stop(self):
  173. if self.mqtt_client is not None:
  174. self.mqtt_client.loop_stop()
  175. self.mqtt_client.disconnect()
  176. self.mqtt_client = None
  177. class ParserTool:
  178. scheme_file = 'ctc-message-schema.asn1',
  179. codec = 'uper'
  180. def __init__(self):
  181. self.asn1compiler = asn1tools.compile_files(self.scheme_file, self.codec)
  182. self.obj_names = {
  183. 'POST /s/2': ('SyncRequest', 'SyncResponse2'),
  184. 'GET /s/2': ('', 'SyncResponse2')}
  185. def serialize(self, obj):
  186. if isinstance(obj, ResponseCode):
  187. return obj.name
  188. else:
  189. return obj.__dict__
  190. def asn1_encode(self, name, bean):
  191. return self.asn1compiler.encode(name, bean, check_constraints=True)
  192. def asn1_decode(self, name, blob):
  193. return self.asn1compiler.decode(name, blob, check_constraints=True)
  194. def get_resource_path(self, request_bean):
  195. url = request_bean.url
  196. if url.startswith('coaps'):
  197. url = url[8:]
  198. i = url.index('/')
  199. return url[i:]
  200. def get_obj_path(self, resource_path):
  201. tmp = resource_path.split('/')
  202. if len(tmp) == 4:
  203. return '/%s/%s/_/%s' % (tmp[1], tmp[2], tmp[3])
  204. return resource_path
  205. def get_obj_name(self, code, path):
  206. key = '%s %s' % (code, path)
  207. return self.obj_names[key]
  208. def decode_request_payload(self, request_bean):
  209. if request_bean.payload is not None and len(request_bean.payload) > 0:
  210. path = self.get_resource_path(request_bean)
  211. path = self.get_obj_path(path)
  212. names = self.get_obj_name(request_bean.code, path)
  213. asnobj = self.asn1_decode(names[0], binascii.a2b_base64(request_bean.payload))
  214. logging.info('request asn1: %s' % asnobj)
  215. return asnobj
  216. def decode_request(self, mqtt_message):
  217. obj = json.loads(mqtt_message.payload.decode('utf-8'))
  218. req = RequestBean()
  219. req.load(obj)
  220. return req
  221. def encode_response_payload(self, name, bean):
  222. logging.info('response asn1: %s' % bean)
  223. s = self.asn1_encode(name, bean)
  224. return s
  225. def encode_response(self, response_bean):
  226. data = json.dumps(response_bean, ensure_ascii=True, default=self.serialize)
  227. return bytes(data, 'utf-8')
  228. class BaseClient:
  229. APPLICATION_OCTET_STREAM = 42
  230. TOPIC_PREFIX = 'olc'
  231. def __init__(self, trigger, mqclient, timeout, identity):
  232. self.parser = ParserTool()
  233. self.trigger = trigger
  234. self.mqclient = mqclient
  235. self.timeout = timeout
  236. self.identity = identity
  237. identity = binascii.hexlify(identity.encode(encoding='utf-8'))
  238. identity = identity.decode()
  239. self.topic_request = '/%s/%s/request' % (self.TOPIC_PREFIX, identity)
  240. self.topic_reqponse = '/%s/%s/response' % (self.TOPIC_PREFIX, identity)
  241. self.mqclient.subscribe(self.topic_request)
  242. def random_data(self, len):
  243. tmp = []
  244. for i in range(len):
  245. tmp.append(random.randint(0, 255))
  246. return bytes(tmp)
  247. def get_seconds_20000101(self, dt=None):
  248. if dt is None:
  249. dt = datetime.datetime.now()
  250. s = (dt - datetime.datetime(2000, 1, 1, 0, 0, 0)).total_seconds()
  251. return int(s)
  252. def send_resp(self, code, data=None, block2=None):
  253. resp = ResponseBean()
  254. resp.code = code
  255. if data is not None:
  256. resp.contentFormat = self.APPLICATION_OCTET_STREAM
  257. data = binascii.b2a_base64(data)
  258. data = str(data, 'utf-8')
  259. data = data.replace('\n', '').replace('\r', '')
  260. resp.payload = data
  261. if block2 is not None:
  262. resp.block2 = block2
  263. logging.info('send response: %s' % resp)
  264. blob = self.parser.encode_response(resp)
  265. self.mqclient.publish(self.topic_reqponse, blob)
  266. def send_resp_bean(self, code, name, bean):
  267. data = self.parser.encode_response_payload(name, bean)
  268. return self.send_resp(code, data)
  269. def send_resp_block2(self, code, data, m, szx, num):
  270. bob = BlockOptionBean()
  271. bob.m = m
  272. bob.szx = szx
  273. bob.num = num
  274. return self.send_resp(code, data, bob)
  275. def send_resp_sync(self, resource, sr, seconds=None):
  276. if seconds is None:
  277. seconds = self.get_seconds_20000101()
  278. resp = {
  279. 'resourcePaths': resource, 'revision': sr, 'currentUtcTime': {
  280. 'secondsSince20000101': seconds}}
  281. self.send_resp_bean(code=ResponseCode.CONTENT, name='SyncResponse2', bean=resp)
  282. def receive_req(self, timeout=None):
  283. if timeout is None:
  284. timeout = self.timeout
  285. msg = self.mqclient.receive_msg(timeout)
  286. req = self.parser.decode_request(msg)
  287. logging.info('receive request: %s' % req)
  288. return req
  289. def do_resource_sync(self, resource, sr=None):
  290. self.trigger.trigger()
  291. while True:
  292. req = self.receive_req()
  293. if req.url.endswith('/s/2') and req.code == 'POST':
  294. if sr is None:
  295. asnobj = self.parser.decode_request_payload(req)
  296. sr = int(asnobj['revision']) + 1
  297. self.send_resp_sync(resource, sr)
  298. return sr
  299. elif req.url.endswith('/s/2') and req.code == 'GET' and sr is not None:
  300. self.send_resp_sync(resource, sr)
  301. return sr
  302. def do_empty_sync(self, sr=None):
  303. return self.do_resource_sync(resource=[], sr=sr)
  304. def test_case(self):
  305. pass
  306. class PostSyncClient(BaseClient):
  307. def __init__(self, trigger, mqclient, timeout, identity, res_paths, res_responses):
  308. super().__init__(trigger, mqclient, timeout, identity)
  309. self.res_paths = res_paths
  310. self.res_responses = res_responses
  311. self.res_count = None
  312. def test_case(self):
  313. time.sleep(2)
  314. sr = 0
  315. state = 100
  316. self.res_count = 0
  317. while True:
  318. if state == 100:
  319. self.trigger.trigger()
  320. state = 200
  321. elif state == 200:
  322. req = self.receive_req()
  323. if req.url.endswith('/s/2') and req.code == 'POST':
  324. asnobj = self.parser.decode_request_payload(req)
  325. sr = int(asnobj['revision']) + 1
  326. self.send_resp_sync(self.res_paths, sr)
  327. state = 300
  328. elif state == 300:
  329. req = self.receive_req()
  330. if req.url.endswith('/s/2') and req.code == 'POST':
  331. self.parser.decode_request_payload(req)
  332. self.send_resp_sync([], sr)
  333. state = 400
  334. else:
  335. for p in self.res_responses:
  336. if req.url.endswith(p):
  337. code = self.res_responses[p][0]
  338. name = self.res_responses[p][1]
  339. resp = self.res_responses[p][2]
  340. if code is None:
  341. raise Exception("the response code shouldn't be none")
  342. if code in (ResponseCode.CONTENT, ResponseCode.CREATED, ResponseCode.CHANGED,
  343. ResponseCode.DELETED, ResponseCode.VALID, ResponseCode.CONTINUE):
  344. if isinstance(resp, dict):
  345. self.send_resp_bean(code=code, name=name, bean=resp)
  346. elif isinstance(resp, bytes) or isinstance(resp, bytearray):
  347. self.send_resp(code=code, data=resp)
  348. else:
  349. self.send_resp(code=code)
  350. self.res_count += 1
  351. else:
  352. logging.error('unknown resource request %s' % req.url)
  353. elif state == 400:
  354. # consume report log messages.
  355. flag = True
  356. try:
  357. req = self.receive_req()
  358. except:
  359. logging.debug('finished')
  360. flag = False
  361. state = 500
  362. if flag:
  363. self.parser.decode_request_payload(req)
  364. logging.info('receive report bean: %s' % req)
  365. self.send_resp(code=ResponseCode.CONTENT)
  366. elif state == 500:
  367. time.sleep(60)
  368. break
  369. class OtaClient(BaseClient):
  370. def __init__(self, trigger, mqclient, timeout, identity, filename):
  371. super().__init__(trigger, mqclient, timeout, identity)
  372. self.bin = None
  373. self.rr = binascii.hexlify(self.random_data(20))
  374. def read_file(self, filename):
  375. with open(filename, 'rb') as f:
  376. self.bin = f.read()
  377. f.close()
  378. def handle_blocks(self, req):
  379. if req.url.endswith('/p/fu/%s/0' % self.rr) and req.code == 'GET':
  380. if req.block2 is not None:
  381. return True
  382. else:
  383. logging.error('block2 is not found')
  384. self.send_resp(ResponseCode.REQUEST_ENTITY_INCOMPLETE)
  385. return False
  386. def handle_block0(self, req):
  387. if self.handle_blocks(req):
  388. if req.block2.num == 0:
  389. sz = 1 << (req.block2.szx + 4)
  390. data = self.bin[:sz]
  391. pass
  392. return False
  393. def test_case(self):
  394. time.sleep(2)
  395. state = 100
  396. sr = None
  397. while True:
  398. if state == 100:
  399. sr = self.do_resource_sync(['/p/fu/%s' % self.rr])
  400. state = 300
  401. elif state == 300:
  402. req = self.receive_req()
  403. if req.url.endswith('/p/fu/%s/0' % self.rr) and req.code == 'GET':
  404. if req.block2 is None:
  405. logging.error('block2 is not found')
  406. self.send_resp(ResponseCode.REQUEST_ENTITY_INCOMPLETE)
  407. else:
  408. sz = 1 << (req.block2.szx + 4)
  409. a = sz * req.block2.num
  410. b = a + sz
  411. last_num = (len(self.bin) + sz - 1) / sz
  412. if a >= len(self.bin):
  413. logging.error('the data range [%d:%d] is out of file length. ' % (a, b))
  414. self.send_resp(ResponseCode.REQUEST_ENTITY_TOO_LARGE)
  415. else:
  416. if b >= self.bin[a:b]:
  417. data = self.bin[a:]
  418. else:
  419. data = self.bin[a:b]
  420. self.send_resp_block2(ResponseCode.CONTENT, data=data, m=req.block2.num < last_num, szx=req.block2.szx,
  421. num=req.block2.num)
  422. elif req.url.endswith('/s/2') and req.code == 'POST':
  423. self.parser.decode_request_payload(req)
  424. self.send_resp_sync([], sr)
  425. elif req.url.endswith('/s/2') and req.code == 'GET':
  426. self.send_resp_sync(['/p/fu/%s' % self.rr], sr)
  427. elif req.url.endswith('/p/fs/0') and req.code == 'POST':
  428. self.parser.decode_request_payload(req)
  429. break
  430. class UnhappySyncClient(BaseClient):
  431. def __init__(self, trigger, mqclient, timeout, identity, error_code):
  432. super().__init__(trigger, mqclient, timeout, identity)
  433. self.error_code = error_code
  434. def test_case(self):
  435. time.sleep(2)
  436. self.trigger.trigger()
  437. while True:
  438. req = self.receive_req()
  439. if req.url.endswith('/s/2') and req.code == 'POST':
  440. self.parser.decode_request_payload(req)
  441. self.send_resp(code=self.error_code)
  442. time.sleep(60)
  443. break
  444. class NilSyncClient(BaseClient):
  445. def __init__(self, trigger, mqclient, timeout, identity):
  446. super().__init__(trigger, mqclient, timeout, identity)
  447. def test_case(self):
  448. time.sleep(2)
  449. self.trigger.trigger()
  450. while True:
  451. req = self.receive_req()
  452. if req.url.endswith('/s/2') and req.code == 'POST':
  453. self.parser.decode_request_payload(req)
  454. self.send_resp(code=ResponseCode.CONTENT)
  455. time.sleep(60)
  456. break
  457. class WrongSyncClient(BaseClient):
  458. def __init__(self, trigger, mqclient, timeout, identity):
  459. super().__init__(trigger, mqclient, timeout, identity)
  460. def test_case(self):
  461. time.sleep(2)
  462. self.trigger.trigger()
  463. while True:
  464. req = self.receive_req()
  465. if req.url.endswith('/s/2') and req.code == 'POST':
  466. self.parser.decode_request_payload(req)
  467. self.send_resp_bean(code=ResponseCode.CONTENT, name='Override', bean={
  468. 'nominalLightLevel': 10000, 'enabled': 1})
  469. time.sleep(60)
  470. break
  471. class MessySyncClient(BaseClient):
  472. def __init__(self, trigger, mqclient, timeout, identity):
  473. super().__init__(trigger, mqclient, timeout, identity)
  474. def test_case(self):
  475. time.sleep(2)
  476. self.trigger.trigger()
  477. while True:
  478. req = self.receive_req()
  479. if req.url.endswith('/s/2') and req.code == 'POST':
  480. self.parser.decode_request_payload(req)
  481. data = self.random_data(20)
  482. self.send_resp(code=ResponseCode.CONTENT, data=data)
  483. time.sleep(60)
  484. break
  485. class ExtremitySyncClient(BaseClient):
  486. def __init__(self, trigger, mqclient, timeout, identity, bean):
  487. super().__init__(trigger, mqclient, timeout, identity)
  488. self.bean = bean
  489. self.res_cnt = None
  490. self.res_abn_cnt = None
  491. def test_case(self):
  492. time.sleep(2)
  493. state = 100
  494. self.res_cnt = 0
  495. self.res_abn_cnt = 0
  496. while True:
  497. if state == 100:
  498. self.trigger.trigger()
  499. state = 200
  500. elif state == 200:
  501. req = self.receive_req()
  502. if req.url.endswith('/s/2') and req.code == 'POST':
  503. asnobj = self.parser.decode_request_payload(req)
  504. if self.bean['revision'] < 0:
  505. self.bean['revision'] = int(asnobj['revision']) + 1
  506. self.send_resp_bean(code=ResponseCode.CONTENT, name='SyncResponse2', bean=self.bean)
  507. if len(self.bean['resourcePaths']) == 0:
  508. state = 300
  509. else:
  510. state = 250
  511. elif state == 250:
  512. req = self.receive_req()
  513. if req.url.endswith('/s/2') and req.code == 'POST':
  514. asnobj = self.parser.decode_request_payload(req)
  515. sr = int(asnobj['revision'])
  516. self.send_resp_sync([], sr)
  517. state = 300
  518. else:
  519. flag = False
  520. for r in self.bean['resourcePaths']:
  521. if r in req.url:
  522. self.send_resp(ResponseCode.NOT_FOUND)
  523. self.res_cnt += 1
  524. flag = True
  525. if not flag:
  526. logging.error('unkown resource path %s' % req.url)
  527. self.res_abn_cnt += 1
  528. elif state == 300:
  529. time.sleep(60)
  530. break
  531. class CaseSuites:
  532. def __init__(self, mqclient, trigger, timeout, identity):
  533. self.mqclient = mqclient
  534. self.trigger = trigger
  535. self.timeout = timeout
  536. self.identity = identity
  537. self.basis = BaseClient(trigger=self.trigger,
  538. mqclient=self.mqclient,
  539. timeout=self.timeout,
  540. identity=self.identity)
  541. self.ov_client = PostSyncClient(trigger=self.trigger,
  542. mqclient=self.mqclient,
  543. timeout=self.timeout,
  544. identity=self.identity,
  545. res_paths=[],
  546. res_responses={})
  547. self.ov_revision = 1
  548. self.ota_client = OtaClient(trigger=self.trigger,
  549. mqclient=self.mqclient,
  550. timeout=self.timeout,
  551. identity=self.identity,
  552. filename='aaa.bin')
  553. self.unhappysync = UnhappySyncClient(trigger=self.trigger,
  554. mqclient=self.mqclient,
  555. timeout=self.timeout,
  556. identity=self.identity,
  557. error_code=None)
  558. self.emptysync = NilSyncClient(trigger=self.trigger,
  559. mqclient=self.mqclient,
  560. timeout=self.timeout,
  561. identity=self.identity)
  562. self.wrongsync = WrongSyncClient(trigger=self.trigger,
  563. mqclient=self.mqclient,
  564. timeout=self.timeout,
  565. identity=self.identity)
  566. self.messysync = MessySyncClient(trigger=self.trigger,
  567. mqclient=self.mqclient,
  568. timeout=self.timeout,
  569. identity=self.identity)
  570. self.extremitysync = ExtremitySyncClient(trigger=self.trigger,
  571. mqclient=self.mqclient,
  572. timeout=self.timeout,
  573. identity=self.identity,
  574. bean=None)
  575. def base_func(self):
  576. return self.basis
  577. def override(self, dim):
  578. self.ov_client.res_paths = ['/l/ov/%d' % self.ov_revision]
  579. self.ov_client.res_responses = {
  580. '/l/ov/%d/0' % self.ov_revision: (ResponseCode.CONTENT, 'Override', {
  581. 'nominalLightLevel': dim, 'enabled': 1})}
  582. self.ov_revision += 1
  583. return self.ov_client
  584. def remove_override(self):
  585. self.ov_client.res_paths = ['/l/ov/0']
  586. self.ov_client.res_responses = {}
  587. self.ov_revision += 1
  588. return self.ov_client
  589. def duplicate_override(self):
  590. self.ov_client.res_paths = ['/l/ov/%d' % self.ov_revision]
  591. self.ov_client.res_responses = {
  592. '/l/ov/%d/0' % self.ov_revision: (ResponseCode.CONTENT, 'Override', {
  593. 'nominalLightLevel': 100, 'enabled': 1})}
  594. return self.ov_client
  595. def error_override(self, code):
  596. self.ov_client.res_paths = ['/l/ov/%d' % self.ov_revision]
  597. self.ov_client.res_responses = {
  598. '/l/ov/%d/0' % self.ov_revision: (code, None, None)}
  599. return self.ov_client
  600. def ota(self):
  601. return OtaClient(trigger=self.trigger,
  602. mqclient=self.mqclient,
  603. timeout=self.timeout,
  604. identity=self.identity)
  605. def unhappy_sync(self, error_code):
  606. self.unhappysync.error_code = error_code
  607. return self.unhappysync
  608. def empty_sync(self):
  609. return self.emptysync
  610. def wrong_sync(self):
  611. return self.wrongsync
  612. def messy_sync(self):
  613. return self.messysync
  614. def extremity_sync(self, bean):
  615. self.extremitysync.bean = bean
  616. return self.extremitysync

转载于:https://www.cnblogs.com/mftang2018/p/10919411.html

发表评论

表情:
评论列表 (有 0 条评论,438人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Base on QC Automation Framework v1.0

    从事自动化测试及自动化测试框架开发已经好多年了,大大小小的自动化框架也开发了不少,由于比较懒,很少在网上Share,今天突然有开个自己BLOG和大家讨论一下自动化测试相关的知识