すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ビッグデータコンピューティングサービス MaxCompute

最終更新日:Jan 23, 2026

このトピックでは、MaxCompute コネクタの構文、WITH パラメーター、および使用例について説明します。

背景情報

MaxCompute (旧称 ODPS) は、エクサバイト規模のデータに対応する、高速でフルマネージド型のデータウェアハウスソリューションです。大量の構造化データを保存および計算し、データウェアハウスソリューション、分析、モデリングサービスを提供します。

MaxCompute コネクタは、以下の特徴をサポートしています。

カテゴリ

詳細

サポートされている種類

ソーステーブル、ディメンションテーブル、シンクテーブル

実行モード

ストリームモードとバッチモード

データフォーマット

サポートされていません

特定のモニタリングメトリック

モニタリングメトリック

  • ソーステーブル

    numRecordsIn

    numRecordsInPerSecond

    numBytesIn

    numBytesInPerSecond

  • シンクテーブル

    numRecordsOut

    numRecordsOutPerSecond

    numBytesOut

    numBytesOutPerSecond

  • ディメンションテーブル

    dim.odps.cacheSize

説明

メトリックの詳細については、「メトリックの説明」をご参照ください。

API タイプ

DataStream と SQL

シンクテーブルでのデータ更新または削除のサポート

Batch Tunnel および Stream Tunnel モードはデータの挿入のみをサポートします。Upsert Tunnel モードはデータの挿入、更新、削除をサポートします。

前提条件

MaxCompute テーブルを作成済みであること。詳細については、「テーブルの作成」をご参照ください。

制限事項

  • MaxCompute コネクタは at-least-once セマンティクスのみをサポートします。

    説明

    at-least-once セマンティクスはデータが失われないことを保証します。ただし、まれに重複データが MaxCompute に書き込まれることがあります。この可能性は MaxCompute Tunnel のタイプによって異なります。MaxCompute Tunnel の詳細については、「どのデータトンネルを選択すべきか?」をご参照ください。

  • デフォルトでは、ソーステーブルは完全データモードです。`partition` パラメーターで指定されたパーティションのみを読み取ります。すべてのデータが読み取られると、ジョブは終了します。コネクタは新しいパーティションを監視しません。

    新しいパーティションを継続的に監視するには、WITH 句で `startPartition` パラメーターを指定して、増分ソーステーブルモードを使用できます。

    説明
    • ディメンションテーブルは更新のたびに最新のパーティションをチェックするため、この制限の対象外です。

    • ソーステーブルのジョブが開始された後、ジョブはパーティションに追加された新しいデータを読み取りません。パーティションデータが完全になった後にのみジョブを実行する必要があります。

SQL

MaxCompute コネクタは、SQL ジョブでソーステーブル、ディメンションテーブル、またはシンクテーブルとして使用できます。

構文

CREATE TEMPORARY TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

WITH パラメーター

一般

パラメーター

説明

データ型

必須

デフォルト値

備考

connector

テーブルのタイプ。

String

はい

なし

値は odps である必要があります。

endpoint

MaxCompute サービスのエンドポイント。

String

はい

なし

詳細については、「エンドポイント」をご参照ください。

tunnelEndpoint

MaxCompute Tunnel サービスのエンドポイント。

String

いいえ

なし

詳細については、「エンドポイント」をご参照ください。

説明

このパラメーターを指定しない場合、MaxCompute は内部のロードバランシングサービスに基づいてトンネル接続を割り当てます。

project

MaxCompute プロジェクトの名前。

String

はい

なし

なし。

schemaName

MaxCompute スキーマの名前。

String

いいえ

なし

このパラメーターは、MaxCompute プロジェクトでスキーマ機能が有効になっている場合にのみ必須です。このパラメーターを、MaxCompute テーブルが属するスキーマの名前に設定します。詳細については、「スキーマ操作」をご参照ください。

説明

このパラメーターは VVR 8.0.6 以降でのみサポートされています。

tableName

MaxCompute テーブルの名前。

String

はい

なし

なし。

accessId

MaxCompute の AccessKey ID。

String

はい

なし

詳細については、「AccessKey ID と AccessKey Secret を表示するにはどうすればよいですか?」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。

accessKey

MaxCompute の AccessKey Secret。

String

はい

なし

partition

MaxCompute パーティションの名前。

String

いいえ

なし

このパラメーターは、非パーティションテーブルおよび増分ソーステーブルには必須ではありません。

説明

パーティションテーブルの詳細については、「パーティションへの読み書き時に partition パラメーターを指定するにはどうすればよいですか?」をご参照ください。

compressAlgorithm

MaxCompute Tunnel で使用される圧縮アルゴリズム。

String

いいえ

SNAPPY

有効な値:

  • RAW (圧縮なし)

  • ZLIB

  • SNAPPY

    SNAPPY は ZLIB よりも大幅に高いスループットを提供します。テストシナリオでは、スループットが約 50% 向上します。

quotaName

MaxCompute Data Transmission Service の専用リソースのクォータ名。

String

いいえ

なし

専用の MaxCompute Data Transmission Service を使用するには、このパラメーターを設定します。

重要
  • このパラメーターは VVR 8.0.3 以降でのみサポートされています。

  • このパラメーターを設定する場合は、`tunnelEndpoint` パラメーターを削除する必要があります。そうしないと、`tunnelEndpoint` で指定されたデータトンネルが引き続き使用されます。

ソーステーブル固有

パラメーター

説明

データ型

必須

デフォルト値

備考

maxPartitionCount

読み取り可能なパーティションの最大数。

Integer

いいえ

100

読み取るパーティションの数がこの値を超えると、エラー The number of matched partitions exceeds the default limit が発生します。

重要

一度に多くのパーティションを読み取ると、MaxCompute の負荷が増加し、ジョブの起動が遅くなります。`partition` パラメーターが誤って設定されていないか確認してください。多くのパーティションを読み取るには、`maxPartitionCount` の値を手動で増やしてください。

useArrow

データの読み取りに Arrow フォーマットを使用するかどうかを指定します。

Boolean

いいえ

false

Arrow フォーマットを使用すると、MaxCompute Storage API を呼び出すことができます。

重要
  • このパラメーターはバッチジョブでのみ有効です。

  • このパラメーターは VVR 8.0.8 以降でのみサポートされています。

splitSize

Arrow フォーマットでデータを読み取る際に一度にプルするデータのサイズ。

MemorySize

いいえ

256 MB

このパラメーターは VVR 8.0.8 以降でのみサポートされています。

重要

このパラメーターはバッチジョブでのみ有効です。

compressCodec

Arrow フォーマットでデータを読み取る際に使用する圧縮アルゴリズム。

String

いいえ

""

有効な値:

  • "" (圧縮なし)

  • ZSTD

  • LZ4_FRAME

圧縮アルゴリズムを指定すると、圧縮なしの場合と比較してスループットが向上します。

重要
  • このパラメーターはバッチジョブでのみ有効です。

  • このパラメーターは VVR 8.0.8 以降でのみサポートされています。

dynamicLoadBalance

動的シャーディングを許可するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: 許可

  • false: 操作は許可されません。

動的シャーディングを許可すると、異なる Flink ノードの処理能力を活用でき、ソーステーブルの全体的な読み取り時間が短縮されます。ただし、異なるノードで読み取られるデータ量にばらつきが生じ、データスキューが発生する可能性があります。

重要
  • このパラメーターはバッチジョブでのみ有効です。

  • このパラメーターは VVR 8.0.8 以降でのみサポートされています。

増分ソーステーブル固有

増分ソーステーブルは、MaxCompute サーバーに定期的にポーリングしてすべてのパーティション情報を取得することで、新しいパーティションを検出します。コネクタが新しいパーティションを読み取る際、そのパーティションのデータは完全である必要があります。詳細については、「増分 MaxCompute ソーステーブルが書き込み中の新しいパーティションを検出した場合、どうすればよいですか?」をご参照ください。`startPartition` を使用して開始オフセットを指定できます。開始オフセットと辞書順で等しいか、それ以降のパーティションのみが読み取られることに注意してください。たとえば、パーティション year=2023,month=10 は、パーティション year=2023,month=9 よりも辞書順で小さくなります。このタイプのパーティション宣言では、正しい辞書順を確保するために値をゼロでパディングすることができます。例:year=2023,month=09

パラメーター

説明

データ型

必須

デフォルト値

備考

startPartition

増分読み取りを開始する MaxCompute パーティション (そのパーティションを含む)。

String

はい

なし

  • このパラメーターを使用すると、増分ソーステーブルモードが有効になり、`partition` パラメーターは無視されます。

  • 複数レベルのパーティションの場合、最上位から最下位までの各パーティションキー列の値を宣言する必要があります。

説明

`startPartition` パラメーターの詳細については、「増分 MaxCompute ソーステーブルの startPartition パラメーターを設定するにはどうすればよいですか?」をご参照ください。

subscribeIntervalInSec

MaxCompute にパーティションリストをポーリングする間隔。

Integer

いいえ

30

単位は秒です。

modifiedTableOperation

読み取り中にパーティションデータが変更された場合の対処法。

Enum (NONE, SKIP)

いいえ

NONE

ダウンロードセッションはチェックポイントに保存されます。ジョブがチェックポイントから回復するたびに、そのセッションからの読み取りを再開しようとします。パーティションデータが変更されたためにセッションが利用できない場合、Flink ジョブは再起動ループに入ります。この状況に対処するために、このパラメーターを設定できます:

  • NONE: `startPartition` パラメーターを利用できないパーティションより大きい値に変更し、ステートなしでジョブを開始する必要があります。

  • SKIP: ステートなしでジョブを開始したくない場合は、モードを SKIP に設定します。Flink がチェックポイントからセッションを回復しようとすると、利用できないパーティションをスキップします。

重要
  • このパラメーターは VVR 8.0.3 以降でのみサポートされています。

  • NONE と SKIP の両方のモードで、変更されたパーティションから既に読み取られたデータは取り消されず、未読のデータは読み取られません。

シンクテーブルのパラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

useStreamTunnel

データのアップロードに MaxCompute Stream Tunnel を使用するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: データのアップロードに MaxCompute Stream Tunnel を使用します。

  • false: データのアップロードに MaxCompute Batch Tunnel を使用します。

説明

データトンネルの選択に関する詳細については、「どのデータトンネルを選択すべきか?」をご参照ください。

flushIntervalMs

MaxCompute Tunnel Writer バッファーのフラッシュ間隔。

Long

いいえ

30000 (30 秒)

データはまずバッファーに書き込まれます。バッファーがいっぱいになるか、`flushIntervalMs` の間隔に達すると、データは宛先テーブルにバッチで書き込まれます。

  • Stream Tunnel: データはフラッシュ後すぐに表示されます。

  • Batch Tunnel: データはチェックポイントが完了した後にのみ表示されます。遅延を避けるために、`flushIntervalMs` を 0 に設定して時間指定のフラッシュを無効にします。

単位はミリ秒です。

説明

このパラメーターは `batchSize` と一緒に使用できます。どちらかの条件が満たされるとデータがフラッシュされます。

batchSize

MaxCompute Tunnel Writer バッファーのフラッシュサイズ。

Long

いいえ

67108864 (64 MB)

単位はバイトです。

レコードが書き込まれると、データはまず MaxCompute バッファーに保存されます。バッファーが特定のサイズ (`batchSize`) に達すると、バッファー内のデータは宛先の MaxCompute テーブルに書き込まれます。

説明

このパラメーターは `flushIntervalMs` と一緒に使用できます。どちらかの条件が満たされるとデータがフラッシュされます。

numFlushThreads

MaxCompute Tunnel Writer バッファーをフラッシュするためのスレッド数。

Integer

いいえ

1

各 MaxCompute シンクの並列度は、データをフラッシュするために `numFlushThreads` 個のスレッドを作成します。この値が 1 より大きい場合、異なるパーティションのデータを同時にフラッシュでき、フラッシュ効率が向上します。

slotNum

MaxCompute Tunnel Writer が使用するスロット数。

Integer

いいえ

0

スロット数の制限については、「Data Transmission Service の概要」をご参照ください。

dynamicPartitionLimit

書き込む動的パーティションの最大数。

Integer

いいえ

100

2 つのチェックポイント間でシンクテーブルに書き込まれる動的パーティションの数が `dynamicPartitionLimit` を超えると、エラー Too many dynamic partitions が発生します。

重要

一度に多くのパーティションに書き込むと、MaxCompute サービスに負荷がかかり、シンクテーブルのフラッシュやジョブのチェックポイントが遅くなる可能性があります。このエラーが発生した場合は、本当に多くのパーティションに書き込む必要があるか確認してください。必要な場合は、`dynamicPartitionLimit` の値を手動で増やしてください。

retryTimes

MaxCompute サーバーへのリクエストの最大再試行回数。

Integer

いいえ

3

セッションの作成、セッションのコミット、またはデータのフラッシュ時に、MaxCompute サービスが一時的に利用できなくなることがあります。システムはこの設定に基づいて再試行します。

sleepMillis

再試行間隔。

Integer

いいえ

1000

単位はミリ秒です。

enableUpsert

データのアップロードに MaxCompute Upsert Tunnel を使用するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: Upsert Tunnel を使用して、Flink からの INSERT、UPDATE_AFTER、DELETE データを処理します。

  • false: useStreamTunnel パラメーターに基づいて Batch Tunnel または Stream Tunnel を使用して、Flink からの INSERT および UPDATE_AFTER データを処理します。

重要
  • Upsert モードの MaxCompute シンクがデータのコミット時にエラー、失敗、または長時間の遅延が発生した場合、シンクノードの並列度を 10 以下に制限してください。

  • このパラメーターは VVR 8.0.6 以降でのみサポートされています。

upsertAsyncCommit

Upsert モードでセッションをコミットする際に非同期モードを使用するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: 非同期モードを使用します。コミットにかかる時間は短くなりますが、書き込まれたデータは完了後すぐに読み取り可能にはなりません。

  • false: デフォルトで同期モードを使用します。システムは、サーバー側がセッションを処理するのを待ってからコミットします。

説明

このパラメーターは VVR 8.0.6 以降でのみサポートされています。

upsertCommitTimeoutMs

Upsert モードでセッションをコミットするためのタイムアウト期間。

Integer

いいえ

120000

(120 秒)

単位はミリ秒です。

説明

このパラメーターは VVR 8.0.6 以降でのみサポートされています。

sink.operation

Delta Lake テーブルに書き込む際の書き込みモード。

String

いいえ

insert

有効な値:

  • insert: データを追加します。

  • upsert: データを更新します。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

sink.parallelism

Delta Lake テーブルへの書き込みの並列度。

Integer

いいえ

なし

  • 書き込みの並列度。設定されていない場合、デフォルトでアップストリームのデータ並列度が使用されます。

  • このパラメーターは VVR 8.0.10 以降でのみサポートされています。

重要

Delta Lake テーブルの `write.bucket.num` プロパティがこの設定値の整数倍であることを確認してください。これにより、最高の書き込みパフォーマンスが提供され、シンクノードのメモリを最も節約できます。

sink.file-cached.enable

Delta Lake テーブルの動的パーティションに書き込む際にファイルキャッシュモードを使用するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: ファイルキャッシュモードを使用します。

  • false: ファイルキャッシュモードを使用しません。

ファイルキャッシュモードを使用すると、サーバー側に書き込まれる小さなファイルの数が減りますが、データ出力のレイテンシが増加します。シンクテーブルの並列度が高い場合は、ファイルキャッシュモードを使用してください。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

sink.file-cached.writer.num

ファイルキャッシュモードでの単一タスクの同時データアップロード数。

Integer

いいえ

16

  • このパラメーターは sink.file-cached.enable が true に設定されている場合にのみ有効です。

  • この値を大幅に増やさないでください。同時に多くのパーティションに書き込むと、Out-of-Memory (OOM) エラーが発生しやすくなります。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

sink.bucket.check-interval

ファイルキャッシュモードでのファイルサイズチェックの間隔。単位はミリ秒 (ms) です。

Integer

いいえ

60000

このパラメーターは sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

sink.file-cached.rolling.max-size

ファイルキャッシュモードでの単一キャッシュファイルの最大サイズ。

MemorySize

いいえ

16 M

  • このパラメーターは sink.file-cached.enable が true に設定されている場合にのみ有効です。

  • ファイルサイズがこの値を超えると、ファイルデータはサーバー側にアップロードされます。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

sink.file-cached.memory

ファイルキャッシュモードでファイルを書き込むために使用される最大オフヒープメモリサイズ。

MemorySize

いいえ

64 MB

このパラメーターは sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

sink.file-cached.memory.segment-size

ファイルキャッシュモードでファイルを書き込むために使用されるバッファーサイズ。

MemorySize

いいえ

128 KB

このパラメーターは sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

sink.file-cached.flush.always

ファイルキャッシュモードでファイルを書き込む際にキャッシュを使用するかどうかを指定します。

Boolean

いいえ

true

このパラメーターは sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

sink.file-cached.write.max-retries

ファイルキャッシュモードでのデータアップロードの再試行回数。

Integer

いいえ

3

このパラメーターは sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.writer.max-retries

Upsert Writer がバケットへの書き込みに失敗した後の再試行回数。

Integer

いいえ

3

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.writer.buffer-size

Flink 内の単一の Upsert Writer のキャッシュサイズ。

MemorySize

いいえ

64 m

  • すべてのバケットの合計バッファーサイズが事前設定されたしきい値に達すると、システムは自動的にフラッシュ操作をトリガーしてデータをサーバー側に更新します。

  • 単一の upsert writer は同時に複数のバケットに書き込みます。この値を増やすと書き込み効率が向上します。

  • 多くのパーティションが書き込まれる場合、OOM エラーのリスクがあります。このパラメーター値を減らすことを検討してください。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.writer.bucket.buffer-size

Flink 内の単一バケットのキャッシュサイズ。

MemorySize

いいえ

1 m

クラスターのメモリリソースが逼迫している場合は、このパラメーター値を減らすことができます。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.write.bucket.num

書き込み対象のテーブル内のバケット数。

Integer

はい

なし

この値は、書き込み対象の Delta Lake テーブルの write.bucket.num プロパティの値と同じである必要があります。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.write.slot-num

単一セッションで使用される Tunnel スロットの数。

Integer

いいえ

1

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.commit.max-retries

Upsert セッションコミットの再試行回数。

Integer

いいえ

3

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.commit.thread-num

Upsert セッションコミットの並列度。

Integer

いいえ

16

このパラメーターを非常に大きな値に設定しないでください。同時コミット数が多いほど多くのリソースを消費し、パフォーマンスの問題や過剰なリソース消費につながる可能性があります。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.commit.timeout

Upsert セッションコミットのタイムアウト期間。単位は秒 (s) です。

Integer

いいえ

600

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.flush.concurrent

単一パーティションに対して同時に書き込み可能なバケットの最大数。

Integer

いいえ

2

バケット内のデータがフラッシュされるたびに、Tunnel Slot リソースを消費します。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

insert.commit.thread-num

コミットセッションの並列度。

Integer

いいえ

16

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

insert.arrow-writer.enable

Arrow フォーマットを使用するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: Arrow モードを使用します。

  • false: Arrow モードを使用しません。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

insert.arrow-writer.batch-size

Arrow バッチ内の最大行数。

Integer

いいえ

512

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

insert.arrow-writer.flush-interval

Writer のフラッシュ間隔。単位はミリ秒 (ms) です。

Integer

いいえ

100000

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

insert.writer.buffer-size

Buffered Writer のキャッシュサイズ。

MemorySize

いいえ

64 MB

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされています。

upsert.partial-column.enable

一部の列のみを更新するかどうかを指定します。

Boolean

いいえ

false

このパラメーターは、シンクテーブルが Delta Lake テーブルの場合にのみ有効です。詳細については、「部分的な列の更新」をご参照ください。

有効な値:

  • true: 一部の列のみを更新します。

  • false: すべての列を更新します。

データは、更新データのプライマリキーがシンクテーブルに存在するかどうかに応じて、異なる方法で書き込まれます:

  • シンクテーブルに同じプライマリキーを持つデータが存在する場合、レコードはそのプライマリキーによって更新されます。更新には、指定された列のうち null でない値が使用されます。

  • シンクテーブルに同じプライマリキーを持つデータが存在しない場合、プライマリキーを持つ新しいレコードが追加されます。指定された列の値が挿入され、他のすべての列には null が挿入されます。

説明

このパラメーターは VVR 8.0.11 以降でのみサポートされています。

ディメンションテーブル固有

ジョブが開始されると、MaxCompute ディメンションテーブルは指定されたパーティションから完全なデータをプルします。`partition` パラメーターは `max_pt()` などの関数をサポートします。キャッシュの有効期限が切れて再読み込みされると、`partition` パラメーターが再解析され、最新のパーティションからデータがプルされます。`max_two_pt()` を使用する場合、ディメンションテーブルは 2 つのパーティションからデータをプルできます。それ以外の場合は、単一のパーティションのみを指定できます。

パラメーター

説明

データ型

必須

デフォルト値

備考

cache

キャッシュポリシー。

String

はい

なし

現在、MaxCompute ディメンションテーブルは ALL ポリシーのみをサポートしており、明示的に宣言する必要があります。このポリシーは、リモートテーブルが小さく、多くの MISS KEY (ソーステーブルとディメンションテーブルの結合中に ON 条件がデータの関連付けに失敗する) があるシナリオに適しています。

ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。以降のすべてのディメンションテーブルクエリはキャッシュを介して実行されます。キャッシュにデータが見つからない場合、そのキーは存在しません。キャッシュの有効期限が切れると、完全なキャッシュが再ロードされます。

説明
  • システムはディメンションテーブルのデータを非同期にロードするため、CACHE ALL を使用する場合は、ディメンションテーブル結合ノードのメモリを増やす必要があります。増加させるメモリサイズは、リモートテーブルのデータ量の少なくとも 4 倍にする必要があります。具体的な値は MaxCompute ストレージの圧縮アルゴリズムによって異なります。

  • MaxCompute ディメンションテーブルが大きい場合は、SHUFFLE_HASH ヒントを使用して、ディメンションテーブルのデータをすべての同時実行インスタンスに均等に分散させることを検討してください。詳細については、「ディメンションテーブルに SHUFFLE_HASH ヒントを使用するにはどうすればよいですか?」をご参照ください。

  • 非常に大きな MaxCompute ディメンションテーブルを使用している場合、頻繁な JVM のガベージコレクション (GC) がジョブの例外を引き起こし、ディメンションテーブル結合ノードのメモリを増やしても解決しない場合は、ApsaraDB for HBase ディメンションテーブルなど、LRU キャッシュポリシーをサポートするキーバリューディメンションテーブルに切り替えることを検討してください。

cacheSize

キャッシュするデータレコードの最大数。

Long

いいえ

100000

ディメンションテーブルのデータ量が `cacheSize` を超えると、エラー Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit が発生します。

重要

大きなディメンションテーブルは多くの JVM ヒープメモリを消費し、ジョブの起動やディメンションテーブルの更新を遅くするため、本当にこの量のデータをキャッシュする必要があるか確認してください。必要な場合は、このパラメーターを手動で増やしてください。

cacheTTLMs

キャッシュのタイムアウト期間、つまりキャッシュの更新間隔。

Long

いいえ

Long.MAX_VALUE (更新しない)

単位はミリ秒です。

cacheReloadTimeBlackList

キャッシュ更新禁止時間。このパラメーターで指定された期間中、キャッシュは更新されません。

String

いいえ

なし

プロモーション中のトラフィックピーク時など、重要な期間中にキャッシュが更新されるのを防ぎ、ジョブの不安定性を回避します。このパラメーターの設定方法については、「cacheReloadTimeBlackList パラメーターを設定するにはどうすればよいですか?」をご参照ください。

maxLoadRetries

キャッシュの更新試行の最大回数。ジョブ起動時の初期データプルを含みます。この回数を超えると、ジョブは失敗します。

Integer

いいえ

10

なし。

データ型のマッピング

MaxCompute がサポートするデータ型の詳細については、「データ型 (2.0)」をご参照ください。

MaxCompute 型

Flink 型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

TIMESTAMP_NTZ

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

重要

MaxCompute 物理テーブルにネストされた複合型フィールド (ARRAY、MAP、または STRUCT) と JSON 型フィールドの両方が含まれている場合、Flink がこれらのフィールドを正しく読み書きできるように、MaxCompute 物理テーブルを作成する際に tblproperties('columnar.nested.type'='true') を指定する必要があります。

データインジェスト (パブリックプレビュー)

MaxCompute コネクタをシンクとして使用し、YAML ベースのデータインジェストジョブでデータを書き込むことができます。

制限事項

この機能は VVR 11.1 以降のバージョンでのみサポートされています。

構文

source:
  type: xxx

sink:
   type: maxcompute
   name: MaxComputeSink
   access-id: ${your_accessId}
   access-key: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   buckets-num: 8

設定項目

設定項目

必須

デフォルト値

タイプ

説明

type

はい

なし

String

使用するコネクタを指定します。このパラメーターを maxcompute に設定します。

name

いいえ

なし

String

シンクの名前。

access-id

はい

なし

String

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。AccessKey 管理ページで AccessKey ID を取得できます。

access-key

はい

なし

String

AccessKey ID に対応する AccessKey Secret。

endpoint

はい

なし

String

MaxCompute サービスのエンドポイント。MaxCompute プロジェクトを作成する際に選択したリージョンとネットワーク接続タイプに基づいてエンドポイントを設定する必要があります。各リージョンとネットワークタイプのエンドポイントの詳細については、「エンドポイント」をご参照ください。

project

はい

なし

String

MaxCompute プロジェクトの名前です。MaxCompute コンソールにログインし、[ワークスペース] > [プロジェクト管理] ページでプロジェクト名を取得できます。

tunnel.endpoint

いいえ

なし

String

MaxCompute Tunnel サービスのエンドポイント。この設定は通常、指定されたプロジェクトのリージョンに基づいて自動的にルーティングされます。プロキシを使用する場合など、特別なネットワーク環境でのみこの設定を使用してください。

quota.name

いいえ

なし

String

MaxCompute データ転送用の専用リソースグループの名前。この設定を指定しない場合、共有リソースグループが使用されます。詳細については、「Data Transmission Service の専用リソースグループの購入と使用」をご参照ください。

sts-token

いいえ

なし

String

認証に RAM ロールによって発行された短期間の Security Token Service (STS) トークンを使用する場合にこのパラメーターを指定します。

buckets-num

いいえ

16

Integer

MaxCompute Delta Lake テーブルを自動的に作成する際に使用するバケットの数。この機能の使用方法の詳細については、「ニアリアルタイムデータウェアハウスの概要」をご参照ください。

compress.algorithm

いいえ

zlib

String

MaxCompute への書き込み時に使用されるデータ圧縮アルゴリズム。サポートされている値は raw (圧縮なし)、zlib、および snappy です。

total.buffer-size

いいえ

64MB

String

メモリにバッファリングするデータの量 (パーティションレベル、または非パーティションテーブルの場合はテーブルレベル)。異なるパーティション (またはテーブル) のバッファーは独立しています。しきい値に達すると、データは MaxCompute に書き込まれます。

bucket.buffer-size

いいえ

4MB

String

メモリにバッファリングするデータの量 (バケットレベル)。このパラメーターは Delta Lake テーブルへの書き込み時にのみ有効です。異なるデータバケットのバッファーは独立しています。しきい値に達すると、バケット内のデータは MaxCompute に書き込まれます。

commit.thread-num

いいえ

16

Integer

チェックポイントフェーズ中に同時に処理できるパーティション (またはテーブル) の数。

flush.concurrent-num

いいえ

4

Integer

MaxCompute にデータを書き込む際に同時に書き込み可能なバケットの数。このパラメーターは Delta Lake テーブルへの書き込み時にのみ有効です。

テーブルロケーションのマッピング

コネクタがテーブルを自動的に作成する際、ソーステーブルのロケーション情報を次のように MaxCompute テーブルにマッピングします。

重要

MaxCompute プロジェクトがスキーマモデルをサポートしていない場合、各同期タスクは 1 つのデータベースのみを同期できます。たとえば、アップストリームのデータソースが MySQL の場合、同期タスクは 1 つの MySQL データベースのみを同期できます。これは他のデータソースにも適用されます。コネクタは `tableId.namespace` 情報を無視します。

データインジェストジョブのオブジェクト

MaxCompute のロケーション

MySQL のロケーション

設定内の Project パラメーター

プロジェクト

なし

TableId.namespace

スキーマ (この設定は MaxCompute プロジェクトがスキーマモデルをサポートしていない場合は無視されます。)

データベース

TableId.tableName

テーブル

テーブル

型マッピング

CDC 型

MaxCompute 型

CHAR

STRING

VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT

SQL

ソーステーブルの例

完全なデータの読み取り

デフォルトでは、ソーステーブルは完全データモードであり、partition パラメーターで指定されたパーティションを読み取ります。

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
   cid,
   COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
増分データの読み取り

指定された startPartition からデータを増分的に読み取ります。

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 20180905 のパーティションからデータを読み取ります。
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;

シンクテーブルの例

静的パーティションへの書き込み

partition に静的なパーティション値を指定します。

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- 静的パーティション ds=20180905 にデータを書き込みます。
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;
動的パーティションへの書き込み

テーブルのパーティションキー列に基づいて partition を指定します。

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id  INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- 動的パーティション列は明示的に宣言する必要があります。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' -- パーティション値が指定されていない場合、データは ds フィールドの値に基づいて異なるパーティションに書き込まれます。
);

INSERT INTO odps_sink
SELECT
   id,
   len,
   content,
   DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;

ディメンションテーブルの例

1 対 1 のディメンションテーブル

1 対 1 のディメンションテーブルにはプライマリキーを宣言する必要があります。

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED  -- 1 対 1 のディメンションテーブルにはプライマリキーを宣言する必要があります。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
1 対多のディメンションテーブル

1 対多のディメンションテーブルにはプライマリキーは必要ありません。

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR
  -- 1 対多のディメンションテーブルにはプライマリキーは必要ありません。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream

重要
  • DataStream を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。

  • 知的財産を保護するため、VVR 6.0.6 以降、このコネクタは単一のローカルデバッグジョブの実行時間を 30 分に制限します。30 分が経過すると、ジョブはエラーを報告して終了します。MaxCompute コネクタを含むジョブをローカルで実行およびデバッグする方法の詳細については、「コネクタを含むジョブのローカルでの実行とデバッグ」をご参照ください。

  • このコネクタは、primary keytransactional=true で作成されたテーブルである Delta Lake テーブルからの読み取りをサポートしていません。詳細については、「基本概念」をご参照ください。

DataStream ジョブで MaxCompute コネクタを使用するには、SQL を使用して MaxCompute テーブルを宣言し、その後、Table と DataStream の間で変換を行い、MaxCompute テーブルとデータストリームを接続する必要があります。

ソーステーブルへの接続

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

結果テーブルへの接続

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

XML

MaxCompute コネクタの Maven 依存関係には、全量ソーステーブル、増分ソーステーブル、結果テーブル、およびディメンションテーブルをビルドするために必要なクラスが含まれています。MaxCompute DataStream コネクタは Maven セントラルリポジトリで入手できます。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

よくある質問