すべてのプロダクト
Search
ドキュメントセンター

Hologres:Simple Log Service からのデータのインポート

最終更新日:Mar 31, 2025

Simple Log Service から Hologres にデータを書き込むには、複数の方法があります。このトピックでは、Realtime Compute for Apache Flink または DataWorks を使用して、Simple Log Service から Hologres にデータをリアルタイムで同期する方法について説明します。

前提条件

  • Log Service がアクティブ化されており、プロジェクトとログストアが作成されていること。詳細については、はじめにをご参照ください。

  • Hologres インスタンスが購入済みであり、開発ツールに接続されていること。詳細については、Hologres を使用する手順をご参照ください。

  • Realtime Compute for Apache Flink を使用して Simple Log Service から Hologres にデータを書き込む場合は、Realtime Compute for Apache Flink がアクティブ化されており、プロジェクトが作成されていること。詳細については、フルマネージド Flink のアクティブ化名前空間の作成と管理をご参照ください。

  • DataWorks を使用して Log Service から Hologres にデータを書き込む場合は、DataWorks がアクティブ化されており、ワークスペースが作成されていること。詳細については、DataWorks のアクティブ化ワークスペースの作成をご参照ください。

背景情報

Simple Log Service は、ログ、メトリクス、トレースなど、複数のタイプのデータを処理するための、大規模、低コスト、リアルタイムのサービスを提供する、クラウドネイティブの可観測性および分析プラットフォームです。 Simple Log Service を使用すると、データの収集、処理、クエリ、分析、視覚化、消費、配信を行うことができます。 Simple Log Service コンソールでアラートを設定できます。 Simple Log Service は、企業が研究開発、運用管理、データセキュリティの面でデジタル機能を向上させるのに役立ちます。

Hologres は、高性能、高信頼性、費用対効果、高スケーラビリティを特長とするリアルタイムコンピューティングエンジンを提供するように設計されています。 Hologres は、大量のデータを管理するのに役立つリアルタイムデータウェアハウスソリューションと、サブ秒で応答できるインタラクティブクエリサービスを提供します。 Hologres は、リアルタイムデータミッドエンドの構築、きめ細かい分析、セルフサービス分析、マーケティングプロファイルの設定、オーディエンスのグループ化、リアルタイムのリスク管理などのシナリオで広く使用されています。 Simple Log Service から Hologres にデータを迅速に書き込んで、リアルタイム分析とクエリを実行できます。これは、ビジネスのためにデータを活用するのに役立ちます。

Realtime Compute for Apache Flink を使用して Simple Log Service から Hologres にデータを書き込む

  1. Simple Log Service でデータを準備します。

    この例では、Simple Log Service によって提供されるシミュレートデータがソースデータとして使用されます。データは、ゲームプラットフォームのログオンと消費ログをシミュレートすることによって生成されます。ビジネスデータを使用することもできます。

    1. Simple Log Service コンソールにログインします。

    2. [データのインポート] セクションで、[シミュレートデータのインポート] タブをクリックします。

    3. [シミュレートデータのインポート] タブで、シミュレーション[ゲーム操作ログ] セクションの をクリックします。

    4. [ログストアの指定] ステップで、[プロジェクト][ログストア] を選択し、[次へ] をクリックします。

    5. [シミュレートデータのインポート] ステップで、時間範囲と頻度に関連するパラメータを設定し、[インポート] をクリックします。

    6. 次の図は、クエリできるシミュレーションフィールドとデータを示しています。詳細については、ログのクエリと分析をご参照ください。

      content フィールドは JSON データ型です。模拟数据

  2. Hologres にテーブルを作成します。

    Hologres でデータを受信するために使用するテーブルを作成します。クエリの要件に基づいて、テーブルのフィールドにインデックスを作成できます。これは、クエリの効率を向上させるのに役立ちます。インデックスの詳細については、概要をご参照ください。次の DDL ステートメントは、テーブルを作成するために使用されます。

    CREATE TABLE sls_flink_holo (
        content JSONB ,
        operation TEXT,
        uid TEXT,
        topic  TEXT ,
        source TEXT ,
        c__timestamp TIMESTAMPTZ,
        receive_time BIGINT,
        PRIMARY KEY (uid)
      );
      -- テーブルを作成するためのDDLステートメントです。
    
  3. Realtime Compute for Apache Flink を使用して Hologres にデータを書き込みます。

    Realtime Compute for Apache Flink を使用して Simple Log Service から Hologres にデータを書き込むには、次の手順を実行します。

    1. Realtime Compute for Apache Flink を使用して Simple Log Service からデータを読み取ります。詳細については、Simple Log Serviceコネクタをご参照ください。

    2. Realtime Compute for Apache Flink を使用して Hologres にデータを書き込みます。詳細については、Hologres コネクタをご参照ください。

    次の SQL ジョブのサンプルは、Realtime Compute for Apache Flink を使用して Simple Log Service から Hologres にデータを書き込む方法の例を示しています。JSON 形式のフィールドは、Hologres テーブルの JSON データ型として直接書き込まれます。JSON データ型は、Realtime Compute for Apache Flink の VARCHAR 型に対応します。

    説明
    • Realtime Compute for Apache Flink で SQL ジョブを作成および実行する方法の詳細については、SQL ドラフトの開発デプロイメントの開始をご参照ください。

    • Simple Log Service から読み取られたデータに JSON 形式のデータが含まれている場合、Realtime Compute for Apache Flink にデータを書き込む前に、そのようなデータを解析できます。また、Simple Log Service から Hologres に JSON 形式のデータを直接書き込むこともできます。

    CREATE TEMPORARY TABLE sls_input (
        content STRING,
        operation STRING,
        uid STRING,
        `__topic__` STRING METADATA VIRTUAL,
        `__source__` STRING METADATA VIRTUAL,
        `__timestamp__` BIGINT METADATA VIRTUAL,
        `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
      )
    WITH (
        'connector' = 'sls',
        'endpoint' = 'Internal endpoint of the Simple Log Service project',-- Simple Log Serviceプロジェクトにアクセスするために使用される内部エンドポイント。
        'accessid'='AccessKey ID of your Alibaba Cloud account',-- Alibaba CloudアカウントのAccessKey ID。
        'accesskey'='AccessKey secret of your Alibaba Cloud account',-- Alibaba CloudアカウントのAccessKeyシークレット。
        'starttime' = '2024-08-30 00:00:00',-- ログの消費を開始する時刻。
        'project'='Project name',-- Simple Log Serviceプロジェクトの名前。
        'logstore'='Logstore name'-- ログストアの名前。
      );
    
    CREATE TEMPORARY TABLE hologres_sink (
        content VARCHAR,
        operation VARCHAR,
        uid VARCHAR,
        topic  STRING ,
        source STRING ,
        c__timestamp TIMESTAMP ,
        receive_time BIGINT
      )
    WITH (
        'connector' = 'hologres',
        'dbname'='Name of the Hologres database',-- Hologresデータベースの名前。
        'tablename'='Name of the Hologres table',-- データを書き込むHologresテーブルの名前。
        'username'='AccessKey ID of your Alibaba Cloud account',-- Alibaba CloudアカウントのAccessKey ID。
        'password'='AccessKey secret of your Alibaba Cloud account',-- Alibaba CloudアカウントのAccessKeyシークレット。
        'endpoint'='VPC endpoint of your Hologres instance.'-- HologresインスタンスのVPCエンドポイント。
      );
    
    INSERT INTO hologres_sink
    SELECT
       content,
       operation,
       uid,
       `__topic__` ,
       `__source__` ,
        CAST (
        FROM_UNIXTIME (`__timestamp__`) AS TIMESTAMP
      ),
       CAST (__tag__['__receive_time__'] AS BIGINT) AS receive_time
    FROM
      sls_input;
    
  4. Hologres でデータをクエリします。

    Realtime Compute for Apache Flink を使用して Simple Log Service から Hologres に書き込まれたデータをクエリできます。ビジネス要件に基づいてデータを開発することもできます。Query data in Hologres

DataWorks を使用して Hologres にデータを書き込む

  1. Simple Log Service でデータを準備します。

    この例では、Simple Log Service によって提供されるシミュレートデータがソースデータとして使用されます。データは、ゲームプラットフォームのログオンと消費ログをシミュレートすることによって生成されます。ビジネスデータを使用することもできます。

    1. Simple Log Service コンソールにログインします。

    2. [データのインポート] セクションで、[シミュレートデータのインポート] タブをクリックします。

    3. [シミュレートデータのインポート] タブで、シミュレーション[ゲーム操作ログ] セクションの をクリックします。

    4. [ログストアの指定] ステップで、[プロジェクト][ログストア] を選択し、[次へ] をクリックします。

    5. [シミュレートデータのインポート] ステップで、時間範囲と頻度に関連するパラメータを設定し、[インポート] をクリックします。

    6. 次の図は、クエリできるシミュレーションフィールドとデータを示しています。詳細については、ログのクエリと分析をご参照ください。

      content フィールドは JSON データ型です。模拟数据

  2. Hologres にテーブルを作成します。

    Hologres でデータを受信するために使用するテーブルを作成します。クエリの要件に基づいて、テーブルのフィールドにインデックスを作成できます。これは、クエリの効率を向上させるのに役立ちます。インデックスの詳細については、概要をご参照ください。次の DDL ステートメントは、テーブルを作成するために使用されます。

    説明
    • この例では、uid フィールドは、データを一意に識別するために使用されるプライマリキーとして設定されています。ビジネス要件に基づいてプライマリキーを設定できます。

    • uid フィールドは、配布キーとしても設定されています。このように、uid フィールドの値が同じであるデータレコードは、同じシャードに書き込まれます。これは、クエリのパフォーマンスを向上させるのに役立ちます。

    BEGIN;
    CREATE  TABLE sls_dw_holo (
        content JSONB ,
        operation TEXT,
        uid TEXT,
        C_Topic  TEXT ,
        C_Source TEXT ,
        timestamp BIGINT,
        PRIMARY KEY (uid)
      );
      CALL set_table_property('sls_dw_holo', 'distribution_key', 'uid');
      CALL set_table_property('sls_dw_holo', 'event_time_column', 'timestamp');
    COMMIT;
                            -- テーブルを作成するためのDDLステートメントです。
    
  3. データソースを設定します。

    DataWorks の Data Integration サービスを使用して Hologres にデータを書き込む前に、指定された DataWorks ワークスペースに Simple Log Service データソースと Hologres データソースを追加する必要があります。

  4. Simple Log Service から Hologres にデータをリアルタイムで同期します。

    Data Integration でリアルタイム同期ノードを作成して実行します。詳細については、DataStudio でリアルタイム同期タスクを作成するリアルタイム同期ノードの運用と保守をご参照ください。

    この例では、Simple Log Service LogHub がリアルタイム同期ノードのソースデータソースとして設定され、Hologres がデスティネーションデータソースとして設定されています。フィールドマッピングは、ソースからデスティネーションにデータを同期するように設定されています。次の図は、フィールドマッピングの例を示しています。Field mappings

  5. Hologres でデータをクエリします。

    リアルタイム同期ノードが実行された後、DataWorks の Data Integration サービスを使用して Hologres に書き込まれたデータを Hologres でクエリできます。Query data in Hologres