All Products
Search
Document Center

Tablestore:Gunakan Realtime Compute for Apache Flink untuk memproses data Tablestore

Last Updated:May 09, 2026

Topik ini menjelaskan cara memproses data Tablestore menggunakan Realtime Compute for Apache Flink. Tabel data atau tabel time series di Tablestore dapat digunakan sebagai tabel sumber atau tabel hasil dalam pemrosesan data menggunakan Realtime Compute for Apache Flink.

Prasyarat

Kembangkan pekerjaan komputasi real-time

Langkah 1: Buat draf SQL

  1. Buka halaman pembuatan draf.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Pada kolom Actions ruang kerja target, klik Console.

    3. Di panel navigasi kiri, klik Development > ETL.

  2. Klik New. Pada kotak dialog New Draft, pilih Blank Stream Draft, lalu klik Next.

    Catatan

    Realtime Compute for Apache Flink menyediakan berbagai templat kode dan mendukung sinkronisasi data. Setiap templat dirancang untuk skenario tertentu serta mencakup contoh kode dan instruksi penggunaan. Anda dapat mengklik templat tersebut untuk mempelajari fitur dan sintaks Realtime Compute for Apache Flink serta menerapkan logika bisnis Anda. Untuk informasi selengkapnya, lihat Templat kode dan Templat sinkronisasi data.

  3. Masukkan Job Information.

    Parameter

    Deskripsi

    Contoh

    File Name

    Nama draf yang ingin Anda buat.

    Catatan

    Nama draf harus unik dalam proyek saat ini.

    flink-test

    Location

    Folder tempat file kode draf disimpan.

    Anda juga dapat mengklik ikon 新建文件夹 di sebelah kanan folder yang ada untuk membuat subfolder.

    Draft

    Engine version

    Versi mesin Flink yang ingin digunakan oleh draf saat ini. Untuk informasi selengkapnya tentang versi mesin, lihat Catatan rilis dan Versi mesin.

    vvr-8.0.10-flink-1.17

  4. Klik Create.

Langkah 2: Tulis draf SQL

Catatan

Pada contoh dalam langkah ini, kode ditulis untuk menyinkronkan data dari tabel data ke tabel data lainnya. Untuk informasi lebih lanjut tentang contoh pernyataan SQL, lihat Contoh pernyataan SQL.

  1. Buat tabel sementara untuk tabel sumber dan tabel hasil.

    Untuk informasi selengkapnya, lihat Lampiran 1: Konektor Tablestore.

    -- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector' = 'ots', -- Tentukan jenis konektor tabel sumber. Nilainya adalah ots dan tidak dapat diubah. 
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tentukan titik akhir virtual private cloud (VPC) instans Tablestore. 
        'instanceName' = 'xxx', -- Tentukan nama instans Tablestore. 
        'tableName' = 'flink_source_table', -- Tentukan nama tabel sumber. 
        'tunnelName' = 'flink_source_tunnel', -- Tentukan nama tunnel yang dibuat untuk tabel sumber. 
        'accessId' = 'xxxxxxxxxxx', -- Tentukan ID AccessKey Akun Alibaba Cloud atau pengguna RAM. 
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Tentukan Rahasia AccessKey Akun Alibaba Cloud atau pengguna RAM. 
        'ignoreDelete' = 'false' -- Tentukan apakah akan mengabaikan data real-time yang dihasilkan oleh operasi delete. Dalam contoh ini, parameter ini diatur ke false. 
    );
    
    -- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
    CREATE TEMPORARY TABLE tablestore_sink(
       `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED -- Tentukan kunci primer. 
    ) WITH (
        'connector' = 'ots', -- Tentukan jenis konektor tabel hasil. Nilainya adalah ots dan tidak dapat diubah. 
        'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tentukan titik akhir VPC instans Tablestore. 
        'instanceName' = 'xxx', -- Tentukan nama instans Tablestore. 
        'tableName' = 'flink_sink_table', -- Tentukan nama tabel hasil. 
        'accessId' = 'xxxxxxxxxxx',  -- Tentukan ID AccessKey Akun Alibaba Cloud atau pengguna RAM. 
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Tentukan Rahasia AccessKey Akun Alibaba Cloud atau pengguna RAM. 
        'valueColumns'='customerid,customername' --Tentukan nama kolom yang ingin Anda masukkan ke tabel hasil. 
    );
  2. Tulis logika draft.

    Pernyataan SQL berikut memberikan contoh cara memasukkan data dari tabel sumber ke tabel hasil:

    -- Masukkan data dari tabel sumber ke tabel hasil.
    INSERT INTO tablestore_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Langkah 3: (Opsional) Lihat informasi konfigurasi

Di tab sebelah kanan editor SQL, Anda dapat melihat konfigurasi atau mengatur parameter. Tabel berikut menjelaskan parameter tersebut.

Nama tab

Deskripsi

Configurations

  • Engine version: Versi mesin Flink untuk draf.

  • Additional dependencies: Dependensi tambahan yang diperlukan untuk pekerjaan, seperti UDF.

    Anda dapat mengunduh dependensi Ververica Runtime (VVR), mengunggahnya di halaman file resource, lalu memilih file yang diunggah sebagai additional dependencies. Untuk informasi selengkapnya, lihat Lampiran 2: Konfigurasikan dependensi VVR.

Structure

  • Data flow diagram: Memvisualisasikan aliran data pekerjaan.

  • Tree structure diagram: Memvisualisasikan alur data pekerjaan.

Versions

Menampilkan riwayat versi draf. Untuk detail fitur di kolom Actions, lihat Kelola versi pekerjaan.

Langkah 4: (Opsional) Lakukan pemeriksaan sintaks

Validasi memeriksa semantik SQL pekerjaan, konektivitas jaringan, dan metadata tabel. Anda juga dapat mengklik SQL Advice di area hasil untuk melihat potensi risiko SQL dan saran optimasi.

  1. Di pojok kanan atas editor SQL, klik Validate.

  2. Di kotak dialog Validation, klik Confirm.

Langkah 5: (Opsional) Debug draf

Anda dapat menggunakan fitur debugging untuk mensimulasikan jalannya deployment, memeriksa output, dan memverifikasi logika bisnis pernyataan SELECT dan INSERT. Fitur ini meningkatkan efisiensi pengembangan dan mengurangi risiko kualitas data yang buruk.

  1. Di pojok kanan atas editor SQL, klik Debug.

  2. Di kotak dialog Debug, pilih kluster sesi, lalu klik Next.

    Jika tidak tersedia kluster, buat kluster sesi yang menggunakan versi mesin yang sama dengan draf SQL dan pastikan kluster sesi tersebut sedang berjalan. Untuk informasi selengkapnya, lihat Buat kluster sesi.

  3. Konfigurasikan data debugging.

    • Jika Anda menggunakan data online, lewati langkah ini.

    • Jika ingin menggunakan data debugging, klik Download Debugging Data Template, isi templat dengan data Anda, lalu unggah file tersebut. Untuk informasi selengkapnya, lihat Debugging pekerjaan.

  4. Setelah mengonfigurasi data, klik OK.

Langkah 6: Deploy draf

Di pojok kanan atas editor SQL, klik Deploy. Pada kotak dialog Deploy New Version, konfigurasikan parameter deployment, lalu klik OK.

Catatan

Kluster sesi cocok untuk lingkungan non-produksi, seperti pengembangan dan pengujian. Anda dapat mendeploy atau mendebug draf di kluster sesi untuk meningkatkan pemanfaatan resource JobManager dan mempercepat startup deployment. Namun, kami tidak menyarankan mendeploy draf untuk lingkungan produksi di kluster sesi karena dapat menyebabkan masalah stabilitas.

Langkah 7: Mulai deployment untuk draf dan lihat hasil komputasi

  1. Di panel navigasi sebelah kiri, klik O&M > Deployments.

  2. Pada kolom Actions untuk deployment target, klik Start.

    Pilih Start with no state, lalu klik Start. Status Running menunjukkan bahwa deployment berjalan dengan benar. Untuk informasi selengkapnya mengenai parameter startup, lihat Mulai pekerjaan.

    Catatan
    • Disarankan untuk mengonfigurasi dua core CPU dan memori 4 GB untuk setiap TaskManager di Realtime Compute for Apache Flink guna memaksimalkan kapasitas komputasi. Satu TaskManager mampu menulis hingga 10.000 baris per detik.

    • Jika jumlah partisi pada tabel sumber besar, disarankan mengatur konkurensi kurang dari 16 di Realtime Compute for Apache Flink karena laju penulisan meningkat secara linear seiring dengan peningkatan konkurensi.

  3. Di halaman Deployments, lihat hasil komputasi.

    1. Di halaman O&M > Deployments, klik nama deployment target.

    2. Di tab Job logs, buka tab Running task managers, lalu klik tugas target pada kolom Path,ID.

    3. Klik Logs untuk melihat informasi log.

  4. (Opsional) Batalkan deployment.

    Jika Anda mengubah kode SQL untuk deployment, menambahkan atau menghapus parameter dalam klausa WITH, atau mengubah versi deployment, Anda harus mendeploy ulang draf tersebut dengan membatalkan deployment yang ada lalu memulai ulang deployment agar perubahan diterapkan. Jika deployment gagal dan tidak dapat memulihkan data state, atau jika Anda ingin memperbarui pengaturan parameter yang tidak berlaku secara dinamis, Anda juga harus membatalkan lalu memulai ulang deployment. Untuk informasi selengkapnya tentang cara membatalkan deployment, lihat Batalkan deployment.

Lampiran

Lampiran 1: Tablestore connector

Realtime Compute for Apache Flink menyediakan konektor Tablestore bawaan untuk membaca, menulis, dan menyinkronkan data Tablestore.

Tabel sumber

Sintaks DDL
Tabel data

Kode contoh berikut memberikan contoh pernyataan DDL untuk membuat tabel sementara untuk tabel sumber:

-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false'
);
Tabel time series

Kode contoh berikut memberikan contoh pernyataan DDL untuk membuat tabel sementara untuk tabel sumber:

-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

Anda dapat membaca bidang metadata dari Tunnel Service, seperti OtsRecordType dan OtsRecordTimestamp, sebagai kolom biasa di tabel sumber. Tabel berikut menjelaskan bidang-bidang tersebut.

Bidang

Bidang yang dipetakan di Realtime Compute for Apache Flink

Deskripsi

OtsRecordType

type

Jenis operasi.

OtsRecordTimestamp

timestamp

Waktu operasi data. Satuan: mikrodetik.

Catatan

Jika Anda ingin Realtime Compute for Apache Flink membaca data lengkap, atur bidang ini ke 0.

Parameter dalam klausa WITH

Parameter

Tabel yang berlaku

Wajib

Deskripsi

connector

Umum

Ya

Jenis konektor tabel sumber. Nilainya adalah ots dan tidak dapat diubah.

endPoint

Umum

Ya

Titik akhir instans Tablestore. Anda harus menggunakan titik akhir VPC. Untuk informasi selengkapnya, lihat Titik akhir.

instanceName

Umum

Ya

Nama instans Tablestore.

tableName

Umum

Ya

Nama tabel sumber di Tablestore.

tunnelName

Umum

Ya

Nama tunnel untuk tabel sumber di Tablestore. Untuk informasi cara membuat tunnel, lihat Buat tunnel.

accessId

Umum

Ya

Pasangan AccessKey (ID AccessKey dan Rahasia AccessKey) Akun Alibaba Cloud atau pengguna RAM.

Penting

Untuk melindungi pasangan AccessKey Anda, kami menyarankan Anda menggunakan variabel untuk menentukan pasangan AccessKey. Untuk informasi selengkapnya, lihat Kelola variabel.

accessKey

Umum

Ya

connectTimeout

Umum

Tidak

Periode timeout untuk konektor Tablestore terhubung ke Tablestore. Satuan: milidetik. Nilai default: 30000.

socketTimeout

Umum

Tidak

Periode timeout socket untuk konektor Tablestore terhubung ke Tablestore. Satuan: milidetik. Nilai default: 30000.

ioThreadCount

Umum

Tidak

Jumlah thread I/O. Nilai default: 4.

callbackThreadPoolSize

Umum

Tidak

Ukuran kolam thread callback. Nilai default: 4.

ignoreDelete

Tabel data

Tidak

Menentukan apakah akan mengabaikan data real-time yang dihasilkan oleh operasi delete. Nilai default: false, yang berarti data real-time yang dihasilkan oleh operasi delete tidak diabaikan.

skipInvalidData

Umum

Tidak

Menentukan apakah akan mengabaikan data kotor. Nilai default: false, yang berarti data kotor tidak diabaikan. Jika data kotor tidak diabaikan, sistem akan melaporkan error saat memproses data kotor.

Penting

Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.4 atau yang lebih baru yang mendukung parameter ini.

retryStrategy

Umum

Tidak

Kebijakan retry. Nilai yang valid:

  • TIME: Sistem terus melakukan retry hingga periode timeout yang ditentukan oleh parameter retryTimeoutMs berakhir. Ini adalah nilai default.

  • COUNT: Sistem terus melakukan retry hingga jumlah maksimum retry yang ditentukan oleh parameter retryCount tercapai.

retryCount

Umum

Tidak

Jumlah maksimum retry. Jika Anda mengatur parameter retryStrategy ke COUNT, Anda dapat mengonfigurasi parameter ini. Nilai default: 3.

retryTimeoutMs

Umum

Tidak

Periode timeout untuk retry. Satuan: milidetik. Jika Anda mengatur parameter retryStrategy ke TIME, Anda dapat mengonfigurasi parameter ini. Nilai default: 180000.

streamOriginColumnMapping

Umum

Tidak

Pemetaan antara nama kolom di tabel sumber dan nama kolom di tabel sementara.

Catatan

Gunakan tanda titik dua (:) untuk memisahkan nama kolom asli dan nama kolom aktual. Gunakan koma (,) untuk memisahkan beberapa pemetaan. Contoh: origin_col1:col1,origin_col2:col2.

outputSpecificRowType

Umum

Tidak

Menentukan apakah akan meneruskan jenis baris tertentu. Nilai yang valid:

  • false: tidak meneruskan jenis baris tertentu. Jenis baris semua data adalah INSERT. Ini adalah nilai default.

  • true: meneruskan jenis baris tertentu. Jenis baris data dapat berupa INSERT, DELETE, atau UPDATE_AFTER.

Pemetaan tipe data

Tipe data bidang di Tablestore

Tipe data bidang di Realtime Compute for Apache Flink

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

Tabel hasil

Sintaks DDL
Tabel data

Kode contoh berikut memberikan contoh pernyataan DDL untuk membuat tabel sementara untuk tabel hasil:

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
CREATE TEMPORARY TABLE tablestore_sink(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
Catatan

Anda harus menentukan skema kunci primer dan setidaknya satu kolom atribut untuk tabel hasil Tablestore. Data output ditambahkan ke tabel hasil Tablestore untuk memperbarui data tabel.

Tabel time series

Tabel hasil time series memerlukan empat kunci primer: _m_name, _data_source, _tags, dan _time. Anda dapat menentukan kunci primer ini dengan tiga cara: menggunakan parameter WITH, menggunakan definisi kunci primer tabel hasil, atau menggunakan kunci primer dalam format Map. Saat mendefinisikan kolom _tags, metode parameter WITH memiliki prioritas tertinggi, diikuti oleh format Map dan definisi kunci primer tabel hasil.

Gunakan parameter dalam klausa WITH

Kode contoh berikut memberikan contoh cara menggunakan parameter dalam klausa WITH untuk mendefinisikan sintaks DDL:

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);

-- Masukkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
Gunakan kunci primer dalam format Map

Kode contoh berikut menunjukkan cara menggunakan kunci primer dalam format Map untuk mendefinisikan sintaks DDL:

Catatan

Tablestore menyediakan tipe data Map Flink untuk mempermudah pembuatan kolom _tags pada tabel time series dalam model TimeSeries. Tipe data Map mendukung operasi pemetaan, seperti penggantian nama kolom dan fungsi sederhana. Saat menggunakan Map, pastikan kolom kunci primer _tags berada di posisi ketiga.

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- Masukkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
Gunakan kunci primer tabel SINK

Kode contoh berikut memberikan contoh cara menggunakan kunci primer tabel SINK untuk mendefinisikan sintaks DDL. Kolom kunci primer pertama adalah kolom _m_name, yang menentukan nama pengukuran. Kolom kunci primer kedua adalah kolom _data_source, yang menentukan sumber data. Kolom kunci primer terakhir adalah kolom _time, yang menentukan timestamp. Kolom kunci primer di tengah adalah kolom _tags, yang menentukan tag time series.

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- Masukkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
Parameter dalam klausa WITH

Parameter

Tabel yang berlaku

Wajib

Deskripsi

connector

Umum

Ya

Jenis konektor tabel hasil. Nilainya adalah ots dan tidak dapat diubah.

endPoint

Umum

Ya

Titik akhir instans Tablestore. Anda harus menggunakan titik akhir VPC. Untuk informasi selengkapnya, lihat Titik akhir.

instanceName

Umum

Ya

Nama instans Tablestore.

tableName

Umum

Ya

Nama tabel time series di Tablestore.

accessId

Umum

Ya

Pasangan AccessKey (ID AccessKey dan Rahasia AccessKey) Akun Alibaba Cloud atau pengguna RAM.

Penting

Untuk melindungi pasangan AccessKey Anda, kami menyarankan Anda menggunakan variabel untuk menentukan pasangan AccessKey. Untuk informasi selengkapnya, lihat Kelola variabel.

accessKey

Umum

Ya

valueColumns

Tabel data

Ya

Nama kolom tempat menulis data. Pisahkan beberapa nama kolom dengan koma (,). Contoh: ID,NAME.

storageType

Umum

Tidak

Penting

Jika Anda menggunakan tabel time series sebagai tabel hasil, atur parameter ini ke TIMESERIES.

Jenis tabel. Nilai yang valid:

  • WIDE_COLUMN: tabel data. Ini adalah nilai default.

  • TIMESERIES: tabel time series.

timeseriesSchema

Tabel time series

Tidak

Penting

Saat Anda menggunakan tabel time series sebagai tabel hasil, jika Anda menggunakan parameter dalam klausa WITH untuk menentukan kunci primer tabel time series, Anda harus mengonfigurasi parameter ini.

Kolom yang ingin Anda tentukan sebagai kolom kunci primer tabel time series.

  • Tentukan kunci primer tabel time series sebagai pasangan kunci-nilai dalam format JSON. Contoh: {"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}.

  • Tipe kolom kunci primer yang Anda tentukan harus sama dengan tipe kolom kunci primer di tabel time series. Kolom kunci primer tags dapat terdiri dari beberapa kolom.

connectTimeout

Umum

Tidak

Periode timeout untuk konektor Tablestore terhubung ke Tablestore. Satuan: milidetik. Nilai default: 30000.

socketTimeout

Umum

Tidak

Periode timeout socket untuk konektor Tablestore terhubung ke Tablestore. Satuan: milidetik. Nilai default: 30000.

ioThreadCount

Umum

Tidak

Jumlah thread I/O. Nilai default: 4.

callbackThreadPoolSize

Umum

Tidak

Ukuran kolam thread callback. Nilai default: 4.

retryIntervalMs

Umum

Tidak

Interval antar retry. Satuan: milidetik. Nilai default: 1000.

maxRetryTimes

Umum

Tidak

Jumlah maksimum retry. Nilai default: 10.

bufferSize

Umum

Tidak

Jumlah maksimum catatan data yang dapat disimpan di buffer sebelum data ditulis ke tabel hasil. Nilai default: 5000, yang berarti data ditulis ke tabel hasil ketika jumlah catatan data di buffer mencapai 5.000.

batchWriteTimeoutMs

Umum

Tidak

Periode timeout penulisan. Satuan: milidetik. Nilai default: 5000, yang berarti semua data di buffer ditulis ke tabel hasil ketika jumlah catatan data di buffer tidak mencapai nilai yang ditentukan oleh parameter bufferSize dalam waktu 5.000 milidetik.

batchSize

Umum

Tidak

Jumlah catatan data yang dapat ditulis ke tabel hasil secara bersamaan. Nilai default: 100. Nilai maksimum: 200.

ignoreDelete

Umum

Tidak

Menentukan apakah akan mengabaikan data real-time yang dihasilkan oleh operasi delete. Nilai default: false, yang berarti data real-time yang dihasilkan oleh operasi delete tidak diabaikan.

Penting

Jika Anda menggunakan tabel data sebagai tabel sumber, Anda dapat mengonfigurasi parameter ini sesuai kebutuhan bisnis Anda.

autoIncrementKey

Tabel data

Tidak

Nama kolom kunci primer auto-increment tabel hasil saat tabel hasil berisi kolom kunci primer auto-increment. Jika tabel hasil tidak memiliki kolom kunci primer auto-increment, Anda tidak perlu mengonfigurasi parameter ini.

Penting

Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.4 atau yang lebih baru yang mendukung parameter ini.

overwriteMode

Umum

Tidak

Mode overwrite data. Nilai yang valid:

  • PUT: Data ditulis ke tabel Tablestore dalam mode PUT. Ini adalah nilai default.

  • UPDATE: Data ditulis ke tabel Tablestore dalam mode UPDATE.

Catatan

Hanya mode UPDATE yang didukung dalam mode kolom dinamis.

defaultTimestampInMillisecond

Umum

Tidak

Timestamp default yang digunakan untuk menulis data ke tabel Tablestore. Jika Anda mengosongkan parameter ini, timestamp waktu sistem saat ini akan digunakan.

dynamicColumnSink

Umum

Tidak

Menentukan apakah akan mengaktifkan mode kolom dinamis. Nilai default: false, yang berarti mode kolom dinamis dinonaktifkan.

Catatan
  • Mode kolom dinamis cocok untuk skenario di mana tidak ada kolom yang ditentukan untuk tabel dan kolom data dimasukkan ke tabel berdasarkan status deployment. Anda harus menentukan beberapa kolom pertama sebagai kolom kunci primer dalam pernyataan pembuatan tabel. Nilai kolom kedua hingga kolom terakhir digunakan sebagai variabel nama kolom, nilai kolom terakhir digunakan sebagai nilai variabel nama kolom, dan tipe data kolom kedua hingga kolom terakhir harus berupa String.

  • Jika Anda mengaktifkan mode kolom dinamis, fitur kolom kunci primer auto-increment tidak didukung dan Anda harus mengatur parameter overwriteMode ke UPDATE.

checkSinkTableMeta

Umum

Tidak

Menentukan apakah akan memeriksa metadata tabel hasil. Nilai default: true, yang berarti sistem memeriksa apakah kolom kunci primer tabel Tablestore sama dengan kolom kunci primer yang ditentukan dalam pernyataan pembuatan tabel.

enableRequestCompression

Umum

Tidak

Menentukan apakah akan mengaktifkan kompresi data selama penulisan data. Nilai default: false, yang berarti kompresi data dinonaktifkan selama penulisan data.

Pemetaan tipe data

Tipe data bidang di Realtime Compute for Apache Flink

Tipe data bidang di Tablestore

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

Contoh pernyataan SQL

Sinkronisasi data dari tabel sumber ke tabel hasil
Sinkronisasi data dari tabel data ke tabel time series

Baca data dari tabel data bernama flink_source_table dan tulis data tersebut ke tabel time series bernama flink_sink_table.

Pernyataan SQL contoh:

-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Buat tabel sementara bernama tablestore_sink untuk tabel hasil dengan menggunakan parameter dalam klausa WITH.
CREATE TEMPORARY TABLE tablestore_sink(
     measurement STRING,
     datasource STRING,
     tag_a STRING,
     `time` BIGINT,
     binary_value BINARY,
     bool_value BOOLEAN,
     double_value DOUBLE,
     long_value BIGINT,
     string_value STRING,
     tag_b STRING,
     tag_c STRING,
     tag_d STRING,
     tag_e STRING,
     tag_f STRING,
     PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
 ) WITH (
     'connector' = 'ots',
     'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
     'instanceName' = 'xxx',
     'tableName' = 'flink_sink_table',
     'accessId' = 'xxxxxxxxxxx',
     'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
     'storageType' = 'TIMESERIES',
     'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
 );
 
-- Masukkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
Sinkronisasi data dari tabel time series ke tabel data

Baca data dari tabel time series bernama flink_source_table dan tulis data tersebut ke tabel data bernama flink_sink_table.

Pernyataan SQL contoh:

-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

-- Buat tabel sementara bernama print_table untuk tabel hasil. 
CREATE TEMPORARY TABLE tablestore_target(
    measurement STRING,
    datasource STRING,
    tags STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);

-- Masukkan data dari tabel sumber ke tabel hasil.
INSERT INTO tablestore_target
SELECT
    _m_name,
    _data_source,
    _tags,
    _time,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from tablestore_stream;
Baca data dari tabel sumber dan cetak data di konsol Tablestore

Baca data dari tabel sumber bernama flink_source_table secara batch. Anda dapat menggunakan fitur debugging deployment untuk mensimulasikan jalannya deployment. Hasil debugging ditampilkan di bagian bawah editor SQL.

Pernyataan SQL contoh:

-- Buat tabel sementara bernama tablestore_stream untuk tabel data sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Baca data dari tabel sumber.
SELECT * FROM tablestore_stream LIMIT 100;
Baca data dari tabel sumber dan cetak data ke log TaskManager

Baca data dari tabel sumber bernama flink_source_table dan cetak hasilnya ke log TaskManager dengan menggunakan konektor Print.

Pernyataan SQL contoh:

-- Buat tabel sementara bernama tablestore_stream untuk tabel data sumber.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Buat tabel sementara bernama print_table untuk tabel hasil. 
CREATE TEMPORARY TABLE print_table(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
  'connector' = 'print',   -- Gunakan konektor Print.
  'logger' = 'true'        -- Tampilkan hasil komputasi di konsol pengembangan Realtime Compute for Apache Flink.
);

-- Cetak bidang tabel sumber.
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;

Lampiran 2: Konfigurasikan dependensi VVR

  1. Unduh dependensi VVR.

  2. Unggah dependensi VVR.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target, lalu klik Console pada kolom Actions.

    3. Di panel navigasi kiri, klik Artifacts.

    4. Pada tab Artifacts, klik Upload Artifact, lalu pilih paket JAR dependensi VVR.

  3. Di sisi kanan editor SQL untuk pekerjaan target, klik tab Configurations. Di kolom Additional Dependencies, pilih paket JAR dependensi VVR.