すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MaxCompute: ビッグデータコンピューティングサービス

最終更新日:Nov 09, 2025

このトピックでは、MaxCompute コネクタの構文、WITH パラメーター、および使用例について説明します。

背景情報

MaxCompute (旧 ODPS) は、高速でフルマネージド型のエクサバイト規模のデータウェアハウスソリューションです。MaxCompute は、構造化データの大規模なバッチを保存および計算し、大規模なデータウェアハウス向けのソリューションと、分析およびモデリング向けのサービスを提供します。

次の表に、MaxCompute コネクタの特徴を示します。

カテゴリ

詳細

サポートされるテーブルタイプ

ソーステーブル、ディメンションテーブル、および結果テーブル

実行モード

ストリームモードおよびバッチモード

データフォーマット

サポートされていません

特定のモニタリングメトリック

メトリック

  • ソーステーブル

    numRecordsIn

    numRecordsInPerSecond

    numBytesIn

    numBytesInPerSecond

  • シンクテーブル

    numRecordsOut

    numRecordsOutPerSecond

    numBytesOut

    numBytesOutPerSecond

  • ディメンションテーブル

    dim.odps.cacheSize

説明

詳細については、「モニタリングメトリックの説明」をご参照ください。

API タイプ

Datastream および SQL

結果テーブルのデータの更新または削除

Batch Tunnel および Stream Tunnel モードはデータ挿入のみをサポートします。Upsert Tunnel モードはデータの挿入、更新、および削除をサポートします。

前提条件

MaxCompute テーブルを作成済みであること。詳細については、「テーブルの作成」をご参照ください。

制限事項

  • MaxCompute コネクタは、at-least-once セマンティクスのみをサポートします。

    説明

    at-least-once セマンティクスは、データ損失を防ぐために使用されます。特定のケースでは、重複データが MaxCompute に書き込まれることがあります。重複データを引き起こす条件は、使用される MaxCompute Tunnel によって異なります。詳細については、「データチャネルの選択方法」をご参照ください。

  • デフォルトでは、ソーステーブルは完全データモードです。`partition` パラメーターで指定されたパーティションのデータのみを読み取ります。すべてのデータが読み取られると、ジョブは終了します。

    新しいパーティションを継続的に監視するには、WITH 句で `startPartition` パラメーターを指定して、増分ソーステーブルモードを使用できます。

    説明
    • ディメンションテーブルは、各更新サイクル中に最新のパーティションをチェックするため、この制限の対象にはなりません。

    • ソーステーブルジョブの開始後にパーティションに新しいデータが追加された場合、その新しいデータは読み取られません。したがって、パーティション内のデータが完全になった後にのみジョブを実行する必要があります。

SQL

SQL ジョブで MaxCompute コネクタをソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。

構文

CREATE TEMPORARY TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

WITH パラメーター

一般パラメーター

パラメーター

説明

データの型

必須

デフォルト値

備考

connector

テーブルのタイプ。

String

はい

なし

値は odps である必要があります。

endpoint

MaxCompute サービスの Endpoint。

String

はい

なし

詳細については、「Endpoint」をご参照ください。

tunnelEndpoint

MaxCompute Tunnel サービスのエンドポイント。

String

いいえ

なし

Endpoint」をご参照ください。

説明

このパラメーターを指定しない場合、MaxCompute は内部のロードバランシングサービスに基づいてトンネル接続を割り当てます。

project

MaxCompute プロジェクトの名前。

String

はい

なし

なし。

schemaName

MaxCompute スキーマの名前。

String

いいえ

なし

MaxCompute プロジェクトでスキーマ機能が有効になっている場合は、MaxCompute テーブルを含むスキーマの名前を指定します。詳細については、「スキーマ操作」をご参照ください。

説明

このパラメーターは VVR 8.0.6 以降でのみサポートされます。

tableName

MaxCompute テーブルの名前。

String

はい

なし

なし。

accessId

MaxCompute アカウントの AccessKey ID。

String

はい

なし

詳細については、「AccessKey ID と AccessKey Secret 情報を表示する方法」をご参照ください。

重要

AccessKey 情報の漏洩を避けるため、変数を使用して AccessKey の値を指定することをお勧めします。詳細については、「プロジェクト変数」をご参照ください。

accessKey

MaxCompute アカウントの AccessKey Secret。

String

はい

なし

partition

MaxCompute パーティションの名前。

String

いいえ

なし

このパラメーターは、パーティション分割されていないテーブルまたは増分ソーステーブルには必要ありません。

説明

パーティションテーブルの詳細については、「パーティションからの読み取りまたはパーティションへの書き込み時に Partition パラメーターを指定する方法」をご参照ください。

compressAlgorithm

MaxCompute Tunnel で使用される圧縮アルゴリズム。

String

いいえ

SNAPPY

有効な値:

  • RAW (圧縮なし)

  • ZLIB

  • SNAPPY

    SNAPPY は ZLIB よりも大幅に高いスループットを提供します。テストシナリオでは、スループットが約 50% 向上します。

quotaName

MaxCompute の専用 Data Transmission Service リソースのクォータ名。

String

いいえ

なし

MaxCompute の専用 Data Transmission Service リソースを使用するには、このパラメーターを設定します。

重要
  • このパラメーターは VVR 8.0.3 以降でのみサポートされます。

  • このパラメーターを設定する場合、tunnelEndpoint パラメーターを削除する必要があります。そうしないと、tunnelEndpoint で指定されたデータトンネルが引き続き使用されます。

ソーステーブル固有のパラメーター

パラメーター

説明

データの型

必須

デフォルト値

備考

maxPartitionCount

読み取り可能なパーティションの最大数。

Integer

いいえ

100

読み取るパーティションの数がこの値を超えると、The number of matched partitions exceeds the default limit エラーが報告されます。

重要

一度に多数のパーティションを読み取ると、MaxCompute の負荷が増加し、ジョブの起動が遅くなります。partition パラメーターが誤って設定されていないか確認してください。多数のパーティションを読み取るには、maxPartitionCount の値を手動で増やしてください。

useArrow

Arrow フォーマットを使用してデータを読み取るかどうかを指定します。

Boolean

いいえ

false

Arrow フォーマットを使用すると、MaxCompute Storage API を呼び出すことができます。

重要
  • このパラメーターはバッチジョブでのみ有効です。

  • このパラメーターは VVR 8.0.8 以降でのみサポートされます。

splitSize

Arrow フォーマットを使用してデータを読み取る際に一度にプルするデータのサイズ。

MemorySize

いいえ

256 MB

このパラメーターは VVR 8.0.8 以降でのみサポートされます。

重要

このパラメーターはバッチジョブでのみ有効です。

compressCodec

Arrow フォーマットでデータを読み取る際に使用される圧縮アルゴリズム。

String

いいえ

""

有効な値:

  • "" (圧縮なし)

  • ZSTD

  • LZ4_FRAME

圧縮アルゴリズムを指定すると、圧縮なしの場合と比較してスループットが向上します。

重要
  • このパラメーターはバッチジョブでのみ有効です。

  • このパラメーターは VVR 8.0.8 以降でのみサポートされます。

dynamicLoadBalance

シャードを動的に割り当てるかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: 許可

  • false: 操作は許可されません。

シャードを動的に割り当てると、異なる Flink ノードの処理性能を活用し、ソーステーブルの全体的な読み取り時間を短縮できます。ただし、これにより、異なるノードで読み取られるデータ量が不整合になり、データスキューが発生する可能性があります。

重要
  • このパラメーターはバッチジョブでのみ有効です。

  • このパラメーターは VVR 8.0.8 以降でのみサポートされます。

増分ソーステーブル固有のパラメーター

増分ソーステーブルは、MaxCompute サーバーを定期的にポーリングしてすべてのパーティション情報を取得することで、新しいパーティションを検出します。新しいパーティションが読み取られるとき、そのパーティションのデータは完全である必要があります。詳細については、「増分 MaxCompute ソーステーブルが不完全に書き込まれたパーティションを検出した場合の対処方法」をご参照ください。`startPartition` パラメーターを使用して開始点を指定できます。開始点以上の 辞書式順序 を持つパーティションのみが読み取られることに注意してください。たとえば、パーティション year=2023,month=10 は、パーティション year=2023,month=9 よりも辞書式順序で小さくなります。このタイプのパーティション宣言では、year=2023,month=09 のようにゼロパディングを使用して正しい辞書式順序を保証できます。

パラメーター

説明

データの型

必須

デフォルト値

備考

startPartition

増分読み取りのための MaxCompute パーティションの開始オフセット (包括的)。

String

はい

なし

  • このパラメーターを使用すると、増分ソーステーブルモードが有効になり、partition パラメーターは無視されます。

  • 複数レベルのパーティションの場合、最上位レベルから最下位レベルまで、各パーティションキー列の値を宣言する必要があります。

説明

startPartition パラメーターの詳細については、「増分 MaxCompute の startPartition パラメーターを指定する方法」をご参照ください。

subscribeIntervalInSec

パーティションリストについて MaxCompute をポーリングする間隔。

Integer

いいえ

30

単位は秒です。

modifiedTableOperation

読み取りプロセス中にパーティションデータが変更された場合に実行する操作。

Enum (NONE, SKIP)

いいえ

NONE

ダウンロードセッションはチェックポイントに保存されるため、チェックポイントからの各回復は、そのセッションからの読み取りを再開しようとします。パーティションデータが変更されたためにセッションが利用できない場合、Flink ジョブは再起動ループに入ります。この場合、このパラメーターを設定できます。有効な値:

  • NONE: startPartition パラメーターを、利用できないパーティションよりも大きい値に変更し、ステートレスな状態からジョブを開始する必要があります。

  • SKIP: ステートレスな状態からジョブを開始したくない場合は、モードを SKIP に設定します。Flink がチェックポイントからセッションを回復しようとすると、利用できないパーティションをスキップします。

重要
  • このパラメーターは VVR 8.0.3 以降でのみサポートされます。

  • NONE モードと SKIP モードでは、変更されたパーティションから読み取られたデータは取り消されず、未読のデータは読み取られません。

結果テーブル固有のパラメーター

パラメーター

説明

データの型

必須

デフォルト値

備考

useStreamTunnel

MaxCompute Stream Tunnel を使用してデータをアップロードするかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: MaxCompute Stream Tunnel を使用してデータをアップロードします。

  • false: MaxCompute Batch Tunnel を使用してデータをアップロードします。

説明

詳細については、「データトンネルの選択方法」をご参照ください。

flushIntervalMs

MaxCompute Tunnel Writer バッファーのフラッシュ間隔。

Long

いいえ

30000 (30 秒)

データは最初にバッファーに書き込まれます。バッファーがいっぱいになるか、`flushIntervalMs` に達すると、データはバッチで結果テーブルに書き込まれます。

  • Stream Tunnel: データはフラッシュされた直後に表示されます。

  • Batch Tunnel: データはチェックポイントが完了した後にのみ表示されます。`flushIntervalMs` を 0 に設定して定期的なフラッシュを無効にし、レイテンシーを回避することをお勧めします。

単位はミリ秒です。

説明

このパラメーターは `batchSize` と一緒に使用できます。いずれかの条件が満たされるとデータがフラッシュされます。

batchSize

MaxCompute Tunnel Writer バッファーのフラッシュサイズ。

Long

いいえ

67108864 (64 MB)

単位はバイトです。

レコードが書き込まれると、データはまず MaxCompute バッファーに保存されます。バッファーが特定のサイズ (`batchSize`) に達すると、バッファー内のデータは結果の MaxCompute テーブルに書き込まれます。

説明

このパラメーターは `flushIntervalMs` と一緒に使用できます。いずれかの条件が満たされるとデータがフラッシュされます。

numFlushThreads

MaxCompute Tunnel Writer バッファーをフラッシュするためのスレッド数。

Integer

いいえ

1

各同時 MaxCompute シンクは、データをフラッシュするために `numFlushThreads` 個のスレッドを作成します。この値が 1 より大きい場合、異なるパーティションのデータを同時にフラッシュできるため、フラッシュ効率が向上します。

slotNum

MaxCompute Tunnel Writer によって使用されるスロットの数。

Integer

いいえ

0

スロット数の制限については、「Data Transmission Service の概要」をご参照ください。

dynamicPartitionLimit

書き込む動的パーティションの最大数。

Integer

いいえ

100

2 つのチェックポイント間で結果テーブルに書き込まれる動的パーティションの数が `dynamicPartitionLimit` を超えると、Too many dynamic partitions エラーが報告されます。

重要

一度に多数のパーティションに書き込むと、MaxCompute サービスに負荷がかかり、結果テーブルのフラッシュとジョブのチェックポイントが遅くなります。このエラーが発生した場合は、それほど多くのパーティションに書き込む必要があるかどうかを確認してください。必要な場合は、`dynamicPartitionLimit` パラメーターの値を手動で増やしてください。

retryTimes

MaxCompute サーバーへのリクエストの最大リトライ回数。

Integer

いいえ

3

セッションの作成、セッションの送信、またはデータのフラッシュ時に、MaxCompute サービスが一時的に利用できなくなることがあります。システムはこの構成に基づいて操作をリトライします。

sleepMillis

再試行間隔。

Integer

いいえ

1000

単位はミリ秒です。

enableUpsert

MaxCompute Upsert Tunnel を使用してデータをアップロードするかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: Upsert Tunnel を使用して Flink の INSERT、UPDATE_AFTER、および DELETE データを処理します。

  • false: useStreamTunnel パラメーターに基づいて Batch Tunnel または Stream Tunnel を使用して Flink の INSERT および UPDATE_AFTER データを処理します。

重要
  • Upsert モードで MaxCompute シンクがコミットする際にエラー、失敗、または長い遅延が発生した場合は、同時シンクノードの数を 10 以下に制限することをお勧めします。

  • このパラメーターは VVR 8.0.6 以降でのみサポートされます。

upsertAsyncCommit

Upsert モードでセッションを送信する際に非同期モードを使用するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: 非同期モードを使用します。コミットにかかる時間は短くなりますが、書き込まれたデータはコミット完了後すぐに読み取り可能にはなりません。

  • false: デフォルトで同期モードを使用します。システムは、コミットする前にサービスがセッションを処理するのを待ちます。

説明

このパラメーターは VVR 8.0.6 以降でのみサポートされます。

upsertCommitTimeoutMs

Upsert モードでセッションを送信するためのタイムアウト期間。

Integer

いいえ

120000

(120 秒)

単位はミリ秒です。

説明

このパラメーターは VVR 8.0.6 以降でのみサポートされます。

sink.operation

Delta Lake テーブルに書き込む際の書き込みモード。

String

いいえ

insert

有効な値:

  • insert: データを追加します。

  • upsert: データを更新します。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

sink.parallelism

Delta Lake テーブルへの書き込みの並列度。

Integer

いいえ

なし

  • 書き込みの並列度。これが設定されていない場合、デフォルトでアップストリームデータの並列度が使用されます。

  • このパラメーターは VVR 8.0.10 以降でのみサポートされます。

重要

Delta Lake テーブルの `write.bucket.num` プロパティがこの構成値の整数倍であることを確認してください。これにより、最適な書き込みパフォーマンスが提供され、シンクノードで最も多くのメモリが節約されます。

sink.file-cached.enable

Delta Lake テーブルの動的パーティションに書き込む際にファイルキャッシュモードを使用するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: ファイルキャッシュモードを使用します。

  • false: ファイルキャッシュモードを使用しません。

ファイルキャッシュモードを使用すると、サーバーに書き込まれる小さなファイルの数が減りますが、データ出力のレイテンシーが増加します。結果テーブルの並列度が高い場合は、ファイルキャッシュモードを使用することをお勧めします。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

sink.file-cached.writer.num

ファイルキャッシュモードでデータをアップロードするための同時タスクの数。

Integer

いいえ

16

  • このパラメーターは、sink.file-cached.enable が true に設定されている場合にのみ有効です。

  • この値を大幅に増やすことはお勧めしません。同時に多数のパーティションに書き込むと、メモリ不足 (OOM) エラーが発生しやすくなります。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

sink.bucket.check-interval

ファイルキャッシュモードでのファイルサイズをチェックする間隔。単位はミリ秒 (ms) です。

Integer

いいえ

60000

このパラメーターは、sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

sink.file-cached.rolling.max-size

ファイルキャッシュモードでの単一キャッシュファイルの最大サイズ。

MemorySize

いいえ

16 M

  • このパラメーターは、sink.file-cached.enable が true に設定されている場合にのみ有効です。

  • ファイルサイズがこの値を超えると、ファイルデータはサーバーにアップロードされます。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

sink.file-cached.memory

ファイルキャッシュモードでファイルを書き込むための最大オフヒープメモリサイズ。

MemorySize

いいえ

64 MB

このパラメーターは、sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

sink.file-cached.memory.segment-size

ファイルキャッシュモードでファイルを書き込むためのバッファーサイズ。

MemorySize

いいえ

128 KB

このパラメーターは、sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

sink.file-cached.flush.always

ファイルキャッシュモードでファイルを書き込むためにキャッシュを使用するかどうかを指定します。

Boolean

いいえ

true

このパラメーターは、sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

sink.file-cached.write.max-retries

ファイルキャッシュモードでデータをアップロードするためのリトライ回数。

Integer

いいえ

3

このパラメーターは、sink.file-cached.enable が true に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.writer.max-retries

Upsert Writer がバケットへの書き込みに失敗した後のリトライ回数。

Integer

いいえ

3

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.writer.buffer-size

Flink の単一の Upsert Writer からのデータのキャッシュサイズ。

MemorySize

いいえ

64 m

  • すべてのバケットの合計バッファーサイズが事前設定されたしきい値に達すると、システムは自動的にリフレッシュ操作をトリガーしてサーバー上のデータを更新します。

  • upsert writer は同時に複数のバケットに書き込みます。書き込み効率を向上させるために、この値を増やすことをお勧めします。

  • 多数のパーティションに書き込む場合、OOM エラーのリスクがあります。このパラメーター値を減らすことを検討できます。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.writer.bucket.buffer-size

Flink の単一バケットからのデータのキャッシュサイズ。

MemorySize

いいえ

1 m

クラスターのメモリリソースが逼迫している場合は、このパラメーター値を減らすことができます。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.write.bucket.num

書き込み先のテーブルのバケット数。

Integer

はい

なし

値は、宛先 Delta テーブルの write.bucket.num プロパティの値と一致する必要があります。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.write.slot-num

単一セッションで使用される Tunnel スロットの数。

Integer

いいえ

1

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.commit.max-retries

Upsert セッションコミットのリトライ回数。

Integer

いいえ

3

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.commit.thread-num

Upsert セッションコミットの並列度。

Integer

いいえ

16

このパラメーターを大きな値に設定することはお勧めしません。同時コミット数が多いほどリソースを多く消費し、パフォーマンスの問題や過剰なリソース消費につながる可能性があります。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.commit.timeout

Upsert セッションコミットを待機するタイムアウト期間。単位は秒 (s) です。

Integer

いいえ

600

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.flush.concurrent

単一のパーティションに同時に書き込むことができるバケットの最大数。

Integer

いいえ

2

バケットのデータがフラッシュされるたびに、Tunnel Slot リソースが占有されます。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

insert.commit.thread-num

コミットセッションの並列度。

Integer

いいえ

16

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

insert.arrow-writer.enable

Arrow フォーマットを使用するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: Arrow モードを使用します。

  • false: Arrow モードを使用しません。

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

insert.arrow-writer.batch-size

Arrow バッチの最大行数。

Integer

いいえ

512

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

insert.arrow-writer.flush-interval

ライターのフラッシュ間隔。単位はミリ秒 (ms) です。

Integer

いいえ

100000

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

insert.writer.buffer-size

バッファードライターのキャッシュサイズ。

MemorySize

いいえ

64 MB

説明

このパラメーターは VVR 8.0.10 以降でのみサポートされます。

upsert.partial-column.enable

一部の列のみを更新するかどうかを指定します。

Boolean

いいえ

false

これは、結果テーブルのタイプが Delta Table の場合にのみ有効です。詳細については、「部分的な列の更新」をご参照ください。

有効な値:

  • true: 一部の列のみを更新します。

  • false: すべての列を更新します。

データは、結果テーブルに更新されたデータのプライマリキーが含まれているかどうかに基づいて書き込まれます:

  • 結果テーブルに同じプライマリキーを持つレコードが存在する場合、そのレコードは指定された列からの非 null 値で更新されます。

  • 結果テーブルに同じプライマリキーを持つ行が含まれていない場合、新しい行が挿入されます。この新しい行には、指定された列の値が含まれ、他のすべての列には null が含まれます。

説明

このパラメーターは VVR 8.0.11 以降でのみサポートされます。

ディメンションテーブル固有のパラメーター

ジョブが開始されると、MaxCompute ディメンションテーブルは指定されたパーティションから完全データをプルします。`partition` パラメーターは `max_pt()` などの関数をサポートします。キャッシュが期限切れになり再読み込みされると、`partition` パラメーターが再解析され、最新のパーティションからデータがプルされます。`max_two_pt()` を使用する場合、ディメンションテーブルは 2 つのパーティションからデータをプルできます。それ以外の場合は、単一のパーティションしか指定できません。

パラメーター

説明

データの型

必須

デフォルト値

備考

cache

キャッシュポリシー。

String

はい

なし

現在、MaxCompute ディメンションテーブルは ALL ポリシーのみをサポートしています。明示的に宣言する必要があります。このポリシーは、リモートテーブルが小さく、ミスキー (JOIN 操作で ON 条件がソーステーブルとディメンションテーブルを関連付けられない) の数が多いシナリオに適しています。

ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。ディメンションテーブルへの後続のすべてのクエリは、キャッシュを介して実行されます。キャッシュにデータが見つからない場合、キーは存在しません。完全キャッシュは期限切れになった後に再ロードされます。

説明
  • システムはディメンションテーブルのデータを非同期にロードするため、CACHE ALL を使用する場合は、ディメンションテーブルの JOIN ノードのメモリを増やす必要があります。増加させるメモリサイズは、リモートテーブルのデータ量の少なくとも 4 倍である必要があります。具体的な値は、MaxCompute ストレージの圧縮アルゴリズムによって異なります。

  • ディメンションテーブルに大量のデータが含まれている場合は、SHUFFLE_HASH ヒントを使用してデータを各サブタスクに均等に分散させることができます。詳細については、「ディメンションテーブルに SHUFFLE_HASH ヒントを使用する方法」をご参照ください。

  • 非常に大きな MaxCompute ディメンションテーブルを使用しているときに、頻繁な JVM ガベージコレクションが原因でジョブが異常になり、ディメンションテーブル JOIN ノードのメモリを増やしても問題が解決しない場合は、ApsaraDB for Hbase ディメンションテーブルなど、LRU キャッシュポリシーをサポートするキー値ディメンションテーブルに切り替えることをお勧めします。

cacheSize

キャッシュするデータレコードの最大数。

Long

いいえ

100000

ディメンションテーブルのデータ量が `cacheSize` を超えると、Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit エラーが報告されます。

重要

大量のディメンションテーブルデータは、多くの JVM ヒープメモリを消費し、ジョブの起動とディメンションテーブルの更新を遅くします。これほど多くのデータをキャッシュする必要があるかどうかを確認してください。必要な場合は、このパラメーターの値を手動で増やしてください。

cacheTTLMs

キャッシュのタイムアウト期間。キャッシュ更新の間隔です。

Long

いいえ

Long.MAX_VALUE (更新しない)

単位はミリ秒です。

cacheReloadTimeBlackList

キャッシュ更新禁止時間。このパラメーターで指定された期間中、キャッシュは更新されません。

String

いいえ

なし

このパラメーターは、プロモーションイベントのピーク時など、重要な期間中にキャッシュがリフレッシュされることによってデプロイメントが不安定になるのを防ぐために使用されます。このパラメーターの設定方法の詳細については、「CacheReloadTimeBlackList パラメーターの設定方法」をご参照ください。

maxLoadRetries

ジョブ開始時の初期データプルを含む、キャッシュ更新の最大リトライ回数。この回数を超えると、ジョブは失敗します。

Integer

いいえ

10

なし。

型マッピング

MaxCompute がサポートするデータ型の詳細については、「データ型 2.0」をご参照ください。

MaxCompute 型

Flink 型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

TIMESTAMP_NTZ

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

重要

物理 MaxCompute テーブルにネストされた複雑な型フィールド (ARRAY、MAP、または STRUCT) と JSON 型フィールドの両方が含まれている場合は、物理 MaxCompute テーブルを作成するときに tblproperties('columnar.nested.type'='true') を指定する必要があります。これにより、Flink がデータを正しく読み書きできるようになります。

データインジェスト

MaxCompute コネクタは、YAML ベースのデータインジェストジョブで、宛先にデータを書き込むために使用できます。

制限事項

この機能は VVR 11.1 以降でのみサポートされます。

構文

source:
  type: xxx

sink:
   type: maxcompute
   name: MaxComputeSink
   access-id: ${your_accessId}
   access-key: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   buckets-num: 8

設定項目

設定項目

必須

デフォルト値

タイプ

説明

type

はい

なし

String

使用するコネクタ。これを maxcompute に設定します。

name

いいえ

なし

String

シンクの名前。

access-id

はい

なし

String

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。AccessKey 管理 ページで AccessKey ID を取得できます。

access-key

はい

なし

String

AccessKey ID に対応する AccessKey Secret。

endpoint

はい

なし

String

MaxCompute サービスの Endpoint。MaxCompute プロジェクトを作成する際に選択したリージョンとネットワーク接続方法に基づいて、このパラメーターを設定する必要があります。各リージョンとネットワークのエンドポイント値については、「Endpoint」をご参照ください。

project

はい

なし

String

MaxCompute プロジェクトの名前。MaxCompute コンソールにログインし、[ワークスペース] > [プロジェクト] ページでプロジェクト名を確認できます。

tunnel.endpoint

いいえ

なし

String

MaxCompute Tunnel サービスのエンドポイント。これは通常、指定されたプロジェクトのリージョンに基づいて自動的にルーティングされます。プロキシを使用する場合など、特別なネットワーク環境でのみこの設定を使用してください。

quota.name

いいえ

なし

String

MaxCompute Tunnel の専用リソースグループの名前。このパラメーターが設定されていない場合、共有リソースグループが使用されます。詳細については、「専用 Data Transmission Service リソースグループの購入と使用」をご参照ください。

sts-token

いいえ

なし

String

RAM ロールによって発行された短命のセキュリティトークンサービス (STS) トークンを認証に使用する場合、このパラメーターは必須です。

buckets-num

いいえ

16

Integer

MaxCompute Delta テーブルを自動的に作成するために使用されるバケットの数。詳細については、「ほぼリアルタイムのデータウェアハウスの概要」をご参照ください。

compress.algorithm

いいえ

zlib

String

MaxCompute への書き込み時に使用されるデータ圧縮アルゴリズム。サポートされている値は raw (圧縮なし)、zlib、および snappy です。

total.buffer-size

いいえ

64MB

String

メモリにバッファリングするデータの量。これはパーティションレベル、またはパーティション分割されていないテーブルの場合はテーブルレベルで測定されます。異なるパーティションまたはテーブルのバッファーは独立しています。しきい値に達すると、データは MaxCompute に書き込まれます。

bucket.buffer-size

いいえ

4MB

String

メモリにバッファリングするデータの量。バケットレベルで測定されます。このパラメーターは、Delta Lake テーブルに書き込む場合にのみ有効です。異なるデータバケットのバッファーは独立しています。しきい値に達すると、バケット内のデータは MaxCompute に書き込まれます。

commit.thread-num

いいえ

16

Integer

チェックポイントフェーズ中に同時に処理できるパーティションまたはテーブルの数。

flush.concurrent-num

いいえ

4

Integer

MaxCompute にデータを書き込む際に同時に書き込むことができるバケットの数。このパラメーターは、Delta Lake テーブルに書き込む場合にのみ有効です。

テーブルの場所のマッピング

コネクタがテーブルを自動的に作成するとき、次のマッピングを使用してソーステーブルの場所情報を MaxCompute テーブルにマッピングします。

重要

MaxCompute プロジェクトがスキーマモデルをサポートしていない場合、同期タスクごとに 1 つの MySQL データベースしか同期できません。たとえば、アップストリームデータソースが MySQL の場合、コネクタは `tableId.namespace` 情報を無視します。このロジックは他のデータソースにも適用されます。

データインジェストジョブのオブジェクト

MaxCompute の場所

MySQL の場所

設定の Project パラメーター

プロジェクト

なし

TableId.namespace

スキーマ (MaxCompute プロジェクトがスキーマモデルをサポートしていない場合は無視されます)

データベース

TableId.tableName

テーブル

テーブル

型マッピング

CDC 型

MaxCompute 型

CHAR

STRING

VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT

使用例

SQL

ソーステーブルの例

完全読み取り

デフォルトでは、ソーステーブルは完全データモードであり、partition パラメーターで指定されたパーティションからデータを読み取ります。

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
   cid,
   COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
増分読み取り

startPartition パラメーターで指定されたパーティションからデータを増分的に読み取ります。

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 20180905 に対応するパーティションから読み取りを開始します。
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;

結果テーブルの例

静的パーティションへの書き込み

partition パラメーターに静的な値を指定することで、静的パーティションにデータを書き込みます。

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- 静的パーティション ds=20180905 に書き込みます。
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;
動的パーティションへの書き込み

テーブルのパーティションキー列に基づいて partition を指定します。

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id  INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- 動的パーティション列は明示的に宣言する必要があります。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' -- パーティション値を指定しません。これは、ds フィールドの値に基づいて異なるパーティションにデータが書き込まれることを示します。
);

INSERT INTO odps_sink
SELECT
   id,
   len,
   content,
   DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;

ディメンションテーブルの例

1 対 1 ディメンションテーブル

1 対 1 ディメンションテーブルにはプライマリキーを宣言する必要があります。

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED  -- 1 対 1 ディメンションテーブルにはプライマリキーを宣言する必要があります。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
1 対多ディメンションテーブル

1 対多ディメンションテーブルにはプライマリキーは必要ありません。

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR
  -- 1 対多ディメンションテーブルにはプライマリキーは必要ありません。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream

重要
  • DataStream API を使用してデータを読み書きするには、DataStream コネクタを使用する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。

  • 知的財産を保護するため、Ververica Runtime (VVR) 6.0.6 以降、このコネクタを使用するジョブは、ローカルデバッグセッション中に最大 30 分間実行できます。30 分後、ジョブはエラーを報告して終了します。MaxCompute コネクタを含むジョブをローカルで実行およびデバッグする方法の詳細については、「コネクタを含むジョブをローカルで実行およびデバッグする」をご参照ください。

  • Delta Tables からの読み取りはサポートされていません。Delta Tables は、primary keytransactional=true を指定して作成されたテーブルです。詳細については、「用語」をご参照ください。

DataStream ジョブで MaxCompute コネクタを使用する場合、まず SQL を使用して MaxCompute テーブルを宣言する必要があります。その後、Table と DataStream の間で変換することで、MaxCompute テーブルとデータストリームを接続できます。

ソーステーブルへの接続

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

結果テーブルへの接続

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

XML

MaxCompute コネクタの Maven 依存関係には、完全および増分ソーステーブル、結果テーブル、およびディメンションテーブルを構築するために必要なクラスが含まれています。MaxCompute DataStream コネクタは Maven Central Repository で入手できます。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

よくある質問