すべてのプロダクト
Search
ドキュメントセンター

DataHub:Python 用 DataHub SDK

最終更新日:Jul 17, 2025

DataHub SDK for Python

インストール

クイックインストール

$ sudo pip install pydatahub

ソースコードを使用したインストール

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install

インストールに関する一般的な問題

1. DataHub SDK for Python をインストールすると、「Python.h: No such file or directory」というエラーメッセージが返されます。このエラーを解決するには、以下の方法を使用して、一般的に使用されるオペレーティングシステムに DataHub SDK for Python をインストールできます。

$ sudo apt-get install python-dev   # python2.x インストールの場合
$ sudo apt-get install python3-dev  # python3.x インストールの場合
$ sudo yum install python-devel   # python2.x インストールの場合
$ sudo yum install python3-devel   # python3 インストールの場合

2. Windows オペレーティングシステムを使用している場合は、Python 用 DataHub SDK をインストールする前に、ダウンロード し、プロンプトに従って対応するバージョンの Visual C++ コンパイラをインストールしてください。Windows 10 で cprotobuf 依存関係をインストールすると、次のようなエラーが報告されます。この場合は、Visual C++ コンパイラをインストールしてください。

bulding 'cprotobuf.internal' extention 
error: [WinError2] The system cannot find the file specified

必要なバージョン情報とリンク情報が提供されているため、Python 3.6 以降を使用することをお勧めします。

3. 依存関係をインストールすると、次のようなエラーが報告されます。これは、現在の環境が必要な環境ではないことを示しています。関連するエラーを検索し、要件に基づいて必要なファイルをコピーすることをお勧めします。または、開発者コマンドプロンプトツールを使用してインストールすることもできます。

LINK : fatal error LNK1158: cannot run 'rc.exe'

4. Windows 7 に Python 用 DataHub SDK をインストールすると、次のエラーが報告されます。この場合は、ビルドツール をインストールしてください。

error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/

インストールの確認

$ python -c "from datahub import DataHub"

上記のコマンドを実行できる場合、DataHub SDK for Python はインストールされています。

規約

詳細については、「用語」をご参照ください。

準備

  • DataHub にアクセスするには、Alibaba Cloud アカウントを使用する必要があります。DataHub にアクセスするには、AccessKey ID と AccessKey シークレット、および DataHub へのアクセスに使用するエンドポイントを指定する必要があります。

  • プロジェクトを作成します。

    • DataHub コンソール にログオンし、プロジェクトを作成します。

    • DataHub SDK for Python によって提供されるメソッドを呼び出して、プロジェクトを作成します。

  • DataHub を初期化します。

import sys
import traceback
from datahub import DataHub
from datahub.exceptions import ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
access_id = ***your access id***  # アクセスID
access_key = ***your access key*** # アクセスキー
endpoint = ***your datahub server endpoint*** # DataHub サーバーのエンドポイント
dh = DataHub(access_id, access_key, endpoint)

プロジェクトの管理

  • 次のサンプルコードは、プロジェクトを作成する方法の例を示しています。

project_name = 'project' # プロジェクト名
comment = 'comment' # コメント
try:
    dh.create_project(project_name, comment)
    print("create project success!") # プロジェクトの作成に成功しました!
    print("=======================================\n\n")
except ResourceExistException:
    print("project already exist!") # プロジェクトはすでに存在します!
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

トピックの管理

タプルトピック

  • タプルトピックに書き込まれるデータは特定の形式です。タプルトピックのレコードスキーマを指定する必要があります。次の表は、サポートされているデータ型を示しています。

説明

値の範囲

Bigint

8 バイトの符号付き整数。最小値 -9223372036854775808 はシステムの予約値であるため、使用しないでください。

-9223372036854775807 ~ 9223372036854775807。

String

文字列。UTF-8 エンコーディングのみがサポートされています。

STRING 型の列のすべての値のサイズは 1 MB を超えることはできません。

Boolean

ブール型。

True と False、true と false、または 0 と 1。

Double

倍精度浮動小数点数。長さは 8 バイトです。

-1.0 10308 ~ 1.0 10308。

TimeStamp

タイムスタンプの型。

マイクロ秒単位のタイムスタンプ。

  • 次のサンプルコードは、タプルトピックを作成する方法の例を示しています。

topic_name = "tuple_topic" # トピック名
shard_count = 3 # シャード数
life_cycle = 7 # ライフサイクル
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'], # フィールド名
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]) # フィールド型
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
    print("create tuple topic success!") # タプルトピックの作成に成功しました!
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!") # トピックはすでに存在します!
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

ブロッブトピック

  • バイナリデータのブロックをレコードとしてブロッブトピックに書き込むことができます。ブロッブトピックのデータは、送信のために Base64 エンコードされます。

topic_name = "blob_topic" # トピック名
shard_count = 3 # シャード数
life_cycle = 7 # ライフサイクル
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
    print("create blob topic success!") # ブロブトピックの作成に成功しました!
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!") # トピックはすでに存在します!
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

レコードのパブリッシュとサブスクライブ

シャードの一覧表示

  • list_shards メソッドを呼び出して、トピック内のすべてのシャードを一覧表示できます。

shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))

戻り値は ListShardResult オブジェクトで、シャードのリストが含まれています。リストの各要素はシャードを示し、shard_id、state、begin_hash_key、end_hash_key パラメーターの値などのシャード情報が含まれています。

レコードのパブリッシュ

  • put_records メソッドを呼び出して、トピックにレコードを書き込むことができます。

put_result = dh.put_records(project_name, topic_name, records)
print(put_result.failed_record_count)
print(put_result.failed_records)

records パラメーターは List オブジェクトを指定し、タプル型やブロッブ型など、同じ型のレコードのリストが含まれています。戻り値は PutRecordsResult オブジェクトで、failed_record_count と failed_records で構成されます。failed_records は FailedRecord オブジェクトのリストを示します。FailedRecord オブジェクトには、インデックス、エラーコード、およびエラーメッセージが含まれています。

  • 次のサンプルコードは、タプルレコードをトピックに書き込む方法の例を示しています。

try:
    # すべてのシャードが Active 状態になるまで待機します。
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!") # シャードの準備ができました!!!
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!") # トピック型が不正です!
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2,  1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)
    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count)) # タプル %d レコードを書き込み、失敗したカウント: %d
    # 少なくとも 1 つのレコードの書き込みに失敗した場合は、レコードをもう一度書き込みます。
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)
  • 次のサンプルコードは、ブロッブレコードをトピックに書き込む方法の例を示しています。

try:
    records1 = []
    record3 = BlobRecord(blob_data='data') # データ
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)

カーソルの取得

  • OLDEST、LATEST、SYSTEM_TIME の方法を使用してカーソルを取得できます。

    • OLDEST:指定されたシャードの最も古い有効なレコードを指すカーソル。

    • LATEST:指定されたシャードの最新のレコードを指すカーソル。

    • SYSTEM_TIME:タイムスタンプ値が指定されたタイムスタンプ値以上である最初のレコードを指すカーソル。単位:ミリ秒。

shard_id = '0' # シャードID
time_stamp = 0 # タイムスタンプ
cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
cursor = cursor_result0.cursor

get_cursor メソッドを呼び出して、指定された位置からデータを読み取るために使用されるカーソルを取得できます。

レコードのサブスクライブ

  • 指定されたシャードからレコードを読み取るには、レコードの読み取りを開始するカーソルと、読み取るレコードの最大数を指定する必要があります。指定されたカーソルからシャードの末尾までのレコード数が最大値よりも少ない場合、実際に読み取られたレコードが返されます。

project_name = 'project' # プロジェクト名
shard_id = "0" # シャードID
limit = 10 # 制限
# ブロブトピックのレコードを読み取ります。
topic_name = 'blob_topic' # トピック名
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
# タプルトピックのレコードを読み取ります。
topic_name = 'tuple_topic' # トピック名
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
  • 次のサンプルコードは、タプルレコードを消費する方法の例を示しています。

    try:
      # すべてのシャードが Active 状態になるまで待機します。
      dh.wait_shards_ready(project_name, topic_name)
      print("shards all ready!!!") # シャードの準備ができました!!!
      print("=======================================\n\n")
      topic_result = dh.get_topic(project_name, topic_name)
      print(topic_result)
      if topic_result.record_type != RecordType.TUPLE:
          print("topic type illegal!") # トピック型が不正です!
          sys.exit(-1)
      print("=======================================\n\n")
      shard_id = '0' # シャードID
      limit = 10 # 制限
      cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
      cursor = cursor_result.cursor
      while True:
          get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
          for record in get_result.records:
              print(record)
          if 0 == get_result.record_count:
              time.sleep(1)
          cursor = get_result.next_cursor
    except DatahubException as e:
      print(e)
      sys.exit(-1)

    参照

  • DataHub SDK for Python のドキュメント

  • DataHub SDK for Python をインストールする

  • GitHub アドレス