すべてのプロダクト
Search
ドキュメントセンター

DataWorks:AnalyticDB for PostgreSQL データソース

最終更新日:Jan 11, 2025

DataWorks は、AnalyticDB for PostgreSQL データソースからデータを読み取り、AnalyticDB for PostgreSQL データソースにデータを書き込むための AnalyticDB for PostgreSQL Reader と AnalyticDB for PostgreSQL Writer を提供しています。このトピックでは、AnalyticDB for PostgreSQL データソースとのデータ同期機能について説明します。

制限事項

バッチ同期中にビューのデータを読み取ることができます。

サポートされている AnalyticDB for PostgreSQL のバージョン

V7.0 以前の AnalyticDB for PostgreSQL がサポートされています。

データ型マッピング

バッチデータ読み取り

AnalyticDB for PostgreSQL Reader は、ほとんどの AnalyticDB for PostgreSQL データ型をサポートしています。データベースのデータ型がサポートされていることを確認してください。

次の表に、AnalyticDB for PostgreSQL Reader がデータ型を変換する際に基づくデータ型マッピングを示します。

カテゴリ

AnalyticDB for PostgreSQL データ型

整数

BIGINT、BIGSERIAL、INTEGER、SMALLINT、SERIAL、および GEOMETRY

浮動小数点

DOUBLE、PRECISION、MONEY、NUMERIC、および REAL

文字列

VARCHAR、CHAR、TEXT、BIT、および INET

日付と時刻

DATE、TIME、および TIMESTAMP

ブール値

BOOL

バイナリ

BYTEA

バッチデータ書き込み

AnalyticDB for PostgreSQL Writer は、ほとんどの AnalyticDB for PostgreSQL データ型をサポートしています。データベースのデータ型がサポートされていることを確認してください。

次の表に、AnalyticDB for PostgreSQL Writer がデータ型を変換する際に基づくデータ型マッピングを示します。

カテゴリ

AnalyticDB for PostgreSQL データ型

LONG

BIGINT、BIGSERIAL、INTEGER、SMALLINT、および SERIAL

DOUBLE

DOUBLE、PRECISION、MONEY、NUMERIC、および REAL

STRING

VARCHAR、CHAR、TEXT、BIT、INET、および GEOMETRY

DATE

DATE、TIME、および TIMESTAMP

BOOLEAN

BOOL

BYTES

BYTEA

説明

AnalyticDB for PostgreSQL Writer が MONEY、INET、または BIT データ型にデータを変換する場合は、a_inet::varchar などの構文が必要です。

データ同期タスクの開発

データ同期タスクの設定のエントリポイントと手順については、以下のセクションを参照してください。パラメータ設定については、タスクの設定タブにある各パラメータの情報ヒントを参照してください。

データソースの追加

特定のデータソースとのデータ同期タスクを設定する前に、DataWorks にデータソースを追加する必要があります。詳細については、「データソースの追加と管理」をご参照ください。

単一テーブルのデータを同期するためのバッチ同期タスクの設定

データベース内のすべてのデータのバッチ同期を実装するための同期設定

設定手順の詳細については、「Data Integration での同期タスクの設定」をご参照ください。

付録:コードとパラメータ

付録:コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合は、コードエディタのフォーマット要件に基づいて、関連データソースのリーダーとライターのパラメータを設定する必要があります。フォーマット要件の詳細については、「コードエディタを使用したバッチ同期タスクの設定」をご参照ください。以下では、コードエディタにおけるリーダーとライターのパラメータの設定の詳細について説明します。

AnalyticDB for PostgreSQL Reader のコード

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_004",// データソースの名前。
                "column": [// 列の名前。
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "where": "id=1001",// WHERE 句。
                "splitPk": "id",// シャードキー。
                "table": "public.person"// テーブルの名前。
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// バージョン番号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// 許容されるダーティデータレコードの最大数。
            "record": ""
        },
        "speed": {
            "concurrent": 6,// 並列スレッドの最大数。
            "throttle": true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
           "mbps":"12"// 最大転送速度。単位:MB/秒。
        }
    }
}

AnalyticDB for PostgreSQL Reader のコードのパラメータ

パラメータ

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを読み取るテーブルの名前。

はい

デフォルト値なし

column

データを読み取る列の名前。JSON 配列で名前を指定します。デフォルト値は [*] で、ソーステーブルのすべての列を示します。

  • 読み取る特定の列を選択できます。

  • 列の順序を変更できます。これは、ソーステーブルのスキーマで指定された順序とは異なる順序で列を指定できることを示します。

  • 定数がサポートされています。列名は、["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"] など、AnalyticDB for PostgreSQL でサポートされている SQL 構文に準拠して配置する必要があります。

    • id: 列名。

    • table: 予約語を含む列の名前。

    • 1: 整数定数。

    • 'mingya.wmy': 文字列定数。単一引用符 (') で囲みます。

    • 'null': 文字列 null。

    • to_char(a+1): 文字列の長さを計算するために使用される関数式。

    • 2.3: 浮動小数点定数。

    • true: ブール値。

  • column パラメータは、データを読み取るすべての列を明示的に指定する必要があります。このパラメータを空にすることはできません。

はい

デフォルト値なし

splitPk

AnalyticDB for PostgreSQL Reader がデータを読み取るときにデータシャーディングに使用されるフィールド。このパラメータを指定すると、ソーステーブルはこのパラメータの値に基づいてシャーディングされます。Data Integration は、並列スレッドを実行してデータを読み取ります。これにより、データ同期の効率が向上します。

  • splitPk パラメータをテーブルのプライマリキー列の名前に設定することをお勧めします。プライマリキー列に基づいてデータを異なるシャードに均等に分散できます。特定のシャードにのみ集中的に分散されることはありません。

  • splitPk パラメータは、整数データ型のデータのみのシャーディングをサポートしています。 splitPk パラメータを、文字列、浮動小数点、日付データ型などのサポートされていないデータ型のフィールドに設定すると、このパラメータの設定は無視され、単一スレッドを使用してデータが読み取られます。

  • splitPk パラメータが指定されていないか空の場合、単一スレッドを使用してデータが読み取られます。

いいえ

デフォルト値なし

where

WHERE 句。AnalyticDB for PostgreSQL Reader は、tablecolumn、および where パラメータの設定に基づいて SQL ステートメントを生成し、生成されたステートメントを使用してデータを読み取ります。たとえば、テストを実行する場合、where パラメータを id>2 and sex=1 に設定して、現在の日付に生成されたデータを読み取ることができます。

  • WHERE 句を使用して増分データを読み取ることができます。

  • where パラメータが指定されていないか空の場合、AnalyticDB for PostgreSQL Reader はすべてのデータを読み取ります。

いいえ

デフォルト値なし

querySql (高度なパラメータ。コードエディタでのみ使用可能)

絞り込みデータフィルタリングに使用される SQL ステートメント。このパラメータを指定すると、AnalyticDB for PostgreSQL Reader はこのパラメータの値のみに基づいてデータをフィルタリングします。たとえば、複数のテーブルを結合してデータ同期を行う場合は、このパラメータを select a,b from table_a join table_b on table_a.id = table_b.id に設定します。

このパラメータを指定すると、AnalyticDB for PostgreSQL Reader は columntable、および where パラメータの設定を無視します。

いいえ

デフォルト値なし

fetchSize

一度に読み取るデータレコードの数。このパラメータは、Data Integration とソースデータベース間のインタラクションの数を決定し、読み取り効率に影響します。

説明

このパラメータを 2048 より大きい値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

512

AnalyticDB for PostgreSQL Writer のコード

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],// 同期タスクの実行後に実行する SQL ステートメント。
                "datasource": "test_004",// データソースの名前。
                "column": [// 列の名前。
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "table": "public.person",// テーブルの名前。
                "preSql": []// 同期タスクの実行前に実行する SQL ステートメント。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// バージョン番号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// 許容されるダーティデータレコードの最大数。
            "record": ""
        },
        "speed": {
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
            "concurrent":6, // 並列スレッドの最大数。
            "mbps":"12"// 最大転送速度。
        }
    }
}

AnalyticDB for PostgreSQL Writer のコードのパラメータ

パラメータ

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを書き込むテーブルの名前。

はい

デフォルト値なし

writeMode

書き込みモード。有効な値:insert、copy、および upsert。

  • insert: AnalyticDB for PostgreSQL Writer は、INSERT INTO...VALUES... ステートメントを実行して、AnalyticDB for PostgreSQL データベースにデータを書き込みます。ほとんどの場合、このモードを選択することをお勧めします。

  • copy: AnalyticDB for PostgreSQL は、テーブルと標準入力または標準出力ファイル間でデータをコピーするための copy コマンドを提供します。Data Integration は、ファイルからテーブルにデータをコピーできる COPY FROM ステートメントをサポートしています。パフォーマンスの問題が発生した場合は、このモードを使用することをお勧めします。

  • upsert: データの書き込み時に競合が発生した場合、conflictMode パラメータの設定に基づいて、新規データと古いデータが処理されます。

いいえ

insert

conflictMode

writeModeupsert に設定されていて、データの書き込み時にプライマリキーの競合または一意インデックスの競合が発生した場合、次のポリシーを使用して競合を処理できます。

  • replace: データの書き込み時に競合が発生した場合、同期される新しいデータは古いデータを上書きします。

  • ignore: データの書き込み時に競合が発生した場合、新しいデータは無視され、古いデータが保持されます。

説明

このパラメータは、コードエディタを使用して同期タスクを設定する場合にのみ設定できます。

いいえ

replace

column

データを書き込む列の名前。"column":["id","name","age"] のように、名前をコンマ (,) で区切ります。宛先テーブルのすべての列にデータを書き込む場合は、このパラメータをアスタリスク (*) に設定します (例: "column":["*"])。

はい

デフォルト値なし

preSql

同期タスクの実行前に実行する SQL ステートメント。たとえば、このパラメータを古いデータを削除するために使用される SQL ステートメントに設定できます。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディタでは複数の SQL ステートメントを実行できます。

いいえ

デフォルト値なし

postSql

同期タスクの実行後に実行する SQL ステートメント。たとえば、このパラメータをタイムスタンプを追加するために使用される SQL ステートメントに設定できます。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディタでは複数の SQL ステートメントを実行できます。

いいえ

デフォルト値なし

batchSize

一度に書き込むデータレコードの数。ビジネス要件に基づいて、このパラメータを適切な値に設定します。これにより、Data Integration と AnalyticDB for PostgreSQL 間のインタラクションが大幅に削減され、スループットが向上します。このパラメータを過度に大きい値に設定すると、データ同期中に OOM エラーが発生する可能性があります。

いいえ

1,024