すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Flinkコネクタ

最終更新日:Jan 11, 2025

Flink StarRocksコネクタを使用してデータをキャッシュし、Stream LoadモードでStarRocksに一度にインポートできます。このトピックでは、Flink StarRocksコネクタの使用方法と例について説明します。

背景情報

Apache Flinkによって提供されるFlink Java Database Connectivity(JDBC)コネクタは、StarRocksのインポートパフォーマンス要件を満たすことができません。そのため、Alibaba Cloudは、データをキャッシュしてからStream LoadモードでStarRocksに一度にインポートするのに役立つFlink StarRocksコネクタを提供しています。

Flinkコネクタの使用

GitHubのstarrocks-connector-for-apache-flink ページから、テスト用のFlink StarRocksコネクタのソースコードをダウンロードできます。

プロジェクトの pom.xml ファイルに次のコードを追加します。

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- flink-1.11、flink-1.12の場合 -->
    <version>x.x.x_flink-1.11</version>
    <!-- flink-1.13の場合 -->
    <version>x.x.x_flink-1.13</version>
</dependency>
説明

バージョン情報 ページでFlink StarRocksコネクタの最新バージョンを確認し、上記のコードの x.x.x を最新バージョンに置き換えることができます。

サンプルコード:

  • 方法 1

    // -------- 生のJSON文字列ストリームを持つシンク --------
    fromElements(new String[]{
        "{\"score\": \"99\", \"name\": \"stephen\"}",
        "{\"score\": \"100\", \"name\": \"lebron\"}"
    }).addSink(
        StarRocksSink.sink(
            // シンクオプション
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build()
        )
    );
    
    
    // -------- ストリーム変換を持つシンク --------
    class RowData {
        public int score;
        public String name;
        public RowData(int score, String name) {
            ......
        }
    }
    fromElements(
        new RowData[]{
            new RowData(99, "stephen"),
            new RowData(100, "lebron")
        }
    ).addSink(
        StarRocksSink.sink(
            // テーブル構造
            TableSchema.builder()
                .field("score", DataTypes.INT())
                .field("name", DataTypes.VARCHAR(20))
                .build(),
            // シンクオプション
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.column_separator", "\\x01")
                .withProperty("sink.properties.row_delimiter", "\\x02")
                .build(),
            // streamRowDataでスロットを設定する
            (slots, streamRowData) -> {
                slots[0] = streamRowData.score;
                slots[1] = streamRowData.name;
            }
        )
    );
  • 方法 2

    // `structure`と`properties`でテーブルを作成する
    // 必要:`com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` を `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory` に追加する
    tEnv.executeSql(
        "CREATE TABLE USER_RESULT(" +
            "name VARCHAR," +
            "score BIGINT" +
        ") WITH ( " +
            "'connector' = 'starrocks'," +
            "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
            "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
            "'database-name' = 'xxx'," +
            "'table-name' = 'xxx'," +
            "'username' = 'xxx'," +
            "'password' = 'xxx'," +
            "'sink.buffer-flush.max-rows' = '1000000'," +
            "'sink.buffer-flush.max-bytes' = '300000000'," +
            "'sink.buffer-flush.interval-ms' = '5000'," +
            "'sink.properties.column_separator' = '\\x01'," +
            "'sink.properties.row_delimiter' = '\\x02'," +
            "'sink.max-retries' = '3'," +
            "'sink.properties.*' = 'xxx'" + // `'sink.properties.columns' = 'k1, v1'` のようなストリームロードプロパティ
        ")"
    );

次の表に、Flinkシンクのパラメータを示します。

パラメータ

必須

デフォルト値

タイプ

説明

connector

はい

デフォルト値なし

String

コネクタのタイプ。値を starrocks に設定します。

jdbc-url

はい

デフォルト値なし

String

StarRocksに接続し、StarRocksでクエリを実行するために使用されるJDBC URL。

load-url

はい

デフォルト値なし

String

フロントエンドのIPアドレスとHTTPポート。 fe_ip:http_port;fe_ip:http_port の形式。複数のフロントエンドのIPアドレスとHTTPポートはセミコロン(;)で区切ります。

database-name

はい

デフォルト値なし

String

StarRocksデータベースの名前。

table-name

はい

デフォルト値なし

String

StarRocksテーブルの名前。

username

はい

デフォルト値なし

String

StarRocksデータベースに接続するために使用されるユーザー名。

password

はい

デフォルト値なし

String

StarRocksデータベースに接続するために使用されるパスワード。

sink.semantic

いいえ

at-least-once

String

シンクのセマンティクス。有効な値: at-least-once および exactly-once 。

sink.buffer-flush.max-bytes

いいえ

94371840 (90 MB)

String

バッファに許可されるデータの最大量。 64 MB から 10 GB の範囲のデータ量を指定できます。

sink.buffer-flush.max-rows

いいえ

500000

String

バッファに許可されるデータ行の最大数。有効な値: 64000 から 5000000 。

sink.buffer-flush.interval-ms

いいえ

300000

String

バッファの更新間隔。有効な値: 1000 から 3600000 。単位:ミリ秒。

sink.max-retries

いいえ

1

String

最大再試行回数。有効な値: 0 から 10 。

sink.connect.timeout-ms

いいえ

1000

String

load-url パラメータで指定されたフロントエンドのIPアドレスとHTTPポートへの接続のタイムアウト期間。有効な値: 100 から 60000 。単位:ミリ秒。

sink.properties.*

いいえ

デフォルト値なし

String

シンクのプロパティ。

重要
  • Flinkシンクのexactly-onceセマンティクスを確保するには、外部システムが2フェーズコミットプロトコルをサポートするメカニズムを提供する必要があります。 StarRocksはこのメカニズムを持っていないため、Flinkのチェックポイント機能に依存する必要があります。 Flinkがチェックポイントを生成するたびに、データのバッチとそのラベルが状態としてキャッシュされます。チェックポイントが生成された後、状態のキャッシュされたデータがStarRocksに書き込まれるまで、システムはブロックされます。その後、Flinkは次のチェックポイントの生成を開始します。これが、StarRocksがexactly-onceセマンティクスを確保する方法です。この場合、StarRocksが故障すると、接続障害により、Flinkシンクのストリームオペレータが長時間ブロックされる可能性があります。その結果、アラートがトリガーされ、Flinkインポートジョブが強制終了されます。

  • デフォルトでは、データはCSV形式でインポートされます。 sink.properties.row_delimiter パラメータを \\x02 に設定して行区切り文字をカスタマイズし、 sink.properties.column_separator パラメータを \\x01 に設定して列区切り文字をカスタマイズできます。 sink.properties.row_delimiter パラメータは、StarRocks 1.15.0以降でサポートされています。

  • インポートジョブが予期せず停止した場合は、ジョブのメモリ容量を増やすことができます。

  • コードが正しく実行され、データはクエリできるが、データの書き込みに失敗した場合は、マシンがバックエンドのHTTPポートにアクセスできるかどうかを確認する必要があります。つまり、バックエンドのIPアドレスにPINGメッセージで到達できるかどうかを確認する必要があります。

    たとえば、マシンにパブリックIPアドレスと内部IPアドレスがあり、クラスターのフロントエンドまたはバックエンドにパブリックIPアドレスとHTTPポートを使用してアクセスでき、クラスター内のバックエンドへのアクセスに内部IPアドレスが使用されているとします。インポートジョブを送信するときに load-url パラメータを使用してフロントエンドのパブリックIPアドレスとHTTPポートを指定すると、フロントエンドは書き込み操作をバックエンドの内部IPアドレスとHTTPポートに転送します。マシンのPINGメッセージでバックエンドの内部IPアドレスに到達できない場合、書き込み操作は失敗します。

例:Flink StarRocksコネクタを使用してMySQLデータベースからデータを同期する

基本原則

Flink Change Data Capture(CDC)コネクタとStarRocks移行ツールを使用することで、MySQLデータベースからStarRocksクラスターに数秒以内にデータを同期できます。Flink

説明

このトピックの画像と一部の情報は、オープンソースStarRocksのMySQLからのリアルタイム同期からのものです。

StarRocks移行ツールは、MySQLデータベースとStarRocksクラスターの情報とテーブルスキーマに基づいて、ソーステーブルとシンクテーブルのCREATE TABLEステートメントを自動的に生成できます。

手順

  1. 準備を行います。

    • Flink のインストールパッケージをダウンロードします。

      Apache Flink 1.13を使用することをお勧めします。サポートされている最も古いバージョンはApache Flink 1.11です。

    • Flink CDCコネクタ のパッケージをダウンロードします。

      使用しているApache Flinkのバージョンに基づいて、MySQL用のFlink CDCコネクタのパッケージをダウンロードします。

    • Flink StarRocksコネクタ のパッケージをダウンロードします。

      重要

      Flink StarRocksコネクタは、Apache Flink 1.13、1.11、および1.12で異なります。

  2. ダウンロードした flink-sql-connector-mysql-cdc-xxx.jar パッケージと flink-connector-starrocks-xxx.jar パッケージを flink-xxx/lib/ ディレクトリにコピーして貼り付けます。

  3. smt.tar.gz パッケージをダウンロードし、パッケージを解凍してから、構成ファイルを変更します。

    構成ファイルの次のパラメータを変更します。

    • db:MySQLデータベースの接続情報。

    • be_num:StarRocksクラスターのノード数。

    • [table-rule.1]:一致ルール。正規表現を使用して、CREATE TABLEステートメントの作成に使用されるデータベース名とテーブル名を照合できます。複数のルールのセットを構成することもできます。

    • flink.starrocks.*:StarRocksクラスターの構成情報。

    サンプルコード:

    [db]
    host = 192.168.**.**
    port = 3306
    user = root
    password =
    
    [other]
    # StarRocksのバックエンドの数
    be_num = 3
    # `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
    use_decimal_v3 = false
    # 変換された DDL SQL を保存するファイル
    output_dir = ./result
    
    [table-rule.1]
    # プロパティを設定するためのデータベースに一致するパターン
    database = ^console_19321.*$
    # プロパティを設定するためのテーブルに一致するパターン
    table = ^.*$
    
    ############################################
    ### flink シンク構成
    ### `connector`、`table-name`、`database-name` は設定しないでください。これらは自動生成されます
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
  4. starrocks-migrate-tool コマンドを実行して、 /result ディレクトリにすべてのCREATE TABLEステートメントを生成します。

    ls result コマンドを実行して、/resultディレクトリ内のファイルを表示できます。

    flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
    flink-create.all.sql  starrocks-create.1.sql
  5. 次のコマンドを実行して、StarRocksクラスターにデータベースとテーブルを作成します。

    Mysql -h <フロントエンドのIPアドレス> -P 9030 -u root -p < starrocks-create.1.sql
  6. 次のコマンドを実行して、Flinkでテーブルを生成し、データを継続的に同期します。

    bin/sql-client.sh -f flink-create.1.sql
    重要

    1.13より前のバージョンのApache Flinkを使用している場合、SQLスクリプトを直接実行できない可能性があります。スクリプト内のSQLステートメントを1つずつ実行し、MySQLデータベースのバイナリロギングを有効にする必要があります。

  7. 次のコマンドを実行して、Flinkジョブのステータスをクエリします。

    bin/flink list -running

    FlinkのWeb UIまたは$FLINK_HOME/logディレクトリにあるログファイルで、Flinkジョブの詳細とステータスを確認できます。

このトピックで提供されているサンプルコードを実行する場合は、次の項目に注意してください。

  • 複数のルールのセットを構成する必要がある場合は、データベース名とテーブル名を照合するために使用されるルールを構成し、各ルールのセットに対してFlink StarRocksコネクタを構成する必要があります。

    サンプルコード:

    [table-rule.1]
    # プロパティを設定するためのデータベースに一致するパターン
    database = ^console_19321.*$
    # プロパティを設定するためのテーブルに一致するパターン
    table = ^.*$
    
    ############################################
    ### flink シンク構成
    ### `connector`、`table-name`、`database-name` は設定しないでください。これらは自動生成されます
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    
    [table-rule.2]
    # プロパティを設定するためのデータベースに一致するパターン
    database = ^database2.*$
    # プロパティを設定するためのテーブルに一致するパターン
    table = ^.*$
    
    ############################################
    ### flink シンク構成
    ### `connector`、`table-name`、`database-name` は設定しないでください。これらは自動生成されます
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    # CSV形式でデータをインポートするための適切な区切り文字を選択できない場合は、JSON形式でデータをインポートできます。ただし、インポートのパフォーマンスが低下する可能性があります。JSON形式でデータをインポートするには、flink.starrocks.sink.properties.column_separatorパラメータとflink.starrocks.sink.properties.row_delimiterパラメータを次のパラメータに置き換えます。
    flink.starrocks.sink.properties.strip_outer_array=true
    flink.starrocks.sink.properties.format=json
    説明

    flink.starrocks.sinkパラメータを使用して、各ルールのセットのプロパティ(インポート頻度など)を構成できます。

  • シャーディングが実行されると、大きなテーブルのデータが複数のテーブルに分割されたり、複数のデータベースに分散されたりする可能性があります。この場合、複数のテーブルから1つのテーブルにデータを同期するためのルールのセットを構成できます。

    たとえば、edu_db_1データベースとedu_db_2データベースの両方に、course_1テーブルとcourse_2テーブルの2つのテーブルがあるとします。すべてのテーブルのスキーマは同じです。次のコードを実行して、これらのテーブルからStarRocksのテーブルにデータを同期し、さらに分析することができます。

    [table-rule.3]
    # プロパティを設定するためのデータベースに一致するパターン
    database = ^edu_db_[0-9]*$
    # プロパティを設定するためのテーブルに一致するパターン
    table = ^course_[0-9]*$
    
    ############################################
    ### flink シンク構成
    ### `connector`、`table-name`、`database-name` は設定しないでください。これらは自動生成されます
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=5000

    コードを実行すると、多対1の同期関係が自動的に生成されます。デフォルトでは、StarRocksのテーブルの名前はcourse__auto_shardです。生成されたSQLファイルでテーブル名を変更できます。

  • SQLクライアントのCLIでテーブルを作成し、データを同期する必要がある場合は、バックスラッシュ(\)をエスケープする必要があります。

    サンプルコード:

    'sink.properties.column_separator' = '\\x01'
    'sink.properties.row_delimiter' = '\\x02'