olctest based on the pytest driver
MyDemo for the pytest driver
#!/usr/bin/env python3
import json
import binascii
import asn1tools
import paho.mqtt.client as mqtt
import requests
import logging
import serial
import threading
import time
import datetime
import random
from enum import Enum
from queue import Queue
class ResponseCode(Enum):
    """Response codes, each packed as ``(class << 5) | detail``.

    The trailing comment on every member shows the same code in
    ``class.detail`` notation.
    """

    # Class 2
    CREATED = (2 << 5) | 1    # 2.01
    DELETED = (2 << 5) | 2    # 2.02
    VALID = (2 << 5) | 3      # 2.03
    CHANGED = (2 << 5) | 4    # 2.04
    CONTENT = (2 << 5) | 5    # 2.05
    CONTINUE = (2 << 5) | 31  # 2.31

    # Class 4
    BAD_REQUEST = (4 << 5) | 0                  # 4.00
    UNAUTHORIZED = (4 << 5) | 1                 # 4.01
    BAD_OPTION = (4 << 5) | 2                   # 4.02
    FORBIDDEN = (4 << 5) | 3                    # 4.03
    NOT_FOUND = (4 << 5) | 4                    # 4.04
    METHOD_NOT_ALLOWED = (4 << 5) | 5           # 4.05
    NOT_ACCEPTABLE = (4 << 5) | 6               # 4.06
    REQUEST_ENTITY_INCOMPLETE = (4 << 5) | 8    # 4.08
    CONFLICT = (4 << 5) | 9                     # 4.09
    PRECONDITION_FAILED = (4 << 5) | 12         # 4.12
    REQUEST_ENTITY_TOO_LARGE = (4 << 5) | 13    # 4.13
    UNSUPPORTED_CONTENT_FORMAT = (4 << 5) | 15  # 4.15
    UNPROCESSABLE_ENTITY = (4 << 5) | 22        # 4.22

    # Class 5
    INTERNAL_SERVER_ERROR = (5 << 5) | 0  # 5.00
    NOT_IMPLEMENTED = (5 << 5) | 1        # 5.01
    BAD_GATEWAY = (5 << 5) | 2            # 5.02
    SERVICE_UNAVAILABLE = (5 << 5) | 3    # 5.03
    GATEWAY_TIMEOUT = (5 << 5) | 4        # 5.04
    PROXY_NOT_SUPPORTED = (5 << 5) | 5    # 5.05
class CoapMessage:
    """Base class for message beans that are filled from plain dictionaries.

    Subclasses declare their fields by assigning them in ``__init__``; both
    ``load`` and ``__str__`` walk the instance ``__dict__``.
    """

    def load(self, dict_obj):
        """Copy the values of this bean's fields out of ``dict_obj``.

        Only keys that already exist as instance attributes are read; extra
        keys in ``dict_obj`` are ignored. A field that is absent from
        ``dict_obj`` is loaded as ``None`` rather than raising ``KeyError``.
        A non-``None`` ``block2`` value is converted into a
        ``BlockOptionBean``.

        :param dict_obj: mapping of field name to value.
        """
        # Snapshot the field names so the assignment below never interferes
        # with the iteration.
        for name in list(self.__dict__):
            value = dict_obj.get(name)
            if name == 'block2' and value is not None:
                option = BlockOptionBean()
                option.load(value)
                value = option
            self.__dict__[name] = value

    def __str__(self):
        """Return ``[ClassName: field=value, ...]`` in field declaration order."""
        attrs = ", ".join(
            "{}={}".format(name, getattr(self, name)) for name in self.__dict__
        )
        return "[{}: {}]".format(self.__class__.__name__, attrs)
class RequestBean(CoapMessage):
    """Request message bean; every field starts out as ``None``."""

    def __init__(self):
        # The assignment order fixes the field order that the inherited
        # load() and __str__() iterate over.
        for field in ('type', 'code', 'url', 'contentFormat', 'payload', 'block2'):
            setattr(self, field, None)
class ResponseBean(CoapMessage):
    """Response message bean; every field starts out as ``None``."""

    def __init__(self):
        # Chained targets are bound left to right, which keeps the field
        # order code, contentFormat, payload, block2.
        self.code = self.contentFormat = self.payload = self.block2 = None
class BlockOptionBean(CoapMessage):
    """Bean for a block option with the fields ``szx``, ``m`` and ``num``."""

    def __init__(self):
        # Chained targets are bound left to right: szx, then m, then num.
        self.szx = self.m = self.num = None
class TriggerTool:
    """Trigger whose ``trigger`` method does nothing."""

    def trigger(self):
        """Do nothing and return ``None``."""
        return None
class UartTriggerTool:
    """Trigger that sends a ``sync`` command through a UART helper."""

    def __init__(self, uart):
        """Keep ``uart``; ``trigger`` calls its ``send(text)`` method."""
        self.uart = uart

    def trigger(self):
        """Send an empty line, wait one second, then send ``sync``."""
        logging.debug('uart trigger sync')
        port = self.uart
        port.send('')
        time.sleep(1)
        port.send('sync')
class HttpTriggerTool:
    """Trigger that posts a test SMS text to the simulator HTTP endpoint."""

    DEFAULT_URL = 'http://127.0.0.1:18000/api/v1/simulator/sms'
    # Seconds to wait for the endpoint before giving up.
    DEFAULT_TIMEOUT = 10

    def __init__(self, url=DEFAULT_URL, timeout=DEFAULT_TIMEOUT):
        """Configure the endpoint.

        :param url: address the trigger request is posted to.
        :param timeout: seconds passed to ``requests.post`` as ``timeout``;
            without it an unresponsive endpoint would block the caller
            indefinitely.
        """
        self.url = url
        self.timeout = timeout

    def trigger(self):
        """POST the trigger text to ``self.url``.

        Exceptions raised by ``requests`` (connection failure, timeout)
        propagate to the caller.
        """
        logging.debug('trigger sync')
        # The request body is reproduced verbatim from the original tool.
        requests.post(self.url, 'test sms tirggered.', timeout=self.timeout)
class ManualTriggerTool:
    """Trigger that asks the operator to start the sync by hand."""

    def __init__(self, timeout):
        """Keep ``timeout``, the number of seconds announced to the operator."""
        self.timeout = timeout

    def trigger(self):
        """Log the prompt, then sleep for ``timeout + 2`` seconds."""
        prompt = 'please trigger the sync within %d seconds.' % self.timeout
        logging.info(prompt)
        wait_seconds = self.timeout + 2
        time.sleep(wait_seconds)
class SerialTool:
    """Serial port helper that logs incoming lines to a file and sends text.

    ``loop_start`` runs ``loop`` in a daemon thread; ``loop`` opens the port,
    appends every received line (prefixed with a timestamp) to ``logfile``
    and closes the port again when it ends.
    """

    encoding = 'ascii'
    error_policy = 'ignore'

    def __init__(self, port, baud, logfile):
        """Remember the connection settings; nothing is opened here.

        :param port: port name passed to ``serial.Serial``.
        :param baud: baud rate passed to ``serial.Serial``.
        :param logfile: path of the file received lines are appended to.
        """
        self.port = port
        self.baud = baud
        self.logfile = logfile
        self.terminated = None
        self.serial = None
        self._thread = None

    def loop(self):
        """Open the port and log received lines until ``terminated`` is set.

        The log file and the port are released in every exit path, including
        a failure to open the log file.
        """
        self.serial = serial.Serial(self.port, self.baud)
        try:
            logging.info('serial open status is %s' % self.serial.is_open)
            with open(self.logfile, 'a') as log:
                while not self.terminated:
                    try:
                        raw = self.serial.readline()
                        text = str(raw, encoding=self.encoding, errors=self.error_policy)
                        text = text.replace("\r", '').replace("\n", '')
                        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')
                        log.write('%s -- %s\n' % (stamp, text))
                        log.flush()
                    except Exception as e:
                        # A failed read or write is logged and the loop keeps
                        # going, as before; only Exception is caught so that
                        # interpreter-exit signals are not swallowed.
                        logging.error('serial thread: %s' % e)
        finally:
            self.serial.close()
            self.serial = None

    def loop_start(self):
        """Start the reader thread unless one is already registered."""
        if self._thread is None:
            self.terminated = False
            self._thread = threading.Thread(target=self.loop)
            self._thread.daemon = True
            self._thread.start()

    def loop_stop(self):
        """Ask the reader thread to finish and wait for it.

        Safe to call when the port was never opened (for example because
        ``serial.Serial`` failed inside the thread): the pending-read
        cancellation is skipped when there is no port object.
        """
        worker = self._thread
        if worker is None:
            return
        self.terminated = True
        # Take a local reference: the reader thread resets self.serial to
        # None when it ends.
        port = self.serial
        if port is not None:
            port.cancel_read()
        if threading.current_thread() != worker:
            worker.join()
        self._thread = None

    def send(self, text):
        """Write ``text`` followed by a carriage return to the open port.

        When the port is not open, an error is logged and nothing is sent.
        """
        if self.serial is not None and self.serial.is_open:
            payload = (text + "\r").encode(encoding=self.encoding, errors=self.error_policy)
            self.serial.write(payload)
        else:
            logging.error('serial port is not opened.')
class MqttTool:
    """Thin wrapper around a paho MQTT client that queues received messages."""

    def __init__(self, host, port, qos, timeout):
        """Store the settings and create the queue; no connection is made.

        :param host: broker host passed to ``connect``.
        :param port: broker port passed to ``connect``.
        :param qos: QoS level used by ``subscribe``.
        :param timeout: default wait for ``receive_msg``; also passed as the
            third positional argument of ``connect``.
        """
        self.host, self.port, self.qos = host, port, qos
        self.queue = Queue()
        self.mqtt_client = None
        self.timeout = timeout

    def on_message(self, client, userdata, msg):
        """Client callback: put every received message on the queue."""
        logging.debug('get a message: %s' % msg)
        self.queue.put(msg)

    def subscribe(self, topic):
        """Subscribe to ``topic`` with the configured QoS."""
        self.mqtt_client.subscribe(topic, self.qos)
        logging.debug('subscribe to %s' % topic)

    def unsubscribe(self, topic):
        """Cancel the subscription to ``topic``."""
        self.mqtt_client.unsubscribe(topic)
        logging.debug('unsubscribe %s' % topic)

    def receive_msg(self, timeout=None):
        """Return the next queued message.

        Waits up to ``timeout`` seconds (default ``self.timeout``); the
        queue's timeout exception propagates when nothing arrives.
        """
        logging.debug('waiting for message.')
        wait = self.timeout if timeout is None else timeout
        return self.queue.get(timeout=wait)

    def publish(self, topic, blob):
        """Publish ``blob`` on ``topic``."""
        self.mqtt_client.publish(topic, blob)

    def loop_start(self):
        """Create the client, connect and start its loop, once."""
        if self.mqtt_client is not None:
            return
        client = mqtt.Client()
        self.mqtt_client = client
        client.on_message = self.on_message
        # NOTE(review): self.timeout is passed as connect()'s third
        # positional argument - confirm that is the intended parameter.
        client.connect(self.host, self.port, self.timeout)
        client.loop_start()

    def loop_stop(self):
        """Stop the client loop, disconnect and forget the client."""
        client = self.mqtt_client
        if client is None:
            return
        client.loop_stop()
        client.disconnect()
        self.mqtt_client = None
class ParserTool:
    """Converts between MQTT/JSON messages, beans and ASN.1 payloads."""

    # One-element tuple, exactly as the original trailing comma produced;
    # it is handed unchanged to asn1tools.compile_files.
    scheme_file = ('ctc-message-schema.asn1',)
    codec = 'uper'

    def __init__(self):
        """Compile the ASN.1 schema and set up the type-name table."""
        self.asn1compiler = asn1tools.compile_files(self.scheme_file, self.codec)
        # '<method> <path>' -> type names; index 0 is the name used to
        # decode the request payload.
        self.obj_names = {
            'POST /s/2': ('SyncRequest', 'SyncResponse2'),
            'GET /s/2': ('', 'SyncResponse2')}

    def serialize(self, obj):
        """``json.dumps`` fallback: enum name for codes, ``__dict__`` otherwise."""
        if isinstance(obj, ResponseCode):
            return obj.name
        return obj.__dict__

    def asn1_encode(self, name, bean):
        """Encode ``bean`` as ASN.1 type ``name`` with constraint checking."""
        return self.asn1compiler.encode(name, bean, check_constraints=True)

    def asn1_decode(self, name, blob):
        """Decode ``blob`` as ASN.1 type ``name`` with constraint checking."""
        return self.asn1compiler.decode(name, blob, check_constraints=True)

    def get_resource_path(self, request_bean):
        """Return the part of ``request_bean.url`` from the first path slash on.

        A leading ``scheme://`` is skipped for any scheme, so ``coap://`` and
        ``coaps://`` URLs both yield the path. URLs without a scheme are
        searched for their first ``/`` as they are.

        :raises ValueError: when no ``/`` follows the host part.
        """
        url = request_bean.url
        _scheme, separator, remainder = url.partition('://')
        if separator:
            url = remainder
        return url[url.index('/'):]

    def get_obj_path(self, resource_path):
        """Insert a ``_`` segment into paths that split into four parts.

        ``/a/b/c`` becomes ``/a/b/_/c``; any other path is returned unchanged.
        """
        parts = resource_path.split('/')
        if len(parts) == 4:
            return '/%s/%s/_/%s' % (parts[1], parts[2], parts[3])
        return resource_path

    def get_obj_name(self, code, path):
        """Look up the type names registered for ``'<code> <path>'``.

        :raises KeyError: when the combination is not in ``obj_names``.
        """
        return self.obj_names['%s %s' % (code, path)]

    def decode_request_payload(self, request_bean):
        """Decode the base64 ASN.1 payload of ``request_bean``.

        :return: the decoded object, or ``None`` when the payload is missing
            or empty.
        :raises KeyError: when no type name is registered for the request.
        """
        payload = request_bean.payload
        if payload is None or len(payload) == 0:
            return None
        path = self.get_obj_path(self.get_resource_path(request_bean))
        names = self.get_obj_name(request_bean.code, path)
        asnobj = self.asn1_decode(names[0], binascii.a2b_base64(payload))
        logging.info('request asn1: %s' % asnobj)
        return asnobj

    def decode_request(self, mqtt_message):
        """Build a ``RequestBean`` from the UTF-8 JSON payload of a message."""
        obj = json.loads(mqtt_message.payload.decode('utf-8'))
        req = RequestBean()
        req.load(obj)
        return req

    def encode_response_payload(self, name, bean):
        """Log ``bean`` and return it encoded as ASN.1 type ``name``."""
        logging.info('response asn1: %s' % bean)
        return self.asn1_encode(name, bean)

    def encode_response(self, response_bean):
        """Serialize ``response_bean`` to UTF-8 encoded JSON bytes."""
        data = json.dumps(response_bean, ensure_ascii=True, default=self.serialize)
        return bytes(data, 'utf-8')
class BaseClient:
    """Common request/response plumbing shared by the test clients.

    Requests are read from ``/olc/<hex identity>/request`` and responses are
    published on ``/olc/<hex identity>/response`` through ``mqclient``.
    """

    APPLICATION_OCTET_STREAM = 42
    TOPIC_PREFIX = 'olc'

    def __init__(self, trigger, mqclient, timeout, identity):
        """Create the parser, derive the topics and subscribe to requests.

        :param trigger: object whose ``trigger()`` starts a sync.
        :param mqclient: messaging helper providing ``subscribe``,
            ``publish`` and ``receive_msg``.
        :param timeout: default wait, in seconds, for ``receive_req``.
        :param identity: identity string; its UTF-8 hex form is used in the
            topic names.
        """
        self.parser = ParserTool()
        self.trigger = trigger
        self.mqclient = mqclient
        self.timeout = timeout
        self.identity = identity
        hex_identity = binascii.hexlify(identity.encode(encoding='utf-8')).decode()
        self.topic_request = '/%s/%s/request' % (self.TOPIC_PREFIX, hex_identity)
        # Attribute name kept as spelled in the original for compatibility.
        self.topic_reqponse = '/%s/%s/response' % (self.TOPIC_PREFIX, hex_identity)
        self.mqclient.subscribe(self.topic_request)

    def random_data(self, len):
        """Return ``len`` random bytes.

        The parameter shadows the builtin ``len``; its name is kept so that
        keyword callers continue to work.
        """
        return bytes(random.randint(0, 255) for _ in range(len))

    def get_seconds_20000101(self, dt=None):
        """Return whole seconds elapsed since 2000-01-01 00:00:00 UTC.

        :param dt: moment to convert. ``None`` means the current UTC time
            (the value is sent as ``currentUtcTime``, so local wall-clock
            time would be off by the host's UTC offset). A naive datetime is
            taken to be UTC already; an aware one is converted to UTC.
        """
        if dt is None:
            dt = datetime.datetime.now(datetime.timezone.utc)
        if dt.tzinfo is not None:
            dt = dt.astimezone(datetime.timezone.utc).replace(tzinfo=None)
        epoch = datetime.datetime(2000, 1, 1, 0, 0, 0)
        return int((dt - epoch).total_seconds())

    def send_resp(self, code, data=None, block2=None):
        """Publish a response.

        :param code: response code stored in the bean.
        :param data: optional bytes payload; sent base64-encoded with the
            content format set to ``APPLICATION_OCTET_STREAM``.
        :param block2: optional ``BlockOptionBean`` attached to the response.
        """
        resp = ResponseBean()
        resp.code = code
        if data is not None:
            resp.contentFormat = self.APPLICATION_OCTET_STREAM
            encoded = str(binascii.b2a_base64(data), 'utf-8')
            # b2a_base64 appends a newline; the payload is sent without it.
            resp.payload = encoded.replace('\n', '').replace('\r', '')
        if block2 is not None:
            resp.block2 = block2
        logging.info('send response: %s' % resp)
        blob = self.parser.encode_response(resp)
        self.mqclient.publish(self.topic_reqponse, blob)

    def send_resp_bean(self, code, name, bean):
        """Encode ``bean`` as ASN.1 type ``name`` and send it with ``code``."""
        data = self.parser.encode_response_payload(name, bean)
        return self.send_resp(code, data)

    def send_resp_block2(self, code, data, m, szx, num):
        """Send ``data`` together with a block option built from m/szx/num."""
        bob = BlockOptionBean()
        bob.m = m
        bob.szx = szx
        bob.num = num
        return self.send_resp(code, data, bob)

    def send_resp_sync(self, resource, sr, seconds=None):
        """Send a ``SyncResponse2`` with code CONTENT.

        :param resource: list placed in ``resourcePaths``.
        :param sr: value placed in ``revision``.
        :param seconds: value for ``secondsSince20000101``; defaults to
            ``get_seconds_20000101()``.
        """
        if seconds is None:
            seconds = self.get_seconds_20000101()
        resp = {
            'resourcePaths': resource,
            'revision': sr,
            'currentUtcTime': {'secondsSince20000101': seconds}}
        self.send_resp_bean(code=ResponseCode.CONTENT, name='SyncResponse2', bean=resp)

    def receive_req(self, timeout=None):
        """Wait for the next request message and return it as a bean.

        Exceptions from ``mqclient.receive_msg`` (such as its timeout)
        propagate to the caller.
        """
        if timeout is None:
            timeout = self.timeout
        msg = self.mqclient.receive_msg(timeout)
        req = self.parser.decode_request(msg)
        logging.info('receive request: %s' % req)
        return req

    def do_resource_sync(self, resource, sr=None):
        """Trigger a sync and answer the first sync request with ``resource``.

        Other requests are read and ignored. When ``sr`` is ``None`` only a
        POST is answered and the revision becomes the request's revision
        plus one; with an explicit ``sr`` a GET is answered as well.

        :return: the revision that was sent.
        """
        self.trigger.trigger()
        while True:
            req = self.receive_req()
            if not req.url.endswith('/s/2'):
                continue
            if req.code == 'POST':
                if sr is None:
                    asnobj = self.parser.decode_request_payload(req)
                    sr = int(asnobj['revision']) + 1
                self.send_resp_sync(resource, sr)
                return sr
            if req.code == 'GET' and sr is not None:
                self.send_resp_sync(resource, sr)
                return sr

    def do_empty_sync(self, sr=None):
        """Run ``do_resource_sync`` with an empty resource list."""
        return self.do_resource_sync(resource=[], sr=sr)

    def test_case(self):
        """Scenario hook; the base implementation does nothing."""
        pass
class PostSyncClient(BaseClient):
    """Scenario: answer a sync, serve the announced resources, drain reports.

    ``res_paths`` is the resource list announced in the first sync response.
    ``res_responses`` maps a URL suffix to a ``(code, type name, body)``
    tuple describing the reply for requests ending in that suffix.
    """

    def __init__(self, trigger, mqclient, timeout, identity, res_paths, res_responses):
        super().__init__(trigger, mqclient, timeout, identity)
        self.res_paths = res_paths
        self.res_responses = res_responses
        self.res_count = None

    def _answer_resource_request(self, req):
        """Reply to ``req`` for every configured suffix its URL ends with.

        A dict body is ASN.1-encoded under the configured type name, a
        bytes/bytearray body is sent raw, and anything else (including
        every non-success code) is answered with the bare code, so each
        matched request receives exactly one reply per matching entry.

        :return: ``True`` when at least one configured suffix matched.
        :raises Exception: when a matching entry has ``None`` as its code.
        """
        success_codes = (ResponseCode.CONTENT, ResponseCode.CREATED, ResponseCode.CHANGED,
                         ResponseCode.DELETED, ResponseCode.VALID, ResponseCode.CONTINUE)
        matched = False
        for suffix in self.res_responses:
            if not req.url.endswith(suffix):
                continue
            matched = True
            entry = self.res_responses[suffix]
            code, name, resp = entry[0], entry[1], entry[2]
            if code is None:
                raise Exception("the response code shouldn't be none")
            if code in success_codes and isinstance(resp, dict):
                self.send_resp_bean(code=code, name=name, bean=resp)
            elif code in success_codes and isinstance(resp, (bytes, bytearray)):
                self.send_resp(code=code, data=resp)
            else:
                self.send_resp(code=code)
            self.res_count += 1
        return matched

    def test_case(self):
        """Run the scenario.

        States: 100 trigger, 200 wait for the first sync POST, 300 serve
        resource requests until the next sync POST, 400 acknowledge report
        messages until receiving fails, 500 wait 60 seconds and finish.
        """
        time.sleep(2)
        sr = 0
        state = 100
        self.res_count = 0
        while True:
            if state == 100:
                self.trigger.trigger()
                state = 200
            elif state == 200:
                req = self.receive_req()
                if req.url.endswith('/s/2') and req.code == 'POST':
                    asnobj = self.parser.decode_request_payload(req)
                    sr = int(asnobj['revision']) + 1
                    self.send_resp_sync(self.res_paths, sr)
                    state = 300
            elif state == 300:
                req = self.receive_req()
                if req.url.endswith('/s/2') and req.code == 'POST':
                    self.parser.decode_request_payload(req)
                    self.send_resp_sync([], sr)
                    state = 400
                elif not self._answer_resource_request(req):
                    # Logged once per request, only when nothing matched.
                    logging.error('unknown resource request %s' % req.url)
            elif state == 400:
                # consume report log messages.
                try:
                    req = self.receive_req()
                except Exception:
                    # A failed receive (e.g. the wait timing out) ends the
                    # report phase; interpreter-exit signals are no longer
                    # swallowed here.
                    logging.debug('finished')
                    state = 500
                else:
                    self.parser.decode_request_payload(req)
                    logging.info('receive report bean: %s' % req)
                    self.send_resp(code=ResponseCode.CONTENT)
            elif state == 500:
                time.sleep(60)
                break
class OtaClient(BaseClient):
    """Scenario: announce a firmware resource and serve it block by block."""

    def __init__(self, trigger, mqclient, timeout, identity, filename):
        """Prepare the client.

        :param filename: path of the binary served as the firmware image; it
            is read when ``test_case`` starts, not here.
        """
        super().__init__(trigger, mqclient, timeout, identity)
        self.filename = filename
        self.bin = None
        # Decoded to str: formatting bytes with '%s' would put "b'...'" into
        # the resource path.
        self.rr = binascii.hexlify(self.random_data(20)).decode('ascii')

    def read_file(self, filename):
        """Load the whole of ``filename`` into ``self.bin``."""
        with open(filename, 'rb') as f:
            self.bin = f.read()

    def handle_blocks(self, req):
        """Check that ``req`` is a block GET for the firmware resource.

        :return: ``True`` when the URL and method match and a block option
            is present. When the option is missing an error is logged and
            REQUEST_ENTITY_INCOMPLETE is sent; ``False`` in all other cases.
        """
        if req.url.endswith('/p/fu/%s/0' % self.rr) and req.code == 'GET':
            if req.block2 is not None:
                return True
            logging.error('block2 is not found')
            self.send_resp(ResponseCode.REQUEST_ENTITY_INCOMPLETE)
        return False

    def handle_block0(self, req):
        """Run the ``handle_blocks`` check and return ``False``.

        The original computed the first block and discarded it; no block is
        sent from here.
        """
        self.handle_blocks(req)
        return False

    def _send_block(self, req):
        """Answer one block GET with the requested slice of the image."""
        if req.block2 is None:
            logging.error('block2 is not found')
            self.send_resp(ResponseCode.REQUEST_ENTITY_INCOMPLETE)
            return
        sz = 1 << (req.block2.szx + 4)
        a = sz * req.block2.num
        b = a + sz
        total = len(self.bin)
        if a >= total:
            logging.error('the data range [%d:%d] is out of file length. ' % (a, b))
            self.send_resp(ResponseCode.REQUEST_ENTITY_TOO_LARGE)
            return
        # Slicing clamps at the end of the image, so the last block is simply
        # shorter; more blocks follow only while this one ends before the
        # end of the image.
        self.send_resp_block2(ResponseCode.CONTENT, data=self.bin[a:b], m=b < total,
                              szx=req.block2.szx, num=req.block2.num)

    def test_case(self):
        """Run the scenario until a POST to ``/p/fs/0`` arrives.

        The image is loaded from ``self.filename`` first unless ``self.bin``
        was already filled through ``read_file``.
        """
        time.sleep(2)
        if self.bin is None:
            self.read_file(self.filename)
        resource = '/p/fu/%s' % self.rr
        state = 100
        sr = None
        while True:
            if state == 100:
                sr = self.do_resource_sync([resource])
                state = 300
            elif state == 300:
                req = self.receive_req()
                if req.url.endswith('/p/fu/%s/0' % self.rr) and req.code == 'GET':
                    self._send_block(req)
                elif req.url.endswith('/s/2') and req.code == 'POST':
                    self.parser.decode_request_payload(req)
                    self.send_resp_sync([], sr)
                elif req.url.endswith('/s/2') and req.code == 'GET':
                    self.send_resp_sync([resource], sr)
                elif req.url.endswith('/p/fs/0') and req.code == 'POST':
                    # NOTE(review): ParserTool.obj_names in this file has no
                    # entry for this path, so a non-empty payload raises
                    # KeyError here - confirm the table is complete.
                    self.parser.decode_request_payload(req)
                    break
class UnhappySyncClient(BaseClient):
    """Scenario: answer the sync POST with a configurable bare code."""

    def __init__(self, trigger, mqclient, timeout, identity, error_code):
        super().__init__(trigger, mqclient, timeout, identity)
        self.error_code = error_code

    def test_case(self):
        """Trigger, reply ``error_code`` to the first sync POST, wait 60 s."""
        time.sleep(2)
        self.trigger.trigger()
        answered = False
        while not answered:
            req = self.receive_req()
            if not (req.url.endswith('/s/2') and req.code == 'POST'):
                continue
            self.parser.decode_request_payload(req)
            self.send_resp(code=self.error_code)
            time.sleep(60)
            answered = True
class NilSyncClient(BaseClient):
    """Scenario: answer the sync POST with CONTENT and no payload."""

    def __init__(self, trigger, mqclient, timeout, identity):
        super().__init__(trigger, mqclient, timeout, identity)

    def test_case(self):
        """Trigger, send a payload-free CONTENT to the first sync POST, wait 60 s."""
        time.sleep(2)
        self.trigger.trigger()
        while True:
            request = self.receive_req()
            if not request.url.endswith('/s/2'):
                continue
            if request.code != 'POST':
                continue
            self.parser.decode_request_payload(request)
            self.send_resp(code=ResponseCode.CONTENT)
            time.sleep(60)
            return
class WrongSyncClient(BaseClient):
    """Scenario: answer the sync POST with an ``Override`` bean."""

    def __init__(self, trigger, mqclient, timeout, identity):
        super().__init__(trigger, mqclient, timeout, identity)

    def test_case(self):
        """Trigger, reply to the first sync POST with an Override, wait 60 s."""
        time.sleep(2)
        self.trigger.trigger()
        override = {'nominalLightLevel': 10000, 'enabled': 1}
        while True:
            incoming = self.receive_req()
            is_sync_post = incoming.url.endswith('/s/2') and incoming.code == 'POST'
            if is_sync_post:
                self.parser.decode_request_payload(incoming)
                self.send_resp_bean(code=ResponseCode.CONTENT, name='Override', bean=override)
                time.sleep(60)
                break
class MessySyncClient(BaseClient):
    """Scenario: answer the sync POST with 20 random bytes as payload."""

    def __init__(self, trigger, mqclient, timeout, identity):
        super().__init__(trigger, mqclient, timeout, identity)

    def test_case(self):
        """Trigger, reply to the first sync POST with random data, wait 60 s."""
        time.sleep(2)
        self.trigger.trigger()
        done = False
        while not done:
            req = self.receive_req()
            if req.url.endswith('/s/2') and req.code == 'POST':
                self.parser.decode_request_payload(req)
                noise = self.random_data(20)
                self.send_resp(code=ResponseCode.CONTENT, data=noise)
                time.sleep(60)
                done = True
class ExtremitySyncClient(BaseClient):
    """Scenario: send a caller-supplied sync bean, then reject its resources.

    ``res_cnt`` counts NOT_FOUND replies sent for announced resource paths;
    ``res_abn_cnt`` counts requests that matched none of them.
    """

    def __init__(self, trigger, mqclient, timeout, identity, bean):
        super().__init__(trigger, mqclient, timeout, identity)
        self.bean = bean
        self.res_cnt = None
        self.res_abn_cnt = None

    def test_case(self):
        """Run the scenario.

        States: 100 trigger, 200 answer the first sync POST with ``bean``
        (a negative ``revision`` is replaced by the request's revision plus
        one), 250 reply NOT_FOUND to resource requests until the next sync
        POST, 300 wait 60 seconds and finish.
        """
        time.sleep(2)
        self.res_cnt = 0
        self.res_abn_cnt = 0
        state = 100
        while True:
            if state == 100:
                self.trigger.trigger()
                state = 200
                continue
            if state == 300:
                time.sleep(60)
                break
            if state == 200:
                req = self.receive_req()
                if req.url.endswith('/s/2') and req.code == 'POST':
                    asnobj = self.parser.decode_request_payload(req)
                    if self.bean['revision'] < 0:
                        self.bean['revision'] = int(asnobj['revision']) + 1
                    self.send_resp_bean(code=ResponseCode.CONTENT, name='SyncResponse2',
                                        bean=self.bean)
                    state = 300 if len(self.bean['resourcePaths']) == 0 else 250
                continue
            if state == 250:
                req = self.receive_req()
                if req.url.endswith('/s/2') and req.code == 'POST':
                    asnobj = self.parser.decode_request_payload(req)
                    revision = int(asnobj['revision'])
                    self.send_resp_sync([], revision)
                    state = 300
                    continue
                # One NOT_FOUND per announced path contained in the URL.
                sent_before = self.res_cnt
                for path in self.bean['resourcePaths']:
                    if path in req.url:
                        self.send_resp(ResponseCode.NOT_FOUND)
                        self.res_cnt += 1
                if self.res_cnt == sent_before:
                    logging.error('unkown resource path %s' % req.url)
                    self.res_abn_cnt += 1
class CaseSuites:
    """Builds the scenario clients once and hands them out per test case.

    All clients share the same messaging helper, trigger, timeout and
    identity. The accessor methods reconfigure and return the shared
    instances, except ``ota`` which builds a new client on every call.
    """

    def __init__(self, mqclient, trigger, timeout, identity):
        self.mqclient = mqclient
        self.trigger = trigger
        self.timeout = timeout
        self.identity = identity
        common = self._common_kwargs()
        self.basis = BaseClient(**common)
        self.ov_client = PostSyncClient(res_paths=[], res_responses={}, **common)
        self.ov_revision = 1
        self.ota_client = OtaClient(filename='aaa.bin', **common)
        self.unhappysync = UnhappySyncClient(error_code=None, **common)
        self.emptysync = NilSyncClient(**common)
        self.wrongsync = WrongSyncClient(**common)
        self.messysync = MessySyncClient(**common)
        self.extremitysync = ExtremitySyncClient(bean=None, **common)

    def _common_kwargs(self):
        """Return the constructor arguments every client receives."""
        return {
            'trigger': self.trigger,
            'mqclient': self.mqclient,
            'timeout': self.timeout,
            'identity': self.identity,
        }

    def base_func(self):
        """Return the plain ``BaseClient``."""
        return self.basis

    def override(self, dim):
        """Configure an override with light level ``dim`` and bump the revision."""
        self.ov_client.res_paths = ['/l/ov/%d' % self.ov_revision]
        self.ov_client.res_responses = {
            '/l/ov/%d/0' % self.ov_revision: (ResponseCode.CONTENT, 'Override', {
                'nominalLightLevel': dim, 'enabled': 1})}
        self.ov_revision += 1
        return self.ov_client

    def remove_override(self):
        """Announce ``/l/ov/0`` with no configured responses; bump the revision."""
        self.ov_client.res_paths = ['/l/ov/0']
        self.ov_client.res_responses = {}
        self.ov_revision += 1
        return self.ov_client

    def duplicate_override(self):
        """Configure an override at the current revision without bumping it."""
        self.ov_client.res_paths = ['/l/ov/%d' % self.ov_revision]
        self.ov_client.res_responses = {
            '/l/ov/%d/0' % self.ov_revision: (ResponseCode.CONTENT, 'Override', {
                'nominalLightLevel': 100, 'enabled': 1})}
        return self.ov_client

    def error_override(self, code):
        """Configure the override resource to be answered with bare ``code``."""
        self.ov_client.res_paths = ['/l/ov/%d' % self.ov_revision]
        self.ov_client.res_responses = {
            '/l/ov/%d/0' % self.ov_revision: (code, None, None)}
        return self.ov_client

    def ota(self, filename='aaa.bin'):
        """Return a new ``OtaClient`` serving ``filename``.

        ``OtaClient`` requires a file name; the default matches the one
        used for the client built in ``__init__``.
        """
        return OtaClient(filename=filename, **self._common_kwargs())

    def unhappy_sync(self, error_code):
        """Return the unhappy-sync client set to reply with ``error_code``."""
        self.unhappysync.error_code = error_code
        return self.unhappysync

    def empty_sync(self):
        """Return the client that answers the sync without a payload."""
        return self.emptysync

    def wrong_sync(self):
        """Return the client that answers the sync with an Override bean."""
        return self.wrongsync

    def messy_sync(self):
        """Return the client that answers the sync with random bytes."""
        return self.messysync

    def extremity_sync(self, bean):
        """Return the extremity client configured to send ``bean``."""
        self.extremitysync.bean = bean
        return self.extremitysync
转载于: https://www.cnblogs.com/mftang2018/p/10919411.html
还没有评论,来说两句吧...