全部產品
Search
文件中心

MaxCompute:OSS掛載及使用實踐

更新時間:Jan 09, 2026

本文基於實際代碼案例介紹如何在MaxFrame中高效、安全地掛載和使用阿里雲 OSS 作為分散式運算儲存。通過MaxFrame with_fs_mount裝飾器實現檔案系統級掛載,為大規模資料處理提供穩定可靠的外部資料訪問能力。

應用情境

需要將MaxFrame作業與持久化Object Storage Service(如 OSS)結合使用的巨量資料分析情境。例如:

  • 從 OSS 載入未經處理資料並清洗或處理;

  • 將中間結果寫入OSS供下遊任務消費;

  • 共用訓練後的模型檔案、設定檔等靜態資源。

傳統的讀寫方式(如 pd.read_csv("oss://..."))受限於 SDK 效能和網路開銷,在分布式環境下效率較低。而通過檔案系統級掛載(FS Mount),可以在 MaxCompute 中像操作本地磁碟一樣訪問 OSS 檔案,極大提升開發效率。

實踐指南

開通服務及授權

  1. 開通OSS服務並建立Bucket。

    1. 登入Object Storage Service控制台

    2. 在左側導覽列單擊Bucket 列表

    3. Bucket 列表頁面,單擊建立 Bucket

      本實踐中Bucket Name為xxx-oss-test-sh

  2. 建立用於MaxCompute的RAM 角色(Role)並綁定該角色到MaxCompute運行環境。

    1. 登入RAM控制台

    2. 在左側導覽列選擇身份管理 > 角色

    3. 角色頁面,單擊建立角色

    4. 建立角色頁面的右上方,單擊建立服務關聯角色

      1. 建立角色頁面,選擇信任主體類型雲端服務

      2. 信任主體名稱選擇雲原生MaxCompute

      3. 許可權管理頁簽,單擊新增授權。在彈出的新增授權面板中,選擇要授予該角色的權限原則,單擊確認新增授權

        權限原則選擇:

使用with_fs_mount掛載OSS

  1. 推薦用法

    from maxframe.udf import with_fs_mount
    
    @with_fs_mount(
        "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/",
        "/mnt/oss_data",
        storage_options={
            "role_arn": "acs:ram::xxx:role/maxframe-oss"
        },
    )
    def _process(batch_df):
        import os
        if os.path.exists('/mnt/oss_data'):
            print(f"Mounted files: {os.listdir('/mnt/oss_data')}")
        else:
            print("/mnt/oss_data not mounted!")
        return batch_df * 2
        
  2. 不推薦寫法

    可用於測試用途,不建議用於生產環境。

    storage_options={
        "oss_access_key_id": "LTAI5t...",
        "oss_access_key_secret": "Wp9H..."
    }
    重要

    避免寫入程式碼 AccessKey。使用 role_arn 可以讓系統自動申請臨時 STS Token,避免 AK/SK 泄露風險

結合with_running_options控制資源分派

建議根據任務類型設定合理的 CPU 和記憶體資源:

from maxframe.udf import with_running_options
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
    ...

參數

建議值

說明

engine="dpe"

固定

當前 FS Mount 僅支援 DPE 引擎。

cpu

1~4

若涉及複雜 IO 或解壓可適當增加。

memory

8GB 起

大檔案載入建議 ≥16GB。

使用樣本

推薦模式:批量處理(data batch processing)。

在大規模資料處理情境下,可結合MaxFrame apply_chunk功能,對輸入資料批量處理。

建立 MaxFrame Session 並啟用 SQL 支援

import os
from odps import ODPS
from maxframe import new_session
from maxframe.udf import with_fs_mount

# 初始化 ODPS 用戶端
o = ODPS(
    # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
    # 不建議直接使用AccessKey ID和 AccessKey Secret字串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

# 設定鏡像(如有自訂依賴)
options.sql.settings = {"odps.session.image": "xxxx"}

# 啟動會話
session = new_session(o)

print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)



@with_fs_mount(
    "oss://oss-cn-<region>-internal.aliyuncs.com/wzy-oss-test-sh/test/",
    "/mnt/oss_data",
    storage_options={
        "role_arn": "acs:ram::<uid>:role/maxframe-oss"
    },
)
@with_running_options(engine="dpe", cpu=2, memory=16)

建立自訂函數

def _process(batch_df):
    import pandas as pd
    import os

    # Step 1: 檢查掛載是否成功
    mount_point = "/mnt/oss_data"
    if not os.path.exists(mount_point):
        raise RuntimeError("OSS mount failed!")

    # Step 2: 載入資料(如映射表、詞典)
    mapping_file = os.path.join(mount_point, "category_map.csv")
    if os.path.isfile(mapping_file):
        mapping_df = pd.read_csv(mapping_file)

    # Step 3: 處理當前 chunk
    result = batch_df.copy()
    result['F'] = result['A'] * 10

    return result

構建DataFrame並應用自訂函數

data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])

# 使用 apply_chunk 應用掛載後的函數
result_df = df.mf.apply_chunk(
    _process,
    skip_infer=True,
    output_type="dataframe",
    dtypes=df.dtypes,
    index=df.index
)

# 執行並擷取結果
result = result_df.execute().fetch()

skip_infer=True可跳過類型推斷,加快執行速度,但需確保 dtypes 和 index 正確傳遞。

調試技巧

驗證掛載狀態

可在 _process 函數中加入調試日誌:

import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])

查看 LogView 輸出,確認是否有類似日誌:

FS Mount 成功!/mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)