DataWorks は、AnalyticDB for PostgreSQL データソースからデータを読み取り、AnalyticDB for PostgreSQL データソースにデータを書き込むための AnalyticDB for PostgreSQL Reader と AnalyticDB for PostgreSQL Writer を提供しています。このトピックでは、AnalyticDB for PostgreSQL データソースとのデータ同期機能について説明します。
制限事項
バッチ同期中にビューのデータを読み取ることができます。
サポートされている AnalyticDB for PostgreSQL のバージョン
V7.0 以前の AnalyticDB for PostgreSQL がサポートされています。
データ型マッピング
バッチデータ読み取り
AnalyticDB for PostgreSQL Reader は、ほとんどの AnalyticDB for PostgreSQL データ型をサポートしています。データベースのデータ型がサポートされていることを確認してください。
次の表に、AnalyticDB for PostgreSQL Reader がデータ型を変換する際に基づくデータ型マッピングを示します。
カテゴリ | AnalyticDB for PostgreSQL データ型 |
整数 | BIGINT、BIGSERIAL、INTEGER、SMALLINT、SERIAL、および GEOMETRY |
浮動小数点 | DOUBLE、PRECISION、MONEY、NUMERIC、および REAL |
文字列 | VARCHAR、CHAR、TEXT、BIT、および INET |
日付と時刻 | DATE、TIME、および TIMESTAMP |
ブール値 | BOOL |
バイナリ | BYTEA |
バッチデータ書き込み
AnalyticDB for PostgreSQL Writer は、ほとんどの AnalyticDB for PostgreSQL データ型をサポートしています。データベースのデータ型がサポートされていることを確認してください。
次の表に、AnalyticDB for PostgreSQL Writer がデータ型を変換する際に基づくデータ型マッピングを示します。
カテゴリ | AnalyticDB for PostgreSQL データ型 |
LONG | BIGINT、BIGSERIAL、INTEGER、SMALLINT、および SERIAL |
DOUBLE | DOUBLE、PRECISION、MONEY、NUMERIC、および REAL |
STRING | VARCHAR、CHAR、TEXT、BIT、INET、および GEOMETRY |
DATE | DATE、TIME、および TIMESTAMP |
BOOLEAN | BOOL |
BYTES | BYTEA |
AnalyticDB for PostgreSQL Writer が MONEY、INET、または BIT データ型にデータを変換する場合は、a_inet::varchar などの構文が必要です。
データ同期タスクの開発
データ同期タスクの設定のエントリポイントと手順については、以下のセクションを参照してください。パラメータ設定については、タスクの設定タブにある各パラメータの情報ヒントを参照してください。
データソースの追加
特定のデータソースとのデータ同期タスクを設定する前に、DataWorks にデータソースを追加する必要があります。詳細については、「データソースの追加と管理」をご参照ください。
単一テーブルのデータを同期するためのバッチ同期タスクの設定
設定手順の詳細については、「コードレス UI を使用したバッチ同期タスクの設定」および「コードエディタを使用したバッチ同期タスクの設定」をご参照ください。
コードエディタを使用してバッチ同期タスクを設定する場合に設定されるすべてのパラメータと実行されるコードについては、「付録:コードとパラメータ」をご参照ください。
データベース内のすべてのデータのバッチ同期を実装するための同期設定
設定手順の詳細については、「Data Integration での同期タスクの設定」をご参照ください。
付録:コードとパラメータ
付録:コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合は、コードエディタのフォーマット要件に基づいて、関連データソースのリーダーとライターのパラメータを設定する必要があります。フォーマット要件の詳細については、「コードエディタを使用したバッチ同期タスクの設定」をご参照ください。以下では、コードエディタにおけるリーダーとライターのパラメータの設定の詳細について説明します。
AnalyticDB for PostgreSQL Reader のコード
{
"type": "job",
"steps": [
{
"parameter": {
"datasource": "test_004",// データソースの名前。
"column": [// 列の名前。
"id",
"name",
"sex",
"salary",
"age"
],
"where": "id=1001",// WHERE 句。
"splitPk": "id",// シャードキー。
"table": "public.person"// テーブルの名前。
},
"name": "Reader",
"category": "reader"
},
{
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",// バージョン番号。
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {// 許容されるダーティデータレコードの最大数。
"record": ""
},
"speed": {
"concurrent": 6,// 並列スレッドの最大数。
"throttle": true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"mbps":"12"// 最大転送速度。単位:MB/秒。
}
}
}AnalyticDB for PostgreSQL Reader のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを読み取るテーブルの名前。 | はい | デフォルト値なし |
column | データを読み取る列の名前。JSON 配列で名前を指定します。デフォルト値は [*] で、ソーステーブルのすべての列を示します。
| はい | デフォルト値なし |
splitPk | AnalyticDB for PostgreSQL Reader がデータを読み取るときにデータシャーディングに使用されるフィールド。このパラメータを指定すると、ソーステーブルはこのパラメータの値に基づいてシャーディングされます。Data Integration は、並列スレッドを実行してデータを読み取ります。これにより、データ同期の効率が向上します。
| いいえ | デフォルト値なし |
where | WHERE 句。AnalyticDB for PostgreSQL Reader は、table、column、および where パラメータの設定に基づいて SQL ステートメントを生成し、生成されたステートメントを使用してデータを読み取ります。たとえば、テストを実行する場合、where パラメータを
| いいえ | デフォルト値なし |
querySql (高度なパラメータ。コードエディタでのみ使用可能) | 絞り込みデータフィルタリングに使用される SQL ステートメント。このパラメータを指定すると、AnalyticDB for PostgreSQL Reader はこのパラメータの値のみに基づいてデータをフィルタリングします。たとえば、複数のテーブルを結合してデータ同期を行う場合は、このパラメータを このパラメータを指定すると、AnalyticDB for PostgreSQL Reader は column、table、および where パラメータの設定を無視します。 | いいえ | デフォルト値なし |
fetchSize | 一度に読み取るデータレコードの数。このパラメータは、Data Integration とソースデータベース間のインタラクションの数を決定し、読み取り効率に影響します。 説明 このパラメータを 2048 より大きい値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 512 |
AnalyticDB for PostgreSQL Writer のコード
{
"type": "job",
"steps": [
{
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"parameter": {
"postSql": [],// 同期タスクの実行後に実行する SQL ステートメント。
"datasource": "test_004",// データソースの名前。
"column": [// 列の名前。
"id",
"name",
"sex",
"salary",
"age"
],
"table": "public.person",// テーブルの名前。
"preSql": []// 同期タスクの実行前に実行する SQL ステートメント。
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",// バージョン番号。
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {// 許容されるダーティデータレコードの最大数。
"record": ""
},
"speed": {
"throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"concurrent":6, // 並列スレッドの最大数。
"mbps":"12"// 最大転送速度。
}
}
}AnalyticDB for PostgreSQL Writer のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを書き込むテーブルの名前。 | はい | デフォルト値なし |
writeMode | 書き込みモード。有効な値:insert、copy、および upsert。
| いいえ | insert |
conflictMode | writeMode が upsert に設定されていて、データの書き込み時にプライマリキーの競合または一意インデックスの競合が発生した場合、次のポリシーを使用して競合を処理できます。
説明 このパラメータは、コードエディタを使用して同期タスクを設定する場合にのみ設定できます。 | いいえ | replace |
column | データを書き込む列の名前。 | はい | デフォルト値なし |
preSql | 同期タスクの実行前に実行する SQL ステートメント。たとえば、このパラメータを古いデータを削除するために使用される SQL ステートメントに設定できます。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディタでは複数の SQL ステートメントを実行できます。 | いいえ | デフォルト値なし |
postSql | 同期タスクの実行後に実行する SQL ステートメント。たとえば、このパラメータをタイムスタンプを追加するために使用される SQL ステートメントに設定できます。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディタでは複数の SQL ステートメントを実行できます。 | いいえ | デフォルト値なし |
batchSize | 一度に書き込むデータレコードの数。ビジネス要件に基づいて、このパラメータを適切な値に設定します。これにより、Data Integration と AnalyticDB for PostgreSQL 間のインタラクションが大幅に削減され、スループットが向上します。このパラメータを過度に大きい値に設定すると、データ同期中に OOM エラーが発生する可能性があります。 | いいえ | 1,024 |