DataWorks は、KingbaseES データソースとのデータの読み取りおよび書き込みを行うための KingbaseES Reader と KingbaseES Writer を提供しています。このトピックでは、KingbaseES データソースとのデータ同期機能について説明します。
制限事項
KingbaseES データソースは、データ統合の排他リソースグループのみをサポートしています。
KingbaseES Writer を使用する同期タスクには、少なくとも
INSERT INTO or REPLACE INTOステートメントを実行するための権限が必要です。その他の権限が必要かどうかは、タスクの設定時に preSql パラメーターと postSql パラメーターで指定する SQL ステートメントによって異なります。
データ型マッピング
次の表に、KingbaseES Reader がデータ型を変換する際に基づいているデータ型マッピングを示します。
カテゴリ | KingbaseES データ型 |
整数 | INT、TINYINT、SMALLINT、MEDIUMINT、および BIGINT |
浮動小数点 | FLOAT、DOUBLE、および DECIMAL |
文字列 | VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT、および LONGTEXT |
日付と時刻 | DATE、DATETIME、TIMESTAMP、TIME、および YEAR |
ブール値 | BIT および BOOLEAN |
バイナリ | TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB、および VARBINARY |
上記の表に記載されていないデータ型はサポートされていません。
KingbaseES Reader は、TINYINT(1) を整数データ型として処理します。
データ同期タスクの開発
データ同期タスクの設定のエントリポイントと手順については、以下のセクションを参照してください。パラメーター設定については、タスクの設定タブにある各パラメーターのヒントを参照してください。
データソースの追加
特定のデータソースとのデータ同期タスクを設定する前に、DataWorks にデータソースを追加する必要があります。詳細については、「データソースの追加と管理」をご参照ください。
単一テーブルのデータを同期するためのバッチ同期タスクの設定
設定手順の詳細については、「コードレス UI を使用したバッチ同期タスクの設定」および「コードエディターを使用したバッチ同期タスクの設定」をご参照ください。
コードエディターを使用してバッチ同期タスクを設定する場合に設定されるすべてのパラメーターと実行されるコードについては、「付録:コードとパラメーター」をご参照ください。
付録:コードとパラメーター
付録:コードエディターを使用したバッチ同期タスクの設定
コードエディターを使用してバッチ同期タスクを設定する場合は、コードエディターのフォーマット要件に基づいて、関連データソースのリーダーとライターのパラメーターを設定する必要があります。フォーマット要件の詳細については、「コードエディターを使用したバッチ同期タスクの設定」をご参照ください。次の情報は、コードエディターのリーダーとライターのパラメーターの設定の詳細について説明しています。
KingbaseES Reader のコード
シャーディングされていないテーブルからデータを読み取る同期タスクを設定する
{ "type":"job", "version":"2.0",// バージョン番号。 "steps":[ { "stepType":"kingbasees",// プラグイン名。 "parameter":{ "column":[// 列の名前。 "id" ], "connection":[ { "querySql":["select a,b from join1 c join join2 d on c.id = d.id;"], // ソーステーブルからデータを読み取るために使用する SQL ステートメント。 "datasource":"",// データソースの名前。 "table":[// テーブルの名前。テーブル名は角括弧 [] で囲む必要があります。 "xxx" ] } ], "where":"",// WHERE 句。 "splitPk":"",// シャードキー。 "encoding":"UTF-8"// エンコーディング形式。 }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"// 許容されるダーティデータレコードの最大数。 }, "speed":{ "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。 "concurrent":1, // 並列スレッドの最大数。 "mbps":"12"// 最大転送速度。単位:MB/秒。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }シャーディングされたテーブルからデータを読み取る同期タスクを設定する
説明シャーディングされた KingbaseES テーブルからデータを読み取る同期タスクを設定する場合、同じスキーマを持つ複数のテーブルシャードを選択できます。
{ "type": "job", "version": "1.0", "configuration": { "reader": { "plugin": "kingbasees", "parameter": { "connection": [ { "table": [ "tbl1", "tbl2", "tbl3" ], "datasource": "datasourceName1" }, { "table": [ "tbl4", "tbl5", "tbl6" ], "datasource": "datasourceName2" } ], "singleOrMulti": "multi", "splitPk": "db_id", "column": [ "id", "name", "age" ], "where": "1 < id and id < 100" } }, "writer": { } } }
KingbaseES Reader のコードのパラメーター
パラメーター | 説明 |
username | KingbaseES への接続に使用するユーザー名。 |
password | KingbaseES への接続に使用するパスワード。 |
column | データを読み取る列の名前。ソーステーブルのすべての列からデータを読み取る場合は、このパラメーターをアスタリスク (*) に設定します。 |
table | データを読み取るテーブルの名前。 |
jdbcUrl | KingbaseES への接続に使用する Java Database Connectivity (JDBC) URL。例:jdbc:kingbase8://127.0.0.1:30215?currentschema=TEST。 |
splitPk | KingbaseES Reader がデータを読み取るときにデータシャーディングに使用されるフィールド。このパラメーターを設定すると、このパラメーターの値に基づいてソーステーブルがシャーディングされます。その後、データ統合は並列スレッドを実行してデータを読み取ります。 splitPk パラメーターには、整数データ型のフィールドを指定できます。ソーステーブルに整数データ型のフィールドが含まれていない場合は、このパラメーターを空のままにすることができます。 |
KingbaseES Writer のコード
次のコードでは、KingbaseES データベースにデータを書き込む同期タスクが設定されています。
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"kingbasees",// プラグイン名。
"parameter":{
"postSql":[],// 同期タスクの実行後に実行する SQL ステートメント。
"datasource":"",// データソースの名前。
"column":[// 列の名前。
"id",
"value"
],
"batchSize":1024,// 一度に書き込むデータレコードの数。
"table":"",// データを書き込むテーブルの名前。
"preSql":[
"delete from XXX;" // 同期タスクの実行前に実行する SQL ステートメント。
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{// 許容されるダーティデータレコードの最大数。
"record":"0"
},
"speed":{
"throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
"concurrent":1, // 並列スレッドの最大数。
"mbps":"12"// 最大転送速度。単位:MB/秒。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}KingbaseES Writer のコードのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディターを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを書き込むテーブルの名前。 | はい | デフォルト値なし |
column | データを書き込む列の名前。 宛先テーブルのすべての列にデータを書き込む場合は、 説明 指定する列名にスラッシュ (/) が含まれている場合は、バックスラッシュ (\) を使用して \"your_column_name\" の形式で列名をエスケープする必要があります。たとえば、列名が /abc/efg の場合は、\"/abc/efg\" としてエスケープする必要があります。 | はい | デフォルト値なし |
preSql | 同期タスクの実行前に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディターでは複数の SQL ステートメントを実行できます。たとえば、このパラメーターを、古いデータを削除するために使用される次の SQL ステートメントに設定できます。 説明 複数の SQL ステートメントを指定した場合、すべてのステートメントが正常に実行されるかどうかは保証されません。 | いいえ | デフォルト値なし |
postSql | 同期タスクの実行後に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディターでは複数の SQL ステートメントを実行できます。たとえば、このパラメーターを、タイムスタンプを追加するために使用される | いいえ | デフォルト値なし |
batchSize | 一度に書き込むデータレコードの数。ビジネス要件に基づいて、このパラメーターを適切な値に設定します。これにより、データ統合と KingbaseES の間の相互作用が大幅に削減され、スループットが向上します。このパラメーターを過度に大きな値に設定すると、データ同期の際にメモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 1024 |