このトピックでは、Flink と DataWorks の Data Integration を例として、Simple Log Service (SLS) から Hologres にリアルタイムでデータを書き込む方法について説明します。
前提条件
-
Simple Log Service (SLS) を有効化し、プロジェクトと Logstore を作成します。 詳細については、「Logtail を使用した ECS インスタンスからのテキストログの収集と分析」をご参照ください。
-
Hologres を有効化し、開発ツールに接続します。 詳細については、「Hologres の利用プロセス」をご参照ください。
-
Flink を使用して SLS データを Hologres に書き込む場合は、Realtime Compute for Apache Flink を有効化し、プロジェクトを作成します。 詳細については、「フルマネージド Flink の有効化」および「プロジェクトの作成と管理」をご参照ください。
-
DataWorks の Data Integration を使用して SLS データを Hologres に書き込む場合は、DataWorks を有効化し、ワークスペースを作成します。 詳細については、「DataWorks の有効化」および「ワークスペースの作成」をご参照ください。
背景情報
SLS は、ログ、メトリック、トレースなどのデータを処理するための、大規模、低コスト、かつリアルタイムのサービスを提供するクラウドネイティブの可観測性プラットフォームです。 データ収集、処理、クエリと分析、可視化、アラート、消費、配信のワンストップ機能を提供し、研究開発 (R&D)、O&M、運用、セキュリティなどのシナリオにおけるデジタル能力を強化します。
Hologres は、パフォーマンス専有型で信頼性が高く、低コストでスケーラブルなリアルタイムコンピューティングエンジンであり、リアルタイムデータウェアハウスソリューションと、大規模データセットに対するサブ秒レベルの対話型検索サービスを提供します。 リアルタイムデータ中台の構築、詳細な分析、セルフサービス分析、マーケティングプロファイル、オーディエンスセグメンテーション、リアルタイムリスク管理などのシナリオで広く使用されています。 SLS データを Hologres に迅速に書き込み、リアルタイムの分析とクエリを実行することで、ビジネスデータを探索する能力を向上させることができます。
Flink を使用した SLS から Hologres へのデータ書き込み
-
SLS データの準備
この例の SLS データは、SLS プラットフォーム上のシミュレーションデータから取得したもので、ゲームのログインと消費ログをシミュレートします。 ビジネスデータがある場合は、直接それを使用してください。
-
Simple Log Service コンソールにログインします。
-
[データ取り込み] セクションで、[データのシミュレーション] をクリックします。
-
[データのシミュレーション] タブで、[ゲーム操作ログ] の下にある [シミュレーション] をクリックします。
-
[ログスペースの選択] ページで、[プロジェクト] と [Logstore] を選択し、Next をクリックします。
-
[データのシミュレーション] ページで、範囲と周波数を設定し、[インポート開始] をクリックします。
-
シミュレーションによって生成されたフィールドとデータを以下に示します。 詳細については、「クエリと分析のクイックスタート」をご参照ください。
content列は JSON 型です。
-
-
Hologres テーブルの作成
Hologres にデータを受信するためのテーブルを作成します。 クエリのニーズに応じて特定のフィールドにインデックスを作成し、クエリ効率を向上させることができます。 インデックスの詳細については、「テーブルの作成」をご参照ください。 この例のデータ定義言語 (DDL) 文は次のとおりです。
CREATE TABLE sls_flink_holo ( content JSONB , operation TEXT, uid TEXT, topic TEXT , source TEXT , c__timestamp TIMESTAMPTZ, receive_time BIGINT, PRIMARY KEY (uid) ); -
Flink を使用したデータ書き込み
Flink を使用して SLS データを Hologres に書き込むには、次のドキュメントをご参照ください。
-
Flink での SLS データの読み取り:SLS ソーステーブル。
-
Flink での Hologres へのデータ書き込み:Hologres 結果テーブル。
次の SQL ジョブは、Flink を使用して SLS データを Hologres に書き込む例です。 JSON フィールドは、Hologres の JSON フィールドに直接書き込まれます。 Flink はネイティブの JSON 型をサポートしていないため、代替として VARCHAR 型が使用されます。
CREATE TEMPORARY TABLE sls_input ( content STRING, operation STRING, uid STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` BIGINT METADATA VIRTUAL, `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL ) WITH ( 'connector' = 'sls', 'endpoint' = 'sls_private_endpoint',--SLS のプライベートエンドポイント。 'accessid' = 'your_access_id',--ご利用の AccessKey ID。 'accesskey' = 'your_access_key',--ご利用の AccessKey Secret。 'starttime' = '2024-08-30 00:00:00',--ログを消費する開始時刻。 'project' = 'your_project_name',--SLS プロジェクトの名前。 'logstore' = 'your_logstore_name'--Logstore の名前。 ); CREATE TEMPORARY TABLE hologres_sink ( content VARCHAR, operation VARCHAR, uid VARCHAR, topic STRING , source STRING , c__timestamp TIMESTAMP , receive_time BIGINT ) WITH ( 'connector' = 'hologres', 'dbname' = 'your_holo_db_name', --Hologres データベースの名前。 'tablename' = 'your_holo_table_name', --データを受信する Hologres テーブルの名前。 'username' = 'your_access_id', --ご利用の Alibaba Cloud アカウントの AccessKey ID。 'password' = 'your_access_key', --ご利用の Alibaba Cloud アカウントの AccessKey Secret。 'endpoint' = 'your_holo_vpc_endpoint' --Hologres インスタンスの VPC エンドポイント。 ); INSERT INTO hologres_sink SELECT content, operation, uid, `__topic__` , `__source__` , CAST ( FROM_UNIXTIME (`__timestamp__`) AS TIMESTAMP ), CAST (__tag__['__receive_time__'] AS BIGINT) AS receive_time FROM sls_input; -
-
データのクエリ
Flink を使用して Hologres に書き込まれた SLS データをクエリします。 その後、必要に応じてデータ開発を実行できます。

DataWorks の Data Integration を使用した SLS から Hologres へのデータ書き込み
-
SLS データの準備
この例の SLS データは、SLS プラットフォーム上のシミュレーションデータから取得したもので、ゲームのログインと消費ログをシミュレートします。 ビジネスデータがある場合は、直接それを使用してください。
-
Simple Log Service コンソールにログインします。
-
[データ取り込み] セクションで、[データのシミュレーション] をクリックします。
-
[データのシミュレーション] タブで、[ゲーム操作ログ] の下にある [シミュレーション] をクリックします。
-
[ログスペースの選択] ページで、[プロジェクト] と [Logstore] を選択し、Next をクリックします。
-
[データのシミュレーション] ページで、範囲と周波数を設定し、[インポート開始] をクリックします。
-
シミュレーションによって生成されたフィールドとデータを以下に示します。 詳細については、「クエリと分析のクイックスタート」をご参照ください。
content列は JSON 型です。
-
-
Hologres テーブルの作成
Hologres にデータを受信するためのテーブルを作成します。 クエリのニーズに応じて特定のフィールドにインデックスを作成し、クエリ効率を向上させることができます。 インデックスの詳細については、「テーブルの作成」をご参照ください。 この例の DDL 文は次のとおりです。
説明-
この例では、データの整合性を確保するために
uidがプライマリキーとして設定されています。 この設定は必要に応じて変更できます。 -
uidを分散キーとして設定します。 これにより、同じuidを持つデータが同じシャードに書き込まれ、クエリパフォーマンスが向上します。
BEGIN; CREATE TABLE sls_dw_holo ( content JSONB , operation TEXT, uid TEXT, C_Topic TEXT , C_Source TEXT , timestamp BIGINT, PRIMARY KEY (uid) ); CALL set_table_property('sls_dw_holo', 'distribution_key', 'uid'); CALL set_table_property('sls_dw_holo', 'event_time_column', 'timestamp'); COMMIT; -
-
データソースの設定
データを同期する前に、Data Integration のデータソースを DataWorks ワークスペースに追加します。
-
SLS データソースには、LogHub データソースを使用します。 詳細については、「LogHub (SLS) データソースの設定」をご参照ください。
-
Hologres データソースを設定します。 詳細については、「Hologres データソースの設定」をご参照ください。
-
-
リアルタイムでのデータ同期
Data Integration でリアルタイム同期タスクを作成して実行します。 詳細については、「単一テーブル内の増分データに対するリアルタイム同期タスクの設定」および「リアルタイム同期タスクの O&M」をご参照ください。
この例で作成されたリアルタイム同期タスクでは、入力は LogHub データソースに設定され、出力は Hologres データソースに設定されています。 同期のためのフィールドマッピングは、次の図に示すように設定されています。

-
データのクエリ
リアルタイム同期タスクが開始された後、DataWorks の Data Integration を使用して Hologres に書き込まれた SLS データをクエリできます。
