このトピックでは、Hologres を Realtime Compute for Apache Flink に接続して、リアルタイムデータウェアハウス分析ダッシュボードをクイックに構築するためのベストプラクティスについて説明します。
前提条件
-
Hologres をアクティブ化し、開発ツールを接続します。 詳細については、HoloWeb に接続してクエリを実行をご参照ください。
-
Realtime Compute for Apache Flink をアクティブ化します。 詳細については、Realtime Compute for Apache Flink のアクティブ化をご参照ください。
説明Realtime Compute for Apache Flink と Hologres が同じリージョンにあり、同じ vSwitch と VPC を使用していることを確認してください。
-
DataV をアクティブ化します。 詳細については、DataV Service のアクティブ化をご参照ください。
背景情報
Hologres は、Alibaba Cloud のリアルタイム対話型分析プロダクトです。 組み込みのリアルタイムデータ API を介して Realtime Compute for Apache Flink に接続しますは、高同時実行性のリアルタイムデータ書き込みと秒単位のクエリをサポートします。
Hologres は PostgreSQL と互換性があります。 ビジネスインテリジェンス (BI) 分析ツールに直接接続して、クエリされたデータを迅速に分析および可視化できます。
このトピックでは、e コマースストアのリアルタイム運用ダッシュボードを構築する例を使用します。 このダッシュボードには、ストアの総トラフィック、ストアごとの訪問数、地域別の販売量、ベストセラー商品が表示されます。
次のイベントチェーン図は、Hologres を使用してリアルタイム運用ダッシュボードを構築する全プロセスを示しています。
-
ソースデータを収集し、リアルタイムで Realtime Compute for Apache Flink に書き込みます。 Realtime Compute for Apache Flink を使用して、データをクレンジングおよび集計します。
-
処理されたデータをリアルタイムで Hologres に書き込みます。 Hologres を使用して対話型検索を行います。
-
Hologres を DataV データダッシュボードに接続して、リアルタイム運用ダッシュボードを表示します。
操作手順
-
ソースデータを取得します。
DataHub または他のビジネスログからソースデータを取得します。
便宜上、このチュートリアルでは Realtime Compute for Apache Flink から直接データを生成します。 詳細については、手順 3 をご参照ください。
-
Hologres でデータ受信テーブルを作成します。
HoloWeb を使用して、ソーステーブルと同じフィールドとデータ型を持つテーブルを作成します。 このテーブルはリアルタイムデータを受け入れます。 具体的な手順については、HoloWeb に接続してクエリを実行をご参照ください。 次の例は SQL 文を示しています。
BEGIN; CREATE TABLE public.order_details ( "user_id" int8, "user_name" text, "item_id" int8, "item_name" text, "price" numeric(38,2), "province" text, "city" text, "ip" text, "longitude" text, "latitude" text, "sale_timestamp" timestamptz NOT NULL ); CALL SET_TABLE_PROPERTY('public.order_details','orientation', 'column'); CALL SET_TABLE_PROPERTY('public.order_details','clustering_key', 'sale_timestamp:asc'); CALL SET_TABLE_PROPERTY('public.order_details','segment_key', 'sale_timestamp'); CALL SET_TABLE_PROPERTY('public.order_details','bitmap_columns', 'user_name,item_name,province,city,ip,longitude,latitude'); CALL SET_TABLE_PROPERTY('public.order_details','dictionary_encoding_columns','user_name:auto,item_name:auto,province:auto,city:auto,ip:auto,longitude:auto,latitude:auto'); CALL SET_TABLE_PROPERTY('public.order_details','time_to_live_in_seconds', '3153600000'); CALL SET_TABLE_PROPERTY('public.order_details','distribution_key', 'user_id'); CALL SET_TABLE_PROPERTY('public.order_details','storage_format', 'orc'); COMMIT; -
Realtime Compute for Apache Flink コンソールで、カスタムコネクタの JAR リソース ordergen をアップロードします。 具体的な手順については、カスタムコネクタのアップロードと使用をご参照ください。
-
リアルタイムでデータをクレンジングします。
Realtime Compute for Apache Flink コンソールに移動します。 Realtime Compute for Apache Flink を使用してソースデータをクレンジングおよび集計します。 次に、リアルタイムデータ API を使用して、データをリアルタイムで Hologres に書き込みます。 具体的な手順については、ジョブ開発マップをご参照ください。 次の例は SQL 文を示しています。
CREATE TEMPORARY TABLE source_table ( user_id BIGINT, user_name VARCHAR, item_id BIGINT, item_name VARCHAR, price numeric (38, 2), province VARCHAR, city VARCHAR, longitude VARCHAR, latitude VARCHAR, ip VARCHAR, sale_timestamp TIMESTAMP ) WITH ('connector' = 'ordergen'); CREATE TEMPORARY TABLE hologres_sink ( user_id BIGINT, user_name VARCHAR, item_id BIGINT, item_name VARCHAR, price numeric (38, 2), province VARCHAR, city VARCHAR, longitude VARCHAR, latitude VARCHAR, ip VARCHAR, sale_timestamp TIMESTAMP ) WITH ( 'connector' = 'hologres', 'dbname' = '<holo_db>', 'tablename' = '<receive_table>', 'username' = '<uid>', 'password' = '<pid>', 'endpoint' = '<host>' ); INSERT INTO hologres_sink SELECT user_id, user_name, item_id, item_name, price, province, city, longitude, latitude, ip, sale_timestamp FROM source_table;次の表にパラメーターを説明します。
パラメーター名
説明
holo_db
Hologres データベースの名前。
receive_table
データを受け入れる Hologres 内のテーブルの名前。 このトピックでは、テーブルは
public.order_detailsです。uid
ご利用の Alibaba Cloud アカウントの AccessKey ID。
pid
ご利用の Alibaba Cloud アカウントの AccessKey Secret。
host
指定された VPC ネットワーク内の Hologres インスタンスのドメイン名。 Hologres コンソールのインスタンス詳細ページに移動し、Network Information セクションからドメイン名を取得します。
-
Realtime Compute for Apache Flink コンソールの [ジョブ O&M] ページに移動し、ジョブが実行中になるまで開始します。 詳細については、ジョブの開始をご参照ください。

-
Hologres でリアルタイムにデータをクエリします。
Hologres で、必要に応じてさまざまなディメンションからリアルタイムデータをクエリします。 次の例は、いくつかのクエリを示しています。
SELECT SUM(price) AS "GMV" FROM order_details ; SELECT COUNT(DISTINCT user_id) AS "UV" FROM order_details ; SELECT city AS "city", COUNT(DISTINCT user_id) AS "user_count" FROM order_details GROUP BY "city" ORDER BY "user_count" DESC limit 100; SELECT item_name AS "product", SUM(price) AS "sales_amount" FROM order_details GROUP BY "product" ORDER BY "sales_amount" DESC limit 100; SELECT to_char(sale_timestamp, 'MM-DD') AS "date", SUM(price) AS "GMV" FROM order_details GROUP BY "date" ORDER BY "GMV" DESC limit 100; -
リアルタイムの DataV ダッシュボードを表示します。
Hologres からクエリされたデータを DataV に直接接続して、リアルタイムダッシュボードを作成します。 手順は次のとおりです。
-
データソースを追加します。
-
DataV コンソールに移動します。 左側のナビゲーションウィンドウで、 を選択します。 [データソース] ページで、[データソースを追加] をクリックします。
-
[データソースを追加] パネルで、Hologres データソースのパラメーターを設定します。
-
OK をクリックします。
-
-
リアルタイムダッシュボードを作成します。
ダッシュボードに表示したい内容に基づいて、必要なウィジェットを選択し、データソースを設定します。 詳細については、概要をご参照ください。
このチュートリアルでは、基本的な棒グラフ、カルーセル、基本的な平面マップ、ティッカーボードを使用します。 次の例は、ティッカーボードの設定方法を示しています。
-
データソースを設定します。
-
ティッカーボードの罫線、フォント、色を設定します。
-
-
リアルタイムダッシュボードを表示します。
ダッシュボードのプラグインパラメーターとデータソースを設定した後、必要に応じて装飾要素を追加してプラグインを強化できます。
-
左側には、リアルタイムのプロダクト訪問数と都市別の売上が表示されます。
-
中央のマップには、各トランザクションの場所、総売上、総訪問数がリアルタイムで表示されます。
-
右側には、プロダクトの売上比率と売上ランキングがリアルタイムで表示されます。
-
-