Hologres は Real-time Compute for Flink の排他モードと深く統合されており、コネクタを使用して Hologres 結果テーブルにデータを書き込むことができます。書き込まれたデータは即座にクエリ可能です。本トピックでは、Real-time Compute for Flink の排他モードから Hologres 結果テーブルにデータを書き込む方法について説明します。
制限事項
-
Real-time Compute for Flink の排他モードのバージョンによって、開発者向けのセマンティクスが異なります。作業を開始する前に、ご利用のバージョンを確認し、該当バージョンの例を参照してください。
-
接続エラーを防ぐため、Real-time Compute for Flink サービスと Hologres インスタンスが同一リージョンにあることを確認してください。
-
Real-time Compute for Flink の排他モードでバージョン 3.6 より前のものには、組み込みの Hologres コネクタが含まれていません。Hologres にリアルタイムでデータを書き込むには、JAR ファイルを参照する必要があります。トラブルシューティングについては、「一般的なスペックアップ準備失敗エラー」をご参照ください。フィードバックについては、Hologres DingTalk グループにご参加いただけます。グループへの参加方法の詳細については、「オンラインサポートをさらに利用する方法」をご参照ください。
説明ジョブを実行するには、バージョン 3.6 以降へのスペックアップを推奨します。
-
Real-time Compute for Flink の排他モード バージョン 3.7 では、Hologres パーティションテーブルの自動作成がサポートされています。ただし、ジョブ内で
createparttable='true'を構成する必要があります。パーティションテーブルを使用する場合、以下の点にご注意ください。-
Hologres では現在、リストパーティションのみをサポートしています。
-
パーティションテーブルを作成する際は、パーティションキー列を明示的に指定する必要があります。現在、パーティションキー列としてサポートされているのは text 型および int4 型のみです。パーティション値にはハイフン (-) を含めることはできません(例:
2020-09-12)。 -
パーティションテーブルにプライマリキーが設定されている場合、パーティションキー列はプライマリキーの一部である必要があります。
-
子パーティションテーブルを作成する際、子テーブルのパーティションキー列には静的フィールド値を指定する必要があります。
-
子パーティションテーブルに書き込まれるデータのパーティションキー列の値は、子テーブル作成時に定義された値と完全に一致している必要があります。一致しない場合はエラーが報告されます。
-
DEFAULT パーティション機能はサポートされていません。
-
-
宛先の Hologres テーブルにプライマリキーが設定されている場合、リアルタイム書き込みのデフォルトセマンティクスでは、プライマリキーに基づくデータ更新は行われません。重複するプライマリキーを持つデータは破棄されます。
-
Hologres は非同期でデータを書き込みます。ジョブに
blink.checkpoint.fail_on_checkpoint_error=true構成を追加する必要があります。これにより、ジョブが失敗した際にフェールオーバーがトリガーされます。このパラメーターは、Real-time Compute for Flink の排他モード バージョン 3.7.6 以降では不要です。
DDL セマンティクス
次の文を使用して、Hologres 結果テーブルを作成します。
create table Hologres_sink(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='hologres',
dbname='<yourDbname>', --Hologres データベースの名前。
tablename='<yourTablename>', --Hologres でデータを受信するテーブルの名前。
username='<yourUsername>', --Alibaba Cloud アカウントの AccessKey ID。
password='<yourPassword>', --Alibaba Cloud アカウントの AccessKey Secret。
endpoint='<yourEndpoint>'); --Hologres インスタンスの VPC ネットワークのエンドポイント。
WITH パラメーター
|
パラメーター |
説明 |
例 |
|
type |
結果テーブルのタイプ。値は hologres に固定されています。 |
hologres |
|
endpoint |
Hologres インスタンスの VPC ネットワークアドレス。 Hologres 管理コンソール にアクセスし、宛先インスタンスの製品ページで、Network Information セクションからエンドポイントを取得してください。エンドポイントには ip:port 形式でポート番号を含める必要があります。 |
demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80 |
|
username |
AccessKey ID AccessKey 管理 から AccessKey ID を取得できます。 |
xxxxm3FMWaxxxx |
|
password |
AccessKey Secret AccessKey 管理 から AccessKey Secret を取得できます。 |
xxxxm355fffaxxxx |
|
dbname |
Hologres データベースの名前。 |
Holodb |
|
tablename |
Hologres データベース内のテーブル名。 |
blink_test |
|
arraydelimiter |
Hologres 結果テーブルは、field_delimiter に基づいて STRING フィールドを配列に分割し、Hologres にインポートすることをサポートしています。 デフォルト値は \u0002 です。 |
\u0002 |
|
mutatetype |
データ書き込みモード。詳細については、「Hologres 結果テーブル」をご参照ください。 デフォルト値は insertorignore です。 |
insertorignore |
|
ignoredelete |
リトラクションメッセージを無視するかどうかを指定します。
説明
このパラメーターは、ストリーミングセマンティクスを使用する場合にのみ有効です。 デフォルト値は false です。 通常、Flink Groupby 操作によりリトラクションメッセージが生成されます。これらのメッセージが Hologres コネクタに送信されると、DELETE リクエストが生成されます。 |
false |
|
partitionrouter |
パーティションテーブルにデータを書き込むかどうかを指定します。
デフォルト値は false です。 |
false |
|
createparttable |
パーティションテーブルに書き込む際、パーティション値に基づいて子パーティションテーブルを自動作成するかどうかを指定します。この機能は Blink 排他モード V3.7 以降でサポートされています。 デフォルト値は false です。 重要
この機能は慎重に使用してください。ダーティデータがパーティション値に含まれていると、誤ったパーティションテーブルが作成される可能性があります。 |
false |
arraydelimiter、mutatetype、ignoredelete、partitionrouter、および createparttable パラメーターは、DDL の例示文には含まれていません。アプリケーションでこれらのパラメーターを使用する場合は、上記の表の説明を参照してください。
通常の Hologres 結果テーブルへのリアルタイムデータ書き込み
-
Hologres にテーブルを作成します。
データを受信するためのテーブルを Hologres に作成します。以下はテーブル作成 SQL 文の例です。
create table blink_test (a int, b text, c text, d float8, e bigint); -
Real-time Compute for Flink ジョブを作成します。
-
Real-time Compute for Flink コンソール にログインします。
-
Real-time Compute for Flink ジョブを作成します。
-
Real-time Compute for Flink の排他モード V3.6 以降では、Hologres データソースがサポートされています。直接呼び出すことができます。以下は SQL 文の例です。
create table randomSource (a int, b VARCHAR, c VARCHAR, d DOUBLE, e BIGINT) with (type = 'random'); create table test ( a int, b VARCHAR, c VARCHAR, PRIMARY KEY (a) ) with ( type = 'hologres', `endpoint` = '$ip:$port', --Hologres インスタンスの VPC ネットワークアドレスとポート番号。 `username` = 'Alibaba Cloud アカウントの AccessKey ID', `password` = 'Alibaba Cloud アカウントの AccessKey Secret', `dbname` = 'Hologres データベースの名前', `tablename` = 'blink_test'--Hologres でデータを受信するテーブルの名前。 ); insert into test select a,b,c from randomSource;
-
-
-
ジョブを公開します。
-
ジョブを作成後、エディターで 構文チェック をクリックします。成功しました と表示された場合、構文は正しいです。
-
保存 をクリックしてジョブを保存します。
-
公開 をクリックして、ジョブを本番環境に提出します。必要に応じて、ジョブ公開設定を構成してください。

-
-
ジョブを開始します。
ジョブを本番環境に提出した後、手動で開始する必要があります。
Real-time Compute for Flink 開発者プラットフォーム ページのメニューバー右上隅で、O&M をクリックします。O&M ページで開始したいジョブを選択し、右上隅の 起動 をクリックします。

-
Hologres のデータをリアルタイムでクエリします。
Hologres でデータを受信するテーブルをクエリし、書き込まれたデータをリアルタイムで取得します。以下はクエリ SQL 文の例です。
select * from blink_test;
ワイドテーブルのマージ/部分更新機能の使用方法
複数のストリームから単一の Hologres ワイドテーブルにデータを書き込む(一般的なシナリオ)には、以下の手順に従います。
Hologres に A、B、C、D、E 列を持つ WIDE_TABLE というワイドテーブルがあり、列 A がプライマリキーであると仮定します。1 つの Flink ストリームには列 A、B、C のデータが含まれ、もう 1 つのストリームには列 A、D、E のデータが含まれているとします。
-
Flink SQL を使用して、2 つの Hologres 結果テーブルを宣言します。1 つのテーブルはフィールド A、B、C のみを宣言し、もう 1 つのテーブルはフィールド A、D、E のみを宣言します。両方のテーブルは WIDE_TABLE にマッピングされます。
-
両方の結果テーブルの mutatetype プロパティを insertorupdate に設定します。
-
リトラクションメッセージによる DELETE リクエストの生成を防ぐため、両方の結果テーブルの ignoredelete プロパティを true に設定します。
-
2 つのストリームからのデータをそれぞれ対応する結果テーブルに挿入します。
このシナリオには以下の制限が適用されます。
-
ワイドテーブルにはプライマリキーが必要です。
-
各ストリームからのデータには、プライマリキーの全フィールドが含まれている必要があります。
-
列指向テーブルのワイドテーブルマージシナリオでは、1 秒あたりのレコード数 (RPS) が高くなると CPU 使用率が高くなる可能性があります。テーブル内のフィールドの Dictionary encoding を無効にできます。
パーティション Hologres 結果テーブルへのリアルタイムデータ書き込み
Hologres は、リアルタイムデータ API を呼び出すことで、親パーティションテーブルに直接データを書き込むことをサポートしています。データは自動的に対応する子パーティションテーブルにルーティングされます。リアルタイムデータ API の詳細については、「リアルタイムデータ API」をご参照ください。
以下の制限が適用されます。
-
Hologres ではリストパーティションのみをサポートしています。
-
パーティションテーブルを作成する際は、パーティションキー列を明示的に指定する必要があります。パーティションキー列としてサポートされているのは text 型および int4 型のみです。
-
プライマリキーが設定されている場合、パーティションキー列はプライマリキーの一部である必要があります。
-
子パーティションテーブルを作成する際、子テーブルのパーティションキー列には静的フィールド値を指定する必要があります。
-
子パーティションテーブルに書き込まれるデータのパーティションキー列の値は、子テーブル作成時に定義された値と完全に一致している必要があります。一致しない場合はエラーが報告されます。
-
Hologres はデフォルトパーティションをサポートしていません。
-
Hologres にパーティションテーブルを作成します。
データを受信するためのパーティションテーブルを Hologres に作成し、対応する子パーティションテーブルを作成します。以下はテーブル作成 SQL 文の例です。
--親パーティションテーブル test_message および対応する子パーティションテーブルを作成します。 drop table if exists test_message; begin; create table test_message ( "bizdate" text NOT NULL, "tag" text NOT NULL, "id" int4 NOT NULL, "title" text NOT NULL, "body" text, PRIMARY KEY (bizdate,tag,id) ) PARTITION BY LIST (bizdate); commit;説明-
コマンドを実行する際は、
${bizdate}パラメーターを実際の値に置き換えてください。 -
Real-time Compute for Flink の排他モード バージョン 3.7 が、自動パーティション作成をサポートする最初のバージョンです。バージョン 3.7 より前のものを使用している場合、事前に Hologres で子パーティションテーブルを作成しておく必要があります。そうしないと、データインポートが失敗します。
-
-
Real-time Compute for Flink の排他モードでジョブを作成します。
以下は、Real-time Compute for Flink の排他モードでジョブを作成するための文の例です。
説明以下の例は、Real-time Compute for Flink の排他モード V3.7 以降に適用されます。バージョン 3.7 より前のものを使用している場合は、V3.7 以降にスペックアップするか、自動子パーティションテーブル作成の構成を削除してください:
`createparttable` = 'true'。create table test_message_src( tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'random', `interval` = '10', `count` = '100' ); create table test_message_sink ( bizdate VARCHAR, tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'hologres', `endpoint` = '$ip:$port', --Hologres インスタンスの VPC ネットワークアドレス。 `username` ='<AccessID>', --Alibaba Cloud アカウントの AccessKey ID。 `password` = '<AccessKey>', --Alibaba Cloud アカウントの AccessKey Secret。 `dbname` = '<DBname>', --Hologres データベースの名前。 `tablename` = '<Tablename>', --Hologres データベース内のテーブル名。 `partitionrouter` = 'true', --Hologres のパーティションテーブルにデータを書き込みます。 `createparttable` = 'true' --Hologres で子パーティションテーブルを自動作成します。 ); insert into test_message_sink select "20200327",* from test_message_src; insert into test_message_sink select "20200328",* from test_message_src; -
ジョブを公開および開始します。
詳細については、「ジョブを公開」および「ジョブを開始」の手順を「通常の Hologres 結果テーブルへのリアルタイムデータ書き込み」セクションでご参照ください。
-
Hologres のデータをリアルタイムでクエリします。
Hologres でデータを受信するテーブルをクエリし、書き込まれたデータをリアルタイムで取得します。以下はクエリ SQL 文の例です。
select * from test_message; select * from test_message where bizdate = '20200327';
データ型のマッピング
Real-time Compute for Flink の排他モードと Hologres 間のデータ型マッピングの詳細については、「データ型まとめ」をご参照ください。