すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:AnalyticDB for MySQL (ADB) 3.0 コネクタ

最終更新日:Nov 09, 2025

このトピックでは、AnalyticDB for MySQL 3.0 コネクタの使用方法について説明します。

背景情報

AnalyticDB for MySQL 3.0 は、データベースとビッグデータ技術を統合した、クラウドネイティブのエンタープライズクラスのデータウェアハウジングサービスです。高スループット、リアルタイムのデータ書き込み、更新、削除、低レイテンシーのリアルタイムデータ分析、および複雑な抽出、変換、ロード (ETL) 操作をサポートします。AnalyticDB for MySQL は、アップストリームおよびダウンストリームのエコシステムツールと互換性があり、エンタープライズクラスのレポートシステム、データウェアハウス、およびデータサービスエンジンを構築するために使用できます。

次の表に、AnalyticDB for MySQL 3.0 コネクタがサポートする機能を示します。

項目

説明

サポートされているタイプ

ソーステーブル、ディメンションテーブル、および結果テーブル

説明

Ververica Runtime (VVR) 8.0.4 以降を使用する Realtime Compute for Apache Flink のみがソーステーブルをサポートします。ソーステーブルのパラメーターと構成の詳細については、「Flink を使用してバイナリログをサブスクライブする」をご参照ください。ディメンションテーブルと結果テーブルのパラメーターの詳細については、「WITH 句のパラメーター」をご参照ください。

実行モード

ストリーミングモードとバッチモード

データフォーマット

該当なし

固有のメトリック

該当なし

API タイプ

SQL

結果テーブルでのデータの更新または削除

はい

前提条件

構文

CREATE TEMPORARY TABLE adb_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
重要

Flink データ定義言語 (DDL) で定義されたプライマリキーは、列名を含め、AnalyticDB for MySQL データベースの物理テーブルのプライマリキーと一致する必要があります。不一致があると、データの正確性に影響を与える可能性があります。

WITH パラメーター

  • 一般パラメーター

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    備考

    connector

    結果テーブルのタイプ。

    String

    はい

    デフォルト値なし

    値を adb3.0 に設定します。

    url

    データベースの Java Database Connectivity (JDBC) URL。

    String

    はい

    デフォルト値なし

    AnalyticDB for MySQL データベースの JDBC 接続 URL。URL は jdbc:mysql://<endpoint>:<port>/<databaseName> フォーマットです。

    • endpoint と port: [AnalyticDB for MySQL コンソール] にログインします。対応するクラスターの名前をクリックします。[クラスター情報] ページで、[ネットワーク情報] セクションからエンドポイントとポートを取得します。

    • databaseName: AnalyticDB for MySQL データベースの名前。

    userName

    データベースにアクセスするためのユーザー名。

    String

    はい

    デフォルト値なし

    該当なし。

    password

    データベースのパスワード。

    String

    はい

    デフォルト値なし

    該当なし。

    tableName

    データベース内のテーブルの名前。

    String

    はい

    デフォルト値なし

    該当なし。

    maxRetryTimes

    データの書き込みまたは読み取りが失敗した場合の最大再試行回数。

    Integer

    いいえ

    10

    該当なし。

  • 結果テーブル固有のパラメーター

    パラメーター

    注意

    データの型

    必須

    デフォルト値

    備考

    batchSize

    バッチ書き込みあたりのレコード数。

    Integer

    いいえ

    1000

    このパラメーターは、プライマリキーを指定した後にのみ有効になります。

    bufferSize

    メモリにキャッシュするデータレコードの最大数。書き込みは、batchSize または bufferSize のいずれかのしきい値に達したときにトリガーされます。

    Integer

    いいえ

    1000

    このパラメーターは、プライマリキーを指定した後にのみ有効になります。

    flushIntervalMs

    キャッシュがクリアされる間隔。この期間が経過してもキャッシュ内のデータが出力条件を満たさない場合、システムはキャッシュされたすべてのデータを自動的に出力します。

    Integer

    いいえ

    3000

    単位: ミリ秒。

    ignoreDelete

    削除操作を無視するかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true: 削除操作を無視します。

    • false: 削除操作を受け入れます。

    replaceMode

    DDL 文でプライマリキーが定義されている場合にデータを挿入するパターン。

    • VVR 11.2 以降の場合、タイプは文字列です。

    • 11.2 より前の VVR の場合、タイプはブール値です。

    いいえ

    • Ververica Runtime (VVR) 11.2 以降の場合、デフォルト値は replace です。

    • 11.2 より前の VVR の場合、デフォルト値は true です。

    Ververica Runtime (VVR) 11.2 以降では、次の値を使用できます:

    • replace: replace into 構文を使用してデータを書き込みます。プライマリキーが重複している場合、新しいデータ行が既存の行を置き換えます。

    • upsert: insert into on duplicate key update 構文を使用してデータを書き込みます。この構文は、プライマリキーが存在しない場合は新しい行を挿入し、プライマリキーが存在する場合は既存の行を更新します。たとえば、AnalyticDB for MySQL テーブルに a (プライマリキー)、b、c、d の 4 つのフィールドがあり、結果テーブルが a と b のフィールドのデータのみを提供する場合、重複したプライマリキーが見つかったときに更新されるのは b フィールドのみです。c と d フィールドは変更されません。

    • insert: insert ignore into 構文を使用してデータを書き込みます。プライマリキーが重複している場合、最初のデータ入力が保持され、後続の入力は無視されます。

    VVR 11.2 より前のバージョンでは、ブール値のみがサポートされます:

    • true: replace 値と同じです。

    • false: upsert 値と同じです。

    注意: VVR 11.2 以降は、以前のバージョンの true および false 値と互換性があります。

    説明
    • このパラメーターは、AnalyticDB for MySQL 3.1.3.5 以降でのみサポートされます。

    • このパラメーターは、結果テーブルの DDL でプライマリキーが定義されている場合にのみ有効になります。結果テーブルの DDL でプライマリキーが定義されていない場合、データは常に insert ignore into 構文を使用して挿入されます。

    excludeUpdateColumns

    同じプライマリキーを持つデータを更新するときにスキップするフィールド。

    String

    いいえ

    空の文字列

    更新で複数のフィールドを無視するには、フィールド名をコンマ (,) で区切ります。例: excludeUpdateColumns='column1,column2'

    たとえば、結果テーブルに a (プライマリキー)、b、c、d の 4 つのフィールドがあるとします。`excludeUpdateColumns` を `'c,d'` に設定します。プライマリキーが一意の場合、システムは a、b、c、d の 4 つのフィールドすべての値を挿入します。プライマリキーが重複している場合、システムは b フィールドのみを更新します。c と d フィールドの値は変更されません。

    説明
    • このパラメーターは、`replaceMode` が `'upsert'` または `'false'` に設定されている場合にのみ有効になります。

    • 無視するフィールドの名前を 1 行に記述します。改行は使用しないでください。

    connectionMaxActive

    スレッドプールの最大サイズ。

    Integer

    いいえ

    40

    該当なし。

  • ディメンションテーブルのみ

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意

    cache

    キャッシュポリシー。

    String

    いいえ

    ALL

    AnalyticDB for MySQL 3.0 ディメンションテーブルは、次の 3 つのキャッシュポリシーをサポートしています:

    • None: キャッシュなし。

    • LRU: ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュを検索します。レコードが見つからない場合、システムは物理ディメンションテーブルを検索します。

    • ALL (デフォルト): ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。その後、ディメンションテーブルのすべてのルックアップはキャッシュを使用して実行されます。キーがキャッシュに見つからない場合、そのキーは存在しないと見なされます。キャッシュの有効期限が切れると、システムはキャッシュ全体を再読み込みします。

    これは、リモートテーブルが小さく、ソーステーブルとディメンションテーブルの間の結合で多くのキーミスが発生するシナリオに適しています。

    説明
    • キャッシュポリシーを ALL に設定した場合は、ノードのメモリサイズを監視して、メモリ不足 (OOM) エラーを防ぎます。

    • システムはディメンションテーブルのデータを非同期にロードするため、ALL キャッシュポリシーを使用する場合は、ディメンションテーブル結合ノードのメモリを増やしてください。メモリサイズは、リモートテーブルのデータサイズの 2 倍である必要があります。

    cacheSize

    キャッシュできるデータレコードの最大数。

    Integer

    いいえ

    100000

    cacheSize パラメーターは LRU キャッシュに関連しています。キャッシュが LRU に設定されている場合は、cacheSize パラメーターを設定する必要があります。

    cacheTTLMs

    キャッシュのタイムアウト (ミリ秒単位)。

    Integer

    いいえ

    Long.MAX_VALUE

    cacheTTLMs パラメーターは、cache パラメーターが LRU または ALL に設定されている場合にのみ適用されます。

    • cache が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間を指定します。デフォルト値は Long.MAX_VALUE で、キャッシュエントリが期限切れにならないことを意味します。

    • cache が ALL に設定されている場合、cacheTTLMs は物理テーブルからデータを再読み込みする間隔を指定します。デフォルト値は Long.MAX_VALUE で、データが再読み込みされないことを意味します。

    説明

    cache が None に設定されている場合は、cacheTTLMs を設定しないでください。この設定はキャッシュを無効にするため、キャッシュのタイムアウトは必要ありません。

    maxJoinRows

    プライマリテーブルの各レコードに対するディメンションテーブルからの最大一致数。

    Integer

    いいえ

    1024

    プライマリテーブルのレコードがディメンションテーブルの最大 n 個のレコードに一致する場合、効率的なリアルタイムコンピューティングのために maxJoinRows='n' を設定します。

    説明

    結合中、このパラメーターは、プライマリテーブルの各レコードに対してディメンションテーブルが返すことができる一致レコードの数を制限します。

型マッピング

AnalyticDB for MySQL 3.0 データの型

Flink データの型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s) or NUMERIC(p, s)

DECIMAL(p, s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

  • シンクテーブル

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO adb_sink
    SELECT * FROM datagen_source;
  • ディメンションテーブル

    CREATE TEMPORARY TABLE datagen_source(
      `a` INT,
      `b` VARCHAR,
      `c` STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_dim (
      `a` INT,
      `b` VARCHAR,
      `c` VARCHAR
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    p
    CREATE TEMPORARY TABLE blackhole_sink(
      `a` INT,
      `b` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

関連ドキュメント

「multi-statement be found.」エラーのトラブルシューティング