DM データソースは、データハブとして機能します。DataWorks は、DM データソースからデータを読み書きするための DM Reader と DM Writer を提供しています。これにより、大量のデータの計算問題を迅速に解決できます。このトピックでは、DM データソースとのデータ同期機能について説明します。
制限事項
バッチ同期中にビューのデータを読み取ることができます。
DM Reader と DM Writer は、Data Integration 専用のリソースグループのみをサポートしています。
データ型マッピング
DM Reader と DM Writer は、数値型や文字列型など、一般的なリレーショナルデータベースのほとんどのデータ型をサポートしています。データベースのデータ型がサポートされていることを確認してください。
次の表に、DM Reader がデータ型を変換する際のデータ型マッピングを示します。
カテゴリ | DM データ型 |
整数 | INT、TINYINT、SMALLINT、および BIGINT |
浮動小数点 | REAL、FLOAT、DOUBLE、NUMBER、および DECIMAL |
文字列 | CHAR、VARCHAR、LONGVARCHAR、および TEXT |
日付と時刻 | DATE、DATETIME、TIMESTAMP、および TIME |
ブール値 | BIT |
バイナリ | BINARY、VARBINARY、および BLOB |
データ同期タスクの開発
データ同期タスクの設定のエントリポイントと手順については、以下のセクションを参照してください。パラメータ設定については、タスクの設定タブにある各パラメータのインフォチップを参照してください。
データソースの追加
特定のデータソースとのデータ同期タスクを設定する前に、DataWorks にデータソースを追加する必要があります。詳細については、「データソースの追加と管理」をご参照ください。
単一テーブルのデータを同期するためのバッチ同期タスクの設定
設定手順の詳細については、「コードレス UI を使用したバッチ同期タスクの設定」および「コードエディタを使用したバッチ同期タスクの設定」をご参照ください。
コードエディタを使用してバッチ同期タスクを設定する場合に設定されるすべてのパラメータと実行されるコードについては、「付録:コードとパラメータ」をご参照ください。
データベース内のすべてのデータのバッチ同期を実装するための同期設定
設定手順の詳細については、「Data Integration での同期タスクの設定」をご参照ください。
付録:コードとパラメータ
付録:コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合は、コードエディタのフォーマット要件に基づいて、関連データソースのリーダーとライターのパラメータを設定する必要があります。フォーマット要件の詳細については、「コードエディタを使用したバッチ同期タスクの設定」をご参照ください。以下の情報は、コードエディタにおけるリーダーとライターのパラメータの設定の詳細について説明しています。
DM Reader のコード
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"concurrent":1, // 並列スレッドの最大数。
"mbps":"12"// 最大転送速度。単位:MB/秒。
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "dm_datasource",
"table": "table",
"column": [
"*"
],
"preSql": [
"delete from XXX;"
],
"fetchSize": 2048
},
"stepType": "dm"
},
{
"category": "writer",
"name": "Writer",
"parameter": {},
"stepType": "stream"
}
],
"type": "job",
"version": "2.0"
}DM Reader のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データを読み取るデータソースの名前。データソースを追加する方法の詳細については、「DM データソース」をご参照ください。 | はい | デフォルト値なし |
table | データを読み取るテーブルの名前。 | はい | デフォルト値なし |
column | データを読み取るカラムの名前。JSON 配列で名前を指定します。デフォルト値は [ * ] で、ソーステーブルのすべてのカラムを示します。
| はい | デフォルト値なし |
splitPk | DM Readerがデータを読み取る際のデータシャーディングに使用するフィールド。このパラメーターを指定すると、このパラメーターの値に基づいてテーブルがシャーディングされます。 その後、Data Integrationは並列スレッドを実行してデータを読み取ります。このようにして、データはより効率的に同期できます。
| いいえ | デフォルト値なし |
where | WHERE 句。DM Reader は、column、table、および where パラメータの設定に基づいて SQL ステートメントを生成し、生成されたステートメントを使用してデータを読み取ります。たとえば、テストを実行する場合、where パラメータを limit 10 に設定できます。 現在の日付に生成されたデータを読み取るには、where パラメータを
| いいえ | デフォルト値なし |
querySql | 絞り込まれたデータフィルタリングに使用される SQL ステートメント。 このパラメーターを指定した場合、このパラメーターの値に基づいてのみデータがフィルタリングされます。 たとえば、データ同期のために複数のテーブルを結合する場合、このパラメーターを | いいえ | デフォルト値なし |
fetchSize | 一度に読み取るデータレコードの数。このパラメータは、Data Integration とソースデータベース間のインタラクションの回数を決定し、読み取り効率に影響します。 説明 このパラメーターを 2048 より大きい値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 1,024 |
DM Writer のコード
{
"type": "job",
"steps": [
{
"stepType": "oracle",
"parameter": {
"datasource": "aaa",
"column": [
"PROD_ID",
"name"
],
"where": "",
"splitPk": "",
"encoding": "UTF-8",
"table": "PENGXI.SALES"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "dm",
"parameter": {
"datasource": "dm_datasource",
"table": "table",
"column": [
"id",
"name"
],
"preSql": [
"delete from XXX;"
]
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効であることを示し、値 true はスロットリングが有効であることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"concurrent":2, // 並列スレッドの最大数。
"mbps":"12"// 最大転送速度。単位:MB/秒。
}
}
}
DM Writer のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データを書き込むデータソースの名前。データソースを追加する方法の詳細については、「DM データソース」をご参照ください。 | はい | デフォルト値なし |
table | データを書き込むテーブルの名前。テーブルが宛先データベースのデフォルトスキーマを使用する場合、このパラメータの値はテーブルの名前のみで構成されます。テーブルがカスタムスキーマを使用する場合、このパラメータの値は、カスタムスキーマの名前とテーブルの名前の 2 つの部分で構成されます。 | はい | デフォルト値なし |
column | データを書き込むカラムの名前。名前をカンマ (,) で区切ります。 説明 このパラメータを空にしないことをお勧めします。 | はい | デフォルト値なし |
preSql | 同期タスクを実行する前に実行する SQL ステートメント。たとえば、このパラメータを古いデータを削除するために使用される SQL ステートメントに設定できます。1 つのトランザクションで実行できる SQL ステートメントは 1 つだけです。 説明 複数の SQL ステートメントを指定した場合、ステートメントは同じトランザクションで実行されません。 | いいえ | デフォルト値なし |
postSql | 同期タスクを実行した後に実行する SQL ステートメント。たとえば、このパラメータをタイムスタンプを追加するために使用される SQL ステートメントに設定できます。1 つのトランザクションで実行できる SQL ステートメントは 1 つだけです。 説明 複数の SQL ステートメントを指定した場合、ステートメントは同じトランザクションで実行されません。 | いいえ | デフォルト値なし |
batchSize | 一度に書き込むデータレコードの数。ビジネス要件に基づいてこのパラメータを適切な値に設定します。これにより、Data Integration と宛先データベース間のインタラクションが大幅に削減され、スループットが向上します。このパラメータを過度に大きい値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 1024 |