This article provides the script template and a sample script for parsing Thing Specification Language (TSL) data in Python.

Script template

You can write a script in Python based on the following script template to parse TSL data:

Note: This template applies only to products whose Data Format parameter is set to Custom.
# Call the raw_data_to_protocol function to convert custom data of a device to Alink data when the device reports data to IoT Platform.
# rawData: an input parameter. The value is a list of integers that cannot be left empty.
# jsonObj: the return value. A dictionary that represents the Alink JSON object. It cannot be left empty.
def raw_data_to_protocol(rawData):
    """Convert custom data reported by a device into Alink data.

    This is the template stub: it ignores ``rawData`` and returns a new,
    empty dictionary. Replace the body with product-specific parsing.

    Args:
        rawData: The payload reported by the device.

    Returns:
        A dictionary that holds the Alink message.
    """
    alink_message = dict()
    return alink_message

# Call the protocol_to_raw_data function to convert Alink data to data that can be recognized by a device when IoT Platform sends data to the device.
# jsonData: an input parameter. A dictionary that represents the Alink JSON object. It cannot be left empty.
# rawData: an output parameter. The value is a list of integers within a value range of [0,255] and cannot be left empty.
def protocol_to_raw_data(jsonData):
    """Convert Alink data sent by IoT Platform into data for the device.

    This is the template stub: it ignores ``jsonData`` and returns a new,
    empty list. Replace the body with product-specific encoding.

    Args:
        jsonData: The Alink message to convert.

    Returns:
        A list that holds the raw payload for the device.
    """
    device_payload = list()
    return device_payload

Notes for script writing

  • Do not use global variables. Otherwise, results may be inconsistent.
  • The script processes byte values in two's complement form. Signed values in the range of [-128,127] correspond to unsigned values in the range of [0,255]. For example, the two's complement of -1 is 255 in decimal.
  • The raw_data_to_protocol function parses the data that is reported by devices. The input of the function is an integer array. Perform a bitwise AND operation with 0xFF on each element to obtain its two's complement value in the range of [0,255].
  • The protocol_to_raw_data function parses the data that is sent by IoT Platform and returns the result in an array. Each element in the array must be an integer within a value range of [0,255].

Sample script

The following sample script is written based on the properties and protocol that are defined in the Example for parsing TSL data article:

# coding=UTF-8
import struct
import common_util

# Command codes. The sample functions read or write one of these as the first
# byte of a custom frame.
COMMAND_REPORT = 0x00  # Property data reported by devices.
COMMAND_SET = 0x01  # Property setting command data from the cloud.
COMMAND_REPORT_REPLY = 0x02  # Response data for devices reporting properties.
COMMAND_SET_REPLY = 0x03  # Response data for setting device properties.
Command_unkown = 0xff  # Unknown commands.
# Alternate spelling of the unknown-command code. Both names must resolve
# because script code may refer to either one; a missing name would raise
# NameError when an unrecognized method is handled.
COMMAD_UNKOWN = Command_unkown

# Alink methods.
ALINK_PROP_REPORT_METHOD = 'thing.event.property.post'  # The topic for devices to upload property data to the cloud.
ALINK_PROP_SET_METHOD = 'thing.service.property.set'  # The topic for IoT Platform to send a property control command to devices.
# NOTE(review): same value as ALINK_PROP_SET_METHOD and not referenced in the
# visible sample - confirm that this is intended.
ALINK_PROP_SET_REPLY_METHOD = 'thing.service.property.set'  # The topic for devices to report the property setting results to IoT Platform.

# Substrings used to recognize custom topics.
SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update'  # Custom topic: /user/update.
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error'  # Custom topic: /user/update/error.


# Example data:
# The device reports data
# Input parameters:
#    0x000000000100320100000000
# Output result:
#    {"method":"thing.event.property.post","id":"1","params":{"prop_float":0,"prop_int16":50,"prop_bool":1},"version":"1.0"}
# The device responds after setting properties.
# Input parameters:
#    0x0300223344c8
# Output result:
#    {"code":"200","data":{},"id":"2241348","version":"1.0"}

def raw_data_to_protocol(bytes):
    """Parse a custom frame reported by a device into an Alink dictionary.

    Frame layout read by this function (multi-byte fields are big-endian):

    * byte 0: command code
    * bytes 1-4: message ID
    * COMMAND_REPORT frames: bytes 5-6 ``prop_int16``, byte 7 ``prop_bool``,
      bytes 8-11 ``prop_float`` (IEEE 754 single precision)
    * COMMAND_SET_REPLY frames: all bytes from index 5 form the result code

    Args:
        bytes: An iterable of integers. Each element is masked with 0xff, so
            signed byte values are accepted.

    Returns:
        A dictionary with the Alink fields. The dictionary is empty when the
        frame is empty or the command code is not handled.

    Raises:
        ValueError: If an integer field is missing from a truncated frame.
        struct.error: If a report frame has fewer than four bytes available
            for ``prop_float``.
    """
    uint8_values = [byte_value & 0xff for byte_value in bytes]

    alink_message = {}
    if not uint8_values:
        # Nothing to parse. Treat it like an unrecognized frame instead of
        # failing on the header lookup.
        return alink_message

    command = uint8_values[0]
    if command == COMMAND_REPORT:
        alink_message['method'] = ALINK_PROP_REPORT_METHOD
        alink_message['version'] = '1.0'
        alink_message['id'] = str(bytes_to_int(uint8_values[1:5]))
        # prop_float travels as an IEEE 754 single, the same encoding that the
        # downlink direction produces, so it must not be read as an integer.
        float_octets = uint8_values[8:12]
        alink_message['params'] = {
            'prop_int16': bytes_to_int(uint8_values[5:7]),
            'prop_bool': bytes_to_int(uint8_values[7:8]),
            'prop_float': struct.unpack(
                '>f', struct.pack('>4B', *float_octets))[0],
        }
    elif command == COMMAND_SET_REPLY:
        alink_message['version'] = '1.0'
        alink_message['id'] = str(bytes_to_int(uint8_values[1:5]))
        alink_message['code'] = str(bytes_to_int(uint8_values[5:]))
        alink_message['data'] = {}

    return alink_message


# Example data:
# IoT Platform pushes a command for setting properties to the device.
# Input parameters:
#    {"method":"thing.service.property.set","id":"12345","version":"1.0",
#    "params":{"prop_float":123.452, "prop_int16":333, "prop_bool":1}}
# Output result:
#    0x0100003039014d0142f6e76d
# IoT Platform responds after the device reports properties.
# Input data:
#    {"method":"thing.event.property.post","id":"12345","version":"1.0","code":200,"data":{}}
# Output result:
#    0x0200003039c8
def protocol_to_raw_data(json):
    """Encode an Alink message from IoT Platform into a custom frame.

    Frame layout written by this function:

    * property-set command (``ALINK_PROP_SET_METHOD``): COMMAND_SET, 4-byte
      message ID, 2-byte ``prop_int16``, 1-byte ``prop_bool``, 4-byte
      ``prop_float``
    * reply to a property report (``ALINK_PROP_REPORT_METHOD``):
      COMMAND_REPORT_REPLY, 4-byte message ID, 1-byte ``code``
    * any other method: the unknown-command code, 4-byte message ID, 1-byte
      ``code``

    Args:
        json: The Alink message as a dictionary.

    Returns:
        A list of integers that forms the frame for the device.

    Raises:
        TypeError: If ``id`` is missing, because ``int(None)`` is rejected.
        ValueError: If ``id`` is not a decimal string.
    """
    method = json.get('method', None)
    message_id = json.get('id', None)

    if method == ALINK_PROP_SET_METHOD:
        params = json.get('params')
        return (
            int_8_to_byte(COMMAND_SET)
            + int_32_to_byte(int(message_id))
            + int_16_to_byte(params.get('prop_int16', None))
            + int_8_to_byte(params.get('prop_bool', None))
            + float_to_byte(params.get('prop_float', None))
        )

    # Replies and unrecognized methods share one layout and differ only in
    # the command byte.
    if method == ALINK_PROP_REPORT_METHOD:
        command = COMMAND_REPORT_REPLY
    else:
        # Command_unkown is the name under which the unknown-command code is
        # defined at module level.
        command = Command_unkown

    return (
        int_8_to_byte(command)
        + int_32_to_byte(int(message_id))
        + int_8_to_byte(json.get('code', None))
    )



# Sample data
# Custom topic for reporting data: /user/update
# Input parameters: 
#   topic: /{productKey}/{deviceName}/user/update
#   bytes: 0x000000000100320100000000
# Output parameters:
# {
#     "prop_float": 0,
#     "prop_int16": 50,
#     "prop_bool": 1,
#     "topic": "/{productKey}/{deviceName}/user/update"
#   }
def transform_payload(topic, bytes):
    """Parse a payload that a device published to a custom topic.

    Args:
        topic: The full topic name. It is matched by substring against the
            custom-topic flags.
        bytes: An iterable of integers. Each element is masked with 0xff.

    Returns:
        A dictionary. For an error topic it holds ``topic`` and ``errorCode``
        (byte 0). For an update topic it holds ``topic``, ``prop_int16``
        (bytes 5-6), ``prop_bool`` (byte 7) and ``prop_float`` (bytes 8-11,
        IEEE 754 single precision). It is empty for any other topic.

    Raises:
        ValueError: If an integer field is missing from a truncated payload.
        struct.error: If an update payload has fewer than four bytes
            available for ``prop_float``.
    """
    uint8_values = [byte_value & 0xff for byte_value in bytes]

    parsed = {}
    # The error flag is tested first because the update flag is a prefix of
    # it and would match error topics as well.
    if SELF_DEFINE_TOPIC_ERROR_FLAG in topic:
        parsed['topic'] = topic
        parsed['errorCode'] = bytes_to_int(uint8_values[0:1])
    elif SELF_DEFINE_TOPIC_UPDATE_FLAG in topic:
        parsed['topic'] = topic
        parsed['prop_int16'] = bytes_to_int(uint8_values[5:7])
        parsed['prop_bool'] = bytes_to_int(uint8_values[7:8])
        # prop_float is an IEEE 754 single, not an integer field.
        float_octets = uint8_values[8:12]
        parsed['prop_float'] = struct.unpack(
            '>f', struct.pack('>4B', *float_octets))[0]

    return parsed


# Converts a byte array to an integer.
def bytes_to_int(bytes):
    """Combine a sequence of byte values into one integer.

    Each element is rendered as at least two uppercase hexadecimal digits.
    The digits are concatenated in order, so the first element is the most
    significant, and the resulting string is parsed as base 16.

    Args:
        bytes: An iterable of integers.

    Returns:
        The integer that the concatenated hexadecimal digits represent.
    """
    hex_digits = ''
    for octet in bytes:
        hex_digits += '%02X' % octet
    return int(hex_digits, 16)


# Converts a byte array to a single-precision (32-bit) float.
def bytes_to_float(bytes):
    """Decode four big-endian bytes as an IEEE 754 single-precision float.

    Args:
        bytes: An iterable of exactly four integers. Each element is masked
            with 0xff, so signed byte values are accepted.

    Returns:
        The decoded value as a Python float.

    Raises:
        struct.error: If the input does not contain exactly four bytes.
    """
    octets = [value & 0xff for value in bytes]
    # Build the binary string with struct itself so that the code runs on
    # both Python 2 and Python 3; the 'hex' string codec is Python 2 only.
    packed = struct.pack('>%dB' % len(octets), *octets)
    return struct.unpack('>f', packed)[0]


# Converts an 8-bit integer to a byte array.
def int_8_to_byte(value):
    """Encode an integer as a single byte.

    Only the low 8 bits are kept, so the result always has one element in
    the range [0, 255]. Negative values are therefore encoded in two's
    complement form (-1 becomes 255).

    Args:
        value: The integer to encode.

    Returns:
        A one-element list of integers.

    Raises:
        TypeError: If ``value`` does not support bitwise operations, for
            example ``None``.
    """
    return [value & 0xff]


# Converts a 32-bit integer to a byte array.
def int_32_to_byte(value):
    """Encode an integer as four big-endian bytes.

    Only the low 32 bits are kept, so the result always has four elements in
    the range [0, 255]. Negative values are therefore encoded in two's
    complement form.

    Args:
        value: The integer to encode.

    Returns:
        A four-element list of integers, most significant byte first.

    Raises:
        TypeError: If ``value`` does not support bitwise operations, for
            example ``None``.
    """
    return [(value >> shift) & 0xff for shift in (24, 16, 8, 0)]


# Converts a 16-bit integer to a byte array.
def int_16_to_byte(value):
    """Encode an integer as two big-endian bytes.

    Only the low 16 bits are kept, so the result always has two elements in
    the range [0, 255]. Negative values are therefore encoded in two's
    complement form.

    Args:
        value: The integer to encode.

    Returns:
        A two-element list of integers, most significant byte first.

    Raises:
        TypeError: If ``value`` does not support bitwise operations, for
            example ``None``.
    """
    return [(value >> 8) & 0xff, value & 0xff]


# Converts a float into an integer array.
def float_to_byte(param):
    """Encode a number as four big-endian IEEE 754 single-precision bytes.

    Args:
        param: The number to encode.

    Returns:
        A four-element list of integers in the range [0, 255], most
        significant byte first.

    Raises:
        struct.error: If ``param`` cannot be packed as a float, for example
            ``None``.
    """
    # Unpack the packed float as four unsigned bytes instead of going through
    # the 'hex' string codec, which exists only on Python 2.
    return list(struct.unpack('>4B', struct.pack('>f', param)))


# Converts a hexadecimal string to a byte array.
def hex_string_to_byte_array(str_value):
    """Convert a hexadecimal string into a list of byte values.

    Args:
        str_value: A string of hexadecimal digits, two digits per byte.

    Returns:
        A list with one integer per digit pair, in order. ``None`` is
        returned when the string has an odd number of characters.

    Raises:
        ValueError: If a digit pair is not valid hexadecimal.
    """
    if len(str_value) % 2 != 0:
        return None

    # Stepping by two avoids the length / 2 division, whose float result
    # range() rejects on Python 3.
    return [
        int(str_value[pos:pos + 2], 16)
        for pos in range(0, len(str_value), 2)
    ]