すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:PolarDB for Oracle 1.0コネクタ

最終更新日:Jun 07, 2025

このトピックでは、PolarDB for Oracle 1.0コネクタの使用方法について説明します。

背景情報

PolarDB for PostgreSQL(Oracle互換)は、Alibaba Cloudによって開発された新世代のPolarDBです。 PolarDB for PostgreSQL(Oracle互換)は、コンピューティングとストレージを分離し、統合されたソフトウェアとハードウェアを使用します。 PolarDB for PostgreSQL(Oracle互換)は、自動スケーリング、高パフォーマンス、および大容量ストレージを提供する安全で信頼性の高いデータベースサービスです。 このサービスは、Oracleとの互換性が高いです。

次の表に、PolarDB for Oracle 1.0コネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

シンクテーブル

実行モード

ストリーミングモードとバッチモード

データ形式

該当なし

メトリック

  • シンクテーブルのメトリック

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

説明

詳細については、「メトリック」をご参照ください。

APIタイプ

SQL API

シンクテーブルでのデータの更新または削除

サポートされています

前提条件

制限事項

  • このコネクタは、PolarDB for PostgreSQL (Oracle 互換) 1.0 のみサポートしています。PolarDB for PostgreSQL (Oracle 互換) 2.0 にデータを取り込むには、JDBC コネクタ を使用します。

  • このコネクタは、Ververica Runtime (VVR) 8.0.5 以降でのみサポートされています。

構文

CREATE TABLE polardbo_table (
 id INT,
 len INT,
 content VARCHAR, 
 PRIMARY KEY(id)
) WITH (
 'connector'='polardbo',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',  // データベースのJDBC URL
 'tableName'='<yourDatabaseTableName>', // テーブル名
 'userName'='<yourDatabaseUserName>', // ユーザー名
 'password'='<yourDatabasePassword>' // パスワード
);

WITH句のパラメータ

パラメータ

説明

データ型

必須

デフォルト値

備考

connector

テーブルのタイプ。

STRING

はい

デフォルト値なし

値を polardbo に設定します。

url

データベースのJava Database Connectivity(JDBC) URL。

STRING

はい

デフォルト値なし

URL は jdbc:postgresql://<Address>:<PortId>/<DatabaseName> 形式です。

tableName

データベース内のテーブルの名前。

STRING

はい

デフォルト値なし

該当なし。

userName

データベースへのアクセスに使用するユーザー名。

STRING

はい

デフォルト値なし

該当なし。

password

データベースへのアクセスに使用するパスワード。

STRING

はい

デフォルト値なし

セキュリティを強化するために、資格情報をプレーンテキストでハードコーディングする代わりに変数を使用します。詳細については、「変数を管理する」をご参照ください。

maxRetryTimes

データ書き込み試行が失敗した場合に、テーブルへのデータ書き込みを再試行できる最大回数。

INTEGER

いいえ

3

該当なし。

targetSchema

スキーマの名前。

STRING

いいえ

public

該当なし。

caseSensitive

大文字と小文字を区別するかどうかを指定します。

STRING

いいえ

false

有効な値:

  • true:大文字と小文字が区別されます。

  • false: 大文字と小文字が区別されません。

connectionMaxActive

接続プール内の最大接続数。

INTEGER

いいえ

5

システムは、アイドル状態の接続をデータベースサービスに自動的に解放します。

重要

このパラメータに過度に大きな値を設定すると、サーバー接続数が異常になる可能性があります。

retryWaitTime

再試行間隔。

INTEGER

いいえ

100

単位:ミリ秒。

batchSize

一度にテーブルに書き込むことができるデータレコードの数。

INTEGER

いいえ

500

該当なし。

flushIntervalMs

キャッシュがクリアされる間隔。

INTEGER

いいえ

デフォルト値なし

単位:ミリ秒。 指定された期間内にキャッシュされたデータレコード数が上限に達しない場合、キャッシュされたすべてのデータがシンクテーブルに書き込まれます。

writeMode

システムが初めてテーブルにデータを書き込もうとする書き込みモード。

STRING

いいえ

insert

有効な値:

  • insert:データはシンクテーブルに直接挿入されます。 競合が発生した場合、処理ポリシーは conflictMode パラメータによって決定されます。 これはデフォルト値です。

  • upsert:競合が発生した場合、テーブル内のデータは自動的に更新されます。 この値は、主キーを持つテーブルにのみ適しています。

conflictMode

テーブルにデータを挿入するときに、主キーの競合またはインデックスの競合を処理するためのポリシー。

STRING

いいえ

strict

有効な値:

  • strict:競合が発生した場合、システムはエラーを報告します。 これはデフォルト値です。

  • ignore:競合が発生した場合、システムは競合を無視します。

  • update:競合が発生した場合、システムはテーブル内のデータを自動的に更新します。 この値は、主キーを持たないテーブルに適しています。 このポリシーは、データ処理効率を低下させます。

データ型のマッピング

次の表に、PolarDB for Oracle 1.0コネクタがシンクテーブルに使用される場合の、FlinkフィールドからPolarDB for Oracle 1.0のフィールドへのデータ型のマッピングを示します。

PolarDB for Oracle 1.0のデータ型

Flinkのデータ型

BOOLEAN

BOOLEAN

INT

INT

NUMBER

BIGINT

NUMBER

DOUBLE

VARCHAR

VARCHAR

TIMESTAMP

TIMESTAMP

VARCHAR

DATE

サンプルコード

  • シンクテーブルのサンプルコード

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagenソーステーブル' // datagen source table
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE polardbo_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='polardbo',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', // データベースのJDBC URL
     'tableName'='<yourDatabaseTableName>', // テーブル名
     'userName'='<yourDatabaseUserName>', // ユーザー名
     'password'='<yourDatabasePassword>' // パスワード
    );
    
    INSERT INTO polardbo_sink
    SELECT * FROM datagen_source;