Hologresは、Realtime Compute for Apache Flinkのオリジナル製品ラインのサービスタイプであるBlink排他モードとシームレスに統合されています。 Holo-blinkコネクタを使用してHologres結果テーブルにデータを書き込み、リアルタイムでデータをクエリできます。 このトピックでは、Blink排他モードを使用してHologres結果テーブルにデータを書き込む方法について説明します。
制限事項
Blink排他モードのバージョンによって、サポートされる開発構文が異なります。 Realtime Compute for Apache Flinkを使用する前に、Blink排他モードのバージョンを確認し、そのバージョン用に提供されているサンプルコードを参照する必要があります。
Realtime ComputeサービスとHologresサービスが同じリージョンにあることを確認してください。 そうでない場合、サービスは相互に接続できません。
V3.6より前のバージョンのBlink排他モードでは、組み込みのHolo-blinkコネクタは提供されていません。 Hologres結果テーブルを作成するには、関連するJARファイルを参照する必要があります。 HologresコンソールでHologresインスタンスを手動でアップグレードするか、Hologres DingTalkグループに参加してインスタンスのアップグレードを申請できます。 Hologresインスタンスを手動でアップグレードする方法の詳細については、 インスタンスのアップグレード をご参照ください。DingTalkグループへの参加方法の詳細については、Hologresのオンラインサポートを受ける をご参照ください。
説明Blink排他モードをV3.6以降にアップグレードすることをお勧めします。
Blink V3.7排他モードでは、ジョブのコードに
createparttable='true'
と入力した後でのみ、Hologresパーティションテーブルを自動的に作成できます。 パーティションテーブルを使用する場合は、次の点に注意してください。Hologresテーブルは、リストパーティションのみをサポートしています。
パーティションテーブルを作成する場合は、パーティションキー列を指定する必要があります。 パーティションキー列のデータは、TEXT または INT4 タイプである必要があります。 パーティションキー値には、ハイフン(-)を含めることはできません。 たとえば、パーティションキー値
2020-09-12
は無効です。パーティションテーブルにプライマリキーを設定する場合は、プライマリキーにパーティションキー列を含める必要があります。
子テーブルを作成する場合は、パーティションキー値として固定値を指定する必要があります。
子テーブルにデータを書き込む場合は、ソースデータで指定された列の値が、子テーブルに指定したパーティションキー値の範囲内にあることを確認してください。 そうでない場合、エラーメッセージが返されます。
Hologresテーブルは、default パーティションをサポートしていません。
データの書き込み先のHologresテーブルにプライマリキーを設定した場合、リアルタイムのデータ書き込みのデフォルト構文はプライマリキーに基づいて更新されません。 書き込もうとする新しいデータが既存のデータと同じプライマリキー値を共有している場合、新しいデータは破棄されます。
Hologresはデータを非同期で書き込みます。 そのため、コードに
blink.checkpoint.fail_on_checkpoint_error=true
と入力する必要があります。 これにより、ジョブの例外が発生した場合にのみフェイルオーバーがトリガーされます。 Blink V3.7.6以降の排他モードを使用する場合は、この設定をコードに追加する必要はありません。
DDL構文
この例では、次のDDLステートメントを実行してHologres結果テーブルを作成します。
create table Hologres_sink(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='hologres',
dbname='<yourDbname>', -- 接続先のHologresデータベースの名前。
tablename='<yourTablename>', -- データを書き込むHologresテーブルの名前。
username='<yourUsername>', -- Alibaba CloudアカウントのAccessKey ID。
password='<yourPassword>', -- Alibaba CloudアカウントのAccessKeyシークレット。
endpoint='<yourEndpoint>'); -- Hologresインスタンスの仮想プライベートクラウド(VPC)エンドポイント。
WITH句のパラメーター
パラメーター | 説明 | 例 |
type | 結果テーブルのタイプ。 パラメーターをhologresに設定します。 | hologres |
endpoint | HologresインスタンスのVPCエンドポイント。 Hologresコンソール にログインし、インスタンスの詳細ページに移動して、[ネットワーク情報] セクションからエンドポイントを確認できます。 エンドポイントにはポート番号を含める必要があり、IPアドレス:ポート番号の形式に従う必要があります。 | demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80 |
username | Alibaba CloudアカウントのAccessKey ID。 セキュリティ管理 ページでAccessKey IDを取得できます。 | xxxxm3FMWaxxxx |
password | Alibaba CloudアカウントのAccessKeyシークレット。 セキュリティ管理 ページでAccessKeyシークレットを取得できます。 | xxxxm355fffaxxxx |
dbname | 接続先のHologresデータベースの名前。 | Holodb |
tablename | データの書き込み先のHologresテーブルの名前。 | blink_test |
arraydelimiter | HologresストリーミングシンクがSTRINGタイプの列のデータを配列の要素に分割するために使用する区切り文字。 その後、Hologresストリーミングシンクは配列をHologresにインポートします。 デフォルト値: \u0002。 | \u0002 |
mutatetype | データを書き込むモード。 詳細については、Hologres コネクタ をご参照ください。 デフォルト値: insertorignore。 | insertorignore |
ignoredelete | 取り消しメッセージを無視するかどうかを指定します。 有効な値:
説明 このパラメーターは、ストリーミングセマンティクスが使用されている場合にのみ有効になります。 デフォルト値: false。 ほとんどの場合、取り消しメッセージはRealtime Compute for Apache FlinkのGroupby操作によって生成されます。 取り消しメッセージがHolo-blinkコネクタに転送されると、削除リクエストが生成されます。 | false |
partitionrouter | パーティションテーブルにデータを書き込むかどうかを指定します。 有効な値:
デフォルト値: false。 | false |
createparttable | partitionrouterパラメーターをtrueに設定した場合、パーティションキー値に基づいてパーティションテーブルを自動的に作成するかどうかを指定します。 この機能は、Blink V3.7以降の排他モードでのみ使用できます。 デフォルト値: false。 重要 このパラメーターを使用する場合は注意してください。 パーティションキー値にダーティデータが含まれていないことを確認してください。 そうでない場合、無効なパーティションテーブルが作成されます。 | false |
サンプルDDLステートメントでは、次のパラメーターは提供されていません: arraydelimiter、mutatetype、ignoredelete、partitionrouter、および createparttable。 これらのパラメーターを使用する必要がある場合は、上記の表の説明に基づいてこれらのパラメーターを設定してください。
標準のHologres結果テーブルにリアルタイムでデータを書き込む
Hologresにテーブルを作成します。
データを書き込むHologresテーブルを作成します。 ステートメント例:
create table blink_test (a int, b text, c text, d float8, e bigint);
リアルタイムコンピューティングジョブを作成します。
Realtime Compute for Apache Flinkコンソール にログインします。
リアルタイムコンピューティングジョブを作成します。
Hologresデータソースは、Blink V3.6以降の排他モードでサポートされています。 リアルタイムコンピューティングジョブを作成するときに、Hologresデータソースを呼び出すことができます。 ステートメント例:
create table randomSource (a int, b VARCHAR, c VARCHAR, d DOUBLE, e BIGINT) with (type = 'random'); create table test ( a int, b VARCHAR, c VARCHAR, PRIMARY KEY (a) ) with ( type = 'hologres', `endpoint` = '$ip:$port', -- HologresインスタンスのVPCエンドポイントとポート番号。 `username` = 'Alibaba CloudアカウントのAccessKey ID', `password` = 'Alibaba CloudアカウントのAccessKeyシークレット', `dbname` = '接続先のHologresデータベースの名前', `tablename` = 'blink_test'-- データを書き込むHologresテーブルの名前。 ); insert into test select a,b,c from randomSource;
ジョブを発行します。
ジョブコードを編集した後、ジョブエディターの上部にある [構文チェック] をクリックします。 [成功] メッセージが表示された場合、コードの構文は正しいです。
[保存] をクリックします。
[発行] をクリックします。 表示されるダイアログボックスで、ジョブを本番環境に発行するためのパラメーターを設定します。
ジョブを開始します。
ジョブが本番環境に発行されたら、ジョブを手動で開始します。
[開発プラットフォーム] ページの上部ナビゲーションバーで、右上隅にある [管理] をクリックします。 [管理] ページで、開始するジョブを選択し、右上隅にある [開始] をクリックします。
Hologresのデータをリアルタイムでクエリします。
手順 1 で作成したテーブルに書き込まれたデータをクエリできます。 ステートメント例:
select * from blink_test;
ワイドテーブルにデータをマージする
このセクションでは、複数のデータストリームから同じHologresワイドテーブルにデータを書き込む方法について説明します。
この例では、WIDE_TABLE という名前のHologresワイドテーブルがあります。 テーブルには、A、B、C、D、E の列があります。 列 A はプライマリキーとして設定されています。 Realtime Compute for Apache Flinkでは、1 つのストリームに列 A、B、C が含まれ、別のストリームに列 A、D、E が含まれています。
Flink SQLを使用して、2 つのHologres結果テーブルを宣言します。 1 つのテーブルには列 A、B、C を宣言します。 もう 1 つのテーブルには列 A、D、E を宣言します。 両方の結果テーブルは、WIDE_TABLE テーブルにマッピングされます。
両方の結果テーブルの mutatetype パラメーターを insertorupdate に設定します。
両方の結果テーブルの ignoredelete パラメーターを true に設定します。 これにより、取り消しメッセージによって削除リクエストが生成されるのを防ぎます。
2 つのストリームのデータをそれぞれの結果テーブルに挿入します。
次の制限事項に注意してください。
ワイドテーブルにはプライマリキーが必要です。
各ストリームのデータには、プライマリキー列のすべてのデータが含まれている必要があります。
ワイドテーブルが列指向テーブルで、1 秒あたり多数のリクエストが開始される場合、CPU使用率が高くなります。 テーブルの列の 辞書エンコーディング を無効にすることをお勧めします。
Hologresのパーティション結果テーブルにリアルタイムでデータを書き込む
HoloHub APIを呼び出して、親テーブルにデータを書き込むことができます。 その後、システムは異なるパーティションのデータを対応する子テーブルにルーティングします。 パーティションテーブルにデータを書き込むこともできます。 HoloHub APIの詳細については、HoloHub APIの概要 をご参照ください。
次の制限事項に注意してください。
Hologresテーブルは、リストパーティションのみをサポートしています。
パーティションテーブルを作成する場合は、パーティションキー列を指定する必要があります。 パーティションキー列のデータは、TEXT または INT4 タイプである必要があります。
パーティションテーブルにプライマリキーを設定する場合は、プライマリキーにパーティションキー列を含める必要があります。
子テーブルを作成する場合は、パーティションキー値として固定値を指定する必要があります。
子テーブルにデータを書き込む場合は、ソースデータで指定された列の値が、子テーブルに指定したパーティションキー値の範囲内にあることを確認してください。 そうでない場合、エラーメッセージが返されます。
Hologresテーブルは、デフォルトパーティションをサポートしていません。
Hologresにパーティションテーブルを作成します。
データを書き込むHologresにパーティションテーブルを作成し、パーティションテーブルに対応する子テーブルを作成します。 ステートメント例:
-- test_messageという名前の親テーブルと、親テーブルに対応する子テーブルを作成します。 drop table if exists test_message; begin; create table test_message ( "bizdate" text NOT NULL, "tag" text NOT NULL, "id" int4 NOT NULL, "title" text NOT NULL, "body" text, PRIMARY KEY (bizdate,tag,id) ) PARTITION BY LIST (bizdate); commit;
説明ステートメントを実行するときは、
${bizdate}
パラメーターに値を割り当てます。Blink V3.7排他モードでのみ、子テーブルを自動的に作成できます。 Blink排他モードのバージョンがV3.7より前の場合は、データをインポートする前に、Hologresで手動で子テーブルを作成する必要があります。 そうでない場合、データのインポートは失敗します。
Blink排他モードを使用する場合にリアルタイムコンピューティングジョブを作成します。
この例では、次のステートメントを実行して、Blink排他モードを使用する場合にリアルタイムコンピューティングジョブを作成します。
説明次のステートメント例は、Blink V3.7以降の排他モードに適用されます。 Blink排他モードのバージョンがV3.7より前の場合は、Blink排他モードをV3.7以降にアップグレードするか、子テーブルを自動的に作成するために使用されるコードから
`createparttable` = 'true'
を削除します。create table test_message_src( tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'random', `interval` = '10', `count` = '100' ); create table test_message_sink ( bizdate VARCHAR, tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'hologres', `endpoint` = '$ip:$port', -- HologresインスタンスのVPCエンドポイント。 `username` ='<AccessID>', -- Alibaba CloudアカウントのAccessKey ID。 `password` = '<AccessKey>', -- Alibaba CloudアカウントのAccessKeyシークレット。 `dbname` = '<DBname>', -- 接続先のHologresデータベースの名前。 `tablename` = '<Tablename>', -- データを書き込むHologresテーブルの名前。 `partitionrouter` = 'true', -- Hologresのパーティションテーブルにデータを書き込みます。 `createparttable` = 'true', -- Hologresに子テーブルを自動的に作成します。 ); insert into test_message_sink select "20200327",* from test_message_src; insert into test_message_sink select "20200328",* from test_message_src;
ジョブを発行して開始します。
詳細については、「ジョブを発行する」および「ジョブを開始する」の手順について、「標準のHologres結果テーブルにリアルタイムでデータを書き込む」セクションをご参照ください。
Hologresのデータをリアルタイムでクエリします。
手順 1 で作成した親テーブルと子テーブルに書き込まれたデータをクエリできます。 ステートメント例:
select * from test_message; select * from test_message where bizdate = '20200327';
データ型のマッピング
Blink排他モードとHologresのデータ型間のマッピングの詳細については、データ型 をご参照ください。