All Products
Search
Document Center

AnalyticDB:Gunakan Realtime Compute for Apache Flink untuk membaca dan menulis data penuh secara real-time

Last Updated:Jun 26, 2025

Topik ini menjelaskan cara menggunakan Alibaba Cloud Realtime Compute for Apache Flink untuk berlangganan data penuh AnalyticDB for PostgreSQL.

Informasi latar belakang

AnalyticDB for PostgreSQL adalah layanan gudang data pemrosesan paralel masif (MPP) yang menyediakan analisis real-time untuk sejumlah besar data. Realtime Compute for Apache Flink adalah platform analitik data besar real-time yang dibangun berdasarkan Apache Flink. Platform ini menyediakan berbagai konektor hulu dan hilir untuk memenuhi persyaratan skenario bisnis yang berbeda serta menawarkan layanan komputasi real-time yang efisien dan fleksibel. Realtime Compute for Apache Flink dapat membaca data dari AnalyticDB for PostgreSQL, memanfaatkan sepenuhnya kemampuan AnalyticDB for PostgreSQL dan meningkatkan efisiensi serta akurasi analitik data.

Batasan

  • Realtime Compute for Apache Flink tidak mendukung pembacaan data dari AnalyticDB for PostgreSQL dalam mode Serverless.

  • Hanya Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) versi 6.0.0 atau lebih baru yang mendukung konektor AnalyticDB for PostgreSQL.

  • Hanya Realtime Compute for Apache Flink dengan VVR versi 8.0.1 atau lebih baru yang mendukung AnalyticDB for PostgreSQL versi 7.0.

    Catatan

    Jika Anda menggunakan konektor kustom, lakukan operasi sesuai dengan instruksi yang dijelaskan dalam Kelola Konektor Kustom.

Prasyarat

  • Sebuah ruang kerja Flink yang sepenuhnya dikelola telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.

  • Sebuah instance AnalyticDB for PostgreSQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat Sebuah Instance.

  • Instance AnalyticDB for PostgreSQL dan ruang kerja Flink yang sepenuhnya dikelola berada dalam virtual private cloud (VPC) yang sama.

Langkah 1: Konfigurasikan instance AnalyticDB for PostgreSQL

  1. Masuk ke konsol AnalyticDB for PostgreSQL.
  2. Tambahkan blok CIDR ruang kerja Flink yang sepenuhnya dikelola ke daftar putih alamat IP instance AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP.

  3. Klik Log On to Database. Untuk informasi lebih lanjut tentang cara terhubung ke database, lihat Koneksi Klien.

  4. Buat tabel dimensi bernama adbpg_dim_table pada instance AnalyticDB for PostgreSQL dan sisipkan 50 baris data ke dalam tabel.

    Pernyataan contoh:

    -- Buat tabel dimensi bernama adbpg_dim_table. 
    CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    -- Sisipkan 50 baris data ke dalam tabel adbpg_dim_table. Nilai bidang id adalah bilangan bulat dari 1 hingga 50, dan nilai bidang username adalah teks untuk jumlah baris saat ini yang diikuti oleh string username. 
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  5. Buat tabel hasil bernama adbpg_sink_table tempat Realtime Compute for Apache Flink menulis data hasil.

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );

Langkah 2: Buat draft Realtime Compute for Apache Flink

  1. Masuk ke konsol Realtime Compute for Apache Flink. Pada tab Fully Managed Flink, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.

  2. Di panel navigasi sisi kiri, klik SQL Editor. Di sudut kiri atas halaman SQL Editor, klik New. Dalam kotak dialog New Draft, klik Blank Stream Draft pada tab SQL Scripts dan klik Next.

  3. Dalam kotak dialog New Draft, konfigurasikan parameter yang dijelaskan dalam tabel berikut.

    Parameter

    Deskripsi

    Contoh

    Name

    Nama draft yang ingin Anda buat.

    Catatan

    Nama draft harus unik dalam proyek saat ini.

    adbpg-test

    Location

    Folder tempat file kode draft disimpan.

    Anda juga dapat mengklik ikon 新建文件夹 di sebelah kanan folder yang ada untuk membuat subfolder.

    Draft

    Engine Version

    Versi engine Flink yang digunakan oleh draft. Untuk informasi lebih lanjut tentang versi engine, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi Engine.

    vvr-6.0.7-flink-1.15

  4. Klik Create.

Langkah 3: Tulis kode draft dan terapkan draft

  1. Salin kode draft berikut ke editor kode:

    --- Buat tabel sumber Datagen. 
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='50',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    -- Buat tabel dimensi AnalyticDB for PostgreSQL. 
    CREATE TEMPORARY TABLE dim_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) not ENFORCED
    ) WITH(
     'connector' = 'adbpg', 
     'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
     'tablename' = 'adbpg_dim_table', 
     'username' = 'flink****test',
     'password' = '*******',
     'maxJoinRows'='100',
     'maxRetryTimes'='1', 
     'cache'='lru',
     'cacheSize'='1000'
    );
    
    -- Buat tabel hasil AnalyticDB for PostgreSQL. 
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',  
      'username' = 'flink****test',
      'password' = '******',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- Masukkan hasil gabungan tabel dimensi dan tabel sumber ke dalam tabel hasil AnalyticDB for PostgreSQL. 
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score
    FROM datagen_source AS ds
     join
    dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    on ds.id = ts.id;
  2. Modifikasi parameter yang dijelaskan dalam tabel berikut berdasarkan kebutuhan bisnis Anda.

    Parameter

    Diperlukan

    Deskripsi

    URL

    Ya

    URL Java Database Connectivity (JDBC) yang digunakan untuk terhubung ke instance AnalyticDB for PostgreSQL. URL JDBC dalam format jdbc:postgresql://<Titik akhir internal>:<Nomor port>/<Nama database>. Contoh: jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres.

    tablename

    Ya

    Nama tabel dalam database AnalyticDB for PostgreSQL.

    username

    Ya

    Nama akun database yang digunakan untuk terhubung ke database AnalyticDB for PostgreSQL.

    password

    Ya

    Kata sandi akun database AnalyticDB for PostgreSQL.

    Catatan

    Untuk informasi lebih lanjut tentang parameter dan pemetaan tipe data, lihat Konektor AnalyticDB for PostgreSQL.

  3. Di sudut kanan atas halaman SQL Editor, klik Validate untuk melakukan pemeriksaan sintaksis.

  4. Di sudut kanan atas halaman SQL Editor, klik Deploy.

  5. Di halaman Deployments, temukan penyebaran yang diinginkan dan klik Start di kolom Actions.

Langkah 4: Kueri data yang ditulis Realtime Compute for Apache Flink ke tabel hasil

  1. Masuk ke konsol AnalyticDB for PostgreSQL.
  2. Klik Log On to Database. Untuk informasi lebih lanjut tentang cara terhubung ke database, lihat Koneksi Klien.

  3. Eksekusi pernyataan berikut untuk mengkueri data yang ditulis Realtime Compute for Apache Flink ke tabel hasil:

    SELECT * FROM adbpg_sink_table;

    image.png

Referensi