DataWorks は、PolarDB データソースとのデータの読み取りと書き込みを行うための PolarDB Reader と PolarDB Writer を提供しています。 コーディングレス ユーザーインターフェース (UI) またはコードエディタを使用して、PolarDB データソースの同期タスクを設定できます。
制限事項
バッチデータの読み取りと書き込み
ビューのデータを読み取ることができます。
リアルタイムデータの読み取り
同期タスクのソースが PolarDB for MySQL クラスタの場合、クラスタのバイナリロギング機能を有効にする必要があります。 PolarDB for MySQL は MySQL と完全に互換性があり、高レベルの物理ログを使用してバイナリログを置き換えます。 PolarDB と MySQL エコシステム間の統合を容易にするために、PolarDB クラスタのバイナリロギング機能を有効にすることができます。
データ型マッピング
バッチデータの読み取り
次の表に、PolarDB Reader がデータ型を変換する際のデータ型マッピングを示します。
カテゴリ | PolarDB データ型 |
整数 | INT、TINYINT、SMALLINT、MEDIUMINT、および BIGINT |
浮動小数点 | FLOAT、DOUBLE、および DECIMAL |
文字列 | VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT、および LONGTEXT |
日付と時刻 | DATE、DATETIME、TIMESTAMP、TIME、および YEAR |
ブール値 | BIT および BOOL |
バイナリ | TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB、および VARBINARY |
上記の表に記載されていないデータ型はサポートされていません。
PolarDB Reader は TINYINT (1) を整数データ型として処理します。
バッチデータの書き込み
PolarDB Reader と同様に、PolarDB Writer はほとんどの PolarDB データ型をサポートしています。 データベースのデータ型がサポートされていることを確認してください。
次の表に、PolarDB Writer がデータ型を変換する際のデータ型マッピングを示します。
カテゴリ | PolarDB データ型 |
整数 | INT、TINYINT、SMALLINT、MEDIUMINT、BIGINT、および YEAR |
浮動小数点 | FLOAT、DOUBLE、および DECIMAL |
文字列 | VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT、および LONGTEXT |
日付と時刻 | DATE、DATETIME、TIMESTAMP、および TIME |
ブール値 | BOOL |
バイナリ | TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB、および VARBINARY |
データ同期前に PolarDB 環境を準備する
IP アドレスホワイトリストを設定する
Data Integration 専用リソースグループが存在する仮想プライベートクラウド (VPC) の CIDR ブロックを、PolarDB for MySQL クラスタの IP アドレスホワイトリストに追加する必要があります。
必要な権限を持つアカウントを準備する
アカウントを作成し、必要な権限をアカウントに付与します。
PolarDB for MySQL クラスタのデータベースにログオンするためのアカウントを作成する必要があります。 SELECT、REPLICATION SLAVE、および REPLICATION CLIENT 権限をアカウントに付与する必要があります。
アカウントを作成します。
詳細については、「データベースアカウントを作成および管理する」をご参照ください。
必要な権限をアカウントに付与します。
次のコマンドを実行して必要な権限をアカウントに付与するか、
SUPERロールをアカウントに直接割り当てることができます。-- CREATE USER 'データ同期用アカウント'@'%' IDENTIFIED BY 'データ同期用アカウント'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'データ同期用アカウント'@'%';
バイナリロギング機能を有効にする
詳細については、「バイナリロギングを有効にする」をご参照ください。
データソースを追加する
DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。 データソースを追加する際に、DataWorks コンソールのパラメータのヒントを参照して、パラメータの意味を理解することができます。
PolarDB データソースに基づいてデータ同期タスクを開発する
同期タスクの設定のエントリポイントと手順については、次の設定ガイドを参照してください。
単一テーブルのデータを同期するためのバッチ同期タスクを設定する
設定手順の詳細については、「コーディングレス UI を使用してバッチ同期タスクを設定する」および「コードエディタを使用してバッチ同期タスクを設定する」をご参照ください。
コードエディタを使用してバッチ同期タスクを設定する際に設定されるすべてのパラメータと実行されるコードについては、「付録:コードとパラメータ」をご参照ください。
単一テーブルのデータまたはデータベースのすべてのデータを同期するためのリアルタイム同期タスクを設定する
設定手順の詳細については、「DataStudio でリアルタイム同期タスクを設定する」をご参照ください。
データベース内のすべてのデータのバッチ同期と、単一テーブルのデータまたはデータベース内の完全データまたは増分データのリアルタイム同期を実装するための同期設定を行う
設定手順の詳細については、「Data Integration で同期タスクを設定する」をご参照ください。
よくある質問
Oracle、PolarDB、または MySQL からデータを同期するためにリアルタイム同期タスクを実行すると、エラーが繰り返し報告されるのはなぜですか?
付録:コードとパラメータ
コードエディタを使用してバッチ同期タスクを設定する
コードエディタを使用してバッチ同期タスクを設定する場合は、統一スクリプト形式の要件に基づいて、スクリプトに関連パラメータを設定する必要があります。 詳細については、「コードエディタを使用してバッチ同期タスクを設定する」をご参照ください。 次の情報では、コードエディタを使用してバッチ同期タスクを設定する際にデータソースに設定する必要があるパラメータについて説明します。
PolarDB Reader のコード
次のコードでは、単一テーブルからデータを読み取るバッチ同期タスクが設定されています。 パラメータについては、PolarDB Reader のコードのパラメータを参照してください。
{
"type": "job",
"steps": [
{
"parameter": {
"datasource": "test_005",// データソースの名前。
"column": [// 列の名前。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"where": "id=1001",// WHERE 句。
"splitPk": "id",// シャードキー。
"table": "PolarDB_person",// テーブルの名前。
"useReadonly": "false"// セカンダリデータベースからデータを読み取るかどうかを指定します。
},
"name": "Reader",
"category": "reader"
},
{
"parameter": {}
],
"version": "2.0",// バージョン番号。
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {// ダーティデータレコードの最大許容数。
"record": ""
},
"speed": {
"concurrent": 6,// 並列スレッドの最大数。
"throttle": true,// スロットリングを有効にするかどうかを指定します。 値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"mbps":"12",// 最大伝送速度。 単位:MB/s。
}
}
}PolarDB Reader のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。 追加されたデータソースの名前と同じである必要があります。 コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを読み取るテーブルの名前。 | はい | デフォルト値なし |
useReadonly | セカンダリデータベースからデータを読み取るかどうかを指定します。 読み取り/書き込み分割を実装し、PolarDB for MySQL クラスタのセカンダリデータベースからデータを読み取る場合は、このパラメータを true に設定します。 このパラメータを空のままにすると、デフォルト値の false が使用されます。これは、プライマリデータベースからデータが読み取られることを示します。 | いいえ | false |
column | データを読み取る列の名前。 JSON 配列で名前を指定します。 デフォルト値は [*] で、ソーステーブルのすべての列を示します。
| はい | デフォルト値なし |
splitPk | PolarDB Reader がデータを読み取るときに データシャーディング に使用されるフィールド。 このパラメータを設定すると、ソーステーブルはこのパラメータの値に基づいてシャーディングされます。 Data Integration は、並列スレッドを実行してデータを読み取ります。 これにより、データ同期の効率が向上します。
| いいえ | デフォルト値なし |
splitFactor | シャーディング係数。 同期されるデータをいくつの部分にシャーディングするかを決定します。 バッチ同期タスクに並列処理を設定する場合、部分の数は次の式に基づいて計算されます。 並列スレッド数 × シャーディング係数。 たとえば、並列スレッド数とシャーディング係数が 5 の場合、同期されるデータがシャーディングされる部分の数は 25 です。 説明 1 ~ 100 の範囲のシャーディング係数を指定することをお勧めします。 100 より大きいシャーディング係数を指定すると、メモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 5 |
where | WHERE 句。 たとえば、このパラメータを
| いいえ | デフォルト値なし |
querySql (高度なパラメータ。 コードエディタでのみ使用可能) | 詳細なデータフィルタリングに使用される SQL 文。 このパラメータを設定すると、PolarDB Reader は column、table、where パラメータの設定を無視し、このパラメータの値のみに基づいてデータをフィルタリングします。 たとえば、データ同期のために複数のテーブルを結合する場合は、このパラメータを | いいえ | デフォルト値なし |
PolarDB Writer のコード
次のコードでは、PolarDB にデータを書き込むバッチ同期タスクが設定されています。 パラメータについては、PolarDB Writer のコードのパラメータを参照してください。
{
"type": "job",
"steps": [
{
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"parameter": {
"postSql": [],// バッチ同期タスクの実行後に実行する SQL 文。
"datasource": "test_005",// データソースの名前。
"column": [// 列の名前。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"writeMode": "insert",// 書き込みモード。
"batchSize": 256,// 一度に書き込むデータレコードの数。
"table": "PolarDB_person_copy",// テーブルの名前。
"preSql": []// バッチ同期タスクの実行前に実行する SQL 文。
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",// バージョン番号。
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {// ダーティデータレコードの最大許容数。
"record": ""
},
"speed": {
"throttle":true,// 帯域幅スロットリングを有効にするかどうかを指定します。 値 false は帯域幅スロットリングが無効であることを示し、値 true は帯域幅スロットリングが有効であることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"concurrent":6, // 並列スレッドの最大数。
"mbps":"12",// 最大伝送速度。 単位:MB/s。
}
}
}PolarDB Writer のコードのパラメータ
PolarDB Writer のコードのパラメータ
パラメータ
説明
必須
デフォルト値
datasource
データソースの名前。 追加されたデータソースの名前と同じである必要があります。 コードエディタを使用してデータソースを追加できます。
はい
デフォルト値なし
table
データを読み取るテーブルの名前。
はい
デフォルト値なし
writeMode
書き込みモード。 有効値:
insert: このモードは、コーディングレス UI の INSERT INTO と同等です。
update (コーディングレス UI の ON DUPLICATE KEY UPDATE と同等)
replace: このモードは、コーディングレス UI の REPLACE INTO と同等です。
書き込みモードとデータ例の詳細については、writeMode パラメータの説明 を参照してください。
説明デスティネーションが PolarDB for PostgreSQL の場合、insert モードのみがサポートされます。 データを更新し、プライマリキーの競合を回避するには、バッチ同期を実行する前に、デスティネーションテーブル内の重複レコードを削除することをお勧めします。 次のアプローチのいずれかを使用できます。
アプローチ 1:preSql パラメータ (コーディングレス UI の デスティネーションにデータを書き込む前に実行される文 と同等) に
TRUNCATE文を設定して、デスティネーションテーブルをクリアします。アプローチ 2:バッチ同期ノードのアップストリームノードでデスティネーションテーブルを処理して、データ同期中にプライマリキーの競合が発生しないようにします。
いいえ
insert
column
データを書き込む列の名前。 名前をコンマ (,) で区切ります。 例:
"column":["id","name","age"]。 デスティネーションテーブルのすべての列にデータを書き込む場合は、このパラメータをアスタリスク (*) に設定します (例:"column":["*"])。はい
デフォルト値なし
preSql
バッチ同期タスクの実行前に実行する SQL 文。 たとえば、このパラメータを古いデータを削除するために使用される SQL 文に設定できます。 コーディングレス UI では 1 つの SQL 文のみを実行でき、コードエディタでは複数の SQL 文を実行できます。
いいえ
デフォルト値なし
postSql
バッチ同期タスクの実行後に実行する SQL 文。 たとえば、このパラメータをタイムスタンプを追加するために使用される SQL 文に設定できます。 コーディングレス UI では 1 つの SQL 文のみを実行でき、コードエディタでは複数の SQL 文を実行できます。
いいえ
デフォルト値なし
batchSize
一度に書き込むデータレコードの数。 ビジネス要件に基づいて、このパラメータを適切な値に設定します。 これにより、Data Integration と PolarDB 間の相互作用が大幅に削減され、スループットが向上します。 このパラメータを過度に大きい値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。
いいえ
1,024
updateColumn
プライマリキーの競合または一意なインデックスの競合が発生した場合に更新される列の名前。 このパラメータは、writeMode パラメータが update に設定されている場合にのみ有効になります。
"updateColumn": ["name", "age"]など、名前をコンマ (,) で区切ります。説明PolarDB for MySQL データソースのみがこのパラメータをサポートしています。
いいえ
デフォルト値なし
writeMode パラメータの説明
項目
insert (コーディングレス UI の INSERT INTO と同等)
update (コーディングレス UI の ON DUPLICATE KEY UPDATE と同等)
replace (コーディングレス UI の REPLACE INTO と同等)
処理ルール
プライマリキーの競合または一意なインデックスの競合が発生した場合、データは競合する行に書き込まれず、これらの行に書き込まれないデータはダーティデータと見なされます。
プライマリキーの競合または一意なインデックスの競合が発生しない場合、データは、このパラメータを insert に設定した場合と同じ方法で処理されます。 競合が発生した場合、デスティネーションテーブル内の競合する行のデータは新しいデータに置き換えられます。
プライマリキーの競合または一意なインデックスの競合が発生しない場合、データは、このパラメータを insert に設定した場合と同じ方法で処理されます。 競合が発生した場合、元の行は削除され、新しい行が挿入されます。 これは、元の行のすべてのフィールドが置き換えられることを示します。
データ例
ソーステーブルのデータ
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 1 | zhangsan| 1 | | 2 | lisi | | +----+---------+-----+デスティネーションテーブルの元のデータ
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 2 | wangwu | | +----+---------+-----+バッチ同期タスクの実行後、1 つのデータレコードがデスティネーションテーブルに書き込まれ、1 つのデータレコードが生成されます。
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 1 | zhangsan| 1 | | 2 | wangwu | | +----+---------+-----+
シナリオ 1:一部の列のみが指定されている:
"column": ["id","name"]。ソーステーブルのデータ
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 1 | zhangsan| 1 | | 2 | lisi | | +----+---------+-----+デスティネーションテーブルの元のデータ
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 2 | wangwu | 3 | +----+---------+-----+バッチ同期タスクの実行後、2 つのデータレコードがデスティネーションテーブルに書き込まれ、ダーティデータレコードは生成されません。
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 1 | zhangsan| 1 | | 2 | lisi | 3 | +----+---------+-----+
シナリオ 2:すべての列が指定されている:
"column": ["id","name","age"]。ソーステーブルのデータ
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 1 | zhangsan| 1 | | 2 | lisi | | +----+---------+-----+デスティネーションテーブルの元のデータ
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 2 | wangwu | 3 | +----+---------+-----+バッチ同期タスクの実行後、2 つのデータレコードがデスティネーションテーブルに書き込まれ、ダーティデータレコードは生成されません。
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 1 | zhangsan| 1 | | 2 | lisi | | +----+---------+-----+
ソーステーブルのデータ
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 1 | zhangsan| 1 | | 2 | lisi | | +----+---------+-----+デスティネーションテーブルの元のデータ
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 2 | wangwu | 3 | +----+---------+-----+バッチ同期タスクの実行後、2 つのデータレコードがデスティネーションテーブルに書き込まれ、ダーティデータレコードは生成されません。
+----+---------+-----+ | id | name | age | +----+---------+-----+ | 1 | zhangsan| 1 | | 2 | lisi | | +----+---------+-----+