Python SDK
Installation
Quick install
$ sudo pip install pydatahub
Install from source
$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
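$ sudo python setup.py install
FAQ
1. If installation fails with the error 'Python.h: No such file or directory', the Python development headers are missing. The install commands for common operating systems are: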
$ sudo apt-get install python-dev # for python2.x installs
$ sudo apt-get install python3-dev # for python3.x installs
$ sudo yum install python-devel # for python2.x installs
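$ sudo yum install python3-devel # for python3 installs
2. On Windows, follow the prompt in the error message to download and install the matching version of the Visual C++ SDK. On Windows 10, an error like the following while installing the cprotobuf dependency also means that the Visual C++ build tools are required:
building 'cprotobuf.internal' extension
error: [WinError 2] The system cannot find the file specified
Python 3.6 or later is recommended, because its error message states the required version and the download link explicitly.
3. On Windows, an error like the following while installing dependencies is caused by the local environment. Search for the error and, depending on the situation, either copy the required files or run the installation from the Developer Command Prompt:
LINK : fatal error LNK1158: cannot run 'rc.exe'
4. On Windows 7, if the following error appears, install the build tools it refers to:
error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/
Verify the installation
$ python -c "from datahub import DataHub"
If the command above runs without error, the DataHub Python SDK is installed.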
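Basic concepts
See: Glossary
Preparation
Accessing the DataHub service requires an Alibaba Cloud account: you must provide the account's accessId and accessKey, as well as the endpoint of the service you want to access.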
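One way to keep the accessId, accessKey, and endpoint out of source code is to read them from environment variables. The following is a minimal sketch under that assumption; the variable names (DATAHUB_ACCESS_ID, DATAHUB_ACCESS_KEY, DATAHUB_ENDPOINT) and the load_credentials helper are hypothetical and are not defined by the SDK.

```python
import os
from typing import Mapping, NamedTuple


class DataHubCredentials(NamedTuple):
    access_id: str
    access_key: str
    endpoint: str


def load_credentials(env: Mapping[str, str] = os.environ) -> DataHubCredentials:
    """Read the three connection settings from an environment-style mapping.

    The variable names below are hypothetical; the SDK does not define them.
    """
    names = ("DATAHUB_ACCESS_ID", "DATAHUB_ACCESS_KEY", "DATAHUB_ENDPOINT")
    # Report every missing or empty setting at once instead of failing on the first.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError("missing settings: " + ", ".join(missing))
    return DataHubCredentials(*(env[name] for name in names))
```

The returned values can then be passed to the DataHub constructor in the same order: access_id, access_key, endpoint.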
Create a Project
Log in to the DataHub WebConsole and create a Project,
or create one through the SDK.
Initialize DataHub
import sys
import time
import traceback
from datahub import DataHub
from datahub.exceptions import DatahubException, ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
# Replace the placeholders with your own values.
access_id = '***your access id***'
access_key = '***your access key***'
endpoint = '***your datahub server endpoint***'
dh = DataHub(access_id, access_key, endpoint)
Project operations
Creation example
project_name = 'project'
comment = 'comment'
try:
    dh.create_project(project_name, comment)
    print("create project success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("project already exist!")
    print("=======================================\n\n")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
Topic operations
Tuple Topic
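Data written to a Tuple topic is structured, so a Record Schema must be specified. The following data types are currently supported:
Type | Meaning | Range |
Bigint | 8-byte signed integer. Do not use the minimum integer value (-9223372036854775808); it is reserved by the system. | -9223372036854775807 ~ 9223372036854775807 |
String | String; only UTF-8 encoding is supported. | A single String column can hold at most 1 MB. |
Boolean | Boolean. | Can be written as True/False, true/false, or 0/1 |
Double | 8-byte double-precision floating-point number. | -1.0 × 10^308 ~ 1.0 × 10^308 |
TimeStamp | Timestamp. | Timestamp with microsecond precision |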
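The limits in the table can be checked on the client before a record is built. The sketch below is illustrative only: the helper names are hypothetical, and it assumes that "1 MB" means 1,048,576 bytes measured on the UTF-8 encoding of the string. The SDK or the service may apply its own checks.

```python
BIGINT_MIN = -9223372036854775807  # one above the reserved minimum value
BIGINT_MAX = 9223372036854775807
STRING_MAX_BYTES = 1024 * 1024     # assumption: 1 MB counted in UTF-8 bytes


def check_bigint(value: object) -> bool:
    """Return True if value is an int inside the usable Bigint range."""
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        return False
    return BIGINT_MIN <= value <= BIGINT_MAX


def check_string(value: object) -> bool:
    """Return True if value is a str whose UTF-8 encoding fits in the limit."""
    if not isinstance(value, str):
        return False
    return len(value.encode("utf-8")) <= STRING_MAX_BYTES
```

Measuring the encoded length rather than the character count matters because a non-ASCII character occupies more than one byte in UTF-8.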
Creation example
topic_name = "tuple_topic"
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
    print("create tuple topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
Blob Topic
A Blob topic accepts a block of binary data as a single Record; the data is BASE64-encoded for transport.
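To illustrate what BASE64 encoding does to a binary payload, here is a small round trip using only the Python standard library. It is a sketch of the encoding itself, not of how the SDK performs the transport step; the helper names encode_blob and decode_blob are hypothetical.

```python
import base64


def encode_blob(data: bytes) -> str:
    """Encode raw bytes as an ASCII BASE64 string."""
    return base64.b64encode(data).decode("ascii")


def decode_blob(text: str) -> bytes:
    """Decode a BASE64 string back into the original bytes."""
    return base64.b64decode(text)


payload = b"\x00\x01binary\xff"
encoded = encode_blob(payload)
# The round trip returns exactly the bytes that went in.
assert decode_blob(encoded) == payload
```

BASE64 output is about one third larger than the input, which is worth keeping in mind when estimating request sizes.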
topic_name = "blob_topic"
shard_count = 3
life_cycle = 7
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
    print("create blob topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
Publishing and subscribing to data
Get the shard list
The list_shard interface returns all shards under a topic.
shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))
The result is a ListShardResult object that holds a list of Shard objects. Each element is one shard, from which you can read shard_id, state, begin_hash_key, end_hash_key, and other information.
Publish data
The put_records interface publishes data to a topic.
put_result = dh.put_records(project_name, topic_name, records)
print(put_result.failed_record_count)
print(put_result.failed_records)
The records argument is a list in which each element is one record; all records must be of the same type, either Tuple or Blob. The return value is a PutRecordsResult object with the members failed_record_count and failed_records. failed_records is a list of FailedRecord objects, each of which has the members index, error_code, and error_message.
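Because put_records reports failures per record, a caller can resend only the records that failed. The sketch below shows one way to do that. It assumes that FailedRecord.index is the position of the failed record in the list that was submitted, which the text above does not state explicitly. The put_with_retry helper is hypothetical; it takes the put call as a function so that it does not depend on any SDK signature beyond the result members described above.

```python
from typing import Any, Callable, List, Sequence


def put_with_retry(put: Callable[[Sequence[Any]], Any],
                   records: Sequence[Any],
                   max_attempts: int = 3) -> List[Any]:
    """Call put(batch) up to max_attempts times, resending only failed records.

    Returns the records that still failed after the last attempt
    (an empty list means everything was accepted).
    Assumes each failed record's index refers to the submitted batch.
    """
    pending = list(records)
    for _ in range(max_attempts):
        if not pending:
            break
        result = put(pending)
        if result.failed_record_count == 0:
            return []
        # Keep only the records the service reported as failed.
        pending = [pending[failed.index] for failed in result.failed_records]
    return pending
```

With the objects from this document, a call might look like put_with_retry(lambda batch: dh.put_records(project_name, topic_name, batch), records). Inspecting error_code before retrying is advisable, since some failures will not succeed on a second attempt.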
Example: writing Tuple records
try:
    # Block until all shards are ready
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    # Record built from a full list of values, routed by shard ID
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    # Record built field by field (by name), routed by hash key
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)
    # Record built field by field (by index), routed by partition key
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2, 1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)
    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" % (len(records0), put_result.failed_record_count))
    # If failed_record_count is greater than 0, it is best to retry the failed records
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)
Example: writing BLOB records
try:
    records1 = []
    record3 = BlobRecord(blob_data='data')
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)
Get a cursor
A cursor can be obtained in three ways: OLDEST, LATEST, and SYSTEM_TIME.
OLDEST: the cursor points to the oldest record among the currently valid data.
LATEST: the cursor points to the newest record.
SYSTEM_TIME: the cursor points to the first record whose time is greater than or equal to the given time (in milliseconds).
shard_id = '0'
time_stamp = 0
cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
cursor = cursor_result0.cursor
The get_cursor interface returns a cursor for reading the data that follows the specified position.
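Since SYSTEM_TIME expects a time in milliseconds, it can help to derive that value from a datetime instead of writing the number by hand. The sketch below assumes the value is milliseconds since the Unix epoch (1970-01-01 UTC), which the text above does not spell out; the to_epoch_millis helper is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


time_stamp = to_epoch_millis(datetime(2016, 2, 19, 8, 8, 55, tzinfo=timezone.utc))
```

The resulting time_stamp could be passed as the last argument of the get_cursor call with CursorType.SYSTEM_TIME shown above. Requiring a timezone-aware datetime avoids silently interpreting a naive value in the local time zone.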
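Subscribe to data
Reading from a shard requires the cursor to start from and an upper limit on the number of records to read. If fewer than limit records remain between the cursor and the end of the shard, only the records actually available are returned.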
project_name = 'project'
shard_id = "0"
limit = 10
# Read records from a blob topic
topic_name = 'blob_topic'
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
# Read records from a tuple topic
topic_name = 'tuple_topic'
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
Example: consuming Tuple records
try:
    # Block until all shards are ready
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    # Use the schema stored with the topic to decode the records
    record_schema = topic_result.record_schema
    shard_id = '0'
    limit = 10
    cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
    cursor = cursor_result.cursor
    while True:
        get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
        for record in get_result.records:
            print(record)
        # Nothing new yet: wait a moment before polling again
        if 0 == get_result.record_count:
            time.sleep(1)
        cursor = get_result.next_cursor
except DatahubException as e:
    print(e)
    sys.exit(-1)
Conclusion