MaxCompute のオープンストレージにより、Spark はコネクタを使用して Storage API を呼び出し、MaxCompute からデータを直接読み取ることができます。これにより、データの読み取りプロセスが簡素化され、アクセスパフォーマンスが向上します。MaxCompute のデータストレージ機能と統合することで、Spark は効率的で柔軟、かつ強力なデータ処理と分析を提供します。
注意事項
サードパーティエンジンが MaxCompute にアクセスする場合:
標準テーブル、パーティションテーブル、クラスター化テーブル、Delta テーブル、マテリアライズドビューを読み取ることができます。
MaxCompute の外部テーブルや論理ビューは読み取れません。
JSON データ型の読み取りはサポートされていません。
操作手順
Spark 開発者環境をデプロイします。
バージョン
Spark 3.2.x - Spark 3.5.xの Spark パッケージを使用します。Spark をクリックしてパッケージをダウンロードし、ローカルフォルダに展開できます。Linux オペレーティングシステムで環境をセットアップするには、「Linux 開発者環境のセットアップ」をご参照ください。
Windows オペレーティングシステムで環境をセットアップするには、「Windows 開発者環境のセットアップ」をご参照ください。
Spark コネクタをダウンロードしてコンパイルします。(Spark 3.2.x から 3.5.x のみがサポートされています。このトピックでは、Spark 3.3.1 を例として使用します。)
git cloneコマンドを実行して、Spark コネクタのインストールパッケージをダウンロードします。環境に Git がインストールされていることを確認してください。インストールされていない場合、コマンドの実行時にエラーが発生します。## Spark コネクタのダウンロード: git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git ## spark-connector フォルダへの切り替え cd aliyun-maxcompute-data-collectors/spark-connector ## コンパイル mvn clean package ## データソース JAR パッケージの場所 datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar ## データソース JAR パッケージを $SPARK_HOME/jars/ フォルダにコピー cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar $SPARK_HOME/jars/ご利用の MaxCompute アカウントのアクセス情報を設定します。
Spark の
confフォルダに、spark-defaults.confファイルを作成します。cd $SPARK_HOME/conf vim spark-defaults.confspark-defaults.confファイルにアカウント情報を設定します。## spark-defaults.conf でアカウントを設定 spark.hadoop.odps.project.name=doc_test spark.hadoop.odps.access.id=L******************** spark.hadoop.odps.access.key=******************* spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx ## MaxCompute カタログの設定 spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensionsSpark コネクタを使用して MaxCompute にアクセスします。
Spark の
binフォルダで次のコマンドを実行し、Spark SQL クライアントを起動します。cd $SPARK_HOME/bin spark-sqlMaxCompute プロジェクト内のテーブルをクエリします。
SHOW tables in odps.doc_test;doc_testは MaxCompute プロジェクト名の例です。実際のプロジェクト名に置き換えてください。テーブルを作成します。
CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);テーブルからデータを読み取ります。
SELECT * FROM odps.doc_test.mc_test_table;パーティションテーブルを作成します。
CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);パーティションテーブルからデータを読み取ります。
SELECT * FROM odps.doc_test.mc_test_table_pt;次のコードは結果の例です。
test1 1 2018 0601 test2 2 2018 0601 Time taken: 1.312 seconds, Fetched 2 row(s)テーブルを削除します。
DROP TABLE IF EXISTS odps.doc_test.mc_test_table;