本トピックでは、MaxCompute コネクタの使用方法について説明します。
背景情報
MaxCompute(旧称:ODPS)は、大規模データウェアハウス向けの高速かつフルマネージドなコンピューティングプラットフォームです。MaxCompute はエクサバイト規模のデータを処理できます。また、データウェアハウスにおける大量の構造化データの保存・計算に加え、分析およびモデリングサービスも提供します。
以下の表に、MaxCompute コネクタがサポートする機能を示します。
項目 | 説明 |
対応タイプ | ソーステーブル、ディメンションテーブル、結果テーブル |
実行モード | ストリーミングモードおよびバッチモード |
データフォーマット | 該当なし |
メトリック | |
API 種別 | DataStream API および SQL API |
結果テーブルへのデータ更新または削除 | MaxCompute Batch Tunnel または MaxCompute Streaming Tunnel を使用する場合、結果テーブルへのデータ挿入のみ可能です。一方、MaxCompute Upsert Tunnel を使用する場合は、結果テーブルへのデータ更新・削除および挿入が可能です。 |
前提条件
MaxCompute テーブルが作成されています。詳細については、「テーブルの作成」をご参照ください。
制限事項
MaxCompute コネクタは、at-least-once セマンティクスのみをサポートします。
説明at-least-once セマンティクスは、データ損失を防止するために使用されます。特定のケースでは、MaxCompute へ重複したデータが書き込まれる可能性があります。重複データの発生は、使用するトンネルの種類によって異なります。「MaxCompute トンネル」の選択方法については、「上流・下流ストレージに関するよくある質問」トピックの「データトンネルの選択方法」セクションをご参照ください。
デフォルトでは、ソースがフルモードで動作する場合、
partitionオプションで指定されたパーティションからデータのみを読み取ります。指定パーティションの全データを読み取ると、ジョブは終了し、新規パーティションの監視は行われません。新規パーティションを継続的に監視するには、WITH 句で
startPartitionオプションを指定して、増分ソースを作成します。説明ディメンションテーブルが更新されるたびに、最新のパーティションをチェックします。
ソーステーブルの実行開始後、パーティションに新たに追加されたデータは読み取られません。パーティションに完全なデータが含まれている状態でデプロイメントを実行することを推奨します。
SQL
MaxCompute コネクタは、SQL ベースのジョブにおいて、ソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。
構文
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);コネクタオプション
一般設定
オプション
説明
データ型
必須
デフォルト値
備考
connector
テーブルの種別。
STRING
はい
デフォルト値なし
値を odps に設定します。
endpoint
MaxCompute のエンドポイント。
STRING
はい
デフォルト値なし
詳細については、「エンドポイント」をご参照ください。
tunnelEndpoint
MaxCompute Tunnel のエンドポイント。
STRING
いいえ
デフォルト値なし
詳細については、「エンドポイント」をご参照ください。
説明このオプションを指定しない場合、MaxCompute は Server Load Balancer (SLB) サービスに基づいてトンネル接続を割り当てます。
project
MaxCompute プロジェクトの名前。
STRING
はい
デフォルト値なし
該当なし。
schemaName
MaxCompute スキーマの名前。
STRING
いいえ
デフォルト値なし
このオプションは、MaxCompute スキーマ機能が有効化されている場合にのみ必須です。その場合、このオプションを MaxCompute テーブルのスキーマ名に設定する必要があります。「スキーマ操作」をご参照ください。
説明このオプションは VVR 8.0.6 以降でのみサポートされます。
tableName
MaxCompute テーブルの名前。
STRING
はい
デフォルト値なし
該当なし。
accessId
MaxCompute へのアクセスに使用する AccessKey ID。
STRING
はい
デフォルト値なし
詳細については、「アカウントの AccessKey ID および AccessKey Secret の情報を確認する方法」をご参照ください。
重要AccessKey ペアの保護のため、変数を使用して AccessKey ID を設定することを推奨します。「変数の管理」をご参照ください。
accessKey
MaxCompute へのアクセスに使用する AccessKey Secret。
STRING
はい
デフォルト値なし
partition
MaxCompute テーブル内のパーティション名。
STRING
いいえ
デフォルト値なし
非パーティション化テーブルまたは増分ソースの場合、このオプションを指定する必要はありません。
説明パーティション化テーブルにおける partition オプションの設定方法については、「上流・下流ストレージに関するよくある質問」トピックの「パーティションからのデータ読み取り/書き込み時に partition オプションを設定する方法」セクションをご参照ください。
compressAlgorithm
MaxCompute Tunnel で使用される圧縮アルゴリズム。
STRING
いいえ
SNAPPY
有効な値:
RAW(圧縮なし)
ZLIB
SNAPPY
ZLIB と比較して、SNAPPY はスループットを大幅に向上させます。テストシナリオでは、スループットが約 50 % 向上します。
quotaName
MaxCompute 専用トンネルリソースグループのクォータ名。
STRING
いいえ
デフォルト値なし
このオプションを指定すると、MaxCompute 専用トンネルリソースグループを使用できます。
重要このオプションは VVR 8.0.3 以降でのみサポートされます。
このオプションを指定する場合、tunnelEndpoint オプションを削除する必要があります。そうでない場合、tunnelEndpoint オプションで指定されたトンネルが使用されます。
ソース固有
オプション
説明
データ型
必須
デフォルト値
備考
maxPartitionCount
データ読み取り対象の最大パーティション数。
INTEGER
いいえ
100
読み取り対象のパーティション数がこのオプションの値を超えると、次のエラーメッセージが表示されます:
"一致したパーティション数がデフォルト制限を超えています"。重要過剰なパーティションからの読み取りは MaxCompute の負荷を高め、ジョブの起動を遅くする可能性があります。ビジネス要件により必要であれば、手動でこのオプションの値を調整してください。
useArrow
Arrow フォーマットを使用してデータを読み取るかどうかを指定します。
BOOLEAN
いいえ
false
Arrow フォーマットは、MaxCompute のストレージ API オペレーションを呼び出すために使用できます。
重要このオプションはバッチデプロイメントでのみ有効です。
このオプションは VVR 8.0.8 以降でのみサポートされます。
splitSize
Arrow フォーマットを使用してデータを読み取る際の 1 回のデータ取得サイズ。
MEMORYSIZE
いいえ
256 MB
このオプションは VVR 8.0.8 以降でのみサポートされます。
重要このオプションはバッチデプロイメントでのみ有効です。
compressCodec
Arrow フォーマットを使用してデータを読み取る際の圧縮アルゴリズム。
STRING
いいえ
""
有効な値:
""(圧縮なし)
ZSTD
LZ4_FRAME
圧縮アルゴリズムを指定すると、圧縮なしと比較してスループットを向上させることができます。
重要このオプションはバッチデプロイメントでのみ有効です。
このオプションは VVR 8.0.8 以降でのみサポートされます。
dynamicLoadBalance
シャードの動的割り当てを有効化するかどうかを指定します。
BOOLEAN
いいえ
false
有効な値:
true
false
シャードの動的割り当てにより、Realtime Compute for Apache Flink の異なる演算子の処理パフォーマンスが向上し、MaxCompute からの読み取り全体にかかる時間が短縮されます。ただし、異なる演算子が読み取るデータ量が不均一になるため、データスキューが発生する可能性があります。
重要このオプションはバッチデプロイメントでのみ有効です。
このオプションは VVR 8.0.8 以降でのみサポートされます。
増分 MaxCompute ソーステーブル固有のオプション
増分テーブルソースは、すべてのパーティション情報を取得するために、MaxCompute サーバーに対して間欠的にポーリングを実行し、新しいパーティションを監視します。ソースが新しいパーティションからデータの読み取りを開始する前に、そのパーティションへのデータ書き込みが完了している必要があります。詳細については、「アップストリームおよびダウンストリームストレージに関するよくある質問」トピックの「データがまだパーティションに書き込まれているときに、増分 MaxCompute ソーステーブルが新しいパーティションを検出したらどうすればよいですか?」セクションをご参照ください。 startPartition オプションを設定して、データの読み取りを開始するパーティションを指定できます。startPartition オプションで指定されたパーティションの アルファベット順以上であるパーティションのみのデータが読み取られます。たとえば、パーティション
year=2023,month=10のアルファベット順は、パーティションyear=2023,month=9のアルファベット順より小さくなります。この場合、コード内で宣言されているパーティション名の月の数値の前にゼロを追加することで、パーティションのアルファベット順が有効になるようにできます。これにより、パーティションオプションの値を year=2023,month=9 からyear=2023,month=09に変更できます。オプション
説明
データ型
必須
デフォルト値
備考
startPartition
増分データ読み取りを開始するパーティション。
STRING
はい
デフォルト値なし
このオプションを指定すると、増分ソースが使用され、partition オプションは無視されます。
ソーステーブルが多段パーティション化テーブルの場合、パーティションレベルに従って各パーティション列の値を降順で設定する必要があります。
説明startPartition オプションの設定方法については、「上流・下流ストレージに関するよくある質問」トピックの「増分 MaxCompute ソーステーブルの startPartition オプションを設定する方法」セクションをご参照ください。
subscribeIntervalInSec
パーティション情報を取得するために MaxCompute に対してポーリングを行う間隔。
INTEGER
いいえ
30
単位:秒。
modifiedTableOperation
パーティション読み取り中にパーティション内のデータが変更された場合の操作。
Enum(NONE、SKIP)
いいえ
NONE
ダウンロードセッションはチェックポイントに保存されます。チェックポイントからセッションを再開するたびに、Realtime Compute for Apache Flink はセッションの読み取り進捗を復元しようと試みます。しかし、パーティション内のデータが変更された場合、セッションは利用不可となります。この場合、デプロイメントが繰り返し再起動されます。この問題を解決するには、このオプションを指定します。有効な値:
NONE:このオプションを NONE に設定した場合、startPartition オプションの値を変更し、利用不可となったパーティションよりアルファベット順で後のパーティションを指定して、ステートなしでデプロイメントを開始する必要があります。
SKIP:ステートなしでデプロイメントを開始したくない場合は、このオプションを SKIP に設定できます。この場合、Realtime Compute for Apache Flink はチェックポイントからセッションを再開しようとする際に、利用不可となったパーティションをスキップします。
重要このオプションは VVR 8.0.3 以降でのみサポートされます。
このオプションを NONE または SKIP に設定した場合、変更されたパーティションから読み取られたデータは保持され、読み取られなかったデータは無視されます。
シンク専用
オプション
説明
データ型
必須
デフォルト値
備考
useStreamTunnel
MaxCompute Streaming Tunnel を使用してデータをアップロードするかどうかを指定します。
BOOLEAN
いいえ
false
有効な値:
true:MaxCompute Streaming Tunnel を使用してデータをアップロードします。
false:MaxCompute Batch Tunnel を使用してデータをアップロードします。
説明トンネルの選択方法については、「上流・下流ストレージに関するよくある質問」トピックの「データトンネルの選択方法」セクションをご参照ください。
flushIntervalMs
MaxCompute Tunnel のライターのバッファー内でフラッシュ操作を実行する間隔。
LONG
いいえ
30000(30 秒)
データはバッファーに蓄積され、
flushIntervalMsで指定された間隔でバッチ処理でフラッシュされます。Streaming Tunnel:フラッシュされたデータは、送信先の MaxCompute テーブルで即座に利用可能になります。
Batch Tunnel:フラッシュされたデータは、チェックポイント操作が完了した後にのみ利用可能になります。スケジュールされたフラッシュ機能を無効化するには、このオプションを 0 に設定することを推奨します。
単位:ミリ秒。
説明このオプションは
batchSizeオプションと併用できます。batchSizeオプションまたはflushIntervalMsオプションで指定された条件のいずれかが満たされると、フラッシュ操作がトリガーされます。batchSize
MaxCompute Tunnel のバッファー容量。
LONG
いいえ
67108864(64 MB)
MaxCompute sink は、データをバッファーに挿入します。その後、バッファー内のデータのサイズが batchSize オプションで指定された値を超えると、MaxCompute sink はバッファー内のデータを送信先の MaxCompute テーブルに書き込みます。
単位:バイト。
説明このオプションは flushIntervalMs オプションと併用できます。どちらかの条件が満たされると、フラッシュ操作がトリガーされます。
numFlushThreads
MaxCompute Tunnel のライターのバッファー内のデータをフラッシュするために使用されるスレッド数。
INTEGER
いいえ
1
各 MaxCompute 結果テーブルは、numFlushThreads オプションで指定された数のスレッドを作成してデータをフラッシュします。このオプションの値が 1 より大きい場合、異なるパーティションのデータを同時にフラッシュできます。これにより、フラッシュ操作の効率が向上します。
slotNum
Flink から MaxCompute へデータを受信する際に MaxCompute が使用するトンネルスロット数。
INTEGER
いいえ
0
スロット数の制限については、「MaxCompute ドキュメント」の「データ伝送サービスの概要」をご参照ください。
dynamicPartitionLimit
データ書き込み対象の動的パーティションの最大数。
INTEGER
いいえ
100
チェックポイント間で結果テーブルへ書き込まれる動的パーティション数が dynamicPartitionLimit オプションの値を超えると、次のエラーメッセージが表示されます:
"動的パーティション数が多すぎます"。重要MaxCompute テーブルの多数のパーティションへデータを書き込むと、MaxCompute サービスのワークロードが高まり、チェックポイントおよびフラッシュの速度が低下します。これを防ぐため、多数のパーティションへデータを書き込む必要があるかどうかを確認してください。ビジネス要件により多数のパーティションへデータを書き込む必要がある場合は、手動で dynamicPartitionLimit オプションの値を増加させてください。
retryTimes
MaxCompute サーバーに対するリクエストの最大リトライ回数。
INTEGER
いいえ
3
セッションの作成、セッションの送信、またはデータのフラッシュ時に、MaxCompute サービスが一時的に利用不可になることがあります。MaxCompute サービスが利用不可になった場合、このオプションの設定に基づいて MaxCompute サーバーへのリクエストが行われます。
sleepMillis
リトライ間隔。
INTEGER
いいえ
1000
単位:ミリ秒。
enableUpsert
MaxCompute Upsert Tunnel を使用してデータをアップロードするかどうかを指定します。
BOOLEAN
いいえ
false
有効な値:
true:Realtime Compute for Apache Flink で INSERT、UPDATE_AFTER、DELETE データを処理するために MaxCompute Upsert Tunnel を使用します。
false:Realtime Compute for Apache Flink で INSERT および UPDATE_AFTER データを処理するために、useStreamTunnel オプションで指定された MaxCompute Batch Tunnel または MaxCompute Streaming Tunnel を使用します。
重要MaxCompute 結果テーブルがアップサートモードでセッションをコミットする際に、エラー、デプロイメント失敗、長時間の処理障害などの問題が発生した場合、sink 演算子の並列処理の次数を 10 以下に設定することを推奨します。
このオプションは VVR 8.0.6 以降でのみサポートされます。
upsertAsyncCommit
MaxCompute 結果テーブルがアップサートモードでセッションをコミットする際に非同期モードを使用するかどうかを指定します。
BOOLEAN
いいえ
false
有効な値:
true:非同期モードを使用します。非同期モードを使用すると、セッションのコミットにかかる時間が短縮されますが、セッションコミット後に書き込まれたデータは即座にクエリできません。
false:デフォルトで同期モードが使用されます。MaxCompute 結果テーブルがセッションをコミットする際、システムはサーバーによるセッション処理が完了するまで待機します。
説明このオプションは VVR 8.0.6 以降でのみサポートされます。
upsertCommitTimeoutMs
MaxCompute 結果テーブルがアップサートモードでセッションをコミットする際のタイムアウト期間。
INTEGER
いいえ
120000
(120 秒)
単位:ミリ秒。
説明このオプションは VVR 8.0.6 以降でのみサポートされます。
sink.operation
Delta テーブルへの書き込み操作モード。
STRING
いいえ
insert
有効な値:
insert:データを追加モードでテーブルに書き込みます。
upsert:データを更新します。
説明このオプションは VVR 8.0.10 以降でのみサポートされます。
sink.parallelism
Delta テーブルへのデータ書き込み時の並列処理の次数。
INTEGER
いいえ
なし
データ書き込みの並列度。このオプションを設定しない場合、デフォルトで上流のデータ並列度が使用されます。
このオプションは VVR 8.0.10 以降でのみサポートされます。
重要write.bucket.num オプションの値が sink.parallelism オプションの値の整数倍になるように設定してください。これにより、最適な書き込みパフォーマンスが得られ、sink ノードのメモリ消費を効率的に節約できます。
sink.file-cached.enable
Delta テーブルの動的パーティションへのデータ書き込み時にファイルキャッシュモードを有効化するかどうかを指定します。
BOOLEAN
いいえ
false
有効な値:
true:ファイルキャッシュモードを有効化します。
false:ファイルキャッシュモードは無効化されています。
ファイルキャッシュモードを有効化すると、サーバーへ書き込まれる小規模ファイルの数が減少しますが、書き込みレイテンシーが高くなります。sink テーブルの並列度が高い場合、ファイルキャッシュモードを有効化することを推奨します。
説明このオプションは VVR 8.0.10 以降でのみサポートされます。
sink.file-cached.writer.num
ファイルキャッシュモードで、1 つのタスク内でデータを同時アップロードするために使用されるスレッド数。
INTEGER
いいえ
16
このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。
このオプションの値を大きく設定しないことを推奨します。多数のパーティションへ同時にデータを書き込むと、メモリ不足(OOM)エラーが発生する可能性があります。
説明このオプションは VVR 8.0.10 以降でのみサポートされます。
sink.bucket.check-interval
ファイルキャッシュモードでファイルサイズをチェックする間隔。単位:ミリ秒。
INTEGER
いいえ
60000
このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。
このオプションは VVR 8.0.10 以降でのみサポートされます。
sink.file-cached.rolling.max-size
ファイルキャッシュモードにおける単一キャッシュファイルの最大値。
MEMORYSIZE
いいえ
16 MB
このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。
ファイルサイズがこのオプションの値を超えると、ファイルデータがサーバーへアップロードされます。
説明このオプションは VVR 8.0.10 以降でのみサポートされます。
sink.file-cached.memory
ファイルキャッシュモードでファイルへのデータ書き込みに使用されるオフヒープメモリの最大サイズ。
MEMORYSIZE
いいえ
64 MB
このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。
このオプションは VVR 8.0.10 以降でのみサポートされます。
sink.file-cached.memory.segment-size
ファイルキャッシュモードでファイルへのデータ書き込みに使用されるバッファーのサイズ。
MEMORYSIZE
いいえ
128 KB
このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。
このオプションは VVR 8.0.10 以降でのみサポートされます。
sink.file-cached.flush.always
ファイルキャッシュモードでファイルへのデータ書き込みにキャッシュを使用するかどうかを指定します。
BOOLEAN
いいえ
true
このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。
このオプションは VVR 8.0.10 以降でのみサポートされます。
sink.file-cached.write.max-retries
ファイルキャッシュモードでのデータアップロードのリトライ回数。
INTEGER
いいえ
3
このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.writer.max-retries
Upsert Writer セッションでバケットへのデータ書き込みの最大リトライ回数。
INTEGER
いいえ
3
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.writer.buffer-size
Realtime Compute for Apache Flink における Upsert Writer セッションのバッファー容量。
MEMORYSIZE
いいえ
64 MB
すべてのバケットの合計バッファー容量が指定されたしきい値に達すると、システムが自動的にデータをサーバーへ更新します。
説明Upsert Writer セッションのデータは、同時に複数のバケットへ書き込まれることがあります。書き込み効率を向上させるには、このオプションの値を増加させることを推奨します。
多数のパーティションへデータを書き込む場合、OOM エラーが発生する可能性があります。これを防ぐには、このオプションの値を減らすことができます。
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.writer.bucket.buffer-size
Realtime Compute for Apache Flink における単一バケットのバッファー容量。
MEMORYSIZE
いいえ
1 MB
Flink サーバーのメモリリソースが不足している場合、このオプションの値を減らすことができます。
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.write.bucket.num
データ書き込み先テーブルのバケット数。
INTEGER
はい
なし
このオプションの値は、データ書き込み先 Delta テーブルで設定された
write.bucket.numオプションの値と同じにする必要があります。このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.write.slot-num
セッションで使用されるトンネルスロット数。
INTEGER
いいえ
1
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.commit.max-retries
アップサートセッションコミットの最大リトライ回数。
INTEGER
いいえ
3
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.commit.thread-num
アップサートセッションコミットの並列処理の次数。
INTEGER
いいえ
16
このオプションの値を大きすぎないように設定することを推奨します。過剰なアップサートセッションコミットが同時に実行されると、リソース消費が増加し、パフォーマンスの問題や過剰なリソース消費が発生する可能性があります。
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.commit.timeout
アップサートセッションコミットのタイムアウト期間。単位:秒。
INTEGER
いいえ
600
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.flush.concurrent
1 つのパーティション内のデータを同時に書き込める最大バケット数。
INTEGER
いいえ
2
バケット内のデータがリフレッシュされるたびに、トンネルスロットが占有されます。
このオプションは VVR 8.0.10 以降でのみサポートされます。
insert.commit.thread-num
コミットセッションの並列処理の次数。
INTEGER
いいえ
16
このオプションは VVR 8.0.10 以降でのみサポートされます。
insert.arrow-writer.enable
Arrow フォーマットを使用するかどうかを指定します。
BOOLEAN
いいえ
false
有効な値:
true:Arrow フォーマットを使用します。
false:Arrow フォーマットを使用しません。
説明このオプションは VVR 8.0.10 以降でのみサポートされます。
insert.arrow-writer.batch-size
Arrow 形式のデータの 1 バッチあたりの最大行数。
INTEGER
いいえ
512
このオプションは VVR 8.0.10 以降でのみサポートされます。
insert.arrow-writer.flush-interval
ライターがデータをフラッシュする間隔。単位:ミリ秒。
INTEGER
いいえ
100000
このオプションは VVR 8.0.10 以降でのみサポートされます。
insert.writer.buffer-size
バッファライターのキャッシュサイズ。
MEMORYSIZE
いいえ
64 MB
このオプションは VVR 8.0.10 以降でのみサポートされます。
upsert.partial-column.enable
特定のカラムのデータのみを更新するかどうかを指定します。
BOOLEAN
いいえ
false
このオプションは、MaxCompute Delta テーブルへデータを書き込む結果テーブルにのみ適用されます。「MaxCompute ドキュメント」の「特定カラムのデータ更新」をご参照ください。
有効な値:
true
false
データ更新の動作は、sink に新しいデータと同じプライマリキーを持つレコードが存在するかどうかによって異なります。
sink テーブルに同じプライマリキーを持つデータが存在する場合、プライマリキーに基づいて対応するフィールドが更新されます。具体的には、値が
nullでない場合、指定されたフィールドが新しい値で上書きされます。sink テーブルに同じプライマリキーを持つレコードが存在しない場合、新しいレコードが追加されます。指定されたカラムには新しい値が挿入され、他のすべてのカラムには
nullが挿入されます。
説明このオプションは VVR 8.0.11 以降でのみサポートされます。
ディメンションテーブル固有
デプロイメント開始時に、ディメンションテーブルは partition オプションで指定されたパーティションから完全データをプルします。このオプションは max_pt() 関数をサポートしています。キャッシュエントリの有効期限が切れてキャッシュが再読み込みされる場合、partition オプションで指定された最新のパーティションのデータが再解析されます。partition オプションが max_two_pt() に設定されている場合、ディメンションテーブルは 2 つのパーティションからデータをプルできます。それ以外の場合、1 つのパーティションからのみデータをプルできます。
オプション
説明
データ型
必須
デフォルト値
備考
cache
キャッシュポリシー。
STRING
はい
デフォルト値なし
ディメンションテーブルでは、cache オプションを
ALLに設定し、DDL 文で明示的に宣言する必要があります。リモートテーブルのデータ量が少なく、多数の欠落キーが存在する場合、このオプションを ALL に設定することを推奨します。ソースとディメンションテーブルは ON 句に基づいて関連付けられません。ALL:ディメンションテーブルのすべてのデータをキャッシュすることを意味します。システムがデプロイメントを実行する前に、ディメンションテーブルのすべてのデータがキャッシュに読み込まれます。その後、ディメンションテーブルに対するすべてのクエリはキャッシュを検索します。キーが存在しない場合、システムはキャッシュ内にデータレコードを見つけることができません。キャッシュエントリの有効期限が切れた後、システムはキャッシュ内のすべてのデータを再読み込みします。
説明cache オプションを ALL に設定する場合、ディメンションテーブルのデータを非同期で読み込むため、結合ノードのメモリを増加させる必要があります。リモートテーブルのデータ量の少なくとも 4 倍のメモリサイズを増加させることを推奨します。メモリサイズは MaxCompute のストレージ圧縮アルゴリズムに関係します。
ディメンションテーブルのデータ量が非常に多い場合、SHUFFLE_HASH ヒントを使用してデータを各サブタスクに均等に分散させることができます。「上流・下流ストレージに関するよくある質問」トピックの「ディメンションテーブルで SHUFFLE_HASH ヒントを使用する方法」セクションをご参照ください。
超大規模なディメンションテーブルを使用する場合、Java 仮想マシン(JVM)のガーベジコレクション(GC)が頻繁に発生し、デプロイメント例外が発生する可能性があります。この問題を解決するには、ディメンションテーブルが他のテーブルと結合されるノードのメモリを増加させます。それでも問題が解決しない場合は、LRU(Least Recently Used)キャッシュポリシーをサポートするキー・バリュー型のディメンションテーブルに変換することを推奨します。たとえば、ApsaraDB for HBase ディメンションテーブルをキー・バリュー型のディメンションテーブルとして使用できます。
cacheSize
キャッシュ可能なデータ行数の最大値。
LONG
いいえ
100000
ディメンションテーブルのデータレコード数が cacheSize オプションの値を超えると、次のエラーメッセージが表示されます:
"テーブル <table-name> のパーティション <partition-name> の行数が maxRowCount 制限を超えています"。重要ディメンションテーブルに大量のデータレコードが存在する場合、JVM ヒープメモリの消費量が大きくなります。この場合、デプロイメントの起動速度およびディメンションテーブルの更新速度が低下します。これを防ぐため、大量のデータレコードをキャッシュする必要があるかどうかを確認してください。ビジネス要件によりディメンションテーブルに大量のデータレコードをキャッシュする必要がある場合は、手動でこのオプションの値を増加させてください。
cacheTTLMs
キャッシュのタイムアウト期間。
LONG
いいえ
Long.MAX_VALUE
単位:ミリ秒。
cacheReloadTimeBlackList
キャッシュがリフレッシュされない時間帯。このオプションで指定された時間帯には、キャッシュがリフレッシュされません。
STRING
いいえ
デフォルト値なし
このオプションは、ピーク時間のキャンペーンなど、大規模なオンラインプロモーションイベントに適用されます。このオプションを指定することで、キャッシュのリフレッシュ時にデプロイメントが不安定になるのを防ぐことができます。「上流・下流ストレージに関するよくある質問」トピックの「CacheReloadTimeBlackList オプションの設定方法」セクションをご参照ください。
maxLoadRetries
キャッシュのリフレッシュを試行する最大リトライ回数。デプロイメント開始時に最初にデータをプルする際、キャッシュがリフレッシュされます。リトライ回数がこのオプションの値を超えると、デプロイメントの実行に失敗します。
INTEGER
いいえ
10
該当なし。
データ型のマッピング
MaxCompute がサポートするデータ型の詳細については、「MaxCompute データ型システム バージョン 2.0」をご参照ください。
MaxCompute のデータ型 | Realtime Compute for Apache Flink のデータ型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
TIMESTAMP_NTZ | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
MaxCompute 物理テーブルにネストされた複合データ型(ARRAY、MAP、STRUCT など)および JSON 型のフィールドが含まれる場合、Realtime Compute for Apache Flink が物理テーブルからデータを読み取り・書き込みできるようにするには、MaxCompute 物理テーブルを作成する際に tblproperties('columnar.nested.type'='true') を指定する必要があります。
Flink CDC(パブリックプレビュー)
MaxCompute コネクタは、YAML ベースのジョブでデータインジェスト sink として使用できます。
VVR エンジンの要件
VVR 11.1 以降
構文
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSinkaccess-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}buckets-num: 8構成オプション
オプション | 必須? | デフォルト値 | データ型 | 説明 |
type | はい | デフォルト値なし。 | String | 使用するコネクタ。値を |
name | いいえ | デフォルト値なし。 | String | シンク名。 |
access-id | はい | デフォルト値なし。 | String | お使いの Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID です。[リソースアクセス管理コンソール] で取得してください。 |
access-key | はい | デフォルト値なし。 | String | お客様の AccessKey Secret です。 |
endpoint | はい | デフォルト値なし。 | String | MaxCompute エンドポイント。MaxCompute プロジェクトが配置されているリージョンおよびネットワーク接続方法に基づいて設定してください。「エンドポイント」をご参照ください。 |
project | はい | デフォルト値なし。 | String | MaxCompute プロジェクト名。以下の手順で取得できます:
|
tunnel.endpoint | いいえ | デフォルト値なし。 | String | MaxCompute Tunnel のエンドポイント。通常、このエンドポイントは MaxCompute によって |
quota.name | いいえ | デフォルト値なし。 | String | 専用リソースグループ のクォータ名。このオプションを明示的に指定しない場合、共有リソースグループが使用されます。 |
sts-token | いいえ | デフォルト値なし。 | String | RAM ロールの STS トークン。RAM ロールを使用して MaxCompute にアクセスする場合、身分認証のためにこのオプションが必要です。 |
buckets-num | いいえ | 16 | Integer | 自動作成される MaxCompute Delta テーブルのバケット数。「ニアリアルタイムデータウェアハウス」をご参照ください。 |
compress.algorithm | いいえ | zlib | String | データ圧縮アルゴリズム。有効な値:
|
total.buffer-size | いいえ | 64 MB | String | メモリ内バッファーのサイズ。パーティションテーブルの場合、このバッファーはパーティションレベルで適用されます。非パーティション化テーブルの場合、テーブルレベルで適用されます。異なるパーティションまたはテーブルのバッファーは独立しています。バッファーが容量に達すると、そのデータは MaxCompute へフラッシュされます。 |
bucket.buffer-size | いいえ | 4 MB | String | バケットのメモリ内バッファーのサイズ。このオプションは、MaxCompute Delta テーブルへデータを書き込む場合にのみ適用されます。異なるバケットのバッファーは独立しています。バッファーが容量に達すると、そのデータは MaxCompute へフラッシュされます。 |
commit.thread-num | いいえ | 16 | Integer | チェックポイント中における、同時にコミット可能な最大パーティション数またはテーブル数。 |
flush.concurrent-num | いいえ | 4 | Integer | Flink が同時にフラッシュできる最大バケット数を指定します。このオプションは、MaxCompute Delta テーブルへデータを書き込む場合にのみ適用されます。 |
テーブル位置のマッピング
コネクタが MaxCompute で自動テーブル作成をトリガーする場合、位置は次のようにマッピングされます:
MaxCompute プロジェクトでスキーマ機能が無効になっている場合、コネクタは tableId.namespace を無視します。この場合、単一のデータベースまたはその論理的な同等物のみが MaxCompute へ取り込まれます。たとえば、MySQL から MaxCompute へデータを取り込む場合、1 つの MySQL データベースのみが取り込まれます。
MySQL の位置 | Flink CDC の概要 | MaxCompute の位置 |
該当なし | 構成ファイルのプロジェクト | プロジェクト |
データベース | TableId.namespace | スキーマ 説明 MaxCompute プロジェクトでスキーマが無効になっている場合、この設定は無視されます。 |
テーブル | TableId.tableName | テーブル |
データ型のマッピング
Flink CDC 型 | MaxCompute 型 |
CHAR | STRING |
VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision>3) | TIMESTAMP |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision<=3) | DATETIME |
TIMESTAMP_WITH_TIME_ZONE(Precision>3) | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE(Precision<=3) | DATETIME |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
使用例
SQL API
ソーステーブル
特定パーティションの全データ読み取り
partition オプションで指定されたパーティションのすべてのデータを読み取ります。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;増分データの読み取り
startPartition で指定されたパーティションからデータの読み取りを開始し、新しいレコードを継続的に監視します。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 20180905 パーティションから読み取りを開始します。
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;結果テーブル
静的パーティションへの書き込み
partition オプションで指定されたパーティションに書き込みます:
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- データはパーティション 20180905 に書き込まれます。
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;動的パーティションへの書き込み
partition オプションに基づいて動的にパーティションにデータを書き込みます:
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR -- 動的パーティション列を明示的に指定します。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' -- ds の値を指定しません。データは ds フィールドの値に基づいて異なるパーティションに書き込まれます。
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;ディメンションテーブル
単一値キー
各キーが一意の値を持つ場合にプライマリキーを指定します:
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- プライマリキーを指定します。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;複数値キー
キーが複数の値を持つ可能性がある場合、プライマリキーを指定しません:
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- プライマリキーの指定は必須ではありません。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;DataStream API
DataStream API を呼び出してデータを読み書きする場合は、関連するタイプの DataStream コネクタを使用する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの統合」をご参照ください。
知的財産保護のため、VVR 6.0.6 以降では、MaxCompute コネクタを使用した DataStream プログラムのオンプレミスでのデバッグは最大 30 分間に制限されています。これを超えるデバッグセッションは、プログラムがエラーで終了します。詳細については、「コネクタのローカルデバッグ」をご参照ください。
MaxCompute Delta テーブルからのデータ読み取りはサポートされていません。Delta テーブルは、指定された
primary keyとプロパティtransactional=trueで作成されたテーブルです。詳細については、「用語」をご参照ください。
MaxCompute DataStream コネクタを使用する場合、SQL 文を使用して MaxCompute テーブルを宣言することを推奨します。Table API オペレーションを呼び出して MaxCompute テーブルにアクセスしたり、DataStream API オペレーションを呼び出してデータストリームにアクセスしたりできます。
ソーステーブルへの接続
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); シンクに接続
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();XML
MaxCompute コネクタの Maven 依存関係には、完全ソース、増分ソース、結果テーブル、ディメンションテーブルを作成するために必要なクラスが含まれています。さまざまなバージョンの MaxCompute DataStream コネクタは、Maven 中央リポジトリに格納されています。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>