Flink StarRocksコネクタを使用してデータをキャッシュし、Stream LoadモードでStarRocksに一度にインポートできます。このトピックでは、Flink StarRocksコネクタの使用方法と例について説明します。
背景情報
Apache Flinkによって提供されるFlink Java Database Connectivity(JDBC)コネクタは、StarRocksのインポートパフォーマンス要件を満たすことができません。そのため、Alibaba Cloudは、データをキャッシュしてからStream LoadモードでStarRocksに一度にインポートするのに役立つFlink StarRocksコネクタを提供しています。
Flinkコネクタの使用
GitHubのstarrocks-connector-for-apache-flink ページから、テスト用のFlink StarRocksコネクタのソースコードをダウンロードできます。
プロジェクトの pom.xml ファイルに次のコードを追加します。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- flink-1.11、flink-1.12の場合 -->
<version>x.x.x_flink-1.11</version>
<!-- flink-1.13の場合 -->
<version>x.x.x_flink-1.13</version>
</dependency>バージョン情報 ページでFlink StarRocksコネクタの最新バージョンを確認し、上記のコードの x.x.x を最新バージョンに置き換えることができます。
サンプルコード:
方法 1
// -------- 生のJSON文字列ストリームを持つシンク -------- fromElements(new String[]{ "{\"score\": \"99\", \"name\": \"stephen\"}", "{\"score\": \"100\", \"name\": \"lebron\"}" }).addSink( StarRocksSink.sink( // シンクオプション StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx") .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port") .withProperty("username", "xxx") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") .build() ) ); // -------- ストリーム変換を持つシンク -------- class RowData { public int score; public String name; public RowData(int score, String name) { ...... } } fromElements( new RowData[]{ new RowData(99, "stephen"), new RowData(100, "lebron") } ).addSink( StarRocksSink.sink( // テーブル構造 TableSchema.builder() .field("score", DataTypes.INT()) .field("name", DataTypes.VARCHAR(20)) .build(), // シンクオプション StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx") .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port") .withProperty("username", "xxx") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.column_separator", "\\x01") .withProperty("sink.properties.row_delimiter", "\\x02") .build(), // streamRowDataでスロットを設定する (slots, streamRowData) -> { slots[0] = streamRowData.score; slots[1] = streamRowData.name; } ) );方法 2
// `structure`と`properties`でテーブルを作成する // 必要:`com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` を `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory` に追加する tEnv.executeSql( "CREATE TABLE USER_RESULT(" + "name VARCHAR," + "score BIGINT" + ") WITH ( " + "'connector' = 'starrocks'," + "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," + "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," + "'database-name' = 'xxx'," + "'table-name' = 'xxx'," + "'username' = 'xxx'," + "'password' = 'xxx'," + "'sink.buffer-flush.max-rows' = '1000000'," + "'sink.buffer-flush.max-bytes' = '300000000'," + "'sink.buffer-flush.interval-ms' = '5000'," + "'sink.properties.column_separator' = '\\x01'," + "'sink.properties.row_delimiter' = '\\x02'," + "'sink.max-retries' = '3'," + "'sink.properties.*' = 'xxx'" + // `'sink.properties.columns' = 'k1, v1'` のようなストリームロードプロパティ ")" );
次の表に、Flinkシンクのパラメータを示します。
パラメータ | 必須 | デフォルト値 | タイプ | 説明 |
connector | はい | デフォルト値なし | String | コネクタのタイプ。値を starrocks に設定します。 |
jdbc-url | はい | デフォルト値なし | String | StarRocksに接続し、StarRocksでクエリを実行するために使用されるJDBC URL。 |
load-url | はい | デフォルト値なし | String | フロントエンドのIPアドレスとHTTPポート。 |
database-name | はい | デフォルト値なし | String | StarRocksデータベースの名前。 |
table-name | はい | デフォルト値なし | String | StarRocksテーブルの名前。 |
username | はい | デフォルト値なし | String | StarRocksデータベースに接続するために使用されるユーザー名。 |
password | はい | デフォルト値なし | String | StarRocksデータベースに接続するために使用されるパスワード。 |
sink.semantic | いいえ | at-least-once | String | シンクのセマンティクス。有効な値: at-least-once および exactly-once 。 |
sink.buffer-flush.max-bytes | いいえ | 94371840 (90 MB) | String | バッファに許可されるデータの最大量。 64 MB から 10 GB の範囲のデータ量を指定できます。 |
sink.buffer-flush.max-rows | いいえ | 500000 | String | バッファに許可されるデータ行の最大数。有効な値: 64000 から 5000000 。 |
sink.buffer-flush.interval-ms | いいえ | 300000 | String | バッファの更新間隔。有効な値: 1000 から 3600000 。単位:ミリ秒。 |
sink.max-retries | いいえ | 1 | String | 最大再試行回数。有効な値: 0 から 10 。 |
sink.connect.timeout-ms | いいえ | 1000 | String | load-url パラメータで指定されたフロントエンドのIPアドレスとHTTPポートへの接続のタイムアウト期間。有効な値: 100 から 60000 。単位:ミリ秒。 |
sink.properties.* | いいえ | デフォルト値なし | String | シンクのプロパティ。 |
Flinkシンクのexactly-onceセマンティクスを確保するには、外部システムが2フェーズコミットプロトコルをサポートするメカニズムを提供する必要があります。 StarRocksはこのメカニズムを持っていないため、Flinkのチェックポイント機能に依存する必要があります。 Flinkがチェックポイントを生成するたびに、データのバッチとそのラベルが状態としてキャッシュされます。チェックポイントが生成された後、状態のキャッシュされたデータがStarRocksに書き込まれるまで、システムはブロックされます。その後、Flinkは次のチェックポイントの生成を開始します。これが、StarRocksがexactly-onceセマンティクスを確保する方法です。この場合、StarRocksが故障すると、接続障害により、Flinkシンクのストリームオペレータが長時間ブロックされる可能性があります。その結果、アラートがトリガーされ、Flinkインポートジョブが強制終了されます。
デフォルトでは、データはCSV形式でインポートされます。 sink.properties.row_delimiter パラメータを \\x02 に設定して行区切り文字をカスタマイズし、 sink.properties.column_separator パラメータを \\x01 に設定して列区切り文字をカスタマイズできます。 sink.properties.row_delimiter パラメータは、StarRocks 1.15.0以降でサポートされています。
インポートジョブが予期せず停止した場合は、ジョブのメモリ容量を増やすことができます。
コードが正しく実行され、データはクエリできるが、データの書き込みに失敗した場合は、マシンがバックエンドのHTTPポートにアクセスできるかどうかを確認する必要があります。つまり、バックエンドのIPアドレスにPINGメッセージで到達できるかどうかを確認する必要があります。
たとえば、マシンにパブリックIPアドレスと内部IPアドレスがあり、クラスターのフロントエンドまたはバックエンドにパブリックIPアドレスとHTTPポートを使用してアクセスでき、クラスター内のバックエンドへのアクセスに内部IPアドレスが使用されているとします。インポートジョブを送信するときに
load-urlパラメータを使用してフロントエンドのパブリックIPアドレスとHTTPポートを指定すると、フロントエンドは書き込み操作をバックエンドの内部IPアドレスとHTTPポートに転送します。マシンのPINGメッセージでバックエンドの内部IPアドレスに到達できない場合、書き込み操作は失敗します。
例:Flink StarRocksコネクタを使用してMySQLデータベースからデータを同期する
基本原則
Flink Change Data Capture(CDC)コネクタとStarRocks移行ツールを使用することで、MySQLデータベースからStarRocksクラスターに数秒以内にデータを同期できます。
このトピックの画像と一部の情報は、オープンソースStarRocksのMySQLからのリアルタイム同期からのものです。
StarRocks移行ツールは、MySQLデータベースとStarRocksクラスターの情報とテーブルスキーマに基づいて、ソーステーブルとシンクテーブルのCREATE TABLEステートメントを自動的に生成できます。
手順
準備を行います。
Flink のインストールパッケージをダウンロードします。
Apache Flink 1.13を使用することをお勧めします。サポートされている最も古いバージョンはApache Flink 1.11です。
Flink CDCコネクタ のパッケージをダウンロードします。
使用しているApache Flinkのバージョンに基づいて、MySQL用のFlink CDCコネクタのパッケージをダウンロードします。
Flink StarRocksコネクタ のパッケージをダウンロードします。
重要Flink StarRocksコネクタは、Apache Flink 1.13、1.11、および1.12で異なります。
ダウンロードした flink-sql-connector-mysql-cdc-xxx.jar パッケージと flink-connector-starrocks-xxx.jar パッケージを flink-xxx/lib/ ディレクトリにコピーして貼り付けます。
smt.tar.gz パッケージをダウンロードし、パッケージを解凍してから、構成ファイルを変更します。
構成ファイルの次のパラメータを変更します。
db:MySQLデータベースの接続情報。be_num:StarRocksクラスターのノード数。[table-rule.1]:一致ルール。正規表現を使用して、CREATE TABLEステートメントの作成に使用されるデータベース名とテーブル名を照合できます。複数のルールのセットを構成することもできます。flink.starrocks.*:StarRocksクラスターの構成情報。
サンプルコード:
[db] host = 192.168.**.** port = 3306 user = root password = [other] # StarRocksのバックエンドの数 be_num = 3 # `decimal_v3` は StarRocks-1.18.1 以降でサポートされています use_decimal_v3 = false # 変換された DDL SQL を保存するファイル output_dir = ./result [table-rule.1] # プロパティを設定するためのデータベースに一致するパターン database = ^console_19321.*$ # プロパティを設定するためのテーブルに一致するパターン table = ^.*$ ############################################ ### flink シンク構成 ### `connector`、`table-name`、`database-name` は設定しないでください。これらは自動生成されます ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=15000starrocks-migrate-toolコマンドを実行して、 /result ディレクトリにすべてのCREATE TABLEステートメントを生成します。ls resultコマンドを実行して、/resultディレクトリ内のファイルを表示できます。flink-create.1.sql smt.tar.gz starrocks-create.all.sql flink-create.all.sql starrocks-create.1.sql次のコマンドを実行して、StarRocksクラスターにデータベースとテーブルを作成します。
Mysql -h <フロントエンドのIPアドレス> -P 9030 -u root -p < starrocks-create.1.sql次のコマンドを実行して、Flinkでテーブルを生成し、データを継続的に同期します。
bin/sql-client.sh -f flink-create.1.sql重要1.13より前のバージョンのApache Flinkを使用している場合、SQLスクリプトを直接実行できない可能性があります。スクリプト内のSQLステートメントを1つずつ実行し、MySQLデータベースのバイナリロギングを有効にする必要があります。
次のコマンドを実行して、Flinkジョブのステータスをクエリします。
bin/flink list -runningFlinkのWeb UIまたは$FLINK_HOME/logディレクトリにあるログファイルで、Flinkジョブの詳細とステータスを確認できます。
このトピックで提供されているサンプルコードを実行する場合は、次の項目に注意してください。
複数のルールのセットを構成する必要がある場合は、データベース名とテーブル名を照合するために使用されるルールを構成し、各ルールのセットに対してFlink StarRocksコネクタを構成する必要があります。
サンプルコード:
[table-rule.1] # プロパティを設定するためのデータベースに一致するパターン database = ^console_19321.*$ # プロパティを設定するためのテーブルに一致するパターン table = ^.*$ ############################################ ### flink シンク構成 ### `connector`、`table-name`、`database-name` は設定しないでください。これらは自動生成されます ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=15000 [table-rule.2] # プロパティを設定するためのデータベースに一致するパターン database = ^database2.*$ # プロパティを設定するためのテーブルに一致するパターン table = ^.*$ ############################################ ### flink シンク構成 ### `connector`、`table-name`、`database-name` は設定しないでください。これらは自動生成されます ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= # CSV形式でデータをインポートするための適切な区切り文字を選択できない場合は、JSON形式でデータをインポートできます。ただし、インポートのパフォーマンスが低下する可能性があります。JSON形式でデータをインポートするには、flink.starrocks.sink.properties.column_separatorパラメータとflink.starrocks.sink.properties.row_delimiterパラメータを次のパラメータに置き換えます。 flink.starrocks.sink.properties.strip_outer_array=true flink.starrocks.sink.properties.format=json説明flink.starrocks.sinkパラメータを使用して、各ルールのセットのプロパティ(インポート頻度など)を構成できます。
シャーディングが実行されると、大きなテーブルのデータが複数のテーブルに分割されたり、複数のデータベースに分散されたりする可能性があります。この場合、複数のテーブルから1つのテーブルにデータを同期するためのルールのセットを構成できます。
たとえば、edu_db_1データベースとedu_db_2データベースの両方に、course_1テーブルとcourse_2テーブルの2つのテーブルがあるとします。すべてのテーブルのスキーマは同じです。次のコードを実行して、これらのテーブルからStarRocksのテーブルにデータを同期し、さらに分析することができます。
[table-rule.3] # プロパティを設定するためのデータベースに一致するパターン database = ^edu_db_[0-9]*$ # プロパティを設定するためのテーブルに一致するパターン table = ^course_[0-9]*$ ############################################ ### flink シンク構成 ### `connector`、`table-name`、`database-name` は設定しないでください。これらは自動生成されます ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=5000コードを実行すると、多対1の同期関係が自動的に生成されます。デフォルトでは、StarRocksのテーブルの名前はcourse__auto_shardです。生成されたSQLファイルでテーブル名を変更できます。
SQLクライアントのCLIでテーブルを作成し、データを同期する必要がある場合は、バックスラッシュ(
\)をエスケープする必要があります。サンプルコード:
'sink.properties.column_separator' = '\\x01' 'sink.properties.row_delimiter' = '\\x02'