AnalyticDB for PostgreSQL データソースを使用すると、AnalyticDB for PostgreSQL からのデータの読み取りと書き込みが可能になります。このトピックでは、DataWorks における AnalyticDB for PostgreSQL のデータ同期機能について説明します。
制限事項
オフライン同期タスクは、ビューからのデータ読み取りをサポートしています。
サポート対象バージョン
バージョン 7.0 までがサポート対象です。
サポートされるフィールドタイプ
オフライン読み取り
AnalyticDB for PostgreSQL Reader は、AnalyticDB for PostgreSQL のほとんどのデータ型をサポートしていますが、すべてではありません。続行する前に、ご利用のデータ型がサポートされていることを確認してください。
次の表に、AnalyticDB for PostgreSQL Reader のデータ型マッピングを示します。
カテゴリ | AnalyticDB for PostgreSQL データ型 |
整数型 | BIGINT、BIGSERIAL、INTEGER、SMALLINT、SERIAL、GEOMETRY |
浮動小数点型 | DOUBLE、PRECISION、MONEY、NUMERIC、REAL |
文字列型 | VARCHAR、CHAR、TEXT、BIT、INET |
日時型 | DATE、TIME、TIMESTAMP |
ブール型 | BOOL |
バイナリ型 | BYTEA |
オフライン書き込み
AnalyticDB for PostgreSQL Writer は、AnalyticDB for PostgreSQL のほとんどのデータ型をサポートしていますが、すべてではありません。続行する前に、ご利用のデータ型がサポートされていることを確認してください。
次の表に、AnalyticDB for PostgreSQL Writer のデータ型マッピングを示します。
カテゴリ | AnalyticDB for PostgreSQL データ型 |
LONG | BIGINT、BIGSERIAL、INTEGER、SMALLINT、SERIAL |
DOUBLE | DOUBLE、PRECISION、MONEY、NUMERIC、REAL |
STRING | VARCHAR、CHAR、TEXT、BIT、INET、GEOMETRY |
DATE | DATE、TIME、TIMESTAMP |
BOOLEAN | BOOL |
BYTES | BYTEA |
MONEY、INET、BIT データ型を変換するには、a_inet::varchar のような構文を使用します。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を確認できます。
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。
単一テーブルのオフライン同期タスクの設定ガイド
詳細については、「コードレス UI を使用した同期タスクの設定」および「コードエディタを使用した同期タスクの設定」をご参照ください。
コードエディタのすべてのパラメーターとスクリプトデモについては、「付録:スクリプトデモとパラメーターの説明」をご参照ください。
データベース全体のオフライン読み取り同期タスクの設定ガイド
詳細については、「データベース全体のリアルタイム同期タスクの設定」をご参照ください。
付録:スクリプトデモとパラメーターの説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスク設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプトデモ
{
"type": "job",
"steps": [
{
"parameter": {
"datasource": "test_004",// データソース名。
"column": [// ソーステーブルの列。
"id",
"name",
"sex",
"salary",
"age"
],
"where": "id=1001",// フィルター条件。
"splitPk": "id",// 分割キー。
"table": "public.person"// ソーステーブル名。
},
"name": "Reader",
"category": "reader"
},
{
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",// バージョン番号。
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {// エラーレコード数。
"record": ""
},
"speed": {
"concurrent": 6,// 同時実行スレッド数。
"throttle": true,// throttle を false に設定すると、mbps パラメーターは有効にならず、レート制限は適用されません。throttle を true に設定すると、レート制限が適用されます。
"mbps":"12"// レート制限。1 mbps は 1 MB/s に相当します。
}
}
}Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソース名。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。 | はい | なし |
table | データを同期する元のテーブルの名前。 | はい | なし |
column | 同期する列。JSON 配列を使用して列を指定します。デフォルトでは、すべての列が同期されます。例:[*]。
| はい | なし |
splitPk | AnalyticDB for PostgreSQL Reader がデータを抽出する際に、splitPk パラメーターを指定すると、指定された分割キーに基づいてデータがパーティション分割されます。Data Integration は同時実行タスクを開始してデータを同期するため、同期パフォーマンスが向上します。
| いいえ | なし |
where | フィルター条件。AnalyticDB for PostgreSQL Reader は、指定された column、table、および where パラメーターに基づいて SQL 文を構築し、データを抽出します。たとえば、テスト用に当日に生成されたデータを同期するには、where パラメーターを
| いいえ | なし |
querySql (このパラメーターはコードエディタでのみ使用可能です。) | 一部のビジネスシナリオでは、where 設定項目だけではフィルタリング条件を定義するのに不十分な場合があります。この設定項目を使用して、代わりにカスタム SQL 文を指定できます。このカスタム文を指定すると、データ同期システムは column や table などの他の設定項目を無視し、直接この文を使用してデータをフィルタリングします。たとえば、複数テーブルの結合後にデータを同期するには、 querySql を設定すると、AnalyticDB for PostgreSQL Reader は column、table、および where 条件の設定を直接無視します。 | いいえ | なし |
fetchSize | このパラメーターは、データベースサーバーから各バッチでフェッチするレコード数を指定します。値を大きくすると、Data Integration とサーバー間のネットワークインタラクションの数が減り、データ抽出パフォーマンスが向上する可能性があります。 説明 fetchSize の値が大きすぎる (2048 を超える) 場合、Data Integration プロセスでメモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 512 |
Writer スクリプトデモ
{
"type": "job",
"steps": [
{
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"parameter": {
"postSql": [],// インポート後に実行する SQL 文。
"datasource": "test_004",// データソース名。
"column": [// ターゲットテーブルの列。
"id",
"name",
"sex",
"salary",
"age"
],
"table": "public.person",// ターゲットテーブル名。
"preSql": []// インポート前に実行する SQL 文。
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",// バージョン番号。
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {// エラーレコード数。
"record": ""
},
"speed": {
"throttle":true,// throttle を false に設定すると、mbps パラメーターは有効にならず、レート制限は適用されません。throttle を true に設定すると、レート制限が適用されます。
"concurrent":6, // 同時実行ジョブ数。
"mbps":"12"// レート制限。
}
}
}Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソース名。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。 | はい | なし |
table | データを同期する先のテーブルの名前。 | はい | なし |
writeMode | インポートモード。有効な値は insert、copy、および upsert です。
| いいえ | insert |
conflictMode | writeMode を upsert に設定し、PostgreSQL へのデータ書き込み時にプライマリキーまたは一意のインデックスの競合が発生した場合、次のいずれかの競合解決ポリシーを選択できます。
説明 競合解決ポリシーは、コードエディタでのみ設定できます。 | いいえ | replace |
column | データを書き込むターゲットテーブルの列。列はカンマ (,) で区切ります。例: | はい | なし |
preSql | データ同期タスクが開始される前に実行する SQL 文。コードレス UI では、1 つの SQL 文しか実行できません。コードエディタでは、既存のデータをクリアする文など、複数の SQL 文を実行できます。 | いいえ | なし |
postSql | データ同期タスクが完了した後に実行する SQL 文。コードレス UI では、1 つの SQL 文しか実行できません。コードエディタでは、タイムスタンプを追加する文など、複数の SQL 文を実行できます。 | いいえ | なし |
batchSize | 1 回のバッチで書き込むレコード数。値を大きくすると、Data Integration と AnalyticDB for PostgreSQL 間のネットワークインタラクションの数が大幅に減り、スループットが向上する可能性があります。ただし、この値が大きすぎると、Data Integration プロセスでメモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 1,024 |