本文基於實際代碼案例介紹如何在MaxFrame中高效、安全地掛載和使用阿里雲 OSS 作為分散式運算儲存。通過MaxFrame with_fs_mount裝飾器實現檔案系統級掛載,為大規模資料處理提供穩定可靠的外部資料訪問能力。
應用情境
需要將MaxFrame作業與持久化Object Storage Service(如 OSS)結合使用的巨量資料分析情境。例如:
從 OSS 載入未經處理資料並清洗或處理;
將中間結果寫入OSS供下遊任務消費;
共用訓練後的模型檔案、設定檔等靜態資源。
傳統的讀寫方式(如 pd.read_csv("oss://..."))受限於 SDK 效能和網路開銷,在分布式環境下效率較低。而通過檔案系統級掛載(FS Mount),可以在 MaxCompute 中像操作本地磁碟一樣訪問 OSS 檔案,極大提升開發效率。
實踐指南
開通服務及授權
開通OSS服務並建立Bucket。
在左側導覽列單擊Bucket 列表。
在Bucket 列表頁面,單擊建立 Bucket。
本實踐中Bucket Name為
xxx-oss-test-sh。
建立用於MaxCompute的RAM 角色(Role)並綁定該角色到MaxCompute運行環境。
登入RAM控制台。
在左側導覽列選擇。
在角色頁面,單擊建立角色。
在建立角色頁面的右上方,單擊建立服務關聯角色。
在建立角色頁面,選擇信任主體類型為雲端服務。
信任主體名稱選擇雲原生MaxCompute。
在許可權管理頁簽,單擊新增授權。在彈出的新增授權面板中,選擇要授予該角色的權限原則,單擊確認新增授權。
權限原則選擇:
管理Object Storage Service服務(OSS)許可權:AliyunOSSFullAccess
管理MaxCompute(MaxCompute)的許可權:AliyunMaxComputeFullAccess
使用with_fs_mount掛載OSS
推薦用法
from maxframe.udf import with_fs_mount @with_fs_mount( "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/", "/mnt/oss_data", storage_options={ "role_arn": "acs:ram::xxx:role/maxframe-oss" }, ) def _process(batch_df): import os if os.path.exists('/mnt/oss_data'): print(f"Mounted files: {os.listdir('/mnt/oss_data')}") else: print("/mnt/oss_data not mounted!") return batch_df * 2不推薦寫法
可用於測試用途,不建議用於生產環境。
storage_options={ "oss_access_key_id": "LTAI5t...", "oss_access_key_secret": "Wp9H..." }重要避免寫入程式碼 AccessKey。使用
role_arn可以讓系統自動申請臨時 STS Token,避免 AK/SK 泄露風險。
結合with_running_options控制資源分派
建議根據任務類型設定合理的 CPU 和記憶體資源:
from maxframe.udf import with_running_options
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
...參數 | 建議值 | 說明 |
| 固定 | 當前 FS Mount 僅支援 DPE 引擎。 |
| 1~4 | 若涉及複雜 IO 或解壓可適當增加。 |
| 8GB 起 | 大檔案載入建議 ≥16GB。 |
使用樣本
推薦模式:批量處理(data batch processing)。
在大規模資料處理情境下,可結合MaxFrame apply_chunk功能,對輸入資料批量處理。
建立 MaxFrame Session 並啟用 SQL 支援
import os
from odps import ODPS
from maxframe import new_session
from maxframe.udf import with_fs_mount
# 初始化 ODPS 用戶端
o = ODPS(
# 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
# 不建議直接使用AccessKey ID和 AccessKey Secret字串。
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='<your project>',
endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)
# 設定鏡像(如有自訂依賴)
options.sql.settings = {"odps.session.image": "xxxx"}
# 啟動會話
session = new_session(o)
print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)
@with_fs_mount(
"oss://oss-cn-<region>-internal.aliyuncs.com/wzy-oss-test-sh/test/",
"/mnt/oss_data",
storage_options={
"role_arn": "acs:ram::<uid>:role/maxframe-oss"
},
)
@with_running_options(engine="dpe", cpu=2, memory=16)建立自訂函數
def _process(batch_df):
import pandas as pd
import os
# Step 1: 檢查掛載是否成功
mount_point = "/mnt/oss_data"
if not os.path.exists(mount_point):
raise RuntimeError("OSS mount failed!")
# Step 2: 載入資料(如映射表、詞典)
mapping_file = os.path.join(mount_point, "category_map.csv")
if os.path.isfile(mapping_file):
mapping_df = pd.read_csv(mapping_file)
# Step 3: 處理當前 chunk
result = batch_df.copy()
result['F'] = result['A'] * 10
return result構建DataFrame並應用自訂函數
data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])
# 使用 apply_chunk 應用掛載後的函數
result_df = df.mf.apply_chunk(
_process,
skip_infer=True,
output_type="dataframe",
dtypes=df.dtypes,
index=df.index
)
# 執行並擷取結果
result = result_df.execute().fetch()skip_infer=True可跳過類型推斷,加快執行速度,但需確保 dtypes 和 index 正確傳遞。
調試技巧
驗證掛載狀態
可在 _process 函數中加入調試日誌:
import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])查看 LogView 輸出,確認是否有類似日誌:
FS Mount 成功!/mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)