すべてのプロダクト
Search
ドキュメントセンター

Hologres:Hologres 結果テーブル

最終更新日:Feb 05, 2026

Hologres は Real-time Compute for Flink の排他モードと深く統合されており、コネクタを使用して Hologres 結果テーブルにデータを書き込むことができます。書き込まれたデータは即座にクエリ可能です。本トピックでは、Real-time Compute for Flink の排他モードから Hologres 結果テーブルにデータを書き込む方法について説明します。

制限事項

  • Real-time Compute for Flink の排他モードのバージョンによって、開発者向けのセマンティクスが異なります。作業を開始する前に、ご利用のバージョンを確認し、該当バージョンの例を参照してください。

  • 接続エラーを防ぐため、Real-time Compute for Flink サービスと Hologres インスタンスが同一リージョンにあることを確認してください。

  • Real-time Compute for Flink の排他モードでバージョン 3.6 より前のものには、組み込みの Hologres コネクタが含まれていません。Hologres にリアルタイムでデータを書き込むには、JAR ファイルを参照する必要があります。トラブルシューティングについては、「一般的なスペックアップ準備失敗エラー」をご参照ください。フィードバックについては、Hologres DingTalk グループにご参加いただけます。グループへの参加方法の詳細については、「オンラインサポートをさらに利用する方法」をご参照ください。

    説明

    ジョブを実行するには、バージョン 3.6 以降へのスペックアップを推奨します。

  • Real-time Compute for Flink の排他モード バージョン 3.7 では、Hologres パーティションテーブルの自動作成がサポートされています。ただし、ジョブ内で createparttable='true' を構成する必要があります。パーティションテーブルを使用する場合、以下の点にご注意ください。

    • Hologres では現在、リストパーティションのみをサポートしています。

    • パーティションテーブルを作成する際は、パーティションキー列を明示的に指定する必要があります。現在、パーティションキー列としてサポートされているのは text 型および int4 型のみです。パーティション値にはハイフン (-) を含めることはできません(例:2020-09-12)。

    • パーティションテーブルにプライマリキーが設定されている場合、パーティションキー列はプライマリキーの一部である必要があります。

    • 子パーティションテーブルを作成する際、子テーブルのパーティションキー列には静的フィールド値を指定する必要があります。

    • 子パーティションテーブルに書き込まれるデータのパーティションキー列の値は、子テーブル作成時に定義された値と完全に一致している必要があります。一致しない場合はエラーが報告されます。

    • DEFAULT パーティション機能はサポートされていません。

  • 宛先の Hologres テーブルにプライマリキーが設定されている場合、リアルタイム書き込みのデフォルトセマンティクスでは、プライマリキーに基づくデータ更新は行われません。重複するプライマリキーを持つデータは破棄されます。

  • Hologres は非同期でデータを書き込みます。ジョブに blink.checkpoint.fail_on_checkpoint_error=true 構成を追加する必要があります。これにより、ジョブが失敗した際にフェールオーバーがトリガーされます。このパラメーターは、Real-time Compute for Flink の排他モード バージョン 3.7.6 以降では不要です。

DDL セマンティクス

次の文を使用して、Hologres 結果テーブルを作成します。

create table Hologres_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  type='hologres',
  dbname='<yourDbname>', --Hologres データベースの名前。
  tablename='<yourTablename>', --Hologres でデータを受信するテーブルの名前。
  username='<yourUsername>', --Alibaba Cloud アカウントの AccessKey ID。
  password='<yourPassword>', --Alibaba Cloud アカウントの AccessKey Secret。
  endpoint='<yourEndpoint>'); --Hologres インスタンスの VPC ネットワークのエンドポイント。

WITH パラメーター

パラメーター

説明

type

結果テーブルのタイプ。値は hologres に固定されています。

hologres

endpoint

Hologres インスタンスの VPC ネットワークアドレス。

Hologres 管理コンソール にアクセスし、宛先インスタンスの製品ページで、Network Information セクションからエンドポイントを取得してください。エンドポイントには ip:port 形式でポート番号を含める必要があります。

demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80

username

AccessKey ID

AccessKey 管理 から AccessKey ID を取得できます。

xxxxm3FMWaxxxx

password

AccessKey Secret

AccessKey 管理 から AccessKey Secret を取得できます。

xxxxm355fffaxxxx

dbname

Hologres データベースの名前。

Holodb

tablename

Hologres データベース内のテーブル名。

blink_test

arraydelimiter

Hologres 結果テーブルは、field_delimiter に基づいて STRING フィールドを配列に分割し、Hologres にインポートすることをサポートしています。

デフォルト値は \u0002 です。

\u0002

mutatetype

データ書き込みモード。詳細については、「Hologres 結果テーブル」をご参照ください。

デフォルト値は insertorignore です。

insertorignore

ignoredelete

リトラクションメッセージを無視するかどうかを指定します。

  • true:リトラクションメッセージを無視します。

  • false:リトラクションメッセージを無視しません。

説明

このパラメーターは、ストリーミングセマンティクスを使用する場合にのみ有効です。

デフォルト値は false です。

通常、Flink Groupby 操作によりリトラクションメッセージが生成されます。これらのメッセージが Hologres コネクタに送信されると、DELETE リクエストが生成されます。

false

partitionrouter

パーティションテーブルにデータを書き込むかどうかを指定します。

  • true:パーティションテーブルにデータを書き込みます。

  • false:パーティションテーブルにデータを書き込みません。

デフォルト値は false です。

false

createparttable

パーティションテーブルに書き込む際、パーティション値に基づいて子パーティションテーブルを自動作成するかどうかを指定します。この機能は Blink 排他モード V3.7 以降でサポートされています。

デフォルト値は false です。

重要

この機能は慎重に使用してください。ダーティデータがパーティション値に含まれていると、誤ったパーティションテーブルが作成される可能性があります。

false

説明

arraydelimitermutatetypeignoredeletepartitionrouter、および createparttable パラメーターは、DDL の例示文には含まれていません。アプリケーションでこれらのパラメーターを使用する場合は、上記の表の説明を参照してください。

通常の Hologres 結果テーブルへのリアルタイムデータ書き込み

  1. Hologres にテーブルを作成します。

    データを受信するためのテーブルを Hologres に作成します。以下はテーブル作成 SQL 文の例です。

     create table blink_test (a int, b text, c text, d float8, e bigint);
  2. Real-time Compute for Flink ジョブを作成します。

    1. Real-time Compute for Flink コンソール にログインします。

    2. Real-time Compute for Flink ジョブを作成します。

      • Real-time Compute for Flink の排他モード V3.6 以降では、Hologres データソースがサポートされています。直接呼び出すことができます。以下は SQL 文の例です。

        create table randomSource (a int, b VARCHAR, c VARCHAR, d DOUBLE, e BIGINT) with (type = 'random');
        
        create table test (
          a int,
          b VARCHAR,
          c VARCHAR,
          PRIMARY KEY (a)
        ) with (
          type = 'hologres',
          `endpoint` = '$ip:$port', --Hologres インスタンスの VPC ネットワークアドレスとポート番号。
          `username` = 'Alibaba Cloud アカウントの AccessKey ID',
          `password` = 'Alibaba Cloud アカウントの AccessKey Secret',
          `dbname` = 'Hologres データベースの名前',
          `tablename` = 'blink_test'--Hologres でデータを受信するテーブルの名前。
        );
        
        insert
          into test
        select
          a,b,c
        from
          randomSource;
  3. ジョブを公開します。

    1. ジョブを作成後、エディターで 構文チェック をクリックします。成功しました と表示された場合、構文は正しいです。

    2. 保存 をクリックしてジョブを保存します。

    3. 公開 をクリックして、ジョブを本番環境に提出します。必要に応じて、ジョブ公開設定を構成してください。4

  4. ジョブを開始します。

    ジョブを本番環境に提出した後、手動で開始する必要があります。

    Real-time Compute for Flink 開発者プラットフォーム ページのメニューバー右上隅で、O&M をクリックします。O&M ページで開始したいジョブを選択し、右上隅の 起動 をクリックします。6

  5. Hologres のデータをリアルタイムでクエリします。

    Hologres でデータを受信するテーブルをクエリし、書き込まれたデータをリアルタイムで取得します。以下はクエリ SQL 文の例です。

    select * from blink_test;

ワイドテーブルのマージ/部分更新機能の使用方法

複数のストリームから単一の Hologres ワイドテーブルにデータを書き込む(一般的なシナリオ)には、以下の手順に従います。

Hologres に A、B、C、D、E 列を持つ WIDE_TABLE というワイドテーブルがあり、列 A がプライマリキーであると仮定します。1 つの Flink ストリームには列 A、B、C のデータが含まれ、もう 1 つのストリームには列 A、D、E のデータが含まれているとします。

  1. Flink SQL を使用して、2 つの Hologres 結果テーブルを宣言します。1 つのテーブルはフィールド A、B、C のみを宣言し、もう 1 つのテーブルはフィールド A、D、E のみを宣言します。両方のテーブルは WIDE_TABLE にマッピングされます。

  2. 両方の結果テーブルの mutatetype プロパティを insertorupdate に設定します。

  3. リトラクションメッセージによる DELETE リクエストの生成を防ぐため、両方の結果テーブルの ignoredelete プロパティを true に設定します。

  4. 2 つのストリームからのデータをそれぞれ対応する結果テーブルに挿入します。

このシナリオには以下の制限が適用されます。

  1. ワイドテーブルにはプライマリキーが必要です。

  2. 各ストリームからのデータには、プライマリキーの全フィールドが含まれている必要があります。

  3. 列指向テーブルのワイドテーブルマージシナリオでは、1 秒あたりのレコード数 (RPS) が高くなると CPU 使用率が高くなる可能性があります。テーブル内のフィールドの Dictionary encoding を無効にできます。

パーティション Hologres 結果テーブルへのリアルタイムデータ書き込み

Hologres は、リアルタイムデータ API を呼び出すことで、親パーティションテーブルに直接データを書き込むことをサポートしています。データは自動的に対応する子パーティションテーブルにルーティングされます。リアルタイムデータ API の詳細については、「リアルタイムデータ API」をご参照ください。

以下の制限が適用されます。

  • Hologres ではリストパーティションのみをサポートしています。

  • パーティションテーブルを作成する際は、パーティションキー列を明示的に指定する必要があります。パーティションキー列としてサポートされているのは text 型および int4 型のみです。

  • プライマリキーが設定されている場合、パーティションキー列はプライマリキーの一部である必要があります。

  • 子パーティションテーブルを作成する際、子テーブルのパーティションキー列には静的フィールド値を指定する必要があります。

  • 子パーティションテーブルに書き込まれるデータのパーティションキー列の値は、子テーブル作成時に定義された値と完全に一致している必要があります。一致しない場合はエラーが報告されます。

  • Hologres はデフォルトパーティションをサポートしていません。

  1. Hologres にパーティションテーブルを作成します。

    データを受信するためのパーティションテーブルを Hologres に作成し、対応する子パーティションテーブルを作成します。以下はテーブル作成 SQL 文の例です。

    --親パーティションテーブル test_message および対応する子パーティションテーブルを作成します。
    
    drop table if exists test_message;
    
    begin;
    create table test_message (
     "bizdate" text NOT NULL,
     "tag" text NOT NULL,
     "id" int4 NOT NULL,
     "title" text NOT NULL,
     "body" text,
    PRIMARY KEY (bizdate,tag,id)
    )
    PARTITION BY LIST (bizdate);
    commit;
    説明
    • コマンドを実行する際は、${bizdate} パラメーターを実際の値に置き換えてください。

    • Real-time Compute for Flink の排他モード バージョン 3.7 が、自動パーティション作成をサポートする最初のバージョンです。バージョン 3.7 より前のものを使用している場合、事前に Hologres で子パーティションテーブルを作成しておく必要があります。そうしないと、データインポートが失敗します。

  2. Real-time Compute for Flink の排他モードでジョブを作成します。

    以下は、Real-time Compute for Flink の排他モードでジョブを作成するための文の例です。

    説明

    以下の例は、Real-time Compute for Flink の排他モード V3.7 以降に適用されます。バージョン 3.7 より前のものを使用している場合は、V3.7 以降にスペックアップするか、自動子パーティションテーブル作成の構成を削除してください:`createparttable` = 'true'

    create table test_message_src(
      tag VARCHAR,
      id INTEGER,
      title VARCHAR,
      body VARCHAR
    ) with (
      type = 'random',
      `interval` = '10',
      `count` = '100'
    );
    
    create table test_message_sink (
      bizdate VARCHAR,
      tag VARCHAR,
      id INTEGER,
      title VARCHAR,
      body VARCHAR
    ) with (
      type = 'hologres',
      `endpoint` = '$ip:$port', --Hologres インスタンスの VPC ネットワークアドレス。
      `username` ='<AccessID>', --Alibaba Cloud アカウントの AccessKey ID。
      `password` = '<AccessKey>', --Alibaba Cloud アカウントの AccessKey Secret。
      `dbname` = '<DBname>', --Hologres データベースの名前。
      `tablename` = '<Tablename>', --Hologres データベース内のテーブル名。
      `partitionrouter` = 'true', --Hologres のパーティションテーブルにデータを書き込みます。
      `createparttable` = 'true' --Hologres で子パーティションテーブルを自動作成します。
    );
    
    insert into test_message_sink select "20200327",* from test_message_src;
    insert into test_message_sink select "20200328",* from test_message_src;
  3. ジョブを公開および開始します。

    詳細については、「ジョブを公開」および「ジョブを開始」の手順を「通常の Hologres 結果テーブルへのリアルタイムデータ書き込み」セクションでご参照ください。

  4. Hologres のデータをリアルタイムでクエリします。

    Hologres でデータを受信するテーブルをクエリし、書き込まれたデータをリアルタイムで取得します。以下はクエリ SQL 文の例です。

    select * from test_message;
    select * from test_message where bizdate = '20200327';

データ型のマッピング

Real-time Compute for Flink の排他モードと Hologres 間のデータ型マッピングの詳細については、「データ型まとめ」をご参照ください。