すべてのプロダクト
Search
ドキュメントセンター

DataWorks:PolarDB データソース

最終更新日:Jul 09, 2025

DataWorks は、PolarDB データソースとのデータの読み取りと書き込みを行うための PolarDB Reader と PolarDB Writer を提供しています。 コーディングレス ユーザーインターフェース (UI) またはコードエディタを使用して、PolarDB データソースの同期タスクを設定できます。

制限事項

バッチデータの読み取りと書き込み

ビューのデータを読み取ることができます。

リアルタイムデータの読み取り

同期タスクのソースが PolarDB for MySQL クラスタの場合、クラスタのバイナリロギング機能を有効にする必要があります。 PolarDB for MySQL は MySQL と完全に互換性があり、高レベルの物理ログを使用してバイナリログを置き換えます。 PolarDB と MySQL エコシステム間の統合を容易にするために、PolarDB クラスタのバイナリロギング機能を有効にすることができます。

データ型マッピング

バッチデータの読み取り

次の表に、PolarDB Reader がデータ型を変換する際のデータ型マッピングを示します。

カテゴリ

PolarDB データ型

整数

INT、TINYINT、SMALLINT、MEDIUMINT、および BIGINT

浮動小数点

FLOAT、DOUBLE、および DECIMAL

文字列

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT、および LONGTEXT

日付と時刻

DATE、DATETIME、TIMESTAMP、TIME、および YEAR

ブール値

BIT および BOOL

バイナリ

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB、および VARBINARY

説明
  • 上記の表に記載されていないデータ型はサポートされていません。

  • PolarDB Reader は TINYINT (1) を整数データ型として処理します。

バッチデータの書き込み

PolarDB Reader と同様に、PolarDB Writer はほとんどの PolarDB データ型をサポートしています。 データベースのデータ型がサポートされていることを確認してください。

次の表に、PolarDB Writer がデータ型を変換する際のデータ型マッピングを示します。

カテゴリ

PolarDB データ型

整数

INT、TINYINT、SMALLINT、MEDIUMINT、BIGINT、および YEAR

浮動小数点

FLOAT、DOUBLE、および DECIMAL

文字列

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT、および LONGTEXT

日付と時刻

DATE、DATETIME、TIMESTAMP、および TIME

ブール値

BOOL

バイナリ

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB、および VARBINARY

データ同期前に PolarDB 環境を準備する

IP アドレスホワイトリストを設定する

Data Integration 専用リソースグループが存在する仮想プライベートクラウド (VPC) の CIDR ブロックを、PolarDB for MySQL クラスタの IP アドレスホワイトリストに追加する必要があります。

必要な権限を持つアカウントを準備する

アカウントを作成し、必要な権限をアカウントに付与します。

PolarDB for MySQL クラスタのデータベースにログオンするためのアカウントを作成する必要があります。 SELECT、REPLICATION SLAVE、および REPLICATION CLIENT 権限をアカウントに付与する必要があります。

  1. アカウントを作成します。

    詳細については、「データベースアカウントを作成および管理する」をご参照ください。

  2. 必要な権限をアカウントに付与します。

    次のコマンドを実行して必要な権限をアカウントに付与するか、SUPER ロールをアカウントに直接割り当てることができます。

    -- CREATE USER 'データ同期用アカウント'@'%' IDENTIFIED BY 'データ同期用アカウント';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'データ同期用アカウント'@'%';

バイナリロギング機能を有効にする

詳細については、「バイナリロギングを有効にする」をご参照ください。

データソースを追加する

DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。 データソースを追加する際に、DataWorks コンソールのパラメータのヒントを参照して、パラメータの意味を理解することができます

PolarDB データソースに基づいてデータ同期タスクを開発する

同期タスクの設定のエントリポイントと手順については、次の設定ガイドを参照してください。

単一テーブルのデータを同期するためのバッチ同期タスクを設定する

単一テーブルのデータまたはデータベースのすべてのデータを同期するためのリアルタイム同期タスクを設定する

設定手順の詳細については、「DataStudio でリアルタイム同期タスクを設定する」をご参照ください。

データベース内のすべてのデータのバッチ同期と、単一テーブルのデータまたはデータベース内の完全データまたは増分データのリアルタイム同期を実装するための同期設定を行う

設定手順の詳細については、「Data Integration で同期タスクを設定する」をご参照ください。

よくある質問

Oracle、PolarDB、または MySQL からデータを同期するためにリアルタイム同期タスクを実行すると、エラーが繰り返し報告されるのはなぜですか?

付録:コードとパラメータ

コードエディタを使用してバッチ同期タスクを設定する

コードエディタを使用してバッチ同期タスクを設定する場合は、統一スクリプト形式の要件に基づいて、スクリプトに関連パラメータを設定する必要があります。 詳細については、「コードエディタを使用してバッチ同期タスクを設定する」をご参照ください。 次の情報では、コードエディタを使用してバッチ同期タスクを設定する際にデータソースに設定する必要があるパラメータについて説明します。

PolarDB Reader のコード

次のコードでは、単一テーブルからデータを読み取るバッチ同期タスクが設定されています。 パラメータについては、PolarDB Reader のコードのパラメータを参照してください。

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_005",// データソースの名前。
                "column": [// 列の名前。
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "where": "id=1001",// WHERE 句。
                "splitPk": "id",// シャードキー。
                "table": "PolarDB_person",// テーブルの名前。
              	"useReadonly": "false"// セカンダリデータベースからデータを読み取るかどうかを指定します。
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {}
    ],
    "version": "2.0",// バージョン番号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// ダーティデータレコードの最大許容数。
            "record": ""
        },
        "speed": {
            "concurrent": 6,// 並列スレッドの最大数。
            "throttle": true,// スロットリングを有効にするかどうかを指定します。 値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
      "mbps":"12",// 最大伝送速度。 単位:MB/s。
        }
    }
}

PolarDB Reader のコードのパラメータ

パラメータ

説明

必須

デフォルト値

datasource

データソースの名前。 追加されたデータソースの名前と同じである必要があります。 コードエディタを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを読み取るテーブルの名前。

はい

デフォルト値なし

useReadonly

セカンダリデータベースからデータを読み取るかどうかを指定します。 読み取り/書き込み分割を実装し、PolarDB for MySQL クラスタのセカンダリデータベースからデータを読み取る場合は、このパラメータを true に設定します。 このパラメータを空のままにすると、デフォルト値の false が使用されます。これは、プライマリデータベースからデータが読み取られることを示します。

いいえ

false

column

データを読み取る列の名前。 JSON 配列で名前を指定します。 デフォルト値は [*] で、ソーステーブルのすべての列を示します。

  • 読み取る特定の列を選択できます。

  • 列の順序を変更できます。 これは、ソーステーブルのスキーマで指定された順序とは異なる順序で列を指定できることを示します。

  • 定数がサポートされています。 列名は、["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"] など、PolarDB でサポートされている SQL 構文に準拠して配置する必要があります。

    • id: 列名。

    • table: 予約キーワードを含む列の名前。

    • 1: 整数定数。

    • 'mingya.wmy': 文字列定数。 シングルクォーテーション (') で囲みます。

    • 'null': 文字列 null。

    • to_char(a+1): 文字列の長さを計算するために使用される関数式。

    • 2.3: 浮動小数点定数。

    • true: ブール値。

  • column パラメータでは、データを読み取るすべての列を明示的に指定する必要があります。 このパラメータを空にすることはできません。

はい

デフォルト値なし

splitPk

PolarDB Reader がデータを読み取るときに データシャーディング に使用されるフィールド。 このパラメータを設定すると、ソーステーブルはこのパラメータの値に基づいてシャーディングされます。 Data Integration は、並列スレッドを実行してデータを読み取ります。 これにより、データ同期の効率が向上します。

  • splitPk パラメータをテーブルのプライマリキー列の名前に設定することをお勧めします。 データは、特定のシャードにのみ集中的に分散されるのではなく、プライマリキー列に基づいて異なるシャードに均等に分散できます。

  • splitPk パラメータは、整数データ型のデータのシャーディングのみをサポートします。 このパラメータを、文字列、浮動小数点、日付データ型などのサポートされていないデータ型のフィールドに設定すると、PolarDB Reader は splitPk パラメータの設定を無視し、単一スレッドを使用してデータを読み取ります。

  • splitPk パラメータが指定されていないか空の場合、PolarDB Reader は単一スレッドを使用してデータを読み取ります。

いいえ

デフォルト値なし

splitFactor

シャーディング係数。 同期されるデータをいくつの部分にシャーディングするかを決定します。 バッチ同期タスクに並列処理を設定する場合、部分の数は次の式に基づいて計算されます。 並列スレッド数 × シャーディング係数。 たとえば、並列スレッド数とシャーディング係数が 5 の場合、同期されるデータがシャーディングされる部分の数は 25 です。

説明

1 ~ 100 の範囲のシャーディング係数を指定することをお勧めします。 100 より大きいシャーディング係数を指定すると、メモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

5

where

WHERE 句。 たとえば、このパラメータを gmt_create>$bizdate に設定して、現在の日付に生成されたデータを読み取ることができます。

  • WHERE 句を使用して増分データを読み取ることができます。 where パラメータが指定されていないか空の場合、PolarDB Reader はすべてのデータを読み取ります。

  • where パラメータを limit 10 に設定しないでください。 この値は、SQL WHERE 句に対する PolarDB の制約に準拠していません。

いいえ

デフォルト値なし

querySql (高度なパラメータ。 コードエディタでのみ使用可能)

詳細なデータフィルタリングに使用される SQL 文。 このパラメータを設定すると、PolarDB Reader は columntablewhere パラメータの設定を無視し、このパラメータの値のみに基づいてデータをフィルタリングします。 たとえば、データ同期のために複数のテーブルを結合する場合は、このパラメータを select a,b from table_a join table_b on table_a.id = table_b.id に設定できます。 querySql パラメータの優先度は、columntablewhere、splitPk パラメータの優先度よりも高くなります。 datasource パラメータで指定されたデータソースは、このパラメータからユーザー名とパスワードなどの情報を解析します。

いいえ

デフォルト値なし

PolarDB Writer のコード

次のコードでは、PolarDB にデータを書き込むバッチ同期タスクが設定されています。 パラメータについては、PolarDB Writer のコードのパラメータを参照してください。

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],// バッチ同期タスクの実行後に実行する SQL 文。
                "datasource": "test_005",// データソースの名前。
                "column": [// 列の名前。
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "writeMode": "insert",// 書き込みモード。
                "batchSize": 256,// 一度に書き込むデータレコードの数。
                "table": "PolarDB_person_copy",// テーブルの名前。
                "preSql": []// バッチ同期タスクの実行前に実行する SQL 文。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",// バージョン番号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {// ダーティデータレコードの最大許容数。
            "record": ""
        },
        "speed": {
            "throttle":true,// 帯域幅スロットリングを有効にするかどうかを指定します。 値 false は帯域幅スロットリングが無効であることを示し、値 true は帯域幅スロットリングが有効であることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
            "concurrent":6, // 並列スレッドの最大数。
            "mbps":"12",// 最大伝送速度。 単位:MB/s。
        }
    }
}

PolarDB Writer のコードのパラメータ

  • PolarDB Writer のコードのパラメータ

    パラメータ

    説明

    必須

    デフォルト値

    datasource

    データソースの名前。 追加されたデータソースの名前と同じである必要があります。 コードエディタを使用してデータソースを追加できます。

    はい

    デフォルト値なし

    table

    データを読み取るテーブルの名前。

    はい

    デフォルト値なし

    writeMode

    書き込みモード。 有効値:

    • insert: このモードは、コーディングレス UI の INSERT INTO と同等です。

    • update (コーディングレス UI の ON DUPLICATE KEY UPDATE と同等)

    • replace: このモードは、コーディングレス UI の REPLACE INTO と同等です。

    書き込みモードとデータ例の詳細については、writeMode パラメータの説明 を参照してください。

    説明

    デスティネーションが PolarDB for PostgreSQL の場合、insert モードのみがサポートされます。 データを更新し、プライマリキーの競合を回避するには、バッチ同期を実行する前に、デスティネーションテーブル内の重複レコードを削除することをお勧めします。 次のアプローチのいずれかを使用できます。

    アプローチ 1:preSql パラメータ (コーディングレス UI の デスティネーションにデータを書き込む前に実行される文 と同等) に TRUNCATE 文を設定して、デスティネーションテーブルをクリアします。

    アプローチ 2:バッチ同期ノードのアップストリームノードでデスティネーションテーブルを処理して、データ同期中にプライマリキーの競合が発生しないようにします。

    いいえ

    insert

    column

    データを書き込む列の名前。 名前をコンマ (,) で区切ります。 例:"column":["id","name","age"]。 デスティネーションテーブルのすべての列にデータを書き込む場合は、このパラメータをアスタリスク (*) に設定します (例:"column":["*"])。

    はい

    デフォルト値なし

    preSql

    バッチ同期タスクの実行前に実行する SQL 文。 たとえば、このパラメータを古いデータを削除するために使用される SQL 文に設定できます。 コーディングレス UI では 1 つの SQL 文のみを実行でき、コードエディタでは複数の SQL 文を実行できます。

    いいえ

    デフォルト値なし

    postSql

    バッチ同期タスクの実行後に実行する SQL 文。 たとえば、このパラメータをタイムスタンプを追加するために使用される SQL 文に設定できます。 コーディングレス UI では 1 つの SQL 文のみを実行でき、コードエディタでは複数の SQL 文を実行できます。

    いいえ

    デフォルト値なし

    batchSize

    一度に書き込むデータレコードの数。 ビジネス要件に基づいて、このパラメータを適切な値に設定します。 これにより、Data Integration と PolarDB 間の相互作用が大幅に削減され、スループットが向上します。 このパラメータを過度に大きい値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。

    いいえ

    1,024

    updateColumn

    プライマリキーの競合または一意なインデックスの競合が発生した場合に更新される列の名前。 このパラメータは、writeMode パラメータが update に設定されている場合にのみ有効になります。 "updateColumn": ["name", "age"] など、名前をコンマ (,) で区切ります。

    説明

    PolarDB for MySQL データソースのみがこのパラメータをサポートしています。

    いいえ

    デフォルト値なし

  • writeMode パラメータの説明

    項目

    insert (コーディングレス UI の INSERT INTO と同等)

    update (コーディングレス UI の ON DUPLICATE KEY UPDATE と同等)

    replace (コーディングレス UI の REPLACE INTO と同等)

    処理ルール

    プライマリキーの競合または一意なインデックスの競合が発生した場合、データは競合する行に書き込まれず、これらの行に書き込まれないデータはダーティデータと見なされます。

    プライマリキーの競合または一意なインデックスの競合が発生しない場合、データは、このパラメータを insert に設定した場合と同じ方法で処理されます。 競合が発生した場合、デスティネーションテーブル内の競合する行のデータは新しいデータに置き換えられます。

    プライマリキーの競合または一意なインデックスの競合が発生しない場合、データは、このパラメータを insert に設定した場合と同じ方法で処理されます。 競合が発生した場合、元の行は削除され、新しい行が挿入されます。 これは、元の行のすべてのフィールドが置き換えられることを示します。

    データ例

    • ソーステーブルのデータ

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+
    • デスティネーションテーブルの元のデータ

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 2  | wangwu  |     |
      +----+---------+-----+
    • バッチ同期タスクの実行後、1 つのデータレコードがデスティネーションテーブルに書き込まれ、1 つのデータレコードが生成されます。

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | wangwu  |     |
      +----+---------+-----+
    • シナリオ 1:一部の列のみが指定されている:"column": ["id","name"]

      • ソーステーブルのデータ

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
      • デスティネーションテーブルの元のデータ

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 2  | wangwu  |  3  |
        +----+---------+-----+
      • バッチ同期タスクの実行後、2 つのデータレコードがデスティネーションテーブルに書き込まれ、ダーティデータレコードは生成されません。

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    | 3   |
        +----+---------+-----+
    • シナリオ 2:すべての列が指定されている:"column": ["id","name","age"]

      • ソーステーブルのデータ

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
      • デスティネーションテーブルの元のデータ

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 2  | wangwu  |  3  |
        +----+---------+-----+
      • バッチ同期タスクの実行後、2 つのデータレコードがデスティネーションテーブルに書き込まれ、ダーティデータレコードは生成されません。

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
    • ソーステーブルのデータ

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+
    • デスティネーションテーブルの元のデータ

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 2  | wangwu  |  3  |
      +----+---------+-----+
    • バッチ同期タスクの実行後、2 つのデータレコードがデスティネーションテーブルに書き込まれ、ダーティデータレコードは生成されません。

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+