All Products
Search
Document Center

MaxCompute:Proses data tidak terstruktur menggunakan MaxCompute External Volume

Last Updated:Mar 27, 2026

External volume berfungsi sebagai sistem file terdistribusi di MaxCompute yang didukung oleh Object Storage Service (OSS). Pasang external volume ke direktori OSS sehingga job Spark on MaxCompute dan MapReduce Anda dapat membaca serta menulis file melalui sistem izin MaxCompute—tanpa memberikan akses langsung ke OSS untuk setiap pengguna.

Setiap Proyek MaxCompute dapat memiliki beberapa external volume.

Kasus penggunaan

External volume berguna ketika Anda perlu:

  • Muat dependensi job saat startup — unduh secara otomatis file JAR, wheel Python, atau arsip model ke direktori kerja job sebelum eksekusi dimulai

  • Baca dan tulis file OSS dalam kode Spark — akses file yang disimpan di OSS menggunakan skema path odps:// secara langsung dalam kode job Spark Anda

  • Terapkan kontrol izin detail halus — gunakan sistem izin MaxCompute untuk mengatur siapa yang dapat membaca atau menulis path volume tertentu, alih-alih mengelola kebijakan bucket OSS per pengguna

  • Simpan output job ML — simpan data indeks atau file model yang dihasilkan oleh mesin seperti Proxima CE kembali ke OSS melalui volume

Penagihan

Data dalam external volume disimpan di OSS. Anda tidak dikenai biaya penyimpanan dalam MaxCompute. Biaya komputasi berlaku ketika mesin MaxCompute membaca atau memproses data dalam external volume—misalnya, saat menjalankan job Spark on MaxCompute atau MapReduce. Output yang ditulis kembali ke OSS (seperti data indeks dari Proxima CE) dikenai biaya penyimpanan OSS standar.

Prasyarat

Sebelum memulai, pastikan Anda telah:

Panduan cepat

Langkah 1: Berikan izin yang diperlukan

Untuk menggunakan external volume, akun Anda memerlukan izin berikut: CreateInstance, CreateVolume, List, Read, dan Write. Lihat Izin MaxCompute.

  1. Periksa apakah akun Anda memiliki izin CreateVolume:

    SHOW GRANTS FOR <user_name>;
  2. Jika izin CreateVolume belum ada, berikan izin tersebut:

    GRANT CreateVolume ON project <project_name> TO USER <user_name>;

    Untuk mencabut izin tersebut nanti:

    REVOKE CreateVolume ON project <project_name> FROM USER <user_name>;
  3. Jalankan kembali SHOW GRANTS untuk memastikan izin telah diberikan.

Langkah 2: Buat external volume

Jalankan perintah berikut menggunakan akun yang memiliki izin CreateVolume:

vfs -create <volume_name>
    -storage_provider oss
    -url oss://<oss_endpoint>/<bucket>/<path>
    -acd <true|false>
    -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole>

Untuk detail parameter dan operasi volume lainnya, lihat Operasi external volume.

Setelah pembuatan, path volume adalah odps://[project_name]/[volume_name]. Gunakan path ini dalam job Spark on MaxCompute dan MapReduce.

Langkah 3: Verifikasi volume

Daftar semua volume dalam proyek saat ini untuk memastikan volume telah dibuat:

vfs -ls /;

Gunakan Spark on MaxCompute dengan external volume

Spark on MaxCompute kompatibel dengan Spark open source dan berjalan pada sumber daya komputasi, set data, dan sistem izin terintegrasi MaxCompute.

Terdapat dua cara mengakses external volume dari job Spark:

  • Referensikan file saat startup job — file volume diunduh ke direktori kerja job sebelum job dimulai

  • Akses file dalam kode — gunakan skema path odps:// secara langsung dalam kode Spark Anda untuk membaca dan menulis file volume saat waktu proses

Referensikan file saat startup job

Konfigurasikan parameter berikut di bagian Parameters pada node ODPS Spark DataWorks, atau dalam file spark-defaults.conf. Parameter ini tidak dapat diatur di dalam kode job Anda.

Parameter Deskripsi
spark.hadoop.odps.cupid.volume.files File yang akan diunduh ke direktori kerja job sebelum startup. Pisahkan beberapa file dengan koma. Setiap nilai harus menyertakan nama file lengkap.
spark.hadoop.odps.cupid.volume.archives File arsip (.zip, .tar.gz, .tar) yang akan diunduh dan diekstrak ke direktori kerja job sebelum startup. Pisahkan beberapa arsip dengan koma.

Format nilai:

odps://[project_name]/[volume_name]/[path_to_file]

Contoh — file:

spark.hadoop.odps.cupid.volume.files=
odps://mc_project/external_volume/data/mllib/kmeans_data.txt,
odps://mc_project/external_volume/target/PythonKMeansExample/KMeansModel/data/part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet

Setelah job dimulai, direktori kerja berisi kmeans_data.txt dan part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet.

Contoh — arsip:

spark.hadoop.odps.cupid.volume.archives=
odps://spark_test_wj2/external_volume/pyspark-3.1.1.zip,
odps://spark_test_wj2/external_volume/python-3.7.9-ucs4.tar.gz

Setelah job dimulai, direktori kerja berisi konten hasil ekstraksi dari pyspark-3.1.1.zip dan python-3.7.9-ucs4.tar.gz.

Akses file dalam kode

Untuk membaca dan menulis file external volume dari kode job Spark Anda, atur parameter berikut dalam kode:

Parameter Nilai Deskripsi
spark.hadoop.odps.volume.common.filesystem true Mengaktifkan pengenalan external volume. Default: false.
spark.hadoop.odps.cupid.volume.paths odps://[project_name]/[volume_name]/ Path volume yang akan diakses. Default: kosong.
spark.hadoop.fs.odps.impl org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem Kelas implementasi untuk akses OSS.
spark.hadoop.fs.AbstractFileSystem.odps.impl org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs Kelas implementasi sistem file abstrak.

Contoh — pengelompokan K-means dengan external volume:

Contoh berikut menggunakan algoritma K-means. Data pelatihan dibaca dari odps://ms_proj1_dev/volume_yyy1/, model dilatih, dan output disimpan kembali ke volume yang sama.

Semua path file dalam kode menggunakan skema odps:// untuk membaca dari dan menulis ke external volume.

Catatan

Atur keempat parameter di atas dalam spark-defaults.conf atau di bagian Parameters pada node ODPS Spark DataWorks sebelum menjalankan kode ini. Contoh ini juga memerlukan parameter tambahan berikut untuk akses OSS, SDK JindoFS, dan runtime Python:

-- Parameters
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs

spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem

spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0

-- Code
from numpy import array
from math import sqrt

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample")

    # Baca data pelatihan dari external volume
    data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
    parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

    # Latih model K-means
    clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

    # Evaluasi model
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))

    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("Within Set Sum of Squared Error = " + str(WSSSE))

    # Simpan model ke external volume
    clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    print(parsedData.map(lambda feature: clusters.predict(feature)).collect())

    # Muat dan gunakan model yang disimpan
    sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())

    sc.stop()

Setelah job selesai, lihat file output di direktori OSS yang dipetakan ke volume tersebut.

Impor dan Ekspor Data antara File External Volume dan Tabel Internal

Untuk mengimpor file data dari External Volume ke tabel atau partisi MaxCompute, Anda dapat menggunakan perintah LOAD. Untuk menyimpan data terstruktur di OSS, Anda dapat mengekspor file data dari proyek MaxCompute ke OSS menggunakan perintah UNLOAD.

LOAD

  • Sintaks:

    {load overwrite|into} table <table_name> [partition (<pt_spec>)]
    from location <Volume_location>
    stored by <StorageHandler>
    [with serdeproperties (<Options>)];

    Volume_location adalah path External Volume yang ditentukan. Formatnya adalah odps://[project_name]/[volume_name]/. project_name adalah nama Proyek MaxCompute. volume_name adalah nama External Volume. Untuk deskripsi parameter lainnya, lihat LOAD.

    Penting

    Anda juga harus mengonfigurasi parameter odps.properties.rolearn dalam daftar with serdeproperties untuk menyediakan informasi autentikasi RoleArn. Informasi RoleArn dapat berbeda dari External Volume, selama memiliki izin untuk mengakses folder OSS. Untuk informasi lebih lanjut, lihat Tabel eksternal ORC.

  • Contoh:

    Dalam contoh ini, path OSS yang dipetakan oleh External Volume berisi beberapa file CSV dengan skema seragam. External Volume bernama volume_external telah dibuat. Anda dapat menjalankan pernyataan berikut untuk mengimpor file ke tabel internal MaxCompute.

    -- Buat tabel sink ambulance_data_csv_load
    create table ambulance_data_csv_load (
      vehicleId INT,
      recordId INT,
      patientId INT,
      calls INT,
      locationLatitute DOUBLE,
      locationLongtitue DOUBLE,
      recordTime STRING,
      direction STRING );
    
    -- Jalankan perintah load
    load overwrite table ambulance_data_csv_load
    from
    location 'odps://<project_name>/volume_external/'
    stored by 'com.aliyun.odps.CsvStorageHandler'
    with serdeproperties (
      'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole',
      'odps.text.option.delimiter'=','
    );
    
    -- Kueri
    SELECT * from ambulance_data_csv_load;
    
    -- Hasil contoh
    vehicleid	recordid	patientid	calls	locationlatitute	locationlongtitue	recordtime	direction
    1	1	51	1	46.81006	-92.08174	9/14/2014 0:00	S
    1	2	13	1	46.81006	-92.08174	9/14/2014 0:01	NE
    1	3	48	1	46.81006	-92.08174	9/14/2014 0:02	NE
    1	4	30	1	46.81006	-92.08174	9/14/2014 0:03	W
    1	5	47	1	46.81006	-92.08174	9/14/2014 0:04	S
    1	6	9	1	46.81006	-92.08174	9/14/2014 0:05	S
    1	7	53	1	46.81006	-92.08174	9/14/2014 0:06	N
    1	8	63	1	46.81006	-92.08174	9/14/2014 0:07	SW
    1	9	4	1	46.81006	-92.08174	9/14/2014 0:08	NE
    1	10	31	1	46.81006	-92.08174	9/14/2014 0:09	N

UNLOAD

  • Sintaks:

    unload from {<select_statement>|<table_name> [partition (<pt_spec>)]} 
    into 
    location <Volume_location>
    [stored by <StorageHandler>]
    [with serdeproperties ('<property_name>'='<property_value>',...)];

    Volume_location adalah path External Volume yang ditentukan. Formatnya adalah odps://[project_name]/[volume_name]/. project_name adalah nama Proyek MaxCompute. volume_name adalah nama External Volume. Untuk deskripsi parameter lainnya, lihat UNLOAD.

  • Contoh:

    Contoh ini menunjukkan cara mengekspor data dari tabel internal MaxCompute ke path OSS yang dipetakan oleh External Volume. External Volume bernama volume_external_unload telah dibuat. Anda dapat menjalankan pernyataan berikut untuk mengekspor data tabel ke External Volume.

    -- Kontrol jumlah file yang diekspor: Atur ukuran data tabel MaxCompute yang dapat dibaca oleh satu worker, dalam MB. Karena tabel MaxCompute dikompresi, data yang diekspor ke OSS umumnya sekitar empat kali lebih besar.
    set odps.stage.mapper.split.size=256;
    -- Ekspor data.
    unload from
    (select * from ambulance_data_csv_load)
    into
    location 'odps://project_name/volume_external_unload'
    stored by 'com.aliyun.odps.CsvStorageHandler'
    with serdeproperties (
    'odps.text.option.delimiter'=',');
    
    -- Ini setara dengan pernyataan berikut.
    set odps.stage.mapper.split.size=256;
    unload from ambulance_data_csv_load 
    into
    location 'odps://project_name/volume_external_unload'
    stored by 'com.aliyun.odps.CsvStorageHandler'
    with serdeproperties (
      'odps.properties.rolearn'='acs:ram::139xxx:role/aliyunodpsdefaultrole',
      'odps.text.option.delimiter'=','); 

    Hasil contoh: File data dihasilkan di folder OSS yang dipetakan oleh External Volume.

Buat tabel eksternal dari file External Volume

Jika path OSS yang dipetakan oleh External Volume berisi file semi-terstruktur dengan skema seragam, seperti CSV, PARQUET, atau CRC, Anda dapat menjalankan perintah berikut untuk membuat tabel eksternal dari External Volume. Untuk informasi lebih lanjut tentang sintaks pembuatan tabel eksternal, lihat Tabel eksternal ORC.

  • Sintaks:

    create external table [if not exists] <mc_oss_extable_name> 
    (
    <col_name> <data_type>,
    ...
    )
    [partitioned by (<col_name> <data_type>, ...)] 
    stored by '<StorageHandler>'  
    with serdeproperties (
     ['<property_name>'='<property_value>',...]
    ) 
    location '<Volume_location>';

    Volume_location adalah path External Volume yang ditentukan. Formatnya adalah odps://[project_name]/[volume_name]/. project_name adalah nama Proyek MaxCompute. volume_name adalah nama External Volume. Untuk deskripsi parameter lainnya, lihat Tabel eksternal ORC.

    Catatan

    Anda juga harus mengonfigurasi parameter odps.properties.rolearn dalam daftar with serdeproperties untuk menyediakan informasi autentikasi RoleArn. Informasi RoleArn dapat berbeda dari External Volume, selama memiliki izin untuk mengakses folder OSS. Untuk informasi lebih lanjut, lihat Tabel eksternal ORC.

  • Contoh:

    Dalam contoh ini, path OSS yang dipetakan oleh External Volume berisi beberapa file CSV dengan skema seragam. External Volume bernama demo_volume3 telah dibuat. Anda dapat menjalankan pernyataan berikut untuk membuat dan mengkueri tabel eksternal.

    create external table ext_tbl_onvolume
    (
        col1 string,
        col2 string,
        col3 string
    )
    stored by 'com.aliyun.odps.CsvStorageHandler' 
    with serdeproperties (
     'odps.properties.rolearn'='acs:ram::1248xxx:role/aliyunodpsdefaultrole'
    ) 
    location 'odps://project_name/demo_volume3/';
    
    -- Kueri tabel eksternal
    SELECT * from ext_tbl_onvolume;

Gunakan Proxima CE untuk vektorisasi di MaxCompute

Proxima CE melakukan pengindeksan vektor dan pencarian tetangga terdekat pada data yang disimpan di tabel MaxCompute. Hasilnya disimpan ke external volume di OSS.

Batasan

  • SDK Proxima untuk Java hanya mendukung Linux dan macOS. File JAR berisi dependensi khusus Linux dan tidak dapat dijalankan pada klien MaxCompute di Windows.

  • Proxima CE menjalankan dua jenis tugas: tugas lokal (tidak melibatkan SQL, MapReduce, atau Graph) dan tugas MaxCompute (dieksekusi melalui mesin SQL, MapReduce, atau Graph). Kedua jenis tugas dijalankan secara bergantian. Saat startup, Proxima CE mencoba memuat kernel Proxima pada mesin lokal. Jika kernel berhasil dimuat, modul tertentu dijalankan secara lokal; jika gagal dimuat, kesalahan dilaporkan tetapi job tetap berlanjut menggunakan fungsi fallback.

  • Kirimkan tugas menggunakan klien MaxCompute (odpscmd). Node MapReduce DataWorks tidak didukung karena versi klien MaxCompute yang mendasarinya sedang dalam proses peningkatan.

Jalankan tugas vektorisasi Proxima CE

Langkah 1: Instal paket resource Proxima CE.

Langkah 2: Siapkan data masukan.

Buat tabel masukan dan masukkan data sampel:

-- Buat tabel dasar dan tabel kueri
CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);

-- Masukkan data ke tabel dasar
ALTER TABLE doc_table_float_smoke ADD PARTITION(pt='20230116');
INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES
('1.nid','1~1~1~1~1~1~1~1'),
('2.nid','2~2~2~2~2~2~2~2'),
('3.nid','3~3~3~3~3~3~3~3'),
('4.nid','4~4~4~4~4~4~4~4'),
('5.nid','5~5~5~5~5~5~5~5'),
('6.nid','6~6~6~6~6~6~6~6'),
('7.nid','7~7~7~7~7~7~7~7'),
('8.nid','8~8~8~8~8~8~8~8'),
('9.nid','9~9~9~9~9~9~9~9'),
('10.nid','10~10~10~10~10~10~10~10');

-- Masukkan data ke tabel kueri
ALTER TABLE query_table_float_smoke ADD PARTITION(pt='20230116');
INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES
('q1.nid','1~1~1~1~2~2~2~2'),
('q2.nid','4~4~4~4~3~3~3~3'),
('q3.nid','9~9~9~9~5~5~5~5');

Langkah 3: Kirimkan tugas Proxima CE.

jar -libjars proxima-ce-aliyun-1.0.0.jar
-classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner
-doc_table doc_table_float_smoke
-doc_table_partition 20230116
-query_table query_table_float_smoke
-query_table_partition 20230116
-output_table output_table_float_smoke
-output_table_partition 20230116
-data_type float
-dimension 8
-topk 1
-job_mode train:build:seek:recall
-external_volume shanghai_vol_ceshi
-owner_id 1248953xxx
;

Langkah 4: Verifikasi hasil.

Kueri tabel output untuk memeriksa hasil tetangga terdekat:

SELECT * FROM output_table_float_smoke WHERE pt='20230116';

Output yang diharapkan:

+------------+------------+------------+------------+
| pk         | knn_result | score      | pt         |
+------------+------------+------------+------------+
| q1.nid     | 2.nid      | 4.0        | 20230116   |
| q1.nid     | 1.nid      | 4.0        | 20230116   |
| q1.nid     | 3.nid      | 20.0       | 20230116   |
| q2.nid     | 4.nid      | 4.0        | 20230116   |
| q2.nid     | 3.nid      | 4.0        | 20230116   |
| q2.nid     | 2.nid      | 20.0       | 20230116   |
| q3.nid     | 7.nid      | 32.0       | 20230116   |
| q3.nid     | 8.nid      | 40.0       | 20230116   |
| q3.nid     | 6.nid      | 40.0       | 20230116   |
+------------+------------+------------+------------+

Lanjutan