このトピックでは、Apache Flink 1.11 以降から Hologres にリアルタイムでデータを書き込む方法について説明します。
前提条件
Hologres インスタンスが作成され、開発ツールを使用してインスタンスに接続します。詳細については、「HoloWeb に接続する」をご参照ください。
Apache Flink クラスターが作成されます。この例では、Apache Flink 1.15 のクラスターが作成されます。Apache Flink の公式 Web サイトからバイナリファイルをダウンロードし、スタンドアロンモードで動作する Apache Flink クラスターを作成できます。詳細については、「Flink のローカルインストール」をご参照ください。
背景
Hologres コネクタのコードは、Apache Flink 1.11 以降でオープンソース化されています。さまざまな Apache Flink バージョンに対応する Hologres コネクタの リリースパッケージ は、Maven リポジトリで公開されています。次の pom.xml ファイルは、構成の例を示しています。
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>hologres-connector-flink-1.15</artifactId>
<version>1.4.0</version>
<classifier>jar-with-dependencies</classifier>
</dependency>次の表に、Apache Flink と Hologres コネクタ のバージョンマッピングを示します。より多くの機能を使用するには、バージョン 1.15 以降を使用することを推奨します。
Apache Flink バージョン | Hologres コネクタバージョン |
Apache Flink 1.11 | hologres-connector-flink-1.11:1.0.1 |
Apache Flink 1.12 | hologres-connector-flink-1.12:1.0.1 |
Apache Flink 1.13 | hologres-connector-flink-1.13:1.3.2 |
Apache Flink 1.14 | hologres-connector-flink-1.14:1.3.2 |
Apache Flink 1.15 | hologres-connector-flink-1.15:1.4.1 |
Apache Flink 1.17 | hologres-connector-flink-1.17:1.4.1 |
サンプルコード
次の構文を使用する Flink SQL 文を実行して、Hologres にデータを書き込むことができます。
String createHologresTable =
String.format(
"create table sink("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s'"
+ ")",
database, tableName, userName, password, endPoint);
tEnv.executeSql(createHologresTable);
createScanTable(tEnv);
tEnv.executeSql("insert into sink select * from source");
FlinkSQLToHoloExample: Flink SQL インターフェイスに基づいて Hologres にデータを書き込むために使用されるアプリケーション。
FlinkDSAndSQLToHoloExample: データストリームをテーブルに変換して Hologres に書き込むためのアプリケーション。Flink DataStream インターフェイスは変換に使用され、Flink SQL インターフェイスはデータ書き込みに使用されます。
FlinkDataStreamToHoloExample: Flink DataStream インターフェイスに基づいて Hologres にデータストリームを書き込むためのアプリケーション。
FlinkRoaringBitmapAggJob: リアルタイムで重複排除後にユニークビジター (UV) 数をカウントし、統計を Hologres に書き込むために使用されるアプリケーション。このアプリケーションには、Flink、roaring bitmap、および Hologres ディメンションテーブルが使用されます。
FlinkToHoloRePartitionExample: Flink DataStream インターフェイスに基づいて、リアルタイムのデータ書き込み中に Hologres のシャードにデータをパーティション分割するために使用されるアプリケーション。これにより、Hologres 内の small ファイルの数が大幅に減少し、書き込みパフォーマンスが向上し、システム負荷が軽減されます。このアプリケーションは、複数のプライマリキーが構成された空のテーブルに同時にデータをインポートするシナリオに適しています。このアプリケーションは、INSERT OVERWRITE 操作と同様の効果を実現します。
Hologres コネクタのオプション
Hologres コネクタを使用して、Apache Flink から Hologres にデータを書き込むことができます。次の表に、Hologres コネクタで使用されるパラメーターを示します。
オプション | 必須 | 説明 |
connector | はい | シンクテーブルのタイプ。値を hologres に設定します。 |
dbname | はい | Hologres データベースの名前。 |
tablename | はい | データを書き込む Hologres テーブルの名前。 |
username | はい | Alibaba Cloud アカウントの AccessKey ID。 AccessKey ペア ページから AccessKey ID を取得できます。 |
password | はい | Alibaba Cloud アカウントの AccessKey Secret。 AccessKey ペア ページから AccessKey Secret を取得できます。 |
endpoint | はい | Hologres インスタンスの VPC (Virtual Private Cloud) エンドポイント。Hologres コンソール のインスタンス詳細ページで Hologres インスタンスのエンドポイントを表示できます。 説明 エンドポイントにはポート番号を含める必要があり、 |
接続固有
オプション
必須
説明
connectionSize
いいえ
Flink Hologres タスクによって作成された単一の接続プール内の Java Database Connectivity (JDBC) 接続の数。
デフォルト値: 3。この値はスループットに比例します。
connectionPoolName
いいえ
接続プールの名前。同じ TaskManager 内で、同じ接続プールが構成されているテーブルは、接続プールを共有できます。
デフォルト値はありません。各テーブルは、デフォルトで独自の接続プールを使用します。接続プール名を指定する場合、すべてのテーブルの connectionSize パラメーターの値は同じである必要があります。
fixedConnectionMode
いいえ
データ書き込みおよびポイントクエリ操作は接続を消費しません。この機能は Beta リリースであり、コネクタのバージョンが 1.2.0 以降、Hologres のバージョンが 1.3 以降である必要があります。
デフォルト値: false。
jdbcRetryCount
いいえ
接続エラーが発生した場合に、データの書き込みとクエリを再試行できる最大回数。
デフォルト値: 10。
jdbcRetrySleepInitMs
いいえ
リトライが実行される間隔は、次の数式を使用して計算されます: retrySleepInitMs パラメーターの値 + リトライ回数 × retrySleepStepMs パラメーターの値。
単位: ミリ秒。デフォルト値: 1000。
jdbcRetrySleepStepMs
いいえ
リトライが実行される間隔は、次の数式を使用して計算されます: retrySleepInitMs パラメーターの値 + リトライ回数 × retrySleepStepMs パラメーターの値。
単位: ミリ秒。デフォルト値: 5000。
jdbcConnectionMaxIdleMs
いいえ
データの書き込みおよびポイントクエリに使用される接続に適用されるアイドルタイムアウト期間。タイムアウト期間が過ぎると、接続は解放されます。
単位: ミリ秒。デフォルト値: 60000。
jdbcMetaCacheTTL
いいえ
キャッシュ内のテーブルスキーマ情報の生存時間 (TTL) 期間。
単位: ミリ秒。デフォルト値: 60000。
jdbcMetaAutoRefreshFactor
いいえ
キャッシュを自動的にリフレッシュするタイミングを決定する係数。キャッシュ内のテーブルスキーマ情報の残りの TTL 期間が、
jdbcMetaCacheTTLパラメーターの値をこのパラメーターの値で割った結果よりも短い場合、キャッシュは自動的にリフレッシュされます。デフォルト値: -1。デフォルト値は、キャッシュが自動的にリフレッシュされないことを示します。
connection.ssl.mode
いいえ
SSL 暗号化伝送を有効にするかどうかを指定します。有効な値:
disable: SSL 暗号化伝送は無効です。これがデフォルト値です。
require: SSL 暗号化伝送が有効です。クライアントは SSL 暗号化伝送を使用して、データ送信に使用される接続のみを暗号化します。
verify-ca: SSL 暗号化伝送が有効です。クライアントは SSL 暗号化伝送を使用してデータ送信接続を暗号化し、CA 証明書を使用して Hologres サーバーを検証します。
verify-full: SSL 暗号化伝送が有効です。クライアントは SSL 暗号化伝送を使用してデータ送信接続を暗号化し、CA 証明書を使用して Hologres サーバーを検証し、CA 証明書で指定された CN またはドメインネームシステム (DNS) が接続設定時に指定された Hologres エンドポイントと一致するかどうかを確認します。
connection.ssl.root-cert.location
いいえ
CA 証明書のパス。CA 証明書が Apache Flink クラスターにアップロードされていることを確認する必要があります。
説明このパラメーターは、connection.ssl.mode パラメーターを verify-ca または verify-full に設定した場合に必須です。
jdbcDirectConnect
いいえ
直接接続モードを有効にするかどうかを指定します。有効な値:
false: 無効。これがデフォルト値です。
true: 有効。
Apache Flink を使用して書き込むことができるデータ量は、VPC エンドポイントで許可されるスループットによって決まります。このパラメーターを true に設定すると、システムは現在の環境で Apache Flink が Hologres FE ノードに直接接続できるかどうかを確認します。Apache Flink が現在の環境で Hologres FE ノードに直接接続できる場合、デフォルトで直接接続が使用されます。
シンク固有
パラメーター
必須
説明
mutatetype
いいえ
データが書き込まれるモード。詳細については、「Hologres 結果テーブルの作成」の「ストリーミングセマンティクス」セクションをご参照ください。
デフォルト値: insertorignore。
ignoredelete
いいえ
リトラクトメッセージを無視するかどうかを指定します。ほとんどの場合、リトラクトメッセージは Flink の GROUP BY 操作によって生成されます。リトラクトメッセージが Hologres コネクタに転送されると、Delete リクエストが生成されます。
デフォルト値: true。このパラメーターは、ストリーミングセマンティクスが使用されている場合にのみ有効です。
createparttable
いいえ
パーティションテーブルにデータを書き込む場合に、パーティションキーの値に基づいてパーティションを自動的に作成するかどうかを指定します。
false: パーティションは自動的に作成されません。これがデフォルト値です。
true: パーティションは自動的に作成されます。
このパラメーターを使用する際は注意してください。パーティションキーの値にダーティデータが含まれていないことを確認してください。そうしないと、無効なパーティションが作成されます。
ignoreNullWhenUpdate
いいえ
mutatetype='insertOrUpdate'設定が使用されている場合に、データに書き込まれる null 値を無視するかどうかを指定します。デフォルト値: false。
jdbcWriteBatchSize
いいえ
Hologres ストリーミングシンクノードのバッチで収集できるデータエントリの最大数。
デフォルト値: 256。
jdbcWriteBatchByteSize
いいえ
スレッド内の Hologres ストリーミングシンクノードのバッチで収集できるデータの最大サイズ。単位: バイト。
デフォルト値: 2097152 (2 × 1024 × 1024)、つまり 2 MB。
jdbcWriteBatchTotalByteSize
いいえ
すべてのスレッドの Hologres ストリーミングシンクノードのバッチで収集できるデータの最大サイズ。単位: バイト。
デフォルト値: 20971520 (20 × 1024 × 1024)、つまり 20 MB。
jdbcWriteFlushInterval
いいえ
FLUSH 操作が完了するのを待つ最大時間。FLUSH 操作は、Hologres ストリーミングシンクノードのバッチで収集されたデータエントリに対して実行されます。
単位: ミリ秒。デフォルト値: 10000、つまり 10 秒。
jdbcUseLegacyPutHandler
いいえ
SQL 文で使用される構文。有効な値:
true:
insert into xxx(c0,c1,...) values (?,,..),.. on conflict;構文が使用されます。false:
insert into xxx(c0,c1,...) select unnest(?),unnest(?),.. on conflict構文が使用されます。これがデフォルト値です。
jdbcEnableDefaultForNotNullColumn
いいえ
非 NULL 制約を採用し、デフォルト値が指定されていない列に null 値が書き込まれるときに、null 値を指定されたデフォルト値に変換するかどうかを指定します。宛先列が STRING 型の場合、null 値は空の文字列 ("") に変換されます。宛先列が NUMBER 型の場合、null 値は 0 に変換されます。宛先列が DATE、TIMESTAMP、または TIMESTAMPTZ 型の場合、null 値は 1970-01-01 00:00:00 に変換されます。
デフォルト値: true。
remove-u0000-in-text.enabled
いいえ
UTF-8 でエンコードされていない TEXT 型の u0000 文字を自動的に置き換えるかどうかを指定します。有効な値:
true: u0000 文字は置き換えられます。
false: u0000 文字は置き換えられません。これがデフォルト値です。
deduplication.enabled
いいえ
同時に書き込まれるデータ量に同じプライマリキー値を持つデータレコードが含まれている場合に、重複排除を実行するかどうかを指定します。有効な値:
true: 重複排除が実行されます。他のデータレコードと同じプライマリキー値を持つ最後のデータレコードのみが保持されます。これがデフォルト値です。
false: 重複排除は実行されません。
バッチ処理されたデータが書き込まれた後、挿入する必要があるデータが書き込まれます。
説明重複排除が無効になっている場合、極端なケースではバッチデータの書き込みが実行されないことがあります。たとえば、書き込む必要があるすべてのデータが同じプライマリキー値を持つ場合、データを同時に書き込むことはできません。これは書き込みパフォーマンスに悪影響を及ぼします。
aggressive.enabled
いいえ
アグレッシブコミットモードを有効にするかどうかを指定します。有効な値:
true: 有効。
アグレッシブコミットモードでは、同時に処理されるデータレコードの数が期待される数に達したかどうかに関係なく、アイドル状態の接続を検出すると、システムは強制的にデータをコミットします。このモードでは、トラフィックが少ない場合にデータ転送のレイテンシを短縮できます。
false: 無効。これがデフォルト値です。
jdbcCopyWriteMode
いいえ
データ書き込みモードを指定します。有効な値:
true: バルクロードを使用します。
jdbcCopyWriteModeオプションもtrueに設定されている場合、この設定が有効になります。それ以外の場合は、固定コピーが使用されます。説明バルクロードは固定コピーよりも効率的です。Hologres リソースをより効果的に使用し、データ書き込みのパフォーマンスを向上させます。
プライマリキーを持つテーブルへのバルクロードは、通常、テーブルロックを引き起こします。これを防ぐには、
target-shards.enabledをtrueに設定して、ロックの粒度をシャードレベルに減らします。これにより、複数のバルクロードジョブが同時に実行され、テーブルロックの発生が減少します。バルクロードは、固定コピーと比較して Hologres インスタンスの負荷も大幅に削減します。テストでは、約 66.7% の負荷削減が示されています。プライマリキーを持つテーブルにバルクロードを使用して書き込む場合、テーブルは空である必要があります。これにより、書き込みプロセス中のプライマリキーの重複排除とパフォーマンスの低下が回避されます。
false (デフォルト): INSERT メソッドを使用してデータを書き込みます。
説明Hologres V1.3.1 以降のみがこのオプションをサポートしています。
jdbcCopyWriteFormat
いいえ
バイナリプロトコルを使用するかどうかを指定します。
binary: バイナリプロトコルが使用され、より高速な転送速度を提供します。これがデフォルト値です。
text: テキストモードが使用されます。
Hologres V1.3.1 以降のみがこのパラメーターをサポートしています。
bulkLoad
バルクロードを使用してデータを書き込むかどうかを指定します。有効な値:
true: バルクロードを使用してデータを書き込みます。このオプション値は、
jdbcCopyWriteModeがtrueに設定されている場合にのみ有効です。説明固定コピーと比較して、バルクロードはより良いリソース使用率を提供します。
デフォルトでは、バルクロードはプライマリキーのないテーブルにデータを書き込む場合にのみサポートされます。プライマリキーが構成されたテーブルにデータを書き込むと、テーブルレベルのロックが取得される場合があります。これにより、制限が生じたり、データの書き込みが複雑になったりする可能性があります。
false (デフォルト): バルクロードを使用しません。
説明Hologres V1.4.0 以降のみがこのオプションをサポートしています。
target-shards.enabled
ターゲットシャードへのバルクロードを有効にするかどうかを指定します。有効な値:
true
false (デフォルト)
説明Hologres V1.4.1 以降のみがこのオプションをサポートしています。
ポイントクエリ固有
パラメーター
必須
説明
jdbcReadBatchSize
いいえ
ディメンションテーブルに基づいてポイントクエリを実行するための、スレッド内のバッチで許可されるリクエストの最大数。
デフォルト値: 128。
jdbcReadBatchQueueSize
いいえ
ディメンションテーブルに基づいてポイントクエリを実行するための、スレッドで許可されるキューに入れられたリクエストの最大数。
デフォルト値: 256。
async
いいえ
非同期モードでデータを同期するかどうかを指定します。
デフォルト値: false。非同期モードでは、複数のリクエストと応答が同時に処理されます。したがって、連続するリクエストは互いにブロックせず、クエリのスループットが向上します。ただし、非同期モードでは、リクエストは絶対的な順序で処理されません。
cache
いいえ
キャッシュポリシー。
デフォルト値: None。有効な値: None および LRU。None はデータがキャッシュされないことを示します。LRU は、ディメンションテーブル内の一部のデータがキャッシュされることを示します。システムは、ソーステーブルからデータレコードを受信するたびに、キャッシュ内でデータレコードを検索します。システムがキャッシュ内でレコードを見つけられない場合、システムは物理ディメンションテーブルでデータレコードを検索します。
cachesize
いいえ
キャッシュできるデータレコードの最大数。
デフォルト値: 10000。cache パラメーターを LRU に設定した場合、cachesize パラメーターを構成できます。
cachettlms
いいえ
キャッシュが更新される間隔。単位: ミリ秒。
cache パラメーターを LRU に設定した場合、cachettlms パラメーターを構成できます。デフォルトでは、キャッシュエントリは有効期限切れになりません。
cacheempty
いいえ
戻り結果が空の JOIN クエリのデータをキャッシュするかどうかを指定します。
デフォルト値: true。この値は、システムが戻り結果が空の JOIN クエリのデータをキャッシュすることを示します。false: システムは、戻り結果が空の JOIN クエリのデータをキャッシュしません。
データの型のマッピング
フルマネージド Flink と Hologres 間のデータの型のマッピングについては、「データの型」の「Realtime Compute for Apache Flink または Blink と Hologres 間のデータの型のマッピング」セクションをご参照ください。