このトピックでは、MaxCompute コネクタの構文、WITH パラメーター、および使用例について説明します。
背景情報
MaxCompute (旧称 ODPS) は、エクサバイト規模のデータに対応する、高速でフルマネージド型のデータウェアハウスソリューションです。大量の構造化データを保存および計算し、データウェアハウスソリューション、分析、モデリングサービスを提供します。
MaxCompute コネクタは、以下の特徴をサポートしています。
カテゴリ | 詳細 |
サポートされている種類 | ソーステーブル、ディメンションテーブル、シンクテーブル |
実行モード | ストリームモードとバッチモード |
データフォーマット | サポートされていません |
特定のモニタリングメトリック | |
API タイプ | DataStream と SQL |
シンクテーブルでのデータ更新または削除のサポート | Batch Tunnel および Stream Tunnel モードはデータの挿入のみをサポートします。Upsert Tunnel モードはデータの挿入、更新、削除をサポートします。 |
前提条件
MaxCompute テーブルを作成済みであること。詳細については、「テーブルの作成」をご参照ください。
制限事項
MaxCompute コネクタは at-least-once セマンティクスのみをサポートします。
説明at-least-once セマンティクスはデータが失われないことを保証します。ただし、まれに重複データが MaxCompute に書き込まれることがあります。この可能性は MaxCompute Tunnel のタイプによって異なります。MaxCompute Tunnel の詳細については、「どのデータトンネルを選択すべきか?」をご参照ください。
デフォルトでは、ソーステーブルは完全データモードです。`partition` パラメーターで指定されたパーティションのみを読み取ります。すべてのデータが読み取られると、ジョブは終了します。コネクタは新しいパーティションを監視しません。
新しいパーティションを継続的に監視するには、WITH 句で `startPartition` パラメーターを指定して、増分ソーステーブルモードを使用できます。
説明ディメンションテーブルは更新のたびに最新のパーティションをチェックするため、この制限の対象外です。
ソーステーブルのジョブが開始された後、ジョブはパーティションに追加された新しいデータを読み取りません。パーティションデータが完全になった後にのみジョブを実行する必要があります。
SQL
MaxCompute コネクタは、SQL ジョブでソーステーブル、ディメンションテーブル、またはシンクテーブルとして使用できます。
構文
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);WITH パラメーター
一般
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | テーブルのタイプ。 | String | はい | なし | 値は odps である必要があります。 |
endpoint | MaxCompute サービスのエンドポイント。 | String | はい | なし | 詳細については、「エンドポイント」をご参照ください。 |
tunnelEndpoint | MaxCompute Tunnel サービスのエンドポイント。 | String | いいえ | なし | 詳細については、「エンドポイント」をご参照ください。 説明 このパラメーターを指定しない場合、MaxCompute は内部のロードバランシングサービスに基づいてトンネル接続を割り当てます。 |
project | MaxCompute プロジェクトの名前。 | String | はい | なし | なし。 |
schemaName | MaxCompute スキーマの名前。 | String | いいえ | なし | このパラメーターは、MaxCompute プロジェクトでスキーマ機能が有効になっている場合にのみ必須です。このパラメーターを、MaxCompute テーブルが属するスキーマの名前に設定します。詳細については、「スキーマ操作」をご参照ください。 説明 このパラメーターは VVR 8.0.6 以降でのみサポートされています。 |
tableName | MaxCompute テーブルの名前。 | String | はい | なし | なし。 |
accessId | MaxCompute の AccessKey ID。 | String | はい | なし | 詳細については、「AccessKey ID と AccessKey Secret を表示するにはどうすればよいですか?」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。 |
accessKey | MaxCompute の AccessKey Secret。 | String | はい | なし | |
partition | MaxCompute パーティションの名前。 | String | いいえ | なし | このパラメーターは、非パーティションテーブルおよび増分ソーステーブルには必須ではありません。 説明 パーティションテーブルの詳細については、「パーティションへの読み書き時に partition パラメーターを指定するにはどうすればよいですか?」をご参照ください。 |
compressAlgorithm | MaxCompute Tunnel で使用される圧縮アルゴリズム。 | String | いいえ | SNAPPY | 有効な値:
|
quotaName | MaxCompute Data Transmission Service の専用リソースのクォータ名。 | String | いいえ | なし | 専用の MaxCompute Data Transmission Service を使用するには、このパラメーターを設定します。 重要
|
ソーステーブル固有
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
maxPartitionCount | 読み取り可能なパーティションの最大数。 | Integer | いいえ | 100 | 読み取るパーティションの数がこの値を超えると、エラー 重要 一度に多くのパーティションを読み取ると、MaxCompute の負荷が増加し、ジョブの起動が遅くなります。`partition` パラメーターが誤って設定されていないか確認してください。多くのパーティションを読み取るには、`maxPartitionCount` の値を手動で増やしてください。 |
useArrow | データの読み取りに Arrow フォーマットを使用するかどうかを指定します。 | Boolean | いいえ | false | Arrow フォーマットを使用すると、MaxCompute Storage API を呼び出すことができます。 重要
|
splitSize | Arrow フォーマットでデータを読み取る際に一度にプルするデータのサイズ。 | MemorySize | いいえ | 256 MB | このパラメーターは VVR 8.0.8 以降でのみサポートされています。 重要 このパラメーターはバッチジョブでのみ有効です。 |
compressCodec | Arrow フォーマットでデータを読み取る際に使用する圧縮アルゴリズム。 | String | いいえ | "" | 有効な値:
圧縮アルゴリズムを指定すると、圧縮なしの場合と比較してスループットが向上します。 重要
|
dynamicLoadBalance | 動的シャーディングを許可するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
動的シャーディングを許可すると、異なる Flink ノードの処理能力を活用でき、ソーステーブルの全体的な読み取り時間が短縮されます。ただし、異なるノードで読み取られるデータ量にばらつきが生じ、データスキューが発生する可能性があります。 重要
|
増分ソーステーブル固有
増分ソーステーブルは、MaxCompute サーバーに定期的にポーリングしてすべてのパーティション情報を取得することで、新しいパーティションを検出します。コネクタが新しいパーティションを読み取る際、そのパーティションのデータは完全である必要があります。詳細については、「増分 MaxCompute ソーステーブルが書き込み中の新しいパーティションを検出した場合、どうすればよいですか?」をご参照ください。`startPartition` を使用して開始オフセットを指定できます。開始オフセットと辞書順で等しいか、それ以降のパーティションのみが読み取られることに注意してください。たとえば、パーティション year=2023,month=10 は、パーティション year=2023,month=9 よりも辞書順で小さくなります。このタイプのパーティション宣言では、正しい辞書順を確保するために値をゼロでパディングすることができます。例:year=2023,month=09。
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
startPartition | 増分読み取りを開始する MaxCompute パーティション (そのパーティションを含む)。 | String | はい | なし |
説明 `startPartition` パラメーターの詳細については、「増分 MaxCompute ソーステーブルの startPartition パラメーターを設定するにはどうすればよいですか?」をご参照ください。 |
subscribeIntervalInSec | MaxCompute にパーティションリストをポーリングする間隔。 | Integer | いいえ | 30 | 単位は秒です。 |
modifiedTableOperation | 読み取り中にパーティションデータが変更された場合の対処法。 | Enum (NONE, SKIP) | いいえ | NONE | ダウンロードセッションはチェックポイントに保存されます。ジョブがチェックポイントから回復するたびに、そのセッションからの読み取りを再開しようとします。パーティションデータが変更されたためにセッションが利用できない場合、Flink ジョブは再起動ループに入ります。この状況に対処するために、このパラメーターを設定できます:
重要
|
シンクテーブルのパラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
useStreamTunnel | データのアップロードに MaxCompute Stream Tunnel を使用するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明 データトンネルの選択に関する詳細については、「どのデータトンネルを選択すべきか?」をご参照ください。 |
flushIntervalMs | MaxCompute Tunnel Writer バッファーのフラッシュ間隔。 | Long | いいえ | 30000 (30 秒) | データはまずバッファーに書き込まれます。バッファーがいっぱいになるか、`flushIntervalMs` の間隔に達すると、データは宛先テーブルにバッチで書き込まれます。
単位はミリ秒です。 説明 このパラメーターは `batchSize` と一緒に使用できます。どちらかの条件が満たされるとデータがフラッシュされます。 |
batchSize | MaxCompute Tunnel Writer バッファーのフラッシュサイズ。 | Long | いいえ | 67108864 (64 MB) | 単位はバイトです。 レコードが書き込まれると、データはまず MaxCompute バッファーに保存されます。バッファーが特定のサイズ (`batchSize`) に達すると、バッファー内のデータは宛先の MaxCompute テーブルに書き込まれます。 説明 このパラメーターは `flushIntervalMs` と一緒に使用できます。どちらかの条件が満たされるとデータがフラッシュされます。 |
numFlushThreads | MaxCompute Tunnel Writer バッファーをフラッシュするためのスレッド数。 | Integer | いいえ | 1 | 各 MaxCompute シンクの並列度は、データをフラッシュするために `numFlushThreads` 個のスレッドを作成します。この値が 1 より大きい場合、異なるパーティションのデータを同時にフラッシュでき、フラッシュ効率が向上します。 |
slotNum | MaxCompute Tunnel Writer が使用するスロット数。 | Integer | いいえ | 0 | スロット数の制限については、「Data Transmission Service の概要」をご参照ください。 |
dynamicPartitionLimit | 書き込む動的パーティションの最大数。 | Integer | いいえ | 100 | 2 つのチェックポイント間でシンクテーブルに書き込まれる動的パーティションの数が `dynamicPartitionLimit` を超えると、エラー 重要 一度に多くのパーティションに書き込むと、MaxCompute サービスに負荷がかかり、シンクテーブルのフラッシュやジョブのチェックポイントが遅くなる可能性があります。このエラーが発生した場合は、本当に多くのパーティションに書き込む必要があるか確認してください。必要な場合は、`dynamicPartitionLimit` の値を手動で増やしてください。 |
retryTimes | MaxCompute サーバーへのリクエストの最大再試行回数。 | Integer | いいえ | 3 | セッションの作成、セッションのコミット、またはデータのフラッシュ時に、MaxCompute サービスが一時的に利用できなくなることがあります。システムはこの設定に基づいて再試行します。 |
sleepMillis | 再試行間隔。 | Integer | いいえ | 1000 | 単位はミリ秒です。 |
enableUpsert | データのアップロードに MaxCompute Upsert Tunnel を使用するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
重要
|
upsertAsyncCommit | Upsert モードでセッションをコミットする際に非同期モードを使用するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明 このパラメーターは VVR 8.0.6 以降でのみサポートされています。 |
upsertCommitTimeoutMs | Upsert モードでセッションをコミットするためのタイムアウト期間。 | Integer | いいえ | 120000 (120 秒) | 単位はミリ秒です。 説明 このパラメーターは VVR 8.0.6 以降でのみサポートされています。 |
sink.operation | Delta Lake テーブルに書き込む際の書き込みモード。 | String | いいえ | insert | 有効な値:
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
sink.parallelism | Delta Lake テーブルへの書き込みの並列度。 | Integer | いいえ | なし |
重要 Delta Lake テーブルの `write.bucket.num` プロパティがこの設定値の整数倍であることを確認してください。これにより、最高の書き込みパフォーマンスが提供され、シンクノードのメモリを最も節約できます。 |
sink.file-cached.enable | Delta Lake テーブルの動的パーティションに書き込む際にファイルキャッシュモードを使用するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
ファイルキャッシュモードを使用すると、サーバー側に書き込まれる小さなファイルの数が減りますが、データ出力のレイテンシが増加します。シンクテーブルの並列度が高い場合は、ファイルキャッシュモードを使用してください。 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
sink.file-cached.writer.num | ファイルキャッシュモードでの単一タスクの同時データアップロード数。 | Integer | いいえ | 16 |
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
sink.bucket.check-interval | ファイルキャッシュモードでのファイルサイズチェックの間隔。単位はミリ秒 (ms) です。 | Integer | いいえ | 60000 | このパラメーターは 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
sink.file-cached.rolling.max-size | ファイルキャッシュモードでの単一キャッシュファイルの最大サイズ。 | MemorySize | いいえ | 16 M |
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
sink.file-cached.memory | ファイルキャッシュモードでファイルを書き込むために使用される最大オフヒープメモリサイズ。 | MemorySize | いいえ | 64 MB | このパラメーターは 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
sink.file-cached.memory.segment-size | ファイルキャッシュモードでファイルを書き込むために使用されるバッファーサイズ。 | MemorySize | いいえ | 128 KB | このパラメーターは 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
sink.file-cached.flush.always | ファイルキャッシュモードでファイルを書き込む際にキャッシュを使用するかどうかを指定します。 | Boolean | いいえ | true | このパラメーターは 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
sink.file-cached.write.max-retries | ファイルキャッシュモードでのデータアップロードの再試行回数。 | Integer | いいえ | 3 | このパラメーターは 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.writer.max-retries | Upsert Writer がバケットへの書き込みに失敗した後の再試行回数。 | Integer | いいえ | 3 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.writer.buffer-size | Flink 内の単一の Upsert Writer のキャッシュサイズ。 | MemorySize | いいえ | 64 m |
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.writer.bucket.buffer-size | Flink 内の単一バケットのキャッシュサイズ。 | MemorySize | いいえ | 1 m | クラスターのメモリリソースが逼迫している場合は、このパラメーター値を減らすことができます。 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.write.bucket.num | 書き込み対象のテーブル内のバケット数。 | Integer | はい | なし | この値は、書き込み対象の Delta Lake テーブルの 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.write.slot-num | 単一セッションで使用される Tunnel スロットの数。 | Integer | いいえ | 1 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.commit.max-retries | Upsert セッションコミットの再試行回数。 | Integer | いいえ | 3 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.commit.thread-num | Upsert セッションコミットの並列度。 | Integer | いいえ | 16 | このパラメーターを非常に大きな値に設定しないでください。同時コミット数が多いほど多くのリソースを消費し、パフォーマンスの問題や過剰なリソース消費につながる可能性があります。 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.commit.timeout | Upsert セッションコミットのタイムアウト期間。単位は秒 (s) です。 | Integer | いいえ | 600 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.flush.concurrent | 単一パーティションに対して同時に書き込み可能なバケットの最大数。 | Integer | いいえ | 2 | バケット内のデータがフラッシュされるたびに、Tunnel Slot リソースを消費します。 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
insert.commit.thread-num | コミットセッションの並列度。 | Integer | いいえ | 16 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
insert.arrow-writer.enable | Arrow フォーマットを使用するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
insert.arrow-writer.batch-size | Arrow バッチ内の最大行数。 | Integer | いいえ | 512 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
insert.arrow-writer.flush-interval | Writer のフラッシュ間隔。単位はミリ秒 (ms) です。 | Integer | いいえ | 100000 | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
insert.writer.buffer-size | Buffered Writer のキャッシュサイズ。 | MemorySize | いいえ | 64 MB | 説明 このパラメーターは VVR 8.0.10 以降でのみサポートされています。 |
upsert.partial-column.enable | 一部の列のみを更新するかどうかを指定します。 | Boolean | いいえ | false | このパラメーターは、シンクテーブルが Delta Lake テーブルの場合にのみ有効です。詳細については、「部分的な列の更新」をご参照ください。 有効な値:
データは、更新データのプライマリキーがシンクテーブルに存在するかどうかに応じて、異なる方法で書き込まれます:
説明 このパラメーターは VVR 8.0.11 以降でのみサポートされています。 |
ディメンションテーブル固有
ジョブが開始されると、MaxCompute ディメンションテーブルは指定されたパーティションから完全なデータをプルします。`partition` パラメーターは `max_pt()` などの関数をサポートします。キャッシュの有効期限が切れて再読み込みされると、`partition` パラメーターが再解析され、最新のパーティションからデータがプルされます。`max_two_pt()` を使用する場合、ディメンションテーブルは 2 つのパーティションからデータをプルできます。それ以外の場合は、単一のパーティションのみを指定できます。
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
cache | キャッシュポリシー。 | String | はい | なし | 現在、MaxCompute ディメンションテーブルは ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。以降のすべてのディメンションテーブルクエリはキャッシュを介して実行されます。キャッシュにデータが見つからない場合、そのキーは存在しません。キャッシュの有効期限が切れると、完全なキャッシュが再ロードされます。 説明
|
cacheSize | キャッシュするデータレコードの最大数。 | Long | いいえ | 100000 | ディメンションテーブルのデータ量が `cacheSize` を超えると、エラー 重要 大きなディメンションテーブルは多くの JVM ヒープメモリを消費し、ジョブの起動やディメンションテーブルの更新を遅くするため、本当にこの量のデータをキャッシュする必要があるか確認してください。必要な場合は、このパラメーターを手動で増やしてください。 |
cacheTTLMs | キャッシュのタイムアウト期間、つまりキャッシュの更新間隔。 | Long | いいえ | Long.MAX_VALUE (更新しない) | 単位はミリ秒です。 |
cacheReloadTimeBlackList | キャッシュ更新禁止時間。このパラメーターで指定された期間中、キャッシュは更新されません。 | String | いいえ | なし | プロモーション中のトラフィックピーク時など、重要な期間中にキャッシュが更新されるのを防ぎ、ジョブの不安定性を回避します。このパラメーターの設定方法については、「cacheReloadTimeBlackList パラメーターを設定するにはどうすればよいですか?」をご参照ください。 |
maxLoadRetries | キャッシュの更新試行の最大回数。ジョブ起動時の初期データプルを含みます。この回数を超えると、ジョブは失敗します。 | Integer | いいえ | 10 | なし。 |
データ型のマッピング
MaxCompute がサポートするデータ型の詳細については、「データ型 (2.0)」をご参照ください。
MaxCompute 型 | Flink 型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
TIMESTAMP_NTZ | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
MaxCompute 物理テーブルにネストされた複合型フィールド (ARRAY、MAP、または STRUCT) と JSON 型フィールドの両方が含まれている場合、Flink がこれらのフィールドを正しく読み書きできるように、MaxCompute 物理テーブルを作成する際に tblproperties('columnar.nested.type'='true') を指定する必要があります。
データインジェスト (パブリックプレビュー)
MaxCompute コネクタをシンクとして使用し、YAML ベースのデータインジェストジョブでデータを書き込むことができます。
制限事項
この機能は VVR 11.1 以降のバージョンでのみサポートされています。
構文
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSink
access-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
buckets-num: 8設定項目
設定項目 | 必須 | デフォルト値 | タイプ | 説明 |
type | はい | なし | String | 使用するコネクタを指定します。このパラメーターを |
name | いいえ | なし | String | シンクの名前。 |
access-id | はい | なし | String | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。AccessKey 管理ページで AccessKey ID を取得できます。 |
access-key | はい | なし | String | AccessKey ID に対応する AccessKey Secret。 |
endpoint | はい | なし | String | MaxCompute サービスのエンドポイント。MaxCompute プロジェクトを作成する際に選択したリージョンとネットワーク接続タイプに基づいてエンドポイントを設定する必要があります。各リージョンとネットワークタイプのエンドポイントの詳細については、「エンドポイント」をご参照ください。 |
project | はい | なし | String | MaxCompute プロジェクトの名前です。MaxCompute コンソールにログインし、[ワークスペース] > [プロジェクト管理] ページでプロジェクト名を取得できます。 |
tunnel.endpoint | いいえ | なし | String | MaxCompute Tunnel サービスのエンドポイント。この設定は通常、指定されたプロジェクトのリージョンに基づいて自動的にルーティングされます。プロキシを使用する場合など、特別なネットワーク環境でのみこの設定を使用してください。 |
quota.name | いいえ | なし | String | MaxCompute データ転送用の専用リソースグループの名前。この設定を指定しない場合、共有リソースグループが使用されます。詳細については、「Data Transmission Service の専用リソースグループの購入と使用」をご参照ください。 |
sts-token | いいえ | なし | String | 認証に RAM ロールによって発行された短期間の Security Token Service (STS) トークンを使用する場合にこのパラメーターを指定します。 |
buckets-num | いいえ | 16 | Integer | MaxCompute Delta Lake テーブルを自動的に作成する際に使用するバケットの数。この機能の使用方法の詳細については、「ニアリアルタイムデータウェアハウスの概要」をご参照ください。 |
compress.algorithm | いいえ | zlib | String | MaxCompute への書き込み時に使用されるデータ圧縮アルゴリズム。サポートされている値は |
total.buffer-size | いいえ | 64MB | String | メモリにバッファリングするデータの量 (パーティションレベル、または非パーティションテーブルの場合はテーブルレベル)。異なるパーティション (またはテーブル) のバッファーは独立しています。しきい値に達すると、データは MaxCompute に書き込まれます。 |
bucket.buffer-size | いいえ | 4MB | String | メモリにバッファリングするデータの量 (バケットレベル)。このパラメーターは Delta Lake テーブルへの書き込み時にのみ有効です。異なるデータバケットのバッファーは独立しています。しきい値に達すると、バケット内のデータは MaxCompute に書き込まれます。 |
commit.thread-num | いいえ | 16 | Integer | チェックポイントフェーズ中に同時に処理できるパーティション (またはテーブル) の数。 |
flush.concurrent-num | いいえ | 4 | Integer | MaxCompute にデータを書き込む際に同時に書き込み可能なバケットの数。このパラメーターは Delta Lake テーブルへの書き込み時にのみ有効です。 |
テーブルロケーションのマッピング
コネクタがテーブルを自動的に作成する際、ソーステーブルのロケーション情報を次のように MaxCompute テーブルにマッピングします。
MaxCompute プロジェクトがスキーマモデルをサポートしていない場合、各同期タスクは 1 つのデータベースのみを同期できます。たとえば、アップストリームのデータソースが MySQL の場合、同期タスクは 1 つの MySQL データベースのみを同期できます。これは他のデータソースにも適用されます。コネクタは `tableId.namespace` 情報を無視します。
データインジェストジョブのオブジェクト | MaxCompute のロケーション | MySQL のロケーション |
設定内の Project パラメーター | プロジェクト | なし |
TableId.namespace | スキーマ (この設定は MaxCompute プロジェクトがスキーマモデルをサポートしていない場合は無視されます。) | データベース |
TableId.tableName | テーブル | テーブル |
型マッピング
CDC 型 | MaxCompute 型 |
CHAR | STRING |
VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
例
SQL
ソーステーブルの例
完全なデータの読み取り
デフォルトでは、ソーステーブルは完全データモードであり、partition パラメーターで指定されたパーティションを読み取ります。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;増分データの読み取り
指定された startPartition からデータを増分的に読み取ります。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 20180905 のパーティションからデータを読み取ります。
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;シンクテーブルの例
静的パーティションへの書き込み
partition に静的なパーティション値を指定します。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- 静的パーティション ds=20180905 にデータを書き込みます。
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;動的パーティションへの書き込み
テーブルのパーティションキー列に基づいて partition を指定します。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR -- 動的パーティション列は明示的に宣言する必要があります。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' -- パーティション値が指定されていない場合、データは ds フィールドの値に基づいて異なるパーティションに書き込まれます。
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;ディメンションテーブルの例
1 対 1 のディメンションテーブル
1 対 1 のディメンションテーブルにはプライマリキーを宣言する必要があります。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- 1 対 1 のディメンションテーブルにはプライマリキーを宣言する必要があります。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;1 対多のディメンションテーブル
1 対多のディメンションテーブルにはプライマリキーは必要ありません。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- 1 対多のディメンションテーブルにはプライマリキーは必要ありません。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;DataStream
DataStream を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。
知的財産を保護するため、VVR 6.0.6 以降、このコネクタは単一のローカルデバッグジョブの実行時間を 30 分に制限します。30 分が経過すると、ジョブはエラーを報告して終了します。MaxCompute コネクタを含むジョブをローカルで実行およびデバッグする方法の詳細については、「コネクタを含むジョブのローカルでの実行とデバッグ」をご参照ください。
このコネクタは、
primary keyとtransactional=trueで作成されたテーブルである Delta Lake テーブルからの読み取りをサポートしていません。詳細については、「基本概念」をご参照ください。
DataStream ジョブで MaxCompute コネクタを使用するには、SQL を使用して MaxCompute テーブルを宣言し、その後、Table と DataStream の間で変換を行い、MaxCompute テーブルとデータストリームを接続する必要があります。
ソーステーブルへの接続
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 結果テーブルへの接続
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();XML
MaxCompute コネクタの Maven 依存関係には、全量ソーステーブル、増分ソーステーブル、結果テーブル、およびディメンションテーブルをビルドするために必要なクラスが含まれています。MaxCompute DataStream コネクタは Maven セントラルリポジトリで入手できます。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>よくある質問
全量および増分 MaxCompute ソーステーブルは、どのようにして MaxCompute からデータを読み取りますか。
全量または増分 MaxCompute ソーステーブルを使用するジョブは、一時停止して再開した後に並列度を変更することをサポートしていますか。
増分 MaxCompute ソーステーブルが、まだ書き込み中の新しいパーティションを検出した場合はどうすればよいですか。
MaxCompute コネクタの実行時エラー:Authorization Failed [4019]: Insufficient privileges
増分 MaxCompute ソーステーブルを使用するジョブが、開始後、長時間データの読み取りを開始しないのはなぜですか。
パーティションからの読み取りまたはパーティションへの書き込みを行う際に、partition パラメーターはどのように指定しますか。
MaxCompute ソーステーブルを使用するジョブが起動中の状態のままになる、または正常に起動した後にデータの生成に時間がかかるのはなぜですか。
MaxCompute 結果テーブルを使用するジョブの実行時に「Invalid partition spec」エラーが報告された場合はどうすればよいですか。
MaxCompute 結果テーブルを使用するジョブの実行時に「No more available blockId」エラーが報告された場合はどうすればよいですか。