すべてのプロダクト
Search
ドキュメントセンター

DataWorks:KingbaseES データソース

最終更新日:Jan 11, 2025

DataWorks は、KingbaseES データソースとのデータの読み取りおよび書き込みを行うための KingbaseES Reader と KingbaseES Writer を提供しています。このトピックでは、KingbaseES データソースとのデータ同期機能について説明します。

制限事項

  • KingbaseES データソースは、データ統合の排他リソースグループのみをサポートしています。

  • KingbaseES Writer を使用する同期タスクには、少なくとも INSERT INTO or REPLACE INTO ステートメントを実行するための権限が必要です。その他の権限が必要かどうかは、タスクの設定時に preSql パラメーターと postSql パラメーターで指定する SQL ステートメントによって異なります。

データ型マッピング

次の表に、KingbaseES Reader がデータ型を変換する際に基づいているデータ型マッピングを示します。

カテゴリ

KingbaseES データ型

整数

INT、TINYINT、SMALLINT、MEDIUMINT、および BIGINT

浮動小数点

FLOAT、DOUBLE、および DECIMAL

文字列

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT、および LONGTEXT

日付と時刻

DATE、DATETIME、TIMESTAMP、TIME、および YEAR

ブール値

BIT および BOOLEAN

バイナリ

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB、および VARBINARY

重要
  • 上記の表に記載されていないデータ型はサポートされていません。

  • KingbaseES Reader は、TINYINT(1) を整数データ型として処理します。

データ同期タスクの開発

データ同期タスクの設定のエントリポイントと手順については、以下のセクションを参照してください。パラメーター設定については、タスクの設定タブにある各パラメーターのヒントを参照してください。

データソースの追加

特定のデータソースとのデータ同期タスクを設定する前に、DataWorks にデータソースを追加する必要があります。詳細については、「データソースの追加と管理」をご参照ください。

単一テーブルのデータを同期するためのバッチ同期タスクの設定

付録:コードとパラメーター

付録:コードエディターを使用したバッチ同期タスクの設定

コードエディターを使用してバッチ同期タスクを設定する場合は、コードエディターのフォーマット要件に基づいて、関連データソースのリーダーとライターのパラメーターを設定する必要があります。フォーマット要件の詳細については、「コードエディターを使用したバッチ同期タスクの設定」をご参照ください。次の情報は、コードエディターのリーダーとライターのパラメーターの設定の詳細について説明しています。

KingbaseES Reader のコード

  • シャーディングされていないテーブルからデータを読み取る同期タスクを設定する

    {
        "type":"job",
        "version":"2.0",// バージョン番号。
        "steps":[
            {
                "stepType":"kingbasees",// プラグイン名。
                "parameter":{
                    "column":[// 列の名前。
                        "id"
                    ],
                    "connection":[
                        {   "querySql":["select a,b from join1 c join join2 d on c.id = d.id;"], // ソーステーブルからデータを読み取るために使用する SQL ステートメント。
                            "datasource":"",// データソースの名前。
                            "table":[// テーブルの名前。テーブル名は角括弧 [] で囲む必要があります。
                                "xxx"
                            ]
                        }
                    ],
                    "where":"",// WHERE 句。
                    "splitPk":"",// シャードキー。
                    "encoding":"UTF-8"// エンコーディング形式。
                },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// 許容されるダーティデータレコードの最大数。
            },
            "speed":{
                "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
                "concurrent":1, // 並列スレッドの最大数。
                "mbps":"12"// 最大転送速度。単位:MB/秒。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • シャーディングされたテーブルからデータを読み取る同期タスクを設定する

    説明

    シャーディングされた KingbaseES テーブルからデータを読み取る同期タスクを設定する場合、同じスキーマを持つ複数のテーブルシャードを選択できます。

    {
        "type": "job",
        "version": "1.0",
        "configuration": {
            "reader": {
                "plugin": "kingbasees",
                "parameter": {
                    "connection": [
                        {
                            "table": [
                                "tbl1",
                                "tbl2",
                                "tbl3"
                            ],
                            "datasource": "datasourceName1"
                        },
                        {
                            "table": [
                                "tbl4",
                                "tbl5",
                                "tbl6"
                            ],
                            "datasource": "datasourceName2"
                        }
                    ],
                    "singleOrMulti": "multi",
                    "splitPk": "db_id",
                    "column": [
                        "id", "name", "age"
                    ],
                    "where": "1 < id and id < 100"
                }
            },
            "writer": {            
            }
        }
    }

KingbaseES Reader のコードのパラメーター

パラメーター

説明

username

KingbaseES への接続に使用するユーザー名。

password

KingbaseES への接続に使用するパスワード。

column

データを読み取る列の名前。ソーステーブルのすべての列からデータを読み取る場合は、このパラメーターをアスタリスク (*) に設定します。

table

データを読み取るテーブルの名前。

jdbcUrl

KingbaseES への接続に使用する Java Database Connectivity (JDBC) URL。例:jdbc:kingbase8://127.0.0.1:30215?currentschema=TEST

splitPk

KingbaseES Reader がデータを読み取るときにデータシャーディングに使用されるフィールド。このパラメーターを設定すると、このパラメーターの値に基づいてソーステーブルがシャーディングされます。その後、データ統合は並列スレッドを実行してデータを読み取ります。

splitPk パラメーターには、整数データ型のフィールドを指定できます。ソーステーブルに整数データ型のフィールドが含まれていない場合は、このパラメーターを空のままにすることができます。

KingbaseES Writer のコード

次のコードでは、KingbaseES データベースにデータを書き込む同期タスクが設定されています。

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"kingbasees",// プラグイン名。
            "parameter":{
                "postSql":[],// 同期タスクの実行後に実行する SQL ステートメント。
                "datasource":"",// データソースの名前。
                "column":[// 列の名前。
                    "id",
                    "value"
                ],
                "batchSize":1024,// 一度に書き込むデータレコードの数。
                "table":"",// データを書き込むテーブルの名前。
                "preSql":[
                     "delete from XXX;" // 同期タスクの実行前に実行する SQL ステートメント。
                   ]
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{// 許容されるダーティデータレコードの最大数。
            "record":"0"
        },
        "speed":{
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
            "concurrent":1, // 並列スレッドの最大数。
            "mbps":"12"// 最大転送速度。単位:MB/秒。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

KingbaseES Writer のコードのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディターを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを書き込むテーブルの名前。

はい

デフォルト値なし

column

データを書き込む列の名前。 "column": ["id", "name", "age"] のように、名前をコンマ (,) で区切ります。

宛先テーブルのすべての列にデータを書き込む場合は、"column": ["*"] のように、このパラメーターをアスタリスク (*) に設定します。

説明

指定する列名にスラッシュ (/) が含まれている場合は、バックスラッシュ (\) を使用して \"your_column_name\" の形式で列名をエスケープする必要があります。たとえば、列名が /abc/efg の場合は、\"/abc/efg\" としてエスケープする必要があります。

はい

デフォルト値なし

preSql

同期タスクの実行前に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディターでは複数の SQL ステートメントを実行できます。たとえば、このパラメーターを、古いデータを削除するために使用される次の SQL ステートメントに設定できます。

truncate table tablename

説明

複数の SQL ステートメントを指定した場合、すべてのステートメントが正常に実行されるかどうかは保証されません。

いいえ

デフォルト値なし

postSql

同期タスクの実行後に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディターでは複数の SQL ステートメントを実行できます。たとえば、このパラメーターを、タイムスタンプを追加するために使用される alter table tablenameadd colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP SQL ステートメントに設定できます。

いいえ

デフォルト値なし

batchSize

一度に書き込むデータレコードの数。ビジネス要件に基づいて、このパラメーターを適切な値に設定します。これにより、データ統合と KingbaseES の間の相互作用が大幅に削減され、スループットが向上します。このパラメーターを過度に大きな値に設定すると、データ同期の際にメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

1024