全部产品
Search
文档中心

DataHub:Python SDK

更新时间:Dec 12, 2022

PythonSDK

安装

快速安装

$ sudo pip install pydatahub

源码安装

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install

常见问题

1.如果安装过程中出现错误信息’Python.h: No such file or directory’,常用的操作系统安装方式如下

$ sudo apt-get install python-dev   # for python2.x installs
$ sudo apt-get install python3-dev  # for python3.x installs
$ sudo yum install python-devel   # for python2.x installs
$ sudo yum install python3-devel   # for python3 installs

2.如果使用windows操作系统,根据提示信息可到 此处 下载安装对应版本的 Visual C++ SDK。Windows 10 安装cprotobuf依赖时如果报类似如下错误,也表示需要安装Visual C++ 生成工具:

bulding 'cprotobuf.internal' extention 
error: [WinError2] The system cannot find the file specified

推荐使用python3.6或以上,会明确提示所需版本及链接信息。

3.Windows 下如果安装依赖时报类似如下错误,是环境问题所致,请搜索相关错误,根据具体情况,拷贝所需文件,或是直接使用 developer command prompt 工具进行安装:

LINK : fatal error LNK1158: cannot run 'rc.exe'

4.Windows 7 如果提示如下错误,可安装此 build tools

error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/

安装验证

$ python -c "from datahub import DataHub"

如果上述命令执行成功,恭喜你安装DataHub Python版本SDK成功!

基本概念

详见: 名词解释

准备工作

  • 访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。

  • 创建Project

  • 初始化DataHub

import sys
import traceback
from datahub import DataHub
from datahub.exceptions import ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
access_id = ***your access id***
access_key = ***your access key***
endpoint = ***your datahub server endpoint***
dh = DataHub(access_id, access_key, endpoint)

Project操作

  • 创建示例

project_name = 'project'
comment = 'comment'
try:
    dh.create_project(project_name, comment)
    print("create project success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("project already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

Topic操作

Tuple Topic

  • Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:

类型

含义

值域

Bigint

8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。

-9223372036854775807 ~ 9223372036854775807

String

字符串,只支持UTF-8编码。

单个String列最长允许1MB。

Boolean

布尔型。

可以表示为True/False,true/false, 0/1

Double

8字节双精度浮点数。

-1.0 10308 ~ 1.0 10308

TimeStamp

时间戳类型

表示到微秒的时间戳类型

  • 创建示例

topic_name = "tuple_topic"
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
    print("create tuple topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

Blob Topic

  • Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。

topic_name = "blob_topic"
shard_count = 3
life_cycle = 7
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
    print("create blob topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

数据发布/订阅

获取Shard列表

  • list_shards接口获取topic下的所有shard

shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))

返回结果是一个ListShardResult对象,包含一个Shard对象的list,list中的每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息

发布数据

  • put_records接口向一个topic发布数据

put_result = dh.put_records(project_name, topic_name, records)
print(put_result.failed_record_count)
print(put_result.failed_records)

其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为PutRecordsResult对象,包含failed_record_count和failed_records成员,failed_records是一个FailedRecord对象的list,FailedRecord对象包含成员index,error_code和error_message

  • 写入Tuple类型Record示例

try:
    # block等待所有shard状态ready
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2,  1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)
    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count))
    # failed_record_count如果大于0最好对failed record再进行重试
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)
  • 写入BLOB类型Record示例

try:
    records1 = []
    record3 = BlobRecord(blob_data='data')
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)

获取cursor

  • 获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIME

    • OLDEST: 表示获取的cursor指向当前有效数据中时间最久远的record

    • LATEST: 表示获取的cursor指向当前最新的record

    • SYSTEM_TIME: 表示获取的cursor指向大于等于该时间(单位毫秒)的第一条record

shard_id = '0'
time_stamp = 0
cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
cursor = cursor_result0.cursor

通过get_cursor接口获取用于读取指定位置之后数据的cursor

订阅数据

  • 从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。

project_name = 'project'
shard_id = "0"
limit = 10
# 读取blob topic的record
topic_name = 'blob_topic'
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
# 读取tuple topic的record
topic_name = 'tuple_topic'
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
  • 消费Tuple类型Record示例

    try:
      # block等待所有shard状态ready
      dh.wait_shards_ready(project_name, topic_name)
      print("shards all ready!!!")
      print("=======================================\n\n")
      topic_result = dh.get_topic(project_name, topic_name)
      print(topic_result)
      if topic_result.record_type != RecordType.TUPLE:
          print("topic type illegal!")
          sys.exit(-1)
      print("=======================================\n\n")
      shard_id = '0'
      limit = 10
      cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
      cursor = cursor_result.cursor
      while True:
          get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
          for record in get_result.records:
              print(record)
          if 0 == get_result.record_count:
              time.sleep(1)
          cursor = get_result.next_cursor
    except DatahubException as e:
      print(e)
      sys.exit(-1)

    结尾

  • Python API Doc

  • Python Package Index

  • GitHub地址