すべてのプロダクト
Search
ドキュメントセンター

Hologres:Simple Log Service データのリアルタイム同期

最終更新日:Feb 04, 2026

このトピックでは、Flink と DataWorks の Data Integration を例として、Simple Log Service (SLS) から Hologres にリアルタイムでデータを書き込む方法について説明します。

前提条件

背景情報

SLS は、ログ、メトリック、トレースなどのデータを処理するための、大規模、低コスト、かつリアルタイムのサービスを提供するクラウドネイティブの可観測性プラットフォームです。 データ収集、処理、クエリと分析、可視化、アラート、消費、配信のワンストップ機能を提供し、研究開発 (R&D)、O&M、運用、セキュリティなどのシナリオにおけるデジタル能力を強化します。

Hologres は、パフォーマンス専有型で信頼性が高く、低コストでスケーラブルなリアルタイムコンピューティングエンジンであり、リアルタイムデータウェアハウスソリューションと、大規模データセットに対するサブ秒レベルの対話型検索サービスを提供します。 リアルタイムデータ中台の構築、詳細な分析、セルフサービス分析、マーケティングプロファイル、オーディエンスセグメンテーション、リアルタイムリスク管理などのシナリオで広く使用されています。 SLS データを Hologres に迅速に書き込み、リアルタイムの分析とクエリを実行することで、ビジネスデータを探索する能力を向上させることができます。

Flink を使用した SLS から Hologres へのデータ書き込み

  1. SLS データの準備

    この例の SLS データは、SLS プラットフォーム上のシミュレーションデータから取得したもので、ゲームのログインと消費ログをシミュレートします。 ビジネスデータがある場合は、直接それを使用してください。

    1. Simple Log Service コンソールにログインします。

    2. [データ取り込み] セクションで、[データのシミュレーション] をクリックします。

    3. [データのシミュレーション] タブで、[ゲーム操作ログ] の下にある [シミュレーション] をクリックします。

    4. [ログスペースの選択] ページで、[プロジェクト][Logstore] を選択し、Next をクリックします。

    5. [データのシミュレーション] ページで、範囲と周波数を設定し、[インポート開始] をクリックします。

    6. シミュレーションによって生成されたフィールドとデータを以下に示します。 詳細については、「クエリと分析のクイックスタート」をご参照ください。

      content 列は JSON 型です。模拟数据

  2. Hologres テーブルの作成

    Hologres にデータを受信するためのテーブルを作成します。 クエリのニーズに応じて特定のフィールドにインデックスを作成し、クエリ効率を向上させることができます。 インデックスの詳細については、「テーブルの作成」をご参照ください。 この例のデータ定義言語 (DDL) 文は次のとおりです。

    CREATE TABLE sls_flink_holo (
        content JSONB ,
        operation TEXT,
        uid TEXT,
        topic  TEXT ,
        source TEXT ,
        c__timestamp TIMESTAMPTZ,
        receive_time BIGINT,
        PRIMARY KEY (uid)
      );
  3. Flink を使用したデータ書き込み

    Flink を使用して SLS データを Hologres に書き込むには、次のドキュメントをご参照ください。

    1. Flink での SLS データの読み取り:SLS ソーステーブル

    2. Flink での Hologres へのデータ書き込み:Hologres 結果テーブル

    次の SQL ジョブは、Flink を使用して SLS データを Hologres に書き込む例です。 JSON フィールドは、Hologres の JSON フィールドに直接書き込まれます。 Flink はネイティブの JSON 型をサポートしていないため、代替として VARCHAR 型が使用されます。

    説明
    • Flink で SQL ジョブを開発および実行する詳細な手順については、「ジョブ開発マップ」および「ジョブの開始」をご参照ください。

    • SLS には JSON データが含まれています。 ニーズに応じて、書き込む前に Flink で JSON データを解析するか、JSON データを直接 Hologres に書き込むことができます。

    CREATE TEMPORARY TABLE sls_input (
        content STRING,
        operation STRING,
        uid STRING,
        `__topic__` STRING METADATA VIRTUAL,
        `__source__` STRING METADATA VIRTUAL,
        `__timestamp__` BIGINT METADATA VIRTUAL,
        `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
      )
    WITH (
        'connector' = 'sls',
        'endpoint' = 'sls_private_endpoint',--SLS のプライベートエンドポイント。
        'accessid' = 'your_access_id',--ご利用の AccessKey ID。
        'accesskey' = 'your_access_key',--ご利用の AccessKey Secret。
        'starttime' = '2024-08-30 00:00:00',--ログを消費する開始時刻。
        'project' = 'your_project_name',--SLS プロジェクトの名前。
        'logstore' = 'your_logstore_name'--Logstore の名前。
      );
    
    CREATE TEMPORARY TABLE hologres_sink (
        content VARCHAR,
        operation VARCHAR,
        uid VARCHAR,
        topic  STRING ,
        source STRING ,
        c__timestamp TIMESTAMP ,
        receive_time BIGINT
      )
    WITH (
        'connector' = 'hologres',
        'dbname' = 'your_holo_db_name', --Hologres データベースの名前。
        'tablename' = 'your_holo_table_name', --データを受信する Hologres テーブルの名前。
        'username' = 'your_access_id', --ご利用の Alibaba Cloud アカウントの AccessKey ID。
        'password' = 'your_access_key', --ご利用の Alibaba Cloud アカウントの AccessKey Secret。
        'endpoint' = 'your_holo_vpc_endpoint' --Hologres インスタンスの VPC エンドポイント。
      );
    
    INSERT INTO hologres_sink
    SELECT
       content,
       operation,
       uid,
       `__topic__` ,
       `__source__` ,
        CAST (
        FROM_UNIXTIME (`__timestamp__`) AS TIMESTAMP
      ),
       CAST (__tag__['__receive_time__'] AS BIGINT) AS receive_time
    FROM
      sls_input;
  4. データのクエリ

    Flink を使用して Hologres に書き込まれた SLS データをクエリします。 その後、必要に応じてデータ開発を実行できます。查询数据

DataWorks の Data Integration を使用した SLS から Hologres へのデータ書き込み

  1. SLS データの準備

    この例の SLS データは、SLS プラットフォーム上のシミュレーションデータから取得したもので、ゲームのログインと消費ログをシミュレートします。 ビジネスデータがある場合は、直接それを使用してください。

    1. Simple Log Service コンソールにログインします。

    2. [データ取り込み] セクションで、[データのシミュレーション] をクリックします。

    3. [データのシミュレーション] タブで、[ゲーム操作ログ] の下にある [シミュレーション] をクリックします。

    4. [ログスペースの選択] ページで、[プロジェクト][Logstore] を選択し、Next をクリックします。

    5. [データのシミュレーション] ページで、範囲と周波数を設定し、[インポート開始] をクリックします。

    6. シミュレーションによって生成されたフィールドとデータを以下に示します。 詳細については、「クエリと分析のクイックスタート」をご参照ください。

      content 列は JSON 型です。模拟数据

  2. Hologres テーブルの作成

    Hologres にデータを受信するためのテーブルを作成します。 クエリのニーズに応じて特定のフィールドにインデックスを作成し、クエリ効率を向上させることができます。 インデックスの詳細については、「テーブルの作成」をご参照ください。 この例の DDL 文は次のとおりです。

    説明
    • この例では、データの整合性を確保するために uid がプライマリキーとして設定されています。 この設定は必要に応じて変更できます。

    • uid を分散キーとして設定します。 これにより、同じ uid を持つデータが同じシャードに書き込まれ、クエリパフォーマンスが向上します。

    BEGIN;
    CREATE  TABLE sls_dw_holo (
        content JSONB ,
        operation TEXT,
        uid TEXT,
        C_Topic  TEXT ,
        C_Source TEXT ,
        timestamp BIGINT,
        PRIMARY KEY (uid)
      );
      CALL set_table_property('sls_dw_holo', 'distribution_key', 'uid');
      CALL set_table_property('sls_dw_holo', 'event_time_column', 'timestamp');
    COMMIT;
                            
  3. データソースの設定

    データを同期する前に、Data Integration のデータソースを DataWorks ワークスペースに追加します。

  4. リアルタイムでのデータ同期

    Data Integration でリアルタイム同期タスクを作成して実行します。 詳細については、「単一テーブル内の増分データに対するリアルタイム同期タスクの設定」および「リアルタイム同期タスクの O&M」をご参照ください。

    この例で作成されたリアルタイム同期タスクでは、入力は LogHub データソースに設定され、出力は Hologres データソースに設定されています。 同期のためのフィールドマッピングは、次の図に示すように設定されています。字段映射

  5. データのクエリ

    リアルタイム同期タスクが開始された後、DataWorks の Data Integration を使用して Hologres に書き込まれた SLS データをクエリできます。查询数据