すべてのプロダクト
Search
ドキュメントセンター

DataWorks:AnalyticDB for PostgreSQL

最終更新日:Mar 01, 2026

AnalyticDB for PostgreSQL データソースを使用すると、AnalyticDB for PostgreSQL からのデータの読み取りと書き込みが可能になります。このトピックでは、DataWorks における AnalyticDB for PostgreSQL のデータ同期機能について説明します。

制限事項

オフライン同期タスクは、ビューからのデータ読み取りをサポートしています。

サポート対象バージョン

バージョン 7.0 までがサポート対象です。

サポートされるフィールドタイプ

オフライン読み取り

AnalyticDB for PostgreSQL Reader は、AnalyticDB for PostgreSQL のほとんどのデータ型をサポートしていますが、すべてではありません。続行する前に、ご利用のデータ型がサポートされていることを確認してください。

次の表に、AnalyticDB for PostgreSQL Reader のデータ型マッピングを示します。

カテゴリ

AnalyticDB for PostgreSQL データ型

整数型

BIGINT、BIGSERIAL、INTEGER、SMALLINT、SERIAL、GEOMETRY

浮動小数点型

DOUBLE、PRECISION、MONEY、NUMERIC、REAL

文字列型

VARCHAR、CHAR、TEXT、BIT、INET

日時型

DATE、TIME、TIMESTAMP

ブール型

BOOL

バイナリ型

BYTEA

オフライン書き込み

AnalyticDB for PostgreSQL Writer は、AnalyticDB for PostgreSQL のほとんどのデータ型をサポートしていますが、すべてではありません。続行する前に、ご利用のデータ型がサポートされていることを確認してください。

次の表に、AnalyticDB for PostgreSQL Writer のデータ型マッピングを示します。

カテゴリ

AnalyticDB for PostgreSQL データ型

LONG

BIGINT、BIGSERIAL、INTEGER、SMALLINT、SERIAL

DOUBLE

DOUBLE、PRECISION、MONEY、NUMERIC、REAL

STRING

VARCHAR、CHAR、TEXT、BIT、INET、GEOMETRY

DATE

DATE、TIME、TIMESTAMP

BOOLEAN

BOOL

BYTES

BYTEA

説明

MONEY、INET、BIT データ型を変換するには、a_inet::varchar のような構文を使用します。

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を確認できます

データ同期タスクの開発

同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。

単一テーブルのオフライン同期タスクの設定ガイド

データベース全体のオフライン読み取り同期タスクの設定ガイド

詳細については、「データベース全体のリアルタイム同期タスクの設定」をご参照ください。

付録:スクリプトデモとパラメーターの説明

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスク設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader スクリプトデモ

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_004",// データソース名。
                "column": [// ソーステーブルの列。
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "where": "id=1001",// フィルター条件。
                "splitPk": "id",// 分割キー。
                "table": "public.person"// ソーステーブル名。
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// バージョン番号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// エラーレコード数。
            "record": ""
        },
        "speed": {
            "concurrent": 6,// 同時実行スレッド数。
            "throttle": true,// throttle を false に設定すると、mbps パラメーターは有効にならず、レート制限は適用されません。throttle を true に設定すると、レート制限が適用されます。
           "mbps":"12"// レート制限。1 mbps は 1 MB/s に相当します。
        }
    }
}

Reader スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソース名。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。

はい

なし

table

データを同期する元のテーブルの名前。

はい

なし

column

同期する列。JSON 配列を使用して列を指定します。デフォルトでは、すべての列が同期されます。例:[*]

  • 列のプルーニングがサポートされています。列のサブセットをエクスポートできます。

  • 列の並べ替えがサポートされています。テーブルスキーマとは異なる順序で列をエクスポートできます。

  • 定数がサポートされています。SQL 構文に従う必要があります。例:["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"]

    • id は通常の列名です。

    • table は予約語である列名です。

    • 1 は整数定数です。

    • 'mingya.wmy' は文字列定数です。文字列定数は一重引用符で囲む必要があることにご注意ください。

    • 'null' は文字列定数です。

    • to_char(a+1) は関数です。

    • 2.3 は浮動小数点数です。

    • true はブール値です。

  • column パラメーターは必須です。同期する列を指定する必要があります。

はい

なし

splitPk

AnalyticDB for PostgreSQL Reader がデータを抽出する際に、splitPk パラメーターを指定すると、指定された分割キーに基づいてデータがパーティション分割されます。Data Integration は同時実行タスクを開始してデータを同期するため、同期パフォーマンスが向上します。

  • テーブルのプライマリキーを分割キーとして使用します。プライマリキーは通常、均等に分散されており、シャードでのデータホットスポットを防ぐことができます。

  • splitPk パラメーターは、整数ベースのデータパーティション分割のみをサポートします。文字列、浮動小数点数、または日付に基づくパーティション分割はサポートしていません。サポートされていないデータ型を指定した場合、分割キーは無視され、データは単一チャネルで同期されます。

  • splitPk パラメーターを指定しない場合、またはパラメーターが空の場合、データは単一チャネルで同期されます。

いいえ

なし

where

フィルター条件。AnalyticDB for PostgreSQL Reader は、指定された columntable、および where パラメーターに基づいて SQL 文を構築し、データを抽出します。たとえば、テスト用に当日に生成されたデータを同期するには、where パラメーターを id>2 and sex=1 のように設定できます。

  • 効率的な増分データ同期には、where 句を使用します。

  • このパラメーターを指定しないか、空のままにすると、テーブル内のすべてのデータが同期されます。

いいえ

なし

querySql (このパラメーターはコードエディタでのみ使用可能です。)

一部のビジネスシナリオでは、where 設定項目だけではフィルタリング条件を定義するのに不十分な場合があります。この設定項目を使用して、代わりにカスタム SQL 文を指定できます。このカスタム文を指定すると、データ同期システムは columntable などの他の設定項目を無視し、直接この文を使用してデータをフィルタリングします。たとえば、複数テーブルの結合後にデータを同期するには、select a,b from table_a join table_b on table_a.id = table_b.id のような文を使用します。

querySql を設定すると、AnalyticDB for PostgreSQL Reader は columntable、および where 条件の設定を直接無視します。

いいえ

なし

fetchSize

このパラメーターは、データベースサーバーから各バッチでフェッチするレコード数を指定します。値を大きくすると、Data Integration とサーバー間のネットワークインタラクションの数が減り、データ抽出パフォーマンスが向上する可能性があります。

説明

fetchSize の値が大きすぎる (2048 を超える) 場合、Data Integration プロセスでメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

512

Writer スクリプトデモ

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],// インポート後に実行する SQL 文。
                "datasource": "test_004",// データソース名。
                "column": [// ターゲットテーブルの列。
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "table": "public.person",// ターゲットテーブル名。
                "preSql": []// インポート前に実行する SQL 文。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// バージョン番号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// エラーレコード数。
            "record": ""
        },
        "speed": {
            "throttle":true,// throttle を false に設定すると、mbps パラメーターは有効にならず、レート制限は適用されません。throttle を true に設定すると、レート制限が適用されます。
            "concurrent":6, // 同時実行ジョブ数。
            "mbps":"12"// レート制限。
        }
    }
}

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソース名。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。

はい

なし

table

データを同期する先のテーブルの名前。

はい

なし

writeMode

インポートモード。有効な値は insert、copy、および upsert です。

  • insertinsert into...values... 文を実行して PostgreSQL にデータを書き込みます。このモードを推奨します。

  • copy:PostgreSQL は、テーブルとファイル (標準出力または標準入力) の間でデータをコピーするための copy コマンドを提供します。Data Integration は copy from コマンドを使用してテーブルにデータをロードします。パフォーマンスの問題が発生した場合は、このモードを使用してください。

  • upsert:書き込み操作中にデータ競合が発生した場合、conflictMode パラメーターの値に基づいて競合が処理されます。

いいえ

insert

conflictMode

writeModeupsert に設定し、PostgreSQL へのデータ書き込み時にプライマリキーまたは一意のインデックスの競合が発生した場合、次のいずれかの競合解決ポリシーを選択できます。

  • replace:競合が発生した場合、新しいデータが既存のデータを上書きします。

  • ignore:競合が発生した場合、新しいデータは無視され、既存のデータが保持されます。

説明

競合解決ポリシーは、コードエディタでのみ設定できます。

いいえ

replace

column

データを書き込むターゲットテーブルの列。列はカンマ (,) で区切ります。例:"column":["id","name","age"]。すべての列に順番にデータを書き込むには、アスタリスク (*) を使用します。例:`"column":["*"]`。

はい

なし

preSql

データ同期タスクが開始される前に実行する SQL 文。コードレス UI では、1 つの SQL 文しか実行できません。コードエディタでは、既存のデータをクリアする文など、複数の SQL 文を実行できます。

いいえ

なし

postSql

データ同期タスクが完了した後に実行する SQL 文。コードレス UI では、1 つの SQL 文しか実行できません。コードエディタでは、タイムスタンプを追加する文など、複数の SQL 文を実行できます。

いいえ

なし

batchSize

1 回のバッチで書き込むレコード数。値を大きくすると、Data Integration と AnalyticDB for PostgreSQL 間のネットワークインタラクションの数が大幅に減り、スループットが向上する可能性があります。ただし、この値が大きすぎると、Data Integration プロセスでメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

1,024