This article provides a script template and a sample script in Python to parse Thing Specification Language (TSL) data.

Script template

You can write a script in Python based on the following script template to parse TSL data:

Note This template applies only to products whose Data Format parameter is set to Custom.
#Convert custom data of a device to Alink data when the device submits data to IoT Platform. 
#rawData: the input parameter. The value is a list of integers that cannot be empty. 
#jsonObj: the output parameter. The value is a JSON object that indicates a dictionary and cannot be empty. 
def raw_data_to_protocol(rawData):
    jsonObj = {}
    return jsonObj

#Convert Alink data to data that can be recognized by a device when IoT Platform sends data to the device. 
#jsonData: the input parameter. The value is a JSON object that indicates a dictionary and cannot be empty. 
#rawData: the output parameter. The value is a list of integers within a value range of [0,255] and cannot be empty. 
def protocol_to_raw_data(jsonData):
    rawData = []
    return rawData

Notes for script writing

  • Do not use global variables. Otherwise, results may be inconsistent.
  • The script is used to process data by using the two's complement operation. The complement range for values in the range of [-128,127] is [0,255]. For example, the complement of -1 is 255 in decimal.
  • The raw_data_to_protocol function parses the data that is submitted by devices. The input parameter of the function is an integer array. Use 0xFF for the bitwise AND operation to obtain the complement.
  • The protocol_to_raw_data function parses the data that is sent by IoT Platform and returns the result in an array. Each element in the array must be an integer within a value range of [0,255].

Sample script

The following sample script is written based on the properties and protocol that are defined in the Example for parsing TSL data article:

For more information about the data types supported by TSL models, see Supported data types.

# coding=UTF-8
import struct
import common_util

COMMAND_REPORT = 0x00 # Property data reported by devices.
COMMAND_SET = 0x01 # Property setting command data from the cloud.
COMMAND_REPORT_REPLY = 0x02 # Response data for devices reporting properties.
COMMAND_SET_REPLY = 0x03 # Response data for setting device properties.
Command_unkown = 0xff # Unknown commands.
ALINK_PROP_REPORT_METHOD = 'thing.event.property.post' # The topic for devices to upload property data to the cloud.
ALINK_PROP_SET_METHOD = 'thing.service.property.set' # The topic for IoT Platform to send a property control command to devices.
SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update'  # Custom topic: /user/update.
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error' # Custom topic: /user/update/error.


# Example data:
# The device reports data
# Input parameters:
#    0x000000000100320100000000
# Output result:
#    {"method":"thing.event.property.post","id":"1","params":{"prop_float":0,"prop_int16":50,"prop_bool":1},"version":"1.0"}
# The device responds after setting properties.
# Input parameters:
#    0x0300223344c8
# Output result:
#    {"code":"200","data":{},"id":"2241348","version":"1.0"}

def raw_data_to_protocol(bytes):
    uint8Array = []
    for byteValue in bytes:
        uint8Array.append(byteValue & 0xff)

    fHead = uint8Array[0]
    jsonMap = {}
    if fHead == COMMAND_REPORT:
        jsonMap['method'] = ALINK_PROP_REPORT_METHOD
        jsonMap['version'] = '1.0'
        jsonMap['id'] = str(bytes_to_int(uint8Array[1:5]))
        params = {}
        params['prop_int16'] = bytes_to_int(uint8Array[5:7])
        params['prop_bool'] = bytes_to_int(uint8Array[7: 8])
        params['prop_float'] = bytes_to_int(uint8Array[8:])
        jsonMap['params'] = params
    elif fHead == COMMAND_SET_REPLY:
        jsonMap['version'] = '1.0'
        jsonMap['id'] = str(bytes_to_int(uint8Array[1:5]))
        jsonMap['code'] = str(bytes_to_int(uint8Array[5:]))
        jsonMap['data'] = {}

    return jsonMap


# Example data:
# IoT Platform pushes a command for setting properties to the device.
# Input parameters:
#    {"method":"thing.service.property.set","id":"12345","version":"1.0",
#    "params":{"prop_float":123.452, "prop_int16":333, "prop_bool":1}}
# Output result:
#    0x0100003039014d0142f6e76d
# IoT Platform responds after the device reports properties.
# Input data:
#    {"method":"thing.event.property.post","id":"12345","version":"1.0","code":200,"data":{}}
# Output result:
#    0x0200003039c8
def protocol_to_raw_data(json):
    method = json.get('method', None)
    id = json.get('id', None)
    version = json.get('version', None)
    payload_array = []

    if method == ALINK_PROP_SET_METHOD:
        params = json.get('params')
        prop_float = params.get('prop_float', None)
        prop_int16 = params.get('prop_int16', None)
        prop_bool = params.get('prop_bool', None)

        payload_array = payload_array + int_8_to_byte(COMMAND_SET)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_16_to_byte(prop_int16)
        payload_array = payload_array + int_8_to_byte(prop_bool)
        payload_array = payload_array + float_to_byte(prop_float)

    elif method == ALINK_PROP_REPORT_METHOD:
        code = json.get('code', None)
        payload_array = payload_array + int_8_to_byte(COMMAND_REPORT_REPLY)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_8_to_byte(code)
    else:
        code = json.get('code')
        payload_array = payload_array + int_8_to_byte(COMMAD_UNKOWN)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_8_to_byte(code)

    return payload_array



# Sample data
# Custom topic for reporting data :/user/update
# Input parameters: 
#   topic: /{productKey}/{deviceName}/user/update
#   bytes: 0x000000000100320100000000
# Output parameters:
# {
#     "prop_float": 0,
#     "prop_int16": 50,
#     "prop_bool": 1,
#     "topic": "/{productKey}/{deviceName}/user/update"
#   }
def transform_payload(topic, bytes):
    uint8Array = []
    for byteValue in bytes:
        uint8Array.append(byteValue & 0xff)

    jsonMap = {}
    if SELF_DEFINE_TOPIC_ERROR_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['errorCode'] = bytes_to_int(uint8Array[0:1])

    elif SELF_DEFINE_TOPIC_UPDATE_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['prop_int16'] = bytes_to_int(uint8Array[5:7])
        jsonMap['prop_bool'] = bytes_to_int(uint8Array[7: 8])
        jsonMap['prop_float'] = bytes_to_int(uint8Array[8:])

    return jsonMap


# byte to int.
def bytes_to_int(bytes):
    data = ['%02X' % i for i in bytes]
    return int(''.join(data), 16)

# Converts a byte array to a float without precision.
def bytes_to_float(bytes):
    data = []
    for i in bytes:
        t_value = '%02X' % i
        if t_value % 2 ! = 0:
            t_value += 0
        data.append(t_value)
    hex_str = ''.join(data)
    return struct.unpack('! f', hex_str.decode('hex'))[0]

# Converts an 8-bit integer to a byte array.
def int_8_to_byte(value):
    t_value = '%02X' % value
    if len(t_value) % 2 ! = 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Converts a 32-bit integer to a byte array.
def int_32_to_byte(value):
    t_value = '%08X' % value
    if len(t_value) % 2 ! = 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Converts a 16-bit integer to a byte array.
def int_16_to_byte(value):
    t_value = '%04X' % value
    if len(t_value) % 2 ! = 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Converts a float into an integer array.
def float_to_byte(param):
    return hex_string_to_byte_array(struct.pack(">f", param).encode('hex'))


# Converts a hexadecimal string to a byte array.
def hex_string_to_byte_array(str_value):
    if len(str_value) % 2 ! = 0:
        return None

    cycle = len(str_value) / 2

    pos = 0
    result = []
    for i in range(0, cycle, 1):
        temp_str_value = str_value[pos:pos + 2]
        temp_int_value = int(temp_str_value, base=16)

        result.append(temp_int_value)
        pos += 2
    return result