すべてのプロダクト
Search
ドキュメントセンター

DataWorks:DRDS (PolarDB-X 1.0) データソース

最終更新日:Jan 11, 2025

DataWorks は、DRDS (PolarDB-X 1.0) データソースからのデータの読み取りと、DRDS (PolarDB-X 1.0) データソースへのデータの書き込みを行うための DRDS (PolarDB-X 1.0) Reader と DRDS (PolarDB-X 1.0) Writer を提供しています。このトピックでは、DRDS (PolarDB-X 1.0) データソースとのデータ同期機能について説明します。

サポートされている DRDS バージョン

リアルタイムデータ読み取り

DRDS (PolarDB-X 1.0) がサポートされています。 DRDS (PolarDB-X 1.0) インスタンスの作成方法の詳細については、PolarDB-X 1.0 インスタンスの作成 をご参照ください。

制限事項

バッチデータの読み取りと書き込み

  • DRDS (PolarDB-X 1.0) Reader は MySQL エンジンのみをサポートしています。 DRDS (PolarDB-X 1.0) は分散 MySQL データベースサービスです。 DRDS (PolarDB-X 1.0) で使用されるほとんどの通信プロトコルは、MySQL で使用される通信プロトコルと同じです。

  • MySQL 8.0 を実行する DRDS (PolarDB-X 1.0) インスタンスからデータを読み取るには、Data Integration 専用リソースグループ のみを使用できます。

  • DRDS (PolarDB-X 1.0) Writer は、Java Database Connectivity (JDBC) を使用してリモート DRDS (PolarDB-X 1.0) データベースのプロキシに接続し、REPLACE INTO ステートメントを実行して DRDS (PolarDB-X 1.0) データベースにデータを書き込みます。

    REPLACE INTO ステートメントを実行するには、テーブルにプライマリキーまたは一意のインデックスがあり、データの重複を防ぐ必要があります。

  • DRDS (PolarDB-X 1.0) Writer はリーダーからデータを取得し、REPLACE INTO ステートメントを実行して、データを宛先データベースに書き込みます。プライマリキーの競合または一意のインデックスの競合が発生しない場合、データは INSERT INTO ステートメントを実行した場合と同じ方法で処理されます。競合が発生した場合、宛先テーブルの競合する行のデータは新しいデータに置き換えられます。 DRDS (PolarDB-X 1.0) Writer は、バッファリングされたデータの量が特定のしきい値に達すると、データを DRDS (PolarDB-X 1.0) プロキシに送信します。プロキシは、データを 1 つ以上のテーブルに書き込むかどうか、および複数のテーブルに書き込む場合のデータのルーティング方法を決定します。

    説明

    DRDS (PolarDB-X 1.0) Writer を使用する同期タスクには、少なくとも REPLACE INTO ステートメントを実行するための権限が必要です。その他の権限が必要かどうかは、タスクを設定するときに preSql パラメーターと postSql パラメーターで指定する SQL ステートメントによって異なります。

  • ビューのデータを読み取ることができます。

リアルタイムデータ読み取り

  • Alibaba Cloud インスタンスモードで DataWorks に追加されたデータソースのみがサポートされています。接続文字列モードでデータソースを追加し、データ同期タスクでデータソースを使用すると、タスクは失敗します。

  • ストレージタイプは、MySQL 用 PolarDB または ApsaraDB RDS (MySQL 用 ApsaraDB RDS を除く) である必要があります。 ApsaraDB RDS は、既存の DRDS (PolarDB-X 1.0) インスタンスにのみ使用でき、新しく購入した DRDS (PolarDB-X 1.0) インスタンスには使用できません。

  • XA ROLLBACK ステートメントが実行されたデータのリアルタイム同期はサポートされていません。

    XA PREPARE ステートメントが実行されたトランザクションデータの場合、リアルタイム同期機能を使用して、データを宛先に同期できます。後で XA ROLLBACK ステートメントがデータに対して実行された場合、データに対するロールバックの変更は宛先に同期できません。同期するテーブルに XA ROLLBACK ステートメントが実行されるテーブルが含まれている場合は、XA ROLLBACK ステートメントが実行されるテーブルをリアルタイム同期タスクから削除し、削除したテーブルを再度追加してデータを同期する必要があります。

データ型のマッピング

ほとんどの DRDS (PolarDB-X 1.0) データ型がサポートされています。データベースのデータ型がサポートされていることを確認してください。

次の表に、DRDS (PolarDB-X 1.0) Reader または DRDS (PolarDB-X 1.0) Writer がデータ型を変換する際に基づくデータ型のマッピングを示します。

カテゴリ

DRDS (PolarDB-X 1.0) データ型

整数

INT、TINYINT、SMALLINT、MEDIUMINT、および BIGINT

浮動小数点数

FLOAT、DOUBLE、および DECIMAL

文字列

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT、および LONGTEXT

日付と時刻

DATE、DATETIME、TIMESTAMP、TIME、および YEAR

ブール値

BIT および BOOLEAN

バイナリ

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB、および VARBINARY

データ同期前の準備

後続の操作のために DRDS (PolarDB-X 1.0) データベースにログオンするために使用するアカウントを作成し、必要な権限をアカウントに付与する必要があります。詳細については、アカウントの作成をご参照ください。

データ同期タスクの開発

データ同期タスクの設定のエントリポイントと手順については、以下のセクションをご参照ください。パラメーター設定については、タスクの設定タブにある各パラメーターのヒントをご覧ください。

データソースの追加

特定のデータソースとのデータ同期を行うデータ同期タスクを設定する前に、DataWorks にデータソースを追加する必要があります。詳細については、データソースの追加と管理をご参照ください。

単一テーブルのデータを同期するバッチ同期タスクの設定

データベース内のすべてのデータのバッチ同期、または単一テーブルまたはデータベース内のフルデータと増分データのリアルタイム同期を実装するための同期設定

設定手順の詳細については、Data Integration での同期タスクの設定をご参照ください。

追加情報

  • 一貫性のあるビュー

    DRDS (PolarDB-X 1.0) は分散データベースサービスであるため、複数のデータベースの複数のテーブルに対して一貫性のあるビューを提供できません。 DRDS (PolarDB-X 1.0) Reader は異なるテーブルシャードから異なるスナップショットを取得しますが、同じタイムスライスでテーブルシャードのスナップショットを取得することはできません。そのため、DRDS (PolarDB-X 1.0) Reader はデータクエリの強い一貫性を保証できません。

  • 文字エンコーディング

    DRDS (PolarDB-X 1.0) は柔軟なエンコーディング設定をサポートしています。インスタンス全体と特定のフィールド、テーブル、データベースのエンコーディング形式を指定できます。フィールド、テーブル、データベース、インスタンスレベルの設定は、降順で優先されます。データベースには UTF-8 を使用することをお勧めします。

    DRDS (PolarDB-X 1.0) Reader は JDBC を使用してデータを読み取ります。これにより、DRDS (PolarDB-X 1.0) Reader は文字のエンコーディング形式を自動的に変換できます。そのため、エンコーディング形式を指定する必要はありません。

    DRDS (PolarDB-X 1.0) データベースのエンコーディング形式を指定したが、異なるエンコーディング形式で DRDS (PolarDB-X 1.0) データベースにデータが書き込まれた場合、DRDS (PolarDB-X 1.0) Reader はこの不整合を認識できず、文字化けした文字を出力する可能性があります。

  • 増分データ同期

    DRDS (PolarDB-X 1.0) Reader は JDBC を使用してデータベースに接続し、WHERE 句を含む SELECT ステートメントを使用して増分データを読み取ります。

    • バッチデータの場合、増分の追加、更新、削除操作 (論理削除操作を含む) はタイムスタンプによって区別されます。特定のタイムスタンプに基づいて WHERE 句を指定します。タイムスタンプで示される時刻は、前回の同期で最新のタイムスタンプで示される時刻よりも後である必要があります。

    • ストリーミングデータの場合、特定のレコードの ID に基づいて WHERE 句を指定します。 ID は、前回の同期に含まれる最大 ID よりも大きい必要があります。

    追加または変更されたデータを区別できない場合、DRDS (PolarDB-X 1.0) Reader はフルデータのみを読み取ることができます。

  • WHERE 句で物理テーブルのフィルター条件を設定することはできません。

付録: コードとパラメーター

付録: コードエディターを使用したバッチ同期タスクの設定

コードエディターを使用してバッチ同期タスクを設定する場合は、コードエディターの形式要件に基づいて、関連データソースのリーダーとライターのパラメーターを設定する必要があります。形式要件の詳細については、コードエディターを使用したバッチ同期タスクの設定をご参照ください。次の情報は、コードエディターのリーダーとライターのパラメーターの設定の詳細について説明しています。

DRDS (PolarDB-X 1.0) Reader のコード

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"drds",// プラグイン名。
            "parameter":{
                "datasource":"",// データソースの名前。
                "column":[// 列の名前。
                    "id",
                    "name"
                ],
                "where":"",// WHERE 句。
                "table":"",// テーブルの名前。
                "splitPk": ""// シャードキー。
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",// プラグイン名。
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// 許容されるダーティデータレコードの最大数。
        },
        "speed":{
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
            "concurrent":1,// 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。単位: MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

DRDS (PolarDB-X 1.0) Reader のコードのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディターを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを読み取るテーブルの名前。

はい

デフォルト値なし

column

データを読み取る列の名前。JSON 配列で名前を指定します。デフォルト値は [*] で、ソーステーブルのすべての列を示します。

  • 読み取る特定の列を選択できます。

  • 列の順序を変更できます。これは、ソーステーブルのスキーマで指定された順序とは異なる順序で列を指定できることを示します。

  • 定数がサポートされています。列名は、["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3", "true"] など、MySQL でサポートされている SQL 構文に準拠して配置する必要があります。上記の例のパラメーターの説明:

    • id: 列名。

    • table: 予約語を含む列の名前。

    • 1: 整数定数。

    • bazhen.csy: 文字列定数。

    • null: ヌルポインター。

    • to_char(a + 1): 文字列の長さを計算するために使用される関数式。

    • 2.3: 浮動小数点定数。

    • true: ブール値。

  • column パラメーターは、データを読み取るすべての列を明示的に指定する必要があります。パラメーターを空にすることはできません。

はい

デフォルト値なし

where

WHERE 句。 DRDS (PolarDB-X 1.0) Reader は、columntable、および where パラメーターの設定に基づいて SQL ステートメントを生成し、生成されたステートメントを使用してデータを読み取ります。

  • WHERE 句を使用して増分データを読み取ることができます。

  • where パラメーターが設定されていないか空の場合、DRDS (PolarDB-X 1.0) Reader はフルデータを読み取ります。

たとえば、このパラメーターを STRTODATE('${bdp.system.bizdate}','%Y%m%d') <= today AND today < DATEADD(STRTODATE('${bdp.system.bizdate}', '%Y%m%d'), interval 1 day) に設定して、現在の日付に生成されたデータを読み取ることができます。

いいえ

デフォルト値なし

DRDS (PolarDB-X 1.0) Writer のコード

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
                },
        {
            "stepType":"drds",// プラグイン名。
            "parameter":{
                "postSql":[],// 同期タスクの実行後に実行する SQL ステートメント。
                "datasource":"",// データソースの名前。
                "column":[// 列の名前。
                "id"
                ],
                "writeMode":"insert ignore",
                "batchSize":"1024",// 一度に書き込むデータレコードの数。
                "table":"test",// テーブルの名前。
                "preSql":[]// 同期タスクの実行前に実行する SQL ステートメント。
                },
            "name":"Writer",
            "category":"writer"
                }
                ],
    "setting":{
        "errorLimit":{
        "record":"0"// 許容されるダーティデータレコードの最大数。
            },
        "speed":{
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
            "concurrent":1, // 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。単位: MB/s。
                }
            },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
                }
            ]
        }
    }

DRDS (PolarDB-X 1.0) Writer のコードのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディターを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを書き込むテーブルの名前。

はい

デフォルト値なし

writeMode

書き込みモード。有効な値:

  • insert ignore: プライマリキーの競合または一意のインデックスの競合が発生した場合、ソースデータを書き込むことができません。

  • replace into: プライマリキーの競合または一意のインデックスの競合が発生した場合、元のデータが削除され、新しいデータが挿入されます。

いいえ

insert ignore

column

データを書き込む列の名前。"column": ["id","name","age"] のように、名前をコンマ (,) で区切ります。宛先テーブルのすべての列にデータを書き込む場合は、"column": ["*"] のように、このパラメーターをアスタリスク (*) に設定します。

はい

デフォルト値なし

preSql

同期タスクの実行前に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディターでは複数の SQL ステートメントを実行できます。

たとえば、このパラメーターを delete * from table xxx; に設定して、データ同期前に xxx テーブルからデータを削除できます。ビジネス要件に基づいてこのパラメーターを設定できます。

いいえ

デフォルト値なし

postSql

同期タスクの実行後に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディターでは複数の SQL ステートメントを実行できます。

たとえば、このパラメーターを delete * from table xxx where xx=xx; に設定して、データ同期後に xxx テーブルから特定のデータを削除できます。ビジネス要件に基づいてこのパラメーターを設定できます。

いいえ

デフォルト値なし

batchSize

一度に書き込むデータレコードの数。ビジネス要件に基づいてこのパラメーターを適切な値に設定します。これにより、Data Integration と DRDS (PolarDB-X 1.0) の間の相互作用が大幅に削減され、スループットが向上します。このパラメーターを過度に大きな値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

1,024