
DataHub: DataHub SDK for Python

Last Updated: Aug 06, 2021

DataHub SDK for Python

Installation

Quick installation

$ sudo pip install pydatahub

Use the source code for installation

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install

Common issues related to installation

1. When you install DataHub SDK for Python, the error message 'Python.h: No such file or directory' is returned. This means that the Python development headers are missing. Install them by running the command for your operating system and Python version, and then install DataHub SDK for Python again:

$ sudo apt-get install python-dev     # Debian/Ubuntu, Python 2.x
$ sudo apt-get install python3-dev    # Debian/Ubuntu, Python 3.x
$ sudo yum install python-devel       # CentOS/RHEL, Python 2.x
$ sudo yum install python3-devel      # CentOS/RHEL, Python 3.x

2. If you use the Windows operating system, download and install the version of the Visual C++ compiler that the error message asks for before you install DataHub SDK for Python. For example, if the compiler is missing when you install the cprotobuf dependency in Windows 10, an error similar to the following is reported:

building 'cprotobuf.internal' extension
error: [WinError 2] The system cannot find the file specified

We recommend that you use Python 3.6 or later, because in these versions the error message states the required compiler version and provides a download link.

3. When you install dependencies, an error similar to the following is reported. It indicates that the build environment is incomplete: the linker cannot run rc.exe, the resource compiler. Search for this error and copy the required files to the location that your build tools expect. Alternatively, run the installation from the Developer Command Prompt for Visual Studio.

LINK : fatal error LNK1158: cannot run 'rc.exe'

4. When you install DataHub SDK for Python in Windows 7, the following error is reported. In this case, install Microsoft Visual C++ Build Tools.

error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/
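
Because Python 3.6 or later is recommended, you may want to confirm the interpreter version before you install the SDK. The following is a minimal sketch; the function name meets_recommended_version is hypothetical and is not part of DataHub SDK for Python.

```python
import sys

# Minimum version recommended above for installing DataHub SDK for Python.
RECOMMENDED = (3, 6)


def meets_recommended_version(version_info=sys.version_info):
    """Return True if the given (major, minor, ...) version is 3.6 or later."""
    return tuple(version_info[:2]) >= RECOMMENDED


if __name__ == "__main__":
    if meets_recommended_version():
        print("Python version OK:", sys.version.split()[0])
    else:
        print("Python 3.6 or later is recommended.")
```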

Verify the installation

$ python -c "from datahub import DataHub"

If the preceding command runs without errors, DataHub SDK for Python is installed.
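
If you want to perform the same check from inside a script, for example in a setup or health-check step, the standard library can locate the package without importing it. This is a sketch with a hypothetical function name. Note that it only confirms that some package provides a top-level datahub module, so it is a weaker test than the python -c command, which actually imports the DataHub class.

```python
import importlib.util


def is_pydatahub_installed():
    """Return True if a top-level 'datahub' package can be found.

    This only locates the package; unlike importing it, it does not
    surface errors that would be raised while the package loads.
    """
    return importlib.util.find_spec("datahub") is not None


if __name__ == "__main__":
    print("installed" if is_pydatahub_installed() else "not installed")
```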

Terms

For more information, see Terms.

Preparations

  • You must use an Alibaba Cloud account to access DataHub. You must provide your AccessKey ID, your AccessKey secret, and the endpoint that is used to access DataHub.

  • Create a project in either of the following ways:

    • Log on to the DataHub console and create a project.

    • Call a method provided by DataHub SDK for Python to create a project.

  • Initialize DataHub.

import sys
import time
import traceback
from datahub import DataHub
from datahub.exceptions import DatahubException, ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType

# Replace the placeholders with your AccessKey ID, AccessKey secret, and endpoint.
access_id = '<your access id>'
access_key = '<your access key>'
endpoint = '<your datahub server endpoint>'
dh = DataHub(access_id, access_key, endpoint)
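
Hard-coding the AccessKey pair in source files makes it easy to leak. A common alternative is to read the credentials from environment variables. The sketch below assumes the hypothetical variable names DATAHUB_ACCESS_ID, DATAHUB_ACCESS_KEY, and DATAHUB_ENDPOINT; DataHub SDK for Python does not read these variables by itself, so the values are passed to the DataHub constructor explicitly.

```python
import os

# Hypothetical environment variable names; choose names that suit your deployment.
REQUIRED_VARS = ("DATAHUB_ACCESS_ID", "DATAHUB_ACCESS_KEY", "DATAHUB_ENDPOINT")


def load_datahub_settings(environ=None):
    """Return (access_id, access_key, endpoint) read from the environment.

    Raises KeyError that lists every variable that is missing or empty.
    """
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return tuple(environ[name] for name in REQUIRED_VARS)


# Usage with the SDK (requires pydatahub and real credentials, not run here):
#     access_id, access_key, endpoint = load_datahub_settings()
#     dh = DataHub(access_id, access_key, endpoint)
```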

Manage projects

  • The following sample code shows how to create a project:

project_name = 'project'
comment = 'comment'
try:
    dh.create_project(project_name, comment)
    print("create project success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("project already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)
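
The try/except pattern above treats ResourceExistException as a normal outcome, which makes the script safe to rerun. The same idea can be factored into a small helper and reused for projects and topics. This is a sketch: create_if_absent is hypothetical, and ResourceExists is a local stand-in so that the example runs without the SDK. With the SDK, you would pass datahub.exceptions.ResourceExistException as exists_exc.

```python
class ResourceExists(Exception):
    """Local stand-in for datahub.exceptions.ResourceExistException."""


def create_if_absent(create, *args, exists_exc=ResourceExists):
    """Call create(*args); return True if created, False if it already existed.

    Any other exception propagates to the caller unchanged.
    """
    try:
        create(*args)
        return True
    except exists_exc:
        return False


# Usage with the SDK (not run here):
#     created = create_if_absent(dh.create_project, project_name, comment,
#                                exists_exc=ResourceExistException)
```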

Manage topics

Tuple topic

  • Records written to a tuple topic are structured, so you must specify a record schema when you create a tuple topic. The following table describes the supported data types.

| Type | Description | Value range |
|---|---|---|
| Bigint | An eight-byte signed integer. Do not use the minimum value -9223372036854775808, because it is reserved by the system. | -9223372036854775807 to 9223372036854775807 |
| String | A string. Only UTF-8 encoding is supported. | The size of all values in a column of the STRING type cannot exceed 1 MB. |
| Boolean | The Boolean type. | True and False, true and false, or 0 and 1 |
| Double | An eight-byte double-precision floating-point number. | -1.0 × 10^308 to 1.0 × 10^308 |
| TimeStamp | A timestamp. | Accurate to microseconds |
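
Before you write tuple records, it can help to check values against the ranges in the table on the client side. The following sketch is illustrative only: check_value and its lowercase type names are hypothetical, and the server remains the authority on what it accepts. Assumptions: 1 MB means 1,048,576 bytes, and the STRING check only rejects a single value that exceeds 1 MB by itself, which is necessary but not sufficient for the per-column limit. None is accepted for every type, matching the later example that sets double_field to None.

```python
import math

BIGINT_MIN = -9223372036854775807  # -9223372036854775808 is reserved by the system
BIGINT_MAX = 9223372036854775807
STRING_MAX_BYTES = 1024 * 1024     # assumed: 1 MB = 1,048,576 bytes
DOUBLE_LIMIT = 1.0e308
BOOLEAN_TEXT = {"True", "False", "true", "false", "0", "1"}


def check_value(field_type, value):
    """Return True if value fits the documented range of field_type."""
    if value is None:
        return True
    if field_type == "bigint":
        return (isinstance(value, int) and not isinstance(value, bool)
                and BIGINT_MIN <= value <= BIGINT_MAX)
    if field_type == "string":
        return isinstance(value, str) and len(value.encode("utf-8")) <= STRING_MAX_BYTES
    if field_type == "boolean":
        if isinstance(value, bool):
            return True
        if isinstance(value, int):
            return value in (0, 1)
        return isinstance(value, str) and value in BOOLEAN_TEXT
    if field_type == "double":
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        if isinstance(value, float) and not math.isfinite(value):
            return False
        # Python compares int and float exactly, so huge ints do not overflow here.
        return -DOUBLE_LIMIT <= value <= DOUBLE_LIMIT
    if field_type == "timestamp":
        # Integer microseconds, for example 1455869335000000.
        return isinstance(value, int) and not isinstance(value, bool)
    raise ValueError("unknown field type: %r" % field_type)
```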

  • The following sample code shows how to create a tuple topic:

topic_name = "tuple_topic"
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
    print("create tuple topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

Blob topic

  • Each record in a blob topic is a block of binary data, which is Base64-encoded for transmission. The following sample code shows how to create a blob topic:

topic_name = "blob_topic"
shard_count = 3
life_cycle = 7
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
    print("create blob topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)
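
In normal use you pass the raw data to BlobRecord and do not encode it yourself; the later blob example passes the plain value 'data'. The standard-library sketch below only illustrates what Base64 does to a payload, which is useful when you inspect traffic or estimate the transmitted size. The helper names are hypothetical.

```python
import base64


def encode_blob(data: bytes) -> str:
    """Base64-encode raw bytes into an ASCII string."""
    return base64.b64encode(data).decode("ascii")


def decode_blob(text: str) -> bytes:
    """Reverse encode_blob."""
    return base64.b64decode(text)


def encoded_size(n_bytes: int) -> int:
    """Length of the Base64 text for n_bytes of input (padding included)."""
    return 4 * ((n_bytes + 2) // 3)


payload = b"data"
encoded = encode_blob(payload)
print(encoded)  # ZGF0YQ==
```

Base64 turns every 3 input bytes into 4 output characters, so the transmitted form is roughly one third larger than the raw blob.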

Record publishing and subscription

List shards

  • You can call the list_shard method to list all shards in a topic.

shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))

The method returns a ListShardResult object whose shards attribute is a list of shards. Each element describes one shard through attributes such as shard_id, state, begin_hash_key, and end_hash_key.
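
Each shard covers a range of the 128-bit hash key space, delimited by begin_hash_key and end_hash_key. When a record sets hash_key, as record1 does in the tuple example, it is presumably routed to the shard whose range contains that key. The sketch below models that lookup with plain Python objects. Assumptions: keys are 32-digit hexadecimal strings, ranges are half-open [begin, end), states are plain strings, and the three-shard split in SHARDS is made up for illustration; check real values with list_shard.

```python
from typing import List, NamedTuple, Optional


class ShardInfo(NamedTuple):
    """Minimal stand-in for the shard attributes returned by list_shard."""
    shard_id: str
    state: str
    begin_hash_key: str
    end_hash_key: str


def find_shard(shards: List[ShardInfo], hash_key: str) -> Optional[str]:
    """Return the id of the ACTIVE shard whose [begin, end) range holds hash_key."""
    key = int(hash_key, 16)
    for shard in shards:
        if shard.state != "ACTIVE":
            continue
        if int(shard.begin_hash_key, 16) <= key < int(shard.end_hash_key, 16):
            return shard.shard_id
    return None


# Hypothetical split of the key space into three equal ranges.
SHARDS = [
    ShardInfo("0", "ACTIVE", "0" * 32, "5" * 32),
    ShardInfo("1", "ACTIVE", "5" * 32, "A" * 32),
    ShardInfo("2", "ACTIVE", "A" * 32, "F" * 32),
]
print(find_shard(SHARDS, "4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD"))  # 0
```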

Publish records

  • You can call the put_records method to write records to a topic.

put_result = dh.put_records(project_name, topic_name, records)
print(put_result.failed_record_count)
print(put_result.failed_records)

The records parameter is a list of records of the same type, either tuple or blob. The method returns a PutRecordsResult object with two attributes: failed_record_count and failed_records. failed_records is a list of FailedRecord objects, each of which contains the index of the failed record, an error code, and an error message.
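
Because each FailedRecord carries the index of the rejected record, a caller can resend only the records that failed. The loop below is a sketch of that approach. Assumptions: put is any function that behaves like dh.put_records(project_name, topic_name, records) and returns an object with a failed_records list; FailedRecord is a local stand-in; the attempt limit and linear backoff are illustrative choices, not SDK behavior.

```python
import time
from dataclasses import dataclass
from typing import Callable, List, Sequence


@dataclass
class FailedRecord:
    """Local stand-in: index into the submitted list, plus error details."""
    index: int
    error_code: str
    error_message: str


def put_with_retry(put: Callable, records: Sequence, max_attempts: int = 3,
                   delay_seconds: float = 0.5,
                   sleep: Callable[[float], None] = time.sleep) -> List:
    """Write records, resending only the failed ones; return what still failed."""
    pending = list(records)
    attempt = 0
    while pending and attempt < max_attempts:
        if attempt > 0:
            sleep(delay_seconds * attempt)  # simple linear backoff between attempts
        result = put(pending)
        # Each index refers to the list passed to *this* call, not the original one.
        pending = [pending[f.index] for f in result.failed_records]
        attempt += 1
    return pending


# Usage with the SDK (not run here):
#     still_failed = put_with_retry(
#         lambda batch: dh.put_records(project_name, topic_name, batch), records0)
```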

  • The following sample code shows how to write tuple records to a topic:

topic_name = "tuple_topic"
try:
    # Wait until all shards are in the Active state.
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2, 1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)
    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count))
    # If failed_record_count is greater than 0, write the records in put_result.failed_records again.
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)
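
The TIMESTAMP values in the preceding example, such as 1455869335000000, are integers in microseconds since the Unix epoch; 1455869335000000 corresponds to 2016-02-19 08:08:55 UTC. The hypothetical helpers below, which are not part of the SDK, convert between timezone-aware datetime objects and this representation. They use integer timedelta division, so no precision is lost to floating point.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_us(dt: datetime) -> int:
    """Convert an aware datetime to integer microseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    return (dt - EPOCH) // timedelta(microseconds=1)


def from_timestamp_us(us: int) -> datetime:
    """Inverse of to_timestamp_us, returning a UTC datetime."""
    return EPOCH + timedelta(microseconds=us)


print(to_timestamp_us(datetime(2016, 2, 19, 8, 8, 55, tzinfo=timezone.utc)))
# 1455869335000000
```
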

  • The following sample code shows how to write blob records to a topic:

topic_name = "blob_topic"
try:
    records1 = []
    record3 = BlobRecord(blob_data='data')
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)

Obtain a cursor

  • The following cursor types are supported: OLDEST, LATEST, and SYSTEM_TIME.

    • OLDEST: the cursor that points to the earliest valid record in the specified shard.

    • LATEST: the cursor that points to the latest record in the specified shard.

    • SYSTEM_TIME: the cursor that points to the first record whose timestamp value is greater than or equal to the specified timestamp value. Unit: milliseconds.

shard_id = '0'
time_stamp = 0
cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
cursor = cursor_result0.cursor

The get_cursor method returns a result object whose cursor attribute marks the position in the shard from which data is read.
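
Note the unit difference: SYSTEM_TIME takes milliseconds, while TIMESTAMP field values are in microseconds. The hypothetical helpers below compute a millisecond timestamp for a point in time, such as one hour ago, that you could pass as the last argument of get_cursor together with CursorType.SYSTEM_TIME.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert an aware datetime to integer milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    return (dt - EPOCH) // timedelta(milliseconds=1)


def millis_ago(delta: timedelta, now=None) -> int:
    """Millisecond timestamp for `delta` before `now` (default: current UTC time)."""
    now = datetime.now(timezone.utc) if now is None else now
    return to_epoch_millis(now - delta)


# Usage with the SDK (not run here):
#     time_stamp = millis_ago(timedelta(hours=1))
#     cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id,
#                                    CursorType.SYSTEM_TIME, time_stamp)
```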

Subscribe to records

  • To read records from a shard, you must specify the cursor from which reading starts and the maximum number of records to read. If fewer records than this maximum remain between the cursor and the end of the shard, only the available records are returned.

project_name = 'project'
shard_id = "0"
limit = 10
# Read the records of a blob topic.
topic_name = 'blob_topic'
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
# Read the records of a tuple topic.
topic_name = 'tuple_topic'
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
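
The read semantics described above can be modeled without a server: each call returns at most limit records plus a next cursor, and a call near the end of the shard returns only the records that remain. The InMemoryShard class below is purely illustrative. Its integer offsets are an assumption made for the sketch; real DataHub cursors are opaque strings obtained from get_cursor and next_cursor.

```python
from typing import List, NamedTuple


class ReadResult(NamedTuple):
    records: List[str]
    record_count: int
    next_cursor: int


class InMemoryShard:
    """Toy shard whose cursor is simply an offset into a list of records."""

    def __init__(self, records):
        self._records = list(records)

    def read(self, cursor: int, limit: int) -> ReadResult:
        batch = self._records[cursor:cursor + limit]
        return ReadResult(batch, len(batch), cursor + len(batch))


shard = InMemoryShard("record-%d" % i for i in range(25))
cursor, sizes = 0, []
while True:
    result = shard.read(cursor, limit=10)
    if result.record_count == 0:
        break  # caught up; a real consumer would wait and poll again
    sizes.append(result.record_count)
    cursor = result.next_cursor
print(sizes)  # [10, 10, 5]
```
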

  • The following sample code shows how to consume tuple records:

try:
    # Wait until all shards are in the Active state.
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    # Use the schema of this topic to parse the tuple records.
    record_schema = topic_result.record_schema
    shard_id = '0'
    limit = 10
    cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
    cursor = cursor_result.cursor
    while True:
        get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
        for record in get_result.records:
            print(record)
        if 0 == get_result.record_count:
            # No new records yet: wait before polling again.
            time.sleep(1)
        cursor = get_result.next_cursor
except DatahubException as e:
    print(e)
    sys.exit(-1)
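
The loop above polls forever and sleeps a fixed 1 second whenever a read returns no records. If you need a consumer that backs off gradually and stops after a period of inactivity, one option is sketched below. Assumptions: fetch stands in for a call such as dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit) and must return an object with records, record_count, and next_cursor; the exponential backoff and the idle stop condition are illustrative choices, not SDK features.

```python
import time
from typing import Any, Callable


def poll_shard(fetch: Callable[[Any], Any], cursor: Any,
               handle: Callable[[Any], None], max_idle_polls: int = 5,
               base_delay: float = 1.0, max_delay: float = 30.0,
               sleep: Callable[[float], None] = time.sleep) -> Any:
    """Read from cursor until max_idle_polls consecutive empty reads; return the last cursor."""
    idle = 0
    while idle < max_idle_polls:
        result = fetch(cursor)
        for record in result.records:
            handle(record)
        cursor = result.next_cursor
        if result.record_count == 0:
            idle += 1
            if idle < max_idle_polls:
                # Exponential backoff: base_delay, 2x, 4x, ... capped at max_delay.
                sleep(min(max_delay, base_delay * 2 ** (idle - 1)))
        else:
            idle = 0  # data arrived, so reset the backoff
    return cursor


# Usage with the SDK (not run here):
#     fetch = lambda c: dh.get_tuple_records(project_name, topic_name, shard_id,
#                                            record_schema, c, limit)
#     last_cursor = poll_shard(fetch, cursor, print)
```

Returning the last cursor lets the caller persist it and resume later from the same position instead of rereading from OLDEST.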

References

  • Documentation of DataHub SDK for Python

  • Install DataHub SDK for Python

  • GitHub address