すべてのプロダクト
Search
ドキュメントセンター

Hologres:Apache Flink 1.11 以降から Hologres にリアルタイムでデータを書き込む

最終更新日:Nov 20, 2025

このトピックでは、Apache Flink 1.11 以降から Hologres にリアルタイムでデータを書き込む方法について説明します。

前提条件

  • Hologres インスタンスが作成され、開発ツールを使用してインスタンスに接続します。詳細については、「HoloWeb に接続する」をご参照ください。

  • Apache Flink クラスターが作成されます。この例では、Apache Flink 1.15 のクラスターが作成されます。Apache Flink の公式 Web サイトからバイナリファイルをダウンロードし、スタンドアロンモードで動作する Apache Flink クラスターを作成できます。詳細については、「Flink のローカルインストール」をご参照ください。

背景

Hologres コネクタのコードは、Apache Flink 1.11 以降でオープンソース化されています。さまざまな Apache Flink バージョンに対応する Hologres コネクタの リリースパッケージ は、Maven リポジトリで公開されています。次の pom.xml ファイルは、構成の例を示しています。

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.15</artifactId>
    <version>1.4.0</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

次の表に、Apache Flink と Hologres コネクタ のバージョンマッピングを示します。より多くの機能を使用するには、バージョン 1.15 以降を使用することを推奨します。

Apache Flink バージョン

Hologres コネクタバージョン

Apache Flink 1.11

hologres-connector-flink-1.11:1.0.1

Apache Flink 1.12

hologres-connector-flink-1.12:1.0.1

Apache Flink 1.13

hologres-connector-flink-1.13:1.3.2

Apache Flink 1.14

hologres-connector-flink-1.14:1.3.2

Apache Flink 1.15

hologres-connector-flink-1.15:1.4.1

Apache Flink 1.17

hologres-connector-flink-1.17:1.4.1

サンプルコード

次の構文を使用する Flink SQL 文を実行して、Hologres にデータを書き込むことができます。

        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        createScanTable(tEnv);

        tEnv.executeSql("insert into sink select * from source");

  • FlinkSQLToHoloExample: Flink SQL インターフェイスに基づいて Hologres にデータを書き込むために使用されるアプリケーション。

  • FlinkDSAndSQLToHoloExample: データストリームをテーブルに変換して Hologres に書き込むためのアプリケーション。Flink DataStream インターフェイスは変換に使用され、Flink SQL インターフェイスはデータ書き込みに使用されます。

  • FlinkDataStreamToHoloExample: Flink DataStream インターフェイスに基づいて Hologres にデータストリームを書き込むためのアプリケーション。

  • FlinkRoaringBitmapAggJob: リアルタイムで重複排除後にユニークビジター (UV) 数をカウントし、統計を Hologres に書き込むために使用されるアプリケーション。このアプリケーションには、Flink、roaring bitmap、および Hologres ディメンションテーブルが使用されます。

  • FlinkToHoloRePartitionExample: Flink DataStream インターフェイスに基づいて、リアルタイムのデータ書き込み中に Hologres のシャードにデータをパーティション分割するために使用されるアプリケーション。これにより、Hologres 内の small ファイルの数が大幅に減少し、書き込みパフォーマンスが向上し、システム負荷が軽減されます。このアプリケーションは、複数のプライマリキーが構成された空のテーブルに同時にデータをインポートするシナリオに適しています。このアプリケーションは、INSERT OVERWRITE 操作と同様の効果を実現します。

Hologres コネクタのオプション

Hologres コネクタを使用して、Apache Flink から Hologres にデータを書き込むことができます。次の表に、Hologres コネクタで使用されるパラメーターを示します。

オプション

必須

説明

connector

はい

シンクテーブルのタイプ。値を hologres に設定します。

dbname

はい

Hologres データベースの名前。

tablename

はい

データを書き込む Hologres テーブルの名前。

username

はい

Alibaba Cloud アカウントの AccessKey ID。

AccessKey ペア ページから AccessKey ID を取得できます。

password

はい

Alibaba Cloud アカウントの AccessKey Secret。

AccessKey ペア ページから AccessKey Secret を取得できます。

endpoint

はい

Hologres インスタンスの VPC (Virtual Private Cloud) エンドポイント。Hologres コンソール のインスタンス詳細ページで Hologres インスタンスのエンドポイントを表示できます。

説明

エンドポイントにはポート番号を含める必要があり、IP アドレス:ポート番号 形式である必要があります。Hologres インスタンスと Apache Flink クラスターが同じリージョンにある場合は、Hologres インスタンスの VPC エンドポイントを使用します。Hologres インスタンスと Apache Flink クラスターが異なるリージョンにある場合は、Hologres インスタンスのパブリックエンドポイントを使用します。

  • 接続固有

    オプション

    必須

    説明

    connectionSize

    いいえ

    Flink Hologres タスクによって作成された単一の接続プール内の Java Database Connectivity (JDBC) 接続の数。

    デフォルト値: 3。この値はスループットに比例します。

    connectionPoolName

    いいえ

    接続プールの名前。同じ TaskManager 内で、同じ接続プールが構成されているテーブルは、接続プールを共有できます。

    デフォルト値はありません。各テーブルは、デフォルトで独自の接続プールを使用します。接続プール名を指定する場合、すべてのテーブルの connectionSize パラメーターの値は同じである必要があります。

    fixedConnectionMode

    いいえ

    データ書き込みおよびポイントクエリ操作は接続を消費しません。この機能は Beta リリースであり、コネクタのバージョンが 1.2.0 以降、Hologres のバージョンが 1.3 以降である必要があります。

    デフォルト値: false。

    jdbcRetryCount

    いいえ

    接続エラーが発生した場合に、データの書き込みとクエリを再試行できる最大回数。

    デフォルト値: 10。

    jdbcRetrySleepInitMs

    いいえ

    リトライが実行される間隔は、次の数式を使用して計算されます: retrySleepInitMs パラメーターの値 + リトライ回数 × retrySleepStepMs パラメーターの値。

    単位: ミリ秒。デフォルト値: 1000。

    jdbcRetrySleepStepMs

    いいえ

    リトライが実行される間隔は、次の数式を使用して計算されます: retrySleepInitMs パラメーターの値 + リトライ回数 × retrySleepStepMs パラメーターの値。

    単位: ミリ秒。デフォルト値: 5000。

    jdbcConnectionMaxIdleMs

    いいえ

    データの書き込みおよびポイントクエリに使用される接続に適用されるアイドルタイムアウト期間。タイムアウト期間が過ぎると、接続は解放されます。

    単位: ミリ秒。デフォルト値: 60000。

    jdbcMetaCacheTTL

    いいえ

    キャッシュ内のテーブルスキーマ情報の生存時間 (TTL) 期間。

    単位: ミリ秒。デフォルト値: 60000。

    jdbcMetaAutoRefreshFactor

    いいえ

    キャッシュを自動的にリフレッシュするタイミングを決定する係数。キャッシュ内のテーブルスキーマ情報の残りの TTL 期間が、jdbcMetaCacheTTL パラメーターの値をこのパラメーターの値で割った結果よりも短い場合、キャッシュは自動的にリフレッシュされます。

    デフォルト値: -1。デフォルト値は、キャッシュが自動的にリフレッシュされないことを示します。

    connection.ssl.mode

    いいえ

    SSL 暗号化伝送を有効にするかどうかを指定します。有効な値:

    • disable: SSL 暗号化伝送は無効です。これがデフォルト値です。

    • require: SSL 暗号化伝送が有効です。クライアントは SSL 暗号化伝送を使用して、データ送信に使用される接続のみを暗号化します。

    • verify-ca: SSL 暗号化伝送が有効です。クライアントは SSL 暗号化伝送を使用してデータ送信接続を暗号化し、CA 証明書を使用して Hologres サーバーを検証します。

    • verify-full: SSL 暗号化伝送が有効です。クライアントは SSL 暗号化伝送を使用してデータ送信接続を暗号化し、CA 証明書を使用して Hologres サーバーを検証し、CA 証明書で指定された CN またはドメインネームシステム (DNS) が接続設定時に指定された Hologres エンドポイントと一致するかどうかを確認します。

    connection.ssl.root-cert.location

    いいえ

    CA 証明書のパス。CA 証明書が Apache Flink クラスターにアップロードされていることを確認する必要があります。

    説明

    このパラメーターは、connection.ssl.mode パラメーターを verify-ca または verify-full に設定した場合に必須です。

    jdbcDirectConnect

    いいえ

    直接接続モードを有効にするかどうかを指定します。有効な値:

    • false: 無効。これがデフォルト値です。

    • true: 有効。

      Apache Flink を使用して書き込むことができるデータ量は、VPC エンドポイントで許可されるスループットによって決まります。このパラメーターを true に設定すると、システムは現在の環境で Apache Flink が Hologres FE ノードに直接接続できるかどうかを確認します。Apache Flink が現在の環境で Hologres FE ノードに直接接続できる場合、デフォルトで直接接続が使用されます。

  • シンク固有

    パラメーター

    必須

    説明

    mutatetype

    いいえ

    データが書き込まれるモード。詳細については、「Hologres 結果テーブルの作成」の「ストリーミングセマンティクス」セクションをご参照ください。

    デフォルト値: insertorignore。

    ignoredelete

    いいえ

    リトラクトメッセージを無視するかどうかを指定します。ほとんどの場合、リトラクトメッセージは Flink の GROUP BY 操作によって生成されます。リトラクトメッセージが Hologres コネクタに転送されると、Delete リクエストが生成されます。

    デフォルト値: true。このパラメーターは、ストリーミングセマンティクスが使用されている場合にのみ有効です。

    createparttable

    いいえ

    パーティションテーブルにデータを書き込む場合に、パーティションキーの値に基づいてパーティションを自動的に作成するかどうかを指定します。

    • false: パーティションは自動的に作成されません。これがデフォルト値です。

    • true: パーティションは自動的に作成されます。

    このパラメーターを使用する際は注意してください。パーティションキーの値にダーティデータが含まれていないことを確認してください。そうしないと、無効なパーティションが作成されます。

    ignoreNullWhenUpdate

    いいえ

    mutatetype='insertOrUpdate' 設定が使用されている場合に、データに書き込まれる null 値を無視するかどうかを指定します。

    デフォルト値: false。

    jdbcWriteBatchSize

    いいえ

    Hologres ストリーミングシンクノードのバッチで収集できるデータエントリの最大数。

    デフォルト値: 256。

    jdbcWriteBatchByteSize

    いいえ

    スレッド内の Hologres ストリーミングシンクノードのバッチで収集できるデータの最大サイズ。単位: バイト。

    デフォルト値: 2097152 (2 × 1024 × 1024)、つまり 2 MB。

    jdbcWriteBatchTotalByteSize

    いいえ

    すべてのスレッドの Hologres ストリーミングシンクノードのバッチで収集できるデータの最大サイズ。単位: バイト。

    デフォルト値: 20971520 (20 × 1024 × 1024)、つまり 20 MB。

    jdbcWriteFlushInterval

    いいえ

    FLUSH 操作が完了するのを待つ最大時間。FLUSH 操作は、Hologres ストリーミングシンクノードのバッチで収集されたデータエントリに対して実行されます。

    単位: ミリ秒。デフォルト値: 10000、つまり 10 秒。

    jdbcUseLegacyPutHandler

    いいえ

    SQL 文で使用される構文。有効な値:

    • true: insert into xxx(c0,c1,...) values (?,,..),.. on conflict; 構文が使用されます。

    • false: insert into xxx(c0,c1,...) select unnest(?),unnest(?),.. on conflict 構文が使用されます。これがデフォルト値です。

    jdbcEnableDefaultForNotNullColumn

    いいえ

    非 NULL 制約を採用し、デフォルト値が指定されていない列に null 値が書き込まれるときに、null 値を指定されたデフォルト値に変換するかどうかを指定します。宛先列が STRING 型の場合、null 値は空の文字列 ("") に変換されます。宛先列が NUMBER 型の場合、null 値は 0 に変換されます。宛先列が DATE、TIMESTAMP、または TIMESTAMPTZ 型の場合、null 値は 1970-01-01 00:00:00 に変換されます。

    デフォルト値: true。

    remove-u0000-in-text.enabled

    いいえ

    UTF-8 でエンコードされていない TEXT 型の u0000 文字を自動的に置き換えるかどうかを指定します。有効な値:

    • true: u0000 文字は置き換えられます。

    • false: u0000 文字は置き換えられません。これがデフォルト値です。

    deduplication.enabled

    いいえ

    同時に書き込まれるデータ量に同じプライマリキー値を持つデータレコードが含まれている場合に、重複排除を実行するかどうかを指定します。有効な値:

    • true: 重複排除が実行されます。他のデータレコードと同じプライマリキー値を持つ最後のデータレコードのみが保持されます。これがデフォルト値です。

    • false: 重複排除は実行されません。

      バッチ処理されたデータが書き込まれた後、挿入する必要があるデータが書き込まれます。

      説明

      重複排除が無効になっている場合、極端なケースではバッチデータの書き込みが実行されないことがあります。たとえば、書き込む必要があるすべてのデータが同じプライマリキー値を持つ場合、データを同時に書き込むことはできません。これは書き込みパフォーマンスに悪影響を及ぼします。

    aggressive.enabled

    いいえ

    アグレッシブコミットモードを有効にするかどうかを指定します。有効な値:

    • true: 有効。

      アグレッシブコミットモードでは、同時に処理されるデータレコードの数が期待される数に達したかどうかに関係なく、アイドル状態の接続を検出すると、システムは強制的にデータをコミットします。このモードでは、トラフィックが少ない場合にデータ転送のレイテンシを短縮できます。

    • false: 無効。これがデフォルト値です。

    jdbcCopyWriteMode

    いいえ

    データ書き込みモードを指定します。有効な値:

    • true: バルクロードを使用します。jdbcCopyWriteMode オプションも true に設定されている場合、この設定が有効になります。それ以外の場合は、固定コピーが使用されます。

      説明
      • バルクロードは固定コピーよりも効率的です。Hologres リソースをより効果的に使用し、データ書き込みのパフォーマンスを向上させます。

      • プライマリキーを持つテーブルへのバルクロードは、通常、テーブルロックを引き起こします。これを防ぐには、target-shards.enabledtrue に設定して、ロックの粒度をシャードレベルに減らします。これにより、複数のバルクロードジョブが同時に実行され、テーブルロックの発生が減少します。バルクロードは、固定コピーと比較して Hologres インスタンスの負荷も大幅に削減します。テストでは、約 66.7% の負荷削減が示されています。

      • プライマリキーを持つテーブルにバルクロードを使用して書き込む場合、テーブルは空である必要があります。これにより、書き込みプロセス中のプライマリキーの重複排除とパフォーマンスの低下が回避されます。

    • false (デフォルト): INSERT メソッドを使用してデータを書き込みます。

    説明

    Hologres V1.3.1 以降のみがこのオプションをサポートしています。

    jdbcCopyWriteFormat

    いいえ

    バイナリプロトコルを使用するかどうかを指定します。

    • binary: バイナリプロトコルが使用され、より高速な転送速度を提供します。これがデフォルト値です。

    • text: テキストモードが使用されます。

    Hologres V1.3.1 以降のみがこのパラメーターをサポートしています。

    bulkLoad

    バルクロードを使用してデータを書き込むかどうかを指定します。有効な値:

    • true: バルクロードを使用してデータを書き込みます。このオプション値は、jdbcCopyWriteModetrue に設定されている場合にのみ有効です。

      説明
      • 固定コピーと比較して、バルクロードはより良いリソース使用率を提供します。

      • デフォルトでは、バルクロードはプライマリキーのないテーブルにデータを書き込む場合にのみサポートされます。プライマリキーが構成されたテーブルにデータを書き込むと、テーブルレベルのロックが取得される場合があります。これにより、制限が生じたり、データの書き込みが複雑になったりする可能性があります。

    • false (デフォルト): バルクロードを使用しません。

    説明

    Hologres V1.4.0 以降のみがこのオプションをサポートしています。

    target-shards.enabled

    ターゲットシャードへのバルクロードを有効にするかどうかを指定します。有効な値:

    • true

    • false (デフォルト)

    説明

    Hologres V1.4.1 以降のみがこのオプションをサポートしています。

  • ポイントクエリ固有

    パラメーター

    必須

    説明

    jdbcReadBatchSize

    いいえ

    ディメンションテーブルに基づいてポイントクエリを実行するための、スレッド内のバッチで許可されるリクエストの最大数。

    デフォルト値: 128。

    jdbcReadBatchQueueSize

    いいえ

    ディメンションテーブルに基づいてポイントクエリを実行するための、スレッドで許可されるキューに入れられたリクエストの最大数。

    デフォルト値: 256。

    async

    いいえ

    非同期モードでデータを同期するかどうかを指定します。

    デフォルト値: false。非同期モードでは、複数のリクエストと応答が同時に処理されます。したがって、連続するリクエストは互いにブロックせず、クエリのスループットが向上します。ただし、非同期モードでは、リクエストは絶対的な順序で処理されません。

    cache

    いいえ

    キャッシュポリシー。

    デフォルト値: None。有効な値: None および LRU。None はデータがキャッシュされないことを示します。LRU は、ディメンションテーブル内の一部のデータがキャッシュされることを示します。システムは、ソーステーブルからデータレコードを受信するたびに、キャッシュ内でデータレコードを検索します。システムがキャッシュ内でレコードを見つけられない場合、システムは物理ディメンションテーブルでデータレコードを検索します。

    cachesize

    いいえ

    キャッシュできるデータレコードの最大数。

    デフォルト値: 10000。cache パラメーターを LRU に設定した場合、cachesize パラメーターを構成できます。

    cachettlms

    いいえ

    キャッシュが更新される間隔。単位: ミリ秒。

    cache パラメーターを LRU に設定した場合、cachettlms パラメーターを構成できます。デフォルトでは、キャッシュエントリは有効期限切れになりません。

    cacheempty

    いいえ

    戻り結果が空の JOIN クエリのデータをキャッシュするかどうかを指定します。

    デフォルト値: true。この値は、システムが戻り結果が空の JOIN クエリのデータをキャッシュすることを示します。false: システムは、戻り結果が空の JOIN クエリのデータをキャッシュしません。

  • データの型のマッピング

    フルマネージド Flink と Hologres 間のデータの型のマッピングについては、「データの型」の「Realtime Compute for Apache Flink または Blink と Hologres 間のデータの型のマッピング」セクションをご参照ください。