Topik ini menjelaskan cara menggunakan Alibaba Cloud Realtime Compute for Apache Flink untuk berlangganan data penuh AnalyticDB for PostgreSQL.
Informasi latar belakang
AnalyticDB for PostgreSQL adalah layanan gudang data pemrosesan paralel masif (MPP) yang menyediakan analisis real-time untuk sejumlah besar data. Realtime Compute for Apache Flink adalah platform analitik data besar real-time yang dibangun berdasarkan Apache Flink. Platform ini menyediakan berbagai konektor hulu dan hilir untuk memenuhi persyaratan skenario bisnis yang berbeda serta menawarkan layanan komputasi real-time yang efisien dan fleksibel. Realtime Compute for Apache Flink dapat membaca data dari AnalyticDB for PostgreSQL, memanfaatkan sepenuhnya kemampuan AnalyticDB for PostgreSQL dan meningkatkan efisiensi serta akurasi analitik data.
Batasan
Realtime Compute for Apache Flink tidak mendukung pembacaan data dari AnalyticDB for PostgreSQL dalam mode Serverless.
Hanya Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) versi 6.0.0 atau lebih baru yang mendukung konektor AnalyticDB for PostgreSQL.
Hanya Realtime Compute for Apache Flink dengan VVR versi 8.0.1 atau lebih baru yang mendukung AnalyticDB for PostgreSQL versi 7.0.
CatatanJika Anda menggunakan konektor kustom, lakukan operasi sesuai dengan instruksi yang dijelaskan dalam Kelola Konektor Kustom.
Prasyarat
Sebuah ruang kerja Flink yang sepenuhnya dikelola telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.
Sebuah instance AnalyticDB for PostgreSQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat Sebuah Instance.
Instance AnalyticDB for PostgreSQL dan ruang kerja Flink yang sepenuhnya dikelola berada dalam virtual private cloud (VPC) yang sama.
Langkah 1: Konfigurasikan instance AnalyticDB for PostgreSQL
- Masuk ke konsol AnalyticDB for PostgreSQL.
Tambahkan blok CIDR ruang kerja Flink yang sepenuhnya dikelola ke daftar putih alamat IP instance AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP.
Klik Log On to Database. Untuk informasi lebih lanjut tentang cara terhubung ke database, lihat Koneksi Klien.
Buat tabel dimensi bernama adbpg_dim_table pada instance AnalyticDB for PostgreSQL dan sisipkan 50 baris data ke dalam tabel.
Pernyataan contoh:
-- Buat tabel dimensi bernama adbpg_dim_table. CREATE TABLE adbpg_dim_table( id int, username text, PRIMARY KEY(id) ); -- Sisipkan 50 baris data ke dalam tabel adbpg_dim_table. Nilai bidang id adalah bilangan bulat dari 1 hingga 50, dan nilai bidang username adalah teks untuk jumlah baris saat ini yang diikuti oleh string username. INSERT INTO adbpg_dim_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);Buat tabel hasil bernama adbpg_sink_table tempat Realtime Compute for Apache Flink menulis data hasil.
CREATE TABLE adbpg_sink_table( id int, username text, score int );
Langkah 2: Buat draft Realtime Compute for Apache Flink
Masuk ke konsol Realtime Compute for Apache Flink. Pada tab Fully Managed Flink, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi sisi kiri, klik SQL Editor. Di sudut kiri atas halaman SQL Editor, klik New. Dalam kotak dialog New Draft, klik Blank Stream Draft pada tab SQL Scripts dan klik Next.
Dalam kotak dialog New Draft, konfigurasikan parameter yang dijelaskan dalam tabel berikut.
Parameter
Deskripsi
Contoh
Name
Nama draft yang ingin Anda buat.
CatatanNama draft harus unik dalam proyek saat ini.
adbpg-test
Location
Folder tempat file kode draft disimpan.
Anda juga dapat mengklik ikon
di sebelah kanan folder yang ada untuk membuat subfolder. Draft
Engine Version
Versi engine Flink yang digunakan oleh draft. Untuk informasi lebih lanjut tentang versi engine, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi Engine.
vvr-6.0.7-flink-1.15
Klik Create.
Langkah 3: Tulis kode draft dan terapkan draft
Salin kode draft berikut ke editor kode:
--- Buat tabel sumber Datagen. CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='50', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); -- Buat tabel dimensi AnalyticDB for PostgreSQL. CREATE TEMPORARY TABLE dim_adbpg( id int, username varchar, PRIMARY KEY(id) not ENFORCED ) WITH( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_dim_table', 'username' = 'flink****test', 'password' = '*******', 'maxJoinRows'='100', 'maxRetryTimes'='1', 'cache'='lru', 'cacheSize'='1000' ); -- Buat tabel hasil AnalyticDB for PostgreSQL. CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_sink_table', 'username' = 'flink****test', 'password' = '******', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); -- Masukkan hasil gabungan tabel dimensi dan tabel sumber ke dalam tabel hasil AnalyticDB for PostgreSQL. INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds join dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts on ds.id = ts.id;Modifikasi parameter yang dijelaskan dalam tabel berikut berdasarkan kebutuhan bisnis Anda.
Parameter
Diperlukan
Deskripsi
URL
Ya
URL Java Database Connectivity (JDBC) yang digunakan untuk terhubung ke instance AnalyticDB for PostgreSQL. URL JDBC dalam format
jdbc:postgresql://<Titik akhir internal>:<Nomor port>/<Nama database>. Contoh:jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres.tablename
Ya
Nama tabel dalam database AnalyticDB for PostgreSQL.
username
Ya
Nama akun database yang digunakan untuk terhubung ke database AnalyticDB for PostgreSQL.
password
Ya
Kata sandi akun database AnalyticDB for PostgreSQL.
CatatanUntuk informasi lebih lanjut tentang parameter dan pemetaan tipe data, lihat Konektor AnalyticDB for PostgreSQL.
Di sudut kanan atas halaman SQL Editor, klik Validate untuk melakukan pemeriksaan sintaksis.
Di sudut kanan atas halaman SQL Editor, klik Deploy.
Di halaman Deployments, temukan penyebaran yang diinginkan dan klik Start di kolom Actions.
Langkah 4: Kueri data yang ditulis Realtime Compute for Apache Flink ke tabel hasil
- Masuk ke konsol AnalyticDB for PostgreSQL.
Klik Log On to Database. Untuk informasi lebih lanjut tentang cara terhubung ke database, lihat Koneksi Klien.
Eksekusi pernyataan berikut untuk mengkueri data yang ditulis Realtime Compute for Apache Flink ke tabel hasil:
SELECT * FROM adbpg_sink_table;