すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:OSS のマウントと使用に関するベストプラクティス

最終更新日:Mar 01, 2026

このトピックでは、MaxFrame の分散コンピューティングにおけるストレージとして、Alibaba Cloud Object Storage Service (OSS) を効率的かつ安全にマウントして使用する方法をコード例で示します。MaxFrame の with_fs_mount デコレーターは、ファイルシステムレベルのマウントを提供し、大規模なデータ処理に対して安定した信頼性の高い外部データアクセスを実現します。

利用シーン

この方法は、MaxFrame のジョブと OSS のような永続的なオブジェクトストレージを組み合わせる必要があるビッグデータ分析シナリオで使用できます。例:

  • OSS から生データをロードし、クレンジングまたは処理する。

  • 後続タスクのために中間結果を OSS に書き込む。

  • トレーニング済みモデルファイルや設定ファイルなどの静的リソースを共有する。

従来の読み書き方法、例えば pd.read_csv("oss://...") などは、SDK のパフォーマンスやネットワークオーバーヘッドによって制限されます。これらの方法は、分散環境では非効率です。ファイルシステムレベルのマウント (FS Mount) を使用すると、MaxCompute 内の OSS ファイルにローカルディスク上にあるかのようにアクセスできます。これにより、開発効率が大幅に向上します。

ベストプラクティス

サービスの有効化と権限の付与

  1. OSS の有効化とバケットの作成

    1. Object Storage Service (OSS) コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、バケット をクリックします。

    3. バケット ページで、バケットの作成 をクリックします。

      この例では、バケット名は xxx-oss-test-sh です。

  2. MaxCompute 用の RAM ロールを作成し、そのロールを MaxCompute のランタイム環境にアタッチします。

    1. Resource Access Management (RAM) コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[アイデンティティ] > [ロール]を選択します。

    3. [ロール] ページで、[ロールの作成] をクリックします。

    4. [ロールの作成] ページの右上隅にある [サービスにリンクされたロールの作成] をクリックします。

      1. [ロールの作成] ページで、[プリンシパルタイプ][クラウドサービス] に設定します。

      2. [プリンシパルタイプ] で、[MaxCompute] を選択します。

      3. [権限] タブで、[権限の付与] をクリックします。[権限の付与] パネルで、ロールのアクセスポリシーを選択し、[OK] をクリックします。

        以下のアクセスポリシーを選択します:

with_fs_mount を使用した OSS のマウント

  1. 推奨される方法

    from maxframe.udf import with_fs_mount
    
    @with_fs_mount(
        "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/",
        "/mnt/oss_data",
        storage_options={
            "role_arn": "acs:ram::xxx:role/maxframe-oss"
        },
    )
    def _process(batch_df):
        import os
        if os.path.exists('/mnt/oss_data'):
            print(f"Mounted files: {os.listdir('/mnt/oss_data')}")
        else:
            print("/mnt/oss_data not mounted!")
        return batch_df * 2
        
  2. 非推奨

    この方法はテスト目的で使用できます。本番環境では使用しないでください。

    storage_options={
        "oss_access_key_id": "LTAI5t...",
        "oss_access_key_secret": "Wp9H..."
    }
    重要

    AccessKey のハードコーディングは避けてください。 role_arn を使用すると、システムが一時的なセキュリティトークンサービス (STS) トークンを自動的にリクエストできます。この方法により、AccessKey ID と AccessKey Secret の漏洩を防ぐことができます。

with_running_options を使用したリソース割り当ての制御

タスクの種類に応じて、適切な CPU とメモリリソースを設定します。

from maxframe.udf import with_running_options
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
    ...

パラメーター

推奨値

説明

engine="dpe"

固定

FS Mount は現在、DPE エンジンのみをサポートしています。

cpu

1~4

複雑な I/O 操作や展開の場合は、値を増やします。

memory

8 GB 以上

大きなファイルをロードする場合は 16 GB 以上を使用します。

使用例

推奨パターン:データバッチ処理

大規模なデータ処理シナリオでは、MaxFrame の apply_chunk 機能を使用して、入力データをバッチで処理できます。

MaxFrame セッションの作成

import os
from odps import ODPS
from maxframe import new_session
from maxframe.udf import with_fs_mount

# ODPS クライアントを初期化します。
o = ODPS(
    # 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID がご利用の AccessKey ID に設定されていることを確認してください。
    # また、環境変数 ALIBABA_CLOUD_ACCESS_KEY_SECRET がご利用の AccessKey Secret に設定されていることを確認してください。
    # AccessKey ID と AccessKey Secret の文字列を直接使用しないでください。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

# ランタイムイメージを設定します。
# maxframe_service_dpe_runtime イメージには ossfs2_2.0.3.1_linux_x86_64.deb が含まれています。
# カスタムイメージを使用する場合は、OSS 依存パッケージをダウンロードし、アップロードしてイメージ内で使用する必要があります。依存パッケージはコードブロックの下に記載されています。
options.sql.settings = { "odps.session.image": "maxframe_service_dpe_runtime"}

# セッションを開始します。
session = new_session(o)

print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)

@with_running_options(engine="dpe", cpu=2, memory=8)
@with_fs_mount(
    "oss://oss-cn-<region>-internal.aliyuncs.com/wzy-oss-test-sh/test/",
    "/mnt/oss_data",
    storage_options={
        "role_arn": "acs:ram::<uid>:role/maxframe-oss"
    },
)

OSSFS 依存パッケージ:ossfs2_2.0.3.1_linux_x86_64.deb

ユーザー定義関数の作成

def _process(batch_df):
  import pandas as pd
  import os

  # ステップ 1:マウントが成功したかどうかを確認します。
  mount_point = "/mnt/oss_data"
  if not os.path.exists(mount_point):
    raise RuntimeError("OSS mount failed!")

    # ステップ 2:マッピングテーブルや辞書などのデータをロードします。
  mapping_file = os.path.join(mount_point, "category_map.csv")
  if os.path.isfile(mapping_file):
    mapping_df = pd.read_csv(mapping_file)

    # ステップ 3:現在のチャンクを処理します。
  result = batch_df.copy()
  result['F'] = result['A'] * 10

  return result

DataFrame の構築とユーザー定義関数の適用

data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])

# マウント後、apply_chunk を使用して関数を適用します。
result_df = df.mf.apply_chunk(
  _process,
  skip_infer=True,
  output_type="dataframe",
  dtypes=df.dtypes,
  index=df.index
)

# 操作を実行し、結果をフェッチします。
result = result_df.execute().fetch()

skip_infer=True を設定すると、型推論がスキップされ、実行が高速化されます。ただし、dtypesindex が正しく渡されていることを確認する必要があります。

デバッグのヒント

マウントステータスの確認

_process 関数にデバッグログを追加できます。

import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])

LogView の出力を確認し、次のようなログが生成されていることを確認します:

FS Mount successful! /mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)