DataWorks の Data Integration は、Lindorm Reader と Lindorm Writer プラグインを使用して、Lindorm との間でデータの読み書きを行います。このトピックでは、DataWorks が Lindorm に対して提供するデータの読み書き機能について説明します。
適用範囲
LindormTable は、サーバーレスリソースグループ (推奨) および Data Integration 専用リソースグループをサポートします。
コンピュートエンジンは、サーバーレスリソースグループ のみをサポートします。
Lindorm はマルチモデルデータベースです。詳細については、「Lindorm ドキュメント」をご参照ください。DataWorks は現在、LindormTable とコンピュートエンジンのみをサポートしています。
サポートされるフィールドタイプ
Lindorm Reader と Lindorm Writer は、ほとんどの Lindorm データ型をサポートしますが、一部サポートされていない型もあります。ご利用のデータ型がサポートされていることを確認する必要があります。
次の表に、Lindorm Reader と Lindorm Writer のデータ型変換を示します。
型の分類 | データ型 |
整数 | INT、LONG、SHORT |
浮動小数点 | DOUBLE、FLOAT、DOUBLE |
文字列 | STRING |
日付と時刻 | DATE |
ブール値 | BOOLEAN |
バイナリ | BINARYSTRING |
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、次の設定ガイドをご参照ください。
オフライン単一テーブル同期
サポートされるデータソース:Data Integration がサポートするすべてのデータソースタイプ。
設定ガイド:「オフライン単一テーブル同期タスク」
コードエディタのすべてのパラメーターとスクリプトデモのリストについては、「付録:スクリプトデモとパラメーター」をご参照ください。
リアルタイム単一テーブル同期
サポートされるデータソース:Kafka、LogHub、Hologres
設定ガイド:「リアルタイム単一テーブル同期タスク」
リアルタイム全データベース同期
サポートされるデータソース:PostgreSQL
設定ガイド:「リアルタイム全データベース同期タスクの設定」
付録:スクリプトデモとパラメーター
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスク設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプトデモ
LindormTable の Lindorm SQL テーブルからローカルマシンにデータを抽出するジョブを設定します。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "caching": 128, "column": [ "id", "value" ], "envType": 1, "datasource": "lindorm", "tableMode": "tableService", "table": "lindorm_table" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "lindorm", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { // 転送速度を byte/s 単位で設定します。DataX はこの速度に達するように試みますが、超えることはありません。 "byte": 1048576 } // エラー制限 "errorLimit": { // エラーレコードの最大数。エラーレコードの数がこの値を超えると、ジョブは失敗します。 "record": 0, // エラーレコードの最大許容率。たとえば、1.0 は 100% を意味し、0.02 は 2% を意味します。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }LindormTable の Lindorm HBaseLike (WideColumn) テーブルからローカルマシンにデータを抽出するジョブを設定します。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "column": [ "STRING|rowkey", "INT|f:a" ], "envType": 1, "datasource": "lindorm", "tableMode": "wideColumn", "table":"lindorm_table" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { // 転送速度を byte/s 単位で設定します。DataX はこの速度に達するように試みますが、超えることはありません。 "byte": 1048576 } // エラー制限 "errorLimit": { // エラーレコードの最大数。エラーレコードの数がこの値を超えると、ジョブは失敗します。 "record": 0, // エラーレコードの最大許容率。たとえば、1.0 は 100% を意味し、0.02 は 2% を意味します。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }コンピュートエンジンテーブルからローカルマシンにデータを抽出するジョブを設定します。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "column": [ "id", "value" ], "tableComment": "", "where": "", "session": [], "splitPk": "id", "table": "auto_ob_149912212480" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { // 転送速度を byte/s 単位で設定します。DataX はこの速度に達するように試みますが、超えることはありません。 "byte": 1048576 } // エラー制限 "errorLimit": { // エラーレコードの最大数。エラーレコードの数がこの値を超えると、ジョブは失敗します。 "record": 0, // エラーレコードの最大許容率。たとえば、1.0 は 100% を意味し、0.02 は 2% を意味します。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 | ||||||||||||||||||||
mode | LindormTable に固有です。データの読み取りモードを指定します。有効な値は FixedColumn と DynamicColumn です。 | はい | FixedColumn | ||||||||||||||||||||
tableMode | LindormTable に固有です。有効な値は、標準テーブル SQL モードの場合は table、ワイドテーブルモードの場合は wideColumn です。デフォルト値は table です。table モードを選択した場合、このパラメーターを指定する必要はありません。 | いいえ | デフォルトでは指定されません | ||||||||||||||||||||
table | データの読み取り元となる Lindorm テーブルの名前。テーブル名では大文字と小文字が区別されます。 | はい | なし | ||||||||||||||||||||
encoding | LindormTable に固有です。コーデック。有効な値は UTF-8 と GBK です。このパラメーターは通常、バイナリで格納された Lindorm の byte[] 型を String 型に変換するために使用されます。 | いいえ | UTF-8 | ||||||||||||||||||||
caching | LindormTable に固有です。1 回のバッチで取得するレコード数。値を大きくすると、データ同期システムと Lindorm 間のネットワーク対話が大幅に減少し、全体のスループットが向上します。この値が大きすぎると、Lindorm サーバーに過度の負荷がかかったり、データ同期プロセスでメモリ不足 (OOM) エラーが発生したりする可能性があります。 | いいえ | 100 | ||||||||||||||||||||
selects | LindormTable に固有です。現在読み取り中のテーブルタイプでは、システムは自動的にデータをシャーディングしません。デフォルトでは、タスクは単一の同時実行プロセスで実行されます。データをシャーディングするには、selects パラメーターを手動で設定する必要があります。例: 制限事項:
| いいえ | なし | ||||||||||||||||||||
session | コンピュートエンジンに固有です。セッションレベルのジョブパラメーター (例: | いいえ | なし | ||||||||||||||||||||
splitPk | コンピュートエンジンに固有です。シャードキー。このパラメーターは、コンピュートエンジンテーブルからのデータ読み取りに固有です。splitPk を指定すると、指定されたフィールドに基づいてデータがシャーディングされます。データ同期は同時実行タスクを開始してデータを同期するため、効率が向上します。
| いいえ | なし | ||||||||||||||||||||
columns | 読み取るフィールドのリスト。列のトリミングや並べ替えができます。列のトリミングとは、エクスポートする列のサブセットを選択できることを意味します。列の並べ替えとは、テーブルスキーマとは異なる順序で列をエクスポートできることを意味します。
| はい | なし |
Writer スクリプトデモ
MySQL データソースから LindormTable の Lindorm SQL テーブルにデータを書き込むジョブを設定します。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "checkSlave": true, "datasource": " ", "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "masterSlave": "slave", "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8", "print": true }, "name": "mysqlReader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "nullMode": "skip", "datasource": "lindorm_datasource", "envType": 1, "column": [ "id", "value" ], "dynamicColumn": "false", "table": "lindorm_table", "encoding": "utf8" }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "speed": { // 転送速度を byte/s 単位で設定します。DataX はこの速度に達するように試みますが、超えることはありません。 "byte": 1048576 }, // エラー制限 "errorLimit": { // エラーレコードの最大数。エラーレコードの数がこの値を超えると、ジョブは失敗します。 "record": 0, // エラーレコードの最大許容率。たとえば、1.0 は 100% を意味し、0.02 は 2% を意味します。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }MySQL データソースから LindormTable の Lindorm HBaseLike (WideColumn) テーブルにデータを書き込むジョブを設定します。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": " ", "column": [ "id", "value" ], "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "table": "xxxxxx", "encoding": "utf8", "nullMode": "skip", "dynamicColumn": "false", "caching": 128, "column": [ // ソースからのフィールドを順にマッピングします。 "ROW|STRING", // 行キー。これは固定設定です。ソースの最初のフィールドが行キーにマッピングされます。この例では、id フィールドが行キーにマッピングされます。 "cf:name|STRING" // cf はカラムファミリー名を指定します (変更可能)。name は宛先の列名を指定します (変更可能)。 ] }, "name":"Writer", "category":"writer" } ], "setting": { "jvmOption": "", "errorLimit": { "record": "0" }, "speed": { "concurrent": 3, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }MySQL データソースからコンピュートエンジンテーブルにデータを書き込むジョブを設定します。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": " ", "column": [ "id", "value" ], "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "table": "xxxxxx", "column": [ "id", "value" ], "formatType": "ICEBERG" }, "name":"Writer", "category":"writer" } ], "setting": { "jvmOption": "", "errorLimit": { "record": "0" }, "speed": { "concurrent": 3, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
table | データの書き込み先となる Lindorm テーブルの名前。テーブル名では大文字と小文字が区別されます。 | はい | なし |
encoding | LindormTable のコーデックを指定します。有効な値は UTF-8 と GBK です。このパラメーターは通常、バイナリで格納された Lindorm の byte[] 型を String 型に変換するために使用されます。 | いいえ | UTF-8 |
columns | 書き込むフィールドのリストを指定します。列をトリミングできます。つまり、エクスポートする列のサブセットを選択できます。また、列を並べ替えることもできます。つまり、テーブルスキーマとは異なる順序で列をエクスポートできます。
| はい | なし |
nullMode | このパラメーターは LindormTable にのみ適用されます。ソースデータからの null 値の処理方法を指定します。
| いいえ | EMPTY_BYTES |
formatType | このパラメーターはコンピュートエンジンにのみ適用されます。同期タスクのテーブルのフォーマットを指定します。有効な値:
| いいえ | なし |