MaxCompute支援第三方引擎(如Spark on EMR、StarRocks、Presto、PAI和Hologres)通過SDK調用Storage API直接存取MaxCompute資料,本文為您介紹使用Python SDK訪問MaxCompute的程式碼範例。
MaxCompute提供了開放儲存相關介面,詳情請參見aliyun-odps-python-sdk。
前提條件
本文範例程式碼是基於PyODPS,若您是在本地環境執行以下代碼,請確保已安裝PyODPS。具體操作,請參見安裝PyODPS。
同時PyODPS還支援在DataWorks、PAI Notebooks中使用,其中:
DataWorks的PyODPS節點已安裝好了PyODPS,您可以直接在DataWorks的PyODPS節點上開發PyODPS任務並周期性運行,操作指導請參見通過DataWorks使用PyODPS。
PAI的Python環境也可安裝運行PyODPS,其中PAI的內建鏡像均已安裝好了PyODPS可直接使用,如PAI-Designer的自訂Python組件,在PAI Notebooks中使用PyODPS的方式與通用的使用方式基本一致,可參考基本操作概述、DataFrame(不推薦使用)。
PyODPS是MaxCompute的Python版本的SDK,關於PyODPS詳情,請參見PyODPS。
使用樣本
使用Python SDK訪問MaxCompute的程式碼範例,詳情請參見Python SDK Examples。
配置串連MaxCompute服務的環境。
import os from odps import ODPS from odps.apis.storage_api import * # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret, # 不建議直接使用 Access Key ID / Access Key Secret 字串 # endpoint為MaxCompute服務的串連地址,當前僅支援使用阿里雲VPC網路 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) #MaxCompute表名稱 table = "<table to access>" # 訪問MaxCompute使用的Quota名稱 quota_name = "<quota name>" # 串連並訪問阿里雲MaxCompute服務並建立基於Arrow格式的Storage API對象 def get_arrow_client(): odps_table = o.get_table(table) client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name) return client說明擷取獨享Data Transmission Service資源群組(訂用帳戶)資源的Quota名稱的方式分別如下:
獨享Data Transmission Service資源群組:登入MaxCompute控制台,左上方切換地區後,在左側導覽列選擇工作區>配額(Quota)管理,查看可使用的Quota列表。具體操作,請參見計算資源-Quota管理。
開放儲存:登入MaxCompute控制台,在左側導覽列選擇租戶管理>租戶屬性,開啟開放儲存。
讀表操作。
建立資料讀取會話,讀取MaxCompute資料。
import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # 定義函數create_read_session,mode參數用於指定掃描資料時所使用的分區策略,若mode為size,按資料大小進行分區,若為row,按行數進行分區 def create_read_session(mode): client = get_arrow_client() req = TableBatchScanRequest(required_partitions=['pt=test_write_1']) if mode == "size": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE) elif mode == "row": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET) resp = client.create_read_session(req) if resp.status != Status.OK: logger.info("Create read session failed") return logger.info("Read session id: " + resp.session_id) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide split mode: size|row") mode = sys.argv[1] if mode != "row" and mode != "size": raise ValueError("Please provide split mode: size|row") create_read_session(mode)建立監控和檢查資料讀取狀態的會話。
import logging import sys import time from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # 確保在執行資料讀取操作前,確認read session它已經成功建立並處於準備就緒的狀態 def check_session_status(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK: logger.info("Get read session failed") return # session建立過程可能時間較長,需要等待session status為NORMAL才可以讀資料 if resp.session_status == SessionStatus.NORMAL: logger.info("Read session id: " + resp.session_id) else: logger.info("Session status is not expected") if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] check_session_status(session_id)讀取MaxCompute資料。
# 通過指定的session_id從MaxCompute中讀取資料行,並統計總共讀取的資料行數 import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) def read_rows(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK and resp.status != Status.WAIT: logger.info("Get read session failed") return req = ReadRowsRequest(session_id=session_id) if resp.split_count == -1: req.row_index = 0 req.row_count = resp.record_count else: req.split_index = 0 reader = client.read_rows_arrow(req) total_line = 0 while True: record_batch = reader.read() if record_batch is None: break total_line += record_batch.num_rows if reader.get_status() != Status.OK: logger.info("Read rows failed") return logger.info("Total line is:" + str(total_line)) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] read_rows(session_id)
相關文檔
關於MaxCompute開放儲存詳情,請參見開放儲存概述。