すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:Flink を使用してデータをインポートする

最終更新日:Apr 09, 2025

ApsaraDB for SelectDB は Apache Doris と完全な互換性があります。Flink Doris Connector を使用して、MySQL、Oracle、PostgreSQL、SQL Server、Kafka などのデータソースから既存データを SelectDB にインポートできます。Flink 変更データキャプチャ(CDC)コネクタを使用する Flink ジョブを作成した場合、増分データもデータソースから SelectDB に同期されます。

概要

説明
  • Flink Doris Connector を使用して、SelectDB インスタンスにデータを書き込むことのみができます。Flink Doris Connector を使用して SelectDB インスタンスのバックエンド(BE)クラスタ内のノードに接続してデータを効率的に読み取る場合は、SelectDB のテクニカルサポートに連絡して、必要な権限を申請する必要があります。

  • Flink JDBC Connector を使用して、SelectDB インスタンスにデータを書き込むこともできます。

Flink Doris Connector は、Apache Flink を Apache Doris に接続するために使用されます。Flink Doris Connector を使用して、リアルタイムのデータ処理と分析のために Apache Doris からデータを読み書きできます。 SelectDB は Apache Doris と完全な互換性があるため、Flink Doris Connector は、ストリーミングモードで SelectDB にデータをインポートするためによく使用されます。

Flink のストリーム処理機能は、ソース、変換、シンクのコンポーネントに基づいて実装されています。

  • ソース:

    • 外部システムからデータストリームを読み取ります。外部システムには、Apache Kafka などのメッセージキュー、データベース、ファイルシステムが含まれます。

    • たとえば、ソースを使用して、Kafka からメッセージをリアルタイムで読み取ったり、ファイルからデータを読み取ったりできます。

  • 変換:

    • 変換ステージで入力データストリームを処理および変換します。変換操作には、フィルタリング、マッピング、集計、ウィンドウの定義が含まれます。

    • たとえば、入力データストリームに対してマッピング操作を実行してデータ構造を変換したり、データを毎分集計して特定のメトリックを計算したりできます。

  • シンク:

    • 処理されたデータストリームを外部システムに配信します。シンクを使用して、データベース、ファイル、メッセージキューにデータを書き込むことができます。

    • たとえば、シンクを使用して、処理されたデータを MySQL データベースまたは Kafka トピックに書き込むことができます。

次の図は、Flink Doris Connector を使用して SelectDB にデータをインポートする方法を示しています。

image

前提条件

  • データソースと Fink は SelectDB に接続されています。ネットワーク接続を確立するには、次の手順を実行します。

    1. 使用する ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントを申請またはリリースする」をご参照ください。

      Realtime Compute for Apache Flink と Alibaba Cloud が提供するデータソース、またはオープンソースの Flink と Elastic Compute Service(ECS)インスタンスにデプロイされたデータソースを使用し、Alibaba Cloud サービスまたは ECS インスタンスが ApsaraDB for SelectDB インスタンスと同じバーチャルプライベートクラウド(VPC)にある場合は、この手順をスキップします。

    2. Flink とデータソースの関連 IP アドレスを ApsaraDB for SelectDB インスタンスのホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。

  • Flink Doris Connector がインストールされています。

    次の表は、バージョン要件を示しています。

    Flink バージョン

    Flink Doris Connector のバージョン

    ダウンロードリンク

    Realtime Compute for Apache Flink: V1.17 以降

    オープンソース Flink: V1.15 以降

    V1.5.2 以降

    Flink Doris Connector

    詳細については、「Flink Doris Connector をインストールする」をご参照ください。

Flink Doris Connector をインストールする

ビジネス要件に基づいて、次のいずれかの方法を使用して Flink Doris Connector をインストールできます。

  • Realtime Compute for Apache Flink のカスタムコネクタ機能を使用して、Flink Doris Connector をアップロード、使用、更新します。この方法は、Realtime Compute for Apache Flink を使用してデータをインポートする場合に適用できます。SelectDB 詳細については、「カスタムコネクタを管理する」をご参照ください。

  • 必要なバージョンの Flink Doris Connector の JAR パッケージを、Flink インストールディレクトリの lib ディレクトリにダウンロードします。このメソッドは、セルフマネージドのオープンソース Flink クラスタを使用する場合に適用できます。JAR パッケージをダウンロードするには、org/apache/doris にアクセスしてください。

  • Flink Doris Connector の Maven 依存関係をインストールします。次のサンプルコードは例を示しています。その他のバージョンの依存関係を取得するには、org/apache/doris にアクセスしてください。

    <!-- flink-doris コネクタ -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  

サンプル環境

次の例は、Flink SQL、Flink CDC、および DataStream を使用して、ApsaraDB RDS for MySQL インスタンスの test データベースの employees テーブルから SelectDB インスタンスの test データベースの employees テーブルにデータをインポートする方法を示しています。ビジネス要件に基づいて、対応するパラメーターを変更できます。サンプル環境:

  • バージョン 1.16 の Flink スタンドアロンクラスター

  • Java

  • デスティネーションデータベース: test

  • デスティネーションテーブル: employees

  • ソースデータベース: test

  • ソーステーブル: employees

環境を準備する

Flink 環境

  1. Java 環境を準備します。

    Flink の実行は Java 環境に依存します。したがって、Java 開発キット (JDK) をインストールし、JAVA_HOME 環境変数を構成する必要があります。

    必要な Java バージョンは Flink バージョンに関連しています。Flink でサポートされている Java バージョンの詳細については、「Java 互換性」をご参照ください。この例では、Java 8 がインストールされています。詳細については、「Alibaba Cloud Linux 2、Alibaba Cloud Linux 3、または CentOS 7.x を実行するインスタンスに Java Web 環境をデプロイする」トピックの「ステップ 2: JDK をインストールする」セクションをご参照ください。

  2. Flink インストールパッケージ flink-1.16.3-bin-scala_2.12.tgz をダウンロードします。このバージョンが期限切れになっている場合は、Apache Flink 公式 Web サイト で別のバージョンをダウンロードしてください。

    wget https://www.apache.si/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
  3. インストールパッケージを解凍します。

    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  4. Flink インストールディレクトリの lib ディレクトリに移動して、必要なコネクタをインストールします。

    • Flink Doris Connector をインストールします。

      wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
    • Flink MySQL Connector をインストールします。

      wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
  5. Flink クラスターを起動します。

    Flink インストールディレクトリの bin ディレクトリで、次のコマンドを実行します。

    ./start-cluster.sh 

ApsaraDB for SelectDB のデスティネーションテーブルとデータベース

  1. ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスの作成」をご参照ください。

  2. ApsaraDB for SelectDB インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。

  3. test という名前の test データベースを作成します。

    CREATE DATABASE test;
  4. employees という名前の test テーブルを作成します。

    USE test;
    
    -- テーブルを作成します。
    CREATE TABLE employees (
        emp_no       int NOT NULL,
        birth_date   date,
        first_name   varchar(20),
        last_name    varchar(20),
        gender       char(2),
        hire_date    date
    )
    UNIQUE KEY(`emp_no`)
    DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

ApsaraDB RDS for MySQL のソーステーブルとデータベース

  1. ApsaraDB RDS for MySQL インスタンスを作成します。詳細については、「ステップ 1: ApsaraDB RDS for MySQL インスタンスを作成し、データベースを構成する」をご参照ください。

  2. test という名前の test データベースを作成します。

    CREATE DATABASE test;
  3. employees という名前の test テーブルを作成します。

    USE test;
    
    CREATE TABLE employees (
        emp_no INT NOT NULL PRIMARY KEY,
        birth_date DATE,
        first_name VARCHAR(20),
        last_name VARCHAR(20),
        gender CHAR(2),
        hire_date DATE
    );
  4. テーブルにデータを挿入します。

    INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) VALUES
    (1001, '1985-05-15', 'John', 'Doe', 'M', '2010-06-20'),
    (1002, '1990-08-22', 'Jane', 'Smith', 'F', '2012-03-15'),
    (1003, '1987-11-02', 'Robert', 'Johnson', 'M', '2015-07-30'),
    (1004, '1992-01-18', 'Emily', 'Davis', 'F', '2018-01-05'),
    (1005, '1980-12-09', 'Michael', 'Brown', 'M', '2008-11-21');

Flink SQL を使用してデータをインポートする

  1. Flink SQL クライアントを起動します。

    Flink インストールディレクトリの bin ディレクトリで次のコマンドを実行します。

    ./sql-client.sh
  2. 次の手順を実行して、Flink SQL クライアントで Flink ジョブを送信します。

    1. MySQL データベースにソーステーブルを作成します。

      この例では、MySQL CDC コネクタに関連する構成は WITH で指定されています。設定項目の詳細については、「MySQL CDC コネクタ」をご参照ください。

      CREATE TABLE employees_source (
          emp_no INT,
          birth_date DATE,
          first_name STRING,
          last_name STRING,
          gender STRING,
          hire_date DATE,
          PRIMARY KEY (`emp_no`) NOT ENFORCED
      ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '127.0.0.1', 
          'port' = '3306',
          'username' = 'root',
          'password' = '****',
          'database-name' = 'test',
          'table-name' = 'employees'
      );
    2. SelectDB インスタンスにシンクテーブルを作成します。

      この例では、SelectDB インスタンスに関連する構成は WITH で指定されています。設定項目の詳細については、「Doris シンクの設定項目」をご参照ください。

      CREATE TABLE employees_sink (
          emp_no       INT ,
          birth_date   DATE,
          first_name   STRING,
          last_name    STRING,
          gender       STRING,
          hire_date    DATE
      ) 
      WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'test.employees',
        'username' = 'admin',
        'password' = '****'
      );
    3. MySQL データベースのソーステーブルから SelectDB インスタンスのシンクテーブルにデータをインポートします。

      INSERT INTO employees_sink SELECT * FROM employees_source;
  3. データインポート結果を確認します。

    SelectDB インスタンスに接続し、次のコマンドを実行して、データがインポートされているかどうかを確認します。

    SELECT * FROM test.employees;

Flink CDC を使用してデータをインポートする

重要

Realtime Compute for Apache Flink を使用してデータをインポートする場合、Flink CDC は JAR ジョブをサポートしていません。Flink CDC 3.0 は YAML ジョブを使用してデータをインポートします。

次の例は、Flink CDC を使用してデータベースから SelectDB インスタンスにデータをインポートする方法を示しています。

Flink がインストールされているディレクトリで、bin/flink を使用して Flink CDC ジョブを実行します。構文:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

パラメーター

パラメーター

説明

execution.checkpointing.interval

チェックポイントの間隔。データ同期の頻度に影響します。このパラメーターを 10 秒に設定することをお勧めします。

parallelism.default

Flink ジョブの並列度。並列度を適切に増やすことで、データ同期を高速化できます。

job-name

Flink ジョブの名前。

database

SelectDB インスタンスでデータをインポートするデータベースの名前。

table-prefix

SelectDB テーブルの名前に追加されるプレフィックス。例: --table-prefix ods_

table-suffix

SelectDB テーブルの名前に追加されるサフィックス。

including-tables

データを同期するテーブル。複数のテーブル名を区切るには、縦棒 (|) を使用します。正規表現もサポートされています。例: --including-tables table1|tbl.*。これは、table1 と、名前が tbl で始まるすべてのテーブルからデータが同期されることを指定します。

excluding-tables

除外するテーブル。このパラメーターは、including-tables パラメーターと同じ方法で指定できます。

mysql-conf

MySQL CDC ソースの設定項目。設定項目の詳細については、「MySQL CDC コネクタ」をご参照ください。hostnameusernamepassword、および database-name パラメーターは必須です。

oracle-conf

Oracle CDC ソースの設定項目。設定項目の詳細については、「Oracle CDC コネクタ」をご参照ください。hostnameusernamepassworddatabase-name、および schema-name パラメーターは必須です。

sink-conf

Doris シンクの設定項目。詳細については、このトピックの「Doris シンクの設定項目」セクションをご参照ください。

table-conf

SelectDB テーブルの設定項目。SelectDB テーブルの作成時に、設定項目は properties に含まれます。

説明
  1. データを同期するには、$FLINK_HOME/lib ディレクトリに flink-sql-connector-mysql-cdc-${version}.jar や flink-sql-connector-oracle-cdc-${version}.jar などの Flink CDC の依存関係をインストールする必要があります。

  2. データベースから完全データを同期する場合、Flink のバージョンは 1.15 以降である必要があります。異なるバージョンの Flink Doris Connector のダウンロード方法の詳細については、「org/apache/doris」をご参照ください。

Doris Sink の構成項目

パラメーター

デフォルト値

必須

説明

fenodes

デフォルト値なし

はい

ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと HTTP ポート。

ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の手順を実行します。ApsaraDB for SelectDB コンソールにログオンし、情報を確認するインスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [HTTP ポート] パラメーターの値を確認します。

例: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

デフォルト値なし

はい

データベースとテーブルの名前。例: test_db.test_table

username

デフォルト値なし

はい

ApsaraDB for SelectDB インスタンスに接続するために使用されるユーザー名です。

password

デフォルト値なし

はい

ApsaraDB for SelectDB インスタンスに接続するために使用するパスワード。

jdbc-url

デフォルト値なし

いいえ

ApsaraDB for SelectDB インスタンスにアクセスするために使用される JDBC 接続文字列。

ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の手順を実行します。 ApsaraDB for SelectDB コンソールにログオンし、情報を確認するインスタンスの [インスタンスの詳細] ページに移動します。 基本情報 ページの [ネットワーク情報] セクションで、[VPC エンドポイント] パラメーターまたは [パブリックエンドポイント] パラメーターと [mysql ポート] パラメーターの値を確認します。

例: jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

auto-redirect

true

いいえ

Stream Load リクエストをリダイレクトするかどうかを指定します。 このパラメーターを true に設定すると、Stream Load リクエストはフロントエンド ( FE ) に送信されます。 バックエンド ( BE ) 情報は表示されなくなります。

doris.request.retries

3

いいえ

SelectDB インスタンスへのリクエスト送信を許可される最大再試行回数です。

doris.request.connect.timeout

30s

いいえ

SelectDB インスタンスへのリクエスト送信の接続タイムアウト期間です。

doris.request.read.timeout

30s

いいえ

SelectDB インスタンスへのリクエスト送信の読み取りタイムアウト期間です。

sink.label-prefix

""

はい

Stream Load で使用されるラベルプレフィックス。 2 フェーズコミットシナリオでは、Flink の 1 回限りのセマンティクスを確保するために、ラベルプレフィックスはグローバルに一意である必要があります。

sink.properties

デフォルト値なし

いいえ

Stream Load のデータインポートプロパティ。 次のいずれかの形式でプロパティを設定します。

  • CSV 形式:

    sink.properties.format='csv' 
    sink.properties.column_separator=',' // 列の区切り文字
    sink.properties.line_delimiter='\n' // 行の区切り文字
  • JSON 形式:

    sink.properties.format='json' 

プロパティの詳細については、「Stream Load を使用してデータをインポートする」をご参照ください。

sink.buffer-size

1048576

いいえ

書き込みデータバッファーのサイズ。単位: バイト。 デフォルト値 ( 1 MB 相当) を使用することをお勧めします。

sink.buffer-count

3

いいえ

書き込みデータバッファーの数。 デフォルト値を使用することをお勧めします。

sink.max-retries

3

いいえ

コミットリクエストが失敗した後に許可される最大再試行回数。 デフォルト値: 3 。

sink.use-cache

false

いいえ

例外が発生した場合に回復のためにメモリキャッシュを使用するかどうかを指定します。 このパラメーターを true に設定すると、キャッシュはチェックポイント中に生成されたデータを保持します。

sink.enable-delete

true

いいえ

イベントを同期的に削除するかどうかを指定します。 Unique key モデルのみがサポートされています。

sink.enable-2pc

true

いいえ

2 フェーズコミットモードを有効にするかどうかを指定します。 デフォルト値: true 。 2 フェーズコミットモードを有効にして、1 回限りのセマンティクスを確保できます。

sink.enable.batch-mode

false

いいえ

SelectDB インスタンスにデータを書き込むためのバッチモードを有効にするかどうかを指定します。バッチモードを有効にした場合、ApsaraDB for SelectDB インスタンスにデータが書き込まれる時点はチェックポイントに依存しません。代わりに、時点は sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes、および sink.buffer-flush.interval パラメーターによって指定されます。

バッチモードを有効にした後は、1 回限りのセマンティクスは保証されず、Unique key モデルを使用してべき等性を実装できます。

sink.flush.queue-size

2

いいえ

バッチモードでのキャッシュキューのサイズ。

sink.buffer-flush.max-rows

50000

いいえ

バッチモードで 1 つのバッチに書き込むことができるデータ行の最大数。

sink.buffer-flush.max-bytes

10MB

いいえ

バッチモードで 1 つのバッチに書き込むことができるバイトの最大数。

sink.buffer-flush.interval

10s

いいえ

バッチモードでキャッシュが非同期的に更新される間隔。最小値は 1 です。単位: 秒。

sink.ignore.update-before

true

いいえ

update-before イベントを無視するかどうかを指定します。 デフォルト値: true 。

Doris シンクの設定項目

MySQL データベースからデータをインポートする

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Oracle データベースからデータをインポートする

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

PostgreSQL データベースからデータをインポートする

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

SQL Server データベースからデータをインポートする

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1\
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

DataStream を使用してデータをインポートする

  1. Maven プロジェクトに必要な Maven 依存関係をインストールします。

    Maven 依存関係

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
        </dependencies>
  2. コア Java コードを実行します。

    次のコードスニペットでは、MySQL データベースのソーステーブルと ApsaraDB for SelectDB インスタンスのシンクテーブルを設定するために使用されるパラメーターは、このトピックのFlink SQL を使用してデータをインポートするセクションで説明されているパラメーターに対応しています。詳細については、「MySQL CDC Connector」と、このトピックのDoris シンクの設定項目セクションをご参照ください。

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
            
            // MySQL データベースのソーステーブルを構成します。
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs***")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            // ApsaraDB for SelectDB インスタンスのシンクテーブルを構成します。
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyunc****:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
            DorisOptions dorisOptions = dorisBuilder.build();
    
            // Stream Load に関連するパラメーターをプロパティとして構成します。
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) // 文字列に従ってシリアル化します
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

高度な使用方法

Flink SQL を使用して一部の列を更新する

-- チェックポイントを有効にする
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial.columns' = 'true' -- 一部の列の更新を許可します。
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

Flink SQL を使用して特定の列からデータを削除する

データソースが CDC をサポートしているシナリオでは、Doris Sink は RowKind に基づいてイベントのタイプを識別し、非表示列 __DORIS_DELETE_SIGN__ に値を割り当ててデータを削除します。Kafka メッセージがデータソースとして機能するシナリオでは、Doris Sink は RowKind に基づいて操作のタイプを識別できません。この場合、Doris Sink は、{"op_type":"delete",data:{...}} などのメッセージ内の特定のフィールドに基づいて操作タイプをマークする必要があります。 Doris Sink はこのマークを使用して、op_type フィールドの値が delete であるデータを削除できます。この場合、Doris Sink はビジネスロジックに基づいて非表示列の値を明示的に参照する必要があります。次の例は、Kafka データの特定のフィールドに基づいて、Flink SQL を使用して ApsaraDB for SelectDB インスタンスからデータを削除する方法を示しています。

-- この例では、ソースデータには {"op_type":"delete",data:{"id":1,"name":"zhangsan"}} フィールドが含まれています。
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- false の値は、イベントタイプが RowKind に基づいて識別されないことを示します。
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- Stream Load によってインポートされた列。
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

よくある質問

  • Q: BITMAP 型のデータを書き込むにはどうすればよいですか?

    A: 次のサンプルコードは、BITMAP 型のデータを書き込む方法を示しています。

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'doris',  // コネクタを doris に設定
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080', // SelectDB FE のアドレス
      'table.identifier' = 'test.bitmap_test', // ターゲットテーブル名
      'username' = 'admin', // ユーザー名
      'password' = '****', // パスワード
      'sink.label-prefix' = 'selectdb_label', // ラベルのプレフィックス
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)' // user_id を BITMAP 型に変換
    );
  • Q: errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650] エラーが報告された場合はどうすればよいですか?

    A: Exactly-once シナリオでは、Flink ジョブを再起動する際に、最新のチェックポイントまたはセーブポイントから開始する必要があります。そうでない場合、上記のエラーが報告されます。Exactly-once が不要な場合は、sink.enable-2pc パラメーターを false に設定して 2 フェーズコミットモードを無効にするか、sink.label-prefix パラメーターを変更します。

  • Q: errCode = 2, detailMessage = transaction[19650]not found エラーが報告された場合はどうすればよいですか?

    A: このエラーは、コミットフェーズで発生します。チェックポイントに記録されたトランザクション ID が ApsaraDB for SelectDB インスタンスで期限切れになり、トランザクションを再度コミットすると、このエラーが発生します。この場合、チェックポイントからトランザクションを開始することはできません。ApsaraDB for SelectDB インスタンスの streaming_label_keep_max_second パラメーターを変更して、有効期間を延長できます。デフォルトの有効期間は 12 時間です。

  • Q: errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100 エラーが報告された場合はどうすればよいですか?

    A: このエラーは、同じデータベースの同時データインポートジョブ数が 100 を超えたために報告されます。ApsaraDB for SelectDB インスタンスの max_running_txn_num_per_db パラメーターを変更することで、このエラーを解決できます。詳細については、「FE 設定」の max_running_txn_num_per_db をご参照ください。

    ジョブのラベルを頻繁に変更してジョブを再起動する場合、このエラーが発生する可能性があります。Duplicate key モデルまたは Aggregate key モデルを使用する 2 フェーズコミットシナリオでは、各ジョブのラベルは一意である必要があります。Flink ジョブは、チェックポイントから Flink ジョブが再起動されたときに、以前に開始されたが完了していないトランザクションをアクティブに中止します。ジョブのラベルを頻繁に変更してジョブを再起動すると、多数のプリコミットされたトランザクションが中止できず、トランザクションのクォータを占有します。Unique key モデルを使用している場合は、2 フェーズコミットモードを無効にし、Doris Sink を設定することで冪等書き込みを実装することもできます。

  • Q: Flink が Unique key モデルを使用するテーブルにデータを書き込む場合、バッチデータの順序をどのように保証しますか?

    A: シーケンス列を追加して、バッチデータの順序を保証できます。詳細については、「シーケンス列」をご参照ください。

  • Q: Flink ジョブでエラーが報告されていないのに、Flink ジョブがデータをインポートできないのはなぜですか?

    A: Flink Doris Connector 1.1.0 以前を使用してデータをインポートする場合、データはバッチモードで書き込まれます。すべてのデータ書き込みはデータによって駆動されます。データがアップストリームデータソースに書き込まれているかどうかを確認する必要があります。 1.1.0 以降の Flink Doris Connector を使用している場合、データの書き込みはチェックポイントに依存します。データを書き込むには、チェックポイントを有効にする必要があります。

  • Q: tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235 エラーが報告された場合はどうすればよいですか?

    A: ほとんどの場合、このエラーは Flink Doris Connector 1.1.0 以前を使用している場合に報告されます。このエラーは、書き込み頻度が高すぎるために発生します。sink.buffer-flush.max-bytes パラメーターと sink.buffer-flush.interval パラメーターを指定することで、Stream Load の頻度を減らすことができます。

  • Q: Flink を使用してデータをインポートするときに、ダーティデータをスキップするにはどうすればよいですか?

    A: Flink を使用してデータをインポートする場合、フィールドの形式や長さが要件を満たしていないデータなど、ダーティデータが存在すると、Stream Load はエラーを報告します。この場合、Flink はデータのインポートを再試行し続けます。ダーティデータをスキップするには、strict_mode パラメーターを false に設定し、max_filter_ratio パラメーターを 1 に設定することで Stream Load の厳密モードを無効にするか、シンクオペレーターの前にデータをフィルタリングします。

  • Q: ソーステーブルを宛先 ApsaraDB for SelectDB テーブルにマップするにはどうすればよいですか?

    A: Flink Doris Connector を使用してデータをインポートする場合は、次の 2 つの項目に注意してください。1. ソーステーブルの列と型は、Flink SQL の列と型と一致する必要があります。2. Flink SQL の列と型は、宛先 ApsaraDB for SelectDB テーブルの列と型と一致する必要があります。

  • Q: TApplicationException: get_next failed: out of sequence response: expected 4 but got 3 エラーが報告された場合はどうすればよいですか?

    A: このエラーは、Thrift フレームワークの同時実行性バグが原因で発生します。最新バージョンの Flink Doris Connector と互換性のある Flink バージョンを使用することをお勧めします。

  • Q: DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX エラーが報告された場合はどうすればよいですか?

    A: TaskManager で abort transaction response のログを検索し、返された HTTP ステータスコードに基づいて、エラーがクライアントまたはサーバーのどちらで発生したかを確認できます。