このトピックでは、PolarDB for Oracle 1.0コネクタの使用方法について説明します。
背景情報
PolarDB for PostgreSQL(Oracle互換)は、Alibaba Cloudによって開発された新世代のPolarDBです。 PolarDB for PostgreSQL(Oracle互換)は、コンピューティングとストレージを分離し、統合されたソフトウェアとハードウェアを使用します。 PolarDB for PostgreSQL(Oracle互換)は、自動スケーリング、高パフォーマンス、および大容量ストレージを提供する安全で信頼性の高いデータベースサービスです。 このサービスは、Oracleとの互換性が高いです。
次の表に、PolarDB for Oracle 1.0コネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | シンクテーブル |
実行モード | ストリーミングモードとバッチモード |
データ形式 | 該当なし |
メトリック |
説明 詳細については、「メトリック」をご参照ください。 |
APIタイプ | SQL API |
シンクテーブルでのデータの更新または削除 | サポートされています |
前提条件
PolarDB for Oracle 1.0クラスタが作成され、テーブルが作成されていること。 詳細については、「クラスタの作成」および「テーブルの作成」をご参照ください。
PolarDB for Oracle 1.0のホワイトリストが構成されていること。 詳細については、「クラスタのホワイトリストの構成」をご参照ください。
制限事項
このコネクタは、PolarDB for PostgreSQL (Oracle 互換) 1.0 のみサポートしています。PolarDB for PostgreSQL (Oracle 互換) 2.0 にデータを取り込むには、JDBC コネクタ を使用します。
このコネクタは、Ververica Runtime (VVR) 8.0.5 以降でのみサポートされています。
構文
CREATE TABLE polardbo_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='polardbo',
'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', // データベースのJDBC URL
'tableName'='<yourDatabaseTableName>', // テーブル名
'userName'='<yourDatabaseUserName>', // ユーザー名
'password'='<yourDatabasePassword>' // パスワード
);WITH句のパラメータ
パラメータ | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | テーブルのタイプ。 | STRING | はい | デフォルト値なし | 値を polardbo に設定します。 |
url | データベースのJava Database Connectivity(JDBC) URL。 | STRING | はい | デフォルト値なし | URL は |
tableName | データベース内のテーブルの名前。 | STRING | はい | デフォルト値なし | 該当なし。 |
userName | データベースへのアクセスに使用するユーザー名。 | STRING | はい | デフォルト値なし | 該当なし。 |
password | データベースへのアクセスに使用するパスワード。 | STRING | はい | デフォルト値なし | セキュリティを強化するために、資格情報をプレーンテキストでハードコーディングする代わりに変数を使用します。詳細については、「変数を管理する」をご参照ください。 |
maxRetryTimes | データ書き込み試行が失敗した場合に、テーブルへのデータ書き込みを再試行できる最大回数。 | INTEGER | いいえ | 3 | 該当なし。 |
targetSchema | スキーマの名前。 | STRING | いいえ | public | 該当なし。 |
caseSensitive | 大文字と小文字を区別するかどうかを指定します。 | STRING | いいえ | false | 有効な値:
|
connectionMaxActive | 接続プール内の最大接続数。 | INTEGER | いいえ | 5 | システムは、アイドル状態の接続をデータベースサービスに自動的に解放します。 重要 このパラメータに過度に大きな値を設定すると、サーバー接続数が異常になる可能性があります。 |
retryWaitTime | 再試行間隔。 | INTEGER | いいえ | 100 | 単位:ミリ秒。 |
batchSize | 一度にテーブルに書き込むことができるデータレコードの数。 | INTEGER | いいえ | 500 | 該当なし。 |
flushIntervalMs | キャッシュがクリアされる間隔。 | INTEGER | いいえ | デフォルト値なし | 単位:ミリ秒。 指定された期間内にキャッシュされたデータレコード数が上限に達しない場合、キャッシュされたすべてのデータがシンクテーブルに書き込まれます。 |
writeMode | システムが初めてテーブルにデータを書き込もうとする書き込みモード。 | STRING | いいえ | insert | 有効な値:
|
conflictMode | テーブルにデータを挿入するときに、主キーの競合またはインデックスの競合を処理するためのポリシー。 | STRING | いいえ | strict | 有効な値:
|
データ型のマッピング
次の表に、PolarDB for Oracle 1.0コネクタがシンクテーブルに使用される場合の、FlinkフィールドからPolarDB for Oracle 1.0のフィールドへのデータ型のマッピングを示します。
PolarDB for Oracle 1.0のデータ型 | Flinkのデータ型 |
BOOLEAN | BOOLEAN |
INT | INT |
NUMBER | BIGINT |
NUMBER | DOUBLE |
VARCHAR | VARCHAR |
TIMESTAMP | TIMESTAMP |
VARCHAR | DATE |
サンプルコード
シンクテーブルのサンプルコード
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagenソーステーブル' // datagen source table WITH ( 'connector' = 'datagen' ); CREATE TABLE polardbo_sink ( name VARCHAR, age INT ) WITH ( 'connector'='polardbo', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', // データベースのJDBC URL 'tableName'='<yourDatabaseTableName>', // テーブル名 'userName'='<yourDatabaseUserName>', // ユーザー名 'password'='<yourDatabasePassword>' // パスワード ); INSERT INTO polardbo_sink SELECT * FROM datagen_source;