全部产品
Search
文档中心

IoT Platform:Contoh skrip Python

更新时间:Jul 02, 2025

Topik ini menyediakan template dan contoh skrip Python yang dapat digunakan untuk mengurai pesan Thing Specification Language (TSL).

Template skrip

Anda dapat menulis skrip Python berdasarkan template berikut untuk mengurai pesan TSL.

Catatan Template skrip hanya berlaku untuk produk dengan parameter Data Type diatur ke Custom.
# Mengonversi data kustom perangkat menjadi data Alink JSON ketika perangkat mengirimkan data ke IoT Platform. 
# rawData: parameter input. Nilainya adalah daftar bilangan bulat yang tidak boleh kosong. 
# jsonObj: parameter output. Nilainya adalah objek JSON yang menunjukkan kamus dan tidak boleh kosong. 
def raw_data_to_protocol(rawData):
    jsonObj = {}
    return jsonObj

# Mengonversi data Alink JSON menjadi data yang dapat dikenali oleh perangkat ketika IoT Platform mengirimkan data ke perangkat. 
# jsonData: parameter input. Nilainya adalah objek JSON yang menunjukkan kamus dan tidak boleh kosong. 
# rawData: parameter output. Nilainya adalah daftar bilangan bulat dalam rentang nilai [0,255] dan tidak boleh kosong. 
def protocol_to_raw_data(jsonData):
    rawData = []
    return rawData

Catatan untuk penulisan skrip

  • Hindari penggunaan variabel global karena hasilnya mungkin tidak konsisten.
  • Skrip digunakan untuk memproses data menggunakan operasi pelengkap dua. Rentang pelengkap untuk nilai dalam rentang [-128,127] adalah [0,255]. Sebagai contoh, pelengkap dari -1 adalah 255 dalam desimal.
  • Fungsi raw_data_to_protocol mengurai data yang dikirimkan oleh perangkat. Parameter input fungsi tersebut adalah array bilangan bulat. Gunakan 0xFF untuk operasi AND bit demi bit guna mendapatkan pelengkap.
  • Fungsi protocol_to_raw_data mengurai data yang dikirimkan oleh IoT Platform dan mengembalikan hasilnya dalam array. Setiap elemen dalam array harus berupa bilangan bulat dalam rentang nilai [0,255].

Contoh skrip

Contoh skrip berikut ditulis berdasarkan properti dan protokol yang didefinisikan dalam topik Kirim Skrip untuk Mengurai Data TSL:

Untuk informasi lebih lanjut tentang tipe data yang didukung oleh model TSL, lihat Tipe Data yang Didukung. Setelah perangkat mengirimkan data properti TSL atau data acara, IoT Platform menghasilkan respons dalam format Alink JSON. Sebelum IoT Platform mengirimkan respons ke perangkat, skrip digunakan untuk mengonversi format data menjadi format yang dapat dikenali oleh perangkat. Untuk informasi lebih lanjut tentang format Alink JSON, lihat Properti Perangkat, Acara, dan Layanan.

# coding=UTF-8
import struct
import common_util

COMMAND_REPORT = 0x00 # Kirim properti. 
COMMAND_SET = 0x01 # Konfigurasi properti. 
COMMAND_REPORT_REPLY = 0x02 # Kembalikan hasil pengiriman data. 
COMMAND_SET_REPLY = 0x03 # Kembalikan hasil pengaturan properti. 
COMMAD_UNKOWN = 0xff # Perintah tidak dikenal. 
ALINK_PROP_REPORT_METHOD = 'thing.event.property.post' # Topik untuk perangkat mengirimkan data properti ke IoT Platform. 
ALINK_PROP_SET_METHOD = 'thing.service.property.set' # Topik untuk IoT Platform mengirimkan perintah pengaturan properti ke perangkat. 
SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update' # Tentukan topik kustom berikut: /user/update. 
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error' # Tentukan topik kustom berikut: /user/update/error. 


# Contoh data:
# Kirim data properti
# Input:
#    0x000000000100320100000000
# Output:
#    {"method":"thing.event.property.post","id":"1","params":{"prop_float":0,"prop_int16":50,"prop_bool":1},"version":"1.0"}
# Hasil yang dikembalikan setelah pengaturan properti
# Input:
#    0x0300223344c8
# Output:
#    {"code":"200","data":{},"id":"2241348","version":"1.0"}

def raw_data_to_protocol(bytes):
    uint8Array = []
    for byteValue in bytes:
        uint8Array.append(byteValue & 0xff)

    fHead = uint8Array[0]
    jsonMap = {}
    if fHead == COMMAND_REPORT:
        jsonMap['method'] = ALINK_PROP_REPORT_METHOD
        jsonMap['version'] = '1.0'
        jsonMap['id'] = str(bytes_to_int(uint8Array[1:5]))
        params = {}
        params['prop_int16'] = bytes_to_int(uint8Array[5:7])
        params['prop_bool'] = bytes_to_int(uint8Array[7: 8])
        params['prop_float'] = bytes_to_int(uint8Array[8:])
        jsonMap['params'] = params
    elif fHead == COMMAND_SET_REPLY:
        jsonMap['version'] = '1.0'
        jsonMap['id'] = str(bytes_to_int(uint8Array[1:5]))
        jsonMap['code'] = str(bytes_to_int(uint8Array[5:]))
        jsonMap['data'] = {}

    return jsonMap


# Contoh data:
# Mengirim perintah downstream untuk mengonfigurasi properti perangkat
# Input:
#    {"method":"thing.service.property.set","id":"12345","version":"1.0",
#    "params":{"prop_float":123.452, "prop_int16":333, "prop_bool":1}}
# Output:
#    0x0100003039014d0142f6e76d
# Hasil yang dikembalikan setelah perangkat mengirimkan data
# Input:
#    {"method":"thing.event.property.post","id":"12345","version":"1.0","code":200,"data":{}}
# Output:
#    0x0200003039c8
def protocol_to_raw_data(json):
    method = json.get('method', None)
    id = json.get('id', None)
    version = json.get('version', None)
    payload_array = []

    if method == ALINK_PROP_SET_METHOD:
        params = json.get('params')
        prop_float = params.get('prop_float', None)
        prop_int16 = params.get('prop_int16', None)
        prop_bool = params.get('prop_bool', None)

        payload_array = payload_array + int_8_to_byte(COMMAND_SET)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_16_to_byte(prop_int16)
        payload_array = payload_array + int_8_to_byte(prop_bool)
        payload_array = payload_array + float_to_byte(prop_float)

    elif method == ALINK_PROP_REPORT_METHOD:
        code = json.get('code', None)
        payload_array = payload_array + int_8_to_byte(COMMAND_REPORT_REPLY)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_8_to_byte(code)
    else:
        code = json.get('code')
        payload_array = payload_array + int_8_to_byte(COMMAD_UNKOWN)
        payload_array = payload_array + int_32_to_byte(int(id))
        payload_array = payload_array + int_8_to_byte(code)

    return payload_array



# Contoh data:
# Gunakan topik kustom berikut untuk mengirimkan data:
# /user/update. 
# Input:
#      topic:/{productKey}/{deviceName}/user/update
#      bytes: 0x000000000100320100000000
# Output:
#  {
#     "prop_float": 0,
#     "prop_int16": 50,
#     "prop_bool": 1,
#     "topic": "/{productKey}/{deviceName}/user/update"
#   }
def transform_payload(topic, bytes):
    uint8Array = []
    for byteValue in bytes:
        uint8Array.append(byteValue & 0xff)

    jsonMap = {}
    if SELF_DEFINE_TOPIC_ERROR_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['errorCode'] = bytes_to_int(uint8Array[0:1])

    elif SELF_DEFINE_TOPIC_UPDATE_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['prop_int16'] = bytes_to_int(uint8Array[5:7])
        jsonMap['prop_bool'] = bytes_to_int(uint8Array[7: 8])
        jsonMap['prop_float'] = bytes_to_int(uint8Array[8:])

    return jsonMap


# Konversi array byte menjadi bilangan bulat. 
def bytes_to_int(bytes):
    data = ['%02X' % i for i in bytes]
    return int(''.join(data), 16)



# Konversi bilangan bulat 8-bit menjadi array byte. 
def int_8_to_byte(value):
    t_value = '%02X' % value
    if len(t_value) % 2 != 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Konversi bilangan bulat 32-bit menjadi array byte. 
def int_32_to_byte(value):
    t_value = '%08X' % value
    if len(t_value) % 2 != 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Konversi bilangan bulat 16-bit menjadi array byte. 
def int_16_to_byte(value):
    t_value = '%04X' % value
    if len(t_value) % 2 != 0:
        t_value += '0'

    return hex_string_to_byte_array(t_value)


# Konversi float menjadi array bilangan bulat. 
def float_to_byte(param):
    return hex_string_to_byte_array(struct.pack(">f", param).encode('hex'))


# Konversi string heksadesimal menjadi array byte. 
def hex_string_to_byte_array(str_value):
    if len(str_value) % 2 != 0:
        return None

    cycle = len(str_value) / 2

    pos = 0
    result = []
    for i in range(0, cycle, 1):
        temp_str_value = str_value[pos:pos + 2]
        temp_int_value = int(temp_str_value, base=16)

        result.append(temp_int_value)
        pos += 2
    return result