このトピックでは、Lindormコネクタの使用方法について説明します。
背景
Lindorm は、IoT、インターネット、車載インターネット (IoV) などのさまざまなシナリオでマルチモーダルデータを保存および処理するために開発および最適化された、クラウドネイティブのハイパーコンバージドデータベースサービスです。 Lindorm は、ロギング、監視、課金、広告、ソーシャルネットワーキング、旅行、リスク管理など、さまざまなシナリオに適しています。 Lindorm はまた、Alibaba Group のコアビジネスをサポートするデータベースサービスの 1 つでもあります。
Lindormは、次の機能を提供します。
ワイドテーブル、時系列、テキスト、オブジェクト、ストリーム、空間など、さまざまなデータの統合アクセスと処理をサポートします。
SQL、Apache HBase、Apache Cassandra、Amazon S3、時系列データベース(TSDB)、Hadoop分散ファイルシステム(HDFS)、Apache Solr、Kafkaなど、複数の標準インターフェースと互換性があります。 Lindormは、サードパーティのエコシステムツールともシームレスに統合できます。
次の表に、Lindormコネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | ディメンションテーブルとシンクテーブル |
実行モード | ストリーミングモード |
データ形式 | 該当なし |
メトリック | |
APIタイプ | SQL API |
Lindormエンジン | LindormTable |
シンクテーブルでのデータの更新または削除 | サポートされています |
使用上の注意
Lindorm HBase テーブルはサポートされていません。
Lindorm ワイドテーブルエンジンと Lindorm テーブルは事前に作成されます。 詳細については、「インスタンスを作成する」をご参照ください。
Lindorm クラスタと Flink ワークスペース間でネットワーク接続を確立する必要があります。 たとえば、Lindorm クラスタと Realtime Compute for Apache Flink は同じ VPC にあります。
構文
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>', // Lindormサーバーのエンドポイント
'namespace' = '<yourNamespace>', // Lindormデータベースの名前空間
'username' = '<yourUsername>', // Lindormデータベースにアクセスするためのユーザー名
'password' = '<yourPassword>', // Lindormデータベースにアクセスするためのパスワード
'tableName' = '<yourTableName>', // Lindormテーブルの名前
'columnFamily' = '<yourColumnFamily>' // Lindormテーブルの列ファミリの名前
);コネクタオプション
一般
オプション
説明
タイプ
必須
デフォルト値
備考
connector
テーブルタイプ。
String
はい
デフォルト値なし
値を lindorm に設定します。
seedserver
Lindorm サーバーのエンドポイント。
String
はい
デフォルト値なし
Realtime Compute for Apache Flink は、Java 用 ApsaraDB for HBase API を呼び出して Lindorm にアクセスし、LindormTable を使用します。Lindorm サーバーのエンドポイントは、
host:portフォーマットです。詳細については、「Flink を使用して LindormTable に接続して使用する」をご参照ください。namespace
Lindorm データベースの名前空間。
String
はい
デフォルト値なし
N/A
username
Lindorm データベースへのアクセスに使用されるユーザー名。
String
はい
デフォルト値なし
N/A
password
Lindorm データベースへのアクセスに使用されるパスワード。
String
はい
デフォルト値なし
N/A
tableName
Lindorm テーブルの名前。
String
はい
デフォルト値なし
N/A
columnFamily
Lindorm テーブルのカラムファミリーの名前。
String
はい
デフォルト値なし
Lindorm テーブルの作成時にカラムファミリー名を指定しない場合は、デフォルトのカラムファミリー名 f を入力します。
retryIntervalMs
データ読み取りが失敗した場合に読み取り操作がリトライされる間隔。
Integer
いいえ
1000
単位: ミリ秒。
maxRetryTimes
データの読み取りまたは書き込みの最大リトライ回数。
Integer
いいえ
5
N/A
シンク固有
オプション
説明
タイプ
必須
デフォルト値
備考
bufferSize
一度に書き込むことができるデータレコードの数。
Integer
いいえ
500
N/A
flushIntervalMs
データ量が少ない場合にテーブルにデータが書き込まれる間隔。
Integer
いいえ
2000
単位: ミリ秒。
ignoreDelete
削除操作をスキップするかどうかを指定します。
Boolean
いいえ
false
有効な値:
true
false (デフォルト)
dynamicColumnSink
動的テーブル機能を有効にするかどうかを指定します。詳細については、このトピックの「動的テーブル」セクションをご参照ください。
Boolean
いいえ
false
有効な値:
true
false (デフォルト)
excludeUpdateColumns
更新しないフィールド。指定されたフィールドの更新された値は、結果テーブルに挿入されません。
String
いいえ
デフォルト値なし
複数のフィールドはコンマ (
,) で区切ります。たとえば、excludeUpdateColumnsオプションをa,b,cに設定した場合、a、b、cフィールドの更新は無視されます。説明VVR 8.0.9 以降でのみ、このオプションをサポートしています。
ディメンションテーブル固有
オプション
説明
タイプ
必須
デフォルト値
備考
partitionedJoin
パーティション分割に JoinKey を使用するかどうかを指定します。
Boolean
いいえ
false
有効な値:
true: JoinKey はパーティション分割に使用されます。データは各 JOIN ノードに分散され、キャッシュヒット率が向上します。
false (デフォルト): JoinKey はパーティション分割に使用されません。
shuffleEmptyKey
空の上流キーをダウンストリームノードにランダムにシャッフルするかどうかを指定します。
Boolean
いいえ
false
有効な値:
true: システムは、空の上流キーをダウンストリームノードにランダムにシャッフルします。
false (デフォルト): システムは、空の上流キーを各ダウンストリームノードの最初の並列スレッドにシャッフルします。最初の並列スレッドの番号は 0 です。
cache
キャッシュポリシー。
String
いいえ
None
有効な値:
None (デフォルト): データはキャッシュされません。
LRU: ディメンションテーブルで最近アクセスされたデータのみがキャッシュされます。
cacheオプションをLRUに設定した場合は、cacheSizeおよびcacheTTLMsオプションを設定する必要があります。cacheSize
キャッシュできるデータの行数。
Integer
いいえ
1000
cacheオプションをLRUに設定した場合、cacheSizeオプションを設定できます。cacheTTLMs
キャッシュのタイムアウト期間。
Integer
いいえ
デフォルト値なし
単位: ミリ秒。
cacheオプションをLRUに設定した場合、cacheTTLMsオプションを設定できます。デフォルトでは、キャッシュエントリは期限切れになりません。cacheEmpty
戻り値が空の JOIN クエリをキャッシュするかどうかを指定します。
Boolean
いいえ
true
説明Lindorm コネクタは、1 対多のルックアップ結合をサポートしています。最適なパフォーマンスを得るには、キャッシュ戦略とスループットに細心の注意を払ってください。
async
非同期モードでデータ同期を有効にするかどうかを指定します。
Boolean
いいえ
false
有効な値:
true
false (デフォルト)
asyncLindormRpcTimeoutMs
非同期モードでデータがリクエストされた場合のタイムアウト期間。
Integer
いいえ
300000
単位: ミリ秒。
動的テーブル
動的テーブル機能は、テーブルに列が指定されておらず、デプロイ状況に基づいて列が作成されてテーブルに挿入されるシナリオに適しています。 たとえば、日数を主キーとして、時間を列として使用して、1日あたりの時間ごとのトランザクション量を計算します。 各時間のデータは動的に生成されます。 次の表は、動的テーブルを示しています。
主キー | 列名:00:00 | 列名:01:00 |
2025-06-01 | 45 | 32 |
2025-06-02 | 76 | 34 |
動的テーブルは、次のDDLルールに準拠する必要があります。最初のいくつかの列は主キーとして定義されます。 最後の2つの列の最初の列の値は列名として使用され、最後の列の値は前の列の値として使用され、最後の2つの列のデータ型はVARCHARである必要があります。 サンプルコード:
CREATE TABLE lindorm_dynamic_output(
pk1 varchar, // 主キーの一部
pk2 varchar, // 主キーの一部
pk3 varchar, // 主キーの一部
c1 varchar, // 動的列名
c2 varchar, // 動的列値
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);上記の例では、pk1、pk2、およびpk3が主キーとして使用されています。 c1とc2は、動的テーブルに必要な2つの列であり、最後の2つの列である必要があります。 c1とc2を除いて、主キーとして使用されていない列は許可されません。 データがLindormシンクテーブルに書き込まれるたびに、主キーの値 <pk1、pk2、pk3> に対応するデータレコードに列が追加または変更されます。 c1の値は列の名前として使用され、c2の値は列の値として使用されます。 データレコードが受信されるたびに、1つの列の値のみが追加または変更されます。 他の列の値は変更されません。
データ型マッピング
Lindormのすべてのデータはバイナリ形式です。 次の表は、Realtime Compute for Apache Flinkのフィールドのデータ型に基づいて、データをバイナリデータのバイトに変換する方法、またはバイナリデータのバイトを解析する方法を示しています。
Flink SQLのデータ型 | Lindormのバイトに変換するために使用されるメソッド | Lindormからバイトを解析するために使用されるメソッド |
CHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
VARCHAR | ||
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
BINARY | データを直接バイトに変換します。 | バイトを直接返します。 |
VARBINARY | ||
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
TINYINT | データをbyte[]の最初のバイトに直接カプセル化します。 | bytes[0]を直接返します。 |
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
DATE | 1970年1月1日からの日数が取得された後、com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)を呼び出します。 | com.alibaba.lindorm.client.core.utils.Bytes::toIntを呼び出して、1970年1月1日からの日数を取得します。 |
TIME | 現在の日の00:00:00からのミリ秒数が取得された後、com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)を呼び出します。 | com.alibaba.lindorm.client.core.utils.Bytes::toIntを呼び出して、現在の日の00:00:00からのミリ秒数を取得します。 |
TIMESTAMP | 1970年1月1日の00:00:00からのミリ秒数が取得された後、com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)を呼び出します。 | com.alibaba.lindorm.client.core.utils.Bytes::toLongを呼び出して、1970年1月1日の00:00:00からのミリ秒数を取得します。 |
サンプルコード
CREATE TEMPORARY TABLE example_source(
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
CREATE TEMPORARY TABLE lindorm_hbase_dim(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_dim_table}', // Lindormディメンションテーブルの名前
'seedserver'='${lindorm_seed_server}', // Lindormサーバーのエンドポイント
'namespace'='default',
'username'='${lindorm_username}', // Lindormデータベースにアクセスするためのユーザー名
'password'='${lindorm_username}' // Lindormデータベースにアクセスするためのパスワード
);
CREATE TEMPORARY TABLE lindorm_hbase_sink(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_sink_table}', // Lindormシンクテーブルの名前
'seedserver'='${lindorm_seed_server}', // Lindormサーバーのエンドポイント
'namespace'='default',
'username'='${lindorm_username}', // Lindormデータベースにアクセスするためのユーザー名
'password'='${lindorm_username}' // Lindormデータベースにアクセスするためのパスワード
);
INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;