Topik ini menggunakan contoh kode untuk menunjukkan cara memasang dan menggunakan Alibaba Cloud Object Storage Service (OSS) secara efisien dan aman sebagai penyimpanan untuk komputasi terdistribusi di MaxFrame. Dekorator with_fs_mount memasang OSS pada tingkat sistem file, menyediakan akses data eksternal yang stabil dan andal untuk pemrosesan data skala besar.
Skenario
Gunakan metode ini untuk skenario analitik data besar yang memerlukan integrasi pekerjaan MaxFrame dengan penyimpanan objek persisten seperti OSS. Contohnya:
Muat data mentah dari OSS, lalu bersihkan atau proses.
Tulis hasil antara ke OSS agar dapat digunakan oleh tugas-tugas berikutnya.
Bagikan sumber daya statis, seperti file model yang telah dilatih dan file konfigurasi.
Metode baca dan tulis tradisional, seperti pd.read_csv("oss://..."), dibatasi oleh kinerja kit pengembangan perangkat lunak (SDK) dan overhead jaringan, sehingga tidak efisien dalam lingkungan terdistribusi. Dengan pemasangan pada tingkat sistem file (FS Mount), Anda dapat mengakses file OSS di MaxCompute seolah-olah berada di disk lokal, yang secara signifikan meningkatkan efisiensi pengembangan.
Praktik terbaik
Aktifkan layanan dan berikan izin
Aktifkan OSS dan buat bucket.
Masuk ke Konsol Object Storage Service (OSS).
Di panel navigasi sebelah kiri, klik Buckets.
Di halaman Buckets, klik Create Bucket.
Dalam contoh ini, nama bucket-nya adalah
xxx-oss-test-sh.
Buat peran RAM untuk MaxCompute dan sambungkan ke lingkungan runtime MaxCompute.
Masuk ke Konsol Resource Access Management (RAM).
Di panel navigasi sebelah kiri, pilih .
Di halaman Roles, klik Create Role.
Di pojok kanan atas halaman Create Role, klik Create Service Linked Role.
Di halaman Create Role, atur Principal Type ke Cloud Service.
Untuk Role Type, pilih Cloud-native Big Data Computing Service MaxCompute.
Di tab Permissions, klik Grant Permission. Di panel Grant Permission, pilih kebijakan akses untuk peran tersebut lalu klik OK.
Pilih kebijakan akses berikut:
Izin untuk mengelola Object Storage Service (OSS): AliyunOSSFullAccess
Izin untuk mengelola MaxCompute: AliyunMaxComputeFullAccess
Pasang OSS menggunakan with_fs_mount
Metode yang direkomendasikan
from maxframe.udf import with_fs_mount @with_fs_mount( "oss://oss-cn-xxxx-internal.aliyuncs.com/xxx-oss-test-sh/test/", "/mnt/oss_data", storage_options={ "role_arn": "acs:ram::xxx:role/maxframe-oss" }, ) def _process(batch_df): import os if os.path.exists('/mnt/oss_data'): print(f"Mounted files: {os.listdir('/mnt/oss_data')}") else: print("/mnt/oss_data not mounted!") return batch_df * 2Metode yang tidak direkomendasikan
Metode ini dapat digunakan untuk pengujian, tetapi tidak direkomendasikan untuk lingkungan produksi.
storage_options={ "oss_access_key_id": "LTAI5t...", "oss_access_key_secret": "Wp9H..." }PentingHindari hardcoding AccessKey Anda. Menggunakan
role_arnmemungkinkan sistem secara otomatis meminta token Security Token Service (STS) sementara. Praktik ini menghindari risiko kebocoran ID AccessKey dan rahasia AccessKey Anda.
Kontrol alokasi sumber daya dengan with_running_options
Tentukan sumber daya CPU dan memori yang wajar berdasarkan jenis tugas:
from maxframe.udf import with_running_options
@with_running_options(engine="dpe", cpu=2, memory=16)
@with_fs_mount(...)
def _process(batch_df):
...Parameter | Nilai yang direkomendasikan | Deskripsi |
| Tetap | FS Mount saat ini hanya mendukung mesin DPE. |
| 1 hingga 4 | Tingkatkan nilai ini untuk I/O kompleks atau dekompresi. |
| 8 GB atau lebih | Gunakan 16 GB atau lebih untuk memuat file besar. |
Contoh
Pola yang direkomendasikan: Pemrosesan batch data.
Untuk pemrosesan data skala besar, Anda dapat menggunakan fitur MaxFrame apply_chunk untuk memproses data masukan dalam batch.
Buat sesi MaxFrame dan aktifkan dukungan SQL
import os
from odps import ODPS
from maxframe import new_session
from maxframe.udf import with_fs_mount
# Inisialisasi klien ODPS
o = ODPS(
# Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID diatur ke ID AccessKey Anda.
# Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET diatur ke rahasia AccessKey Anda.
# Jangan gunakan string ID AccessKey dan rahasia AccessKey secara langsung.
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='<your project>',
endpoint='https://service.cn-<region>.maxcompute.aliyun.com/api',
)
# Atur image (jika Anda memiliki dependensi khusus)
options.sql.settings = { "odps.session.image": "maxframe_service_dpe_runtime"}
# Mulai sesi
session = new_session(o)
print("LogView:", session.get_logview_address())
print("Session ID:", session.session_id)
@with_fs_mount(
"oss://oss-cn-<region>-internal.aliyuncs.com/wzy-oss-test-sh/test/",
"/mnt/oss_data",
storage_options={
"role_arn": "acs:ram::<uid>:role/maxframe-oss"
},
)
@with_running_options(engine="dpe", cpu=2, memory=16)Buat user-defined function
def _process(batch_df):
import pandas as pd
import os
# Langkah 1: Periksa apakah pemasangan berhasil
mount_point = "/mnt/oss_data"
if not os.path.exists(mount_point):
raise RuntimeError("OSS mount failed!")
# Langkah 2: Muat data (seperti tabel pemetaan atau kamus)
mapping_file = os.path.join(mount_point, "category_map.csv")
if os.path.isfile(mapping_file):
mapping_df = pd.read_csv(mapping_file)
# Langkah 3: Proses chunk saat ini
result = batch_df.copy()
result['F'] = result['A'] * 10
return resultBuat DataFrame dan terapkan user-defined function
data = [[1.0, 2.0, 3.0, 4.0, 5.0], ...]
df = md.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])
# Gunakan apply_chunk untuk menerapkan fungsi dengan pemasangan
result_df = df.mf.apply_chunk(
_process,
skip_infer=True,
output_type="dataframe",
dtypes=df.dtypes,
index=df.index
)
# Eksekusi dan ambil hasilnya
result = result_df.execute().fetch()skip_infer=True melewatkan inferensi tipe untuk mempercepat eksekusi. Pastikan untuk meneruskan dtypes dan index dengan benar.
Kiat debugging
Verifikasi status pemasangan
Tambahkan log debugging ke fungsi _process:
import os
print("Mount path exists:", os.path.exists("/mnt/oss_data"))
print("Files in mount:", os.listdir("/mnt/oss_data") if os.path.exists("/mnt/oss_data") else [])Periksa output LogView untuk memastikan log seperti berikut dihasilkan:
FS Mount successful! /mnt/oss_data: ['data.csv', 'config.json', 'model.pkl']
Processing batch with shape: (1000, 5)