Topik ini menjelaskan cara menggunakan Konektor Tablestore.
Informasi latar belakang
Tablestore adalah layanan penyimpanan serverless berbasis tabel dengan biaya rendah yang dioptimalkan untuk menyimpan sejumlah besar data terstruktur. Tablestore memungkinkan Anda menanyakan dan mengambil data daring dalam hitungan milidetik serta menganalisis data yang tersimpan dalam beberapa dimensi. Tablestore cocok untuk berbagai skenario seperti tagihan dalam jumlah besar, pesan instan (IM), IoT, Internet of Vehicles (IoV), manajemen risiko, dan rekomendasi cerdas. Tablestore juga menyediakan solusi penyimpanan ujung ke ujung yang dioptimalkan secara mendalam untuk aplikasi IoT. Untuk informasi lebih lanjut, lihat Apa itu Tablestore?
Tabel berikut menjelaskan kemampuan yang didukung oleh Konektor Tablestore.
Item | Deskripsi |
Mode operasi | Mode streaming |
Jenis API | SQL API |
Jenis tabel | Tabel sumber, tabel dimensi, dan sink table |
Format data | Tidak tersedia |
Metrik |
Catatan Untuk informasi lebih lanjut tentang data deret waktu, lihat Data Deret Waktu. |
Pembaruan atau penghapusan data dalam sink table | Didukung |
Prasyarat
Sebuah instance Tablestore telah dibeli dan sebuah tabel Tablestore telah dibuat. Untuk informasi lebih lanjut, lihat Gunakan Tablestore.
Sintaksis
Pernyataan untuk membuat sink table
CREATE TABLE ots_sink ( name VARCHAR, age BIGINT, birthday BIGINT, primary key(name,age) not enforced ) WITH ( 'connector'='ots', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}', 'endPoint'='<yourEndpoint>', 'valueColumns'='birthday' );CatatanAnda harus menentukan primary key untuk sink table Tablestore. Data keluaran terbaru ditambahkan ke sink table Tablestore untuk memperbarui data tabel.
Pernyataan untuk membuat tabel dimensi
CREATE TABLE ots_dim ( id int, len int, content STRING ) WITH ( 'connector'='ots', 'endPoint'='<yourEndpoint>', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}' );Pernyataan untuk membuat tabel sumber
CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='<yourEndpoint>', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='${ak_id}', 'accessKey' ='${ak_secret}', 'ignoreDelete' = 'false' );Bidang yang datanya perlu dikonsumsi dan bidang
OtsRecordTypedanOtsRecordTimestampdalam data yang dikembalikan oleh Layanan Tunnel dapat dibaca dan ditulis sebagai kolom atribut. Tabel berikut menjelaskan bidang tersebut.Bidang
Kolom pemetaan di Realtime Compute for Apache Flink
Deskripsi
OtsRecordType
type
Jenis operasi data.
OtsRecordTimestamp
timestamp
Waktu operasi data. Satuan: mikrodetik.
CatatanJika membaca data penuh, nilai parameter OtsRecordTimestamp diatur ke 0.
Jika Anda ingin membaca bidang
OtsRecordTypedanOtsRecordTimestamp, Anda dapat menggunakan kata kunci METADATA yang disediakan oleh Realtime Compute for Apache Flink untuk mendapatkan bidang atribut dari tabel sumber Tablestore. Contoh berikut menunjukkan pernyataan DDL.CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, record_type STRING METADATA FROM 'type', record_timestamp BIGINT METADATA FROM 'timestamp' ) WITH ( ... );
Opsi konektor dalam klausa WITH
Umum
Opsi
Deskripsi
Tipe data
Diperlukan?
Nilai default
Catatan
connector
Jenis tabel.
String
Ya
Tidak ada nilai default
Atur nilainya menjadi
ots.instanceName
Nama instance Tablestore.
String
Ya
Tidak ada nilai default
endPoint
Titik akhir instance Tablestore.
String
Ya
Tidak ada nilai default
Untuk informasi lebih lanjut, lihat Titik Akhir.
tableName
Nama tabel
String
Ya
Tidak ada nilai default
accessId
ID AccessKey akun Alibaba Cloud Anda atau Pengguna Resource Access Management (RAM).
String
Ya
Tidak ada nilai default
Lihat Bagaimana cara melihat pasangan AccessKey akun?
PentingUntuk melindungi pasangan AccessKey Anda, gunakan variabel daripada mengkodekan langsung pasangan AccessKey Anda.
accessKey
Rahasia AccessKey akun Alibaba Cloud Anda atau Pengguna RAM.
String
Ya
Tidak ada nilai default
connectTimeout
Periode timeout untuk konektor Tablestore menghubungkan ke Tablestore.
Integer
Tidak
30000
Satuan: milidetik.
socketTimeout
Periode timeout soket untuk konektor Tablestore menghubungkan ke Tablestore.
Integer
Tidak
30000
Satuan: milidetik.
ioThreadCount
Jumlah thread I/O.
Integer
Tidak
4
callbackThreadPoolSize
Ukuran pool thread callback.
Bilangan Bulat
Tidak
4
Spesifik Sumber
Opsi
Deskripsi
Tipe data
Diperlukan?
Nilai default
Catatan
tunnelName
Nama tunnel tabel sumber Tablestore.
String
Ya
Tidak ada nilai default
Anda harus membuat tunnel di konsol Tablestore terlebih dahulu. Saat membuat tunnel, tentukan nama tunnel dan jenis tunnel. Jenis tunnel bisa Incremental, Full, atau Differential. Untuk informasi lebih lanjut tentang cara membuat tunnel, lihat bagian "Buat tunnel" pada topik Memulai Cepat.
ignoreDelete
Menentukan apakah akan mengabaikan operasi penghapusan.
Boolean
Tidak
false
Nilai valid:
true: Operasi penghapusan diabaikan.
false (default): Operasi penghapusan tidak diabaikan.
skipInvalidData
Menentukan apakah mengabaikan data kotor. Jika data kotor tidak diabaikan, kesalahan akan dilaporkan saat sistem memproses data kotor.
Boolean
Tidak
false
Nilai valid:
true: Data kotor diabaikan.
false (default): Data kotor tidak diabaikan.
CatatanHanya VVR 8.0.4 atau lebih baru yang mendukung opsi ini.
strategiPengulangan
Kebijakan pengulangan.
Enum
Tidak
WAKTU
Nilai yang valid:
TIME: Sistem terus mencoba hingga periode timeout yang ditentukan oleh parameter retryTimeoutMs berakhir.
COUNT: Sistem terus mencoba hingga jumlah maksimum percobaan ulang yang ditentukan oleh parameter retryCount tercapai.
jumlahPercobaanUlang
Jumlah maksimum percobaan ulang.
Bilangan Bulat
Tidak
3
Jika Anda mengatur parameter retryStrategy ke COUNT, Anda dapat menentukan parameter ini.
retryTimeoutMs
Periode batas waktu untuk percobaan ulang.
Bilangan Bulat
Tidak
180000
Jika Anda menyetel parameter retryStrategy ke TIME, Anda dapat menentukan parameter ini. Satuan: milidetik.
streamOriginColumnMapping
Pemetaan antara nama kolom asli dan nama kolom nyata terkait.
String
Tidak
Tidak ada nilai default
Pisahkan nama kolom asli dan nama kolom nyata terkait dengan titik dua (:). Pisahkan beberapa pemetaan dengan koma (,). Contoh:
origin_col1:col1,origin_col2:col2.outputSpecificRowType
Menentukan apakah akan melewati tipe baris tertentu.
Boolean
Tidak
false
Nilai yang valid:
false: tidak melewati tipe baris tertentu. Semua data adalah tipe INSERT.
true: melewati tipe baris tertentu. Data dapat berupa tipe INSERT, DELETE, atau UPDATE_AFTER.
dataFetchTimeoutMs
Durasi maksimum untuk mengambil data dari partisi.
Integer
Tidak
10000
Satuan: milidetik.
Saat menyinkronkan banyak partisi dengan persyaratan latensi rendah, kurangi nilai opsi ini untuk mengurangi latensi sinkronisasi keseluruhan.
CatatanOpsi ini didukung di VVR 8.0.10 atau lebih baru.
enableRequestCompression
Menentukan apakah akan mengaktifkan kompresi data.
Boolean
Tidak
false
Mengaktifkan opsi ini dapat menghemat bandwidth tetapi meningkatkan beban CPU.
CatatanOpsi ini didukung di VVR 8.0.10 atau lebih baru.
Spesifik Sink
Opsi
Deskripsi
Tipe data
Diperlukan?
Nilai default
Catatan
retryIntervalMs
Interval percobaan ulang.
Integer
Tidak
1000
Satuan: milidetik.
maxRetryTimes
Jumlah maksimum percobaan ulang.
Integer
Tidak
10
valueColumns
Nama kolom yang ingin Anda sisipkan.
String
Ya
Tidak ada nilai default
Pisahkan beberapa bidang, seperti bidang ID atau NAMA, dengan koma (,).
bufferSize
Jumlah maksimum catatan data yang dapat disimpan dalam buffer sebelum data ditulis ke sink table.
Integer
Tidak
5000
batchWriteTimeoutMs
Periode timeout penulisan.
Integer
Tidak
5000
Satuan: milidetik. Jika jumlah catatan data yang di-cache tidak mencapai batas atas dalam periode waktu yang ditentukan oleh parameter batchWriteTimeoutMs, semua data yang di-cache ditulis ke sink table.
batchSize
Jumlah catatan data yang dapat ditulis sekaligus.
Integer
Tidak
100
Nilai maksimum: 200.
ignoreDelete
Menentukan apakah mengabaikan operasi penghapusan.
Boolean
Tidak
False
Tidak tersedia.
autoIncrementKey
Nama kolom primary key auto-increment. Jika sink table berisi kolom primary key auto-increment, Anda dapat mengonfigurasi parameter ini untuk menentukan nama kolom primary key auto-increment.
String
Tidak
Tidak ada nilai default
Jika sink table tidak memiliki kolom primary key auto-increment, Anda tidak perlu mengonfigurasi parameter ini.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.4 atau versi lebih baru yang mendukung parameter ini.
modeTimpa
Mode penimpaan data.
Enum
Tidak
PUT
Nilai yang valid:
PUT: Data ditulis ke tabel Tablestore dalam mode PUT.
UPDATE: Data ditulis ke tabel Tablestore dalam mode UPDATE.
CatatanHanya mode UPDATE yang didukung dalam mode kolom dinamis.
defaultTimestampInMillisecond
Timestamp default yang digunakan untuk menulis data ke tabel Tablestore.
Panjang
Tidak
-1
Jika Anda tidak menentukan parameter ini, cap waktu dari waktu sistem saat ini akan digunakan.
dynamicColumnSink
Menentukan apakah akan mengaktifkan mode kolom dinamis.
Boolean
Tidak
false
Mode kolom dinamis cocok untuk skenario di mana tidak ada kolom yang ditentukan dalam tabel dan kolom dimasukkan ke dalam tabel berdasarkan status penyebaran. Beberapa kolom pertama didefinisikan sebagai primary key dalam pernyataan pembuatan tabel. Nilai dari kolom pertama di dua kolom terakhir digunakan sebagai nama kolom, nilai dari kolom terakhir digunakan sebagai nilai dari kolom sebelumnya, dan tipe data dari dua kolom terakhir harus berupa STRING.
CatatanJika Anda mengaktifkan mode kolom dinamis, kolom primary key auto-increment tidak didukung dan Anda harus menetapkan parameter overwriteMode ke UPDATE.
checkSinkTableMeta
Menentukan apakah akan memeriksa metadata tabel sink.
Boolean
Tidak
benar
Jika Anda menetapkan parameter ini ke true, sistem akan memeriksa apakah kolom kunci utama tabel Tablestore sama dengan kunci utama yang ditentukan dalam pernyataan pembuatan tabel.
enableRequestCompression
Menentukan apakah akan mengaktifkan kompresi data selama penulisan data.
Boolean
Tidak
false
maxColumnsCount
Jumlah maksimum kolom yang ditulis ke tabel hilir.
Integer
Tidak
128
Jika opsi ini diatur ke nilai lebih dari 128, kesalahan
Jumlah kolom atribut melebihi maksimumakan terjadi. Untuk menyelesaikan ini, sesuaikan nilai opsi tersebut.CatatanOpsi ini didukung oleh 8.0.10 atau lebih baru.
storageType
Tipe tabel sink.
String
Tidak
WIDE_COLUMNNilai valid:
WIDE_COLUMN: Tabel sink adalah tabel lebar.TIMESERIES: Tabel sink adalah tabel deret waktu.
Spesifik Tabel Dimensi
Opsi
Deskripsi
Tipe data
Diperlukan?
Nilai default
Catatan
retryIntervalMs
Interval percobaan ulang.
Integer
Tidak
1000
Satuan: milidetik.
maxRetryTimes
Jumlah maksimum percobaan ulang.
Integer
Tidak
10
Tidak tersedia.
cache
Kebijakan cache.
String
Tidak
ALL
Nilai valid:
None: Tidak ada data yang di-cache.
LRU: Hanya data tertentu dalam tabel dimensi yang di-cache. Setiap kali sistem menerima catatan data, sistem mencari cache. Jika sistem tidak menemukan catatan dalam cache, sistem mencari catatan data dalam tabel dimensi fisik.
Jika kebijakan cache ini digunakan, Anda harus mengonfigurasi parameter cacheSize dan cacheTTLMs.
ALL (default): Semua data dalam tabel dimensi di-cache. Sebelum pekerjaan berjalan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua kueri berikutnya dalam tabel dimensi. Jika tidak ada kunci yang ada, sistem tidak dapat menemukan catatan data dalam cache. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.
Jika jumlah data dalam tabel jarak jauh kecil dan sejumlah besar kunci yang hilang ada, kami sarankan Anda mengatur parameter ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat dikaitkan berdasarkan klausa ON. Jika Anda menggunakan kebijakan cache ini, Anda harus mengonfigurasi parameter cacheTTLMs dan cacheReloadTimeBlackList.
CatatanJika Anda mengatur parameter cache ke ALL, Anda harus meningkatkan memori node untuk bergabung dengan tabel karena sistem secara asinkron memuat data dari tabel dimensi. Ukuran memori yang ditingkatkan adalah dua kali ukuran tabel jarak jauh.
cacheSize
Jumlah maksimum catatan data yang dapat di-cache.
Integer
Tidak
Tidak ada nilai default
Jika Anda mengatur parameter cache ke LRU, Anda dapat menentukan parameter ini.
CatatanNilai parameter ini adalah jumlah maksimum catatan data yang dapat di-cache.
cacheTTLMs
Periode timeout cache.
Integer
Tidak
Tidak ada nilai default
Satuan: milidetik. Konfigurasi parameter cacheTTLMs bervariasi berdasarkan nilai parameter cache.
Jika Anda mengatur parameter cache ke None, parameter cacheTTLMs dapat dibiarkan kosong. Ini menunjukkan bahwa entri cache tidak kedaluwarsa.
Jika Anda mengatur parameter cache ke LRU, parameter cacheTTLMs menentukan periode timeout cache. Secara default, entri cache tidak kedaluwarsa.
Jika Anda mengatur parameter cache ke ALL, parameter cacheTTLMs menentukan interval di mana sistem menyegarkan cache. Secara default, cache tidak dimuat ulang.
cacheEmpty
Menentukan apakah mencache hasil kosong.
Boolean
Tidak
Tidak ada nilai default
true: Hasil kosong di-cache.
false: Hasil kosong tidak di-cache.
cacheReloadTimeBlackList
Periode waktu selama cache tidak disegarkan. Parameter ini berlaku ketika parameter cache diatur ke ALL. Cache tidak disegarkan selama periode waktu yang Anda tentukan untuk parameter ini. Parameter ini cocok untuk acara promosi online berskala besar seperti Double 11.
String
Tidak
Tidak ada nilai default
Contoh berikut menunjukkan format nilai: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Gunakan pemisah berdasarkan aturan berikut:
Pisahkan beberapa periode waktu dengan koma (,).
Pisahkan waktu mulai dan waktu akhir setiap periode waktu dengan panah (->) yang merupakan kombinasi tanda hubung (-) dan tanda kurung penutup (>).
async
Menentukan apakah mengaktifkan sinkronisasi data dalam mode asinkron.
Boolean
Tidak
false
true: Sinkronisasi data dalam mode asinkron diaktifkan. Secara default, data tidak diurutkan saat data disinkronkan dalam mode asinkron.
false (default): Sinkronisasi data dalam mode asinkron dinonaktifkan.
Pemetaan tipe data
Tabel Sumber
Tipe data kolom di Tablestore
Tipe data kolom di Realtime Compute for Apache Flink
INTEGER
BIGINT
STRING
STRING
BOOLEAN
BOOLEAN
DOUBLE
DOUBLE
BINARY
BINARY
Tabel Sink
Tipe data kolom di Realtime Compute for Apache Flink
Tipe data kolom di Tablestore
BINARY
BINARY
VARBINARY
CHAR
STRING
VARCHAR
TINYINT
INTEGER
SMALLINT
INTEGER
BIGINT
FLOAT
DOUBLE
DOUBLE
BOOLEAN
BOOLEAN
Contoh
Contoh 1
Membaca data dari Tablestore dan menulis ke Tablestore:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false',
'skipInvalidData' ='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;Contoh 2
Sinkronisasi data dari tabel lebar ke tabel deret waktu.
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'iotstore-test',
'tableName' = 'test_ots_timeseries_2',
'tunnelName' = 'timeseries_source_tunnel_2',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'ignoreDelete' = 'true', -- Abaikan penghapusan
);
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tags Map<String, String>,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'iotstore-test',
'tableName' = 'test_timeseries_sink_table_2',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'storageType' = 'TIMESERIES',
);
-- Masukkan data dari tabel sumber ke tabel sink
INSERT INTO timeseries_sink
select
m_name,
data_source,
MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
`time`,
cpu_sys,
cpu_user,
disk_0,
disk_1,
disk_2,
memory_used,
net_in,
net_out
from
timeseries_source;