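Tunnel is the data channel of MaxCompute and supports uploading data to and downloading data from MaxCompute. The Python Tunnel SDK is part of PyODPS (the official MaxCompute Python SDK). This topic provides examples of its basic operations.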
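Usage notes
The following sections show basic examples of uploading and downloading data with the Python SDK. For more scenarios, see the Python SDK documentation.
If Cython is installed, C code is compiled when you install PyODPS, which can improve the data transfer efficiency of the Tunnel channel for uploads and downloads.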
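Because the compilation described above depends on Cython being present when PyODPS is installed, it can help to check for Cython first. The following is a minimal sketch using only the Python standard library; the helper name `cython_available` is hypothetical, and the check only confirms that Cython is importable by the current interpreter, not that PyODPS was actually built with its C code.

```python
import importlib.util


def cython_available() -> bool:
    """Hypothetical helper: return True if the Cython package is importable here."""
    # find_spec returns None (without raising) when the top-level package is missing.
    return importlib.util.find_spec("Cython") is not None


if __name__ == "__main__":
    if cython_available():
        print("Cython is installed; PyODPS can compile its C code when it is installed.")
    else:
        print("Cython is not installed; consider `pip install cython` before installing PyODPS.")
```

Run the check in the same environment (virtualenv or conda env) that you will install PyODPS into, since `find_spec` only sees packages available to the interpreter running it.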
Upload example
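```python
import os

from odps import ODPS
from odps.tunnel import TableTunnel

# Make sure the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your Access Key ID
# and ALIBABA_CLOUD_ACCESS_KEY_SECRET is set to your Access Key Secret.
# Hard-coding the Access Key ID / Access Key Secret strings is not recommended.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

# This example assumes my_table has two STRING columns and an existing partition pt=test.
table = o.get_table('my_table')
tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

# Write records to block 0.
with upload_session.open_record_writer(0) as writer:
    # Fill a record field by field, using column indexes.
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    # Or create a record directly from a list of values.
    record = table.new_record(['test2', 'id2'])
    writer.write(record)

# Commit outside the with block. Committing inside it would happen before the data
# has been written, which causes an error.
upload_session.commit([0])
```

Download example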
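```python
from odps.tunnel import TableTunnel
# `o` is the ODPS entry object created in the upload example.
tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

# Read download_session.count records one by one, starting at record 0.
with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        # Process each record.
        print(record)

# Alternatively, read the same range as Arrow RecordBatches (requires pyarrow).
with download_session.open_arrow_reader(0, download_session.count) as reader:
    for batch in reader:
        # Process each Arrow RecordBatch.
        print(batch.num_rows)
```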
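The comment in the upload example states that `commit` must be called outside the `with` block, because otherwise the commit happens before the data has been written. The sketch below models that rule with hypothetical in-memory classes (`MockUploadSession`, `MockRecordWriter`, and `demo` are illustrative names, not part of PyODPS): records written to a block only become committable once that block's writer has been closed. It is a simplified model under that assumption, not a description of how the real Tunnel upload session is implemented.

```python
from contextlib import contextmanager


class MockRecordWriter:
    """Hypothetical writer: buffers records in memory until it is closed."""

    def __init__(self):
        self.buffer = []

    def write(self, record):
        self.buffer.append(record)


class MockUploadSession:
    """Hypothetical stand-in for an upload session (not the PyODPS API)."""

    def __init__(self):
        self._open = {}      # block_id -> writer that is still open
        self._closed = {}    # block_id -> records flushed when the writer closed
        self.committed = []  # records visible after a successful commit

    @contextmanager
    def open_record_writer(self, block_id):
        writer = MockRecordWriter()
        self._open[block_id] = writer
        try:
            yield writer
        finally:
            # Leaving the with block "closes" the writer and flushes the block.
            self._closed[block_id] = self._open.pop(block_id).buffer

    def commit(self, block_ids):
        # Refuse to commit any block whose writer has not been closed yet.
        for block_id in block_ids:
            if block_id not in self._closed:
                raise RuntimeError(f"block {block_id} has not been closed yet")
        for block_id in block_ids:
            self.committed.extend(self._closed[block_id])


def demo():
    # Correct order: commit after the with block, as in the upload example.
    ok = MockUploadSession()
    with ok.open_record_writer(0) as writer:
        writer.write(("test1", "id1"))
        writer.write(("test2", "id2"))
    ok.commit([0])

    # Wrong order: commit while the writer for block 0 is still open.
    early = MockUploadSession()
    error = None
    try:
        with early.open_record_writer(0) as writer:
            writer.write(("test1", "id1"))
            early.commit([0])
    except RuntimeError as exc:
        error = str(exc)
    return ok.committed, error, early.committed


if __name__ == "__main__":
    committed, error, _ = demo()
    print(committed)
    print(error)
```

In the `early` case, `commit([0])` runs while block 0 is still open, so the mock raises `RuntimeError` and nothing is committed; moving the call after the `with` block lets the commit see both records.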