外部ボリュームは、Object Storage Service (OSS) をバックエンドとする MaxCompute 上の分散ファイルシステムとして機能します。外部ボリュームを OSS ディレクトリにマウントすると、MaxCompute 上の Spark ジョブおよび MapReduce ジョブは、各ユーザーに直接 OSS へのアクセス権限を付与することなく、MaxCompute の権限システムを通じてファイルの読み取りおよび書き込みが可能になります。
各 MaxCompute プロジェクトには、複数の外部ボリュームを設定できます。
ユースケース
外部ボリュームは、以下のような用途で有効です。
-
起動時にジョブ依存関係を読み込む — 実行開始前に、JAR ファイル、Python wheels、またはモデルアーカイブを自動的にジョブの作業ディレクトリにダウンロード
-
Spark コード内で OSS ファイルを読み書きする — Spark ジョブコード内から
odps://パススキームを直接使用して、OSS に格納されたファイルにアクセス -
細かい粒度での権限制御を適用する — OSS バケットポリシーをユーザー単位で管理する代わりに、MaxCompute の権限システムを用いて、特定のボリュームパスに対する読み取り・書き込み権限を制御
-
ML ジョブの出力を保存する — Proxima CE などのエンジンによって生成されたインデックスデータやモデルファイルを、ボリューム経由で OSS に保存
課金
外部ボリューム内のデータは OSS に格納されます。MaxCompute 内でのストレージに対しては課金されません。ただし、MaxCompute エンジンが外部ボリューム内のデータを読み取ったり処理したりする際(例:MaxCompute 上の Spark ジョブまたは MapReduce ジョブの実行時)には、コンピューティング料金が発生します。また、Proxima CE などによる出力結果を OSS へ書き戻す場合(例:インデックスデータ)は、OSS の標準ストレージ料金が適用されます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
-
外部ボリュームのトライアル利用を申請し、承認されました。詳細については、「新機能のトライアル利用を申請する」をご参照ください。
-
MaxCompute クライアント(odpscmd)バージョン V0.43.0 以降がインストール済みです。「MaxCompute クライアント(odpscmd)」をご参照ください。Java SDK を使用する場合は、バージョン V0.43.0 以降が必要です。「バージョン更新情報」をご参照ください。
-
OSS バケットが作成済みです。「バケットの作成」をご参照ください。
-
OSS へのアクセスが承認された MaxCompute プロジェクトについては、「OSS アクセス メソッドを設定する」をご参照ください。
クイックスタート
ステップ 1:必要な権限を付与
外部ボリュームを使用するには、アカウントに以下の権限が必要です:CreateInstance、CreateVolume、List、Read、および Write。「MaxCompute の権限」をご参照ください。
-
アカウントが
CreateVolume権限を持っているか確認します。SHOW GRANTS FOR <user_name>; -
CreateVolume権限がない場合は、以下のコマンドで付与します。GRANT CreateVolume ON project <project_name> TO USER <user_name>;後で権限を取り消す場合は、以下のコマンドを実行します。
REVOKE CreateVolume ON project <project_name> FROM USER <user_name>; -
権限が正しく付与されたか確認するため、再度
SHOW GRANTSを実行します。
ステップ 2:外部ボリュームの作成
CreateVolume 権限を持つアカウントで、以下のコマンドを実行します。
vfs -create <volume_name>
-storage_provider oss
-url oss://<oss_endpoint>/<bucket>/<path>
-acd <true|false>
-role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole>
パラメーターの詳細およびその他のボリューム操作については、「外部ボリューム操作」をご参照ください。
作成後、ボリュームのパスは odps://[project_name]/[volume_name] となります。このパスを MaxCompute 上の Spark ジョブおよび MapReduce ジョブで使用します。
ステップ 3:ボリュームの検証
現在のプロジェクトに存在するすべてのボリュームを一覧表示し、ボリュームが正しく作成されたか確認します。
vfs -ls /;
外部ボリュームを用いた MaxCompute 上の Spark の利用
MaxCompute 上の Spark はオープンソースの Spark と互換性があり、MaxCompute の統合コンピューティングリソース、データセット、および権限システム上で実行されます。
Spark ジョブから外部ボリュームにアクセスする方法は、以下の 2 通りあります。
-
ジョブ起動時にファイルを参照する — ボリューム内のファイルをジョブ起動前に作業ディレクトリにダウンロード
-
コード内でファイルにアクセスする — Spark コード内で
odps://パススキームを直接使用して、実行時にボリューム内のファイルを読み書き
ジョブ起動時にファイルを参照する
以下のパラメーターを DataWorks ODPS Spark ノードの パラメーター セクション、または spark-defaults.conf ファイルに設定します。これらのパラメーターはジョブコード内では設定できません。
| パラメーター | 説明 |
|---|---|
spark.hadoop.odps.cupid.volume.files |
起動前にジョブの作業ディレクトリにダウンロードするファイル。複数指定する場合はカンマで区切ります。各値にはファイル名を含む完全なパスを指定する必要があります。 |
spark.hadoop.odps.cupid.volume.archives |
起動前にジョブの作業ディレクトリにダウンロード・解凍するアーカイブファイル(.zip、.tar.gz、.tar)。複数指定する場合はカンマで区切ります。 |
値の形式:
odps://[project_name]/[volume_name]/[path_to_file]
例 — ファイル:
spark.hadoop.odps.cupid.volume.files=
odps://mc_project/external_volume/data/mllib/kmeans_data.txt,
odps://mc_project/external_volume/target/PythonKMeansExample/KMeansModel/data/part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet
ジョブが開始されると、作業ディレクトリには kmeans_data.txt および part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet が配置されます。
例 — アーカイブ:
spark.hadoop.odps.cupid.volume.archives=
odps://spark_test_wj2/external_volume/pyspark-3.1.1.zip,
odps://spark_test_wj2/external_volume/python-3.7.9-ucs4.tar.gz
ジョブが開始されると、作業ディレクトリには pyspark-3.1.1.zip および python-3.7.9-ucs4.tar.gz の解凍内容が配置されます。
コード内でファイルにアクセスする
Spark ジョブコードから外部ボリュームのファイルを読み書きするには、コード内で以下のパラメーターを設定します。
| パラメーター | 値 | 説明 |
|---|---|---|
spark.hadoop.odps.volume.common.filesystem |
true |
外部ボリュームの認識を有効化します。デフォルト値は false です。 |
spark.hadoop.odps.cupid.volume.paths |
odps://[project_name]/[volume_name]/ |
アクセス対象のボリュームパス。デフォルト値は空です。 |
spark.hadoop.fs.odps.impl |
org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem |
OSS アクセスの実装クラス。 |
spark.hadoop.fs.AbstractFileSystem.odps.impl |
org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs |
抽象ファイルシステムの実装クラス。 |
例 — 外部ボリュームを用いた K-means クラスタリング:
以下の例では K-means アルゴリズムを使用しています。トレーニングデータを odps://ms_proj1_dev/volume_yyy1/ から読み込み、モデルを学習した後、出力を同じボリュームに保存します。
コード内のすべてのファイルパスは、外部ボリュームからの読み取りおよび書き込みに odps:// スキームを使用します。
このコードを実行する前に、上記の 4 つのパラメーターを spark-defaults.conf または DataWorks ODPS Spark ノードの パラメーター セクションに設定してください。また、この例では OSS アクセス、JindoFS SDK、および Python ランタイムのための追加パラメーターも必要です。
-- パラメーター
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs
spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
-- コードfrom numpy import array
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel
if __name__ == "__main__":
sc = SparkContext(appName="KMeansExample")
# 外部ボリュームからトレーニングデータを読み込む
data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# K-means モデルを学習
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# モデルの評価
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# モデルを外部ボリュームに保存
clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: clusters.predict(feature)).collect())
# 保存済みモデルを読み込んで使用
sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
sc.stop()
ジョブが完了すると、ボリュームにマッピングされた OSS ディレクトリ内に生成された出力ファイルを確認できます。
外部ボリュームファイルと内部テーブル間のデータのインポートおよびエクスポート
外部ボリュームから MaxCompute テーブルまたはパーティションへデータファイルをインポートするには、LOAD コマンドを使用します。逆に、構造化データを OSS に格納するには、MaxCompute プロジェクトから OSS へデータファイルをエクスポートするために、UNLOAD コマンドを使用します。
LOAD
-
構文:
{load overwrite|into} table <table_name> [partition (<pt_spec>)] from location <Volume_location> stored by <StorageHandler> [with serdeproperties (<Options>)];Volume_location は指定された外部ボリュームのパスです。形式は
odps://[project_name]/[volume_name]/です。project_name は MaxCompute プロジェクト名、volume_name は外部ボリューム名です。その他のパラメーターの説明については、「LOAD」をご参照ください。重要odps.properties.rolearnパラメーターをwith serdepropertiesリストに設定し、RoleArn 認証情報を提供する必要があります。この RoleArn 情報は、外部ボリュームのものとは異なるものでも構いませんが、対象 OSS フォルダーへのアクセス権限を有している必要があります。詳細については、「ORC 外部テーブル」をご参照ください。 -
例:
この例では、外部ボリュームにマッピングされた OSS パスには、共通のスキーマを持つ複数の CSV ファイルが含まれています。外部ボリューム
volume_externalを作成済みとします。以下のステートメントを実行することで、ファイルを MaxCompute の内部テーブルにインポートできます。-- 結果テーブル ambulance_data_csv_load を作成 create table ambulance_data_csv_load ( vehicleId INT, recordId INT, patientId INT, calls INT, locationLatitute DOUBLE, locationLongtitue DOUBLE, recordTime STRING, direction STRING ); -- LOAD コマンドを実行 load overwrite table ambulance_data_csv_load from location 'odps://<project_name>/volume_external/' stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole', 'odps.text.option.delimiter'=',' ); -- クエリ実行 SELECT * from ambulance_data_csv_load; -- 例の結果 vehicleid recordid patientid calls locationlatitute locationlongtitue recordtime direction 1 1 51 1 46.81006 -92.08174 9/14/2014 0:00 S 1 2 13 1 46.81006 -92.08174 9/14/2014 0:01 NE 1 3 48 1 46.81006 -92.08174 9/14/2014 0:02 NE 1 4 30 1 46.81006 -92.08174 9/14/2014 0:03 W 1 5 47 1 46.81006 -92.08174 9/14/2014 0:04 S 1 6 9 1 46.81006 -92.08174 9/14/2014 0:05 S 1 7 53 1 46.81006 -92.08174 9/14/2014 0:06 N 1 8 63 1 46.81006 -92.08174 9/14/2014 0:07 SW 1 9 4 1 46.81006 -92.08174 9/14/2014 0:08 NE 1 10 31 1 46.81006 -92.08174 9/14/2014 0:09 N
UNLOAD
-
構文:
unload from {<select_statement>|<table_name> [partition (<pt_spec>)]} into location <Volume_location> [stored by <StorageHandler>] [with serdeproperties ('<property_name>'='<property_value>',...)];Volume_location は指定された外部ボリュームのパスです。形式は
odps://[project_name]/[volume_name]/です。project_name は MaxCompute プロジェクト名、volume_name は外部ボリューム名です。その他のパラメーターの説明については、「UNLOAD」をご参照ください。 -
例:
この例では、MaxCompute の内部テーブルから、外部ボリュームにマッピングされた OSS パスへデータをエクスポートする方法を示します。外部ボリューム
volume_external_unloadを作成済みとします。以下のステートメントを実行することで、テーブルデータを外部ボリュームへエクスポートできます。-- エクスポートされるファイル数を制御:MaxCompute テーブルデータを 1 つのワーカーが読み取るサイズ(MB 単位)を設定します。MaxCompute テーブルは圧縮されているため、OSS に出力されるデータは通常約 4 倍のサイズになります。 set odps.stage.mapper.split.size=256; -- データをエクスポート unload from (select * from ambulance_data_csv_load) into location 'odps://project_name/volume_external_unload' stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.text.option.delimiter'=','); -- これは以下のステートメントと同等です。 set odps.stage.mapper.split.size=256; unload from ambulance_data_csv_load into location 'odps://project_name/volume_external_unload' stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::139xxx:role/aliyunodpsdefaultrole', 'odps.text.option.delimiter'=',');例の結果:外部ボリュームにマッピングされた OSS フォルダー内にデータファイルが生成されます。
外部ボリュームファイルから外部テーブルを作成する
外部ボリュームにマッピングされた OSS パスに、CSV、PARQUET、CRC などの共通スキーマを持つ半構造化ファイルが含まれている場合、以下のコマンドを実行して外部テーブルを作成できます。外部テーブル作成の構文の詳細については、「ORC 外部テーブル」をご参照ください。
-
構文:
create external table [if not exists] <mc_oss_extable_name> ( <col_name> <data_type>, ... ) [partitioned by (<col_name> <data_type>, ...)] stored by '<StorageHandler>' with serdeproperties ( ['<property_name>'='<property_value>',...] ) location '<Volume_location>';Volume_location は指定された外部ボリュームのパスです。形式は
odps://[project_name]/[volume_name]/です。project_name は MaxCompute プロジェクト名、volume_name は外部ボリューム名です。その他のパラメーターの説明については、「ORC 外部テーブル」をご参照ください。説明odps.properties.rolearnパラメーターをwith serdepropertiesリストに設定し、RoleArn 認証情報を提供する必要があります。この RoleArn 情報は、外部ボリュームのものとは異なるものでも構いませんが、対象 OSS フォルダーへのアクセス権限を有している必要があります。詳細については、「ORC 外部テーブル」をご参照ください。 -
例:
この例では、外部ボリュームにマッピングされた OSS パスには、共通のスキーマを持つ複数の CSV ファイルが含まれています。外部ボリューム
demo_volume3を作成済みとします。以下のステートメントを実行することで、外部テーブルを作成・クエリできます。create external table ext_tbl_onvolume ( col1 string, col2 string, col3 string ) stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::1248xxx:role/aliyunodpsdefaultrole' ) location 'odps://project_name/demo_volume3/'; -- 外部テーブルをクエリ SELECT * from ext_tbl_onvolume;
MaxCompute における Proxima CE を用いたベクトル化
Proxima CE は、MaxCompute テーブルに格納されたデータに対してベクトルインデックス作成および近傍探索を実行します。結果は OSS の外部ボリュームに保存されます。
制限事項
-
Proxima の Java SDK は Linux および macOS のみをサポートします。JAR ファイルには Linux 固有の依存関係が含まれており、Windows 上の MaxCompute クライアントでは実行できません。
-
Proxima CE は「ローカルタスク(SQL、MapReduce、Graph を含まないタスク)」と「MaxCompute タスク(SQL、MapReduce、Graph エンジンを介して実行されるタスク)」の 2 種類のタスクを実行します。これらは交互に実行されます。起動時に Proxima CE はローカルマシン上で Proxima カーネルの読み込みを試みます。カーネルの読み込みに成功した場合、一部のモジュールがローカルで実行されます。読み込みに失敗した場合、エラーが報告されますが、代替機能を用いてジョブは継続して実行されます。
-
タスクは MaxCompute クライアント(odpscmd)を用いて送信してください。DataWorks の MapReduce ノードはサポートされていません。これは、基盤となる MaxCompute クライアントのバージョンがアップグレード中であるためです。
Proxima CE ベクトル化タスクの実行
ステップ 1:Proxima CE リソースパッケージのインストール
ステップ 2:入力データの準備
入力テーブルを作成し、サンプルデータを挿入します。
-- ベーステーブルとクエリテーブルを作成
CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
-- ベーステーブルにデータを挿入
ALTER TABLE doc_table_float_smoke ADD PARTITION(pt='20230116');
INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES
('1.nid','1~1~1~1~1~1~1~1'),
('2.nid','2~2~2~2~2~2~2~2'),
('3.nid','3~3~3~3~3~3~3~3'),
('4.nid','4~4~4~4~4~4~4~4'),
('5.nid','5~5~5~5~5~5~5~5'),
('6.nid','6~6~6~6~6~6~6~6'),
('7.nid','7~7~7~7~7~7~7~7'),
('8.nid','8~8~8~8~8~8~8~8'),
('9.nid','9~9~9~9~9~9~9~9'),
('10.nid','10~10~10~10~10~10~10~10');
-- クエリテーブルにデータを挿入
ALTER TABLE query_table_float_smoke ADD PARTITION(pt='20230116');
INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES
('q1.nid','1~1~1~1~2~2~2~2'),
('q2.nid','4~4~4~4~3~3~3~3'),
('q3.nid','9~9~9~9~5~5~5~5');
ステップ 3:Proxima CE タスクの送信
jar -libjars proxima-ce-aliyun-1.0.0.jar
-classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner
-doc_table doc_table_float_smoke
-doc_table_partition 20230116
-query_table query_table_float_smoke
-query_table_partition 20230116
-output_table output_table_float_smoke
-output_table_partition 20230116
-data_type float
-dimension 8
-topk 1
-job_mode train:build:seek:recall
-external_volume shanghai_vol_ceshi
-owner_id 1248953xxx
;
ステップ 4:結果の検証
出力テーブルをクエリして、近傍探索の結果を確認します。
SELECT * FROM output_table_float_smoke WHERE pt='20230116';
期待される出力:
+------------+------------+------------+------------+
| pk | knn_result | score | pt |
+------------+------------+------------+------------+
| q1.nid | 2.nid | 4.0 | 20230116 |
| q1.nid | 1.nid | 4.0 | 20230116 |
| q1.nid | 3.nid | 20.0 | 20230116 |
| q2.nid | 4.nid | 4.0 | 20230116 |
| q2.nid | 3.nid | 4.0 | 20230116 |
| q2.nid | 2.nid | 20.0 | 20230116 |
| q3.nid | 7.nid | 32.0 | 20230116 |
| q3.nid | 8.nid | 40.0 | 20230116 |
| q3.nid | 6.nid | 40.0 | 20230116 |
+------------+------------+------------+------------+
次のステップ
-
外部ボリューム操作 — 外部ボリュームの作成、一覧表示、および管理
-
MaxCompute 上の Spark から OSS にアクセスする — 外部ボリュームを使用せずに OSS へ直接アクセス
-
MaxCompute の権限 — ボリュームおよびプロジェクトに対するユーザー権限の管理