This topic provides code examples that show how to efficiently and securely mount and use Alibaba Cloud Object Storage Service (OSS) as storage for distributed computing in MaxFrame. The MaxFrame with_fs_mount decorator provides file system-level mounting, which offers stable and reliable external data access for large-scale data processing.
Scenarios
You can use this method in big data analytics scenarios where you need to combine MaxFrame jobs with persistent object storage, such as OSS. For example:
Load raw data from OSS and then scrub or process it.
Write intermediate results to OSS for downstream tasks.
Share static resources, such as trained model files and configuration files.
Traditional read and write methods, such as pd.read_csv("oss://..."), are limited by software development kit (SDK) performance and network overhead, which makes them inefficient in a distributed environment. File system-level mounting (FS Mount) lets code that runs in MaxCompute access OSS files through local paths, as if the files were on a local disk. You can then use standard file operations instead of SDK calls.
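The idea of accessing OSS files "as if they were on a local disk" can be shown with a minimal sketch that translates an oss:// URI into the matching path under a mount point. The helper `to_mount_path` is hypothetical and is not part of MaxFrame. It assumes that the OSS prefix is mounted at the mount point, as in the with_fs_mount examples in this topic.

```python
import posixpath


def to_mount_path(oss_uri: str, mounted_prefix: str, mount_point: str) -> str:
    """Map an oss:// URI under a mounted prefix to a local file path.

    Hypothetical helper for illustration only; it is not a MaxFrame API.
    """
    # Normalize the prefix so that ".../test" and ".../test/" behave the same.
    prefix = mounted_prefix.rstrip("/") + "/"
    if not oss_uri.startswith(prefix):
        raise ValueError(f"{oss_uri!r} is not under the mounted prefix {prefix!r}")
    # Drop leading slashes so posixpath.join does not discard the mount point.
    relative = oss_uri[len(prefix):].lstrip("/")
    # Use POSIX separators because the mount lives in a Linux container.
    return posixpath.join(mount_point, relative)


if __name__ == "__main__":
    prefix = "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/"
    # Inside a mounted UDF, open this local path instead of calling pd.read_csv(uri).
    print(to_mount_path(prefix + "category_map.csv", prefix, "/mnt/oss_data"))
```

With the prefix above, the script prints /mnt/oss_data/category_map.csv.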
Best practices
Activate services and grant permissions
Activate OSS and create a bucket.
Log on to the Object Storage Service (OSS) console.
In the navigation pane on the left, click Buckets.
On the Buckets page, click Create Bucket.
In this example, the bucket name is xxx-oss-test-sh.
Create a RAM role for MaxCompute and attach the role to the MaxCompute runtime environment.
Log on to the Resource Access Management (RAM) console.
In the navigation pane on the left, choose Identities > Roles.
On the Roles page, click Create Role.
In the upper-right corner of the Create Role page, click Create Service Linked Role.
On the Create Role page, set Principal Type to Cloud Service.
For Principal Name, select MaxCompute.
On the Permissions tab, click Grant Permission. In the Grant Permission panel, select an access policy for the role and click OK.
Select the following access policies:
Permissions to manage OSS: AliyunOSSFullAccess
Permissions to manage MaxCompute: AliyunMaxComputeFullAccess
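The RAM role created above is referenced later in storage_options through its ARN, which uses the form acs:ram::<uid>:role/<role name> in the examples in this topic. The following sketch assembles that string from an account UID and a role name. The helper `build_role_arn` and the digits-only check on the UID are assumptions made for illustration. If in doubt, copy the ARN that the RAM console shows for the role instead of building it.

```python
def build_role_arn(account_uid: str, role_name: str) -> str:
    """Assemble a RAM role ARN in the acs:ram::<uid>:role/<name> form.

    Hypothetical helper; the format follows the role_arn examples in this topic.
    """
    # Assumption: Alibaba Cloud account UIDs consist of digits only.
    if not account_uid.isdigit():
        raise ValueError(f"account UID must be numeric, got {account_uid!r}")
    # A role name must be non-empty and must not contain a path separator.
    if not role_name or "/" in role_name:
        raise ValueError(f"invalid role name: {role_name!r}")
    return f"acs:ram::{account_uid}:role/{role_name}"


if __name__ == "__main__":
    # Placeholder UID; replace it with your own account UID.
    print(build_role_arn("1234567890", "maxframe-oss"))
```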
Mount OSS using with_fs_mount
Recommended method
from maxframe.udf import with_fs_mount

@with_fs_mount(
    "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/",
    "/mnt/oss_data",
    storage_options={
        "role_arn": "acs:ram::xxx:role/maxframe-oss"
    },
)
def _process(batch_df):
    import os
    # Print the mounted files so that the mount status appears in LogView.
    if os.path.exists('/mnt/oss_data'):
        print(f"Mounted files: {os.listdir('/mnt/oss_data')}")
    else:
        print("/mnt/oss_data not mounted!")
    return batch_df * 2
Not recommended
You can use this method for testing purposes. Do not use it in a production environment.
storage_options={
    "oss_access_key_id": "LTAI5t...",
    "oss_access_key_secret": "Wp9H..."
}
Important: Avoid hard-coding your AccessKey. If you use role_arn, the system automatically requests a temporary Security Token Service (STS) token. This keeps your AccessKey ID and AccessKey secret out of your code and reduces the risk of leaking them.
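The following sketch puts both options in one place. Production code passes role_arn. The AccessKey variant can be reached only for testing, and it reads credentials from environment variables instead of hard-coded strings. The function name `make_storage_options` and the `allow_access_key` switch are hypothetical. The dictionary keys role_arn, oss_access_key_id, and oss_access_key_secret come from the examples above, and the environment variable names match the ODPS client example later in this topic.

```python
import os
from typing import Dict, Optional


def make_storage_options(role_arn: Optional[str] = None,
                         allow_access_key: bool = False) -> Dict[str, str]:
    """Build storage_options for with_fs_mount (hypothetical helper)."""
    # Preferred: pass the role ARN so that the system requests a temporary STS token.
    if role_arn:
        return {"role_arn": role_arn}
    # Testing only: AccessKey mode must be enabled explicitly.
    if not allow_access_key:
        raise ValueError("role_arn is required; AccessKey mode is disabled")
    # Read the AccessKey pair from the environment, never from source code.
    key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    if not key_id or not key_secret:
        raise RuntimeError("AccessKey environment variables are not set")
    return {"oss_access_key_id": key_id, "oss_access_key_secret": key_secret}
```

A typical call is storage_options=make_storage_options(role_arn="acs:ram::xxx:role/maxframe-oss") inside @with_fs_mount(...).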
Control resource allocation using with_running_options
Set appropriate CPU and memory resources based on the task type.
from maxframe.udf import with_fs_mount, with_running_options

@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
    ...
| Parameter | Suggested value | Description |
|---|---|---|
| engine | "dpe" (fixed) | FS Mount currently supports only the DPE engine. |
| cpu | 1 to 4 | Increase the value for complex I/O operations or decompression. |
| memory | 8 GB or more | Use 16 GB or more to load large files. |
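The suggested values in the table can be turned into a pre-flight check, so that a misconfigured job is caught before you submit it. This sketch is hypothetical: `check_running_options` is not a MaxFrame API, memory is assumed to be in GB (matching memory=16 above), and the thresholds mirror the table's suggestions rather than hard platform limits. For that reason, only the engine raises an error. The other parameters produce warnings.

```python
from typing import List


def check_running_options(engine: str, cpu: int, memory: int,
                          loads_large_files: bool = False) -> List[str]:
    """Return warnings for options that deviate from the suggested values."""
    # FS Mount currently supports only the DPE engine, so this is a hard error.
    if engine != "dpe":
        raise ValueError(f"FS Mount requires engine='dpe', got {engine!r}")
    warnings = []
    # The suggested CPU range is 1 to 4 cores.
    if not 1 <= cpu <= 4:
        warnings.append(f"cpu={cpu} is outside the suggested range 1 to 4")
    # The suggested memory is 8 GB or more, or 16 GB or more for large files.
    min_memory = 16 if loads_large_files else 8
    if memory < min_memory:
        warnings.append(f"memory={memory} GB is below the suggested {min_memory} GB")
    return warnings


if __name__ == "__main__":
    # The values from the decorator above produce no warnings.
    print(check_running_options("dpe", 2, 16))
```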
Usage example
Recommended pattern: data batch processing.
In large-scale data processing scenarios, you can use the MaxFrame apply_chunk feature to process input data in batches.
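Conceptually, apply_chunk splits the input into chunks and calls your function once for each chunk, so a single call sees only part of the data. The following pure-Python sketch imitates that contract on a list of values. It shows why the function must not assume that it sees the whole dataset. It illustrates the idea only and does not reflect how MaxFrame schedules chunks. `apply_in_chunks`, `times_ten`, and `chunk_size` are hypothetical names.

```python
from typing import Callable, List, Sequence, TypeVar

Row = TypeVar("Row")
Out = TypeVar("Out")


def apply_in_chunks(rows: Sequence[Row],
                    func: Callable[[List[Row]], List[Out]],
                    chunk_size: int) -> List[Out]:
    """Call func on consecutive slices of rows and concatenate the results."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    results: List[Out] = []
    for start in range(0, len(rows), chunk_size):
        # Each call sees only one chunk, like a function passed to apply_chunk.
        results.extend(func(list(rows[start:start + chunk_size])))
    return results


def times_ten(chunk: List[float]) -> List[float]:
    # Mirrors result['F'] = result['A'] * 10 from the example UDF, for one column.
    return [value * 10 for value in chunk]


if __name__ == "__main__":
    print(apply_in_chunks([1.0, 2.0, 3.0, 4.0, 5.0], times_ten, chunk_size=2))
    # [10.0, 20.0, 30.0, 40.0, 50.0]
```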
Create a MaxFrame session
import os
from odps import ODPS
import maxframe.dataframe as md
from maxframe import new_session, options
from maxframe.udf import with_fs_mount, with_running_options
# Initialize the ODPS client.
o = ODPS(
    # Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID,
    # and the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret.
    # Do not directly use the AccessKey ID and AccessKey secret strings.
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)
# Set the runtime image.
# The maxframe_service_dpe_runtime image includes ossfs2_2.0.3.1_linux_x86_64.deb.
# If you use a custom image, download the OSSFS dependency package listed after this code block and include it in your image.
options.sql.settings = {"odps.session.image": "maxframe_service_dpe_runtime"}
# Start the session.
session = new_session(o)
print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)
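The ODPS client above reads the AccessKey pair from environment variables. os.getenv returns None when a variable is missing, and the problem may then surface later as a less obvious authentication error. A small fail-fast check, run before ODPS(...) is constructed, makes the cause explicit. The helper `require_env` is hypothetical.

```python
import os
from typing import Dict


def require_env(*names: str) -> Dict[str, str]:
    """Return the values of the given environment variables or fail with a clear message."""
    # Treat unset and empty variables the same way.
    missing = [name for name in names if not os.environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: os.environ[name] for name in names}
```

In the session code, you could call require_env("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET") first and pass the returned values to ODPS(...) in place of the os.getenv calls.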
OSSFS dependency package: ossfs2_2.0.3.1_linux_x86_64.deb
Create a user-defined function
@with_running_options(engine="dpe", cpu=2, memory=8)
@with_fs_mount(
    "oss://oss-cn-<region>-internal.aliyuncs.com/xxx-oss-test-sh/test/",
    "/mnt/oss_data",
    storage_options={
        "role_arn": "acs:ram::<uid>:role/maxframe-oss"
    },
)
def _process(batch_df):
    import os
    import pandas as pd

    # Step 1: Check whether the mount is successful.
    mount_point = "/mnt/oss_data"
    if not os.path.exists(mount_point):
        raise RuntimeError("OSS mount failed!")

    # Step 2: Load data, such as mapping tables or dictionaries.
    mapping_df = None
    mapping_file = os.path.join(mount_point, "category_map.csv")
    if os.path.isfile(mapping_file):
        mapping_df = pd.read_csv(mapping_file)
        # mapping_df can now be used to enrich the current chunk, for example with merge().

    # Step 3: Process the current chunk.
    result = batch_df.copy()
    result['F'] = result['A'] * 10
    return result
Build a DataFrame and apply the user-defined function
import numpy as np
import pandas as pd

data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])
# _process adds column F, so the declared output dtypes must include it.
out_dtypes = pd.concat([df.dtypes, pd.Series({'F': np.dtype('float64')})])
# Use apply_chunk to apply the function after mounting.
result_df = df.mf.apply_chunk(
    _process,
    skip_infer=True,
    output_type="dataframe",
    dtypes=out_dtypes,
    index=df.index,
)
# Execute the operation and fetch the result.
result = result_df.execute().fetch()
Setting skip_infer=True skips type inference to speed up execution. However, you must then make sure that the dtypes and index you pass match what the function actually returns.
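Steps 1 and 2 of _process depend only on the local file system, so you can test them without a cluster by pointing them at a temporary directory that stands in for /mnt/oss_data. The following sketch moves those steps into a hypothetical `load_mapping` helper. It uses the standard csv module instead of pandas so that it runs anywhere. The file name category_map.csv comes from the example, while its columns (code, name) are assumptions.

```python
import csv
import os
import tempfile
from typing import Dict, Optional


def load_mapping(mount_point: str,
                 file_name: str = "category_map.csv") -> Optional[Dict[str, str]]:
    """Check the mount point and load a two-column mapping file if present."""
    # Step 1: fail loudly if the mount point does not exist.
    if not os.path.exists(mount_point):
        raise RuntimeError("OSS mount failed!")
    # Step 2: the mapping file is optional, as in the example UDF.
    path = os.path.join(mount_point, file_name)
    if not os.path.isfile(path):
        return None
    with open(path, newline="") as f:
        # Assumed columns: code, name.
        return {row["code"]: row["name"] for row in csv.DictReader(f)}


if __name__ == "__main__":
    # A temporary directory plays the role of the mounted OSS prefix.
    with tempfile.TemporaryDirectory() as fake_mount:
        with open(os.path.join(fake_mount, "category_map.csv"), "w", newline="") as f:
            f.write("code,name\n1,books\n2,music\n")
        print(load_mapping(fake_mount))  # {'1': 'books', '2': 'music'}
```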
Debugging tips
Verify the mount status
You can add debug logs to the _process function.
import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])
Check the LogView output to verify that logs similar to the following are generated. The exact text depends on the messages your function prints:
FS Mount successful! /mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)
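To produce log lines like the sample above, you can collect the debug messages in a small helper and call it at the start of _process. The following sketch shows one way to do this. The helper `describe_mount` is hypothetical, and its message format is modeled on the sample output, not on anything the platform emits. The file listing is sorted so that logs from different chunks are easy to compare.

```python
import os
from typing import List, Optional, Tuple


def describe_mount(mount_point: str,
                   shape: Optional[Tuple[int, int]] = None) -> List[str]:
    """Build log lines that describe the mount status and, optionally, the batch shape."""
    lines = []
    if os.path.isdir(mount_point):
        # Sort the listing so that output is stable across chunks.
        lines.append(f"FS Mount successful! {mount_point}: {sorted(os.listdir(mount_point))}")
    else:
        lines.append(f"FS Mount missing: {mount_point} does not exist")
    if shape is not None:
        lines.append(f"Processing batch with shape: {shape}")
    return lines


# Inside _process, for example:
#     for line in describe_mount("/mnt/oss_data", batch_df.shape):
#         print(line)
```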