すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Flink を使用してバイナリログをサブスクライブする

最終更新日:Nov 19, 2025

Realtime Compute for Apache Flink は、AnalyticDB for MySQL をサブスクライブして、データベースの変更データをリアルタイムでキャプチャおよび処理できます。これにより、効率的なデータ同期とストリームコンピューティングが可能になります。このトピックでは、 Flink を使用して AnalyticDB for MySQL のバイナリログをサブスクライブする方法について説明します。

前提条件

制限事項

  • XUANWU_V2 テーブルではバイナリログ機能を有効にできません。したがって、バイナリログのサブスクリプションを使用して、AnalyticDB for MySQL クラスター内の XUANWU_V2 テーブルでデータ同期やストリームコンピューティングを実行することはできません。

  • Flink は、AnalyticDB for MySQL のバイナリログを、基本データ型複雑な JSON データ型に対してのみ処理できます。

  • Flink は、パーティションテーブルに対する DDL 操作または自動パーティション削除操作に関連する AnalyticDB for MySQL バイナリログ内のレコードを処理しません。

ステップ 1: バイナリログ機能を有効にする

  1. ソース AnalyticDB for MySQL クラスターのテーブルでバイナリログ機能を有効にします。この例では、source_table という名前のテーブルを使用します。

    説明

    AnalyticDB for MySQL のテーブルでのみバイナリログ機能を有効にできます。

    テーブル作成時にバイナリログ機能を有効にする

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )DISTRIBUTED BY HASH (id) BINLOG=true;

    既存のテーブルでバイナリログ機能を有効にする

    ALTER TABLE source_table BINLOG=true;
  2. (オプション) バイナリログの保存期間を変更します。

    binlog_ttl パラメーターを変更して、バイナリログの保存期間を変更できます。パラメーターのデフォルト値は 6h です。次のステートメントを実行して、source_table テーブルのバイナリログの保存期間を 1 日に変更します。

    ALTER TABLE source_table binlog_ttl='1d';

    binlog_ttl パラメーターは、次のフォーマットの値をサポートしています。

    • ミリ秒: 純粋な数値。たとえば、60 は 60 ミリ秒を指定します。

    • 秒: 数値 + s。たとえば、30s は 30 秒を指定します。

    • 時間: 数値 + h。たとえば、2h は 2 時間を指定します。

    • 日: 数値 + d。たとえば、1d は 1 日を指定します。

    説明
    • カーネルバージョンが 3.2.1.9 以降 (バージョン 3.2.1 の場合)、3.2.2.14 以降 (バージョン 3.2.2 の場合)、3.2.3.8 以降 (バージョン 3.2.3 の場合)、3.2.4.4 以降 (バージョン 3.2.4 の場合)、または 3.2.5.1 以降 (バージョン 3.2.5 の場合) のクラスターでは、バイナリログの最大保存期間は 365 日です。前述のバージョンより前のカーネルバージョンのクラスターでは、バイナリログの最大保存期間は 21 日です。

    • binlog_ttl パラメーターのデフォルト値以上の値にバイナリログの保存期間を設定することをお勧めします。保存期間を小さい値に設定すると、バイナリログが削除され、データ同期が失敗する可能性があります。

    • 現在のバイナリログの保存期間をクエリするには、SHOW CREATE TABLE source_table; ステートメントを実行します。

ステップ 2: AnalyticDB for MySQL コネクタを Flink にアップロードする

  1. コネクタをダウンロードします。

  2. Realtime Compute for Apache Flink コンソールにログインします。

  3. [フルマネージド Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

  4. 左側のナビゲーションウィンドウで、[コネクタ] をクリックします。

  5. [コネクタ] ページで、[カスタムコネクタの作成] をクリックします。

  6. ダウンロードしたコネクタをアップロードし、[次へ] をクリックします。

  7. [完了] をクリックします。カスタムコネクタがコネクタリストに表示されます。

ステップ 3: バイナリログをサブスクライブする

  1. Realtime Compute for Apache Flink コンソールにログインし、SQL ジョブを作成します

  2. AnalyticDB for MySQL に接続し、指定されたテーブル (source_table) からバイナリログデータを読み取るためのソーステーブルを作成します。

    説明
    • Flink DDL ステートメントで定義されたプライマリキーは、キー名を含め、AnalyticDB for MySQL クラスターの物理テーブルのプライマリキーと同一である必要があります。同一でない場合、データの正確性に影響します。

    • Flink のデータ型は、AnalyticDB for MySQL のデータ型と互換性がある必要があります。詳細については、「型のマッピング」をご参照ください。

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    次の表に、WITH 句のパラメーターを示します。

    パラメーター

    必須

    デフォルト値

    データの型

    説明

    connector

    はい

    なし

    STRING

    使用するコネクタ。

    このパラメーターは必須です。値を adb-mysql-cdc に設定します。

    hostname

    はい

    なし

    STRING

    AnalyticDB for MySQL の VPC エンドポイント。

    username

    はい

    なし

    STRING

    AnalyticDB for MySQL データベースアカウント。

    password

    はい

    なし

    STRING

    AnalyticDB for MySQL データベースアカウントのパスワード。

    database-name

    はい

    なし

    STRING

    AnalyticDB for MySQL データベースの名前。

    AnalyticDB for MySQL はテーブルレベルのバイナリログを実装しているため、指定できるデータベースは 1 つだけです。

    table-name

    はい

    なし

    STRING

    AnalyticDB for MySQL データベース内のテーブルの名前。

    AnalyticDB for MySQL はテーブルレベルのバイナリログを実装しているため、指定できるテーブルは 1 つだけです。

    port

    いいえ

    3306

    INTEGER

    ポート番号。

    scan.incremental.snapshot.enabled

    いいえ

    true

    BOOLEAN

    増分スナップショット。

    この機能はデフォルトで有効になっています。増分スナップショットは、テーブルスナップショットを読み取るための新しいメカニズムです。従来のスナップショットメカニズムと比較して、増分スナップショットメカニズムには次の利点があります。

    • スナップショットの読み取り中、ソースは同時読み取りをサポートします。

    • スナップショットの読み取り中、ソースはチャンク粒度でのチェックポイントをサポートします。

    • スナップショットの読み取り前に、ソースはデータベースロック権限を取得する必要はありません。

    scan.incremental.snapshot.chunk.size

    いいえ

    8096

    INTEGER

    テーブルスナップショット チャンクのサイズ。チャンクに含まれる行数です。

    増分スナップショット読み取りが有効な場合、テーブルは読み取りのために複数のチャンクに分割されます。

    scan.snapshot.fetch.size

    いいえ

    1024

    INTEGER

    テーブルスナップショットを 1 回読み取るごとに読み取ることができる最大行数。

    scan.startup.mode

    いいえ

    initial

    STRING

    データ消費の起動モード。

    有効な値:

    • initial (デフォルト): ジョブが初めて開始されると、すべての既存データをスキャンし、最新のバイナリログデータを読み取ります。

    • earliest-offset: ジョブは既存データをスキャンせず、利用可能な最も古いバイナリログからデータの読み取りを開始します。

    • specific-offset: 既存の完全データをスキャンせず、指定したバイナリログオフセットから開始します。このオフセットは、scan.startup.specific-offset.filescan.startup.specific-offset.pos の両方のパラメーターを構成して、開始バイナリログファイルとオフセットを定義することで指定できます。

    • latest-offset: ジョブが初めて開始されると、既存データをスキャンせず、バイナリログの末尾 (最新のバイナリログ) からデータの読み取りを開始します。これは、ジョブがコネクタの起動後に発生した最新の変更のみを読み取ることを意味します。

    • timestamp: 既存の完全データをスキャンしません。コネクタは、指定されたタイムスタンプからバイナリログの読み取りを開始します。タイムスタンプは、scan.startup.timestamp-millis パラメーターによってミリ秒 (ms) 単位で指定されます。

    重要

    earliest-offset、specific-offset、または timestamp 起動モードを使用する場合、指定されたバイナリログ消費位置からジョブが開始されるまでの間、対応するテーブルのスキーマが変更されないようにしてください。これにより、スキーマ進化によるジョブの失敗を防ぐことができます。

    scan.startup.specific-offset.file

    いいえ

    なし

    STRING

    specific-offset 起動モードでは、このパラメーターは開始オフセットのバイナリログファイルの名前を指定します。

    最新のバイナリログファイル名を取得するには、SHOW MASTER STATUS for table_name; ステートメントを実行します。

    scan.startup.specific-offset.pos

    いいえ

    なし

    LONG

    specific-offset 起動モードでは、このパラメーターは開始オフセットのバイナリログファイル内の位置を指定します。

    SHOW MASTER STATUS for table_name; コマンドを実行して、最新のバイナリログ位置を取得できます。

    scan.startup.specific-offset.skip-events

    いいえ

    なし

    LONG

    指定された開始オフセットの後にスキップするイベントの数。

    scan.startup.specific-offset.skip-rows

    いいえ

    なし

    LONG

    指定された開始オフセットの後にスキップするデータ行の数。

    scan.startup.timestamp-millis

    いいえ

    なし

    LONG

    指定された時間モードを使用してジョブを開始する場合、このパラメーターは開始オフセットをミリ秒単位で指定します。

    この構成を使用する場合、scan.startup.modetimestamp に設定する必要があります。タイムスタンプはミリ秒 (ms) 単位です。

    server-time-zone

    いいえ

    なし

    STRING

    データベースサーバーのセッションタイムゾーン。

    例: "Asia/Shanghai"。このパラメーターは、AnalyticDB for MySQL で、TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。このパラメーターが設定されていない場合、ZONELD.SYSTEMDEFAULT() がサーバーのタイムゾーンを決定するために使用されます。

    debezium.min.row.count.to.stream.result

    いいえ

    1000

    INTEGER

    テーブルの行数がこの値より大きい場合、コネクタは結果をストリーミングします。

    このパラメーターを 0 に設定すると、すべてのテーブルサイズのチェックがスキップされ、スナップショット中にすべての結果が常にストリーミングされます。

    connect.timeout

    いいえ

    30s

    DURATION

    データベースサーバーへの接続がタイムアウトするまで待機する最大時間。この時間を超えると、システムは接続を再試行します。

    デフォルトの単位は秒 (s) です。

    connect.max-retries

    いいえ

    3

    INTEGER

    データベースサービスへの接続が失敗した後の最大再試行回数。

  3. 処理されたデータを格納する宛先テーブルを作成します。この例では、AnalyticDB for MySQL を宛先として使用します。Flink がサポートするコネクタの詳細については、「サポートされているコネクタ」をご参照ください。

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )
  4. 前のステップで作成した宛先テーブルに接続するためのシンクテーブルを作成します。シンクテーブルは、処理されたデータを AnalyticDB for MySQL の指定されたテーブルに書き込みます。

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    シンクテーブルの WITH パラメーターと型のマッピングの詳細については、「AnalyticDB for MySQL V3.0 コネクタ」をご参照ください。

  5. キャプチャされたソースデータの変更をシンクテーブルに同期します。その後、シンクテーブルはデータを宛先に書き込みます。

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
  6. [保存] をクリックします。

  7. [深度チェック]をクリックします。

    検証機能は、ジョブの SQL セマンティクス、ネットワーク接続、および使用するテーブルのメタデータをチェックします。結果エリアで [SQL 最適化] をクリックして、SQL のリスクアラートと最適化の提案を表示することもできます。

  8. (オプション) [デバッグ] をクリックします。

    ジョブデバッグ機能を使用して、ジョブの実行をシミュレートし、出力結果を確認し、SELECT または INSERT ステートメントのビジネスロジックを検証できます。これにより、開発効率が向上し、データ品質のリスクが軽減されます。

  9. [デプロイ] をクリックします。

    ジョブを開発および検証した後、ジョブを本番環境にデプロイできます。ジョブがデプロイされた後、[O&M] ページに移動して ジョブを開始できます。

  10. (オプション) バイナリログに関する情報を表示します。

    説明

    次の SQL ステートメントを実行してバイナリログに関する情報をクエリした後、バイナリログ機能を有効にしただけの場合は 0 が返されます。ログ情報は、バイナリログをサブスクライブした後にのみ表示されます。

    • 最新のバイナリログのファイル名と場所をクエリするには、次のステートメントを実行します。

      SHOW MASTER STATUS FOR source_table;
    • クリアされていないすべての既存バイナリログとそのサイズをクエリするには、次のステートメントを実行します。

      SHOW BINARY LOGS FOR source_table;

型のマッピング

次の表に、AnalyticDB for MySQLFlink の間のデータ型のマッピングを示します。

AnalyticDB for MySQL フィールドの型

Flink フィールドの型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p,s) or NUMERIC(p,s)

DECIMAL(p,s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

JSON

STRING