
MaxCompute: Best practices for mounting and using OSS

Last Updated: Jan 24, 2026

This topic provides code examples that show how to mount Alibaba Cloud Object Storage Service (OSS) efficiently and securely and use it as storage for distributed computing in MaxFrame. The MaxFrame with_fs_mount decorator mounts an OSS path at the file system level, which gives large-scale data processing jobs stable and reliable access to external data.

Scenarios

You can use this method in big data analytics scenarios where you need to combine MaxFrame jobs with persistent object storage, such as OSS. For example:

  • Load raw data from OSS and then clean or process it.

  • Write intermediate results to OSS for downstream tasks.

  • Share static resources, such as trained model files and configuration files.

Traditional read and write methods, such as pd.read_csv("oss://..."), are limited by software development kit (SDK) performance and network overhead, which makes them inefficient in a distributed environment. File system-level mounting (FS Mount) lets code running in MaxCompute access OSS files through ordinary file paths, as if they were on a local disk. You can then use standard file APIs such as os.listdir() and pd.read_csv() on the mount point instead of writing OSS SDK calls, which simplifies development.
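Because the mount point behaves like a local directory, code in a UDF can use ordinary file APIs instead of OSS SDK calls. The following minimal sketch illustrates this with only the Python standard library. The helper name count_csv_rows and the CSV layout (one header row per file) are assumptions for illustration, and a temporary directory stands in for the /mnt/oss_data mount point so that the sketch runs outside MaxCompute.

```python
import csv
import os
import tempfile


def count_csv_rows(mount_point: str) -> dict:
    """Return {file name: data row count} for each CSV file directly under mount_point.

    Only standard file APIs are used, so the same function works on a local directory
    or on a directory that with_fs_mount has mounted from OSS.
    """
    counts = {}
    for name in sorted(os.listdir(mount_point)):
        path = os.path.join(mount_point, name)
        if not name.endswith(".csv") or not os.path.isfile(path):
            continue  # Skip subdirectories and non-CSV files.
        with open(path, newline="") as f:
            reader = csv.reader(f)
            next(reader, None)  # Skip the header row (assumed to exist).
            counts[name] = sum(1 for _ in reader)
    return counts


if __name__ == "__main__":
    # A temporary directory stands in for the /mnt/oss_data mount point.
    with tempfile.TemporaryDirectory() as mount_point:
        with open(os.path.join(mount_point, "sales.csv"), "w", newline="") as f:
            csv.writer(f).writerows([["id", "amount"], [1, 9.5], [2, 3.0]])
        print(count_csv_rows(mount_point))  # {'sales.csv': 2}
```

Inside a UDF decorated with with_fs_mount, you would pass the mount point, for example count_csv_rows("/mnt/oss_data").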

Best practices

Activate services and grant permissions

  1. Activate OSS and create a bucket.

    1. Log on to the Object Storage Service (OSS) console.

    2. In the navigation pane on the left, click Buckets.

    3. On the Buckets page, click Create Bucket.

      In this example, the Bucket Name is xxx-oss-test-sh.

  2. Create a RAM role for MaxCompute and attach the role to the MaxCompute runtime environment.

    1. Log on to the Resource Access Management (RAM) console.

    2. In the navigation pane on the left, choose Identities > Roles.

    3. On the Roles page, click Create Role.

    4. In the upper-right corner of the Create Role page, click Create Service Linked Role.

      1. On the Create Role page, set Principal Type to Cloud Service.

      2. For Principal Name, select MaxCompute.

      3. After the role is created, go to the Permissions tab of the role and click Grant Permission. In the Grant Permission panel, select an access policy for the role and click OK.

        Select the following access policies:

Mount OSS using with_fs_mount

  1. Recommended: authenticate with a RAM role (role_arn)

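    from maxframe.udf import with_fs_mount
    
    @with_fs_mount(
        # OSS path to mount, in the form oss://<endpoint>/<bucket>/<prefix>/.
        "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/",
        # Local directory at which the OSS path appears inside the UDF runtime.
        "/mnt/oss_data",
        storage_options={
            # RAM role that the system assumes to obtain temporary STS credentials.
            "role_arn": "acs:ram::xxx:role/maxframe-oss"
        },
    )
    def _process(batch_df):
        import os
        if os.path.exists('/mnt/oss_data'):
            print(f"Mounted files: {os.listdir('/mnt/oss_data')}")
        else:
            print("/mnt/oss_data not mounted!")
        return batch_df * 2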
        
  2. Not recommended: authenticate with an AccessKey pair

    You can use this method for testing purposes. Do not use it in a production environment.

    storage_options={
        "oss_access_key_id": "LTAI5t...",
        "oss_access_key_secret": "Wp9H..."
    }
    Important

    Avoid hard-coding your AccessKey pair. When you specify role_arn, the system automatically requests temporary Security Token Service (STS) credentials for the role, so your AccessKey ID and AccessKey secret never appear in your code and cannot be leaked from it.
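The arguments passed to with_fs_mount follow the pattern in the examples above: an OSS URI of the form oss://<endpoint>/<bucket>/<prefix>/, a local mount point, and storage_options that contain a role ARN of the form acs:ram::<uid>:role/<role name>. The following sketch assembles them in one place. The helper build_fs_mount_args is hypothetical, and it assumes the internal endpoint format oss-<region ID>-internal.aliyuncs.com and the OSS bucket naming rules (3 to 63 characters; lowercase letters, digits, and hyphens; starting and ending with a letter or digit).

```python
import re

# Assumed OSS bucket naming rule: 3-63 chars, lowercase letters/digits/hyphens,
# starting and ending with a letter or digit.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$")


def build_fs_mount_args(region_id, bucket, prefix, account_id, role_name,
                        mount_point="/mnt/oss_data"):
    """Hypothetical helper: return (oss_uri, mount_point, storage_options) for with_fs_mount.

    Assumes the internal endpoint format oss-<region_id>-internal.aliyuncs.com,
    as in the examples in this topic.
    """
    if not _BUCKET_RE.match(bucket):
        raise ValueError(f"invalid OSS bucket name: {bucket!r}")
    prefix = prefix.strip("/")
    path = f"{bucket}/{prefix}/" if prefix else f"{bucket}/"
    uri = f"oss://oss-{region_id}-internal.aliyuncs.com/{path}"
    # Only a role ARN goes into storage_options; no AccessKey pair is embedded in code.
    storage_options = {"role_arn": f"acs:ram::{account_id}:role/{role_name}"}
    return uri, mount_point, storage_options
```

For example, with the example region ID cn-shanghai, build_fs_mount_args("cn-shanghai", "xxx-oss-test-sh", "test", "<uid>", "maxframe-oss") returns the URI oss://oss-cn-shanghai-internal.aliyuncs.com/xxx-oss-test-sh/test/, the mount point /mnt/oss_data, and {"role_arn": "acs:ram::<uid>:role/maxframe-oss"}. You could then unpack them into @with_fs_mount(uri, mount_point, storage_options=storage_options).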

Control resource allocation using with_running_options

Set appropriate CPU and memory resources based on the task type.

from maxframe.udf import with_fs_mount, with_running_options

# with_running_options sets the engine and resources for the mounted UDF.
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
    ...

Parameter    | Suggested value | Description
-------------|-----------------|------------------------------------------------------------
engine="dpe" | Fixed           | FS Mount currently supports only the DPE engine.
cpu          | 1 to 4          | Increase the value for complex I/O operations or decompression.
memory       | 8 GB or more    | Use 16 GB or more to load large files.
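If several UDFs share the same resource profile, you can keep the suggestions from the table in one place. This is a minimal sketch: suggest_running_options is a hypothetical helper, and the mapping from workload flags to values is an illustrative assumption based on the table, not a MaxFrame rule.

```python
def suggest_running_options(io_heavy: bool = False, loads_large_files: bool = False) -> dict:
    """Hypothetical helper: keyword arguments for with_running_options based on the table."""
    return {
        "engine": "dpe",  # FS Mount currently supports only the DPE engine.
        "cpu": 4 if io_heavy else 2,  # Stays within the suggested range of 1 to 4.
        "memory": 16 if loads_large_files else 8,  # In GB: 8 or more, 16 or more for large files.
    }
```

You could then write @with_running_options(**suggest_running_options(loads_large_files=True)) above @with_fs_mount(...).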

Usage example

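Recommended pattern: batch processing

In large-scale data processing scenarios, use the MaxFrame apply_chunk method to process the input data in batches. apply_chunk calls the decorated function on each batch of the input DataFrame, so every call can read from the mounted OSS path.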
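Conceptually, batch processing means calling the same function once per batch and combining the results. The following local sketch illustrates only that idea with plain Python lists. It is not how MaxFrame implements apply_chunk, which distributes batches across workers and passes each batch to the function as a pandas DataFrame. The names apply_in_batches, double, and batch_rows belong to this sketch only.

```python
from typing import Callable, List, Sequence

Row = List[float]


def apply_in_batches(rows: Sequence[Row], func: Callable[[List[Row]], List[Row]],
                     batch_rows: int) -> List[Row]:
    """Call func on consecutive batches of at most batch_rows rows and concatenate the results."""
    if batch_rows <= 0:
        raise ValueError("batch_rows must be positive")
    results: List[Row] = []
    for start in range(0, len(rows), batch_rows):
        batch = list(rows[start:start + batch_rows])
        results.extend(func(batch))  # One call per batch, similar in spirit to a UDF in apply_chunk.
    return results


def double(batch: List[Row]) -> List[Row]:
    """Example per-batch function: multiply every value by 2."""
    return [[v * 2 for v in row] for row in batch]


if __name__ == "__main__":
    data = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
    print(apply_in_batches(data, double, batch_rows=2))
    # [[2.0, 4.0], [6.0, 8.0], [10.0, 12.0]]
```

Because the function only ever sees one batch, it must not assume that it sees the whole dataset, which is also true of the UDF that you pass to apply_chunk.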

Create a MaxFrame session

import os
from odps import ODPS
import maxframe.dataframe as md
from maxframe import new_session
from maxframe.config import options
from maxframe.udf import with_fs_mount, with_running_options

# Initialize the ODPS client.
o = ODPS(
    # Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID,
    # and the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret.
    # Do not directly use the AccessKey ID and AccessKey secret strings.
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)

# Set the runtime image.
# The maxframe_service_dpe_runtime image includes ossfs2_2.0.3.1_linux_x86_64.deb.
# If you use a custom image, download the OSSFS dependency package listed after this
# code block, upload it, and install it in your custom image.
options.sql.settings = {"odps.session.image": "maxframe_service_dpe_runtime"}

# Start the session.
session = new_session(o)

print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)

OSSFS dependency package: ossfs2_2.0.3.1_linux_x86_64.deb

Create a user-defined function

@with_running_options(engine="dpe", cpu=2, memory=8)
@with_fs_mount(
    "oss://oss-cn-<region>-internal.aliyuncs.com/xxx-oss-test-sh/test/",
    "/mnt/oss_data",
    storage_options={
        "role_arn": "acs:ram::<uid>:role/maxframe-oss"
    },
)
def _process(batch_df):
    # Import inside the function so that the modules are available on the worker.
    import os
    import pandas as pd

    # Step 1: Check whether the mount is successful.
    mount_point = "/mnt/oss_data"
    if not os.path.exists(mount_point):
        raise RuntimeError("OSS mount failed!")

    # Step 2: Load data, such as mapping tables or dictionaries.
    mapping_file = os.path.join(mount_point, "category_map.csv")
    if os.path.isfile(mapping_file):
        mapping_df = pd.read_csv(mapping_file)  # Use mapping_df to enrich the batch as needed.

    # Step 3: Process the current chunk.
    result = batch_df.copy()
    result['F'] = result['A'] * 10

    return result
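Step 2 loads a mapping table only if the file exists. As a sketch of what such a lookup might look like with the standard library alone, the following hypothetical load_mapping helper reads a two-column CSV file into a dictionary and returns an empty dictionary when the file is missing. The key,value layout with a header row is an assumption about category_map.csv, not something this topic specifies.

```python
import csv
import os
import tempfile


def load_mapping(mount_point: str, filename: str = "category_map.csv") -> dict:
    """Read a two-column CSV (assumed layout: key,value with a header row) into a dict.

    Returns an empty dict when the file does not exist, mirroring the
    os.path.isfile check in Step 2 of the UDF.
    """
    path = os.path.join(mount_point, filename)
    if not os.path.isfile(path):
        return {}
    with open(path, newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # Skip the header row.
        return {row[0]: row[1] for row in reader if len(row) >= 2}


if __name__ == "__main__":
    # A temporary directory stands in for /mnt/oss_data.
    with tempfile.TemporaryDirectory() as mount_point:
        with open(os.path.join(mount_point, "category_map.csv"), "w", newline="") as f:
            f.write("code,name\n1,books\n2,toys\n")
        print(load_mapping(mount_point))  # {'1': 'books', '2': 'toys'}
```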

Build a DataFrame and apply the user-defined function

data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])

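# _process adds column F, so the declared output dtypes must include it.
output_dtypes = df.dtypes.copy()
output_dtypes["F"] = output_dtypes["A"]

# Use apply_chunk to apply the decorated function to each batch.
result_df = df.mf.apply_chunk(
    _process,
    skip_infer=True,
    output_type="dataframe",
    dtypes=output_dtypes,
    index=df.index,
)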

# Execute the operation and fetch the result.
result = result_df.execute().fetch()

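Setting skip_infer=True skips output type inference to speed up execution. In exchange, the dtypes and index that you pass must match what the function actually returns: if the function adds a column, as _process adds F, dtypes must include that column.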
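Because skip_infer=True turns off type inference, it can help to compare the declared dtypes with the output of the function on a small local sample before you submit the job. The following minimal sketch does that comparison on plain dictionaries that map column names to dtype names. The function diff_declared_dtypes is hypothetical.

```python
from typing import Dict, List


def diff_declared_dtypes(declared: Dict[str, str], actual: Dict[str, str]) -> List[str]:
    """Compare declared output dtypes with the dtypes a function actually produced.

    Both arguments map column name -> dtype name (for example "float64").
    Returns human-readable problems; an empty list means names, dtypes, and order match.
    """
    problems = []
    for col in declared:
        if col not in actual:
            problems.append(f"declared column {col!r} is missing from the output")
        elif declared[col] != actual[col]:
            problems.append(f"column {col!r}: declared {declared[col]}, got {actual[col]}")
    for col in actual:
        if col not in declared:
            problems.append(f"output column {col!r} is not declared")
    if not problems and list(declared) != list(actual):
        problems.append("column order differs")
    return problems
```

With pandas, you could build the actual mapping from a sample output DataFrame sample_out as {col: str(dtype) for col, dtype in sample_out.dtypes.items()}. In this topic's example, declaring only columns A to E while the function also returns F would produce the message "output column 'F' is not declared".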

Debugging tips

Verify the mount status

You can add debug logs to the _process function. For example:

import os
if os.path.exists("/mnt/oss_data"):
    print("FS Mount successful! /mnt/oss_data:", os.listdir("/mnt/oss_data"))
else:
    print("FS Mount failed! /mnt/oss_data does not exist.")
print("Processing batch with shape:", batch_df.shape)

Check the LogView output to verify that a log similar to the following is generated:

FS Mount successful! /mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)
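For more detailed mount diagnostics, you could log a small status summary instead of individual values. The following sketch is one way to do it with the standard library. The function describe_mount and its fields are hypothetical, and the sketch only reports what the file system exposes at the mount point.

```python
import os
import tempfile


def describe_mount(mount_point: str, max_entries: int = 10) -> dict:
    """Collect basic facts about a mount point for debug logging inside a UDF."""
    info = {
        "path": mount_point,
        "exists": os.path.exists(mount_point),
        "is_dir": os.path.isdir(mount_point),
        "readable": os.access(mount_point, os.R_OK),
        "entries": [],
        "error": None,
    }
    if info["is_dir"]:
        try:
            # Only the first max_entries names (sorted) are kept to keep the log short.
            info["entries"] = sorted(os.listdir(mount_point))[:max_entries]
        except OSError as exc:  # For example, a permission error surfaced by the mount.
            info["error"] = repr(exc)
    return info


if __name__ == "__main__":
    # A temporary directory stands in for /mnt/oss_data.
    with tempfile.TemporaryDirectory() as mount_point:
        open(os.path.join(mount_point, "data.csv"), "w").close()
        print("Mount status:", describe_mount(mount_point))
```

Inside _process, you could call print("Mount status:", describe_mount("/mnt/oss_data")) and look for that line in LogView.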