すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:Flink を使用したデータインポート

最終更新日:Feb 26, 2026

ApsaraDB for SelectDB は Apache Doris と完全互換です。Flink Doris Connector を使用して、MySQL、Oracle、PostgreSQL、SQL Server、Kafka などのデータソースから既存データを SelectDB にインポートできます。また、Flink で Change Data Capture(CDC)ジョブを有効にすると、データソースからの増分データも SelectDB に同期されます。

概要

説明

Flink Doris Connector は、SelectDB へのデータ書き込みにのみ使用できます。SelectDB インスタンスのバックエンド(BE)ノードに直接接続して効率的にデータを読み取るには、Flink Doris Connector を使用し、必要なアクセス権限を SelectDB テクニカルサポートチームに依頼してください。

SelectDB からデータを読み取るには、Flink Java Database Connectivity(JDBC)Connector を使用することもできます。

Flink Doris Connector は Apache Flink と Apache Doris を接続します。これにより、Doris からデータを読み取り、Doris にデータを書き込むことができ、リアルタイムのデータ処理および分析が可能になります。SelectDB は Apache Doris と完全互換であるため、Flink Doris Connector は SelectDB へのストリーミングデータ投入の一般的な方法です。

Flink のストリーム処理機能は、Source、Transform、Sink の 3 つのコンポーネントに依存しています。各コンポーネントの機能は以下のとおりです。

  • Source:

    • 機能:Source は Flink データストリームのエントリポイントです。メッセージキュー、データベース、ファイルシステムなどの外部システムからデータを読み取ります。

    • 例:Kafka をデータソースとして使用してリアルタイムメッセージを読み取る、またはファイルからデータを読み取ることができます。

  • Transform:

    • 機能:Transform ステージでは、入力データストリームを処理および変換します。これらの操作には、フィルタリング、マッピング、集約、ウィンドウ処理などが含まれます。

    • 例:入力ストリームをマッピングしてデータ構造を変換する、またはデータを集約して 1 分ごとのメトリックを計算することができます。

  • Sink:

    • 機能:Sink は Flink データストリームの出口ポイントです。処理されたデータをデータベース、ファイル、メッセージキューなどの外部システムに書き込みます。

    • 例:処理結果を MySQL データベースに書き込む、または別の Kafka トピックにデータを送信することができます。

次の図は、Flink Doris Connector を使用して SelectDB にデータをインポートする際のデータフローを示しています。

image

前提条件

  • データソース、Flink、および SelectDB がネットワーク経由で相互に通信できること:

    1. ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請またはリリース」をご参照ください。

      Flink およびデータソースが Alibaba Cloud のプロダクトであるか、Alibaba Cloud Elastic Compute Service(ECS)インスタンス上にデプロイされており、かつ ApsaraDB for SelectDB インスタンスと同じ VPC 内にある場合は、この手順をスキップできます。

    2. Flink およびデータソースの IP アドレスを ApsaraDB for SelectDB インスタンスのホワイトリストに追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。

  • Flink および Flink Doris Connector がインストールされていること。

    次の表は、バージョン要件を示しています。

    Flink バージョン

    Flink Doris Connector バージョン

    ダウンロード URL

    Realtime Compute for Apache Flink:1.17 以降

    オープンソース:1.15 以降

    バージョン 1.5.2 以降。最新バージョンの使用を推奨します。

    Flink Doris Connector

    Flink Doris Connector のインストール方法の詳細については、「Flink Doris Connector のインストール方法」をご参照ください。

Flink Doris Connector のインストール方法

シナリオに応じて、Flink Doris Connector をインストールできます。

  • Realtime Compute for Apache Flink を使用して SelectDB にデータをインポートする場合、カスタムコネクタ機能を使用して Flink Doris Connector をアップロード、使用、更新できます。詳細については、「カスタムコネクタの管理」をご参照ください。

  • セルフマネージドのオープンソース Flink クラスターを使用する場合、対応する Flink Doris Connector バージョンの JAR パッケージをダウンロードし、Flink インストールディレクトリの `lib` ディレクトリに配置します。ダウンロードリンクについては、「JAR パッケージ」をご参照ください。

  • Maven を使用して Flink Doris Connector をインポートする場合、プロジェクトの依存関係設定ファイルに次のコードを追加します。他のバージョンについては、「Maven リポジトリ」をご参照ください。

    <!-- flink-doris-connector -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  

使用例

サンプル環境

このセクションでは、MySQL インスタンスの `test` データベース内の `employees` テーブルから、SelectDB インスタンスの `test` データベース内の `employees` テーブルへ、Flink SQL、Flink CDC、および DataStream を使用してデータを移行する方法の例を示します。必要に応じてパラメーターを変更できます。サンプル環境は以下のとおりです。

  • Flink 1.16 スタンドアロン環境

  • Java

  • 宛先データベース:test

  • 宛先テーブル:employees

  • ソースデータベース:test

  • ソーステーブル:employees

環境の準備

Flink 環境の準備

  1. Java 環境を準備します。

    Flink は Java 環境に依存します。Java 開発キット(JDK)をインストールし、JAVA_HOME 環境変数を設定する必要があります。

    必要な Java バージョンは Flink バージョンによって異なります。Flink がサポートする Java バージョンの詳細については、「Java compatibility」をご参照ください。この例では、Java 8 をインストールします。詳細については、「JDK のインストール」をご参照ください。

  2. Flink インストールパッケージ `flink-1.16.3-bin-scala_2.12.tgz` をダウンロードします。このバージョンが期限切れの場合は、別のバージョンをダウンロードできます。その他のバージョンについては、「Apache Flink」をご参照ください。

    wget https://www.apache.si/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
  3. パッケージを展開します。

    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  4. Flink インストールディレクトリの `lib` ディレクトリに移動し、次の操作に必要なコネクタを追加します。

    • Flink Doris Connector を追加します。

      wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
    • Flink MySQL Connector を追加します。

      wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
  5. Flink クラスターを起動します。

    Flink インストールディレクトリの `bin` ディレクトリで次のコマンドを実行します。

    ./start-cluster.sh 

宛先の SelectDB データベースおよびテーブルの準備

  1. ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスの作成」をご参照ください。

  2. インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。

  3. `test` データベースを作成します。

    CREATE DATABASE test;
  4. `employees` テーブルを作成します。

    USE test;
    
    -- Create table.
    CREATE TABLE employees (
        emp_no       int NOT NULL,
        birth_date   date,
        first_name   varchar(20),
        last_name    varchar(20),
        gender       char(2),
        hire_date    date
    )
    UNIQUE KEY(`emp_no`)
    DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

ソースの MySQL データベースおよびテーブルの準備

  1. MySQL インスタンスを作成します。詳細については、「ApsaraDB RDS for MySQL インスタンスの迅速な作成とデータベースの設定」をご参照ください。

  2. `test` データベースを作成します。

    CREATE DATABASE test;
  3. `employees` テーブルを作成します。

    USE test;
    
    CREATE TABLE employees (
        emp_no INT NOT NULL PRIMARY KEY,
        birth_date DATE,
        first_name VARCHAR(20),
        last_name VARCHAR(20),
        gender CHAR(2),
        hire_date DATE
    );
  4. データを挿入します。

    INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) VALUES
    (1001, '1985-05-15', 'John', 'Doe', 'M', '2010-06-20'),
    (1002, '1990-08-22', 'Jane', 'Smith', 'F', '2012-03-15'),
    (1003, '1987-11-02', 'Robert', 'Johnson', 'M', '2015-07-30'),
    (1004, '1992-01-18', 'Emily', 'Davis', 'F', '2018-01-05'),
    (1005, '1980-12-09', 'Michael', 'Brown', 'M', '2008-11-21');

Flink SQL を使用したデータインポート

  1. Flink SQL Client サービスを起動します。

    Flink インストールディレクトリの `bin` ディレクトリで次のコマンドを実行します。

    ./sql-client.sh
  2. Flink SQL Client 上で Flink ジョブを送信します。

    1. MySQL ソーステーブルを作成します。

      次の文の `WITH` 句内の項目は、MySQL CDC Source の設定項目です。設定項目の詳細については、「MySQL | Apache Flink CDC」をご参照ください。

      CREATE TABLE employees_source (
          emp_no INT,
          birth_date DATE,
          first_name STRING,
          last_name STRING,
          gender STRING,
          hire_date DATE,
          PRIMARY KEY (`emp_no`) NOT ENFORCED
      ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '127.0.0.1', 
          'port' = '3306',
          'username' = 'root',
          'password' = '****',
          'database-name' = 'test',
          'table-name' = 'employees'
      );
    2. SelectDB 結果テーブルを作成します。

      次の文の `WITH` 句内の項目は、SelectDB の設定項目です。設定項目の詳細については、「結果テーブルの設定項目」をご参照ください。

      CREATE TABLE employees_sink (
          emp_no       INT ,
          birth_date   DATE,
          first_name   STRING,
          last_name    STRING,
          gender       STRING,
          hire_date    DATE
      ) 
      WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'test.employees',
        'username' = 'admin',
        'password' = '****'
      );
    3. MySQL ソーステーブルから SelectDB 結果テーブルへデータを同期します。

      INSERT INTO employees_sink SELECT * FROM employees_source;
  3. データインポート結果を確認します。

    SelectDB に接続し、次の文を実行して結果を確認します。

    SELECT * FROM test.employees;

Flink CDC を使用したデータインポート

重要

Realtime Compute for Apache Flink は JAR ジョブをサポートしていません。代わりに、CDC 3.0 用の YAML ジョブをサポートしています。

このセクションでは、Flink CDC を使用してデータベースから SelectDB にデータをインポートする方法について説明します。

Flink インストールディレクトリで、flink プログラムを使用して Flink CDC を実行できます。構文は以下のとおりです。

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

パラメーター

パラメーター

説明

execution.checkpointing.interval

Flink チェックポイント間隔。データ同期頻度に影響します。`10s` の使用を推奨します。

parallelism.default

Flink ジョブの並列処理の次数。並列処理の次数を高くすると、データ同期速度が向上します。

job-name

Flink ジョブの名前。

database

データを同期する SelectDB のデータベース名。

table-prefix

SelectDB テーブル名のプレフィックス。例:--table-prefix ods_

table-suffix

SelectDB テーブル名のサフィックス。

including-tables

同期するテーブル。複数のテーブルを指定する場合は縦棒(|)で区切ります。正規表現もサポートされています。例:--including-tables table1|tbl.* は、`table1` および `tbl.` で始まるすべてのテーブルを同期します。

excluding-tables

同期しないテーブル。このパラメーターは `including-tables` と同様に設定します。

mysql-conf

MySQL CDC Source の設定。詳細については、「MySQL CDC Connector」をご参照ください。hostnameusernamepassword、および database-name パラメーターは必須です。

oracle-conf

Oracle CDC Source の設定。詳細については、「Oracle CDC Connector」をご参照ください。hostnameusernamepassworddatabase-name、および schema-name パラメーターは必須です。

sink-conf

Doris Sink のすべての設定。詳細については、「結果テーブルの設定項目」をご参照ください。

table-conf

SelectDB テーブルの設定項目。SelectDB テーブルを作成する際に `properties` に含まれる項目です。

説明
  1. 同期を行うには、対応する Flink CDC 依存関係(例:`flink-sql-connector-mysql-cdc-${version}.jar` または `flink-sql-connector-oracle-cdc-${version}.jar`)を `$FLINK_HOME/lib` ディレクトリに追加する必要があります。

  2. Flink 1.15 以降のバージョンでは、データベース全体の同期がサポートされています。Flink Doris Connector の異なるバージョンのダウンロードリンクについては、「Flink Doris Connector」をご参照ください。

シンク設定項目

パラメーター

デフォルト値

必須

説明

fenodes

なし

はい

ApsaraDB for SelectDB インスタンスのエンドポイントおよび HTTP プロトコルポート。

ApsaraDB for SelectDB コンソールの インスタンス詳細 > ネットワーク情報 ページから、VPC エンドポイント(または パブリックエンドポイント)および HTTP ポート を取得できます。

例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

なし

はい

データベース名およびテーブル名。例:test_db.test_table

username

なし

はい

ApsaraDB for SelectDB インスタンスのデータベースユーザー名。

password

なし

はい

ApsaraDB for SelectDB インスタンスのデータベースユーザーのパスワード。

jdbc-url

なし

いいえ

ApsaraDB for SelectDB インスタンスの JDBC 接続情報。

ApsaraDB for SelectDB コンソールの インスタンス詳細 > ネットワーク情報 ページから、VPC エンドポイント(または パブリックエンドポイント)および MySQL ポート を取得できます。

例:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

auto-redirect

true

いいえ

Stream Load リクエストのリダイレクトを有効にするかどうかを指定します。有効にすると、Stream Load はフロントエンド(FE)経由で書き込みを行い、バックエンド(BE)情報を明示的に取得しなくなります。

doris.request.retries

3

いいえ

SelectDB へのリクエスト送信時の再試行回数。

doris.request.connect.timeout

30s

いいえ

SelectDB へのリクエスト送信時の接続タイムアウト。

doris.request.read.timeout

30s

いいえ

SelectDB へのリクエスト送信時の読み取りタイムアウト。

sink.label-prefix

""

はい

Stream Load インポートで使用されるラベルプレフィックス。2PC(two-phase commit)シナリオでは、Flink の Exactly-Once Semantics(EOS)を保証するためにグローバルに一意である必要があります。

sink.properties

なし

いいえ

Stream Load のインポートパラメーター。プロパティ設定を指定します。

  • CSV 形式の場合:

    sink.properties.format='csv' 
    sink.properties.column_separator=','
    sink.properties.line_delimiter='\n' 
  • JSON 形式の場合:

    sink.properties.format='json' 

パラメーターの詳細については、「Stream Load」をご参照ください。

sink.buffer-size

1048576

いいえ

書き込みデータバッファーのサイズ(バイト単位)。このパラメーターは変更せず、1 MB のデフォルト設定を使用してください。

sink.buffer-count

3

いいえ

書き込みデータバッファーの数。このパラメーターは変更せず、デフォルト設定を使用してください。

sink.max-retries

3

いいえ

コミットフェーズでの失敗後の最大再試行回数。デフォルトは 3 です。

sink.use-cache

false

いいえ

例外発生時に復旧のためにメモリキャッシュを使用するかどうかを指定します。有効にすると、チェックポイント期間のデータがキャッシュに保持されます。

sink.enable-delete

true

いいえ

削除イベントを同期するかどうかを指定します。Unique Key モデルのみサポートされます。

sink.enable-2pc

true

いいえ

2PC(two-phase commit)プロトコルを有効にするかどうかを指定します。デフォルトは true で、EOS を保証します。

sink.enable.batch-mode

false

いいえ

SelectDB への書き込みにバッチモードを使用するかどうかを指定します。有効にすると、書き込みタイミングはチェックポイントに依存せず、`sink.buffer-flush.max-rows`、`sink.buffer-flush.max-bytes`、および `sink.buffer-flush.interval` パラメーターによって制御されます。

このモードを有効にすると、EOS は保証されません。べき等性を実現するために Unique Key モデルを使用できます。

sink.flush.queue-size

2

いいえ

バッチ処理モードにおけるキャッシュキューのサイズ。

sink.buffer-flush.max-rows

50000

いいえ

バッチ処理モードにおいて、1 回のバッチで書き込むデータ行の最大数。

sink.buffer-flush.max-bytes

10MB

いいえ

バッチ処理モードにおいて、1 回のバッチで書き込むバイト数の最大値。

sink.buffer-flush.interval

10s

いいえ

バッチ処理モードにおけるキャッシュの非同期フラッシュ間隔。最小値は 1s です。

sink.ignore.update-before

true

いいえ

`update-before` イベントを無視するかどうかを指定します。デフォルトでは、これらのイベントは無視されます。

同期の例

MySQL 同期の例

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Oracle 同期の例

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

PostgreSQL 同期の例

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

SQL Server 同期の例

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1\
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

DataStream API を使用したデータインポート

  1. Maven プロジェクトに必要な依存関係を追加します。

    必要な Maven 依存関係

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
        </dependencies>
  2. Java コアコード。

    次のコードでは、MySQL ソーステーブルおよび SelectDB 結果テーブルのパラメーターは「Flink SQL を使用したデータインポート」セクションの設定に対応しています。詳細については、「MySQL | Apache Flink CDC」および「結果テーブルの設定項目」をご参照ください。

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
            
            // Configure the MySQL source table.
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs***")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            // Configure the SelectDB sink table.
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyunc****:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
            DorisOptions dorisOptions = dorisBuilder.build();
    
            // Configure Stream Load parameters in sink.properties.
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

高度な使用方法

Flink SQL を使用した部分カラムの更新

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial_columns' = 'true' -- Enable partial column updates.
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

Flink SQL を使用した特定カラムに基づくデータ削除

CDC ソースのシナリオでは、Doris Sink は `RowKind` に基づいてイベントタイプを区別し、隠しカラム __DORIS_DELETE_SIGN__ に値を割り当てて削除操作を実行します。Kafka メッセージソースのシナリオでは、Doris Sink は `RowKind` を直接使用して操作タイプを区別できません。代わりに、メッセージ内の特定のフィールド(例:{"op_type":"delete",data:{...}})に依存して操作タイプをマークする必要があります。このようなデータで `op_type` が 'delete' のレコードを削除するには、ビジネスロジックに基づいて隠しカラムの値を明示的に渡す必要があります。次の Flink SQL の例は、Kafka データ内の特定フィールドに基づいて SelectDB のデータを削除する方法を示しています。

-- Example data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- false indicates that the event type is not obtained from RowKind.
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- Explicitly specify the columns to import for Stream Load.
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

よくある質問

  • Q:BITMAP 型のデータを書き込むにはどうすればよいですか?

    A:次の例を参照してください。

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'doris', 
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.bitmap_test', 
      'username' = 'admin', 
      'password' = '****', 
      'sink.label-prefix' = 'selectdb_label', 
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    );
  • Q:エラー errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650] を解決するにはどうすればよいですか?

    A:Exactly-Once シナリオでは、Flink ジョブは最新のチェックポイントまたはセーブポイントから再起動する必要があります。そうでないと、このエラーが発生します。Exactly-Once が不要な場合は、two-phase commit(2PC)プロトコルを無効にする(sink.enable-2pc=false)か、異なる `sink.label-prefix` を使用することでこのエラーを解決できます。

  • Q:エラー errCode = 2, detailMessage = transaction[19650]not found を解決するにはどうすればよいですか?

    A:このエラーはコミットフェーズ中に発生します。チェックポイントに記録されたトランザクション ID が SelectDB 側で期限切れになっています。トランザクションを再度コミットしようとすると、このエラーが発生します。この場合、チェックポイントからの再起動はできません。SelectDB の streaming_label_keep_max_second パラメーターを変更して有効期限を延長できます。デフォルト値は 12 時間です。

  • Q:エラー errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100 を解決するにはどうすればよいですか?

    A:このエラーは、同じデータベースに対する同時データインポートジョブの数が 100 を超えたために発生します。SelectDB パラメーター max_running_txn_num_per_db を調整することでこの問題を解決できます。詳細については、「max_running_txn_num_per_db」をご参照ください。

    このエラーは、ラベルを頻繁に変更してジョブを再起動する場合にも発生することがあります。Duplicate または Aggregate Key モデルを使用する 2PC シナリオでは、各ジョブのラベルは一意である必要があります。チェックポイントから再起動すると、Flink ジョブは開始されたが完了していない(pre-committed だが committed ではない)トランザクションのみを中止します。ラベルを頻繁に変更して再起動すると、正常に pre-committed された多くのトランザクション(txn)が中止されず、トランザクションスロットを消費します。Unique Key モデルでは、2PC を無効にして、シンクがべき等な書き込みを実行するように設計することもできます。

  • Q:Flink が Unique Key モデルを使用するテーブルに書き込む場合、バッチ内のデータ順序を保証するにはどうすればよいですか?

    A:シーケンスカラム設定を追加して順序を保証できます。詳細については、「SEQUENCE」をご参照ください。

  • Q:Flink ジョブにエラーが報告されていないのにデータが同期されないのはなぜですか?

    A:バージョン 1.1.0 より前では、コネクタはデータ駆動型のバッチ書き込みを使用していました。そのため、アップストリームソースからデータが書き込まれているかどうかを確認する必要がありました。バージョン 1.1.0 以降では、書き込みはチェックポイントに依存します。データを書き込むには、チェックポイントを有効にする必要があります。

  • Q:エラー tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235 を解決するにはどうすればよいですか?

    A:このエラーは通常、バージョン 1.1.0 より前のコネクタで発生します。書き込み頻度が高すぎてバージョン数が過剰になることが原因です。sink.buffer-flush.max-bytes および sink.buffer-flush.interval パラメーターを設定して Stream Load の頻度を下げることができます。

  • Q:Flink インポート中にダーティデータをスキップするにはどうすればよいですか?

    A:Flink インポート中に、フィールドのフォーマットや長さに関する問題など、ダーティデータが存在すると Stream Load が失敗し、Flink が継続的に再試行します。ダーティデータをスキップするには、Stream Load の strict mode を無効にする(strict_mode=false,max_filter_ratio=1)か、データがシンクオペレーターに到達する前にフィルタリングを行ってください。

  • Q:ソーステーブルと SelectDB テーブルはどのように対応させるべきですか?

    A:Flink Doris Connector を使用してデータをインポートする場合、次の 2 点を確認する必要があります。第一に、ソーステーブルのカラムと型が Flink SQL のカラムと型に対応していること。第二に、Flink SQL のカラムと型が SelectDB テーブルのカラムと型に対応していること。

  • Q:エラー TApplicationException: get_next failed: out of sequence response: expected 4 but got 3 を解決するにはどうすればよいですか?

    A:このエラーは Thrift フレームワークの並行性バグによって発生します。この問題を解決するには、可能な限り最新のコネクタバージョンと互換性のある Flink バージョンを使用してください。

  • Q:エラー DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX を解決するにはどうすればよいですか?

    A:TaskManager ログで abort transaction response を検索してください。HTTP リターンコードにより、問題がクライアント側にあるかサーバー側にあるかを判断できます。