IoT Platform: Python sample script

Last Updated: Jun 16, 2026

Use the following Python script template and sample script to parse messages sent to custom topics.

Script template

SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update'  # The custom topic /user/update.
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error'  # The custom topic /user/update/error.

# This function is called to convert the data of a message in a custom topic to JSON data when a device submits data to IoT Platform.
# topic: input parameter. The topic to which the device sends messages. The value is a string.
# rawData: input parameter. The value is a non-empty list of integers.
# jsonObj: output parameter. The value is a dictionary that represents a JSON object.
def transform_payload(topic, rawData):
    jsonObj = {}
    return jsonObj
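
Because rawData arrives as a list of integers, it can help to build such a list from a hexadecimal string when trying a script locally. The following is a minimal sketch, not part of the platform API: hex_to_int_list is a hypothetical helper, the body of transform_payload is a placeholder, and the product key a1b2c3 and device name device1 are made-up values.

```python
def hex_to_int_list(hex_string):
    """Hypothetical helper: convert '0x0032' or '0032' to [0, 50]."""
    if hex_string.lower().startswith('0x'):
        hex_string = hex_string[2:]
    if len(hex_string) % 2 != 0:
        raise ValueError('hex string must contain an even number of digits')
    # Each pair of hexadecimal digits is one byte value in the range 0-255.
    return [int(hex_string[i:i + 2], 16) for i in range(0, len(hex_string), 2)]


def transform_payload(topic, rawData):
    # Placeholder body for illustration: report the topic and the payload length.
    jsonObj = {'topic': topic, 'length': len(rawData)}
    return jsonObj


# Made-up topic; a real topic contains the product key and device name.
raw = hex_to_int_list('0x000000000100320100000000')
result = transform_payload('/a1b2c3/device1/user/update', raw)
```

A harness like this only exercises the parsing logic. It does not reproduce how IoT Platform invokes the script.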

Sample script

Note: The following sample script parses only messages sent to custom topics. If you set the Data Type parameter of the product to Custom, you must also write a script to parse Thing Specification Language (TSL) messages. For more information, see Submit a TSL model message parsing script.

For information about custom data formats, see Create a product.

# coding=UTF-8
SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update' # The custom topic /user/update. 
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error' # The custom topic /user/update/error. 

# Sample data:
# Custom topic to which the data is sent: /user/update
# Input:
#     topic: /${productKey}/${deviceName}/user/update
#     bytes: 0x000000000100320100000000
# Output:
# {
#     "prop_float": 0,
#     "prop_int16": 50,
#     "prop_bool": 1,
#     "topic": "/${productKey}/${deviceName}/user/update"
# }
def transform_payload(topic, rawData):
    # Mask each value to the range 0-255 so that it is handled as an unsigned byte.
    uint8Array = [byteValue & 0xff for byteValue in rawData]

    jsonMap = {}
    # Check the error topic first, because '/user/update' is also a substring of '/user/update/error'.
    if SELF_DEFINE_TOPIC_ERROR_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['errorCode'] = bytes_to_int(uint8Array[0:1])  # Byte 0.

    elif SELF_DEFINE_TOPIC_UPDATE_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['prop_int16'] = bytes_to_int(uint8Array[5:7])  # Bytes 5 and 6.
        jsonMap['prop_bool'] = bytes_to_int(uint8Array[7:8])  # Byte 7.
        # Bytes 8 onward are read as a big-endian unsigned integer, not decoded as an IEEE 754 float.
        jsonMap['prop_float'] = bytes_to_int(uint8Array[8:])

    return jsonMap

# Convert a list of byte values into an unsigned integer. The first byte is the most significant (big-endian).
def bytes_to_int(byte_values):
    data = ['%02X' % i for i in byte_values]
    return int(''.join(data), 16)
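
The sample passes the bytes of prop_float to bytes_to_int, so the result is an unsigned integer and not a floating-point value. If a device encodes that field as a big-endian IEEE 754 single-precision number (an assumption, since this page does not specify the encoding), the value could be decoded with the standard struct module. The following is a sketch under that assumption. bytes_to_float is a hypothetical helper and is not part of the sample script.

```python
import struct


def bytes_to_float(byte_values):
    """Hypothetical helper: decode four byte values (0-255) as a big-endian float32.

    Assumes the device uses IEEE 754 single precision in network byte order.
    """
    if len(byte_values) != 4:
        raise ValueError('expected exactly 4 byte values')
    # bytes(bytearray(...)) yields a byte string on both Python 2.7 and Python 3.
    packed = bytes(bytearray(byte_values))
    return struct.unpack('>f', packed)[0]


zero = bytes_to_float([0x00, 0x00, 0x00, 0x00])   # 0.0
one = bytes_to_float([0x3F, 0x80, 0x00, 0x00])    # 1.0
```

For the sample payload, the last four bytes are all zero, so both interpretations give 0. The two differ for any other value.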