全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Lindorm

更新时间:Oct 30, 2025

Topik ini menjelaskan cara menggunakan Konektor Lindorm.

Latar Belakang

Lindorm adalah layanan basis data hiper-konvergen cloud-native yang dioptimalkan untuk menyimpan dan memproses data multimodal dalam berbagai skenario seperti IoT, Internet, dan Internet of Vehicles (IoV). Lindorm cocok untuk skenario seperti pencatatan, pemantauan, penagihan, periklanan, jejaring sosial, perjalanan, dan manajemen risiko. Lindorm juga merupakan salah satu layanan basis data yang mendukung bisnis inti Alibaba Group.

Lindorm menyediakan fitur-fitur berikut:

  • Mendukung akses dan pemrosesan terpadu dari berbagai jenis data seperti tabel lebar, deret waktu, teks, objek, aliran, dan ruang.

  • Kompatibel dengan beberapa antarmuka standar seperti SQL, Apache HBase, Apache Cassandra, Amazon S3, Time Series Database (TSDB), Hadoop Distributed File System (HDFS), Apache Solr, dan Kafka. Lindorm juga dapat diintegrasikan secara mulus dengan alat ekosistem pihak ketiga.

Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor Lindorm.

Item

Deskripsi

Jenis tabel

Tabel dimensi dan tabel sink

Mode operasi

Mode streaming

Format data

Tidak tersedia

Metrik

Indikator pemantauan

Metrik untuk tabel sink:

  • numBytesOut

  • numBytesOutPerSecond

  • numRecordsOut

  • numRecordsOutPerSecond

Catatan

Untuk informasi lebih lanjut tentang metrik, lihat Metrik.

Jenis API

SQL API

Mesin Lindorm

LindormTable

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Catatan penggunaan

  • Tabel HBase Lindorm tidak didukung.

  • Mesin tabel lebar Lindorm dan tabel Lindorm harus dibuat terlebih dahulu. Untuk informasi lebih lanjut, lihat Buat Instans.

  • Koneksi jaringan harus dibuat antara kluster Lindorm dan ruang kerja Flink. Sebagai contoh, kluster Lindorm dan Realtime Compute for Apache Flink berada dalam virtual private cloud (VPC) yang sama.

Sintaksis

CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

Opsi Konektor

  • Umum

    Opsi

    Deskripsi

    Tipe

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel.

    String

    Ya

    Tidak ada nilai default

    Setel nilainya menjadi lindorm.

    seedserver

    Titik akhir server Lindorm.

    String

    Ya

    Tidak ada nilai default

    Realtime Compute for Apache Flink memanggil API Java ApsaraDB untuk HBase untuk mengakses Lindorm dan menggunakan LindormTable. Titik akhir server Lindorm berada dalam format host:port. Untuk informasi lebih lanjut, lihat Gunakan Flink untuk terhubung dan menggunakan LindormTable.

    namespace

    Namespace database Lindorm.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia

    username

    Nama pengguna yang digunakan untuk mengakses database Lindorm.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia

    password

    Kata sandi yang digunakan untuk mengakses database Lindorm.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia

    tableName

    Nama tabel Lindorm.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia

    columnFamily

    Nama keluarga kolom tabel Lindorm.

    String

    Ya

    Tidak ada nilai default

    Jika nama keluarga kolom tidak ditentukan saat Anda membuat tabel Lindorm, masukkan nama keluarga kolom default f.

    retryIntervalMs

    Interval waktu operasi baca diulang ketika pembacaan data gagal.

    Integer

    Tidak

    1000

    Satuan: milidetik.

    maxRetryTimes

    Jumlah maksimum percobaan ulang untuk membaca atau menulis data.

    Integer

    Tidak

    5

    Tidak tersedia

  • Spesifik Sink

    Opsi

    Deskripsi

    Tipe

    Diperlukan

    Nilai default

    Catatan

    bufferSize

    Jumlah catatan data yang dapat ditulis pada satu waktu.

    Integer

    Tidak

    500

    Tidak tersedia

    flushIntervalMs

    Interval waktu data ditulis ke tabel ketika jumlah data sedikit.

    Integer

    Tidak

    2000

    Satuan: milidetik.

    ignoreDelete

    Menentukan apakah akan melewati operasi penghapusan.

    Boolean

    Tidak

    false

    Nilai valid:

    • true

    • false (default)

    dynamicColumnSink

    Menentukan apakah akan mengaktifkan fitur tabel dinamis. Untuk informasi lebih lanjut tentang fitur tabel dinamis, lihat bagian Tabel Dinamis dari topik ini.

    Boolean

    Tidak

    false

    Nilai valid:

    • true

    • false (default)

    excludeUpdateColumns

    Bidang yang tidak diperbarui. Nilai pembaruan dari bidang yang ditentukan tidak dimasukkan ke dalam tabel sink.

    String

    Tidak

    Tidak ada nilai default

    Pisahkan beberapa bidang dengan koma (,). Sebagai contoh, jika Anda menyetel opsi excludeUpdateColumns menjadi a,b,c, pembaruan pada bidang a, b, dan c diabaikan.

    Catatan

    Hanya VVR 8.0.9 atau yang lebih baru yang mendukung opsi ini.

  • Spesifik Tabel Dimensi

    Opsi

    Deskripsi

    Tipe

    Diperlukan

    Nilai default

    Catatan

    partitionedJoin

    Menentukan apakah akan menggunakan JoinKey untuk partisi.

    Boolean

    Tidak

    false

    Nilai valid:

    • true: JoinKey digunakan untuk partisi. Data didistribusikan ke setiap node JOIN untuk meningkatkan tingkat hit cache.

    • false (default): JoinKey tidak digunakan untuk partisi.

    shuffleEmptyKey

    Menentukan apakah akan mengacak kunci upstream kosong secara acak ke node downstream.

    Boolean

    Tidak

    false

    Nilai valid:

    • true: Sistem mengacak kunci upstream kosong secara acak ke node downstream.

    • false (default): Sistem mengacak kunci upstream kosong ke thread paralel pertama dari setiap node downstream. Thread paralel pertama bernomor 0.

    cache

    Kebijakan cache.

    String

    Tidak

    None

    Nilai valid:

    • None (default): Tidak ada data yang di-cache.

    • LRU: Hanya data yang baru saja diakses dalam tabel dimensi yang di-cache.

    Jika Anda menyetel opsi cache menjadi LRU, Anda harus mengonfigurasi opsi cacheSize dan cacheTTLMs.

    cacheSize

    Jumlah baris data yang dapat di-cache.

    Integer

    Tidak

    1000

    Jika Anda menyetel opsi cache menjadi LRU, Anda dapat mengonfigurasi opsi cacheSize.

    cacheTTLMs

    Periode waktu habis cache.

    Integer

    Tidak

    Tidak ada nilai default

    Satuan: milidetik. Jika Anda menyetel opsi cache menjadi LRU, Anda dapat mengonfigurasi opsi cacheTTLMs. Secara default, entri cache tidak kedaluwarsa.

    cacheEmpty

    Menentukan apakah akan menyimpan cache kueri JOIN yang nilai balikannya kosong.

    Boolean

    Tidak

    true

    Catatan

    Konektor Lindorm mendukung join pencarian satu-ke-banyak. Perhatikan dengan cermat strategi caching dan throughput untuk performa optimal.

    async

    Menentukan apakah akan mengaktifkan sinkronisasi data dalam mode asinkron.

    Boolean

    Tidak

    false

    Nilai valid:

    • true

    • false (default)

    asyncLindormRpcTimeoutMs

    Periode timeout ketika data diminta dalam mode asinkron.

    Integer

    Tidak

    300000

    Satuan: milidetik.

Tabel dinamis

Fitur tabel dinamis cocok untuk skenario di mana tidak ada kolom yang ditentukan dalam tabel dan kolom dibuat serta disisipkan ke dalam tabel berdasarkan status penyebaran. Sebagai contoh, Anda menggunakan hari sebagai kunci utama dan jam sebagai kolom untuk menghitung volume transaksi per jam setiap hari. Data untuk setiap jam dihasilkan secara dinamis. Tabel berikut menunjukkan tabel dinamis.

Kunci utama

Nama kolom: 00:00

Nama kolom: 01:00

2025-06-01

45

32

2025-06-02

76

34

Tabel dinamis harus mematuhi aturan DDL berikut: Beberapa kolom pertama didefinisikan sebagai kunci utama. Nilai kolom pertama di dua kolom terakhir digunakan sebagai nama kolom, nilai kolom terakhir digunakan sebagai nilai dari kolom sebelumnya, dan tipe data dari dua kolom terakhir harus VARCHAR. Contoh kode:

CREATE TABLE lindorm_dynamic_output(
pk1 varchar,
pk2 varchar,
pk3 varchar,
c1 varchar,
c2 varchar,
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

Dalam contoh sebelumnya, pk1, pk2, dan pk3 digunakan sebagai kunci utama. c1 dan c2 adalah dua kolom yang diperlukan untuk tabel dinamis dan harus menjadi dua kolom terakhir. Kecuali c1 dan c2, kolom yang tidak digunakan sebagai kunci utama tidak diperbolehkan. Setiap kali data ditulis ke tabel sink Lindorm, kolom ditambahkan atau dimodifikasi dalam catatan data yang sesuai dengan nilai <pk1, pk2, pk3> dari kunci utama. Nilai c1 digunakan sebagai nama kolom, dan nilai c2 digunakan sebagai nilai kolom. Setiap kali catatan data diterima, hanya nilai dalam satu kolom yang ditambahkan atau diubah. Nilai kolom lainnya tetap tidak berubah.

Pemetaan tipe data

Semua data di Lindorm berada dalam format biner. Tabel berikut menunjukkan cara mengonversi data menjadi byte data biner atau mengurai byte data biner berdasarkan tipe data bidang di Realtime Compute for Apache Flink.

Tipe data SQL Flink

Metode untuk mengonversi data menjadi byte untuk Lindorm

Metode untuk mengurai byte dari Lindorm

CHAR

org.apache.flink.table.data.StringData::toBytes

org.apache.flink.table.data.StringData::fromBytes

VARCHAR

BOOLEAN

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

BINARY

Konversi langsung data menjadi byte.

Kembalikan byte secara langsung.

VARBINARY

DECIMAL

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

TINYINT

Enkapsulasi langsung data ke byte pertama dari byte[].

Kembalikan bytes[0] secara langsung.

SMALLINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short)

com.alibaba.lindorm.client.core.utils.Bytes::toShort

INT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)

com.alibaba.lindorm.client.core.utils.Bytes::toInt

BIGINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)

com.alibaba.lindorm.client.core.utils.Bytes::toLong

FLOAT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float)

com.alibaba.lindorm.client.core.utils.Bytes::toFloat

DOUBLE

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double)

com.alibaba.lindorm.client.core.utils.Bytes::toDouble

DATE

Panggil com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) setelah jumlah hari sejak 1 Januari 1970 diperoleh.

Panggil com.alibaba.lindorm.client.core.utils.Bytes::toInt untuk mendapatkan jumlah hari sejak 1 Januari 1970.

TIME

Panggil com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) setelah jumlah milidetik sejak 00:00:00 hari saat ini diperoleh.

Panggil com.alibaba.lindorm.client.core.utils.Bytes::toInt untuk mendapatkan jumlah milidetik sejak 00:00:00 hari saat ini.

TIMESTAMP

Panggil com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) setelah jumlah milidetik sejak 00:00:00 pada 1 Januari 1970 diperoleh.

Panggil com.alibaba.lindorm.client.core.utils.Bytes::toLong untuk mendapatkan jumlah milidetik sejak 00:00:00 pada 1 Januari 1970.

Contoh kode

CREATE TEMPORARY TABLE example_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;

FAQ

Kesalahan Koneksi Lindorm dan Solusinya