Topik ini menjelaskan cara menggunakan Alibaba Cloud Realtime Compute for Apache Flink untuk membaca data dari dan menulis data ke AnalyticDB for PostgreSQL.
Informasi latar belakang
AnalyticDB for PostgreSQL adalah gudang data berbasis arsitektur pemrosesan paralel masif (MPP) yang menyediakan layanan analisis daring untuk volume data besar. Realtime Compute for Apache Flink adalah platform analitik big data real-time berbasis Apache Flink yang menyediakan berbagai konektor hulu dan hilir untuk mendukung beragam skenario bisnis serta menawarkan layanan komputasi real-time yang efisien dan fleksibel. Realtime Compute for Apache Flink dapat membaca data dari AnalyticDB for PostgreSQL, sehingga memanfaatkan sepenuhnya keunggulan AnalyticDB for PostgreSQL guna meningkatkan efisiensi dan akurasi analitik data.
Batasan
Realtime Compute for Apache Flink tidak dapat membaca data dari AnalyticDB for PostgreSQL dalam mode arsitektur tanpa server.
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi 6.0.0 atau lebih baru yang mendukung konektor AnalyticDB for PostgreSQL.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR versi 8.0.1 atau lebih baru yang mendukung AnalyticDB for PostgreSQL V7.0.
CatatanJika Anda menggunakan konektor kustom, ikuti petunjuk dalam Kelola konektor kustom.
Prasyarat
Instans AnalyticDB for PostgreSQL dan ruang kerja Flink yang sepenuhnya dikelola berada dalam virtual private cloud (VPC) yang sama.
CatatanJika instans AnalyticDB for PostgreSQL dan ruang kerja Flink yang sepenuhnya dikelola tidak berada dalam VPC yang sama, ikuti petunjuk dalam Bagaimana Flink yang sepenuhnya dikelola mengakses layanan lintas VPC?
Ruang kerja Flink yang sepenuhnya dikelola telah dibuat. Untuk informasi selengkapnya, lihat Aktifkan Flink yang sepenuhnya dikelola.
Instans AnalyticDB for PostgreSQL dan akun telah dibuat. Untuk informasi selengkapnya, lihat Buat instans dan Buat akun istimewa.
Langkah 1: Konfigurasi daftar putih dan siapkan data
- Masuk ke Konsol AnalyticDB for PostgreSQL.
Tambahkan blok CIDR dari ruang kerja Flink yang sepenuhnya dikelola ke daftar putih instans AnalyticDB for PostgreSQL.
Lihat blok CIDR dari vSwitch tempat ruang kerja Flink yang sepenuhnya dikelola berada. Untuk informasi selengkapnya, lihat Bagaimana cara mengonfigurasi daftar putih?.
Tambahkan blok CIDR ruang kerja Flink yang sepenuhnya dikelola ke daftar putih instans AnalyticDB for PostgreSQL tujuan. Untuk informasi selengkapnya, lihat Prosedur.
CatatanJika Anda mengakses instans AnalyticDB for PostgreSQL melalui Internet, tambahkan Alamat IP publik ke daftar putih.
Di pojok kanan atas halaman detail instans, klik Log On to Database dan masukkan nama pengguna serta kata sandi. Untuk informasi selengkapnya tentang cara mengakses database, lihat Gunakan alat klien untuk terhubung ke instans.
Buat tabel bernama adbpg_dim_table di database tujuan instans tersebut dan masukkan 50 baris data ke dalam tabel tersebut.
Pernyataan contoh:
-- Buat tabel dimensi bernama adbpg_dim_table. CREATE TABLE adbpg_dim_table( id int, username text, PRIMARY KEY(id) ); -- Masukkan 50 baris data ke dalam tabel adbpg_dim_table. Nilai bidang id adalah bilangan bulat dari 1 hingga 50, dan nilai bidang username adalah teks nomor baris saat ini yang diikuti oleh string username. INSERT INTO adbpg_dim_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);Anda dapat menjalankan pernyataan
select * from adbpg_dim_table order by id;untuk melihat data yang dimasukkan.Buat tabel hasil bernama adbpg_sink_table tempat Realtime Compute for Apache Flink menulis data hasil.
CREATE TABLE adbpg_sink_table( id int, username text, score int );
Langkah 2: Buat draf pekerjaan
Masuk ke Konsol Realtime Compute for Apache Flink, temukan ruang kerja Anda, lalu klik Console pada kolom Actions.
Di panel navigasi sebelah kiri, klik . Di pojok kiri atas halaman SQL Editor, klik ikon + dan pilih New Blank Stream Draft.
Pada kotak dialog New Draft, konfigurasikan parameter draf. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Contoh
Name
Nama draf yang ingin Anda buat.
CatatanNama draf harus unik dalam proyek saat ini.
adbpg-test
Location
Folder tempat file kode draf disimpan.
Anda juga dapat mengklik ikon
di sebelah kanan folder yang sudah ada untuk membuat subfolder. Draft
Engine Version
Versi mesin Flink yang digunakan oleh draf. Untuk informasi selengkapnya tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Engine version.
vvr-8.0.1-flink-1.17
Klik Create.
Langkah 3: Tulis kode draf dan terapkan draf
Salin kode draf berikut ke editor kode.
--- Buat tabel sumber Datagen. Pada contoh ini, Anda tidak perlu mengubah parameter dalam klausa WITH. CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='50', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); -- Buat tabel dimensi AnalyticDB for PostgreSQL. Anda perlu mengubah parameter dalam klausa WITH sesuai kebutuhan bisnis Anda. CREATE TEMPORARY TABLE dim_adbpg( id int, username varchar, PRIMARY KEY(id) not ENFORCED ) WITH( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_dim_table', 'username' = 'flinktest', 'password' = '${secret_values.adb_password}', 'maxRetryTimes'='2', -- Tentukan jumlah maksimum percobaan ulang menulis setelah data gagal ditulis ke tabel. 'cache'='lru', -- Tentukan kebijakan cache. 'cacheSize'='100' -- Tentukan ukuran cache. ); -- Buat tabel hasil AnalyticDB for PostgreSQL. Anda perlu mengubah parameter dalam klausa WITH sesuai kebutuhan bisnis Anda. CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_sink_table', 'username' = 'flinktest', 'password' = '${secret_values.adb_password}', 'maxRetryTimes' = '2', 'conflictMode' = 'ignore',-- Tentukan kebijakan yang digunakan ketika terjadi konflik kunci primer atau konflik indeks selama penyisipan data. 'retryWaitTime' = '200' -- Tentukan interval antar percobaan ulang. ); -- Masukkan hasil yang diperoleh setelah tabel dimensi dan tabel sumber digabungkan ke dalam tabel hasil AnalyticDB for PostgreSQL. INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts on ds.id = ts.id;Ubah parameter sesuai kebutuhan bisnis Anda.
Pada contoh ini, Anda tidak perlu mengubah tabel sumber Datagen. Namun, Anda harus menyesuaikan parameter tabel dimensi AnalyticDB for PostgreSQL dan tabel hasil AnalyticDB for PostgreSQL sesuai kebutuhan bisnis Anda. Tabel berikut menjelaskan parameter-parameter tersebut. Untuk informasi selengkapnya tentang parameter dan pemetaan tipe data konektor terkait, lihat Referensi.
Parameter
Wajib
Deskripsi
url
Ya
URL JDBC yang digunakan untuk menghubungkan ke instans AnalyticDB for PostgreSQL. URL JDBC memiliki format
jdbc:postgresql://<Titik akhir internal>:<Nomor Port>/<Nama database>. Anda dapat masuk ke Konsol AnalyticDB for PostgreSQL untuk melihat URL tersebut di halaman Database Connection instans.tablename
Ya
Nama tabel di database AnalyticDB for PostgreSQL.
username
Ya
Nama pengguna yang digunakan untuk mengakses database AnalyticDB for PostgreSQL.
password
Ya
Kata sandi akun database yang digunakan untuk menghubungkan ke database AnalyticDB for PostgreSQL.
targetSchema
Tidak
Nama skema. Nilai default: public. Jika Anda menggunakan skema lain di database, tentukan parameter ini.
Di pojok kanan atas halaman SQL Editor, klik Validate untuk melakukan pemeriksaan sintaksis.
Di pojok kanan atas halaman SQL Editor, klik Deploy.
Pada halaman , temukan penerapan yang diinginkan dan klik Start pada kolom Actions.
Langkah 4: Lihat data yang ditulis Realtime Compute for Apache Flink ke tabel hasil
- Masuk ke Konsol AnalyticDB for PostgreSQL.
Klik Log On to Database. Untuk informasi selengkapnya tentang cara menghubungkan ke database, lihat Client connection.
Jalankan pernyataan berikut untuk melihat data yang ditulis Realtime Compute for Apache Flink ke tabel hasil.
SELECT * FROM adbpg_sink_table ORDER BY id;Gambar berikut menunjukkan hasilnya.
