全部产品
Search
文档中心

Tablestore:Gunakan Realtime Compute for Apache Flink untuk memproses data Tablestore

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menghitung data Tablestore menggunakan Realtime Compute for Apache Flink. Tabel data atau tabel seri waktu di Tablestore dapat digunakan sebagai tabel sumber atau tabel hasil pemrosesan data dengan Realtime Compute for Apache Flink.

Prasyarat

Kembangkan pekerjaan komputasi real-time

Langkah 1: Buat draf SQL

  1. Pergi ke halaman untuk membuat draf.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan workspace yang ingin Anda kelola dan klik Console di kolom Actions.

    3. Di panel navigasi kiri, pilih Development > ETL.

  2. Klik New. Dalam kotak dialog New Draft, pilih Blank Stream Draft, lalu klik Next.

    Catatan

    Realtime Compute for Apache Flink menyediakan berbagai template kode dan mendukung sinkronisasi data. Setiap template cocok untuk skenario tertentu dan menyediakan contoh kode serta instruksi untuk Anda. Anda bisa mengklik template untuk mempelajari fitur dan sintaks terkait Realtime Compute for Apache Flink dan menerapkan logika bisnis Anda. Untuk informasi lebih lanjut, lihat Template Kode dan Template Sinkronisasi Data.

  3. Konfigurasikan parameter untuk draf. Tabel berikut menjelaskan parameter tersebut.

    Parameter

    Deskripsi

    Contoh

    Name

    Nama draf yang ingin Anda buat.

    Catatan

    Nama draf harus unik dalam proyek saat ini.

    flink-test

    Location

    Folder tempat file kode draf disimpan.

    Anda juga bisa mengklik ikon 新建文件夹 di sebelah kanan folder yang ada untuk membuat subfolder.

    Draft

    Engine Version

    Versi mesin Flink yang ingin Anda gunakan untuk draf saat ini. Untuk informasi lebih lanjut tentang versi mesin, lihat Catatan rilis dan Versi mesin.

    vvr-8.0.10-flink-1.17

  4. Klik Create.

Langkah 2: Tulis kode untuk draf

Catatan

Dalam contoh pada langkah ini, kode ditulis untuk menyinkronkan data dari tabel data ke tabel data lain. Untuk informasi tentang pernyataan SQL sampel lainnya, lihat Pernyataan SQL Sampel.

  1. Buat tabel sementara untuk tabel sumber dan tabel hasil.

    Untuk informasi lebih lanjut, lihat Lampiran 1: Konektor Tablestore.

    -- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector' = 'ots', -- Tentukan tipe konektor tabel sumber. Nilainya adalah ots dan tidak dapat diubah. 
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tentukan titik akhir virtual private cloud (VPC) dari instance Tablestore. 
        'instanceName' = 'xxx', -- Tentukan nama instance Tablestore. 
        'tableName' = 'flink_source_table', -- Tentukan nama tabel sumber. 
        'tunnelName' = 'flink_source_tunnel', -- Tentukan nama terowongan yang dibuat untuk tabel sumber. 
        'accessId' = 'xxxxxxxxxxx', -- Tentukan ID AccessKey dari akun Alibaba Cloud atau pengguna RAM. 
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Tentukan rahasia AccessKey dari akun Alibaba Cloud atau pengguna RAM. 
        'ignoreDelete' = 'false' -- Tentukan apakah akan mengabaikan data real-time yang dihasilkan oleh operasi hapus. Dalam contoh ini, parameter ini diatur ke false. 
    );
    
    -- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
    CREATE TEMPORARY TABLE tablestore_sink(
       `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED -- Tentukan primary key. 
    ) WITH (
        'connector' = 'ots', -- Tentukan tipe konektor tabel hasil. Nilainya adalah ots dan tidak dapat diubah. 
        'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tentukan titik akhir VPC dari instance Tablestore. 
        'instanceName' = 'xxx', -- Tentukan nama instance Tablestore. 
        'tableName' = 'flink_sink_table', -- Tentukan nama tabel hasil. 
        'accessId' = 'xxxxxxxxxxx',  -- Tentukan ID AccessKey dari akun Alibaba Cloud atau pengguna RAM. 
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Tentukan rahasia AccessKey dari akun Alibaba Cloud atau pengguna RAM. 
        'valueColumns'='customerid,customername' --Tentukan nama kolom yang ingin Anda sisipkan ke tabel hasil. 
    );
  2. Tulis logika draf.

    Pernyataan SQL sampel berikut memberikan contoh tentang cara menyisipkan data dari tabel sumber ke tabel hasil:

    -- Sisipkan data dari tabel sumber ke tabel hasil.
    INSERT INTO tablestore_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Langkah 3: (Opsional) Lihat informasi konfigurasi

Di tab kanan editor SQL, Anda dapat melihat konfigurasi atau mengonfigurasi parameter. Tabel berikut menjelaskan parameter tersebut.

Nama Tab

Deskripsi

Configurations

  • Engine Version: Versi mesin Flink yang digunakan oleh pekerjaan saat ini.

  • Additional Dependencies: dependensi tambahan yang digunakan dalam draf, seperti fungsi sementara.

    Anda dapat mengunduh dependensi Ververica Runtime (VVR), mengunggah dependensi VVR di halaman Unggah Artefak, lalu memilih dependensi VVR yang diunggah untuk Additional Dependencies. Untuk informasi lebih lanjut, lihat Lampiran 2: Konfigurasi dependensi VVR.

Structure

  • Flow Diagram: diagram alur yang memungkinkan Anda melihat arah aliran data.

  • Tree Diagram: diagram pohon yang memungkinkan Anda melihat sumber dari mana data diproses.

Versions

Anda dapat melihat versi mesin draf. Untuk informasi lebih lanjut tentang operasi yang dapat Anda lakukan di kolom Actions di panel Versi Draf, lihat Kelola versi penyebaran.

Langkah 4: (Opsional) Lakukan pemeriksaan sintaksis

Periksa semantik SQL draf, konektivitas jaringan, dan informasi metadata tabel yang digunakan oleh draf. Anda juga bisa mengklik SQL Advice di hasil perhitungan untuk melihat informasi tentang risiko SQL dan saran optimasi terkait.

  1. Di sudut kanan atas editor SQL, klik Validate.

  2. Di kotak dialog Validate, klik Confirm.

Langkah 5: (Opsional) Debug draf

Anda dapat menggunakan fitur debugging untuk mensimulasikan penyebaran berjalan, memeriksa output, dan memverifikasi logika bisnis pernyataan SELECT dan INSERT. Fitur ini meningkatkan efisiensi pengembangan dan mengurangi risiko kualitas data yang buruk.

  1. Di sudut kanan atas editor SQL, klik Debug.

  2. Di kotak dialog Debug, pilih kluster yang ingin Anda debug dan klik Next.

    Jika tidak ada kluster yang tersedia, buat kluster sesi. Pastikan bahwa kluster sesi menggunakan versi mesin yang sama dengan draf SQL dan bahwa kluster sesi sedang berjalan. Untuk informasi lebih lanjut, lihat Buat Kluster Sesi.

  3. Konfigurasikan data debugging.

    • Jika Anda menggunakan data online, lewati operasi ini.

    • Jika Anda menggunakan data debugging, klik Download mock data template, masukkan data debugging di template, lalu klik Unggah data mock untuk mengunggah data debugging. Untuk informasi lebih lanjut, lihat Debug Draf.

  4. Klik Confirm.

Langkah 6: Sebarkan draf

Di sudut kanan atas editor SQL, klik Deploy. Di kotak dialog Deploy draft, konfigurasikan parameter terkait dan klik Confirm.

Catatan

Kluster sesi cocok untuk lingkungan non-produksi, seperti lingkungan pengembangan dan pengujian. Anda dapat menyebarkan atau men-debug draf di kluster sesi untuk meningkatkan pemanfaatan sumber daya JobManager dan mempercepat startup penyebaran. Namun, kami sarankan agar Anda tidak menyebarkan draf untuk lingkungan produksi di kluster sesi. Jika tidak, masalah stabilitas mungkin terjadi.

Langkah 7: Mulai penyebaran untuk draf dan lihat hasil komputasi

  1. Di panel navigasi kiri, pilih O&M > Deployments.

  2. Temukan pekerjaan yang ingin Anda mulai, lalu klik Start di kolom Actions.

    Di panel Mulai Pekerjaan, pilih Initial Mode dan klik Start. Jika status penyebaran berubah menjadi RUNNING, penyebaran berjalan sesuai harapan. Untuk informasi lebih lanjut tentang parameter yang harus Anda konfigurasikan saat memulai penyebaran, lihat Mulai Penyebaran.

    Catatan
    • Kami sarankan Anda mengonfigurasi dua inti CPU dan 4 GB memori untuk setiap Pengelola Tugas di Realtime Compute for Apache Flink untuk memaksimalkan kemampuan komputasi setiap Pengelola Tugas. Seorang Pengelola Tugas dapat menulis 10.000 baris per detik.

    • Jika jumlah partisi di tabel sumber besar, kami sarankan Anda mengatur konkurensi kurang dari 16 di Realtime Compute for Apache Flink. Laju tulis meningkat secara linear dengan konkurensi.

  3. Di halaman Penyebaran, lihat hasil komputasi.

    1. Di panel navigasi kiri, pilih O&M > Deployments. Di halaman Penyebaran, klik nama penyebaran yang ingin Anda kelola.

    2. Di tab Logs, klik tab Running Task Managers dan klik nilai di kolom Path, ID.

    3. Klik Logs. Di tab Log, lihat informasi log terkait.

  4. (Opsional) Batalkan penyebaran.

    Jika Anda memodifikasi kode SQL untuk penyebaran, menambahkan atau menghapus parameter ke atau dari klausa WITH, atau mengubah versi penyebaran, Anda harus menyebarkan draf penyebaran, membatalkan penyebaran, lalu memulai penyebaran agar perubahan berlaku. Jika penyebaran gagal dan tidak dapat menggunakan data status untuk pulih, atau jika Anda ingin memperbarui pengaturan parameter yang tidak berlaku secara dinamis, Anda harus membatalkan dan memulai ulang penyebaran. Untuk informasi lebih lanjut tentang cara membatalkan penyebaran, lihat Batalkan Penyebaran.

Lampiran

Lampiran 1: Tablestore

Realtime Compute for Apache Flink menyediakan Konektor Tablestore bawaan untuk membaca, menulis, dan menyinkronkan data Tablestore.

Tabel Sumber

Sintaks DDL
Tabel Data

Kode sampel berikut memberikan contoh pernyataan DDL untuk membuat tabel sementara untuk tabel sumber:

-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false'
);
Tabel Seri Waktu

Kode sampel berikut memberikan contoh pernyataan DDL untuk membuat tabel sementara untuk tabel sumber:

-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

Bidang yang datanya perlu dikonsumsi dan bidang OtsRecordType serta OtsRecordTimestamp dalam data yang dikembalikan oleh Layanan Tunnel dapat dibaca dan ditulis sebagai kolom atribut. Tabel berikut menjelaskan bidang tersebut.

Bidang

Bidang yang dipetakan di Realtime Compute for Apache Flink

Deskripsi

OtsRecordType

type

Jenis operasi.

OtsRecordTimestamp

timestamp

Waktu operasi data. Satuan: mikrodetik.

Catatan

Jika Anda ingin Realtime Compute for Apache Flink membaca semua data, atur bidang ini ke 0.

Parameter dalam klausa WITH

Parameter

Tabel yang berlaku

Diperlukan

Deskripsi

connector

Umum

Ya

Tipe konektor tabel sumber. Nilainya adalah ots dan tidak dapat diubah.

endPoint

Umum

Ya

Titik akhir dari instance Tablestore. Anda harus menggunakan titik akhir VPC. Untuk informasi lebih lanjut, lihat Titik Akhir.

instanceName

Umum

Ya

Nama instance Tablestore.

tableName

Umum

Ya

Nama tabel sumber di Tablestore.

tunnelName

Umum

Ya

Nama terowongan untuk tabel sumber di Tablestore. Untuk informasi tentang cara membuat terowongan, lihat Buat Terowongan.

accessId

Umum

Ya

Pasangan AccessKey (ID AccessKey dan Rahasia AccessKey) dari akun Alibaba Cloud atau pengguna RAM.

Penting

Untuk melindungi pasangan AccessKey Anda, kami sarankan Anda menggunakan variabel untuk menentukan pasangan AccessKey. Untuk informasi lebih lanjut, lihat Kelola Variabel.

accessKey

Umum

Ya

connectTimeout

Umum

Tidak

Periode timeout untuk konektor Tablestore menghubungkan ke Tablestore. Satuan: milidetik. Nilai default: 30000.

socketTimeout

Umum

Tidak

Periode timeout soket untuk konektor Tablestore menghubungkan ke Tablestore. Satuan: milidetik. Nilai default: 30000.

ioThreadCount

Umum

Tidak

Jumlah utas I/O. Nilai default: 4.

callbackThreadPoolSize

Umum

Tidak

Ukuran kolam utas callback. Nilai default: 4.

ignoreDelete

Tabel Data

Tidak

Menentukan apakah akan mengabaikan data real-time yang dihasilkan oleh operasi hapus. Nilai default: false, yang menentukan bahwa data real-time yang dihasilkan oleh operasi hapus tidak diabaikan.

skipInvalidData

Umum

Tidak

Menentukan apakah akan mengabaikan data kotor. Nilai default: false, yang menentukan bahwa data kotor tidak diabaikan. Jika data kotor tidak diabaikan, kesalahan akan dilaporkan saat sistem memproses data kotor.

Penting

Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.4 atau lebih baru yang mendukung parameter ini.

retryStrategy

Umum

Tidak

Kebijakan ulang. Nilai valid:

  • TIME: Sistem terus mencoba hingga periode timeout yang ditentukan oleh parameter retryTimeoutMs berakhir. Ini adalah nilai default.

  • COUNT: Sistem terus mencoba hingga jumlah maksimum percobaan ulang yang ditentukan oleh parameter retryCount tercapai.

retryCount

Umum

Tidak

Jumlah maksimum percobaan ulang. Jika Anda mengatur parameter retryStrategy ke COUNT, Anda dapat mengonfigurasi parameter ini. Nilai default: 3.

retryTimeoutMs

Umum

Tidak

Periode timeout untuk percobaan ulang. Satuan: milidetik. Jika Anda mengatur parameter retryStrategy ke TIME, Anda dapat mengonfigurasi parameter ini. Nilai default: 180000.

streamOriginColumnMapping

Umum

Tidak

Pemetaan antara nama kolom di tabel sumber dan nama kolom di tabel sementara.

Catatan

Pisahkan nama kolom di tabel sumber dan nama kolom di tabel sementara dengan titik dua (:). Pisahkan beberapa pemetaan dengan koma (,). Contoh: origin_col1:col1,origin_col2:col2.

outputSpecificRowType

Umum

Tidak

Menentukan apakah akan meneruskan tipe baris tertentu. Nilai valid:

  • false: tidak meneruskan tipe baris tertentu. Tipe baris semua data adalah INSERT. Ini adalah nilai default.

  • true: meneruskan tipe baris tertentu. Tipe baris data bisa berupa INSERT, DELETE, atau UPDATE_AFTER.

Pemetaan Tipe Data

Tipe data bidang di Tablestore

Tipe data bidang di Realtime Compute for Apache Flink

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

Tabel Hasil

Sintaks DDL
Tabel Data

Kode sampel berikut memberikan contoh pernyataan DDL untuk membuat tabel sementara untuk tabel hasil:

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
CREATE TEMPORARY TABLE tablestore_sink(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
Catatan

Anda harus menentukan skema primary key dan setidaknya satu kolom atribut untuk tabel hasil Tablestore. Data keluaran ditambahkan ke tabel hasil Tablestore untuk memperbarui data tabel.

Tabel Seri Waktu

Saat Anda menggunakan tabel seri waktu sebagai tabel hasil, Anda harus menentukan kolom primary key berikut untuk tabel hasil: _m_name, _data_source, _tags, dan _time. Konfigurasi lainnya sama dengan saat Anda menggunakan tabel data sebagai tabel hasil. Anda dapat menentukan kolom primary key tabel seri waktu menggunakan parameter dalam klausa WITH, primary key tabel SINK, dan primary key dalam format Map. Jika Anda menggunakan ketiga metode tersebut secara bersamaan untuk menentukan kolom primary key tabel seri waktu, kolom primary key yang ditentukan menggunakan parameter dalam klausa WITH memiliki prioritas tertinggi.

Gunakan parameter dalam klausa WITH

Kode sampel berikut memberikan contoh tentang cara menggunakan parameter dalam klausa WITH untuk mendefinisikan sintaks DDL:

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);

-- Sisipkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
Gunakan Primary Key dalam Format Map

Kode sampel berikut memberikan contoh tentang cara menggunakan primary key dalam format Map untuk mendefinisikan sintaks DDL:

Catatan

Tablestore menyediakan tipe data Map Flink untuk memudahkan pembuatan kolom _tags tabel seri waktu dalam model TimeSeries. Tipe data Map mendukung operasi pemetaan, seperti penggantian nama kolom dan fungsi sederhana. Saat menggunakan Map, pastikan bahwa kolom primary key _tags berada di posisi ketiga.

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- Sisipkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
Gunakan Primary Key Tabel SINK

Kode sampel berikut memberikan contoh tentang cara menggunakan primary key tabel SINK untuk mendefinisikan sintaks DDL. Kolom primary key pertama adalah kolom _m_name, yang menentukan nama pengukuran. Kolom primary key kedua adalah kolom _data_source, yang menentukan sumber data. Kolom primary key terakhir adalah kolom _time, yang menentukan timestamp. Kolom primary key di tengah adalah kolom _tags, yang menentukan tag dari seri waktu.

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- Sisipkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
Parameter dalam klausa WITH

Parameter

Tabel yang berlaku

Diperlukan

Deskripsi

connector

Umum

Ya

Tipe konektor tabel hasil. Nilainya adalah ots dan tidak dapat diubah.

endPoint

Umum

Ya

Titik akhir instance Tablestore. Anda harus menggunakan titik akhir VPC. Untuk informasi lebih lanjut, lihat Titik Akhir.

instanceName

Umum

Ya

Nama instance Tablestore.

tableName

Umum

Ya

Nama tabel seri waktu di Tablestore.

accessId

Umum

Ya

Pasangan AccessKey (ID AccessKey dan Rahasia AccessKey) dari akun Alibaba Cloud atau pengguna RAM.

Penting

Untuk melindungi pasangan AccessKey Anda, kami sarankan Anda menggunakan variabel untuk menentukan pasangan AccessKey. Untuk informasi lebih lanjut, lihat Kelola Variabel.

accessKey

Umum

Ya

valueColumns

Tabel Data

Ya

Nama kolom yang ingin Anda sisipkan. Pisahkan beberapa bidang dengan koma (,). Contoh: ID,NAMA.

storageType

Umum

Tidak

Penting

Jika Anda menggunakan tabel seri waktu sebagai tabel hasil, atur parameter ini ke TIMESERIES.

Tipe tabel. Nilai valid:

  • WIDE_COLUMN: tabel data. Ini adalah nilai default.

  • TIMESERIES: tabel seri waktu.

timeseriesSchema

Tabel Seri Waktu

Tidak

Penting

Saat Anda menggunakan tabel seri waktu sebagai tabel hasil, jika Anda menggunakan parameter dalam klausa WITH untuk menentukan primary key tabel seri waktu, Anda harus mengonfigurasi parameter ini.

Kolom yang ingin Anda tentukan sebagai kolom primary key tabel seri waktu.

  • Tentukan nilai parameter ini menggunakan pasangan kunci-nilai dalam format JSON. Contoh: {"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}.

  • Tipe kolom primary key yang Anda tentukan harus sama dengan tipe kolom primary key di tabel seri waktu. Kolom primary key tags dapat terdiri dari beberapa kolom.

connectTimeout

Umum

Tidak

Periode timeout untuk konektor Tablestore menghubungkan ke Tablestore. Satuan: milidetik. Nilai default: 30000.

socketTimeout

Umum

Tidak

Periode timeout soket untuk konektor Tablestore menghubungkan ke Tablestore. Satuan: milidetik. Nilai default: 30000.

ioThreadCount

Umum

Tidak

Jumlah utas I/O. Nilai default: 4.

callbackThreadPoolSize

Umum

Tidak

Ukuran kolam utas callback. Nilai default: 4.

retryIntervalMs

Umum

Tidak

Interval antar percobaan ulang. Satuan: milidetik. Nilai default: 1000.

maxRetryTimes

Umum

Tidak

Jumlah maksimum percobaan ulang. Nilai default: 10.

bufferSize

Umum

Tidak

Jumlah maksimum catatan data yang dapat disimpan di buffer sebelum data ditulis ke tabel hasil. Nilai default: 5000, yang menentukan bahwa data ditulis ke tabel hasil ketika jumlah catatan data di buffer mencapai 5.000.

batchWriteTimeoutMs

Umum

Tidak

Periode timeout tulis. Satuan: milidetik. Nilai default: 5000, yang menentukan bahwa semua data di buffer ditulis ke tabel hasil ketika jumlah catatan data di buffer tidak mencapai nilai yang ditentukan oleh parameter bufferSize dalam 5.000 milidetik.

batchSize

Umum

Tidak

Jumlah catatan data yang dapat ditulis ke tabel hasil secara bersamaan. Nilai default: 100. Nilai maksimum: 200.

ignoreDelete

Umum

Tidak

Menentukan apakah akan mengabaikan data real-time yang dihasilkan oleh operasi hapus. Nilai default: false, yang menentukan bahwa data real-time yang dihasilkan oleh operasi hapus tidak diabaikan.

Penting

Jika Anda menggunakan tabel data sebagai tabel sumber, Anda dapat mengonfigurasi parameter ini berdasarkan kebutuhan bisnis Anda.

autoIncrementKey

Tabel Data

Tidak

Nama kolom primary key auto-increment tabel hasil ketika tabel hasil berisi kolom primary key auto-increment. Jika tabel hasil tidak memiliki kolom primary key auto-increment, Anda tidak perlu mengonfigurasi parameter ini.

Penting

Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.4 atau lebih baru yang mendukung parameter ini.

overwriteMode

Umum

Tidak

Mode penimpaan data. Nilai valid:

  • PUT: Data ditulis ke tabel Tablestore dalam mode PUT. Ini adalah nilai default.

  • UPDATE: Data ditulis ke tabel Tablestore dalam mode UPDATE.

Catatan

Hanya mode UPDATE yang didukung dalam mode kolom dinamis.

defaultTimestampInMillisecond

Umum

Tidak

Timestamp default yang digunakan untuk menulis data ke tabel Tablestore. Jika Anda membiarkan parameter ini kosong, timestamp waktu sistem saat ini digunakan.

dynamicColumnSink

Umum

Tidak

Menentukan apakah akan mengaktifkan mode kolom dinamis. Nilai default: false, yang menentukan bahwa mode kolom dinamis dinonaktifkan.

Catatan
  • Mode kolom dinamis cocok untuk skenario di mana tidak ada kolom yang ditentukan untuk tabel dan kolom data dimasukkan ke tabel berdasarkan status penyebaran. Anda harus menentukan beberapa kolom pertama sebagai kolom primary key dalam pernyataan pembuatan tabel. Nilai kolom kedua hingga terakhir digunakan sebagai variabel nama kolom, nilai kolom terakhir digunakan sebagai nilai variabel nama kolom, dan tipe data kolom kedua hingga terakhir harus berupa String.

  • Jika Anda mengaktifkan mode kolom dinamis, fitur kolom primary key auto-increment tidak didukung dan Anda harus mengatur parameter overwriteMode ke UPDATE.

checkSinkTableMeta

Umum

Tidak

Menentukan apakah akan memeriksa metadata tabel hasil. Nilai default: true, yang menentukan bahwa sistem memeriksa apakah kolom primary key tabel Tablestore sama dengan kolom primary key yang ditentukan dalam pernyataan pembuatan tabel.

enableRequestCompression

Umum

Tidak

Menentukan apakah akan mengaktifkan kompresi data selama penulisan data. Nilai default: false, yang menentukan bahwa kompresi data dinonaktifkan selama penulisan data.

Pemetaan Tipe Data

Tipe data bidang di Realtime Compute for Apache Flink

Tipe data bidang di Tablestore

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

Pernyataan SQL Sampel

Sinkronkan data dari tabel sumber ke tabel hasil
Sinkronkan data dari tabel data ke tabel seri waktu

Baca data dari tabel data bernama flink_source_table dan tulis data tersebut ke tabel seri waktu bernama flink_sink_table.

Pernyataan SQL sampel:

-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil menggunakan parameter dalam klausa WITH.
CREATE TEMPORARY TABLE tablestore_sink(
     measurement STRING,
     datasource STRING,
     tag_a STRING,
     `time` BIGINT,
     binary_value BINARY,
     bool_value BOOLEAN,
     double_value DOUBLE,
     long_value BIGINT,
     string_value STRING,
     tag_b STRING,
     tag_c STRING,
     tag_d STRING,
     tag_e STRING,
     tag_f STRING,
     PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
 ) WITH (
     'connector' = 'ots',
     'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
     'instanceName' = 'xxx',
     'tableName' = 'flink_sink_table',
     'accessId' = 'xxxxxxxxxxx',
     'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
     'storageType' = 'TIMESERIES',
     'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
 );
 
-- Masukkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
Sinkronkan data dari tabel deret waktu ke tabel data

Baca data dari tabel deret waktu bernama flink_source_table dan tulis data tersebut ke tabel data bernama flink_sink_table.

Contoh Pernyataan SQL:

-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

-- Buat tabel sementara bernama print_table untuk tabel hasil. 
CREATE TEMPORARY TABLE tablestore_target(
    measurement STRING,
    datasource STRING,
    tags STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);

-- Masukkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_target
SELECT
    _m_name,
    _data_source,
    _tags,
    _time,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from tablestore_stream;
Baca data dari tabel sumber dan cetak data di konsol Tablestore

Baca data dari tabel sumber bernama flink_source_table secara batch. Anda dapat menggunakan fitur debugging penyebaran untuk mensimulasikan jalannya penyebaran. Hasil debugging ditampilkan di bagian bawah editor SQL.

Pernyataan SQL sampel:

-- Buat tabel sementara bernama tablestore_stream untuk tabel data sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Baca data dari tabel sumber.
SELECT * FROM tablestore_stream LIMIT 100;
Baca data dari tabel sumber dan cetak data ke log TaskManager

Baca data dari tabel sumber bernama flink_source_table dan cetak hasilnya ke log TaskManager menggunakan konektor Print.

Pernyataan SQL sampel:

-- Buat tabel sementara bernama tablestore_stream untuk tabel data sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Buat tabel sementara bernama print_table untuk tabel hasil. 
CREATE TEMPORARY TABLE print_table(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
  'connector' = 'print',   -- Gunakan konektor Print.
  'logger' = 'true'        -- Tampilkan hasil komputasi di konsol pengembangan Realtime Compute for Apache Flink.
);

-- Cetak bidang tabel sumber.
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;

Lampiran 2: Konfigurasikan dependensi VVR

  1. Unduh dependensi VVR.

  2. Unggah dependensi VVR.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan workspace yang ingin Anda kelola dan klik Console di kolom Actions.

    3. Di panel navigasi kiri, klik Artifacts.

    4. Di halaman Artifacts, klik Upload Artifact dan pilih paket JAR tempat dependensi VVR disimpan.

  3. Di sebelah kanan editor SQL pekerjaan yang ingin Anda kelola, klik Configurations. Di bidang Additional Dependencies di panel Konfigurasi, pilih paket JAR tempat dependensi VVR disimpan.