全部产品
Search
文档中心

Realtime Compute for Apache Flink:PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) 1.0 konektor

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menggunakan PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) 1.0 konektor.

Informasi latar belakang

PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) adalah generasi baru PolarDB yang dikembangkan oleh Alibaba Cloud. PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) memisahkan komputasi dari penyimpanan dan menggunakan perangkat lunak serta perangkat keras terintegrasi. Layanan basis data ini aman, andal, menyediakan penskalaan otomatis, kinerja tinggi, dan penyimpanan massal, serta sangat kompatibel dengan Oracle.

Tabel berikut menggambarkan kemampuan yang didukung oleh PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) 1.0 konektor.

Item

Deskripsi

Jenis tabel

Tabel sink

Mode operasi

Mode streaming dan mode batch

Format data

Tidak tersedia

Metrik

  • Metrik untuk tabel sink

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Catatan

Untuk informasi lebih lanjut, lihat Metrik.

Jenis API

SQL API

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Prasyarat

  • Kluster PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) 1.0 telah dibuat dan sebuah tabel telah dibuat. Untuk informasi lebih lanjut, lihat Buat kluster dan Buat tabel.

  • Daftar putih kluster PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih untuk kluster.

Batasan

  • Konektor ini hanya mendukung PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) 1.0. Untuk menyalurkan data ke PolarDB untuk PostgreSQL (Kompatibel dengan Oracle) 2.0, gunakan JDBC konektor.

  • Konektor ini hanya didukung di Ververica Runtime (VVR) 8.0.5 atau yang lebih baru.

Sintaksis

CREATE TABLE polardbo_table (
 id INT,
 len INT,
 content VARCHAR, 
 PRIMARY KEY(id)
) WITH (
 'connector'='polardbo',
 'url'='jdbc:postgresql://<Alamat>:<IdPort>/<NamaDatabase>',
 'tableName'='<NamaTabelDatabaseAnda>',
 'userName'='<NamaPenggunaDatabaseAnda>',
 'password'='<KataSandiDatabaseAnda>'
);

Opsi konektor dalam klausa WITH

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

connector

Tipe tabel.

STRING

Ya

Tidak ada nilai default

Atur nilainya menjadi polardbo.

url

URL Java Database Connectivity (JDBC) dari database.

STRING

Ya

Tidak ada nilai default

URL dalam format jdbc:postgresql://<Alamat>:<IdPort>/<NamaDatabase>.

tableName

Nama tabel dalam database.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

userName

Nama pengguna yang digunakan untuk mengakses database.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

password

Kata sandi yang digunakan untuk mengakses database.

STRING

Ya

Tidak ada nilai default

Untuk meningkatkan keamanan, gunakan variabel alih-alih menuliskan kredensial Anda secara langsung dalam teks biasa. Untuk informasi lebih lanjut, lihat Kelola variabel.

maxRetryTimes

Jumlah maksimum percobaan ulang yang diizinkan untuk menulis data ke tabel jika upaya penulisan data gagal.

INTEGER

Tidak

3

Tidak tersedia.

targetSchema

Nama skema.

STRING

Tidak

public

Tidak tersedia.

caseSensitive

Menentukan apakah sensitivitas huruf besar/kecil diaktifkan.

STRING

Tidak

false

Nilai valid:

  • true: Sensitivitas huruf besar/kecil diaktifkan.

  • false: Sensitivitas huruf besar/kecil dinonaktifkan.

connectionMaxActive

Jumlah maksimum koneksi dalam kumpulan koneksi.

INTEGER

Tidak

5

Sistem secara otomatis melepaskan koneksi idle ke layanan basis data.

Penting

Jika opsi ini disetel ke nilai yang terlalu besar, jumlah koneksi server mungkin tidak normal.

retryWaitTime

Interval antara percobaan ulang.

INTEGER

Tidak

100

Unit: milidetik.

batchSize

Jumlah catatan data yang dapat ditulis ke tabel sekaligus.

INTEGER

Tidak

500

Tidak tersedia.

flushIntervalMs

Interval pembersihan cache.

INTEGER

Tidak

Tidak ada nilai default

Unit: milidetik. Jika jumlah catatan data yang di-cache tidak mencapai batas atas dalam periode waktu tertentu, semua data yang di-cache ditulis ke tabel sink.

writeMode

Mode tulis di mana sistem mencoba menulis data ke tabel untuk pertama kalinya.

STRING

Tidak

insert

Nilai valid:

  • insert: Data langsung dimasukkan ke tabel sink. Jika terjadi konflik, kebijakan penanganan ditentukan oleh parameter conflictMode. Ini adalah nilai default.

  • upsert: Data dalam tabel diperbarui secara otomatis saat terjadi konflik. Nilai ini hanya cocok untuk tabel yang memiliki kunci utama.

conflictMode

Kebijakan berdasarkan mana konflik kunci utama atau indeks ditangani saat data dimasukkan ke tabel.

STRING

Tidak

strict

Nilai valid:

  • strict: Jika terjadi konflik, sistem melaporkan kesalahan. Ini adalah nilai default.

  • ignore: Jika terjadi konflik, sistem mengabaikan konflik.

  • update: Jika terjadi konflik, sistem secara otomatis memperbarui data dalam tabel. Nilai ini cocok untuk tabel yang tidak memiliki kunci utama. Kebijakan ini mengurangi efisiensi pemrosesan data.

Pemetaan tipe data

Tabel berikut memberikan pemetaan tipe data dari bidang Flink ke bidang PolarDB untuk Oracle 1.0 saat konektor PolarDB untuk Oracle 1.0 digunakan untuk tabel sink.

Tipe data PolarDB untuk Oracle 1.0

Tipe data Flink

BOOLEAN

BOOLEAN

INT

INT

NUMBER

BIGINT

NUMBER

DOUBLE

VARCHAR

VARCHAR

TIMESTAMP

TIMESTAMP

VARCHAR

DATE

Contoh kode

  • Contoh kode untuk tabel sink

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE polardbo_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='polardbo',
     'url'='jdbc:postgresql://<Alamat>:<IdPort>/<NamaDatabase>',
     'tableName'='<NamaTabelDatabaseAnda>',
     'userName'='<NamaPenggunaDatabaseAnda>',
     'password'='<KataSandiDatabaseAnda>'
    );
    
    INSERT INTO polardbo_sink
    SELECT * FROM datagen_source;