全部产品
Search
文档中心

:Kode contoh lengkap untuk penguraian data pass-through atau kustom dalam Python

更新时间:Nov 09, 2025

Saat menulis kode untuk mengurai data kustom dari suatu produk, pastikan kode tersebut mencakup skrip untuk mengurai data topik kustom serta skrip untuk mengurai data Thing Specification Language (TSL). Topik ini menyediakan contoh skrip lengkap dalam Python.

# coding=UTF-8
import struct
import common_util

COMMAND_REPORT = 0x00 # Submit properties. 
COMMAND_SET = 0x01 # Configure properties. 
COMMAND_REPORT_REPLY = 0x02 # Return results of data submission. 
COMMAND_SET_REPLY = 0x03 # Return results of property setting. 
COMMAD_UNKOWN = 0xff # Unknown commands. 
ALINK_PROP_REPORT_METHOD = 'thing.event.property.post' # The topic for devices to submit property data to IoT Platform. 
ALINK_PROP_SET_METHOD = 'thing.service.property.set' # The topic for IoT Platform to send a property setting command to devices. 
SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update' # Define the following custom topic: /user/update. 
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error' # Define the following custom topic: /user/update/error. 


# Contoh data:
# Mengirimkan data properti
# Input:
#    0x000000000100320100000000
# Output:
#    {"method":"thing.event.property.post","id":"1","params":{"prop_float":0,"prop_int16":50,"prop_bool":1},"version":"1.0"}
# Hasil balasan pengaturan properti
# Input:
#    0x0300223344c8
# Output:
#    {"code":"200","data":{},"id":"2241348","version":"1.0"}

def raw_data_to_protocol(bytes):
    uint8Array = []
    for byteValue in bytes:
        uint8Array.append(byteValue & 0xff)

    fHead = uint8Array[0]
    jsonMap = {}
    if fHead == COMMAND_REPORT:
        jsonMap['method'] = ALINK_PROP_REPORT_METHOD
        jsonMap['version'] = '1.0'
        jsonMap['id'] = str(bytes_to_int(uint8Array[1:5]))
        params = {}
        params['prop_int16'] = bytes_to_int(uint8Array[5:7])
        params['prop_bool'] = bytes_to_int(uint8Array[7: 8])
        params['prop_float'] = bytes_to_int(uint8Array[8:])
        jsonMap['params'] = params
    elif fHead == COMMAND_SET_REPLY:
        jsonMap['version'] = '1.0'
        jsonMap['id'] = str(bytes_to_int(uint8Array[1:5]))
        jsonMap['code'] = str(bytes_to_int(uint8Array[5:]))
        jsonMap['data'] = {}

    return jsonMap


# Contoh data:
# Mengirim perintah downstream untuk mengonfigurasi properti perangkat
# Input:
#    {"method":"thing.service.property.set","id":"12345","version":"1.0",
#    "params":{"prop_float":123.452, "prop_int16":333, "prop_bool":1}}
# Output:
#    0x0100003039014d0142f6e76d
# Hasil setelah perangkat mengirimkan data
# Input:
#    {"method":"thing.event.property.post","id":"12345","version":"1.0","code":200,"data":{}}
# Output:
#    0x0200003039c8
def protocol_to_raw_data(json):
    method = json.get('method', None)
    id = json.get('id', None)
    version = json.get('version', None)
    payload_array = []

    if method == ALINK_PROP_SET_METHOD:
        params = json.get('params')
        prop_float = params.get('prop_float', None)
        prop_int16 = params.get('prop_int16', None)
        prop_bool = params.get('prop_bool', None)

        payload_array = payload_array + int_8_to_byte(COMMAND_SET)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_16_to_byte(prop_int16)
        payload_array = payload_array + int_8_to_byte(prop_bool)
        payload_array = payload_array + float_to_byte(prop_float)

    elif method == ALINK_PROP_REPORT_METHOD:
        code = json.get('code', None)
        payload_array = payload_array + int_8_to_byte(COMMAND_REPORT_REPLY)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_8_to_byte(code)
    else:
        code = json.get('code')
        payload_array = payload_array + int_8_to_byte(COMMAD_UNKOWN)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_8_to_byte(code)

    return payload_array



# Contoh data:
# Gunakan topik kustom berikut untuk mengirimkan data:
# /user/update. 
# Input:
#      topic:/{productKey}/{deviceName}/user/update
#      bytes: 0x000000000100320100000000
# Output:
#  {
#     "prop_float": 0,
#     "prop_int16": 50,
#     "prop_bool": 1,
#     "topic": "/{productKey}/{deviceName}/user/update"
#   }
def transform_payload(topic, bytes):
    uint8Array = []
    for byteValue in bytes:
        uint8Array.append(byteValue & 0xff)

    jsonMap = {}
    if SELF_DEFINE_TOPIC_ERROR_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['errorCode'] = bytes_to_int(uint8Array[0:1])

    elif SELF_DEFINE_TOPIC_UPDATE_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['prop_int16'] = bytes_to_int(uint8Array[5:7])
        jsonMap['prop_bool'] = bytes_to_int(uint8Array[7: 8])
        jsonMap['prop_float'] = bytes_to_int(uint8Array[8:])

    return jsonMap


# Mengonversi larik byte menjadi integer. 
def bytes_to_int(bytes):
    data = ['%02X' % i for i in bytes]
    return int(''.join(data), 16)



# Mengonversi integer 8-bit menjadi larik byte. 
def int_8_to_byte(value):
    t_value = '%02X' % value
    if len(t_value) % 2 != 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Mengonversi integer 32-bit menjadi larik byte. 
def int_32_to_byte(value):
    t_value = '%08X' % value
    if len(t_value) % 2 != 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Mengonversi integer 16-bit menjadi larik byte. 
def int_16_to_byte(value):
    t_value = '%04X' % value
    if len(t_value) % 2 != 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Mengonversi float menjadi larik integer. 
def float_to_byte(param):
    return hex_string_to_byte_array(struct.pack(">f", param).encode('hex'))


# Mengonversi string heksadesimal menjadi larik byte. 
def hex_string_to_byte_array(str_value):
    if len(str_value) % 2 != 0:
        return None

    cycle = len(str_value) / 2

    pos = 0
    result = []
    for i in range(0, cycle, 1):
        temp_str_value = str_value[pos:pos + 2]
        temp_int_value = int(temp_str_value, base=16)

        result.append(temp_int_value)
        pos += 2
    return result