全部產品
Search
文件中心

DataHub:Python High-Level SDK

更新時間:May 15, 2026

介紹 Python 的 High-Level SDK中 Producer 和 Consumer 的相關參數和常見用法。


前提條件

  • Python 3.6+

  • 已建立 DataHub Project 和 Topic,並擷取訂閱 ID(sub_id)

  • 已擷取 RAM 使用者的 AccessKey ID 和 AccessKey Secret


安裝 SDK

pip install pydatahub

如需使用零信任憑證(推薦),還需安裝:

pip install alibabacloud-credentials

注意:PyPI 包名為 pydatahub,而非 aliyun-datahub-sdk


身份認證

方式一:環境變數(推薦)

阿里雲 SDK 支援通過環境變數自動擷取憑證,無需在代碼中寫入程式碼 AK。

export ALIBABA_CLOUD_ACCESS_KEY_ID=<your_access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your_access_key_secret>

代碼中無需寫入程式碼 AK:

import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.client import ProducerConfig, ConsumerConfig

config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)

# Producer
producer_config = ProducerConfig.from_credential(credential, endpoint)

# Consumer
consumer_config = ConsumerConfig.from_credential(credential, endpoint)

方式二:直接使用 AK 構建

from datahub.client import ProducerConfig

# 方式 A:直接傳參
producer_config = ProducerConfig(access_id, access_key, endpoint)

# 方式 B:使用 from_access Factory 方法(推薦)
producer_config = ProducerConfig.from_access(access_id, access_key, endpoint)

重要:使用設定檔方案時,請確保系統中不存在環境變數 ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,否則設定檔將不生效。


Producer 寫入資料

限制說明

  • 安全執行緒:Producer 是安全執行緒的,同一進程內一個 Topic 只需一個 Producer 執行個體。

ProducerConfig 參數說明

參數

類型

預設值

說明

endpoint

str

必填

DataHub 服務地址,如 https://dh-cn-hangzhou.aliyuncs.com

retry_times

int

3

寫入失敗時的重試次數,-1 表示無限重試

async_thread_limit

int

16

非同步線程池大小(範圍:2~100)

thread_queue_limit

int

1024

非同步隊列長度限制

max_async_buffer_size

int

4000000

非同步緩衝區最大位元組數

max_async_buffer_records

int

10000

非同步緩衝區最大記錄條數

max_async_buffer_time

int

1

非同步緩衝區最大等待時間(秒)

max_record_pack_queue_limit

int

1024

單次打包隊列長度限制

protocol_type

DatahubProtocolType

PB

協議類型:PB(Protocol Buffers)或 JSON

compress_format

CompressFormat

LZ4

壓縮格式:NONELZ4DEFLATEZLIB

logging_level

logging.Level

INFO

記錄層級

logging_filename

str

./DatahubClient.log

記錄檔路徑

注意:Python SDK 當前支援的壓縮格式為 NONELZ4DEFLATEZLIB不支援 ZSTD

同步寫入

import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import TupleRecord, BlobRecord, FieldType, RecordSchema, CompressFormat
from datahub.client import DatahubProducer, ProducerConfig
from datahub.exceptions import DatahubException

# ====== 配置 ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"

# ====== 認證 ======
config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)

producer_config = ProducerConfig.from_credential(credential, endpoint)
producer_config.protocol_type = DatahubProtocolType.PB
producer_config.compress_format = CompressFormat.LZ4
producer_config.retry_times = 3

# ====== 建立 Producer ======
producer = DatahubProducer(project_name, topic_name, producer_config)

try:
    # 擷取 Topic 元資訊(包含 field_list)
    topic_meta = producer.topic_meta

    # 構建 TUPLE 類型記錄
    records = []
    for i in range(100):
        record = TupleRecord(schema=topic_meta.record_schema)
        record.set_value(0, f"user_{i}")          # 按索引設定欄位值
        record.set_value(1, "login")
        record.set_value(2, 1700000000 + i)
        record.put_attribute("source", "python-sdk")
        records.append(record)

    # 同步寫入(阻塞直到寫入完成)
    shard_id = producer.write(records)
    print(f"同步寫入成功,寫入 Shard: {shard_id}")

finally:
    producer.close()

注意TupleRecord 沒有 .schema 屬性。訪問欄位元資訊請用 topic_meta.record_schema.field_list(Producer 端),或 record.field_list(Consumer 端)。

非同步寫入

producer = DatahubProducer(project_name, topic_name, producer_config)

try:
    topic_meta = producer.topic_meta

    records = []
    for i in range(100):
        record = TupleRecord(schema=topic_meta.record_schema)
        record.set_value(0, f"user_{i}")
        record.set_value(1, "click")
        record.set_value(2, 1700000000 + i)
        records.append(record)

    # 非同步寫入(非阻塞,立即返回)
    shard_id = producer.write_async(records)
    print(f"非同步寫入提交成功,寫入 Shard: {shard_id}")

    # 程式退出前調用 flush 確保所有資料發送完畢
    producer.flush()

finally:
    producer.close()

注意:非同步寫入時,資料會先寫入本地緩衝區,由後台線程池非同步發送。程式退出前必須調用 producer.flush() 確保資料發送完畢。


Consumer 消費資料

限制說明

  • 訂閱 ID:消費資料需要先建立訂閱(SubId),一個訂閱對應一組獨立的消費位點。

  • 安全執行緒:Consumer 不是安全執行緒的,每個線程應使用獨立的 Consumer 執行個體。

  • 消費模式:支援 協同消費(自動分配 Shard)和 指定 Shard 消費(手動分配 Shard)兩種模式。

ConsumerConfig 參數說明

參數

類型

預設值

說明

endpoint

str

必填

DataHub 服務地址

retry_times

int

3

消費失敗時的重試次數,-1 表示無限重試

async_thread_limit

int

16

非同步線程池大小(範圍:2~100)

thread_queue_limit

int

1024

非同步隊列長度限制

auto_ack_offset

bool

True

是否自動認可消費位點(ACK)。關閉後需手動 record.record_key.ack()

session_timeout

int

6000

會話逾時時間(毫秒)。逾時未心跳的 Consumer 會被認為下線

max_record_buffer_size

int

100

單次讀取最大記錄數

fetch_limit

int

1000

單次請求 DataHub 拉取的最大記錄數

protocol_type

DatahubProtocolType

PB

協議類型

compress_format

CompressFormat

LZ4

壓縮格式

logging_level

logging.Level

INFO

記錄層級

logging_filename

str

./DatahubClient.log

記錄檔路徑

自動 ACK 消費(推薦)

最簡單的方式,讀取成功後自動認可位點。適用於對資料丟失不敏感的情境。

import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import CompressFormat, TupleRecord, BlobRecord, FieldType
from datahub.client import DatahubConsumer, ConsumerConfig
from datahub.exceptions import DatahubException

# ====== 配置 ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"
sub_id = "YOUR_SUBSCRIPTION_ID"  # 在 DataHub 控制台建立

# ====== 認證 ======
config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)

consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = True    # 預設值,可省略

# ====== 建立 Consumer(協同消費模式) ======
datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)

record_cnt = 0
try:
    while True:
        record = datahub_consumer.read(timeout=60)
        if record is None:
            # timeout 內無資料,返回 None
            break

        # 區分 TupleRecord 和 BlobRecord
        if isinstance(record, TupleRecord):
            fields = record.values            # 欄位值元組
            field_list = record.field_list    # 欄位元資訊列表
            print(f"Tuple 記錄: {fields}")
            # 按欄位名訪問
            # for idx, field in enumerate(field_list):
            #     print(f"  {field.name} = {fields[idx]}")
        elif isinstance(record, BlobRecord):
            print(f"Blob 記錄: {record.blob_data}")

        record_cnt += 1
        # auto_ack_offset=True 時,read() 返回即自動 ACK

except DatahubException as e:
    print(f"消費異常: {e}")
finally:
    datahub_consumer.close()

print(f"共消費 {record_cnt} 條記錄")

手動 ACK 消費

適用於要求 每條資料處理成功後才能提交位點 的情境

consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = False   # 關閉自動 ACK

datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)

try:
    while True:
        record = None
        try:
            record = datahub_consumer.read(timeout=60)
            if record is not None:
                # TODO: 處理資料
                process_record(record)
        except DatahubException as e:
            print(f"讀取異常: {e}")
        finally:
            # 無論成功還是失敗,處理完後必須手動 ACK
            if record is not None:
                record.record_key.ack()
                print("手動 ACK 成功")
finally:
    datahub_consumer.close()

注意:關閉自動 ACK 後,必須對每條讀取到的記錄調用 record.record_key.ack(),否則消費位點無法推進。

協同消費

不指定 shard_ids 時,Consumer 會自動加入消費組,DataHub 服務端會自動分配 Shard 給各個 Consumer 執行個體。

# 不傳 shard_ids,自動協同消費
datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)

# 啟動多個 Consumer 執行個體(不同進程或不同線程),服務端會自動分配 Shard
# 執行個體 1 -> 分配 Shard 0, 1
# 執行個體 2 -> 分配 Shard 2, 3

特性

  • 同一訂閱 ID 的多個 Consumer 執行個體組成一個消費組

  • 服務端自動分配 Shard,確保每個 Shard 只被一個 Consumer 消費

  • Consumer 下線後,其 Shard 會自動重新分配給其他 Consumer


範例程式碼下載

aliyun_python_sdk_demo

常見問題

Q: 同步寫入和非同步寫入怎麼選?

  • 同步寫入 (write):阻塞直到資料寫入成功或重試耗盡。適合對資料可靠性要求高的情境。

  • 非同步寫入 (write_async):非阻塞,資料寫入背景緩衝區。適合高吞吐情境,程式退出前必須調用 flush()

Q: 自動 ACK 和手動 ACK 怎麼選?

  • 自動 ACK:簡單高效,但存在極少量資料丟失風險(處理中途進程崩潰)。

  • 手動 ACK:確保每條資料處理完才提交位點,適合金融、訂單等不能丟資料的情境。

Q: 如何查看 Producer/Consumer 的日誌?

日誌預設輸出到 ./DatahubClient.log,可以通過配置修改:

import logging

producer_config.logging_level = logging.DEBUG
producer_config.logging_filename = "/var/log/datahub/producer.log"

Q: record.record_key.ack() 如何使用?

record.record_key.ack() 僅在手動 ACK 模式下使用,且記錄必須是通過 consumer.read() 返回的(服務端會自動注入 record_key)。

consumer_config.auto_ack_offset = False

datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)

try:
    while True:
        record = datahub_consumer.read(timeout=60)
        if record is not None:
            # TODO: 處理資料
            process_record(record)
            # 處理完成後手動 ACK
            record.record_key.ack()
finally:
    datahub_consumer.close()

注意:手動建立的記錄(如 Producer 寫入時的 TupleRecord)沒有 record_key,調用 record.record_key.ack() 會報 AttributeError: 'NoneType' object has no attribute 'ack'