DataHub: DataHub SDK for Python

Last Updated: Jun 02, 2026

Install and use DataHub SDK for Python to manage projects, topics, and records in DataHub.

Installation

Quick installation

$ sudo pip install pydatahub

Install from source code

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install

Troubleshooting

1. If you get the 'Python.h: No such file or directory' error, install the Python development headers:

$ sudo apt-get install python-dev    # Debian/Ubuntu, Python 2.x
$ sudo apt-get install python3-dev   # Debian/Ubuntu, Python 3.x
$ sudo yum install python-devel      # CentOS/RHEL, Python 2.x
$ sudo yum install python3-devel     # CentOS/RHEL, Python 3.x

2. On Windows, download and install the Visual C++ compiler before installing the SDK. The cprotobuf dependency is compiled during installation, and without the compiler the build on Windows 10 fails with an error like the following:

building 'cprotobuf.internal' extension
error: [WinError 2] The system cannot find the file specified

Python 3.6 or later is recommended.

3. If you get the following linker error, run the installation from the Developer Command Prompt or locate the missing files manually:

LINK : fatal error LNK1158: cannot run 'rc.exe'

4. On Windows 7, if you get the following error, install the build tools:

error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/

Verify the installation

$ python -c "from datahub import DataHub"

If no error is returned, the SDK is installed.

Terms

For more information, see Terms.

Prerequisites

  • An Alibaba Cloud account with an AccessKey pair (AccessKey ID and AccessKey secret) and the DataHub endpoint.

  • Create a project in one of the following ways:

    • Log on to the DataHub console and create a project.

    • Call the SDK to create a project.

  • Initialize DataHub:

import sys
import time
import traceback
from datahub import DataHub
from datahub.exceptions import DatahubException, ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType

# Replace the placeholders with your own credentials and endpoint.
access_id = '***your access id***'
access_key = '***your access key***'
endpoint = '***your datahub server endpoint***'
dh = DataHub(access_id, access_key, endpoint)
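
Hardcoding the AccessKey pair in a source file makes it easy to leak. As a minimal sketch, the credentials could be read from environment variables instead. The variable names DATAHUB_ACCESS_ID, DATAHUB_ACCESS_KEY, and DATAHUB_ENDPOINT and both helper functions are assumptions made for this example; they are not part of the SDK.

```python
import os


def load_datahub_config(env=None):
    """Return (access_id, access_key, endpoint) read from environment variables.

    The variable names are hypothetical; use whatever fits your deployment.
    """
    env = os.environ if env is None else env
    names = ("DATAHUB_ACCESS_ID", "DATAHUB_ACCESS_KEY", "DATAHUB_ENDPOINT")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in names)


def make_client(env=None):
    """Build a DataHub client from the environment (hypothetical helper)."""
    # Imported here so that load_datahub_config works even without the SDK installed.
    from datahub import DataHub
    access_id, access_key, endpoint = load_datahub_config(env)
    return DataHub(access_id, access_key, endpoint)
```

With the three variables exported in the shell, dh = make_client() would replace the hardcoded assignment.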

Manage projects

  • Create a project:

project_name = 'project'
comment = 'comment'
try:
    dh.create_project(project_name, comment)
    print("create project success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("project already exists!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

Manage topics

Tuple topic

  • Tuple topics require a record schema. The following table lists the supported data types.

Type | Description | Value range
Bigint | Eight-byte signed integer. Do not use -9223372036854775808 (reserved by the system). | -9223372036854775807 to 9223372036854775807
String | A string. Only UTF-8 encoding is supported. | Total size of all values in a STRING column cannot exceed 1 MB.
Boolean | Boolean type. | True and False, true and false, or 0 and 1
Double | Eight-byte double-precision floating-point number. | -1.0 × 10^308 to 1.0 × 10^308
TimeStamp | Timestamp type. | Accurate to microseconds
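
The value ranges in the table can be checked on the client before a record is built. The following is a minimal sketch of such a pre-check for the Bigint, Double, and Boolean types. The function and constant names are made up for this example, and whether the SDK performs equivalent checks is not stated here.

```python
BIGINT_MIN = -9223372036854775807  # -9223372036854775808 is reserved by the system
BIGINT_MAX = 9223372036854775807
DOUBLE_LIMIT = 1.0e308


def check_bigint(value):
    """Return value if it is an int inside the usable BIGINT range."""
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("BIGINT expects an int, got %r" % (value,))
    if not BIGINT_MIN <= value <= BIGINT_MAX:
        raise ValueError("BIGINT out of range: %d" % value)
    return value


def check_double(value):
    """Return value as a float inside the documented DOUBLE range."""
    value = float(value)
    # NaN and infinities fail this comparison and are rejected as well.
    if not -DOUBLE_LIMIT <= value <= DOUBLE_LIMIT:
        raise ValueError("DOUBLE out of range: %r" % (value,))
    return value


def normalize_bool(value):
    """Map the spellings listed in the table to a Python bool."""
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        if value in ("True", "true"):
            return True
        if value in ("False", "false"):
            return False
    elif isinstance(value, int) and value in (0, 1):
        return bool(value)
    raise ValueError("not a BOOLEAN value: %r" % (value,))
```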

  • Create a tuple topic:

topic_name = "tuple_topic"
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
    print("create tuple topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exists!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

Blob topic

  • Blob topics accept binary data as records. Data is Base64-encoded for transmission. Create a blob topic:

topic_name = "blob_topic"
shard_count = 3
life_cycle = 7
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
    print("create blob topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exists!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)
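
The project and topic snippets above share the same try/except shape: attempt the creation and treat "already exists" as success. One way to factor that out is sketched below. The helper name create_if_absent is hypothetical, and the exception class is passed in so that the helper itself does not depend on the SDK.

```python
def create_if_absent(create, already_exists_error, *args):
    """Call create(*args).

    Returns True if the resource was created and False if create raised
    already_exists_error. Any other exception propagates to the caller.
    """
    try:
        create(*args)
    except already_exists_error:
        return False
    return True
```

Under these assumptions, the blob topic example could be written as create_if_absent(dh.create_blob_topic, ResourceExistException, project_name, topic_name, shard_count, life_cycle, comment).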

Record publishing and subscription

List shards

  • List all shards in a topic:

shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))

list_shard returns a ListShardResult whose shards attribute is a list of shards, each with shard_id, state, begin_hash_key, and end_hash_key.
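
Because each shard carries a state, it can be handy to see at a glance which shards are in which state before writing. A small sketch, assuming only the shard_id and state attributes named above (the function name is made up for this example):

```python
from collections import defaultdict


def shard_ids_by_state(shards):
    """Group shard IDs by their state, keeping the order returned by the service."""
    grouped = defaultdict(list)
    for shard in shards:
        grouped[shard.state].append(shard.shard_id)
    return dict(grouped)
```

For example, print(shard_ids_by_state(dh.list_shard(project_name, topic_name).shards)).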

Publish records

  • Write records to a topic:

put_result = dh.put_records(project_name, topic_name, records)
print(put_result.failed_record_count)
print(put_result.failed_records)

The records parameter takes a list of records of the same type (tuple or blob). put_records returns a PutRecordsResult with failed_record_count and failed_records. Each FailedRecord contains an index, an error code, and an error message.
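
Since a put request can partially fail, a caller usually resends only the failed records. The sketch below shows one possible retry loop. It assumes that each FailedRecord exposes its position in the request as an index attribute, and the helper name, retry count, and delay are illustrative choices rather than SDK behavior.

```python
import time


def put_with_retry(put, records, max_attempts=3, delay_seconds=1.0):
    """Send records with put() and resend only the ones that failed.

    put is any callable that takes a list of records and returns an object
    with failed_record_count and failed_records, for example:
        lambda recs: dh.put_records(project_name, topic_name, recs)
    Returns the records that still failed after max_attempts tries.
    """
    pending = list(records)
    for attempt in range(1, max_attempts + 1):
        if not pending:
            return []
        result = put(pending)
        if result.failed_record_count == 0:
            return []
        # Assumption: failed.index is the record's position in this request.
        pending = [pending[failed.index] for failed in result.failed_records]
        if attempt < max_attempts:
            time.sleep(delay_seconds)
    return pending
```

An empty return value means every record was accepted; anything left over can be logged or handed to a fallback path.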

  • Write tuple records to a topic:

try:
    # Wait until all shards are in the Active state.
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2,  1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)
    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count))
    # If at least one record fails to be written, write the record again.
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)

  • Write blob records to a topic:

try:
    records1 = []
    record3 = BlobRecord(blob_data='data')
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)

Get a cursor

  • Three cursor types are available:

    • OLDEST: points to the earliest valid record in the shard.

    • LATEST: points to the latest record in the shard.

    • SYSTEM_TIME: points to the first record with a timestamp greater than or equal to the specified value (in milliseconds).

shard_id = '0'
time_stamp = 0
cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
cursor = cursor_result0.cursor

Call get_cursor to get a cursor for reading data from a specific position.
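
A SYSTEM_TIME cursor takes its timestamp in milliseconds, whereas the TIMESTAMP field values in the write example appear to be in microseconds, so the two are easy to mix up. A minimal sketch of a conversion helper for the cursor case (the helper name is an assumption of this example):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)
```

The result could then be passed as the last argument of dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, ...).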

Subscribe to records

  • Specify a cursor and a maximum record count to read records from a shard. If fewer records exist, only the available records are returned.

project_name = 'project'
shard_id = "0"
limit = 10
# Read the records of a blob topic.
topic_name = 'blob_topic'
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
# Read the records of a tuple topic.
topic_name = 'tuple_topic'
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)

  • Consume tuple records:

try:
    # Wait until all shards are in the Active state.
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    shard_id = '0'
    limit = 10
    cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
    cursor = cursor_result.cursor
    while True:
        get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
        for record in get_result.records:
            print(record)
        # No new records yet: wait one second before polling again.
        if 0 == get_result.record_count:
            time.sleep(1)
        cursor = get_result.next_cursor
except DatahubException as e:
    print(e)
    sys.exit(-1)
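
The polling loop above never terminates, which is fine for a long-running consumer but awkward in scripts and tests. As a sketch, the same cursor-following logic can be wrapped in a generator with an optional idle limit. The names iter_records, fetch, and max_idle_polls are assumptions of this example; only records, record_count, and next_cursor come from the result object used above.

```python
import time


def iter_records(fetch, cursor, limit=10, idle_sleep=1.0, max_idle_polls=None):
    """Yield records one by one, following next_cursor between requests.

    fetch(cursor, limit) must return an object with records, record_count,
    and next_cursor. Iteration stops after max_idle_polls consecutive empty
    responses; None means keep polling forever.
    """
    idle_polls = 0
    while max_idle_polls is None or idle_polls < max_idle_polls:
        result = fetch(cursor, limit)
        if result.record_count == 0:
            idle_polls += 1
            time.sleep(idle_sleep)
        else:
            idle_polls = 0
            for record in result.records:
                yield record
        cursor = result.next_cursor
```

For a tuple topic, fetch could be lambda cur, n: dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cur, n).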
