すべてのプロダクト
Search
ドキュメントセンター

DataWorks:PolarDB-X 2.0 データソース

最終更新日:Jan 11, 2025

DataWorks は、PolarDB-X 2.0 データソースとのデータの読み取りと書き込みを行うための PolarDB-X 2.0 Reader と PolarDB-X 2.0 Writer を提供しています。このトピックでは、PolarDB-X 2.0 データソースとのデータ同期機能について説明します。

制限事項

PolarDB-X 2.0 データソースとのデータの読み取りと書き込みには、Data Integration 専用リソースグループのみを使用できます。

サポートされているバージョン

バッチデータの読み取りと書き込み:PolarDB-X 2.0 がサポートされています。バッチ同期中にビューのデータを読み取ることができます。

サポートされているデータ型

PolarDB-X 2.0 のすべてのデータ型については、データ型をご参照ください。次の表は、主要なデータ型のサポート状況を示しています。

データ型

バッチデータ読み取り用 PolarDB-X 2.0 Reader

バッチデータ書き込み用 PolarDB-X 2.0 Writer

TINYINT

サポートされています

サポートされています

SMALLINT

サポートされています

サポートされています

INTEGER

サポートされています

サポートされています

BIGINT

サポートされています

サポートされています

FLOAT

サポートされています

サポートされています

DOUBLE

サポートされています

サポートされています

DECIMAL/NUMBERIC

サポートされています

サポートされています

REAL

サポートされていません

サポートされていません

VARCHAR

サポートされています

サポートされています

JSON

サポートされています

サポートされています

TEXT

サポートされています

サポートされています

MEDIUMTEXT

サポートされています

サポートされています

LONGTEXT

サポートされています

サポートされています

VARBINARY

サポートされています

サポートされています

BINARY

サポートされています

サポートされています

TINYBLOB

サポートされています

サポートされています

MEDIUMBLOB

サポートされています

サポートされています

LONGBLOB

サポートされています

サポートされています

ENUM

サポートされています

サポートされています

SET

サポートされています

サポートされています

BOOLEAN

サポートされています

サポートされています

BIT

サポートされています

サポートされています

DATE

サポートされています

サポートされています

DATETIME

サポートされています

サポートされています

TIMESTAMP

サポートされています

サポートされています

TIME

サポートされています

サポートされています

YEAR

サポートされています

サポートされています

LINESTRING

サポートされていません

サポートされていません

POLYGON

サポートされていません

サポートされていません

MULTIPOINT

サポートされていません

サポートされていません

MULTILINESTRING

サポートされていません

サポートされていません

MULTIPOLYGON

サポートされていません

サポートされていません

GEOMETRYCOLLECTION

サポートされていません

サポートされていません

データ同期前に PolarDB-X 2.0 環境を準備する

DataWorks を使用して PolarDB-X 2.0 データソースとのデータ同期を行う前に、PolarDB-X 2.0 環境を準備する必要があります。これにより、データ同期タスクを構成し、PolarDB-X 2.0 データソースとのデータ同期を想定どおりに実行できるようになります。PolarDB-X 2.0 データソースとのデータ同期のための PolarDB-X 2.0 環境の準備方法について、以下で説明します。

準備 1:PolarDB-X データベースのバージョンを確認する

PolarDB-X データベースのバージョンが PolarDB-X 1.0 の場合は、DRDS (PolarDB-X 1.0) データソースを参照して環境を準備してください。PolarDB-X データベースのバージョンが PolarDB-X 2.0 の場合は、このトピックを参照して環境を準備してください。

準備 2:必要な権限を持つアカウントを準備する

DataWorks が PolarDB-X 2.0 データベースにアクセスするためのアカウントを計画し、作成することをお勧めします。このようなアカウントを準備するには、次の手順を実行します。

  1. オプション。PolarDB-X 2.0コンソールにアカウントを作成する にログインして、します。すでにこのようなアカウントをお持ちの場合は、この手順をスキップできます。

  2. アカウントに必要な権限を付与します。

    • バッチ同期:

      • バッチデータ読み取り:アカウントには SELECT 権限が必要です。

      • バッチデータ書き込み:アカウントには INSERT、DELETE、および UPDATE 権限が必要です。

    • データベース内のデータのリアルタイム同期:

      • 特権アカウント:デフォルトでは、バイナリログデータを読み取ってリアルタイム同期を実行できます。

      • 標準アカウント:特権アカウントを使用して、特定のデータベースに対する SELECT、REPLICATION SLAVE、および REPLICATION CLIENT 権限を標準アカウントに付与します。

-- データ同期に使用できるアカウントを作成し、パスワードを指定します。これにより、任意のホストからアカウントとパスワードを使用してデータベースにアクセスできます。% はホストを示します。
-- CREATE USER 'Account for data synchronization'@'%' IDENTIFIED BY 'Password'.
-- アカウントに SELECT、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与します。
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'Account for data synchronization'@'%'; 

データ同期タスクを開発する

データ同期タスクの開始点と構成手順については、以下のセクションをご参照ください。パラメータ設定については、タスクの構成タブにある各パラメータのインフォチップを表示してください。

データソースを追加する

特定のデータソースとのデータ同期を行うデータ同期タスクを構成する前に、DataWorks にデータソースを追加する必要があります。詳細については、データソースの追加と管理をご参照ください。

単一テーブルのデータを同期するバッチ同期タスクを構成する

付録:コードとパラメータ

コードエディタを使用してバッチ同期タスクを構成する

コードエディタを使用してバッチ同期タスクを構成する場合は、コードエディタの形式要件に基づいて、関連データソースのリーダーとライターのパラメータを構成する必要があります。形式要件の詳細については、コードエディタを使用してバッチ同期タスクを構成するをご参照ください。コードエディタのリーダーとライターのパラメータの構成の詳細を以下に示します。

PolarDB-X 2.0 Reader のコード

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"polardbx20",// プラグイン名。
            "parameter":{
               "connection": [
                  {
                      "datasource":"",
                      "table": [
                          "t1"
                      ]
                  }
              ],
              "column": [
                  "c1",
                  "c2",
                  "'const'"
              ],
              "where": "",
              "splitPk": "",
              "checkSlave": "true",
              "slaveDelayLimit": "300"
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// 許容されるダーティデータレコードの最大数。
        },
        "speed":{
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
            "concurrent":1,// 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

PolarDB-X 2.0 Reader のコードのパラメータ

パラメータ

説明

必須

デフォルト値

datasource

データソースの名前。データソース管理ページで追加したデータソースの名前と同じである必要があります。

はい

デフォルト値なし

table

データを読み取るテーブルの名前。各同期タスクは、1 つのテーブルからのみデータを同期するために使用できます。

はい

デフォルト値なし

column

データを読み取る列の名前。JSON 配列で名前を指定します。デフォルト値は [*] で、ソーステーブルのすべての列を示します。

  • 読み取る特定の列を選択できます。

  • 列の順序を変更できます。これは、ソーステーブルのスキーマで指定された順序とは異なる順序で列を指定できることを示します。

  • 定数がサポートされています。列名は、PolarDB-X 2.0 でサポートされている SQL 構文に準拠して配置する必要があります。例:["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id:列名。

    • table:予約キーワードを含む列の名前。

    • 1:整数定数。

    • 'mingya.wmy':文字列定数。単一引用符 (') で囲みます。

    • null

      • " " は空の文字列を示します。

      • null は null 値を示します。

      • 'null' は文字列 null を示します。

    • to_char(a+1):文字列の長さを計算するために使用される関数式。

    • 2.3:浮動小数点定数。

    • true:ブール値。

  • column パラメータには、データを読み取るすべての列を明示的に指定する必要があります。このパラメータを空にすることはできません。

はい

デフォルト値なし

splitPk

PolarDB-X 2.0 Reader がデータを読み取るときにデータシャーディングに使用される フィールド。このパラメータを構成すると、ソーステーブルはこのパラメータの値に基づいてシャーディングされます。次に、並列スレッドが実行されてデータが読み取られます。これにより、データ同期をより効率的に行うことができます。

  • splitPk パラメータをテーブルのプライマリキー列の名前に設定することをお勧めします。プライマリキー列に基づいてデータを異なるシャードに均等に分散させることができ、特定のシャードにのみ集中的に分散されることを防ぎます。

  • splitPk パラメータは、整数データ型のフィールドに設定する必要があります。splitPk パラメータを文字列、浮動小数点、または日付データ型などのサポートされていないデータ型のフィールドに設定すると、このパラメータの設定は無視され、単一スレッドを使用してデータが読み取られます。

  • splitPk パラメータが PolarDB-X 2.0 Reader のコードに含まれていないか、空のままになっている場合、単一スレッドを使用してデータが読み取られます。

いいえ

デフォルト値なし

where

WHERE 句。たとえば、このパラメータを gmt_create>$bizdate に設定して、現在の日付に生成されたデータを読み取ることができます。

  • WHERE 句を使用して増分データを読み取ることができます。where パラメータが指定されていないか、空のままになっている場合、PolarDB-X 2.0 Reader はすべてのデータを読み取ります。

  • where パラメータを LIMIT 10 に設定しないでください。この値は、SQL WHERE 句に対する PolarDB-X 2.0 の制約に準拠していません。

いいえ

デフォルト値なし

checkSlave

プライマリインスタンスと読み取り専用セカンダリインスタンス間のデータ同期のレイテンシを確認するかどうかを指定します。PolarDB-X 2.0 読み取り専用インスタンスをデータソースとして追加する場合は、データ同期タスクを実行する前に、読み取り専用インスタンスとプライマリインスタンス間のデータ同期のレイテンシを確認する必要があります。これは、レイテンシによるデータ損失を防ぐのに役立ちます。

いいえ

true

slaveDelayLimit

プライマリインスタンスと読み取り専用インスタンス間のデータ同期の最大レイテンシ(秒単位)。レイテンシがこのパラメータの値を超えると、データ同期タスクは失敗します。これは、レイテンシによるデータ損失を防ぐのに役立ちます。

いいえ

30

PolarDB-X 2.0 Writer のコード

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"PolarDB-X 2.0",// プラグイン名。
            "parameter":{
                "postSql":[],// 同期タスクの実行後に実行する SQL ステートメント。
                "datasource":"",// データソースの名前。
                "column":[// 列の名前。
                    "id",
                    "value"
                ],
                "writeMode":"insert",// 書き込みモード。有効な値:insert と replace。
                "batchSize":1024,// 一度に書き込むデータレコードの数。
                "table":"",// テーブルの名前。
                "preSql":[
                     "delete from XXX;" // 同期タスクの実行前に実行する SQL ステートメント。
                   ]
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{// 許容されるダーティデータレコードの最大数。
            "record":"0"
        },
        "speed":{
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
            "concurrent":1, // 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。ソースの読み取り負荷またはデスティネーションの書き込み負荷が高くなりすぎるのを防ぐために、最大伝送速度を指定できます。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

PolarDB-X 2.0 Writer のコードのパラメータ

パラメータ

説明

必須

デフォルト値

datasource

データソースの名前。データソース管理ページで追加したデータソースの名前と同じである必要があります。

はい

デフォルト値なし

table

データを書き込むテーブルの名前。

はい

デフォルト値なし

writeMode

書き込みモード。有効な値:

  • insert into:プライマリキーの競合または一意インデックスの競合が発生した場合、競合する行にデータを書き込むことができず、これらの行に書き込まれなかったデータはダーティデータと見なされます。コードエディタを使用して同期タスクを構成する場合は、writeMode を insert に設定します。

  • replace into:プライマリキーの競合または一意インデックスの競合が発生しない場合、データはこのパラメータを insert into に設定した場合と同じ方法で処理されます。競合が発生した場合、元の行が削除され、新しい行が挿入されます。これは、元の行のすべてのフィールドが置き換えられることを示します。コードエディタを使用して同期タスクを作成する場合は、writeMode を replace に設定します。

いいえ

insert

column

データを書き込む列の名前。"column": ["id", "name", "age"] のように、名前をコンマ (,) で区切ります。デスティネーションテーブルのすべての列にデータを書き込む場合は、"column": ["*"] のように、このパラメータをアスタリスク (*) に設定します。

はい

デフォルト値なし

preSql

同期タスクの実行前に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディタでは複数の SQL ステートメントを実行できます。たとえば、同期タスクの実行前に古いデータを削除するために、このパラメータを次の SQL ステートメントに設定できます。

-- tablename はテーブル名を示します。
truncate table tablename;
説明

複数の SQL ステートメントを構成した場合、ステートメントは同じトランザクションで実行されません。

いいえ

デフォルト値なし

postSql

同期タスクの実行後に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディタでは複数の SQL ステートメントを実行できます。たとえば、同期タスクの実行後にタイムスタンプを追加するために、このパラメータを次のステートメントに設定できます。

-- tablename はテーブル名を示し、colname は列名を示します。
alter table tablename
add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
説明

複数の SQL ステートメントを構成した場合、ステートメントは同じトランザクションで実行されません。

いいえ

デフォルト値なし

batchSize

一度に書き込むデータレコードの数。ビジネス要件に基づいて、このパラメータを適切な値に設定します。これにより、Data Integration と PolarDB-X 2.0 間の相互作用が大幅に削減され、スループットが向上します。このパラメータを過度に大きな値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

256