DataWorks は、MariaDB データソースからデータを読み取り、MariaDB データソースにデータを書き込むための MariaDB Reader と MariaDB Writer を提供します。このトピックでは、MariaDB データソースとのデータ同期機能について説明します。
サポートされている MariaDB バージョン
バッチデータの読み取りと書き込み
MariaDB 5.5.x、MariaDB 10.0.x、MariaDB 10.1.x、MariaDB 10.2.x、および MariaDB 10.3.x がサポートされています。バッチ同期中にビューのデータを読み取ることができます。
サポートされているデータ型
各 MariaDB バージョンのすべてのデータ型については、MariaDB 公式ドキュメントを参照してください。次の表は、MariaDB 10.3.x の主なデータ型のサポート状況を示しています。
データ型 | バッチデータ読み取り用の MariaDB Reader | バッチデータ書き込み用の MariaDB Writer |
TINYINT | サポートされています | サポートされています |
SMALLINT | サポートされています | サポートされています |
INTEGER | サポートされています | サポートされています |
BIGINT | サポートされています | サポートされています |
FLOAT | サポートされています | サポートされています |
DOUBLE | サポートされています | サポートされています |
DECIMAL/NUMBERIC | サポートされています | サポートされています |
REAL | サポートされていません | サポートされていません |
VARCHAR | サポートされています | サポートされています |
JSON | サポートされています | サポートされています |
TEXT | サポートされています | サポートされています |
MEDIUMTEXT | サポートされています | サポートされています |
LONGTEXT | サポートされています | サポートされています |
VARBINARY | サポートされています | サポートされています |
BINARY | サポートされています | サポートされています |
TINYBLOB | サポートされています | サポートされています |
MEDIUMBLOB | サポートされています | サポートされています |
LONGBLOB | サポートされています | サポートされています |
ENUM | サポートされています | サポートされています |
SET | サポートされています | サポートされています |
BOOLEAN | サポートされています | サポートされています |
BIT | サポートされています | サポートされています |
DATE | サポートされています | サポートされています |
DATETIME | サポートされています | サポートされています |
TIMESTAMP | サポートされています | サポートされています |
TIME | サポートされています | サポートされています |
YEAR | サポートされています | サポートされています |
LINESTRING | サポートされていません | サポートされていません |
POLYGON | サポートされていません | サポートされていません |
MULTIPOINT | サポートされていません | サポートされていません |
MULTILINESTRING | サポートされていません | サポートされていません |
MULTIPOLYGON | サポートされていません | サポートされていません |
GEOMETRYCOLLECTION | サポートされていません | サポートされていません |
データ同期前に MariaDB 環境を準備する
DataWorks を使用して MariaDB データソースとの間でデータを同期する前に、MariaDB 環境を準備する必要があります。これにより、同期タスクを構成し、MariaDB データソースとの間で期待どおりにデータを同期できるようになります。以下の情報は、MariaDB データソースとのデータ同期のために MariaDB 環境を準備する方法について説明しています。
準備 1:MariaDB データベースのバージョンを確認する
Data Integration には、MariaDB バージョンに関する特定の要件があります。サポートされている MariaDB バージョンを参照して、MariaDB データベースのバージョンが要件を満たしているかどうかを確認できます。関連するステートメントを実行して、MariaDB データベースのバージョンを確認できます。
準備 2:必要な権限を持つアカウントを準備する
DataWorks が MariaDB データベースにアクセスするためのアカウントを計画して作成することをお勧めします。このようなアカウントを準備するには、次の手順を実行します。
アカウントを作成します。この手順はオプションです。
詳細については、「MariaDB データベースへのアクセスに使用するアカウントを作成する」を参照してください。
アカウントに必要な権限を付与します。
バッチデータの読み取り:アカウントには SELECT 権限が必要です。
バッチデータの書き込み:アカウントには INSERT、DELETE、および UPDATE 権限が必要です。
次のステートメントを実行して、アカウントに権限を付与します。または、アカウントに SUPER 権限を付与します。ステートメントを実行するときは、データ同期用のアカウントを作成したアカウントに置き換えます。
-- CREATE USER 'Account for data synchronization'@'%' IDENTIFIED BY 'Password'; // データ同期に使用できるアカウントを作成し、パスワードを指定します。このようにして、任意のホストからアカウントとパスワードを使用してデータベースにアクセスできます。% はホストを示します。 GRANT SELECT, INSERT, DELETE,UPDATE CLIENT ON *.* TO 'Account for data synchronization'@'%'; // アカウントに SELECT、INSERT、DELETE、および UPDATE 権限を付与します。
データ同期タスクを開発する
データ同期タスクのエントリポイントと構成手順については、以下のセクションを参照してください。パラメータ設定については、タスクの構成タブにある各パラメータのヒントを参照してください。
データソースを追加する
特定のデータソースとの間でデータを同期するデータ同期タスクを構成する前に、DataWorks にデータソースを追加する必要があります。詳細については、「データソースを追加および管理する」を参照してください。
単一テーブルのデータを同期するためのバッチ同期タスクを構成する
構成手順の詳細については、「コードエディタを使用してバッチ同期タスクを構成する」および「コードレス UI を使用してバッチ同期タスクを構成する」を参照してください。
コードエディタを使用してバッチ同期タスクを構成するときに構成されるすべてのパラメータと実行されるコードについては、「付録:コードとパラメータ」を参照してください。
付録:コードとパラメータ
付録:コードエディタを使用してバッチ同期タスクを構成する
コードエディタを使用してバッチ同期タスクを構成する場合は、コードエディタの形式要件に基づいて、関連データソースのリーダーとライターのパラメータを構成する必要があります。形式要件の詳細については、「コードエディタを使用してバッチ同期タスクを構成する」を参照してください。以下の情報は、コードエディタのリーダーとライターのパラメータの構成の詳細について説明しています。
MariaDB Reader のコード
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"mariadb",// プラグイン名。
"parameter":{
"column":[// 列の名前。
"id"
],
"connection":[
{ "querySql":["select a,b from join1 c join join2 d on c.id = d.id;"], // ソーステーブルからデータを読み取るために使用される SQL ステートメント。
"datasource":"",// データソースの名前。
"table":[// ソーステーブルの名前。テーブル名は角かっこ [] で囲む必要があります。
"xxx"
]
}
],
"where":"",// WHERE 句。
"splitPk":"",// シャードキー。
"encoding":"UTF-8"// エンコーディング形式。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるダーティデータレコードの最大数。
},
"speed":{
"throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"concurrent":1,// 並列スレッドの最大数。
"mbps":"12"// 最大転送速度。単位:MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}MariaDB Reader のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを読み取るテーブルの名前。各同期タスクは、1 つのテーブルにのみデータを同期するために使用できます。 次の例は、table パラメータの高度な使用方法を示しています。例では、table パラメータを使用してテーブル範囲を指定しています。
説明 MariaDB Readerは、tableパラメーターで指定されたパーティション内の、columnパラメーターで指定された列からデータを読み取ります。指定されたパーティションまたは列が存在しない場合、同期タスクは失敗します。 | はい | デフォルト値なし |
column | データを読み取る列の名前。JSON 配列で名前を指定します。デフォルト値は [ * ] で、ソーステーブルのすべての列を示します。
| はい | デフォルト値なし |
splitPk | MariaDB Reader がデータを読み取るときにデータシャーディングに使用されるフィールド。このパラメータを構成すると、ソーステーブルはこのパラメータの値に基づいてシャードされます。次に、Data Integration は並列スレッドを実行してデータを読み取ります。このようにして、データをより効率的に同期できます。
| いいえ | デフォルト値なし |
where | WHERE 句。たとえば、このパラメータを gmt_create > $bizdate に設定して、現在の日に生成されたデータを読み取ることができます。
| いいえ | デフォルト値なし |
querySql(高度なパラメータ。コードエディタでのみ使用可能) | 絞り込んだデータフィルタリングに使用される SQL ステートメント。このパラメータを構成すると、MariaDB Reader は table、column、および splitPk パラメータの設定を無視し、このパラメータの値のみに基づいてデータをフィルタリングします。 たとえば、複数のテーブルを結合してデータ同期を行う場合は、このパラメータを select a,b from table_a join table_b on table_a.id = table_b.id に設定します。querySql パラメータの優先度は、table、column、where、および splitPk パラメータの優先度よりも高くなります。querySql パラメータを構成すると、MariaDB Reader は table、column、where、および splitPk パラメータの設定を無視します。システムは、querySql パラメータから datasource パラメータで指定されたデータソースのユーザー名やパスワードなどの情報を解析します。 説明 querySql パラメータの名前は大文字と小文字が区別されます。たとえば、querysql は有効になりません。 | いいえ | デフォルト値なし |
MariaDB Writer のコード
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"mariadb",// プラグイン名。
"parameter":{
"postSql":[],// 同期タスクの実行後に実行する SQL ステートメント。
"datasource":"",// データソースの名前。
"column":[// 列の名前。
"id",
"value"
],
"writeMode":"insert into",// 書き込みモード。有効な値:insert into、replace into、および on duplicate key update。
"batchSize":1024,// 一度に書き込むデータレコードの数。
"table":"",// テーブルの名前。
"preSql":[
"delete from XXX;" // 同期タスクの実行前に実行する SQL ステートメント。
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{// 許容されるダーティデータレコードの最大数。
"record":"0"
},
"speed":{
"throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"concurrent":1, // 並列スレッドの最大数。
"mbps":"12"// 最大転送速度。単位:MB/s。このパラメータを適切な値に設定すると、ソースの読み取りワークロードとデスティネーションの書き込みワークロードを削減できます。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}MariaDB Writer のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを書き込むテーブルの名前。 | はい | デフォルト値なし |
writeMode | 書き込みモード。有効な値:insert into、on duplicate key update、および replace into。
| いいえ | insert into |
column | データを書き込む列の名前。"column": ["id", "name", "age"] など、コンマ (,) で区切って名前を指定します。デスティネーションテーブルのすべての列にデータを書き込む場合は、"column":["*"] など、このパラメータをアスタリスク (*) に設定します。 | はい | デフォルト値なし |
preSql | 同期タスクの実行前に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディタでは複数の SQL ステートメントを実行できます。たとえば、TRUNCATE TABLE tablename ステートメントを実行して、同期タスクの実行前に古いデータを削除できます。 説明 複数の SQL ステートメントを指定した場合、ステートメントは同じトランザクションで実行されません。 | いいえ | デフォルト値なし |
postSql | 同期タスクの実行後に実行する SQL ステートメント。コードレス UI では 1 つの SQL ステートメントのみを実行でき、コードエディタでは複数の SQL ステートメントを実行できます。たとえば、ALTER TABLE tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ステートメントを実行して、同期タスクの実行後にタイムスタンプを追加できます。 説明 複数の SQL ステートメントを指定した場合、ステートメントは同じトランザクションで実行されません。 | いいえ | デフォルト値なし |
batchSize | 一度に書き込むデータレコードの数。ビジネス要件に基づいて、このパラメータを適切な値に設定します。これにより、Data Integration と MariaDB の間の相互作用が大幅に削減され、スループットが向上します。このパラメータを過度に大きな値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 256 |
updateColumn | プライマリキーの競合または一意のインデックスの競合が発生したときに更新される列の名前。このパラメータは、writeMode パラメータが on duplicate key update に設定されている場合にのみ有効になります。複数の列名はコンマ (,) で区切ります。例:"updateColumn":["name", "age"]。 | いいえ | デフォルト値なし |