全部产品
Search
文档中心

IoT Platform:Contoh skrip Python

更新时间:Jul 02, 2025

Topik ini menyediakan template skrip Python serta contoh skrip Python yang dapat digunakan untuk mengurai pesan yang dikirim ke topik kustom.

Template skrip

SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update' # Topik kustom /user/update. 
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error' # Topik kustom /user/update/error. 

# Panggil fungsi untuk mengonversi data pesan dalam topik kustom menjadi data JSON ketika perangkat mengirimkan data ke IoT Platform. 
# topic: parameter input yang menentukan topik ke mana perangkat mengirimkan pesan. Nilainya adalah string.    
#rawData: parameter input. Nilainya adalah daftar bilangan bulat yang tidak boleh kosong. 
# jsonObj: parameter output. Nilainya adalah objek JSON yang menunjukkan kamus.     
def transform_payload(topic, rawData):
   jsonObj = {}
   return jsonObj

Contoh skrip

Catatan Contoh skrip berikut hanya dapat digunakan untuk mengurai pesan yang dikirim ke topik kustom. Jika Anda mengatur parameter Data Type produk ke Custom, Anda juga harus menulis skrip untuk mengurai pesan Thing Specification Language (TSL). Untuk informasi lebih lanjut tentang cara menulis skrip untuk mengurai pesan TSL, lihat Kirim Skrip untuk Mengurai Data TSL.

Untuk informasi lebih lanjut tentang custom data formats, lihat Buat Produk.

# coding=UTF-8
SELF_DEFINE_TOPIC_UPDATE_FLAG = '/user/update' # Topik kustom /user/update. 
SELF_DEFINE_TOPIC_ERROR_FLAG = '/user/update/error' # Topik kustom /user/update/error. 

#  Contoh data:
#  Topik kustom: /user/update, ke mana data dikirim. 
#  Input:
#     topic: /${productKey}/${deviceName}/user/update
#     bytes: 0x000000000100320100000000
#  Output:
#  {
#     "prop_float": 0,
#     "prop_int16": 50,
#     "prop_bool": 1,
#     "topic": "/${productKey}/${deviceName}/user/update"
#   }
def transform_payload(topic, bytes):
    uint8Array = []
    for byteValue in bytes:
        uint8Array.append(byteValue & 0xff)

    jsonMap = {}
    if SELF_DEFINE_TOPIC_ERROR_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['errorCode'] = bytes_to_int(uint8Array[0:1])

    elif SELF_DEFINE_TOPIC_UPDATE_FLAG in topic:
        jsonMap['topic'] = topic
        jsonMap['prop_int16'] = bytes_to_int(uint8Array[5:7])
        jsonMap['prop_bool'] = bytes_to_int(uint8Array[7: 8])
        jsonMap['prop_float'] = bytes_to_int(uint8Array[8:])

    return jsonMap

# Konversikan larik byte menjadi data tipe INT. 
def bytes_to_int(bytes):
    data = ['%02X' % i for i in bytes]
    return int(''.join(data), 16)