全部产品
Search
文档中心

Realtime Compute for Apache Flink:Akses AnalyticDB for PostgreSQL dari Flink

更新时间:Nov 10, 2025

Topik ini menjelaskan cara menggunakan Alibaba Cloud Realtime Compute for Apache Flink untuk membaca data dari dan menulis data ke AnalyticDB for PostgreSQL.

Informasi latar belakang

AnalyticDB for PostgreSQL adalah gudang data berbasis arsitektur pemrosesan paralel masif (MPP) yang menyediakan layanan analisis daring untuk volume data besar. Realtime Compute for Apache Flink adalah platform analitik big data real-time berbasis Apache Flink yang menyediakan berbagai konektor hulu dan hilir untuk mendukung beragam skenario bisnis serta menawarkan layanan komputasi real-time yang efisien dan fleksibel. Realtime Compute for Apache Flink dapat membaca data dari AnalyticDB for PostgreSQL, sehingga memanfaatkan sepenuhnya keunggulan AnalyticDB for PostgreSQL guna meningkatkan efisiensi dan akurasi analitik data.

Batasan

  • Realtime Compute for Apache Flink tidak dapat membaca data dari AnalyticDB for PostgreSQL dalam mode arsitektur tanpa server.

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi 6.0.0 atau lebih baru yang mendukung konektor AnalyticDB for PostgreSQL.

  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR versi 8.0.1 atau lebih baru yang mendukung AnalyticDB for PostgreSQL V7.0.

    Catatan

    Jika Anda menggunakan konektor kustom, ikuti petunjuk dalam Kelola konektor kustom.

Prasyarat

Langkah 1: Konfigurasi daftar putih dan siapkan data

  1. Masuk ke Konsol AnalyticDB for PostgreSQL.
  2. Tambahkan blok CIDR dari ruang kerja Flink yang sepenuhnya dikelola ke daftar putih instans AnalyticDB for PostgreSQL.

    1. Lihat blok CIDR dari vSwitch tempat ruang kerja Flink yang sepenuhnya dikelola berada. Untuk informasi selengkapnya, lihat Bagaimana cara mengonfigurasi daftar putih?.

    2. Tambahkan blok CIDR ruang kerja Flink yang sepenuhnya dikelola ke daftar putih instans AnalyticDB for PostgreSQL tujuan. Untuk informasi selengkapnya, lihat Prosedur.

      Catatan

      Jika Anda mengakses instans AnalyticDB for PostgreSQL melalui Internet, tambahkan Alamat IP publik ke daftar putih.

  3. Di pojok kanan atas halaman detail instans, klik Log On to Database dan masukkan nama pengguna serta kata sandi. Untuk informasi selengkapnya tentang cara mengakses database, lihat Gunakan alat klien untuk terhubung ke instans.

  4. Buat tabel bernama adbpg_dim_table di database tujuan instans tersebut dan masukkan 50 baris data ke dalam tabel tersebut.

    Pernyataan contoh:

    -- Buat tabel dimensi bernama adbpg_dim_table. 
    CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    -- Masukkan 50 baris data ke dalam tabel adbpg_dim_table. Nilai bidang id adalah bilangan bulat dari 1 hingga 50, dan nilai bidang username adalah teks nomor baris saat ini yang diikuti oleh string username. 
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);

    Anda dapat menjalankan pernyataan select * from adbpg_dim_table order by id; untuk melihat data yang dimasukkan.

  5. Buat tabel hasil bernama adbpg_sink_table tempat Realtime Compute for Apache Flink menulis data hasil.

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );

Langkah 2: Buat draf pekerjaan

  1. Masuk ke Konsol Realtime Compute for Apache Flink, temukan ruang kerja Anda, lalu klik Console pada kolom Actions.

  2. Di panel navigasi sebelah kiri, klik Development > ETL. Di pojok kiri atas halaman SQL Editor, klik ikon + dan pilih New Blank Stream Draft.

  3. Pada kotak dialog New Draft, konfigurasikan parameter draf. Tabel berikut menjelaskan parameter-parameter tersebut.

    Parameter

    Deskripsi

    Contoh

    Name

    Nama draf yang ingin Anda buat.

    Catatan

    Nama draf harus unik dalam proyek saat ini.

    adbpg-test

    Location

    Folder tempat file kode draf disimpan.

    Anda juga dapat mengklik ikon 新建文件夹 di sebelah kanan folder yang sudah ada untuk membuat subfolder.

    Draft

    Engine Version

    Versi mesin Flink yang digunakan oleh draf. Untuk informasi selengkapnya tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Engine version.

    vvr-8.0.1-flink-1.17

  4. Klik Create.

Langkah 3: Tulis kode draf dan terapkan draf

  1. Salin kode draf berikut ke editor kode.

    --- Buat tabel sumber Datagen. Pada contoh ini, Anda tidak perlu mengubah parameter dalam klausa WITH. 
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen', 
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='50',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    -- Buat tabel dimensi AnalyticDB for PostgreSQL. Anda perlu mengubah parameter dalam klausa WITH sesuai kebutuhan bisnis Anda. 
    CREATE TEMPORARY TABLE dim_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) not ENFORCED
    ) WITH(
     'connector' = 'adbpg', 
     'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
     'tablename' = 'adbpg_dim_table', 
     'username' = 'flinktest',
     'password' = '${secret_values.adb_password}',
     'maxRetryTimes'='2', -- Tentukan jumlah maksimum percobaan ulang menulis setelah data gagal ditulis ke tabel. 
     'cache'='lru',  -- Tentukan kebijakan cache. 
     'cacheSize'='100'  -- Tentukan ukuran cache.
    );
    
    -- Buat tabel hasil AnalyticDB for PostgreSQL. Anda perlu mengubah parameter dalam klausa WITH sesuai kebutuhan bisnis Anda. 
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',  
      'username' = 'flinktest',
      'password' = '${secret_values.adb_password}',
      'maxRetryTimes' = '2',
      'conflictMode' = 'ignore',-- Tentukan kebijakan yang digunakan ketika terjadi konflik kunci primer atau konflik indeks selama penyisipan data. 
      'retryWaitTime' = '200'  -- Tentukan interval antar percobaan ulang. 
    );
    
    -- Masukkan hasil yang diperoleh setelah tabel dimensi dan tabel sumber digabungkan ke dalam tabel hasil AnalyticDB for PostgreSQL. 
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score
    FROM datagen_source AS ds
    JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    on ds.id = ts.id;
  2. Ubah parameter sesuai kebutuhan bisnis Anda.

    Pada contoh ini, Anda tidak perlu mengubah tabel sumber Datagen. Namun, Anda harus menyesuaikan parameter tabel dimensi AnalyticDB for PostgreSQL dan tabel hasil AnalyticDB for PostgreSQL sesuai kebutuhan bisnis Anda. Tabel berikut menjelaskan parameter-parameter tersebut. Untuk informasi selengkapnya tentang parameter dan pemetaan tipe data konektor terkait, lihat Referensi.

    Parameter

    Wajib

    Deskripsi

    url

    Ya

    URL JDBC yang digunakan untuk menghubungkan ke instans AnalyticDB for PostgreSQL. URL JDBC memiliki format jdbc:postgresql://<Titik akhir internal>:<Nomor Port>/<Nama database>. Anda dapat masuk ke Konsol AnalyticDB for PostgreSQL untuk melihat URL tersebut di halaman Database Connection instans.

    tablename

    Ya

    Nama tabel di database AnalyticDB for PostgreSQL.

    username

    Ya

    Nama pengguna yang digunakan untuk mengakses database AnalyticDB for PostgreSQL.

    password

    Ya

    Kata sandi akun database yang digunakan untuk menghubungkan ke database AnalyticDB for PostgreSQL.

    targetSchema

    Tidak

    Nama skema. Nilai default: public. Jika Anda menggunakan skema lain di database, tentukan parameter ini.

  3. Di pojok kanan atas halaman SQL Editor, klik Validate untuk melakukan pemeriksaan sintaksis.

  4. Di pojok kanan atas halaman SQL Editor, klik Deploy.

  5. Pada halaman O&M > Deployments, temukan penerapan yang diinginkan dan klik Start pada kolom Actions.

Langkah 4: Lihat data yang ditulis Realtime Compute for Apache Flink ke tabel hasil

  1. Masuk ke Konsol AnalyticDB for PostgreSQL.
  2. Klik Log On to Database. Untuk informasi selengkapnya tentang cara menghubungkan ke database, lihat Client connection.

  3. Jalankan pernyataan berikut untuk melihat data yang ditulis Realtime Compute for Apache Flink ke tabel hasil.

    SELECT * FROM adbpg_sink_table ORDER BY id;

    Gambar berikut menunjukkan hasilnya.

    image.png

Referensi