全部產品
Search
文件中心

MaxCompute:在本地環境使用PyODPS

更新時間:Dec 03, 2025

PyODPS是阿里雲開發的Python SDK,方便開發人員通過代碼使用MaxCompute進行巨量資料處理和分析。本文將介紹如何在本地環境上使用PyODPS進行表操作、資料載入和運行SQL查詢。

前提條件

  • 本地已安裝PyODPS,並設定環境變數

  • 已準備資料集pyodps_iris。您可參考使用樣本下載資料集、建立pyodps_iris表並寫入資料,文檔中的操作樣本即以此表作為樣本資料,示範基本操作。

主要流程

  1. 開啟Python編輯器,並建立一個Python檔案。

    說明

    若本地未安裝Python編輯器,可直接建立一個尾碼名為.py的檔案。

  2. 開發PyODPS任務代碼。

    建立完成後,參考下文內容學習簡單樣本的操作,瞭解PyODPS的主要能力。

    更多PyODPS的使用指導請參見基本操作概述DataFrame(不推薦使用)。也可以參考範例文件:使用PyODPS節點進行結巴中文分詞,實現端到端的簡單操作。

  3. 儲存Python檔案,並在本地運行。

初始化ODPS入口

重要

手動定義ODPS入口,程式碼範例如下。

import os
from odps import ODPS

o = ODPS(
    # (推薦)確保已設定環境變數。
    # 確保ALIBABA_CLOUD_ACCESS_KEY_ID環境變數設定為使用者 Access Key ID。
    access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    # 確保ALIBABA_CLOUD_ACCESS_KEY_SECRET環境變數設定為使用者Access Key Secret。
    secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    
    # (不推薦)若未設定環境變數,請使用如下格式替代。
    # access_id='your-aliyun-access-key-id',
    # secret_access_key= 'your-aliyun-access-key-secret',
    
    project='your-default-project',
    endpoint='your-end-point',
)

參數

填寫說明

your-default-project

填寫MaxCompute專案名稱,登入MaxCompute控制台擷取專案列表。

your-end-point

填寫所在地區和所在網路環境下的Endpoint

例如杭州地區的公網Endpoint,請填寫https://service.cn-hangzhou.maxcompute.aliyun.com/api

重要

請確保選擇正確的網路類型,否則將無法正常串連。

完成上述配置後,可以在本地環境中使用PyODPS,例如對於ODPS對象的基本操作listgetexistcreatedelete等。更多PyODPS的使用指導請參見基本操作概述DataFrame(不推薦使用)

執行SQL

  1. 在PyODPS節點中執行SQL命令。

    在PyODPS節點中可以使用傳統模式或查詢加速(MCQA)執行SQL命令,當前主要支援運行DDL、DML類型的SQL命令。與傳統模式相比,加速查詢模式(MCQA)會將作業的運行結果寫入臨時緩衝中。當後續執行相同的查詢作業時,MaxCompute會優先返回緩衝中的結果,加快執行速度,相關計費規則請參見計算費用(隨用隨付),可根據需要選擇相應的模式執行。

    說明

    可以執行的SQL語句並非都可以通過入口對象的execute_sql()run_sql()等方法執行。

    在調用非DDL或非DML語句時,請使用其他方法。例如,調用建立表語句時,請使用create_table方法;調用API命令時,請使用run_xflowexecute_xflow方法。

    使用傳統模式執行SQL命令

    可以使用execute_sql()/run_sql()來執行SQL命令,樣本如下:

    使用create_table方法建立一個新的表。

    o.create_table('my_t', 'num bigint, id string', if_not_exists=True)

    使用execute_sql方法執行SQL查詢。

    result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3')
    with result.open_reader() as reader:
        for record in reader:
            print(record)
    
  2. 在PyODPS節點中讀取SQL運行結果。

    可以使用open_reader()讀取SQL命令運行結果。具體操作請參見讀取SQL執行結果

更多PyODPS節點的SQL相關操作詳情請參見SQL

DataFrame

PyODPS提供了DataFrame API,支援使用DataFrame進行資料處理,更多DataFrame的操作樣本請參見DataFrame

  • 執行

    DataFrame的執行需要顯式調用立即執行的方法(如executepersist等)。範例程式碼如下。

    # 調用立即執行的方法,處理每條Record,列印出表pyodps_iris中iris.sepalwidth小於3的所有資料。
    from odps.df import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepalwidth < 3].execute():  
      print(record)
    
  • 列印詳細資料

    預設情況下,本地環境的PyODPS節點運行過程不會列印Logview等詳細過程。您可以手動設定options.verbose選項,開啟列印Logview等詳細過程。

    from odps import options
    options.verbose = True
    

設定運行參數hints

運行任務時如果需要設定運行時參數,可以通過設定hints參數來實現,參數類型是dict。

o.execute_sql('SELECT * FROM pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

也可以對全域設定sql.setting,設定後後續每次運行時都會添加相關的運行時參數。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
# 會根據全域配置添加hints
o.execute_sql('SELECT * FROM pyodps_iris')

完整樣本

  1. 本地建立test-pyodps-local.py檔案。

  2. 寫入範例程式碼。

    import os
    from odps import ODPS
    
    o = ODPS(
        # (推薦)確保已設定環境變數。
        # 確保ALIBABA_CLOUD_ACCESS_KEY_ID環境變數設定為使用者 Access Key ID。
        access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        # 確保ALIBABA_CLOUD_ACCESS_KEY_SECRET環境變數設定為使用者Access Key Secret。
        secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # 以直接指定欄位名以及欄位類型的方式建立非分區表my_new_table。
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # 向非分區表my_new_table中插入資料。
    records = [[111, 'aaa'],
               [222, 'bbb'],
               [333, 'ccc'],
               [444, '中文']]
    o.write_table(table, records)
    
    # 讀取非分區表my_new_table中的資料。
    for record in o.read_table(table):
        print(record[0], record[1])
    
    # 以運行SQL的方式讀取表中的資料。
    result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3;', hints={'odps.sql.allow.fullscan': 'true'})
    
    # 刪除表以清除資源。
    table.drop()
    
    print('使用open_reader方式讀取pyodps_iris表資料:')
    
    # 讀取SQL執行結果。
    with result.open_reader() as reader:
        for record in reader:
            print(record[0], record[1])
    
  3. 運行Python代碼。

    python test-pyodps-local.py

    運行結果:

    111 aaa
    222 bbb
    333 ccc
    444 中文
    使用open_reader方式讀取pyodps_iris表資料:
    4.9 3.0
    4.7 3.2
    4.6 3.1