MaxCompute は、Spark on EMR、StarRocks、Presto、PAI、Hologres などのサードパーティエンジンをサポートしており、Java SDK を使用して Storage API 経由で MaxCompute データに直接アクセスできます。このトピックでは、Java SDK を使用して MaxCompute にアクセスするためのコード例を紹介します。
概要
Java SDK で MaxCompute にアクセスするための主要なインターフェイスを以下に示します。
メインインターフェイス | 説明 |
MaxCompute テーブル読み取りセッションを作成するために使用されます。 | |
MaxCompute テーブルからデータを読み取るためのセッションを表します。 | |
データセッションに含まれるデータセグメントを読み取るために使用されます。 |
Maven ユーザーの場合は、Maven リポジトリ で odps-sdk-table-api を検索して、さまざまなバージョンの Java SDK を入手してください。構成の詳細は次のとおりです。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-table-api</artifactId>
<version>0.48.8-public</version>
</dependency>MaxCompute はストレージ API を提供しています。詳細については、「odps-sdk-table-api」をご参照ください。
TableReadSessionBuilder
このインターフェイスは、MaxCompute テーブルの読み取りセッションを作成するために設計されています。主なインターフェイス定義は次のとおりです。詳細については、Java-sdk-doc をご参照ください。
インターフェイス定義
public class TableReadSessionBuilder {
public TableReadSessionBuilder table(Table table);
public TableReadSessionBuilder identifier(TableIdentifier identifier);
public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);
public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);
public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);
public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);
public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);
public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);
public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);
public TableReadSessionBuilder withSettings(EnvironmentSettings settings);
public TableReadSessionBuilder withSessionId(String sessionId);
public TableBatchReadSession buildBatchReadSession();
}使用上の注意
メソッド名 | 説明 |
| 渡されたパラメーター Table を現在のセッションのターゲットテーブルとして定義します。 |
| 渡されたパラメーター TableIdentifier を現在のセッションのターゲットテーブルとして定義します。 |
| 指定されたフィールドのデータを読み取り、返されるデータのフィールドの順序がパラメーター 説明 パラメーター |
| 指定されたテーブルの指定されたパーティションの指定された列からデータを読み取ります。これは、パーティションプルーニングが実行されるシナリオに適用できます。 説明 パラメーター |
| 指定されたテーブルの指定されたパーティションのデータを読み取ります。これは、パーティションのトリミングシナリオに適用できます。 説明 パラメーター |
| 指定されたバケットのデータを読み取ります。これは、クラスタ化されたテーブルに対してのみ有効であり、バケットのトリミングシナリオに適用できます。 説明 パラメーター |
| テーブルデータを分割します。 SplitOptions オブジェクトは次のように定義されます。
使用例 |
| Arrow データオプションを指定します。
使用例 |
| Predicate Pushdown オプションを指定します。 Predicate は次のように定義されます。 使用例 |
| ランタイム環境情報を指定します。 EnvironmentSettings インターフェイスは次のように定義されます。
|
| 既存のセッションを再読み込みするための SessionID 情報を指定します。 |
| テーブル読み取りセッションを作成または取得します。入力パラメーター SessionID が指定されている場合、SessionID に基づいて作成されたセッションが返されます。入力パラメーターが指定されていない場合は、新しいテーブル読み取りセッションが作成されます。 説明 作成操作には大きなオーバーヘッドがあります。ファイルが多い場合、完了までに時間がかかります。 |
TableBatchReadSession
TableBatchReadSession インターフェイスは、MaxCompute テーブルから読み取るためのセッションを表します。主なインターフェイス定義は次のとおりです。
インターフェイス定義
public interface TableBatchReadSession {
String getId();
TableIdentifier getTableIdentifier();
SessionStatus getStatus();
DataSchema readSchema();
InputSplitAssigner getInputSplitAssigner() throws IOException;
SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;
SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;
}使用上の注意
メソッド名 | 説明 |
| 現在のセッション ID を取得します。セッション ID を読み取るためのデフォルトのタイムアウトは 24 時間です。 |
| 現在のセッションのテーブル名を取得します。 |
| 現在のセッションステータスを取得します。ステータス値は次のとおりです。
|
| 現在のセッションのテーブル構造情報を取得します。 DataSchema は次のように定義されます。
|
| 現在のセッションの InputSplitAssigner を取得します。 InputSplitAssigner インターフェイスは、現在の読み取りセッションで InputSplit インスタンスを割り当てるためのメソッドを定義します。各 InputSplit は、単一の SplitReader で処理できるデータセグメントを表します。 InputSplitAssigner は次のように定義されます。
|
|
|
|
|
SplitReader
SplitReader インターフェイスは、MaxCompute テーブルからデータを読み取るために使用されます。
インターフェイス定義
public interface SplitReader<T> {
boolean hasNext() throws IOException;
T get();
Metrics currentMetricsValues();
void close() throws IOException;
}使用上の注意
メソッド名 | 説明 |
| 読み取るデータ項目がさらに存在するかどうかを確認します。次のデータ項目が存在する場合は true を返します。それ以外の場合は false を返します。 |
| 現在のデータ項目を取得します。このメソッドを呼び出す前に、 |
| SplitReader に関連するメトリックを取得します。 |
| 読み取りが完了したら、接続を閉じます。 |
使用例
MaxCompute サービスに接続するように環境を設定します。
// Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret // Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。これらの資格情報を使用して操作を実行することは、リスクの高い操作です。 RAM ユーザーを使用して API 操作を呼び出したり、日常的な O&M を実行することをお勧めします。 RAM ユーザーを作成するには、RAM コンソールにログインします // この例では、AccessKey ID と AccessKey secret は環境変数に格納されています。ビジネス要件に基づいて、構成ファイルに AccessKey ペアを保存することもできます // AccessKey ペアの漏洩を防ぐため、コードに AccessKey ID と AccessKey secret を直接指定しないことをお勧めします private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); //MaxCompute にアクセスするために使用されるクォータ名 String quotaName = "<quotaName>"; //MaxCompute プロジェクト名 String project = "<project>"; //MaxCompute サービスに接続するための Odps オブジェクトを作成します Account account = new AliyunAccount(accessId, accessKey); Odps odps = new Odps(account); odps.setDefaultProject(project); //MaxCompute サービスの接続アドレス。 Alibaba Cloud VPC ネットワークのみがサポートされています odps.setEndpoint(endpoint); Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()).withAppAccount(odps.getAppAccount()).build(); EnvironmentSettings settings = EnvironmentSettings.newBuilder().withCredentials(credentials).withServiceEndpoint(odps.getEndpoint()).withQuotaName(quotaName).build();説明専用の Data Transmission Service リソースグループ(サブスクリプション)リソースのクォータ名を取得するには、次の手順に従います。
専用の Data Transmission Service リソースグループ:MaxCompute コンソール にログインします。次に、左上隅でリージョンを切り替え、左側のナビゲーションウィンドウで [ワークエリア] > [クォータ] を選択して、使用可能なクォータのリストを表示します。詳細な手順については、「MaxCompute コンソールで計算リソースのクォータを管理する」をご参照ください。
ストレージ API:MaxCompute コンソール にログインし、左側のナビゲーションウィンドウで [テナント管理] > [テナントプロパティ] を選択し、[オープンストレージ (Storage API) スイッチ] を有効にします。
ジョブレベルのクォータ承認の場合、デフォルトでは、Alibaba Cloud アカウントとロールを含むすべてのアカウントに権限がありません。承認が必要です。承認の詳細については、「承認」をご参照ください。
テーブル読み取り操作を実行します。
MaxCompute データにアクセスするためのデータ読み取りセッションを作成します。
//MaxCompute プロジェクトに対応するテーブル名 String tableName = "<table.name>"; //テーブルデータ読み取りセッションを作成します TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder(); TableBatchReadSession scan = scanBuilder.identifier(TableIdentifier.of(project, tableName)).withSettings(settings) .withSplitOptions(SplitOptions.newBuilder() .SplitByByteSize(256 * 1024L * 1024L) .withCrossPartition(false).build()) .requiredDataColumns(Arrays.asList("timestamp")) .requiredPartitionColumns(Arrays.asList("pt1")) .buildBatchReadSession();説明データ量が多い、ネットワーク遅延がある、または不安定なシナリオでは、データ読み取りセッションの作成に時間がかかり、セッション作成の非同期プロセスに自動的に切り替わる可能性があります。
Arrow リーダーを使用して各セグメントの MaxCompute データをトラバースし、各セグメントのデータコンテンツを順番に読み取って出力します。
//すべての入力セグメントデータをトラバースし、Arrow リーダーを使用して各セグメントのデータバッチを 1 つずつ読み取り、最後に各バッチデータのコンテンツを出力します InputSplitAssigner assigner = scan.getInputSplitAssigner(); for (InputSplit split : assigner.getAllSplits()) { SplitReader<VectorSchemaRoot> reader = scan.createArrowReader(split, ReaderOptions.newBuilder() .withSettings(settings) .withCompressionCodec(CompressionCodec.ZSTD) .withReuseBatch(true) .build()); int rowCount = 0; List<VectorSchemaRoot> batchList = new ArrayList<>(); while (reader.hasNext()) { VectorSchemaRoot data = reader.get(); rowCount += data.getRowCount(); System.out.println(data.contentToTSVString()); } reader.close(); }
参照
MaxCompute Storage API の概要については、「ストレージ API の概要」をご参照ください。