すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Lindorm コネクタ

最終更新日:Oct 30, 2025

このトピックでは、Lindormコネクタの使用方法について説明します。

背景

Lindorm は、IoT、インターネット、車載インターネット (IoV) などのさまざまなシナリオでマルチモーダルデータを保存および処理するために開発および最適化された、クラウドネイティブのハイパーコンバージドデータベースサービスです。 Lindorm は、ロギング、監視、課金、広告、ソーシャルネットワーキング、旅行、リスク管理など、さまざまなシナリオに適しています。 Lindorm はまた、Alibaba Group のコアビジネスをサポートするデータベースサービスの 1 つでもあります。

Lindormは、次の機能を提供します。

  • ワイドテーブル、時系列、テキスト、オブジェクト、ストリーム、空間など、さまざまなデータの統合アクセスと処理をサポートします。

  • SQL、Apache HBase、Apache Cassandra、Amazon S3、時系列データベース(TSDB)、Hadoop分散ファイルシステム(HDFS)、Apache Solr、Kafkaなど、複数の標準インターフェースと互換性があります。 Lindormは、サードパーティのエコシステムツールともシームレスに統合できます。

次の表に、Lindormコネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

ディメンションテーブルとシンクテーブル

実行モード

ストリーミングモード

データ形式

該当なし

メトリック

監視メトリック

結果テーブルのメトリック:

  • numBytesOut

  • numBytesOutPerSecond

  • numRecordsOut

  • numRecordsOutPerSecond

説明

詳細については、「メトリック」をご参照ください。

APIタイプ

SQL API

Lindormエンジン

LindormTable

シンクテーブルでのデータの更新または削除

サポートされています

使用上の注意

  • Lindorm HBase テーブルはサポートされていません。

  • Lindorm ワイドテーブルエンジンと Lindorm テーブルは事前に作成されます。 詳細については、「インスタンスを作成する」をご参照ください。

  • Lindorm クラスタと Flink ワークスペース間でネットワーク接続を確立する必要があります。 たとえば、Lindorm クラスタと Realtime Compute for Apache Flink は同じ VPC にあります。

構文

CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>', // Lindormサーバーのエンドポイント
'namespace' = '<yourNamespace>',  // Lindormデータベースの名前空間
'username' = '<yourUsername>',  // Lindormデータベースにアクセスするためのユーザー名
'password' = '<yourPassword>',  // Lindormデータベースにアクセスするためのパスワード
'tableName' = '<yourTableName>', // Lindormテーブルの名前
'columnFamily' = '<yourColumnFamily>' // Lindormテーブルの列ファミリの名前
);

コネクタオプション

  • 一般

    オプション

    説明

    タイプ

    必須

    デフォルト値

    備考

    connector

    テーブルタイプ。

    String

    はい

    デフォルト値なし

    値を lindorm に設定します。

    seedserver

    Lindorm サーバーのエンドポイント。

    String

    はい

    デフォルト値なし

    Realtime Compute for Apache Flink は、Java 用 ApsaraDB for HBase API を呼び出して Lindorm にアクセスし、LindormTable を使用します。Lindorm サーバーのエンドポイントは、host:port フォーマットです。詳細については、「Flink を使用して LindormTable に接続して使用する」をご参照ください。

    namespace

    Lindorm データベースの名前空間。

    String

    はい

    デフォルト値なし

    N/A

    username

    Lindorm データベースへのアクセスに使用されるユーザー名。

    String

    はい

    デフォルト値なし

    N/A

    password

    Lindorm データベースへのアクセスに使用されるパスワード。

    String

    はい

    デフォルト値なし

    N/A

    tableName

    Lindorm テーブルの名前。

    String

    はい

    デフォルト値なし

    N/A

    columnFamily

    Lindorm テーブルのカラムファミリーの名前。

    String

    はい

    デフォルト値なし

    Lindorm テーブルの作成時にカラムファミリー名を指定しない場合は、デフォルトのカラムファミリー名 f を入力します。

    retryIntervalMs

    データ読み取りが失敗した場合に読み取り操作がリトライされる間隔。

    Integer

    いいえ

    1000

    単位: ミリ秒。

    maxRetryTimes

    データの読み取りまたは書き込みの最大リトライ回数。

    Integer

    いいえ

    5

    N/A

  • シンク固有

    オプション

    説明

    タイプ

    必須

    デフォルト値

    備考

    bufferSize

    一度に書き込むことができるデータレコードの数。

    Integer

    いいえ

    500

    N/A

    flushIntervalMs

    データ量が少ない場合にテーブルにデータが書き込まれる間隔。

    Integer

    いいえ

    2000

    単位: ミリ秒。

    ignoreDelete

    削除操作をスキップするかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true

    • false (デフォルト)

    dynamicColumnSink

    動的テーブル機能を有効にするかどうかを指定します。詳細については、このトピックの「動的テーブル」セクションをご参照ください。

    Boolean

    いいえ

    false

    有効な値:

    • true

    • false (デフォルト)

    excludeUpdateColumns

    更新しないフィールド。指定されたフィールドの更新された値は、結果テーブルに挿入されません。

    String

    いいえ

    デフォルト値なし

    複数のフィールドはコンマ (,) で区切ります。たとえば、excludeUpdateColumns オプションを a,b,c に設定した場合、a、b、c フィールドの更新は無視されます。

    説明

    VVR 8.0.9 以降でのみ、このオプションをサポートしています。

  • ディメンションテーブル固有

    オプション

    説明

    タイプ

    必須

    デフォルト値

    備考

    partitionedJoin

    パーティション分割に JoinKey を使用するかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true: JoinKey はパーティション分割に使用されます。データは各 JOIN ノードに分散され、キャッシュヒット率が向上します。

    • false (デフォルト): JoinKey はパーティション分割に使用されません。

    shuffleEmptyKey

    空の上流キーをダウンストリームノードにランダムにシャッフルするかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true: システムは、空の上流キーをダウンストリームノードにランダムにシャッフルします。

    • false (デフォルト): システムは、空の上流キーを各ダウンストリームノードの最初の並列スレッドにシャッフルします。最初の並列スレッドの番号は 0 です。

    cache

    キャッシュポリシー。

    String

    いいえ

    None

    有効な値:

    • None (デフォルト): データはキャッシュされません。

    • LRU: ディメンションテーブルで最近アクセスされたデータのみがキャッシュされます。

    cache オプションを LRU に設定した場合は、cacheSize および cacheTTLMs オプションを設定する必要があります。

    cacheSize

    キャッシュできるデータの行数。

    Integer

    いいえ

    1000

    cache オプションを LRU に設定した場合、cacheSize オプションを設定できます。

    cacheTTLMs

    キャッシュのタイムアウト期間。

    Integer

    いいえ

    デフォルト値なし

    単位: ミリ秒。cache オプションを LRU に設定した場合、cacheTTLMs オプションを設定できます。デフォルトでは、キャッシュエントリは期限切れになりません。

    cacheEmpty

    戻り値が空の JOIN クエリをキャッシュするかどうかを指定します。

    Boolean

    いいえ

    true

    説明

    Lindorm コネクタは、1 対多のルックアップ結合をサポートしています。最適なパフォーマンスを得るには、キャッシュ戦略とスループットに細心の注意を払ってください。

    async

    非同期モードでデータ同期を有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true

    • false (デフォルト)

    asyncLindormRpcTimeoutMs

    非同期モードでデータがリクエストされた場合のタイムアウト期間。

    Integer

    いいえ

    300000

    単位: ミリ秒。

動的テーブル

動的テーブル機能は、テーブルに列が指定されておらず、デプロイ状況に基づいて列が作成されてテーブルに挿入されるシナリオに適しています。 たとえば、日数を主キーとして、時間を列として使用して、1日あたりの時間ごとのトランザクション量を計算します。 各時間のデータは動的に生成されます。 次の表は、動的テーブルを示しています。

主キー

列名:00:00

列名:01:00

2025-06-01

45

32

2025-06-02

76

34

動的テーブルは、次のDDLルールに準拠する必要があります。最初のいくつかの列は主キーとして定義されます。 最後の2つの列の最初の列の値は列名として使用され、最後の列の値は前の列の値として使用され、最後の2つの列のデータ型はVARCHARである必要があります。 サンプルコード:

CREATE TABLE lindorm_dynamic_output(
pk1 varchar, // 主キーの一部
pk2 varchar, // 主キーの一部
pk3 varchar, // 主キーの一部
c1 varchar,  // 動的列名
c2 varchar,  // 動的列値
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

上記の例では、pk1、pk2、およびpk3が主キーとして使用されています。 c1とc2は、動的テーブルに必要な2つの列であり、最後の2つの列である必要があります。 c1とc2を除いて、主キーとして使用されていない列は許可されません。 データがLindormシンクテーブルに書き込まれるたびに、主キーの値 <pk1、pk2、pk3> に対応するデータレコードに列が追加または変更されます。 c1の値は列の名前として使用され、c2の値は列の値として使用されます。 データレコードが受信されるたびに、1つの列の値のみが追加または変更されます。 他の列の値は変更されません。

データ型マッピング

Lindormのすべてのデータはバイナリ形式です。 次の表は、Realtime Compute for Apache Flinkのフィールドのデータ型に基づいて、データをバイナリデータのバイトに変換する方法、またはバイナリデータのバイトを解析する方法を示しています。

Flink SQLのデータ型

Lindormのバイトに変換するために使用されるメソッド

Lindormからバイトを解析するために使用されるメソッド

CHAR

org.apache.flink.table.data.StringData::toBytes

org.apache.flink.table.data.StringData::fromBytes

VARCHAR

BOOLEAN

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

BINARY

データを直接バイトに変換します。

バイトを直接返します。

VARBINARY

DECIMAL

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

TINYINT

データをbyte[]の最初のバイトに直接カプセル化します。

bytes[0]を直接返します。

SMALLINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short)

com.alibaba.lindorm.client.core.utils.Bytes::toShort

INT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)

com.alibaba.lindorm.client.core.utils.Bytes::toInt

BIGINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)

com.alibaba.lindorm.client.core.utils.Bytes::toLong

FLOAT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float)

com.alibaba.lindorm.client.core.utils.Bytes::toFloat

DOUBLE

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double)

com.alibaba.lindorm.client.core.utils.Bytes::toDouble

DATE

1970年1月1日からの日数が取得された後、com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)を呼び出します。

com.alibaba.lindorm.client.core.utils.Bytes::toIntを呼び出して、1970年1月1日からの日数を取得します。

TIME

現在の日の00:00:00からのミリ秒数が取得された後、com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)を呼び出します。

com.alibaba.lindorm.client.core.utils.Bytes::toIntを呼び出して、現在の日の00:00:00からのミリ秒数を取得します。

TIMESTAMP

1970年1月1日の00:00:00からのミリ秒数が取得された後、com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)を呼び出します。

com.alibaba.lindorm.client.core.utils.Bytes::toLongを呼び出して、1970年1月1日の00:00:00からのミリ秒数を取得します。

サンプルコード

CREATE TEMPORARY TABLE example_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_dim_table}', // Lindormディメンションテーブルの名前
 'seedserver'='${lindorm_seed_server}', // Lindormサーバーのエンドポイント
 'namespace'='default',
 'username'='${lindorm_username}', // Lindormデータベースにアクセスするためのユーザー名
 'password'='${lindorm_username}' // Lindormデータベースにアクセスするためのパスワード
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_sink_table}', // Lindormシンクテーブルの名前
 'seedserver'='${lindorm_seed_server}', // Lindormサーバーのエンドポイント
 'namespace'='default',
 'username'='${lindorm_username}', // Lindormデータベースにアクセスするためのユーザー名
 'password'='${lindorm_username}' // Lindormデータベースにアクセスするためのパスワード
);

INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;

FAQ

Lindormの接続エラーと解決策