すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Flink を使用したバイナリログのサブスクライブ

最終更新日:May 20, 2026

Realtime Compute for Apache Flink は、AnalyticDB for MySQL を含む AnalyticDB for MySQL クラスターをサブスクライブし、データベースの変更をリアルタイムでキャプチャして処理します。これにより、効率的なデータ同期とストリームコンピューティングが可能になります。

前提条件

制限事項

  • Flink は、AnalyticDB for MySQL のバイナリログからは、基本データ型JSON 複合データ型しか処理できません。

  • Flink は、AnalyticDB for MySQL のバイナリログから、パーティションテーブルの DDL 操作と自動パーティション削除レコードを無視します。

ステップ1:バイナリロギングの有効化

  1. ソース AnalyticDB for MySQL クラスターのテーブルでバイナリログ機能を有効にします。この例では、`source_table` という名前のテーブルを使用します。

    説明

    バイナリログ機能は、AnalyticDB for MySQL のテーブルでのみ有効にできます。

    テーブル作成時にバイナリログ機能を有効にする

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )DISTRIBUTED BY HASH (id) BINLOG=true;

    既存のテーブルでバイナリログ機能を有効にする

    ALTER TABLE source_table BINLOG=true;
  2. (オプション) バイナリログの保持期間を変更します。

    binlog_ttl パラメーターを変更して、バイナリログの保持期間を変更できます。パラメーターのデフォルト値は 6h です。次の文を実行して、`source_table` テーブルのバイナリログの保持期間を 1 日に変更します。

    ALTER TABLE source_table binlog_ttl='1d';

    binlog_ttl パラメーターは、次のフォーマットの値をサポートします。

    • ミリ秒: 純粋な数値。たとえば、60 は 60 ミリ秒を指定します。

    • 秒: 数値 + s。たとえば、30s は 30 秒を指定します。

    • 時間: 数値 + h。たとえば、2h は 2 時間を指定します。

    • 日: 数値 + d。たとえば、1d は 1 日を指定します。

    説明
    • カーネルバージョンが 3.2.1.9 以降 (バージョン 3.2.1 の場合)、3.2.2.14 以降 (バージョン 3.2.2 の場合)、3.2.3.8 以降 (バージョン 3.2.3 の場合)、3.2.4.4 以降 (バージョン 3.2.4 の場合)、または 3.2.5.1 以降 (バージョン 3.2.5 の場合) のクラスターでは、バイナリログの最大保持期間は 365 日です。前述のバージョンより前のカーネルバージョンを持つクラスターの場合、バイナリログの最大保持期間は 21 日です。

    • バイナリログの保持期間は、binlog_ttl パラメーターのデフォルト値以上に設定することをお勧めします。保持期間を小さい値に設定すると、バイナリログが削除され、データ同期が失敗する可能性があります。

    • バイナリログの現在の保持期間をクエリするには、SHOW CREATE TABLE source_table; 文を実行します。

ステップ 2: AnalyticDB for MySQL コネクターのアップロード

  1. コネクタの JAR ファイルをダウンロードします。flink-sql-connector-adb-mysql-cdc-2.4-20260420.jar

  2. Realtime Compute for Apache Flink コンソールにログインします。

  3. [Flink] タブで、対象のワークスペースを見つけ、操作 列の [Console] をクリックします。

  4. 左側のナビゲーションペインで、[Connectors] をクリックします。

  5. [Connectors] ページで、[Create Custom Connector] をクリックします。

  6. ダウンロードしたコネクタをアップロードし、次へ をクリックします。

  7. 完了 をクリックします。作成されたカスタムコネクタがコネクタのリストに表示されます。

ステップ3:バイナリログのサブスクライブ

  1. Realtime Compute for Apache Flink コンソールにログインし、SQL ジョブを作成します。

  2. AnalyticDB for MySQL に接続し、特定のテーブル (source_table) からバイナリログデータを読み取るソーステーブルを作成します。

    説明
    • Flink DDL で定義されたプライマリキーは、キー列とプライマリキー名の両方において、AnalyticDB for MySQL クラスターの物理テーブルにあるプライマリキーと一致する必要があります。そうでない場合、データが不正確になる可能性があります。

    • Flink のデータ型は、AnalyticDB for MySQL のデータ型と互換性がある必要があります。データ型のマッピングについては、「型マッピング」をご参照ください。

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    次の表に、WITH パラメーターを示します。

    パラメーター

    必須

    デフォルト

    説明

    connector

    はい

    なし

    STRING

    使用するコネクタ。

    これはカスタムコネクタです。このパラメーターを adb-mysql-cdc に設定します。

    hostname

    はい

    なし

    STRING

    AnalyticDB for MySQL クラスターのエンドポイント。

    username

    はい

    なし

    STRING

    AnalyticDB for MySQL クラスターのデータベースアカウント。

    password

    はい

    なし

    STRING

    AnalyticDB for MySQL データベースアカウントのパスワード。

    database-name

    はい

    なし

    STRING

    AnalyticDB for MySQL データベースの名前。

    AnalyticDB for MySQL はテーブルレベルのバイナリロギングを実装しているため、データベースは 1 つしか指定できません。

    table-name

    はい

    なし

    STRING

    AnalyticDB for MySQL データベースのテーブル名。

    AnalyticDB for MySQL はテーブルレベルのバイナリロギングを実装しているため、指定できるテーブルは 1 つだけです。

    port

    いいえ

    3306

    INTEGER

    ポート番号。

    scan.incremental.snapshot.enabled

    いいえ

    true

    BOOLEAN

    増分スナップショット読み取りメカニズムを有効にするかどうかを指定します。

    この機能はデフォルトで有効になっています。増分スナップショットは、テーブルスナップショットを読み取るための新しいメカニズムです。以前のスナップショットメカニズムと比較して、増分スナップショットには次の利点があります。

    • ソースはスナップショット読み取り中に同時読み取りをサポートします。

    • ソースはスナップショット読み取り中にチャンクレベルのチェックポイントをサポートします。

    • ソースはスナップショットを読み取る前にデータベースのロック権限を取得する必要がありません。

    scan.incremental.snapshot.chunk.size

    いいえ

    8096

    INTEGER

    テーブルスナップショットのチャンクあたりの行数。

    scan.snapshot.fetch.size

    いいえ

    1024

    INTEGER

    テーブルスナップショットの読み取り時に一度にフェッチする最大行数。

    scan.startup.mode

    いいえ

    initial

    STRING

    データ消費の起動モード。

    有効な値:

    • initial (デフォルト):テーブルの初期スナップショットを実行し、最新のバイナリログを読み取ります。

    • earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古いバイナリログから読み取りを開始します。

    • specific-offset: スナップショットフェーズをスキップし、指定されたバイナリログの位置から開始します。scan.startup.specific-offset.filescan.startup.specific-offset.pos の両方のパラメーターを設定して、バイナリログのファイル名とオフセットを指定します。

    • latest-offset:スナップショットフェーズをスキップし、コネクタの起動後に発生した変更のみを読み取ります。

    • タイムスタンプ: スナップショットフェーズをスキップし、指定されたタイムスタンプから読み取りを開始します。scan.startup.timestamp-millis パラメーターを使用して、タイムスタンプをミリ秒 (ms) 単位で設定します。

    重要

    earliest-offset、specific-offset、または timestamp 起動モードを使用する場合、指定された開始位置とジョブの起動の間でテーブルスキーマが変更されないようにしてください。変更されると、スキーマの変更が原因でジョブが失敗する可能性があります。

    scan.startup.specific-offset.file

    いいえ

    なし

    STRING

    specific-offset 起動モードでの、開始位置のバイナリログファイル名。

    最新のバイナリログファイル名を取得するには、SHOW MASTER STATUS for table_name; ステートメントを実行します。

    scan.startup.specific-offset.pos

    いいえ

    なし

    LONG

    specific-offset 起動モードでの、開始位置のバイナリログファイル内の位置。

    最新のバイナリログの位置を取得するには、SHOW MASTER STATUS for table_name; ステートメントを実行します。

    scan.startup.specific-offset.skip-events

    いいえ

    なし

    LONG

    指定された開始位置の後にスキップするイベントの数。

    scan.startup.specific-offset.skip-rows

    いいえ

    なし

    LONG

    指定された開始位置の後にスキップするデータ行の数。

    scan.startup.timestamp-millis

    いいえ

    なし

    LONG

    timestamp 起動モードを使用する場合の、開始位置のミリ秒単位のタイムスタンプ。

    このパラメーターを使用する場合、scan.startup.modetimestamp に設定する必要があります。タイムスタンプはミリ秒 (ms) です。

    server-time-zone

    いいえ

    システムデフォルト

    STRING

    データベースサーバーのセッションタイムゾーン。 例:「Asia/Shanghai」。

    これは、AnalyticDB for MySQLAnalyticDB for MySQLSTRING データ型に変換される方法を制御します。このパラメーターが設定されていない場合、ZONELD.SYSTEMDEFAULT() を使用してサーバーのタイムゾーンが決定されます。

    debezium.min.row.count.to.stream.result

    いいえ

    1000

    INTEGER

    テーブルの行数がこの値より大きい場合、コネクタは結果をストリーミングします。

    このパラメーターを 0 に設定すると、すべてのテーブルサイズのチェックがスキップされ、スナップショットフェーズ中にすべての結果がストリーミングされます。

    connect.timeout

    いいえ

    30s

    DURATION

    データベース接続の試行がタイムアウトするまでの最大待機時間。

    デフォルトの単位は秒 (s) です。

    connect.max-retries

    いいえ

    3

    INTEGER

    データベース接続の失敗後の最大再試行回数。

  3. 宛先データベースに、処理済みのデータを格納するための物理テーブルを作成します。 このトピックでは、宛先として AnalyticDB for MySQL を使用します。 Flink でサポートされているコネクターについては、「サポートされているコネクター」をご参照ください。

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )
  4. 前の手順で作成したテーブルに接続するシンクテーブルを作成します。 シンクテーブルは、処理済みのデータを AnalyticDB for MySQL の指定されたテーブルに書き込みます。

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    シンクテーブルの WITH パラメーターと型マッピングの詳細については、「AnalyticDB for MySQL V3.0 コネクタ」をご参照ください。

  5. INSERT INTO ステートメントを使用して、ソーステーブルからシンクテーブルにデータを送信します。

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
  6. 保存 をクリックします。

  7. [Validate] をクリックします。

    検証機能は、SQL のセマンティクス、ネットワーク接続性、およびジョブで使用されるテーブルのメタデータをチェックします。結果エリアで [SQL Advice] をクリックして、SQL のリスクプロンプトや最適化の提案を表示することもできます。

  8. (オプション) Debug をクリックします。

    ジョブデバッグ機能を使用して、ジョブの実行をシミュレートし、出力結果を確認し、SELECT または INSERT ステートメントのビジネスロジックを検証し、開発効率を向上させ、データ品質のリスクを低減できます。

  9. Deploy をクリックします。

    ジョブを開発および検証した後、本番環境にデプロイします。その後、[O&M] ページに移動し、ジョブを開始します

  10. (オプション) バイナリログに関する情報を表示します。

    説明

    次の SQL 文を実行してバイナリログに関する情報をクエリした後、バイナリログ機能を有効にしただけの場合は 0 が返されます。ログ情報は、バイナリログをサブスクライブした後にのみ表示されます。

    • 最新のバイナリログのファイル名と場所をクエリするには、次の文を実行します。

      SHOW MASTER STATUS FOR source_table;
    • クリアされていないすべての履歴バイナリログとそのサイズをクエリするには、次の文を実行します。

      SHOW BINARY LOGS FOR source_table;

型マッピング

次の表では、AnalyticDB for MySQL のデータ型と、それに対応する Flink のデータ型との対応を示します。

AnalyticDB for MySQL タイプ

Flink タイプ

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s) or NUMERIC(p, s)

DECIMAL(p, s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

JSON

STRING