This document covers the parameters and common usage of the producer and consumer in the Python High-Level SDK.
Prerequisites
Python 3.6+
You must have a DataHub project, a topic, and a subscription ID (sub_id).
You must have the AccessKey ID and AccessKey Secret for a RAM user.
Install the SDK
pip install pydatahub
To use a zero-trust credential (recommended), you also need to install:
pip install alibabacloud-credentials
Note: The PyPI package name is pydatahub, not aliyun-datahub-sdk.
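After installation, a quick check can confirm that both packages are importable before you run the examples. The following is a minimal sketch that uses only the standard library. The helper name missing_modules is hypothetical, and the module names datahub and alibabacloud_credentials are taken from the import statements used in this document.

```python
import importlib.util


def missing_modules(module_names):
    """Return the top-level module names that cannot be found on the import path."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


# Import names as used by the examples in this document.
REQUIRED = ["datahub", "alibabacloud_credentials"]

if __name__ == "__main__":
    missing = missing_modules(REQUIRED)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules are importable.")
```

The check only looks the modules up; it does not import them or validate their versions.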
Authentication
Method 1: Environment variables (recommended)
The Alibaba Cloud SDK automatically retrieves credentials from environment variables, eliminating the need to hard-code them in your code.
export ALIBABA_CLOUD_ACCESS_KEY_ID=<your_access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your_access_key_secret>
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.client import ProducerConfig, ConsumerConfig
# Replace with the DataHub endpoint of your region.
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
# Producer
producer_config = ProducerConfig.from_credential(credential, endpoint)
# Consumer
consumer_config = ConsumerConfig.from_credential(credential, endpoint)
Method 2: Construct with an AccessKey pair
from datahub.client import ProducerConfig
# Method A: Pass as arguments directly
producer_config = ProducerConfig(access_id, access_key, endpoint)
# Method B: Use the from_access factory method (recommended)
producer_config = ProducerConfig.from_access(access_id, access_key, endpoint)
Important: When you use a configuration file, make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are not set. Otherwise, the configuration file will not take effect.
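In the Method 1 example, os.environ.get() returns None when a variable is not set, so a missing credential may only surface later as an authentication failure. The following is a minimal sketch of a fail-fast check that you could run before building the credential. The helper name require_env is hypothetical and is not part of either SDK.

```python
import os

REQUIRED_VARS = (
    "ALIBABA_CLOUD_ACCESS_KEY_ID",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET",
)


def require_env(names=REQUIRED_VARS, environ=None):
    """Return {name: value} for every name, or raise if any is unset or empty."""
    env = os.environ if environ is None else environ
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in names}
```

The environ argument exists only so that the check can be exercised with a plain dictionary. In normal use, call require_env() with no arguments.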
Write data with a producer
Limitations
Thread safety: A producer is thread-safe, meaning you only need one instance per topic within a process.
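Because a producer is thread-safe, one possible pattern is to create a single instance per topic lazily and share it across threads. The following is a minimal sketch of such a cache. ProducerRegistry and its factory argument are hypothetical names and are not part of the SDK. The factory is any callable that returns an object with a close() method.

```python
import threading


class ProducerRegistry:
    """Caches one producer per (project, topic), created lazily under a lock."""

    def __init__(self, factory):
        self._factory = factory  # hypothetical: callable(project, topic) -> producer
        self._lock = threading.Lock()
        self._producers = {}

    def get(self, project_name, topic_name):
        """Return the cached producer for the topic, creating it on first use."""
        key = (project_name, topic_name)
        with self._lock:
            if key not in self._producers:
                self._producers[key] = self._factory(project_name, topic_name)
            return self._producers[key]

    def close_all(self):
        """Close every cached producer and empty the cache."""
        with self._lock:
            producers = list(self._producers.values())
            self._producers.clear()
        for producer in producers:
            producer.close()
```

With the SDK, the factory could be, for example, lambda project, topic: DatahubProducer(project, topic, producer_config).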
ProducerConfig parameters
Parameter | Type | Default | Description |
| str | Required | DataHub endpoint, such as |
| int | | The number of retries after a write failure. A value of |
| int | | The size of the asynchronous thread pool (range: 2–100). |
| int | | The length limit of the asynchronous queue. |
| int | | The maximum size of the asynchronous buffer in bytes. |
| int | | The maximum number of records in the asynchronous buffer. |
| int | | The maximum wait time for the asynchronous buffer in seconds. |
| int | | The queue length limit for a single packing operation. |
| DatahubProtocolType | | Protocol type: |
| CompressFormat | | Compression format: |
| logging.Level | | The logging level. |
| str | | The path to the log file. |
Note: The Python SDK currently supports the NONE, LZ4, DEFLATE, and ZLIB compression formats, but does not support ZSTD.
Synchronous write
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import TupleRecord, BlobRecord, FieldType, RecordSchema, CompressFormat
from datahub.client import DatahubProducer, ProducerConfig
from datahub.exceptions import DatahubException
# ====== Configuration ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"
# ====== Authentication ======
config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
producer_config = ProducerConfig.from_credential(credential, endpoint)
producer_config.protocol_type = DatahubProtocolType.PB
producer_config.compress_format = CompressFormat.LZ4
producer_config.retry_times = 3
# ====== Create a producer ======
producer = DatahubProducer(project_name, topic_name, producer_config)
try:
    # Get topic metadata (contains the field list)
    topic_meta = producer.topic_meta
    # Construct TUPLE records
    records = []
    for i in range(100):
        record = TupleRecord(schema=topic_meta.record_schema)
        record.set_value(0, f"user_{i}")  # Set field value by index
        record.set_value(1, "login")
        record.set_value(2, 1700000000 + i)
        record.put_attribute("source", "python-sdk")
        records.append(record)
    # Synchronous write (blocks until the write operation completes)
    shard_id = producer.write(records)
    print(f"Synchronous write succeeded. Written to Shard: {shard_id}")
finally:
    producer.close()
Note: The TupleRecord object does not have a .schema attribute. To access field metadata, use topic_meta.record_schema.field_list on the Producer side, or record.field_list on the Consumer side.
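The examples set field values by index. If you prefer to set them by name, the field list described in the preceding note can be used to map names to indexes. The following is a minimal sketch. The helper name set_values_by_name is hypothetical, and it relies only on set_value(index, value) and field.name, both of which appear in this document's examples.

```python
def set_values_by_name(record, field_list, values):
    """Set record fields by name, using field_list to map names to indexes."""
    index_by_name = {field.name: idx for idx, field in enumerate(field_list)}
    unknown = sorted(set(values) - set(index_by_name))
    if unknown:
        raise KeyError("Unknown field names: " + ", ".join(unknown))
    for name, value in values.items():
        record.set_value(index_by_name[name], value)
    return record
```

On the producer side, field_list would be topic_meta.record_schema.field_list. The field names that you pass in values must match the schema of your own topic.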
Asynchronous write
producer = DatahubProducer(project_name, topic_name, producer_config)
try:
    topic_meta = producer.topic_meta
    records = []
    for i in range(100):
        record = TupleRecord(schema=topic_meta.record_schema)
        record.set_value(0, f"user_{i}")
        record.set_value(1, "click")
        record.set_value(2, 1700000000 + i)
        records.append(record)
    # Asynchronous write (non-blocking, returns immediately)
    shard_id = producer.write_async(records)
    print(f"Asynchronous write submitted. Written to Shard: {shard_id}")
    # Call flush() before the program exits to ensure all data is sent.
    producer.flush()
finally:
    producer.close()
Note: For asynchronous writes, data is first written to a local buffer and then sent asynchronously by a background thread pool. Before the program exits, you must call producer.flush() to ensure that all data is sent.
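To illustrate why flush() matters, the following toy model shows a buffer that sends records only when a count or wait-time threshold is reached, so a partial batch stays in memory until it is flushed. This is a simplified, single-threaded sketch and is not the SDK's implementation. ToyBuffer, send, max_records, and max_wait_seconds are hypothetical names chosen for illustration.

```python
import time


class ToyBuffer:
    """Toy write buffer: records stay in memory until a threshold or flush()."""

    def __init__(self, send, max_records=3, max_wait_seconds=5.0, clock=time.monotonic):
        self._send = send  # callable that receives a list of records
        self._max_records = max_records
        self._max_wait = max_wait_seconds
        self._clock = clock
        self._pending = []
        self._first_added_at = None

    def add(self, record):
        """Buffer a record and send the batch if a threshold is reached."""
        if not self._pending:
            self._first_added_at = self._clock()
        self._pending.append(record)
        waited = self._clock() - self._first_added_at
        if len(self._pending) >= self._max_records or waited >= self._max_wait:
            self.flush()

    def flush(self):
        """Send whatever is still buffered, if anything."""
        if self._pending:
            batch, self._pending = self._pending, []
            self._send(batch)
```

In this model, adding four records with max_records=3 sends the first three immediately, and the fourth is sent only when flush() is called.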
Consume data with a consumer
Limitations
Subscription ID: Data consumption requires a subscription, which maintains an independent set of offsets.
Thread safety: A consumer is not thread-safe; therefore, each thread must use its own instance.
Consumption modes: Supports group consumption (shards are assigned automatically) and specified shard consumption (shards are assigned manually).
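Because a consumer is not thread-safe, each worker thread should create and own its consumer. The following is a minimal sketch of that pattern. The helper consume_in_threads and its make_consumer and handle arguments are hypothetical names. The sketch assumes only that the consumer exposes read(timeout=...) and close(), as in this document's examples.

```python
import threading


def consume_in_threads(make_consumer, handle, worker_count, timeout=60):
    """Run worker_count threads, each with its own consumer instance."""

    def worker():
        consumer = make_consumer()  # created inside the thread, never shared
        try:
            while True:
                record = consumer.read(timeout=timeout)
                if record is None:  # no data within the timeout period
                    break
                handle(record)
        finally:
            consumer.close()

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
```

With the SDK, make_consumer could be, for example, lambda: DatahubConsumer(project_name, topic_name, sub_id, consumer_config). The handle callable runs in several threads at once, so it must itself be thread-safe.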
ConsumerConfig parameters
Parameter | Type | Default | Description |
| str | Required | The DataHub service endpoint. |
| int | | The number of retries if consumption fails, where |
| int | | The size of the asynchronous thread pool (range: 2–100). |
| int | | The length limit of the asynchronous queue. |
| bool | | Specifies whether to automatically commit the offset (ACK). If you disable this feature, you must manually call |
| int | | The session timeout in milliseconds. A consumer is considered offline if it fails to send a heartbeat within this period. |
| int | | The maximum number of records returned by a single read operation. |
| int | | The maximum number of records to fetch from DataHub in a single request. |
| DatahubProtocolType | | The protocol type. |
| CompressFormat | | The compression format. |
| logging.Level | | The logging level. |
| str | | The path to the log file. |
Consume with auto ACK (recommended)
In this mode, the offset is automatically committed after each successful read. This is the simplest method and is suitable for scenarios that can tolerate minor data loss.
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import CompressFormat, TupleRecord, BlobRecord, FieldType
from datahub.client import DatahubConsumer, ConsumerConfig
from datahub.exceptions import DatahubException
# ====== Configuration ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"
sub_id = "YOUR_SUBSCRIPTION_ID" # Create this in the DataHub console
# ====== Authentication ======
config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = True # This is the default value and can be omitted.
# ====== Create a consumer (group consumption mode) ======
datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)
record_cnt = 0
try:
    while True:
        record = datahub_consumer.read(timeout=60)
        if record is None:
            # Returns None if no data is available within the timeout period.
            break
        # Differentiate between TupleRecord and BlobRecord
        if isinstance(record, TupleRecord):
            fields = record.values  # Tuple of field values
            field_list = record.field_list  # List of field metadata
            print(f"Tuple record: {fields}")
            # Access by field name
            # for idx, field in enumerate(field_list):
            #     print(f"  {field.name} = {fields[idx]}")
        elif isinstance(record, BlobRecord):
            print(f"Blob record: {record.blob_data}")
        record_cnt += 1
        # When auto_ack_offset=True, the offset is automatically committed after read() returns.
except DatahubException as e:
    print(f"Consumption error: {e}")
finally:
    datahub_consumer.close()
print(f"Consumed {record_cnt} records in total.")
Consume with manual ACK
Use this method for scenarios that require committing the offset only after each record is successfully processed.
consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = False # Disable auto ACK
datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)
try:
    while True:
        try:
            record = datahub_consumer.read(timeout=60)
            if record is not None:
                # process_record is a placeholder for your own processing function.
                process_record(record)
                # ACK only after the record has been processed successfully.
                record.record_key.ack()
                print("Manual ACK succeeded.")
        except DatahubException as e:
            print(f"Read error: {e}")
finally:
    datahub_consumer.close()
Note: After disabling automatic ACK, you must call record.record_key.ack() for each read record. Otherwise, the offset cannot be advanced.
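In manual ACK mode, you may want to retry a failed processing step a few times before deciding what to do with the record. The following is a minimal sketch of that idea. The helper process_then_ack, the handler argument, and max_attempts are hypothetical names. The only SDK call assumed is record.record_key.ack(), as described in this document.

```python
def process_then_ack(record, handler, max_attempts=3):
    """Call handler(record) up to max_attempts times; ACK only on success."""
    last_error = None
    for _ in range(max_attempts):
        try:
            handler(record)
        except Exception as error:  # narrow this to your handler's error types
            last_error = error
            continue
        record.record_key.ack()
        return True
    # Not acknowledged: the caller decides whether to stop, skip, or alert.
    print(f"Giving up after {max_attempts} attempts: {last_error}")
    return False
```

When the function returns False, the record has not been acknowledged, so the caller still has to decide how to proceed.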
Group consumption
If you do not specify shard_ids, the consumer automatically joins the consumer group, and the DataHub server assigns shards to each consumer instance.
# Do not pass shard_ids to enable automatic group consumption.
datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)
# Start multiple consumer instances (in different processes or threads),
# and the service will automatically assign shards.
# Instance 1 -> Assigned Shards 0, 1
# Instance 2 -> Assigned Shards 2, 3
Features:
Multiple consumer instances with the same subscription ID form a consumer group.
The service automatically assigns shards, ensuring that each shard is consumed by only one consumer at a time.
When a consumer goes offline, its shards are automatically reassigned to other active consumers in the group.
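The following toy model illustrates the two properties above: each shard has exactly one owner, and shards are redistributed when a consumer leaves. It is only an illustration. The actual assignment is performed by the DataHub service, and its algorithm is not described in this document. The function assign_shards and the instance names are hypothetical.

```python
def assign_shards(shard_ids, consumer_ids):
    """Split shard_ids into contiguous, near-equal runs, one run per consumer."""
    if not consumer_ids:
        return {}
    base, extra = divmod(len(shard_ids), len(consumer_ids))
    assignment, start = {}, 0
    for position, consumer_id in enumerate(consumer_ids):
        count = base + (1 if position < extra else 0)
        assignment[consumer_id] = shard_ids[start:start + count]
        start += count
    return assignment


shards = ["0", "1", "2", "3"]
# Two active instances share the four shards.
print(assign_shards(shards, ["instance-1", "instance-2"]))
# After instance-2 goes offline, instance-1 takes over every shard.
print(assign_shards(shards, ["instance-1"]))
```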
FAQ
Q: When should I use synchronous vs. asynchronous writes?
Synchronous write (write): Blocks until the data is successfully written or retries are exhausted. Suitable for scenarios that require high data reliability.
Asynchronous write (write_async): This operation is non-blocking and writes data to a background buffer. It is suitable for high-throughput scenarios. You must call flush() before the program exits.
Q: When should I use auto ACK vs. manual ACK?
Auto ACK: Simple and efficient, but carries a small risk of data loss if the process crashes during processing.
Manual ACK: This mode ensures that an offset is committed only after its corresponding record is processed. Use it for mission-critical scenarios, such as financial transactions, where data loss is unacceptable.
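The difference between the two modes can be sketched with a toy offset model. In the sketch, auto ACK commits the offset as soon as a record is read, and manual ACK commits it only after processing finishes. This is a simulation under those assumptions, not SDK code, and replay_after_crash and its parameters are hypothetical names.

```python
def replay_after_crash(records, crash_on, ack_before_processing):
    """Return (processed_before_crash, redelivered_after_restart)."""
    committed = 0  # number of records whose offset has been committed
    processed = []
    for position, record in enumerate(records):
        if ack_before_processing:
            committed = position + 1  # offset committed when the read returns
        if record == crash_on:
            break  # simulated crash while processing this record
        processed.append(record)
        if not ack_before_processing:
            committed = position + 1  # offset committed after processing
    return processed, records[committed:]


# Auto ACK: record "b" was committed before the crash and is not redelivered.
print(replay_after_crash(["a", "b", "c"], "b", ack_before_processing=True))
# Manual ACK: record "b" was never committed and is redelivered after restart.
print(replay_after_crash(["a", "b", "c"], "b", ack_before_processing=False))
```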
Q: How do I view producer/consumer logs?
Logs are output to ./DatahubClient.log by default, which you can change in the configuration:
import logging
producer_config.logging_level = logging.DEBUG
producer_config.logging_filename = "/var/log/datahub/producer.log"
Q: How do I use record.record_key.ack()?
record.record_key.ack() is used only in manual ACK mode, and the record must be returned by consumer.read() (the server automatically injects record_key).
consumer_config.auto_ack_offset = False
datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)
try:
    while True:
        record = datahub_consumer.read(timeout=60)
        if record is not None:
            # TODO: Process the data.
            process_record(record)
            # Manually ACK after processing is complete.
            record.record_key.ack()
finally:
    datahub_consumer.close()
Note: Manually created records, such as a TupleRecord written by a producer, do not have a record_key. Calling record.record_key.ack() results in an AttributeError: 'NoneType' object has no attribute 'ack'.
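If the same code path may receive both consumed records and manually created ones, a small guard can avoid the AttributeError described in the preceding note. The following is a minimal sketch, and the helper name ack_if_possible is hypothetical.

```python
def ack_if_possible(record):
    """ACK a record read by a consumer; return False if it has no record_key."""
    record_key = getattr(record, "record_key", None)
    if record_key is None:
        return False
    record_key.ack()
    return True
```

A return value of False means that nothing was acknowledged, which is expected for records that were not returned by read().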