すべてのプロダクト
Search
ドキュメントセンター

DataWorks:AnalyticDB for MySQL 3.0データソース

最終更新日:Jan 14, 2025

DataWorksは、AnalyticDB for MySQLの3.0ソースからデータを読み書きするためのAnalyticDB for MySQL 3.0 ReaderおよびAnalyticDB for MySQLのデータライターを提供してい3.0。 このトピックでは、AnalyticDB for MySQL 3.0データソースとの間でデータを同期する機能について説明します。

制限事項

  • data lakehouseエディションのAnalyticDB for MySQL 3.0データソースは、共有リソースグループに接続できず、同期タスクに使用できません。

  • データウェアハウスエディションのAnalyticDB for MySQL 3.0インスタンスの使用からデータレイクハウスエディションの同じインスタンスの使用に切り替えると、AnalyticDB for MySQL 3.0インスタンスが使用され、共有リソースグループで実行される同期タスクの実行に失敗する可能性があります。 そのため、AnalyticDB for MySQL 3.0インスタンスが使用されている同期タスクが共有リソースグループで実行されているかどうかを確認することを推奨します。 このような同期タスクが存在する場合は、共有リソースグループを排他的リソースグループに置き換えてタスクを実行します。

  • ビューのデータは、バッチ同期中に読み取ることができます。

データ型マッピング

バッチデータの読み取り

次の表に、AnalyticDB for MySQL 3.0 Readerによるデータ型の変換に基づくデータ型のマッピングを示します。

カテゴリ

AnalyticDB for MySQL 3.0データ型

Integer

INT、INTEGER、TINYINT、SMALLINT、BIGINT

浮動小数点

FLOAT、DOUBLE、DECIMAL

String

VARCHAR

日付と時刻

DATE、DATETIME、TIMESTAMP、TIME

Boolean

BOOLEAN

バッチデータ書き込み

次の表に、AnalyticDB for MySQL 3.0 Writerによるデータ型の変換に基づくデータ型マッピングを示します。

カテゴリ

AnalyticDB for MySQL 3.0データ型

Integer

INT、INTEGER、TINYINT、SMALLINT、BIGINT

浮動小数点

FLOAT、DOUBLE、DECIMAL

String

VARCHAR

日付と時刻

DATE、DATETIME、TIMESTAMP、TIME

Boolean

BOOLEAN

データ同期タスクの開発

のエントリポイントとデータ同期タスクの設定手順については、次のセクションを参照してください。 パラメーター設定の詳細については、タスクの設定タブで各パラメーターのインフォチップを参照してください。

データソースの追加

特定のデータソースとの間でデータを同期するようにデータ同期タスクを設定する前に、データソースをDataWorksに追加する必要があります。 詳細については、「データソースの追加と管理」をご参照ください。

単一のテーブルのデータを同期するようにバッチ同期タスクを設定する

単一のテーブルまたはデータベースのデータを同期するためのリアルタイム同期タスクの設定

設定手順の詳細については、「DataStudioでのリアルタイム同期タスクの設定」をご参照ください。

同期設定を構成して、データベース内のすべてのデータのバッチ同期、または単一のテーブルまたはデータベース内の完全データと増分データのリアルタイム同期を実装します。

設定手順の詳細については、「Data Integrationでの同期タスクの設定」をご参照ください。

付録: コードとパラメータ

付録: コードエディターを使用してバッチ同期タスクを構成する

コードエディターを使用してバッチ同期タスクを設定する場合は、コードエディターの形式要件に基づいて、関連するデータソースのリーダーとライターのパラメーターを設定する必要があります。 フォーマット要件の詳細については、「コードエディターを使用したバッチ同期タスクの設定」をご参照ください。 次の情報は、コードエディターのリーダーとライターのパラメーターの設定の詳細を説明しています。

AnalyticDB for MySQL 3.0リーダーのコード

{
"type": "ジョブ" 、"steps": [
{
"stepType": "analyticdb_for_mysql", // プラグイン名。 
"parameter": {
"column": [ // 列の名前。 
"id",
「値」、「テーブル」
],
"connection": [
{
"datasource": "xxx", // データソースの名前。 
"table": [ // テーブルの名前。 
"xxx"
]
}
],
"where": "", // WHERE句。 
"splitPk": "", // シャードキー。 
"encoding": "UTF-8" // エンコード形式。 
},
"name": "リーダー" 、"カテゴリ": "リーダー"
},
{
"stepType": "stream" 、"parameter": {}、"name": "Writer" 、"カテゴリ": "ライター"
}
],
"version": "2.0" 、"order": {
"hops": [
{
"from": "Reader" 、"to": "ライター"
}
]
},
"setting": {
"errorLimit": {
"record": "0" // 許可されるダーティデータレコードの最大数。 
},
"speed": {
"throttle":true,// スロットルを有効にするかどうかを指定します。 値falseはスロットリングが無効であることを示し、値trueはスロットリングが有効であることを示します。 mbpsパラメーターは、スロットルパラメーターがtrueに設定されている場合にのみ有効です。 
                        "concurrent":1 // 並列スレッドの最大数。 
                      "mbps":"12" // 最大送信レート。 単位:MB/秒。 
}
}
}

AnalyticDB for MySQL 3.0 Readerのコード内のパラメータ

パラメーター

説明

必須

デフォルト値

データソース

データソースの名前。 追加されたデータソースの名前と同じである必要があります。 コードエディターを使用してデータソースを追加できます。

デフォルト値なし

テーブル

データを読み取るテーブルの名前。

デフォルト値なし

データを読み取る列の名前。 列はJSON配列で指定されます。 デフォルト値は [*] で、すべての列を示します。

  • 読み取る特定の列を選択できます。

  • 列の順序は変更できます。 指定した列から、テーブルのスキーマで指定した順序とは異なる順序でデータを読み取ることができます。

  • 定数がサポートされています。 列名は、MySQLでサポートされているSQL構文 (["id", "'table'", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3", "true"] など) に従って配置する必要があります。

    • id :列名。

    • table :予約済みキーワードを含む列の名前。

    • 1 : 整数定数。

    • bazhen.csy :文字列定数。

    • null : Null ポインター。

    • to_char(a + 1): 文字列の長さを計算するために使用される関数式。

    • 2.3 :浮動小数点定数。

    • true: ブール値。

  • columnパラメーターは、データを読み取るすべての列を明示的に指定する必要があります。 このパラメーターは空白のままにできません。

デフォルト値なし

splitPk

AnalyticDB for MySQL 3.0 Readerがデータを読み取るときにデータシャーディングに使用されるフィールド。 このパラメーターを指定すると、ソーステーブルはこのパラメーターの値に基づいてシャードされます。 次に、Data Integrationは並列スレッドを実行してデータを読み取ります。 このようにして、データをより効率的に同期できる。

  • splitPkパラメーターをテーブルの主キー列の名前に設定することを推奨します。 データは、特定のシャードにのみ集中的に分散するのではなく、主キー列に基づいて異なるシャードに均等に分散できます。

  • splitPkパラメーターは、整数データ型のデータのみのシャーディングをサポートします。 splitPkパラメーターを、文字列、浮動小数点、日付などのサポートされていないデータ型のフィールドに設定した場合、このパラメーターの設定は無視され、1つのスレッドがデータの読み取りに使用されます。

  • splitPkパラメーターが指定されていないか、空のままになっている場合は、1つのスレッドを使用してデータを読み取ります。

任意

デフォルト値なし

どこで

WHERE句。 たとえば、このパラメーターをgmt_create > $bizdateに設定して、当日に生成されたデータを読み取ることができます。

  • WHERE句を使用して、増分データを読み取ることができます。 whereパラメーターが指定されていないか、空のままの場合、AnalyticDB for MySQL 3.0 Readerはすべてのデータを読み取ります。

  • whereパラメーターをlimit 10に設定しないでください。 この値は、SQL WHERE句のMySQLの制約に準拠していません。

任意

デフォルト値なし

AnalyticDB for MySQL 3.0ライターのコード

{
"type": "ジョブ" 、"steps": [
{
"stepType": "stream" 、"parameter": {}、"name": "リーダー" 、"カテゴリ": "リーダー"
},
{
"stepType": "analyticdb_for_mysql", // プラグイン名。 
"parameter": {
"postSql": [], // 同期タスクの実行後に実行するSQL文。 
"tableType": null, // 予約フィールド。 デフォルト値:null 
"datasource": "hangzhou_ads", // データソースの名前。 
"column": [ // 列の名前。 
"id",
「値」
],
"guid": null、"writeMode": "insert", // 書き込みモード。 詳細については、writeMode パラメーターの説明をご参照ください。 
"batchSize": 2048, // 一度に書き込むデータレコードの数。 詳細については、batchSizeパラメーターの説明をご参照ください。 
"encoding": "UTF-8", // エンコード形式。 
"table": "t5", // データを書き込むテーブルの名前。 
"preSql": [] // 同期タスクが実行される前に実行するSQL文。 
},
"name": "Writer" 、"カテゴリ": "ライター"
}
],
"version": "2.0", // バージョン番号。 
"order": {
"hops": [
{
"from": "Reader" 、"to": "ライター"
}
]
},
"setting": {
"errorLimit": {
"record": "0" // 許可されるダーティデータレコードの最大数。 
},
"speed": {
"throttle":true,// スロットルを有効にするかどうかを指定します。 値falseはスロットリングが無効であることを示し、値trueはスロットリングが有効であることを示します。 mbpsパラメーターは、スロットルパラメーターがtrueに設定されている場合にのみ有効です。 
                        "concurrent":2, // 並列スレッドの最大数。 
                        "mbps":"12" // 最大送信レート。 単位:MB/秒。 
}
}
}

AnalyticDB for MySQL 3.0 Writerのコード内のパラメータ

パラメーター

説明

必須

デフォルト値

データソース

データソースの名前。 追加されたデータソースの名前と同じである必要があります。 コードエディターを使用してデータソースを追加できます。

デフォルト値なし

テーブル

データを書き込むテーブルの名前。

デフォルト値なし

writeMode

書き込みモードを設定します。 有効な値: insertreplaceupdate

  • insert: プライマリキーの競合または一意のインデックスの競合が発生しない場合、データは宛先テーブルに直接書き込まれます。 プライマリキーの競合または一意のインデックスの競合が発生した場合、宛先テーブルに書き込むデータは自動的に無視され、更新は実行されません。

  • replace: プライマリキーの競合または一意のインデックスの競合が発生しない場合、データは宛先テーブルに直接書き込まれます。 主キーの競合または一意のインデックスの競合が発生すると、競合するデータを含む行が削除され、新しい行が挿入されます。 これは、元の行のすべてのフィールドが新しい行に置き換えられることを示します。

  • update: プライマリキーの競合または一意のインデックスの競合が発生しない場合、データは宛先テーブルに直接書き込まれます。 主キーの競合または一意インデックスの競合が発生した場合、元の行のすべてのフィールドが新しい行に置き換えられます。

    説明

    このモードは、スクリプトモードでのみサポートされます。

任意

挿入

データを書き込む列の名前。 名前は、"column": ["id", "name", "age"] などのコンマ (,) で区切ります。 宛先テーブルのすべての列にデータを書き込む場合は、このパラメーターをアスタリスク (*) ("column": ["*"] など) に設定します。

説明

列名にselectが含まれている場合は、列名をbackticks (') で囲みます。 たとえば、item_select_no'item_select_noとして表示されます。

デフォルト値なし

preSql

同期タスクが実行される前に実行するSQL文。 たとえば、古いデータを削除するために使用されるSQL文にこのパラメーターを設定できます。 コードレスUIで実行できるSQL文は1つだけで、コードエディターでは複数のSQL文を実行できます。

説明

複数のSQL文を指定した場合、それらの文は同じトランザクションで実行されません。

任意

デフォルト値なし

postSql

同期タスクの実行後に実行するSQL文。 たとえば、タイムスタンプの追加に使用するSQL文にこのパラメーターを設定できます。 コードレスUIで実行できるSQL文は1つだけで、コードエディターでは複数のSQL文を実行できます。

説明

複数のSQL文を指定した場合、それらの文は同じトランザクションで実行されません。

任意

デフォルト値なし

batchSize

一度に書き込まれるデータレコード数です。 このパラメーターをビジネス要件に基づいて適切な値に設定します。 これにより、Data IntegrationとAnalyticDB for MySQL 3.0間のやり取りが大幅に削減され、スループットが向上します。 このパラメーターを大きすぎる値に設定すると、データ同期中にメモリ不足 (OOM) エラーが発生する可能性があります。

任意

1,024