介紹 Python 的 High-Level SDK中 Producer 和 Consumer 的相關參數和常見用法。
前提條件
Python 3.6+
已建立 DataHub Project 和 Topic,並擷取訂閱 ID(sub_id)
已擷取 RAM 使用者的 AccessKey ID 和 AccessKey Secret
安裝 SDK
pip install pydatahub如需使用零信任憑證(推薦),還需安裝:
pip install alibabacloud-credentials注意:PyPI 包名為 pydatahub,而非 aliyun-datahub-sdk。
身份認證
方式一:環境變數(推薦)
阿里雲 SDK 支援通過環境變數自動擷取憑證,無需在代碼中寫入程式碼 AK。
export ALIBABA_CLOUD_ACCESS_KEY_ID=<your_access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your_access_key_secret>代碼中無需寫入程式碼 AK:
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.client import ProducerConfig, ConsumerConfig
config = CredConfig(
type='access_key',
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
# Producer
producer_config = ProducerConfig.from_credential(credential, endpoint)
# Consumer
consumer_config = ConsumerConfig.from_credential(credential, endpoint)方式二:直接使用 AK 構建
from datahub.client import ProducerConfig
# 方式 A:直接傳參
producer_config = ProducerConfig(access_id, access_key, endpoint)
# 方式 B:使用 from_access Factory 方法(推薦)
producer_config = ProducerConfig.from_access(access_id, access_key, endpoint)重要:使用設定檔方案時,請確保系統中不存在環境變數 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET,否則設定檔將不生效。
Producer 寫入資料
限制說明
安全執行緒:Producer 是安全執行緒的,同一進程內一個 Topic 只需一個 Producer 執行個體。
ProducerConfig 參數說明
參數 | 類型 | 預設值 | 說明 |
| str | 必填 | DataHub 服務地址,如 |
| int |
| 寫入失敗時的重試次數, |
| int |
| 非同步線程池大小(範圍:2~100) |
| int |
| 非同步隊列長度限制 |
| int |
| 非同步緩衝區最大位元組數 |
| int |
| 非同步緩衝區最大記錄條數 |
| int |
| 非同步緩衝區最大等待時間(秒) |
| int |
| 單次打包隊列長度限制 |
| DatahubProtocolType |
| 協議類型: |
| CompressFormat |
| 壓縮格式: |
| logging.Level |
| 記錄層級 |
| str |
| 記錄檔路徑 |
注意:Python SDK 當前支援的壓縮格式為 NONE、LZ4、DEFLATE、ZLIB,不支援 ZSTD。
同步寫入
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import TupleRecord, BlobRecord, FieldType, RecordSchema, CompressFormat
from datahub.client import DatahubProducer, ProducerConfig
from datahub.exceptions import DatahubException
# ====== 配置 ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"
# ====== 認證 ======
config = CredConfig(
type='access_key',
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
producer_config = ProducerConfig.from_credential(credential, endpoint)
producer_config.protocol_type = DatahubProtocolType.PB
producer_config.compress_format = CompressFormat.LZ4
producer_config.retry_times = 3
# ====== 建立 Producer ======
producer = DatahubProducer(project_name, topic_name, producer_config)
try:
# 擷取 Topic 元資訊(包含 field_list)
topic_meta = producer.topic_meta
# 構建 TUPLE 類型記錄
records = []
for i in range(100):
record = TupleRecord(schema=topic_meta.record_schema)
record.set_value(0, f"user_{i}") # 按索引設定欄位值
record.set_value(1, "login")
record.set_value(2, 1700000000 + i)
record.put_attribute("source", "python-sdk")
records.append(record)
# 同步寫入(阻塞直到寫入完成)
shard_id = producer.write(records)
print(f"同步寫入成功,寫入 Shard: {shard_id}")
finally:
producer.close()注意:TupleRecord 沒有 .schema 屬性。訪問欄位元資訊請用 topic_meta.record_schema.field_list(Producer 端),或 record.field_list(Consumer 端)。
非同步寫入
producer = DatahubProducer(project_name, topic_name, producer_config)
try:
topic_meta = producer.topic_meta
records = []
for i in range(100):
record = TupleRecord(schema=topic_meta.record_schema)
record.set_value(0, f"user_{i}")
record.set_value(1, "click")
record.set_value(2, 1700000000 + i)
records.append(record)
# 非同步寫入(非阻塞,立即返回)
shard_id = producer.write_async(records)
print(f"非同步寫入提交成功,寫入 Shard: {shard_id}")
# 程式退出前調用 flush 確保所有資料發送完畢
producer.flush()
finally:
producer.close()注意:非同步寫入時,資料會先寫入本地緩衝區,由後台線程池非同步發送。程式退出前必須調用 producer.flush() 確保資料發送完畢。
Consumer 消費資料
限制說明
訂閱 ID:消費資料需要先建立訂閱(SubId),一個訂閱對應一組獨立的消費位點。
安全執行緒:Consumer 不是安全執行緒的,每個線程應使用獨立的 Consumer 執行個體。
消費模式:支援 協同消費(自動分配 Shard)和 指定 Shard 消費(手動分配 Shard)兩種模式。
ConsumerConfig 參數說明
參數 | 類型 | 預設值 | 說明 |
| str | 必填 | DataHub 服務地址 |
| int |
| 消費失敗時的重試次數, |
| int |
| 非同步線程池大小(範圍:2~100) |
| int |
| 非同步隊列長度限制 |
| bool |
| 是否自動認可消費位點(ACK)。關閉後需手動 |
| int |
| 會話逾時時間(毫秒)。逾時未心跳的 Consumer 會被認為下線 |
| int |
| 單次讀取最大記錄數 |
| int |
| 單次請求 DataHub 拉取的最大記錄數 |
| DatahubProtocolType |
| 協議類型 |
| CompressFormat |
| 壓縮格式 |
| logging.Level |
| 記錄層級 |
| str |
| 記錄檔路徑 |
自動 ACK 消費(推薦)
最簡單的方式,讀取成功後自動認可位點。適用於對資料丟失不敏感的情境。
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import CompressFormat, TupleRecord, BlobRecord, FieldType
from datahub.client import DatahubConsumer, ConsumerConfig
from datahub.exceptions import DatahubException
# ====== 配置 ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"
sub_id = "YOUR_SUBSCRIPTION_ID" # 在 DataHub 控制台建立
# ====== 認證 ======
config = CredConfig(
type='access_key',
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = True # 預設值,可省略
# ====== 建立 Consumer(協同消費模式) ======
datahub_consumer = DatahubConsumer(
project_name, topic_name, sub_id, consumer_config
)
record_cnt = 0
try:
while True:
record = datahub_consumer.read(timeout=60)
if record is None:
# timeout 內無資料,返回 None
break
# 區分 TupleRecord 和 BlobRecord
if isinstance(record, TupleRecord):
fields = record.values # 欄位值元組
field_list = record.field_list # 欄位元資訊列表
print(f"Tuple 記錄: {fields}")
# 按欄位名訪問
# for idx, field in enumerate(field_list):
# print(f" {field.name} = {fields[idx]}")
elif isinstance(record, BlobRecord):
print(f"Blob 記錄: {record.blob_data}")
record_cnt += 1
# auto_ack_offset=True 時,read() 返回即自動 ACK
except DatahubException as e:
print(f"消費異常: {e}")
finally:
datahub_consumer.close()
print(f"共消費 {record_cnt} 條記錄")手動 ACK 消費
適用於要求 每條資料處理成功後才能提交位點 的情境
consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = False # 關閉自動 ACK
datahub_consumer = DatahubConsumer(
project_name, topic_name, sub_id, consumer_config
)
try:
while True:
record = None
try:
record = datahub_consumer.read(timeout=60)
if record is not None:
# TODO: 處理資料
process_record(record)
except DatahubException as e:
print(f"讀取異常: {e}")
finally:
# 無論成功還是失敗,處理完後必須手動 ACK
if record is not None:
record.record_key.ack()
print("手動 ACK 成功")
finally:
datahub_consumer.close()注意:關閉自動 ACK 後,必須對每條讀取到的記錄調用 record.record_key.ack(),否則消費位點無法推進。
協同消費
不指定 shard_ids 時,Consumer 會自動加入消費組,DataHub 服務端會自動分配 Shard 給各個 Consumer 執行個體。
# 不傳 shard_ids,自動協同消費
datahub_consumer = DatahubConsumer(
project_name, topic_name, sub_id, consumer_config
)
# 啟動多個 Consumer 執行個體(不同進程或不同線程),服務端會自動分配 Shard
# 執行個體 1 -> 分配 Shard 0, 1
# 執行個體 2 -> 分配 Shard 2, 3特性:
同一訂閱 ID 的多個 Consumer 執行個體組成一個消費組
服務端自動分配 Shard,確保每個 Shard 只被一個 Consumer 消費
Consumer 下線後,其 Shard 會自動重新分配給其他 Consumer
範例程式碼下載
常見問題
Q: 同步寫入和非同步寫入怎麼選?
同步寫入 (
write):阻塞直到資料寫入成功或重試耗盡。適合對資料可靠性要求高的情境。非同步寫入 (
write_async):非阻塞,資料寫入背景緩衝區。適合高吞吐情境,程式退出前必須調用flush()。
Q: 自動 ACK 和手動 ACK 怎麼選?
自動 ACK:簡單高效,但存在極少量資料丟失風險(處理中途進程崩潰)。
手動 ACK:確保每條資料處理完才提交位點,適合金融、訂單等不能丟資料的情境。
Q: 如何查看 Producer/Consumer 的日誌?
日誌預設輸出到 ./DatahubClient.log,可以通過配置修改:
import logging
producer_config.logging_level = logging.DEBUG
producer_config.logging_filename = "/var/log/datahub/producer.log"Q: record.record_key.ack() 如何使用?
record.record_key.ack() 僅在手動 ACK 模式下使用,且記錄必須是通過 consumer.read() 返回的(服務端會自動注入 record_key)。
consumer_config.auto_ack_offset = False
datahub_consumer = DatahubConsumer(
project_name, topic_name, sub_id, consumer_config
)
try:
while True:
record = datahub_consumer.read(timeout=60)
if record is not None:
# TODO: 處理資料
process_record(record)
# 處理完成後手動 ACK
record.record_key.ack()
finally:
datahub_consumer.close()注意:手動建立的記錄(如 Producer 寫入時的 TupleRecord)沒有 record_key,調用 record.record_key.ack() 會報 AttributeError: 'NoneType' object has no attribute 'ack'。