PyODPS是阿里雲開發的Python SDK,方便開發人員通過代碼使用MaxCompute進行巨量資料處理和分析。本文將介紹如何在本地環境上使用PyODPS進行表操作、資料載入和運行SQL查詢。
前提條件
主要流程
開啟Python編輯器,並建立一個Python檔案。
說明若本地未安裝Python編輯器,可直接建立一個尾碼名為
.py的檔案。開發PyODPS任務代碼。
建立完成後,參考下文內容學習簡單樣本的操作,瞭解PyODPS的主要能力。
更多PyODPS的使用指導請參見基本操作概述、DataFrame(不推薦使用)。也可以參考範例文件:使用PyODPS節點進行結巴中文分詞,實現端到端的簡單操作。
儲存Python檔案,並在本地運行。
初始化ODPS入口
確保已設定環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。
(不推薦)若未設定環境變數,可以使用顯式指定方式替代,單擊連結擷取access_id及secret_access_key。
手動定義ODPS入口,程式碼範例如下。
import os
from odps import ODPS
o = ODPS(
# (推薦)確保已設定環境變數。
# 確保ALIBABA_CLOUD_ACCESS_KEY_ID環境變數設定為使用者 Access Key ID。
access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
# 確保ALIBABA_CLOUD_ACCESS_KEY_SECRET環境變數設定為使用者Access Key Secret。
secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
# (不推薦)若未設定環境變數,請使用如下格式替代。
# access_id='your-aliyun-access-key-id',
# secret_access_key= 'your-aliyun-access-key-secret',
project='your-default-project',
endpoint='your-end-point',
)參數 | 填寫說明 |
your-default-project | 填寫MaxCompute專案名稱,登入MaxCompute控制台擷取專案列表。 |
your-end-point | 填寫所在地區和所在網路環境下的Endpoint。 例如杭州地區的公網Endpoint,請填寫 重要 請確保選擇正確的網路類型,否則將無法正常串連。 |
完成上述配置後,可以在本地環境中使用PyODPS,例如對於ODPS對象的基本操作list、get、exist、create、delete等。更多PyODPS的使用指導請參見基本操作概述、DataFrame(不推薦使用)。
執行SQL
在PyODPS節點中執行SQL命令。
在PyODPS節點中可以使用傳統模式或查詢加速(MCQA)執行SQL命令,當前主要支援運行DDL、DML類型的SQL命令。與傳統模式相比,加速查詢模式(MCQA)會將作業的運行結果寫入臨時緩衝中。當後續執行相同的查詢作業時,MaxCompute會優先返回緩衝中的結果,加快執行速度,相關計費規則請參見計算費用(隨用隨付),可根據需要選擇相應的模式執行。
說明可以執行的SQL語句並非都可以通過入口對象的
execute_sql()和run_sql()等方法執行。在調用非DDL或非DML語句時,請使用其他方法。例如,調用建立表語句時,請使用
create_table方法;調用API命令時,請使用run_xflow或execute_xflow方法。使用傳統模式執行SQL命令
可以使用execute_sql()/run_sql()來執行SQL命令,樣本如下:
使用
create_table方法建立一個新的表。o.create_table('my_t', 'num bigint, id string', if_not_exists=True)使用
execute_sql方法執行SQL查詢。result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3') with result.open_reader() as reader: for record in reader: print(record)在PyODPS節點中讀取SQL運行結果。
可以使用open_reader()讀取SQL命令運行結果。具體操作請參見讀取SQL執行結果。
更多PyODPS節點的SQL相關操作詳情請參見SQL。
DataFrame
PyODPS提供了DataFrame API,支援使用DataFrame進行資料處理,更多DataFrame的操作樣本請參見DataFrame。
執行
列印詳細資料
預設情況下,本地環境的PyODPS節點運行過程不會列印Logview等詳細過程。您可以手動設定
options.verbose選項,開啟列印Logview等詳細過程。from odps import options options.verbose = True
設定運行參數hints
運行任務時如果需要設定運行時參數,可以通過設定hints參數來實現,參數類型是dict。
o.execute_sql('SELECT * FROM pyodps_iris', hints={'odps.sql.mapper.split.size': 16})也可以對全域設定sql.setting,設定後後續每次運行時都會添加相關的運行時參數。
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
# 會根據全域配置添加hints
o.execute_sql('SELECT * FROM pyodps_iris')
完整樣本
本地建立
test-pyodps-local.py檔案。寫入範例程式碼。
import os from odps import ODPS o = ODPS( # (推薦)確保已設定環境變數。 # 確保ALIBABA_CLOUD_ACCESS_KEY_ID環境變數設定為使用者 Access Key ID。 access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), # 確保ALIBABA_CLOUD_ACCESS_KEY_SECRET環境變數設定為使用者Access Key Secret。 secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) # 以直接指定欄位名以及欄位類型的方式建立非分區表my_new_table。 table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True) # 向非分區表my_new_table中插入資料。 records = [[111, 'aaa'], [222, 'bbb'], [333, 'ccc'], [444, '中文']] o.write_table(table, records) # 讀取非分區表my_new_table中的資料。 for record in o.read_table(table): print(record[0], record[1]) # 以運行SQL的方式讀取表中的資料。 result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3;', hints={'odps.sql.allow.fullscan': 'true'}) # 刪除表以清除資源。 table.drop() print('使用open_reader方式讀取pyodps_iris表資料:') # 讀取SQL執行結果。 with result.open_reader() as reader: for record in reader: print(record[0], record[1])運行Python代碼。
python test-pyodps-local.py運行結果:
111 aaa 222 bbb 333 ccc 444 中文 使用open_reader方式讀取pyodps_iris表資料: 4.9 3.0 4.7 3.2 4.6 3.1