全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Tablestore

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menggunakan Konektor Tablestore.

Informasi latar belakang

Tablestore adalah layanan penyimpanan serverless berbasis tabel dengan biaya rendah yang dioptimalkan untuk menyimpan sejumlah besar data terstruktur. Tablestore memungkinkan Anda menanyakan dan mengambil data daring dalam hitungan milidetik serta menganalisis data yang tersimpan dalam beberapa dimensi. Tablestore cocok untuk berbagai skenario seperti tagihan dalam jumlah besar, pesan instan (IM), IoT, Internet of Vehicles (IoV), manajemen risiko, dan rekomendasi cerdas. Tablestore juga menyediakan solusi penyimpanan ujung ke ujung yang dioptimalkan secara mendalam untuk aplikasi IoT. Untuk informasi lebih lanjut, lihat Apa itu Tablestore?

Tabel berikut menjelaskan kemampuan yang didukung oleh Konektor Tablestore.

Item

Deskripsi

Mode operasi

Mode streaming

Jenis API

SQL API

Jenis tabel

Tabel sumber, tabel dimensi, dan sink table

Format data

Tidak tersedia

Metrik

  • Data deret waktu untuk tabel sumber: tidak ada

  • Data deret waktu untuk tabel dimensi: tidak ada

  • Data deret waktu untuk sink table:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

Catatan

Untuk informasi lebih lanjut tentang data deret waktu, lihat Data Deret Waktu.

Pembaruan atau penghapusan data dalam sink table

Didukung

Prasyarat

Sebuah instance Tablestore telah dibeli dan sebuah tabel Tablestore telah dibuat. Untuk informasi lebih lanjut, lihat Gunakan Tablestore.

Sintaksis

  • Pernyataan untuk membuat sink table

    CREATE TABLE ots_sink (
      name VARCHAR,
      age BIGINT,
      birthday BIGINT,
      primary key(name,age) not enforced
    ) WITH (
      'connector'='ots',
      'instanceName'='<yourInstanceName>',
      'tableName'='<yourTableName>',
      'accessId'='${ak_id}',
      'accessKey'='${ak_secret}',
      'endPoint'='<yourEndpoint>',
      'valueColumns'='birthday'
    );
    Catatan

    Anda harus menentukan primary key untuk sink table Tablestore. Data keluaran terbaru ditambahkan ke sink table Tablestore untuk memperbarui data tabel.

  • Pernyataan untuk membuat tabel dimensi

    CREATE TABLE ots_dim (
      id int,
      len int,
      content STRING
    ) WITH (
      'connector'='ots',
      'endPoint'='<yourEndpoint>',
      'instanceName'='<yourInstanceName>',
      'tableName'='<yourTableName>',
      'accessId'='${ak_id}',
      'accessKey'='${ak_secret}'
    );
  • Pernyataan untuk membuat tabel sumber

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR
    ) WITH (
      'connector'='ots',
      'endPoint' ='<yourEndpoint>',
      'instanceName' = 'flink-source',
      'tableName' ='flink_source_table',
      'tunnelName' = 'flinksourcestream',
      'accessId' ='${ak_id}',
      'accessKey' ='${ak_secret}',
      'ignoreDelete' = 'false'
    );

    Bidang yang datanya perlu dikonsumsi dan bidang OtsRecordType dan OtsRecordTimestamp dalam data yang dikembalikan oleh Layanan Tunnel dapat dibaca dan ditulis sebagai kolom atribut. Tabel berikut menjelaskan bidang tersebut.

    Bidang

    Kolom pemetaan di Realtime Compute for Apache Flink

    Deskripsi

    OtsRecordType

    type

    Jenis operasi data.

    OtsRecordTimestamp

    timestamp

    Waktu operasi data. Satuan: mikrodetik.

    Catatan

    Jika membaca data penuh, nilai parameter OtsRecordTimestamp diatur ke 0.

    Jika Anda ingin membaca bidang OtsRecordType dan OtsRecordTimestamp, Anda dapat menggunakan kata kunci METADATA yang disediakan oleh Realtime Compute for Apache Flink untuk mendapatkan bidang atribut dari tabel sumber Tablestore. Contoh berikut menunjukkan pernyataan DDL.

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR,
      record_type STRING METADATA FROM 'type',
      record_timestamp BIGINT METADATA FROM 'timestamp'
    ) WITH (
      ...
    );

Opsi konektor dalam klausa WITH

  • Umum

    Opsi

    Deskripsi

    Tipe data

    Diperlukan?

    Nilai default

    Catatan

    connector

    Jenis tabel.

    String

    Ya

    Tidak ada nilai default

    Atur nilainya menjadi ots.

    instanceName

    Nama instance Tablestore.

    String

    Ya

    Tidak ada nilai default

    endPoint

    Titik akhir instance Tablestore.

    String

    Ya

    Tidak ada nilai default

    Untuk informasi lebih lanjut, lihat Titik Akhir.

    tableName

    Nama tabel

    String

    Ya

    Tidak ada nilai default

    accessId

    ID AccessKey akun Alibaba Cloud Anda atau Pengguna Resource Access Management (RAM).

    String

    Ya

    Tidak ada nilai default

    Lihat Bagaimana cara melihat pasangan AccessKey akun?

    Penting

    Untuk melindungi pasangan AccessKey Anda, gunakan variabel daripada mengkodekan langsung pasangan AccessKey Anda.

    accessKey

    Rahasia AccessKey akun Alibaba Cloud Anda atau Pengguna RAM.

    String

    Ya

    Tidak ada nilai default

    connectTimeout

    Periode timeout untuk konektor Tablestore menghubungkan ke Tablestore.

    Integer

    Tidak

    30000

    Satuan: milidetik.

    socketTimeout

    Periode timeout soket untuk konektor Tablestore menghubungkan ke Tablestore.

    Integer

    Tidak

    30000

    Satuan: milidetik.

    ioThreadCount

    Jumlah thread I/O.

    Integer

    Tidak

    4

    callbackThreadPoolSize

    Ukuran pool thread callback.

    Bilangan Bulat

    Tidak

    4

  • Spesifik Sumber

    Opsi

    Deskripsi

    Tipe data

    Diperlukan?

    Nilai default

    Catatan

    tunnelName

    Nama tunnel tabel sumber Tablestore.

    String

    Ya

    Tidak ada nilai default

    Anda harus membuat tunnel di konsol Tablestore terlebih dahulu. Saat membuat tunnel, tentukan nama tunnel dan jenis tunnel. Jenis tunnel bisa Incremental, Full, atau Differential. Untuk informasi lebih lanjut tentang cara membuat tunnel, lihat bagian "Buat tunnel" pada topik Memulai Cepat.

    ignoreDelete

    Menentukan apakah akan mengabaikan operasi penghapusan.

    Boolean

    Tidak

    false

    Nilai valid:

    • true: Operasi penghapusan diabaikan.

    • false (default): Operasi penghapusan tidak diabaikan.

    skipInvalidData

    Menentukan apakah mengabaikan data kotor. Jika data kotor tidak diabaikan, kesalahan akan dilaporkan saat sistem memproses data kotor.

    Boolean

    Tidak

    false

    Nilai valid:

    • true: Data kotor diabaikan.

    • false (default): Data kotor tidak diabaikan.

    Catatan

    Hanya VVR 8.0.4 atau lebih baru yang mendukung opsi ini.

    strategiPengulangan

    Kebijakan pengulangan.

    Enum

    Tidak

    WAKTU

    Nilai yang valid:

    • TIME: Sistem terus mencoba hingga periode timeout yang ditentukan oleh parameter retryTimeoutMs berakhir.

    • COUNT: Sistem terus mencoba hingga jumlah maksimum percobaan ulang yang ditentukan oleh parameter retryCount tercapai.

    jumlahPercobaanUlang

    Jumlah maksimum percobaan ulang.

    Bilangan Bulat

    Tidak

    3

    Jika Anda mengatur parameter retryStrategy ke COUNT, Anda dapat menentukan parameter ini.

    retryTimeoutMs

    Periode batas waktu untuk percobaan ulang.

    Bilangan Bulat

    Tidak

    180000

    Jika Anda menyetel parameter retryStrategy ke TIME, Anda dapat menentukan parameter ini. Satuan: milidetik.

    streamOriginColumnMapping

    Pemetaan antara nama kolom asli dan nama kolom nyata terkait.

    String

    Tidak

    Tidak ada nilai default

    Pisahkan nama kolom asli dan nama kolom nyata terkait dengan titik dua (:). Pisahkan beberapa pemetaan dengan koma (,). Contoh: origin_col1:col1,origin_col2:col2.

    outputSpecificRowType

    Menentukan apakah akan melewati tipe baris tertentu.

    Boolean

    Tidak

    false

    Nilai yang valid:

    • false: tidak melewati tipe baris tertentu. Semua data adalah tipe INSERT.

    • true: melewati tipe baris tertentu. Data dapat berupa tipe INSERT, DELETE, atau UPDATE_AFTER.

    dataFetchTimeoutMs

    Durasi maksimum untuk mengambil data dari partisi.

    Integer

    Tidak

    10000

    Satuan: milidetik.

    Saat menyinkronkan banyak partisi dengan persyaratan latensi rendah, kurangi nilai opsi ini untuk mengurangi latensi sinkronisasi keseluruhan.

    Catatan

    Opsi ini didukung di VVR 8.0.10 atau lebih baru.

    enableRequestCompression

    Menentukan apakah akan mengaktifkan kompresi data.

    Boolean

    Tidak

    false

    Mengaktifkan opsi ini dapat menghemat bandwidth tetapi meningkatkan beban CPU.

    Catatan

    Opsi ini didukung di VVR 8.0.10 atau lebih baru.

  • Spesifik Sink

    Opsi

    Deskripsi

    Tipe data

    Diperlukan?

    Nilai default

    Catatan

    retryIntervalMs

    Interval percobaan ulang.

    Integer

    Tidak

    1000

    Satuan: milidetik.

    maxRetryTimes

    Jumlah maksimum percobaan ulang.

    Integer

    Tidak

    10

    valueColumns

    Nama kolom yang ingin Anda sisipkan.

    String

    Ya

    Tidak ada nilai default

    Pisahkan beberapa bidang, seperti bidang ID atau NAMA, dengan koma (,).

    bufferSize

    Jumlah maksimum catatan data yang dapat disimpan dalam buffer sebelum data ditulis ke sink table.

    Integer

    Tidak

    5000

    batchWriteTimeoutMs

    Periode timeout penulisan.

    Integer

    Tidak

    5000

    Satuan: milidetik. Jika jumlah catatan data yang di-cache tidak mencapai batas atas dalam periode waktu yang ditentukan oleh parameter batchWriteTimeoutMs, semua data yang di-cache ditulis ke sink table.

    batchSize

    Jumlah catatan data yang dapat ditulis sekaligus.

    Integer

    Tidak

    100

    Nilai maksimum: 200.

    ignoreDelete

    Menentukan apakah mengabaikan operasi penghapusan.

    Boolean

    Tidak

    False

    Tidak tersedia.

    autoIncrementKey

    Nama kolom primary key auto-increment. Jika sink table berisi kolom primary key auto-increment, Anda dapat mengonfigurasi parameter ini untuk menentukan nama kolom primary key auto-increment.

    String

    Tidak

    Tidak ada nilai default

    Jika sink table tidak memiliki kolom primary key auto-increment, Anda tidak perlu mengonfigurasi parameter ini.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.4 atau versi lebih baru yang mendukung parameter ini.

    modeTimpa

    Mode penimpaan data.

    Enum

    Tidak

    PUT

    Nilai yang valid:

    • PUT: Data ditulis ke tabel Tablestore dalam mode PUT.

    • UPDATE: Data ditulis ke tabel Tablestore dalam mode UPDATE.

    Catatan

    Hanya mode UPDATE yang didukung dalam mode kolom dinamis.

    defaultTimestampInMillisecond

    Timestamp default yang digunakan untuk menulis data ke tabel Tablestore.

    Panjang

    Tidak

    -1

    Jika Anda tidak menentukan parameter ini, cap waktu dari waktu sistem saat ini akan digunakan.

    dynamicColumnSink

    Menentukan apakah akan mengaktifkan mode kolom dinamis.

    Boolean

    Tidak

    false

    Mode kolom dinamis cocok untuk skenario di mana tidak ada kolom yang ditentukan dalam tabel dan kolom dimasukkan ke dalam tabel berdasarkan status penyebaran. Beberapa kolom pertama didefinisikan sebagai primary key dalam pernyataan pembuatan tabel. Nilai dari kolom pertama di dua kolom terakhir digunakan sebagai nama kolom, nilai dari kolom terakhir digunakan sebagai nilai dari kolom sebelumnya, dan tipe data dari dua kolom terakhir harus berupa STRING.

    Catatan

    Jika Anda mengaktifkan mode kolom dinamis, kolom primary key auto-increment tidak didukung dan Anda harus menetapkan parameter overwriteMode ke UPDATE.

    checkSinkTableMeta

    Menentukan apakah akan memeriksa metadata tabel sink.

    Boolean

    Tidak

    benar

    Jika Anda menetapkan parameter ini ke true, sistem akan memeriksa apakah kolom kunci utama tabel Tablestore sama dengan kunci utama yang ditentukan dalam pernyataan pembuatan tabel.

    enableRequestCompression

    Menentukan apakah akan mengaktifkan kompresi data selama penulisan data.

    Boolean

    Tidak

    false

    maxColumnsCount

    Jumlah maksimum kolom yang ditulis ke tabel hilir.

    Integer

    Tidak

    128

    Jika opsi ini diatur ke nilai lebih dari 128, kesalahan Jumlah kolom atribut melebihi maksimum akan terjadi. Untuk menyelesaikan ini, sesuaikan nilai opsi tersebut.

    Catatan

    Opsi ini didukung oleh 8.0.10 atau lebih baru.

    storageType

    Tipe tabel sink.

    String

    Tidak

    WIDE_COLUMN

    Nilai valid:

    • WIDE_COLUMN: Tabel sink adalah tabel lebar.

    • TIMESERIES: Tabel sink adalah tabel deret waktu.

  • Spesifik Tabel Dimensi

    Opsi

    Deskripsi

    Tipe data

    Diperlukan?

    Nilai default

    Catatan

    retryIntervalMs

    Interval percobaan ulang.

    Integer

    Tidak

    1000

    Satuan: milidetik.

    maxRetryTimes

    Jumlah maksimum percobaan ulang.

    Integer

    Tidak

    10

    Tidak tersedia.

    cache

    Kebijakan cache.

    String

    Tidak

    ALL

    Nilai valid:

    • None: Tidak ada data yang di-cache.

    • LRU: Hanya data tertentu dalam tabel dimensi yang di-cache. Setiap kali sistem menerima catatan data, sistem mencari cache. Jika sistem tidak menemukan catatan dalam cache, sistem mencari catatan data dalam tabel dimensi fisik.

      Jika kebijakan cache ini digunakan, Anda harus mengonfigurasi parameter cacheSize dan cacheTTLMs.

    • ALL (default): Semua data dalam tabel dimensi di-cache. Sebelum pekerjaan berjalan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua kueri berikutnya dalam tabel dimensi. Jika tidak ada kunci yang ada, sistem tidak dapat menemukan catatan data dalam cache. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.

      Jika jumlah data dalam tabel jarak jauh kecil dan sejumlah besar kunci yang hilang ada, kami sarankan Anda mengatur parameter ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat dikaitkan berdasarkan klausa ON. Jika Anda menggunakan kebijakan cache ini, Anda harus mengonfigurasi parameter cacheTTLMs dan cacheReloadTimeBlackList.

      Catatan

      Jika Anda mengatur parameter cache ke ALL, Anda harus meningkatkan memori node untuk bergabung dengan tabel karena sistem secara asinkron memuat data dari tabel dimensi. Ukuran memori yang ditingkatkan adalah dua kali ukuran tabel jarak jauh.

    cacheSize

    Jumlah maksimum catatan data yang dapat di-cache.

    Integer

    Tidak

    Tidak ada nilai default

    Jika Anda mengatur parameter cache ke LRU, Anda dapat menentukan parameter ini.

    Catatan

    Nilai parameter ini adalah jumlah maksimum catatan data yang dapat di-cache.

    cacheTTLMs

    Periode timeout cache.

    Integer

    Tidak

    Tidak ada nilai default

    Satuan: milidetik. Konfigurasi parameter cacheTTLMs bervariasi berdasarkan nilai parameter cache.

    • Jika Anda mengatur parameter cache ke None, parameter cacheTTLMs dapat dibiarkan kosong. Ini menunjukkan bahwa entri cache tidak kedaluwarsa.

    • Jika Anda mengatur parameter cache ke LRU, parameter cacheTTLMs menentukan periode timeout cache. Secara default, entri cache tidak kedaluwarsa.

    • Jika Anda mengatur parameter cache ke ALL, parameter cacheTTLMs menentukan interval di mana sistem menyegarkan cache. Secara default, cache tidak dimuat ulang.

    cacheEmpty

    Menentukan apakah mencache hasil kosong.

    Boolean

    Tidak

    Tidak ada nilai default

    • true: Hasil kosong di-cache.

    • false: Hasil kosong tidak di-cache.

    cacheReloadTimeBlackList

    Periode waktu selama cache tidak disegarkan. Parameter ini berlaku ketika parameter cache diatur ke ALL. Cache tidak disegarkan selama periode waktu yang Anda tentukan untuk parameter ini. Parameter ini cocok untuk acara promosi online berskala besar seperti Double 11.

    String

    Tidak

    Tidak ada nilai default

    Contoh berikut menunjukkan format nilai: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Gunakan pemisah berdasarkan aturan berikut:

    • Pisahkan beberapa periode waktu dengan koma (,).

    • Pisahkan waktu mulai dan waktu akhir setiap periode waktu dengan panah (->) yang merupakan kombinasi tanda hubung (-) dan tanda kurung penutup (>).

    async

    Menentukan apakah mengaktifkan sinkronisasi data dalam mode asinkron.

    Boolean

    Tidak

    false

    • true: Sinkronisasi data dalam mode asinkron diaktifkan. Secara default, data tidak diurutkan saat data disinkronkan dalam mode asinkron.

    • false (default): Sinkronisasi data dalam mode asinkron dinonaktifkan.

Pemetaan tipe data

  • Tabel Sumber

    Tipe data kolom di Tablestore

    Tipe data kolom di Realtime Compute for Apache Flink

    INTEGER

    BIGINT

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    DOUBLE

    DOUBLE

    BINARY

    BINARY

  • Tabel Sink

    Tipe data kolom di Realtime Compute for Apache Flink

    Tipe data kolom di Tablestore

    BINARY

    BINARY

    VARBINARY

    CHAR

    STRING

    VARCHAR

    TINYINT

    INTEGER

    SMALLINT

    INTEGER

    BIGINT

    FLOAT

    DOUBLE

    DOUBLE

    BOOLEAN

    BOOLEAN

Contoh

Contoh 1

Membaca data dari Tablestore dan menulis ke Tablestore:

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH 
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false',
  'skipInvalidData' ='false' 
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='customerid,customername',
  'autoIncrementKey'='${auto_increment_primary_key_name}' 
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Contoh 2

Sinkronisasi data dari tabel lebar ke tabel deret waktu.

CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_ots_timeseries_2',
    'tunnelName' = 'timeseries_source_tunnel_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'ignoreDelete' = 'true', -- Abaikan penghapusan
);
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_timeseries_sink_table_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'storageType' = 'TIMESERIES',
);

-- Masukkan data dari tabel sumber ke tabel sink
INSERT INTO timeseries_sink
    select 
        m_name,
        data_source,
        MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
        `time`,
        cpu_sys,
        cpu_user,
        disk_0,
        disk_1,
        disk_2,
        memory_used,
        net_in,
        net_out 
    from
        timeseries_source;