このトピックでは、コード例を使用して、MaxFrame で分散コンピューティングのストレージとして Alibaba Cloud Object Storage Service (OSS) を効率的かつ安全にマウントして使用する方法について説明します。with_fs_mount デコレーターは、ファイルシステムレベルで OSS をマウントします。これにより、大規模なデータ処理に安定した信頼性の高い外部データアクセスが提供されます。
利用シーン
この方法は、MaxFrame のジョブと OSS のような永続的なオブジェクトストレージを組み合わせる必要があるビッグデータ分析シナリオで使用します。例:
OSS から生データをロードし、クレンジングまたは処理する。
下流タスクが使用するために、中間結果を OSS に書き込む。
トレーニング済みモデルファイルや設定ファイルなどの静的リソースを共有する。
pd.read_csv("oss://...") のような従来の読み取りおよび書き込みメソッドは、ソフトウェア開発キット (SDK) のパフォーマンスとネットワークオーバーヘッドによる制限があるため、分散環境では非効率です。ファイルシステムレベルのマウント (FS Mount) を使用すると、MaxCompute の OSS ファイルに、ローカルディスク上にあるかのようにアクセスできます。これにより、開発効率が大幅に向上します。
ベストプラクティス
サービスの有効化と権限の付与
OSS を有効化し、バケットを作成します。
Object Storage Service (OSS) コンソールにログインします。
左側のナビゲーションウィンドウで、バケット をクリックします。
バケット ページで、バケットの作成 をクリックします。
この例では、バケット名は
xxx-oss-test-shです。
MaxCompute 用の RAM ロールを作成し、MaxCompute のランタイム環境にアタッチします。
左側のナビゲーションウィンドウで、 を選択します。
[ロール] ページで、[ロールの作成] をクリックします。
[ロールの作成] ページの右上隅にある [サービス連携ロールの作成] をクリックします。
[ロールの作成] ページで、[信頼できるエンティティ] を [Alibaba Cloud サービス] に設定します。
[ロールタイプ]には、 [Cloud-native Big Data Computing Service MaxCompute]を選択します。
[権限] タブで、[権限の追加] をクリックします。[権限の追加] パネルで、ロールのアクセスポリシーを選択し、[OK] をクリックします。
以下のアクセスポリシーを選択します:
Object Storage Service (OSS) を管理する権限: AliyunOSSFullAccess
MaxCompute を管理する権限: AliyunMaxComputeFullAccess
with_fs_mount を使用した OSS のマウント
推奨される方法
from maxframe.udf import with_fs_mount @with_fs_mount( "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/", "/mnt/oss_data", storage_options={ "role_arn": "acs:ram::xxx:role/maxframe-oss" }, ) def _process(batch_df): import os if os.path.exists('/mnt/oss_data'): print(f"Mounted files: {os.listdir('/mnt/oss_data')}") else: print("/mnt/oss_data not mounted!") return batch_df * 2推奨されない方法
この方法はテストには使用できますが、本番環境での使用は推奨されません。
storage_options={ "oss_access_key_id": "LTAI5t...", "oss_access_key_secret": "Wp9H..." }重要AccessKey のハードコーディングは避けてください。
role_arnを使用すると、システムは一時的なセキュリティトークンサービス (STS) トークンを自動的にリクエストできます。 このプラクティスにより、AccessKey ID と AccessKey Secret が漏洩するリスクを回避できます。
with_running_options によるリソース割り当ての制御
タスクの種類に基づいて、適切な CPU とメモリリソースを設定します:
from maxframe.udf import with_running_options
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
...パラメーター | 推奨値 | 説明 |
| 固定 | FS マウントは現在、DPE エンジンのみをサポートしています。 |
| 1 から 4 | 複雑な I/O や展開を行う場合は、この値を増やしてください。 |
| 8 GB 以上 | 大きなファイルをロードするには、16 GB 以上を使用してください。 |
例
推奨パターン:データバッチ処理。
大規模なデータ処理には、MaxFrame の apply_chunk 機能を使用して入力データをバッチで処理できます。
MaxFrame セッションの作成と SQL サポートの有効化
import os
from odps import ODPS
from maxframe import new_session
from maxframe.udf import with_fs_mount
# ODPS クライアントを初期化します
o = ODPS(
# ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数に AccessKey ID が設定されていることを確認してください。
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数に AccessKey Secret が設定されていることを確認してください。
# AccessKey ID と AccessKey Secret の文字列を直接使用しないでください。
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='<your project>',
endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)
# イメージを設定します (カスタム依存関係がある場合)
options.sql.settings = { "odps.session.image": "maxframe_service_dpe_runtime"}
# セッションを開始します
session = new_session(o)
print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)
@with_fs_mount(
"oss://oss-cn-<region>-internal.aliyuncs.com/wzy-oss-test-sh/test/",
"/mnt/oss_data",
storage_options={
"role_arn": "acs:ram::<uid>:role/maxframe-oss"
},
)
@with_running_options(engine="dpe", cpu=2, memory=16)ユーザー定義関数の作成
def _process(batch_df):
import pandas as pd
import os
# ステップ 1: マウントが成功したか確認します
mount_point = "/mnt/oss_data"
if not os.path.exists(mount_point):
raise RuntimeError("OSS mount failed!")
# ステップ 2: データをロードします (マッピングテーブルや辞書など)
mapping_file = os.path.join(mount_point, "category_map.csv")
if os.path.isfile(mapping_file):
mapping_df = pd.read_csv(mapping_file)
# ステップ 3: 現在のチャンクを処理します
result = batch_df.copy()
result['F'] = result['A'] * 10
return resultDataFrame の構築とユーザー定義関数の適用
data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])
# apply_chunk を使用して、マウントとともにユーザー定義関数を適用します
result_df = df.mf.apply_chunk(
_process,
skip_infer=True,
output_type="dataframe",
dtypes=df.dtypes,
index=df.index
)
# 実行して結果を取得します
result = result_df.execute().fetch()skip_infer=True は型推論をスキップして実行を高速化します。dtypes と index を正しく渡すようにしてください。
デバッグのヒント
マウントステータスの確認
_process 関数にデバッグログを追加します:
import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])LogView の出力を確認し、以下のようなログが生成されていることを確認します:
FS Mount successful! /mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)