Topik ini menjelaskan cara memproses data Tablestore menggunakan Realtime Compute for Apache Flink. Tabel data atau tabel time series di Tablestore dapat digunakan sebagai tabel sumber atau tabel hasil dalam pemrosesan data menggunakan Realtime Compute for Apache Flink.
Prasyarat
Tablestore telah diaktifkan dan sebuah instans telah dibuat. Untuk informasi selengkapnya, lihat Aktifkan Tablestore dan buat instans.
Anda telah membuat tabel sumber, tabel hasil, dan tunnel untuk tabel sumber. Untuk informasi selengkapnya, lihat Operasi pada tabel data, Operasi pada tabel time series, dan Buat tunnel.
Ruang kerja Realtime Compute for Apache Flink telah dibuat. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
PentingRuang kerja Realtime Compute for Apache Flink dan instans Tablestore harus berada di wilayah yang sama. Untuk informasi mengenai wilayah yang didukung oleh Realtime Compute for Apache Flink, lihat Wilayah.
Pasangan AccessKey telah diperoleh.
PentingUntuk alasan keamanan, kami menyarankan Anda menggunakan fitur Tablestore sebagai pengguna Resource Access Management (RAM). Untuk informasi selengkapnya, lihat Gunakan pasangan AccessKey pengguna RAM untuk mengakses Tablestore.
Kembangkan pekerjaan komputasi real-time
Langkah 1: Buat draf SQL
Buka halaman pembuatan draf.
Masuk ke Konsol Realtime Compute for Apache Flink.
Pada kolom Actions ruang kerja target, klik Console.
Di panel navigasi kiri, klik Development > ETL.
Klik New. Pada kotak dialog New Draft, pilih Blank Stream Draft, lalu klik Next.
CatatanRealtime Compute for Apache Flink menyediakan berbagai templat kode dan mendukung sinkronisasi data. Setiap templat dirancang untuk skenario tertentu serta mencakup contoh kode dan instruksi penggunaan. Anda dapat mengklik templat tersebut untuk mempelajari fitur dan sintaks Realtime Compute for Apache Flink serta menerapkan logika bisnis Anda. Untuk informasi selengkapnya, lihat Templat kode dan Templat sinkronisasi data.
Masukkan Job Information.
Parameter
Deskripsi
Contoh
File Name
Nama draf yang ingin Anda buat.
CatatanNama draf harus unik dalam proyek saat ini.
flink-test
Location
Folder tempat file kode draf disimpan.
Anda juga dapat mengklik ikon
di sebelah kanan folder yang ada untuk membuat subfolder. Draft
Engine version
Versi mesin Flink yang ingin digunakan oleh draf saat ini. Untuk informasi selengkapnya tentang versi mesin, lihat Catatan rilis dan Versi mesin.
vvr-8.0.10-flink-1.17
Klik Create.
Langkah 2: Tulis draf SQL
Pada contoh dalam langkah ini, kode ditulis untuk menyinkronkan data dari tabel data ke tabel data lainnya. Untuk informasi lebih lanjut tentang contoh pernyataan SQL, lihat Contoh pernyataan SQL.
Buat tabel sementara untuk tabel sumber dan tabel hasil.
Untuk informasi selengkapnya, lihat Lampiran 1: Konektor Tablestore.
-- Buat tabel sementara bernama tablestore_stream untuk tabel sumber. CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector' = 'ots', -- Tentukan jenis konektor tabel sumber. Nilainya adalah ots dan tidak dapat diubah. 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tentukan titik akhir virtual private cloud (VPC) instans Tablestore. 'instanceName' = 'xxx', -- Tentukan nama instans Tablestore. 'tableName' = 'flink_source_table', -- Tentukan nama tabel sumber. 'tunnelName' = 'flink_source_tunnel', -- Tentukan nama tunnel yang dibuat untuk tabel sumber. 'accessId' = 'xxxxxxxxxxx', -- Tentukan ID AccessKey Akun Alibaba Cloud atau pengguna RAM. 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Tentukan Rahasia AccessKey Akun Alibaba Cloud atau pengguna RAM. 'ignoreDelete' = 'false' -- Tentukan apakah akan mengabaikan data real-time yang dihasilkan oleh operasi delete. Dalam contoh ini, parameter ini diatur ke false. ); -- Buat tabel sementara bernama tablestore_sink untuk tabel hasil. CREATE TEMPORARY TABLE tablestore_sink( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED -- Tentukan kunci primer. ) WITH ( 'connector' = 'ots', -- Tentukan jenis konektor tabel hasil. Nilainya adalah ots dan tidak dapat diubah. 'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tentukan titik akhir VPC instans Tablestore. 'instanceName' = 'xxx', -- Tentukan nama instans Tablestore. 'tableName' = 'flink_sink_table', -- Tentukan nama tabel hasil. 'accessId' = 'xxxxxxxxxxx', -- Tentukan ID AccessKey Akun Alibaba Cloud atau pengguna RAM. 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Tentukan Rahasia AccessKey Akun Alibaba Cloud atau pengguna RAM. 'valueColumns'='customerid,customername' --Tentukan nama kolom yang ingin Anda masukkan ke tabel hasil. );Tulis logika draft.
Pernyataan SQL berikut memberikan contoh cara memasukkan data dari tabel sumber ke tabel hasil:
-- Masukkan data dari tabel sumber ke tabel hasil. INSERT INTO tablestore_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
Langkah 3: (Opsional) Lihat informasi konfigurasi
Di tab sebelah kanan editor SQL, Anda dapat melihat konfigurasi atau mengatur parameter. Tabel berikut menjelaskan parameter tersebut.
Nama tab | Deskripsi |
Configurations |
|
Structure |
|
Versions | Menampilkan riwayat versi draf. Untuk detail fitur di kolom Actions, lihat Kelola versi pekerjaan. |
Langkah 4: (Opsional) Lakukan pemeriksaan sintaks
Validasi memeriksa semantik SQL pekerjaan, konektivitas jaringan, dan metadata tabel. Anda juga dapat mengklik SQL Advice di area hasil untuk melihat potensi risiko SQL dan saran optimasi.
Di pojok kanan atas editor SQL, klik Validate.
Di kotak dialog Validation, klik Confirm.
Langkah 5: (Opsional) Debug draf
Anda dapat menggunakan fitur debugging untuk mensimulasikan jalannya deployment, memeriksa output, dan memverifikasi logika bisnis pernyataan SELECT dan INSERT. Fitur ini meningkatkan efisiensi pengembangan dan mengurangi risiko kualitas data yang buruk.
Di pojok kanan atas editor SQL, klik Debug.
Di kotak dialog Debug, pilih kluster sesi, lalu klik Next.
Jika tidak tersedia kluster, buat kluster sesi yang menggunakan versi mesin yang sama dengan draf SQL dan pastikan kluster sesi tersebut sedang berjalan. Untuk informasi selengkapnya, lihat Buat kluster sesi.
Konfigurasikan data debugging.
Jika Anda menggunakan data online, lewati langkah ini.
Jika ingin menggunakan data debugging, klik Download Debugging Data Template, isi templat dengan data Anda, lalu unggah file tersebut. Untuk informasi selengkapnya, lihat Debugging pekerjaan.
Setelah mengonfigurasi data, klik OK.
Langkah 6: Deploy draf
Di pojok kanan atas editor SQL, klik Deploy. Pada kotak dialog Deploy New Version, konfigurasikan parameter deployment, lalu klik OK.
Kluster sesi cocok untuk lingkungan non-produksi, seperti pengembangan dan pengujian. Anda dapat mendeploy atau mendebug draf di kluster sesi untuk meningkatkan pemanfaatan resource JobManager dan mempercepat startup deployment. Namun, kami tidak menyarankan mendeploy draf untuk lingkungan produksi di kluster sesi karena dapat menyebabkan masalah stabilitas.
Langkah 7: Mulai deployment untuk draf dan lihat hasil komputasi
Di panel navigasi sebelah kiri, klik O&M > Deployments.
Pada kolom Actions untuk deployment target, klik Start.
Pilih Start with no state, lalu klik Start. Status Running menunjukkan bahwa deployment berjalan dengan benar. Untuk informasi selengkapnya mengenai parameter startup, lihat Mulai pekerjaan.
CatatanDisarankan untuk mengonfigurasi dua core CPU dan memori 4 GB untuk setiap TaskManager di Realtime Compute for Apache Flink guna memaksimalkan kapasitas komputasi. Satu TaskManager mampu menulis hingga 10.000 baris per detik.
Jika jumlah partisi pada tabel sumber besar, disarankan mengatur konkurensi kurang dari 16 di Realtime Compute for Apache Flink karena laju penulisan meningkat secara linear seiring dengan peningkatan konkurensi.
Di halaman Deployments, lihat hasil komputasi.
Di halaman O&M > Deployments, klik nama deployment target.
Di tab Job logs, buka tab Running task managers, lalu klik tugas target pada kolom Path,ID.
Klik Logs untuk melihat informasi log.
(Opsional) Batalkan deployment.
Jika Anda mengubah kode SQL untuk deployment, menambahkan atau menghapus parameter dalam klausa WITH, atau mengubah versi deployment, Anda harus mendeploy ulang draf tersebut dengan membatalkan deployment yang ada lalu memulai ulang deployment agar perubahan diterapkan. Jika deployment gagal dan tidak dapat memulihkan data state, atau jika Anda ingin memperbarui pengaturan parameter yang tidak berlaku secara dinamis, Anda juga harus membatalkan lalu memulai ulang deployment. Untuk informasi selengkapnya tentang cara membatalkan deployment, lihat Batalkan deployment.