このトピックでは、MaxFrame の分散コンピューティングにおけるストレージとして、Alibaba Cloud Object Storage Service (OSS) を効率的かつ安全にマウントして使用する方法をコード例で示します。MaxFrame の with_fs_mount デコレーターは、ファイルシステムレベルのマウントを提供し、大規模なデータ処理に対して安定した信頼性の高い外部データアクセスを実現します。
利用シーン
この方法は、MaxFrame のジョブと OSS のような永続的なオブジェクトストレージを組み合わせる必要があるビッグデータ分析シナリオで使用できます。例:
OSS から生データをロードし、クレンジングまたは処理する。
後続タスクのために中間結果を OSS に書き込む。
トレーニング済みモデルファイルや設定ファイルなどの静的リソースを共有する。
従来の読み書き方法、例えば pd.read_csv("oss://...") などは、SDK のパフォーマンスやネットワークオーバーヘッドによって制限されます。これらの方法は、分散環境では非効率です。ファイルシステムレベルのマウント (FS Mount) を使用すると、MaxCompute 内の OSS ファイルにローカルディスク上にあるかのようにアクセスできます。これにより、開発効率が大幅に向上します。
ベストプラクティス
サービスの有効化と権限の付与
OSS の有効化とバケットの作成
Object Storage Service (OSS) コンソールにログインします。
左側のナビゲーションウィンドウで、バケット をクリックします。
バケット ページで、バケットの作成 をクリックします。
この例では、バケット名は
xxx-oss-test-shです。
MaxCompute 用の RAM ロールを作成し、そのロールを MaxCompute のランタイム環境にアタッチします。
左側のナビゲーションウィンドウで、を選択します。
[ロール] ページで、[ロールの作成] をクリックします。
[ロールの作成] ページの右上隅にある [サービスにリンクされたロールの作成] をクリックします。
[ロールの作成] ページで、[プリンシパルタイプ] を [クラウドサービス] に設定します。
[プリンシパルタイプ] で、[MaxCompute] を選択します。
[権限] タブで、[権限の付与] をクリックします。[権限の付与] パネルで、ロールのアクセスポリシーを選択し、[OK] をクリックします。
以下のアクセスポリシーを選択します:
OSS を管理する権限:AliyunOSSFullAccess
MaxCompute を管理する権限:AliyunMaxComputeFullAccess
with_fs_mount を使用した OSS のマウント
推奨される方法
from maxframe.udf import with_fs_mount @with_fs_mount( "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/", "/mnt/oss_data", storage_options={ "role_arn": "acs:ram::xxx:role/maxframe-oss" }, ) def _process(batch_df): import os if os.path.exists('/mnt/oss_data'): print(f"Mounted files: {os.listdir('/mnt/oss_data')}") else: print("/mnt/oss_data not mounted!") return batch_df * 2非推奨
この方法はテスト目的で使用できます。本番環境では使用しないでください。
storage_options={ "oss_access_key_id": "LTAI5t...", "oss_access_key_secret": "Wp9H..." }重要AccessKey のハードコーディングは避けてください。
role_arnを使用すると、システムが一時的なセキュリティトークンサービス (STS) トークンを自動的にリクエストできます。この方法により、AccessKey ID と AccessKey Secret の漏洩を防ぐことができます。
with_running_options を使用したリソース割り当ての制御
タスクの種類に応じて、適切な CPU とメモリリソースを設定します。
from maxframe.udf import with_running_options
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
...パラメーター | 推奨値 | 説明 |
| 固定 | FS Mount は現在、DPE エンジンのみをサポートしています。 |
| 1~4 | 複雑な I/O 操作や展開の場合は、値を増やします。 |
| 8 GB 以上 | 大きなファイルをロードする場合は 16 GB 以上を使用します。 |
使用例
推奨パターン:データバッチ処理
大規模なデータ処理シナリオでは、MaxFrame の apply_chunk 機能を使用して、入力データをバッチで処理できます。
MaxFrame セッションの作成
import os
from odps import ODPS
from maxframe import new_session
from maxframe.udf import with_fs_mount
# ODPS クライアントを初期化します。
o = ODPS(
# 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID がご利用の AccessKey ID に設定されていることを確認してください。
# また、環境変数 ALIBABA_CLOUD_ACCESS_KEY_SECRET がご利用の AccessKey Secret に設定されていることを確認してください。
# AccessKey ID と AccessKey Secret の文字列を直接使用しないでください。
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='<your project>',
endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)
# ランタイムイメージを設定します。
# maxframe_service_dpe_runtime イメージには ossfs2_2.0.3.1_linux_x86_64.deb が含まれています。
# カスタムイメージを使用する場合は、OSS 依存パッケージをダウンロードし、アップロードしてイメージ内で使用する必要があります。依存パッケージはコードブロックの下に記載されています。
options.sql.settings = { "odps.session.image": "maxframe_service_dpe_runtime"}
# セッションを開始します。
session = new_session(o)
print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)
@with_running_options(engine="dpe", cpu=2, memory=8)
@with_fs_mount(
"oss://oss-cn-<region>-internal.aliyuncs.com/wzy-oss-test-sh/test/",
"/mnt/oss_data",
storage_options={
"role_arn": "acs:ram::<uid>:role/maxframe-oss"
},
)OSSFS 依存パッケージ:ossfs2_2.0.3.1_linux_x86_64.deb
ユーザー定義関数の作成
def _process(batch_df):
import pandas as pd
import os
# ステップ 1:マウントが成功したかどうかを確認します。
mount_point = "/mnt/oss_data"
if not os.path.exists(mount_point):
raise RuntimeError("OSS mount failed!")
# ステップ 2:マッピングテーブルや辞書などのデータをロードします。
mapping_file = os.path.join(mount_point, "category_map.csv")
if os.path.isfile(mapping_file):
mapping_df = pd.read_csv(mapping_file)
# ステップ 3:現在のチャンクを処理します。
result = batch_df.copy()
result['F'] = result['A'] * 10
return resultDataFrame の構築とユーザー定義関数の適用
data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])
# マウント後、apply_chunk を使用して関数を適用します。
result_df = df.mf.apply_chunk(
_process,
skip_infer=True,
output_type="dataframe",
dtypes=df.dtypes,
index=df.index
)
# 操作を実行し、結果をフェッチします。
result = result_df.execute().fetch()skip_infer=True を設定すると、型推論がスキップされ、実行が高速化されます。ただし、dtypes と index が正しく渡されていることを確認する必要があります。
デバッグのヒント
マウントステータスの確認
_process 関数にデバッグログを追加できます。
import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])LogView の出力を確認し、次のようなログが生成されていることを確認します:
FS Mount successful! /mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)