External volume berfungsi sebagai sistem file terdistribusi di MaxCompute yang didukung oleh Object Storage Service (OSS). Pasang external volume ke direktori OSS sehingga job Spark on MaxCompute dan MapReduce Anda dapat membaca serta menulis file melalui sistem izin MaxCompute—tanpa memberikan akses langsung ke OSS untuk setiap pengguna.
Setiap Proyek MaxCompute dapat memiliki beberapa external volume.
Kasus penggunaan
External volume berguna ketika Anda perlu:
-
Muat dependensi job saat startup — unduh secara otomatis file JAR, wheel Python, atau arsip model ke direktori kerja job sebelum eksekusi dimulai
-
Baca dan tulis file OSS dalam kode Spark — akses file yang disimpan di OSS menggunakan skema path
odps://secara langsung dalam kode job Spark Anda -
Terapkan kontrol izin detail halus — gunakan sistem izin MaxCompute untuk mengatur siapa yang dapat membaca atau menulis path volume tertentu, alih-alih mengelola kebijakan bucket OSS per pengguna
-
Simpan output job ML — simpan data indeks atau file model yang dihasilkan oleh mesin seperti Proxima CE kembali ke OSS melalui volume
Penagihan
Data dalam external volume disimpan di OSS. Anda tidak dikenai biaya penyimpanan dalam MaxCompute. Biaya komputasi berlaku ketika mesin MaxCompute membaca atau memproses data dalam external volume—misalnya, saat menjalankan job Spark on MaxCompute atau MapReduce. Output yang ditulis kembali ke OSS (seperti data indeks dari Proxima CE) dikenai biaya penyimpanan OSS standar.
Prasyarat
Sebelum memulai, pastikan Anda telah:
-
Mengajukan dan menerima persetujuan untuk penggunaan percobaan external volume. Lihat Ajukan penggunaan percobaan fitur baru
-
Klien MaxCompute (odpscmd) versi V0.43.0 atau lebih baru telah diinstal. Lihat Klien MaxCompute (odpscmd). Jika Anda menggunakan SDK untuk Java, diperlukan versi V0.43.0 atau lebih baru. Lihat Pembaruan versi
-
Bucket OSS telah dibuat. Lihat Buat bucket
-
Proyek MaxCompute Anda telah diberi otorisasi untuk mengakses OSS. Lihat Konfigurasikan metode akses OSS
Panduan cepat
Langkah 1: Berikan izin yang diperlukan
Untuk menggunakan external volume, akun Anda memerlukan izin berikut: CreateInstance, CreateVolume, List, Read, dan Write. Lihat Izin MaxCompute.
-
Periksa apakah akun Anda memiliki izin
CreateVolume:SHOW GRANTS FOR <user_name>; -
Jika izin
CreateVolumebelum ada, berikan izin tersebut:GRANT CreateVolume ON project <project_name> TO USER <user_name>;Untuk mencabut izin tersebut nanti:
REVOKE CreateVolume ON project <project_name> FROM USER <user_name>; -
Jalankan kembali
SHOW GRANTSuntuk memastikan izin telah diberikan.
Langkah 2: Buat external volume
Jalankan perintah berikut menggunakan akun yang memiliki izin CreateVolume:
vfs -create <volume_name>
-storage_provider oss
-url oss://<oss_endpoint>/<bucket>/<path>
-acd <true|false>
-role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole>
Untuk detail parameter dan operasi volume lainnya, lihat Operasi external volume.
Setelah pembuatan, path volume adalah odps://[project_name]/[volume_name]. Gunakan path ini dalam job Spark on MaxCompute dan MapReduce.
Langkah 3: Verifikasi volume
Daftar semua volume dalam proyek saat ini untuk memastikan volume telah dibuat:
vfs -ls /;
Gunakan Spark on MaxCompute dengan external volume
Spark on MaxCompute kompatibel dengan Spark open source dan berjalan pada sumber daya komputasi, set data, dan sistem izin terintegrasi MaxCompute.
Terdapat dua cara mengakses external volume dari job Spark:
-
Referensikan file saat startup job — file volume diunduh ke direktori kerja job sebelum job dimulai
-
Akses file dalam kode — gunakan skema path
odps://secara langsung dalam kode Spark Anda untuk membaca dan menulis file volume saat waktu proses
Referensikan file saat startup job
Konfigurasikan parameter berikut di bagian Parameters pada node ODPS Spark DataWorks, atau dalam file spark-defaults.conf. Parameter ini tidak dapat diatur di dalam kode job Anda.
| Parameter | Deskripsi |
|---|---|
spark.hadoop.odps.cupid.volume.files |
File yang akan diunduh ke direktori kerja job sebelum startup. Pisahkan beberapa file dengan koma. Setiap nilai harus menyertakan nama file lengkap. |
spark.hadoop.odps.cupid.volume.archives |
File arsip (.zip, .tar.gz, .tar) yang akan diunduh dan diekstrak ke direktori kerja job sebelum startup. Pisahkan beberapa arsip dengan koma. |
Format nilai:
odps://[project_name]/[volume_name]/[path_to_file]
Contoh — file:
spark.hadoop.odps.cupid.volume.files=
odps://mc_project/external_volume/data/mllib/kmeans_data.txt,
odps://mc_project/external_volume/target/PythonKMeansExample/KMeansModel/data/part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet
Setelah job dimulai, direktori kerja berisi kmeans_data.txt dan part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet.
Contoh — arsip:
spark.hadoop.odps.cupid.volume.archives=
odps://spark_test_wj2/external_volume/pyspark-3.1.1.zip,
odps://spark_test_wj2/external_volume/python-3.7.9-ucs4.tar.gz
Setelah job dimulai, direktori kerja berisi konten hasil ekstraksi dari pyspark-3.1.1.zip dan python-3.7.9-ucs4.tar.gz.
Akses file dalam kode
Untuk membaca dan menulis file external volume dari kode job Spark Anda, atur parameter berikut dalam kode:
| Parameter | Nilai | Deskripsi |
|---|---|---|
spark.hadoop.odps.volume.common.filesystem |
true |
Mengaktifkan pengenalan external volume. Default: false. |
spark.hadoop.odps.cupid.volume.paths |
odps://[project_name]/[volume_name]/ |
Path volume yang akan diakses. Default: kosong. |
spark.hadoop.fs.odps.impl |
org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem |
Kelas implementasi untuk akses OSS. |
spark.hadoop.fs.AbstractFileSystem.odps.impl |
org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs |
Kelas implementasi sistem file abstrak. |
Contoh — pengelompokan K-means dengan external volume:
Contoh berikut menggunakan algoritma K-means. Data pelatihan dibaca dari odps://ms_proj1_dev/volume_yyy1/, model dilatih, dan output disimpan kembali ke volume yang sama.
Semua path file dalam kode menggunakan skema odps:// untuk membaca dari dan menulis ke external volume.
Atur keempat parameter di atas dalam spark-defaults.conf atau di bagian Parameters pada node ODPS Spark DataWorks sebelum menjalankan kode ini. Contoh ini juga memerlukan parameter tambahan berikut untuk akses OSS, SDK JindoFS, dan runtime Python:
-- Parameters
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs
spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
-- Codefrom numpy import array
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel
if __name__ == "__main__":
sc = SparkContext(appName="KMeansExample")
# Baca data pelatihan dari external volume
data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Latih model K-means
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluasi model
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Simpan model ke external volume
clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: clusters.predict(feature)).collect())
# Muat dan gunakan model yang disimpan
sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
sc.stop()
Setelah job selesai, lihat file output di direktori OSS yang dipetakan ke volume tersebut.
Impor dan Ekspor Data antara File External Volume dan Tabel Internal
Untuk mengimpor file data dari External Volume ke tabel atau partisi MaxCompute, Anda dapat menggunakan perintah LOAD. Untuk menyimpan data terstruktur di OSS, Anda dapat mengekspor file data dari proyek MaxCompute ke OSS menggunakan perintah UNLOAD.
LOAD
-
Sintaks:
{load overwrite|into} table <table_name> [partition (<pt_spec>)] from location <Volume_location> stored by <StorageHandler> [with serdeproperties (<Options>)];Volume_location adalah path External Volume yang ditentukan. Formatnya adalah
odps://[project_name]/[volume_name]/. project_name adalah nama Proyek MaxCompute. volume_name adalah nama External Volume. Untuk deskripsi parameter lainnya, lihat LOAD.PentingAnda juga harus mengonfigurasi parameter
odps.properties.rolearndalam daftarwith serdepropertiesuntuk menyediakan informasi autentikasi RoleArn. Informasi RoleArn dapat berbeda dari External Volume, selama memiliki izin untuk mengakses folder OSS. Untuk informasi lebih lanjut, lihat Tabel eksternal ORC. -
Contoh:
Dalam contoh ini, path OSS yang dipetakan oleh External Volume berisi beberapa file CSV dengan skema seragam. External Volume bernama
volume_externaltelah dibuat. Anda dapat menjalankan pernyataan berikut untuk mengimpor file ke tabel internal MaxCompute.-- Buat tabel sink ambulance_data_csv_load create table ambulance_data_csv_load ( vehicleId INT, recordId INT, patientId INT, calls INT, locationLatitute DOUBLE, locationLongtitue DOUBLE, recordTime STRING, direction STRING ); -- Jalankan perintah load load overwrite table ambulance_data_csv_load from location 'odps://<project_name>/volume_external/' stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole', 'odps.text.option.delimiter'=',' ); -- Kueri SELECT * from ambulance_data_csv_load; -- Hasil contoh vehicleid recordid patientid calls locationlatitute locationlongtitue recordtime direction 1 1 51 1 46.81006 -92.08174 9/14/2014 0:00 S 1 2 13 1 46.81006 -92.08174 9/14/2014 0:01 NE 1 3 48 1 46.81006 -92.08174 9/14/2014 0:02 NE 1 4 30 1 46.81006 -92.08174 9/14/2014 0:03 W 1 5 47 1 46.81006 -92.08174 9/14/2014 0:04 S 1 6 9 1 46.81006 -92.08174 9/14/2014 0:05 S 1 7 53 1 46.81006 -92.08174 9/14/2014 0:06 N 1 8 63 1 46.81006 -92.08174 9/14/2014 0:07 SW 1 9 4 1 46.81006 -92.08174 9/14/2014 0:08 NE 1 10 31 1 46.81006 -92.08174 9/14/2014 0:09 N
UNLOAD
-
Sintaks:
unload from {<select_statement>|<table_name> [partition (<pt_spec>)]} into location <Volume_location> [stored by <StorageHandler>] [with serdeproperties ('<property_name>'='<property_value>',...)];Volume_location adalah path External Volume yang ditentukan. Formatnya adalah
odps://[project_name]/[volume_name]/. project_name adalah nama Proyek MaxCompute. volume_name adalah nama External Volume. Untuk deskripsi parameter lainnya, lihat UNLOAD. -
Contoh:
Contoh ini menunjukkan cara mengekspor data dari tabel internal MaxCompute ke path OSS yang dipetakan oleh External Volume. External Volume bernama
volume_external_unloadtelah dibuat. Anda dapat menjalankan pernyataan berikut untuk mengekspor data tabel ke External Volume.-- Kontrol jumlah file yang diekspor: Atur ukuran data tabel MaxCompute yang dapat dibaca oleh satu worker, dalam MB. Karena tabel MaxCompute dikompresi, data yang diekspor ke OSS umumnya sekitar empat kali lebih besar. set odps.stage.mapper.split.size=256; -- Ekspor data. unload from (select * from ambulance_data_csv_load) into location 'odps://project_name/volume_external_unload' stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.text.option.delimiter'=','); -- Ini setara dengan pernyataan berikut. set odps.stage.mapper.split.size=256; unload from ambulance_data_csv_load into location 'odps://project_name/volume_external_unload' stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::139xxx:role/aliyunodpsdefaultrole', 'odps.text.option.delimiter'=',');Hasil contoh: File data dihasilkan di folder OSS yang dipetakan oleh External Volume.
Buat tabel eksternal dari file External Volume
Jika path OSS yang dipetakan oleh External Volume berisi file semi-terstruktur dengan skema seragam, seperti CSV, PARQUET, atau CRC, Anda dapat menjalankan perintah berikut untuk membuat tabel eksternal dari External Volume. Untuk informasi lebih lanjut tentang sintaks pembuatan tabel eksternal, lihat Tabel eksternal ORC.
-
Sintaks:
create external table [if not exists] <mc_oss_extable_name> ( <col_name> <data_type>, ... ) [partitioned by (<col_name> <data_type>, ...)] stored by '<StorageHandler>' with serdeproperties ( ['<property_name>'='<property_value>',...] ) location '<Volume_location>';Volume_location adalah path External Volume yang ditentukan. Formatnya adalah
odps://[project_name]/[volume_name]/. project_name adalah nama Proyek MaxCompute. volume_name adalah nama External Volume. Untuk deskripsi parameter lainnya, lihat Tabel eksternal ORC.CatatanAnda juga harus mengonfigurasi parameter
odps.properties.rolearndalam daftarwith serdepropertiesuntuk menyediakan informasi autentikasi RoleArn. Informasi RoleArn dapat berbeda dari External Volume, selama memiliki izin untuk mengakses folder OSS. Untuk informasi lebih lanjut, lihat Tabel eksternal ORC. -
Contoh:
Dalam contoh ini, path OSS yang dipetakan oleh External Volume berisi beberapa file CSV dengan skema seragam. External Volume bernama
demo_volume3telah dibuat. Anda dapat menjalankan pernyataan berikut untuk membuat dan mengkueri tabel eksternal.create external table ext_tbl_onvolume ( col1 string, col2 string, col3 string ) stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::1248xxx:role/aliyunodpsdefaultrole' ) location 'odps://project_name/demo_volume3/'; -- Kueri tabel eksternal SELECT * from ext_tbl_onvolume;
Gunakan Proxima CE untuk vektorisasi di MaxCompute
Proxima CE melakukan pengindeksan vektor dan pencarian tetangga terdekat pada data yang disimpan di tabel MaxCompute. Hasilnya disimpan ke external volume di OSS.
Batasan
-
SDK Proxima untuk Java hanya mendukung Linux dan macOS. File JAR berisi dependensi khusus Linux dan tidak dapat dijalankan pada klien MaxCompute di Windows.
-
Proxima CE menjalankan dua jenis tugas: tugas lokal (tidak melibatkan SQL, MapReduce, atau Graph) dan tugas MaxCompute (dieksekusi melalui mesin SQL, MapReduce, atau Graph). Kedua jenis tugas dijalankan secara bergantian. Saat startup, Proxima CE mencoba memuat kernel Proxima pada mesin lokal. Jika kernel berhasil dimuat, modul tertentu dijalankan secara lokal; jika gagal dimuat, kesalahan dilaporkan tetapi job tetap berlanjut menggunakan fungsi fallback.
-
Kirimkan tugas menggunakan klien MaxCompute (odpscmd). Node MapReduce DataWorks tidak didukung karena versi klien MaxCompute yang mendasarinya sedang dalam proses peningkatan.
Jalankan tugas vektorisasi Proxima CE
Langkah 1: Instal paket resource Proxima CE.
Langkah 2: Siapkan data masukan.
Buat tabel masukan dan masukkan data sampel:
-- Buat tabel dasar dan tabel kueri
CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
-- Masukkan data ke tabel dasar
ALTER TABLE doc_table_float_smoke ADD PARTITION(pt='20230116');
INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES
('1.nid','1~1~1~1~1~1~1~1'),
('2.nid','2~2~2~2~2~2~2~2'),
('3.nid','3~3~3~3~3~3~3~3'),
('4.nid','4~4~4~4~4~4~4~4'),
('5.nid','5~5~5~5~5~5~5~5'),
('6.nid','6~6~6~6~6~6~6~6'),
('7.nid','7~7~7~7~7~7~7~7'),
('8.nid','8~8~8~8~8~8~8~8'),
('9.nid','9~9~9~9~9~9~9~9'),
('10.nid','10~10~10~10~10~10~10~10');
-- Masukkan data ke tabel kueri
ALTER TABLE query_table_float_smoke ADD PARTITION(pt='20230116');
INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES
('q1.nid','1~1~1~1~2~2~2~2'),
('q2.nid','4~4~4~4~3~3~3~3'),
('q3.nid','9~9~9~9~5~5~5~5');
Langkah 3: Kirimkan tugas Proxima CE.
jar -libjars proxima-ce-aliyun-1.0.0.jar
-classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner
-doc_table doc_table_float_smoke
-doc_table_partition 20230116
-query_table query_table_float_smoke
-query_table_partition 20230116
-output_table output_table_float_smoke
-output_table_partition 20230116
-data_type float
-dimension 8
-topk 1
-job_mode train:build:seek:recall
-external_volume shanghai_vol_ceshi
-owner_id 1248953xxx
;
Langkah 4: Verifikasi hasil.
Kueri tabel output untuk memeriksa hasil tetangga terdekat:
SELECT * FROM output_table_float_smoke WHERE pt='20230116';
Output yang diharapkan:
+------------+------------+------------+------------+
| pk | knn_result | score | pt |
+------------+------------+------------+------------+
| q1.nid | 2.nid | 4.0 | 20230116 |
| q1.nid | 1.nid | 4.0 | 20230116 |
| q1.nid | 3.nid | 20.0 | 20230116 |
| q2.nid | 4.nid | 4.0 | 20230116 |
| q2.nid | 3.nid | 4.0 | 20230116 |
| q2.nid | 2.nid | 20.0 | 20230116 |
| q3.nid | 7.nid | 32.0 | 20230116 |
| q3.nid | 8.nid | 40.0 | 20230116 |
| q3.nid | 6.nid | 40.0 | 20230116 |
+------------+------------+------------+------------+
Lanjutan
-
Operasi external volume — buat, daftar, dan kelola external volume
-
Akses OSS dari Spark on MaxCompute — akses OSS langsung tanpa external volume
-
Izin MaxCompute — kelola izin pengguna untuk volume dan proyek