すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MaxCompute コネクタ

最終更新日:Mar 11, 2026

本トピックでは、MaxCompute コネクタの使用方法について説明します。

背景情報

MaxCompute(旧称:ODPS)は、大規模データウェアハウス向けの高速かつフルマネージドなコンピューティングプラットフォームです。MaxCompute はエクサバイト規模のデータを処理できます。また、データウェアハウスにおける大量の構造化データの保存・計算に加え、分析およびモデリングサービスも提供します。

以下の表に、MaxCompute コネクタがサポートする機能を示します。

項目

説明

対応タイプ

ソーステーブル、ディメンションテーブル、結果テーブル

実行モード

ストリーミングモードおよびバッチモード

データフォーマット

該当なし

メトリック

メトリック

  • ソース

    numRecordsIn

    numRecordsInPerSecond

    numBytesIn

    numBytesInPerSecond

  • シンク

    numRecordsOut

    numRecordsOutPerSecond

    numBytesOut

    numBytesOutPerSecond

  • ディメンションテーブル

    dim.odps.cacheSize

説明

詳細については、「モニタリングメトリック」をご参照ください。

API 種別

DataStream API および SQL API

結果テーブルへのデータ更新または削除

MaxCompute Batch Tunnel または MaxCompute Streaming Tunnel を使用する場合、結果テーブルへのデータ挿入のみ可能です。一方、MaxCompute Upsert Tunnel を使用する場合は、結果テーブルへのデータ更新・削除および挿入が可能です。

前提条件

MaxCompute テーブルが作成されています。詳細については、「テーブルの作成」をご参照ください。

制限事項

  • MaxCompute コネクタは、at-least-once セマンティクスのみをサポートします。

    説明

    at-least-once セマンティクスは、データ損失を防止するために使用されます。特定のケースでは、MaxCompute へ重複したデータが書き込まれる可能性があります。重複データの発生は、使用するトンネルの種類によって異なります。「MaxCompute トンネル」の選択方法については、「上流・下流ストレージに関するよくある質問」トピックの「データトンネルの選択方法」セクションをご参照ください。

  • デフォルトでは、ソースがフルモードで動作する場合、partition オプションで指定されたパーティションからデータのみを読み取ります。指定パーティションの全データを読み取ると、ジョブは終了し、新規パーティションの監視は行われません。

    新規パーティションを継続的に監視するには、WITH 句で startPartition オプションを指定して、増分ソースを作成します。

    説明
    • ディメンションテーブルが更新されるたびに、最新のパーティションをチェックします。

    • ソーステーブルの実行開始後、パーティションに新たに追加されたデータは読み取られません。パーティションに完全なデータが含まれている状態でデプロイメントを実行することを推奨します。

SQL

MaxCompute コネクタは、SQL ベースのジョブにおいて、ソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。

構文

CREATE TEMPORARY TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

コネクタオプション

  • 一般設定

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルの種別。

    STRING

    はい

    デフォルト値なし

    値を odps に設定します。

    endpoint

    MaxCompute のエンドポイント。

    STRING

    はい

    デフォルト値なし

    詳細については、「エンドポイント」をご参照ください。

    tunnelEndpoint

    MaxCompute Tunnel のエンドポイント。

    STRING

    いいえ

    デフォルト値なし

    詳細については、「エンドポイント」をご参照ください。

    説明

    このオプションを指定しない場合、MaxCompute は Server Load Balancer (SLB) サービスに基づいてトンネル接続を割り当てます。

    project

    MaxCompute プロジェクトの名前。

    STRING

    はい

    デフォルト値なし

    該当なし。

    schemaName

    MaxCompute スキーマの名前。

    STRING

    いいえ

    デフォルト値なし

    このオプションは、MaxCompute スキーマ機能が有効化されている場合にのみ必須です。その場合、このオプションを MaxCompute テーブルのスキーマ名に設定する必要があります。「スキーマ操作」をご参照ください。

    説明

    このオプションは VVR 8.0.6 以降でのみサポートされます。

    tableName

    MaxCompute テーブルの名前。

    STRING

    はい

    デフォルト値なし

    該当なし。

    accessId

    MaxCompute へのアクセスに使用する AccessKey ID。

    STRING

    はい

    デフォルト値なし

    詳細については、「アカウントの AccessKey ID および AccessKey Secret の情報を確認する方法」をご参照ください。

    重要

    AccessKey ペアの保護のため、変数を使用して AccessKey ID を設定することを推奨します。「変数の管理」をご参照ください。

    accessKey

    MaxCompute へのアクセスに使用する AccessKey Secret。

    STRING

    はい

    デフォルト値なし

    partition

    MaxCompute テーブル内のパーティション名。

    STRING

    いいえ

    デフォルト値なし

    非パーティション化テーブルまたは増分ソースの場合、このオプションを指定する必要はありません。

    説明

    パーティション化テーブルにおける partition オプションの設定方法については、「上流・下流ストレージに関するよくある質問」トピックの「パーティションからのデータ読み取り/書き込み時に partition オプションを設定する方法」セクションをご参照ください。

    compressAlgorithm

    MaxCompute Tunnel で使用される圧縮アルゴリズム。

    STRING

    いいえ

    SNAPPY

    有効な値:

    • RAW(圧縮なし)

    • ZLIB

    • SNAPPY

      ZLIB と比較して、SNAPPY はスループットを大幅に向上させます。テストシナリオでは、スループットが約 50 % 向上します。

    quotaName

    MaxCompute 専用トンネルリソースグループのクォータ名。

    STRING

    いいえ

    デフォルト値なし

    このオプションを指定すると、MaxCompute 専用トンネルリソースグループを使用できます。

    重要
    • このオプションは VVR 8.0.3 以降でのみサポートされます。

    • このオプションを指定する場合、tunnelEndpoint オプションを削除する必要があります。そうでない場合、tunnelEndpoint オプションで指定されたトンネルが使用されます。

  • ソース固有

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    maxPartitionCount

    データ読み取り対象の最大パーティション数。

    INTEGER

    いいえ

    100

    読み取り対象のパーティション数がこのオプションの値を超えると、次のエラーメッセージが表示されます:"一致したパーティション数がデフォルト制限を超えています"

    重要

    過剰なパーティションからの読み取りは MaxCompute の負荷を高め、ジョブの起動を遅くする可能性があります。ビジネス要件により必要であれば、手動でこのオプションの値を調整してください。

    useArrow

    Arrow フォーマットを使用してデータを読み取るかどうかを指定します。

    BOOLEAN

    いいえ

    false

    Arrow フォーマットは、MaxCompute のストレージ API オペレーションを呼び出すために使用できます。

    重要
    • このオプションはバッチデプロイメントでのみ有効です。

    • このオプションは VVR 8.0.8 以降でのみサポートされます。

    splitSize

    Arrow フォーマットを使用してデータを読み取る際の 1 回のデータ取得サイズ。

    MEMORYSIZE

    いいえ

    256 MB

    このオプションは VVR 8.0.8 以降でのみサポートされます。

    重要

    このオプションはバッチデプロイメントでのみ有効です。

    compressCodec

    Arrow フォーマットを使用してデータを読み取る際の圧縮アルゴリズム。

    STRING

    いいえ

    ""

    有効な値:

    • ""(圧縮なし)

    • ZSTD

    • LZ4_FRAME

    圧縮アルゴリズムを指定すると、圧縮なしと比較してスループットを向上させることができます。

    重要
    • このオプションはバッチデプロイメントでのみ有効です。

    • このオプションは VVR 8.0.8 以降でのみサポートされます。

    dynamicLoadBalance

    シャードの動的割り当てを有効化するかどうかを指定します。

    BOOLEAN

    いいえ

    false

    有効な値:

    • true

    • false

    シャードの動的割り当てにより、Realtime Compute for Apache Flink の異なる演算子の処理パフォーマンスが向上し、MaxCompute からの読み取り全体にかかる時間が短縮されます。ただし、異なる演算子が読み取るデータ量が不均一になるため、データスキューが発生する可能性があります。

    重要
    • このオプションはバッチデプロイメントでのみ有効です。

    • このオプションは VVR 8.0.8 以降でのみサポートされます。

  • 増分 MaxCompute ソーステーブル固有のオプション

    増分テーブルソースは、すべてのパーティション情報を取得するために、MaxCompute サーバーに対して間欠的にポーリングを実行し、新しいパーティションを監視します。ソースが新しいパーティションからデータの読み取りを開始する前に、そのパーティションへのデータ書き込みが完了している必要があります。詳細については、「アップストリームおよびダウンストリームストレージに関するよくある質問」トピックの「データがまだパーティションに書き込まれているときに、増分 MaxCompute ソーステーブルが新しいパーティションを検出したらどうすればよいですか?」セクションをご参照ください。 startPartition オプションを設定して、データの読み取りを開始するパーティションを指定できます。startPartition オプションで指定されたパーティションの アルファベット順以上であるパーティションのみのデータが読み取られます。たとえば、パーティション year=2023,month=10 のアルファベット順は、パーティション year=2023,month=9 のアルファベット順より小さくなります。この場合、コード内で宣言されているパーティション名の月の数値の前にゼロを追加することで、パーティションのアルファベット順が有効になるようにできます。これにより、パーティションオプションの値を year=2023,month=9 から year=2023,month=09 に変更できます。

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    startPartition

    増分データ読み取りを開始するパーティション。

    STRING

    はい

    デフォルト値なし

    • このオプションを指定すると、増分ソースが使用され、partition オプションは無視されます。

    • ソーステーブルが多段パーティション化テーブルの場合、パーティションレベルに従って各パーティション列の値を降順で設定する必要があります。

    説明

    startPartition オプションの設定方法については、「上流・下流ストレージに関するよくある質問」トピックの「増分 MaxCompute ソーステーブルの startPartition オプションを設定する方法」セクションをご参照ください。

    subscribeIntervalInSec

    パーティション情報を取得するために MaxCompute に対してポーリングを行う間隔。

    INTEGER

    いいえ

    30

    単位:秒。

    modifiedTableOperation

    パーティション読み取り中にパーティション内のデータが変更された場合の操作。

    Enum(NONE、SKIP)

    いいえ

    NONE

    ダウンロードセッションはチェックポイントに保存されます。チェックポイントからセッションを再開するたびに、Realtime Compute for Apache Flink はセッションの読み取り進捗を復元しようと試みます。しかし、パーティション内のデータが変更された場合、セッションは利用不可となります。この場合、デプロイメントが繰り返し再起動されます。この問題を解決するには、このオプションを指定します。有効な値:

    • NONE:このオプションを NONE に設定した場合、startPartition オプションの値を変更し、利用不可となったパーティションよりアルファベット順で後のパーティションを指定して、ステートなしでデプロイメントを開始する必要があります。

    • SKIP:ステートなしでデプロイメントを開始したくない場合は、このオプションを SKIP に設定できます。この場合、Realtime Compute for Apache Flink はチェックポイントからセッションを再開しようとする際に、利用不可となったパーティションをスキップします。

    重要
    • このオプションは VVR 8.0.3 以降でのみサポートされます。

    • このオプションを NONE または SKIP に設定した場合、変更されたパーティションから読み取られたデータは保持され、読み取られなかったデータは無視されます。

  • シンク専用

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    useStreamTunnel

    MaxCompute Streaming Tunnel を使用してデータをアップロードするかどうかを指定します。

    BOOLEAN

    いいえ

    false

    有効な値:

    • true:MaxCompute Streaming Tunnel を使用してデータをアップロードします。

    • false:MaxCompute Batch Tunnel を使用してデータをアップロードします。

    説明

    トンネルの選択方法については、「上流・下流ストレージに関するよくある質問」トピックの「データトンネルの選択方法」セクションをご参照ください。

    flushIntervalMs

    MaxCompute Tunnel のライターのバッファー内でフラッシュ操作を実行する間隔。

    LONG

    いいえ

    30000(30 秒)

    データはバッファーに蓄積され、flushIntervalMs で指定された間隔でバッチ処理でフラッシュされます。

    • Streaming Tunnel:フラッシュされたデータは、送信先の MaxCompute テーブルで即座に利用可能になります。

    • Batch Tunnel:フラッシュされたデータは、チェックポイント操作が完了した後にのみ利用可能になります。スケジュールされたフラッシュ機能を無効化するには、このオプションを 0 に設定することを推奨します。

    単位:ミリ秒。

    説明

    このオプションは batchSize オプションと併用できます。batchSize オプションまたは flushIntervalMs オプションで指定された条件のいずれかが満たされると、フラッシュ操作がトリガーされます。

    batchSize

    MaxCompute Tunnel のバッファー容量。

    LONG

    いいえ

    67108864(64 MB)

    MaxCompute sink は、データをバッファーに挿入します。その後、バッファー内のデータのサイズが batchSize オプションで指定された値を超えると、MaxCompute sink はバッファー内のデータを送信先の MaxCompute テーブルに書き込みます。

    単位:バイト。

    説明

    このオプションは flushIntervalMs オプションと併用できます。どちらかの条件が満たされると、フラッシュ操作がトリガーされます。

    numFlushThreads

    MaxCompute Tunnel のライターのバッファー内のデータをフラッシュするために使用されるスレッド数。

    INTEGER

    いいえ

    1

    各 MaxCompute 結果テーブルは、numFlushThreads オプションで指定された数のスレッドを作成してデータをフラッシュします。このオプションの値が 1 より大きい場合、異なるパーティションのデータを同時にフラッシュできます。これにより、フラッシュ操作の効率が向上します。

    slotNum

    Flink から MaxCompute へデータを受信する際に MaxCompute が使用するトンネルスロット数。

    INTEGER

    いいえ

    0

    スロット数の制限については、「MaxCompute ドキュメント」の「データ伝送サービスの概要」をご参照ください。

    dynamicPartitionLimit

    データ書き込み対象の動的パーティションの最大数。

    INTEGER

    いいえ

    100

    チェックポイント間で結果テーブルへ書き込まれる動的パーティション数が dynamicPartitionLimit オプションの値を超えると、次のエラーメッセージが表示されます:"動的パーティション数が多すぎます"

    重要

    MaxCompute テーブルの多数のパーティションへデータを書き込むと、MaxCompute サービスのワークロードが高まり、チェックポイントおよびフラッシュの速度が低下します。これを防ぐため、多数のパーティションへデータを書き込む必要があるかどうかを確認してください。ビジネス要件により多数のパーティションへデータを書き込む必要がある場合は、手動で dynamicPartitionLimit オプションの値を増加させてください。

    retryTimes

    MaxCompute サーバーに対するリクエストの最大リトライ回数。

    INTEGER

    いいえ

    3

    セッションの作成、セッションの送信、またはデータのフラッシュ時に、MaxCompute サービスが一時的に利用不可になることがあります。MaxCompute サービスが利用不可になった場合、このオプションの設定に基づいて MaxCompute サーバーへのリクエストが行われます。

    sleepMillis

    リトライ間隔。

    INTEGER

    いいえ

    1000

    単位:ミリ秒。

    enableUpsert

    MaxCompute Upsert Tunnel を使用してデータをアップロードするかどうかを指定します。

    BOOLEAN

    いいえ

    false

    有効な値:

    • true:Realtime Compute for Apache Flink で INSERT、UPDATE_AFTER、DELETE データを処理するために MaxCompute Upsert Tunnel を使用します。

    • false:Realtime Compute for Apache Flink で INSERT および UPDATE_AFTER データを処理するために、useStreamTunnel オプションで指定された MaxCompute Batch Tunnel または MaxCompute Streaming Tunnel を使用します。

    重要
    • MaxCompute 結果テーブルがアップサートモードでセッションをコミットする際に、エラー、デプロイメント失敗、長時間の処理障害などの問題が発生した場合、sink 演算子の並列処理の次数を 10 以下に設定することを推奨します。

    • このオプションは VVR 8.0.6 以降でのみサポートされます。

    upsertAsyncCommit

    MaxCompute 結果テーブルがアップサートモードでセッションをコミットする際に非同期モードを使用するかどうかを指定します。

    BOOLEAN

    いいえ

    false

    有効な値:

    • true:非同期モードを使用します。非同期モードを使用すると、セッションのコミットにかかる時間が短縮されますが、セッションコミット後に書き込まれたデータは即座にクエリできません。

    • false:デフォルトで同期モードが使用されます。MaxCompute 結果テーブルがセッションをコミットする際、システムはサーバーによるセッション処理が完了するまで待機します。

    説明

    このオプションは VVR 8.0.6 以降でのみサポートされます。

    upsertCommitTimeoutMs

    MaxCompute 結果テーブルがアップサートモードでセッションをコミットする際のタイムアウト期間。

    INTEGER

    いいえ

    120000

    (120 秒)

    単位:ミリ秒。

    説明

    このオプションは VVR 8.0.6 以降でのみサポートされます。

    sink.operation

    Delta テーブルへの書き込み操作モード。

    STRING

    いいえ

    insert

    有効な値:

    • insert:データを追加モードでテーブルに書き込みます。

    • upsert:データを更新します。

    説明

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    sink.parallelism

    Delta テーブルへのデータ書き込み時の並列処理の次数。

    INTEGER

    いいえ

    なし

    • データ書き込みの並列度。このオプションを設定しない場合、デフォルトで上流のデータ並列度が使用されます。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    重要

    write.bucket.num オプションの値が sink.parallelism オプションの値の整数倍になるように設定してください。これにより、最適な書き込みパフォーマンスが得られ、sink ノードのメモリ消費を効率的に節約できます。

    sink.file-cached.enable

    Delta テーブルの動的パーティションへのデータ書き込み時にファイルキャッシュモードを有効化するかどうかを指定します。

    BOOLEAN

    いいえ

    false

    有効な値:

    • true:ファイルキャッシュモードを有効化します。

    • false:ファイルキャッシュモードは無効化されています。

    ファイルキャッシュモードを有効化すると、サーバーへ書き込まれる小規模ファイルの数が減少しますが、書き込みレイテンシーが高くなります。sink テーブルの並列度が高い場合、ファイルキャッシュモードを有効化することを推奨します。

    説明

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    sink.file-cached.writer.num

    ファイルキャッシュモードで、1 つのタスク内でデータを同時アップロードするために使用されるスレッド数。

    INTEGER

    いいえ

    16

    • このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。

    • このオプションの値を大きく設定しないことを推奨します。多数のパーティションへ同時にデータを書き込むと、メモリ不足(OOM)エラーが発生する可能性があります。

      説明

      このオプションは VVR 8.0.10 以降でのみサポートされます。

    sink.bucket.check-interval

    ファイルキャッシュモードでファイルサイズをチェックする間隔。単位:ミリ秒。

    INTEGER

    いいえ

    60000

    • このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    sink.file-cached.rolling.max-size

    ファイルキャッシュモードにおける単一キャッシュファイルの最大値。

    MEMORYSIZE

    いいえ

    16 MB

    • このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。

    • ファイルサイズがこのオプションの値を超えると、ファイルデータがサーバーへアップロードされます。

      説明

      このオプションは VVR 8.0.10 以降でのみサポートされます。

    sink.file-cached.memory

    ファイルキャッシュモードでファイルへのデータ書き込みに使用されるオフヒープメモリの最大サイズ。

    MEMORYSIZE

    いいえ

    64 MB

    • このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    sink.file-cached.memory.segment-size

    ファイルキャッシュモードでファイルへのデータ書き込みに使用されるバッファーのサイズ。

    MEMORYSIZE

    いいえ

    128 KB

    • このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    sink.file-cached.flush.always

    ファイルキャッシュモードでファイルへのデータ書き込みにキャッシュを使用するかどうかを指定します。

    BOOLEAN

    いいえ

    true

    • このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    sink.file-cached.write.max-retries

    ファイルキャッシュモードでのデータアップロードのリトライ回数。

    INTEGER

    いいえ

    3

    • このオプションは sink.file-cached.enable オプションが true に設定されている場合にのみ有効です。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.writer.max-retries

    Upsert Writer セッションでバケットへのデータ書き込みの最大リトライ回数。

    INTEGER

    いいえ

    3

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.writer.buffer-size

    Realtime Compute for Apache Flink における Upsert Writer セッションのバッファー容量。

    MEMORYSIZE

    いいえ

    64 MB

    • すべてのバケットの合計バッファー容量が指定されたしきい値に達すると、システムが自動的にデータをサーバーへ更新します。

    説明

    Upsert Writer セッションのデータは、同時に複数のバケットへ書き込まれることがあります。書き込み効率を向上させるには、このオプションの値を増加させることを推奨します。

    多数のパーティションへデータを書き込む場合、OOM エラーが発生する可能性があります。これを防ぐには、このオプションの値を減らすことができます。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.writer.bucket.buffer-size

    Realtime Compute for Apache Flink における単一バケットのバッファー容量。

    MEMORYSIZE

    いいえ

    1 MB

    • Flink サーバーのメモリリソースが不足している場合、このオプションの値を減らすことができます。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.write.bucket.num

    データ書き込み先テーブルのバケット数。

    INTEGER

    はい

    なし

    • このオプションの値は、データ書き込み先 Delta テーブルで設定された write.bucket.num オプションの値と同じにする必要があります。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.write.slot-num

    セッションで使用されるトンネルスロット数。

    INTEGER

    いいえ

    1

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.commit.max-retries

    アップサートセッションコミットの最大リトライ回数。

    INTEGER

    いいえ

    3

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.commit.thread-num

    アップサートセッションコミットの並列処理の次数。

    INTEGER

    いいえ

    16

    • このオプションの値を大きすぎないように設定することを推奨します。過剰なアップサートセッションコミットが同時に実行されると、リソース消費が増加し、パフォーマンスの問題や過剰なリソース消費が発生する可能性があります。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.commit.timeout

    アップサートセッションコミットのタイムアウト期間。単位:秒。

    INTEGER

    いいえ

    600

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.flush.concurrent

    1 つのパーティション内のデータを同時に書き込める最大バケット数。

    INTEGER

    いいえ

    2

    • バケット内のデータがリフレッシュされるたびに、トンネルスロットが占有されます。

    • このオプションは VVR 8.0.10 以降でのみサポートされます。

    insert.commit.thread-num

    コミットセッションの並列処理の次数。

    INTEGER

    いいえ

    16

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    insert.arrow-writer.enable

    Arrow フォーマットを使用するかどうかを指定します。

    BOOLEAN

    いいえ

    false

    有効な値:

    • true:Arrow フォーマットを使用します。

    • false:Arrow フォーマットを使用しません。

    説明

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    insert.arrow-writer.batch-size

    Arrow 形式のデータの 1 バッチあたりの最大行数。

    INTEGER

    いいえ

    512

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    insert.arrow-writer.flush-interval

    ライターがデータをフラッシュする間隔。単位:ミリ秒。

    INTEGER

    いいえ

    100000

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    insert.writer.buffer-size

    バッファライターのキャッシュサイズ。

    MEMORYSIZE

    いいえ

    64 MB

    このオプションは VVR 8.0.10 以降でのみサポートされます。

    upsert.partial-column.enable

    特定のカラムのデータのみを更新するかどうかを指定します。

    BOOLEAN

    いいえ

    false

    このオプションは、MaxCompute Delta テーブルへデータを書き込む結果テーブルにのみ適用されます。「MaxCompute ドキュメント」の「特定カラムのデータ更新」をご参照ください。

    有効な値:

    • true

    • false

    データ更新の動作は、sink に新しいデータと同じプライマリキーを持つレコードが存在するかどうかによって異なります。

    • sink テーブルに同じプライマリキーを持つデータが存在する場合、プライマリキーに基づいて対応するフィールドが更新されます。具体的には、値が null でない場合、指定されたフィールドが新しい値で上書きされます。

    • sink テーブルに同じプライマリキーを持つレコードが存在しない場合、新しいレコードが追加されます。指定されたカラムには新しい値が挿入され、他のすべてのカラムには null が挿入されます。

    説明

    このオプションは VVR 8.0.11 以降でのみサポートされます。

  • ディメンションテーブル固有

    デプロイメント開始時に、ディメンションテーブルは partition オプションで指定されたパーティションから完全データをプルします。このオプションは max_pt() 関数をサポートしています。キャッシュエントリの有効期限が切れてキャッシュが再読み込みされる場合、partition オプションで指定された最新のパーティションのデータが再解析されます。partition オプションが max_two_pt() に設定されている場合、ディメンションテーブルは 2 つのパーティションからデータをプルできます。それ以外の場合、1 つのパーティションからのみデータをプルできます。

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    cache

    キャッシュポリシー。

    STRING

    はい

    デフォルト値なし

    ディメンションテーブルでは、cache オプションを ALL に設定し、DDL 文で明示的に宣言する必要があります。リモートテーブルのデータ量が少なく、多数の欠落キーが存在する場合、このオプションを ALL に設定することを推奨します。ソースとディメンションテーブルは ON 句に基づいて関連付けられません。

    ALL:ディメンションテーブルのすべてのデータをキャッシュすることを意味します。システムがデプロイメントを実行する前に、ディメンションテーブルのすべてのデータがキャッシュに読み込まれます。その後、ディメンションテーブルに対するすべてのクエリはキャッシュを検索します。キーが存在しない場合、システムはキャッシュ内にデータレコードを見つけることができません。キャッシュエントリの有効期限が切れた後、システムはキャッシュ内のすべてのデータを再読み込みします。

    説明
    • cache オプションを ALL に設定する場合、ディメンションテーブルのデータを非同期で読み込むため、結合ノードのメモリを増加させる必要があります。リモートテーブルのデータ量の少なくとも 4 倍のメモリサイズを増加させることを推奨します。メモリサイズは MaxCompute のストレージ圧縮アルゴリズムに関係します。

    • ディメンションテーブルのデータ量が非常に多い場合、SHUFFLE_HASH ヒントを使用してデータを各サブタスクに均等に分散させることができます。「上流・下流ストレージに関するよくある質問」トピックの「ディメンションテーブルで SHUFFLE_HASH ヒントを使用する方法」セクションをご参照ください。

    • 超大規模なディメンションテーブルを使用する場合、Java 仮想マシン(JVM)のガーベジコレクション(GC)が頻繁に発生し、デプロイメント例外が発生する可能性があります。この問題を解決するには、ディメンションテーブルが他のテーブルと結合されるノードのメモリを増加させます。それでも問題が解決しない場合は、LRU(Least Recently Used)キャッシュポリシーをサポートするキー・バリュー型のディメンションテーブルに変換することを推奨します。たとえば、ApsaraDB for HBase ディメンションテーブルをキー・バリュー型のディメンションテーブルとして使用できます。

    cacheSize

    キャッシュ可能なデータ行数の最大値。

    LONG

    いいえ

    100000

    ディメンションテーブルのデータレコード数が cacheSize オプションの値を超えると、次のエラーメッセージが表示されます:"テーブル <table-name> のパーティション <partition-name> の行数が maxRowCount 制限を超えています"

    重要

    ディメンションテーブルに大量のデータレコードが存在する場合、JVM ヒープメモリの消費量が大きくなります。この場合、デプロイメントの起動速度およびディメンションテーブルの更新速度が低下します。これを防ぐため、大量のデータレコードをキャッシュする必要があるかどうかを確認してください。ビジネス要件によりディメンションテーブルに大量のデータレコードをキャッシュする必要がある場合は、手動でこのオプションの値を増加させてください。

    cacheTTLMs

    キャッシュのタイムアウト期間。

    LONG

    いいえ

    Long.MAX_VALUE

    単位:ミリ秒。

    cacheReloadTimeBlackList

    キャッシュがリフレッシュされない時間帯。このオプションで指定された時間帯には、キャッシュがリフレッシュされません。

    STRING

    いいえ

    デフォルト値なし

    このオプションは、ピーク時間のキャンペーンなど、大規模なオンラインプロモーションイベントに適用されます。このオプションを指定することで、キャッシュのリフレッシュ時にデプロイメントが不安定になるのを防ぐことができます。「上流・下流ストレージに関するよくある質問」トピックの「CacheReloadTimeBlackList オプションの設定方法」セクションをご参照ください。

    maxLoadRetries

    キャッシュのリフレッシュを試行する最大リトライ回数。デプロイメント開始時に最初にデータをプルする際、キャッシュがリフレッシュされます。リトライ回数がこのオプションの値を超えると、デプロイメントの実行に失敗します。

    INTEGER

    いいえ

    10

    該当なし。

データ型のマッピング

MaxCompute がサポートするデータ型の詳細については、「MaxCompute データ型システム バージョン 2.0」をご参照ください。

MaxCompute のデータ型

Realtime Compute for Apache Flink のデータ型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

TIMESTAMP_NTZ

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

重要

MaxCompute 物理テーブルにネストされた複合データ型(ARRAY、MAP、STRUCT など)および JSON 型のフィールドが含まれる場合、Realtime Compute for Apache Flink が物理テーブルからデータを読み取り・書き込みできるようにするには、MaxCompute 物理テーブルを作成する際に tblproperties('columnar.nested.type'='true') を指定する必要があります。

Flink CDC(パブリックプレビュー)

MaxCompute コネクタは、YAML ベースのジョブでデータインジェスト sink として使用できます。

VVR エンジンの要件

VVR 11.1 以降

構文

source:
  type: xxx

sink:
   type: maxcompute
   name: MaxComputeSinkaccess-id: ${your_accessId}
   access-key: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}buckets-num: 8

構成オプション

オプション

必須?

デフォルト値

データ型

説明

type

はい

デフォルト値なし。

String

使用するコネクタ。値を maxcompute に設定します。

name

いいえ

デフォルト値なし。

String

シンク名。

access-id

はい

デフォルト値なし。

String

お使いの Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID です。[リソースアクセス管理コンソール] で取得してください。

access-key

はい

デフォルト値なし。

String

お客様の AccessKey Secret です。

endpoint

はい

デフォルト値なし。

String

MaxCompute エンドポイント。MaxCompute プロジェクトが配置されているリージョンおよびネットワーク接続方法に基づいて設定してください。「エンドポイント」をご参照ください。

project

はい

デフォルト値なし。

String

MaxCompute プロジェクト名。以下の手順で取得できます:

  1. MaxCompute コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、ワークスペース>プロジェクト を選択します。

  3. MaxCompute プロジェクトを見つけ、その名前をコピーします。

tunnel.endpoint

いいえ

デフォルト値なし。

String

MaxCompute Tunnel のエンドポイント。通常、このエンドポイントは MaxCompute によって endpoint 設定から自動的に推論されます。ただし、プロキシサーバーを使用する場合など、特殊なネットワーク環境では、これを明示的に定義する必要があります。

quota.name

いいえ

デフォルト値なし。

String

専用リソースグループ のクォータ名。このオプションを明示的に指定しない場合、共有リソースグループが使用されます。

sts-token

いいえ

デフォルト値なし。

String

RAM ロールの STS トークン。RAM ロールを使用して MaxCompute にアクセスする場合、身分認証のためにこのオプションが必要です。

buckets-num

いいえ

16

Integer

自動作成される MaxCompute Delta テーブルのバケット数。「ニアリアルタイムデータウェアハウス」をご参照ください。

compress.algorithm

いいえ

zlib

String

データ圧縮アルゴリズム。有効な値:

  • raw:データは圧縮されません。

  • zlib

  • snappy

total.buffer-size

いいえ

64 MB

String

メモリ内バッファーのサイズ。パーティションテーブルの場合、このバッファーはパーティションレベルで適用されます。非パーティション化テーブルの場合、テーブルレベルで適用されます。異なるパーティションまたはテーブルのバッファーは独立しています。バッファーが容量に達すると、そのデータは MaxCompute へフラッシュされます。

bucket.buffer-size

いいえ

4 MB

String

バケットのメモリ内バッファーのサイズ。このオプションは、MaxCompute Delta テーブルへデータを書き込む場合にのみ適用されます。異なるバケットのバッファーは独立しています。バッファーが容量に達すると、そのデータは MaxCompute へフラッシュされます。

commit.thread-num

いいえ

16

Integer

チェックポイント中における、同時にコミット可能な最大パーティション数またはテーブル数。

flush.concurrent-num

いいえ

4

Integer

Flink が同時にフラッシュできる最大バケット数を指定します。このオプションは、MaxCompute Delta テーブルへデータを書き込む場合にのみ適用されます。

テーブル位置のマッピング

コネクタが MaxCompute で自動テーブル作成をトリガーする場合、位置は次のようにマッピングされます:

重要

MaxCompute プロジェクトでスキーマ機能が無効になっている場合、コネクタは tableId.namespace を無視します。この場合、単一のデータベースまたはその論理的な同等物のみが MaxCompute へ取り込まれます。たとえば、MySQL から MaxCompute へデータを取り込む場合、1 つの MySQL データベースのみが取り込まれます。

MySQL の位置

Flink CDC の概要

MaxCompute の位置

該当なし

構成ファイルのプロジェクト

プロジェクト

データベース

TableId.namespace

スキーマ

説明

MaxCompute プロジェクトでスキーマが無効になっている場合、この設定は無視されます。

テーブル

TableId.tableName

テーブル

データ型のマッピング

Flink CDC 型

MaxCompute 型

CHAR

STRING

VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision>3)

TIMESTAMP

TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision<=3)

DATETIME

TIMESTAMP_WITH_TIME_ZONE(Precision>3)

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE(Precision<=3)

DATETIME

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT

使用例

SQL API

ソーステーブル

特定パーティションの全データ読み取り

partition オプションで指定されたパーティションのすべてのデータを読み取ります。

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
   cid,
   COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
増分データの読み取り

startPartition で指定されたパーティションからデータの読み取りを開始し、新しいレコードを継続的に監視します。

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 20180905 パーティションから読み取りを開始します。
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;

結果テーブル

静的パーティションへの書き込み

partition オプションで指定されたパーティションに書き込みます:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- データはパーティション 20180905 に書き込まれます。
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;
動的パーティションへの書き込み

partition オプションに基づいて動的にパーティションにデータを書き込みます:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id  INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- 動的パーティション列を明示的に指定します。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' -- ds の値を指定しません。データは ds フィールドの値に基づいて異なるパーティションに書き込まれます。
);

INSERT INTO odps_sink
SELECT
   id,
   len,
   content,
   DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;

ディメンションテーブル

単一値キー

各キーが一意の値を持つ場合にプライマリキーを指定します:

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED  -- プライマリキーを指定します。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
複数値キー

キーが複数の値を持つ可能性がある場合、プライマリキーを指定しません:

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR
  -- プライマリキーの指定は必須ではありません。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream API

重要
  • DataStream API を呼び出してデータを読み書きする場合は、関連するタイプの DataStream コネクタを使用する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの統合」をご参照ください。

  • 知的財産保護のため、VVR 6.0.6 以降では、MaxCompute コネクタを使用した DataStream プログラムのオンプレミスでのデバッグは最大 30 分間に制限されています。これを超えるデバッグセッションは、プログラムがエラーで終了します。詳細については、「コネクタのローカルデバッグ」をご参照ください。

  • MaxCompute Delta テーブルからのデータ読み取りはサポートされていません。Delta テーブルは、指定された primary key とプロパティ transactional=true で作成されたテーブルです。詳細については、「用語」をご参照ください。

MaxCompute DataStream コネクタを使用する場合、SQL 文を使用して MaxCompute テーブルを宣言することを推奨します。Table API オペレーションを呼び出して MaxCompute テーブルにアクセスしたり、DataStream API オペレーションを呼び出してデータストリームにアクセスしたりできます。

ソーステーブルへの接続

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

シンクに接続

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

XML

MaxCompute コネクタの Maven 依存関係には、完全ソース、増分ソース、結果テーブル、ディメンションテーブルを作成するために必要なクラスが含まれています。さまざまなバージョンの MaxCompute DataStream コネクタは、Maven 中央リポジトリに格納されています。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

参考情報