MySQL データソースは、MySQL との間で読み書きを行うための双方向チャネルを提供します。このトピックでは、DataWorks の MySQL に対するデータ同期機能について説明します。
サポートされている MySQL のバージョン
オフラインでの読み書き:
MySQL 5.5.x、MySQL 5.6.x、MySQL 5.7.x、および MySQL 8.0.x をサポートします。Amazon RDS for MySQL、Azure MySQL、および Amazon Aurora MySQL と互換性があります。
オフライン同期はビューからのデータ読み取りをサポートします。
リアルタイムでの読み取り:
Data Integration は、MySQL バイナリログをサブスクライブすることで、MySQL データをリアルタイムで読み取ります。リアルタイム同期は、MySQL 5.5.x、5.6.x、5.7.x、および 8.0.x でサポートされています。バージョン 8.0.x については、関数インデックスなどの新機能を除くすべての機能がサポートされています。Amazon RDS for MySQL、Azure MySQL、および Amazon Aurora MySQL と互換性があります。
重要DRDS for MySQL データベースからデータを同期するには、MySQL データソースとして構成しないでください。詳細については、「DRDS データソースを構成する」をご参照ください。
制限
リアルタイム読み取り
MySQL 読み取り専用インスタンスからのデータ同期はサポートされていません。
関数インデックスを含むテーブルの同期はサポートされていません。
XA ROLLBACK はサポートされていません。
XA PREPARE 状態のトランザクション内のデータについては、リアルタイム同期がデータを宛先に書き込みます。XA ROLLBACK が発生した場合、リアルタイム同期はこのデータをロールバックしません。このシナリオを処理するには、リアルタイム同期タスクからテーブルを手動で削除し、再度追加して同期を再開する必要があります。
MySQL サーバーのバイナリロギングでは、ROW フォーマットのみがサポートされています。
リアルタイム同期は、カスケード削除操作によって削除された関連テーブルのレコードを同期しません。
Amazon Aurora MySQL データベースの場合、プライマリまたはライターデータベースインスタンスに接続する必要があります。これは、AWS が Aurora MySQL の読み取り専用レプリカでバイナリロギング機能を有効にすることを許可していないためです。リアルタイム同期タスクでは、増分更新を実行するためにバイナリログが必要です。
オンライン DDL 操作のリアルタイム同期は、Data Management DMS を使用した MySQL テーブルへの列の追加 (Add Column) のみをサポートします。
MySQL からのストアドプロシージャの読み取りはサポートされていません。
オフライン読み取り
MySQL Reader プラグインがシャード化されたデータベース内の複数のテーブルからデータを同期する場合、デフォルトではシャード化されたタスクの数はテーブルの数と等しくなります。単一のテーブルでシャーディングを有効にするには、同時タスクの数をテーブルの数より大きい値に設定する必要があります。
MySQL からのストアドプロシージャの読み取りはサポートされていません。
サポートされているデータの型
各 MySQL バージョンのデータ型の完全なリストについては、MySQL の公式ドキュメントをご参照ください。次の表は、例として MySQL 8.0.x でサポートされている主要なデータ型をリストしています。
データの型 | オフライン読み取り (MySQL Reader) | オフライン書き込み (MySQL Writer) | リアルタイム読み取り | リアルタイム書き込み |
TINYINT | ||||
SMALLINT | ||||
INTEGER | ||||
BIGINT | ||||
FLOAT | ||||
DOUBLE | ||||
DECIMAL/NUMERIC | ||||
REAL | ||||
VARCHAR | ||||
JSON | ||||
TEXT | ||||
MEDIUMTEXT | ||||
LONGTEXT | ||||
VARBINARY | ||||
BINARY | ||||
TINYBLOB | ||||
MEDIUMBLOB | ||||
LONGBLOB | ||||
ENUM | ||||
SET | ||||
BOOLEAN | ||||
BIT | ||||
DATE | ||||
DATETIME | ||||
TIMESTAMP | ||||
TIME | ||||
YEAR | ||||
LINESTRING | ||||
POLYGON | ||||
MULTIPOINT | ||||
MULTILINESTRING | ||||
MULTIPOLYGON | ||||
GEOMETRYCOLLECTION |
準備
DataWorks で MySQL データソースを構成する前に、後続のタスクが期待どおりに実行されるように、このセクションで説明する準備を完了してください。
次のセクションでは、MySQL データを同期するために必要な準備について説明します。
MySQL バージョンの確認
Data Integration には MySQL に対する特定のバージョン要件があります。詳細については、「サポートされている MySQL のバージョン」セクションをご参照ください。MySQL データベースで次の文を実行して、現在のバージョンを確認できます。
SELECT version();アカウント権限の構成
DataWorks がデータソースにアクセスするための専用の MySQL アカウントを作成します。手順は次のとおりです。
オプション: アカウントを作成します。
詳細については、「MySQL アカウントを作成する」をご参照ください。
権限を構成します。
オフライン
オフライン同期
MySQL からデータを読み取るには、アカウントにターゲットテーブルに対する
SELECT権限が必要です。MySQL にデータを書き込むには、アカウントにターゲットテーブルに対する
INSERT、DELETE、およびUPDATE権限が必要です。
リアルタイム同期
リアルタイム同期の場合、アカウントにはデータベースに対する
SELECT、REPLICATION SLAVE、およびREPLICATION CLIENT権限が必要です。
次のコマンドを実行してアカウントに権限を付与するか、
SUPER権限を直接付与できます。次の文では、'sync_account'を作成したアカウントに置き換えてください。-- CREATE USER 'sync_account'@'%' IDENTIFIED BY 'password'; //同期アカウントを作成し、パスワードを設定し、任意のホストからデータベースにログインできるようにします。パーセント記号 (%) は任意のホストを示します。 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync_account'@'%'; // データベースに対する SELECT、REPLICATION SLAVE、および REPLICATION CLIENT 権限を同期アカウントに付与します。*.*は、すべてデータベースのすべてのテーブルに対する指定された権限をアカウントに付与します。ターゲットデータベースの特定のテーブルに権限を付与することもできます。たとえば、test データベースの user テーブルに `SELECT` および `REPLICATION CLIENT` 権限を付与するには、次の文を実行します:GRANT SELECT, REPLICATION CLIENT ON test.user TO 'sync_account'@'%';。説明REPLICATION SLAVEはグローバル権限です。この権限をターゲットデータベースの特定のテーブルに付与することはできません。
MySQL バイナリロギングを有効にする (リアルタイム同期のみ)
Data Integration は、MySQL バイナリログをサブスクライブすることで、増分データのリアルタイム同期を実行します。DataWorks で同期タスクを構成する前に、MySQL バイナリロギングサービスを有効にする必要があります。手順は次のとおりです:
バイナリログが消費されている場合、データベースはそれを削除できません。リアルタイム同期タスクで高いレイテンシーが発生すると、ソースデータベース上のバイナリログが長時間保持される可能性があります。タスクにレイテンシーアラートを構成し、データベースのディスク領域を監視してください。
バイナリログは少なくとも 72 時間保持する必要があります。これにより、タスクが失敗し、バイナリログが削除されたために以前のオフセットにリセットできない場合のデータ損失を防ぎます。この場合、データをバックフィルするために完全なオフライン同期を実行する必要があります。
バイナリロギングが有効になっているかどうかを確認します。
次の文を実行して、バイナリロギングが有効になっているかどうかを確認します。
SHOW variables LIKE "log_bin";コマンドが ON を返した場合、バイナリロギングは有効になっています。
セカンダリデータベースからデータを同期する場合、次の文を実行してバイナリロギングが有効になっているかどうかを確認することもできます。
SHOW variables LIKE "log_slave_updates";コマンドが ON を返した場合、セカンダリデータベースでバイナリロギングが有効になっています。
結果が一致しない場合:
オープンソースの MySQL の場合、MySQL の公式ドキュメントを参照してバイナリロギングを有効にしてください。
ApsaraDB RDS for MySQL の場合、RDS for MySQL のログバックアップを参照してバイナリロギングを有効にしてください。
PolarDB for MySQL の場合、バイナリロギングを有効にするを参照してバイナリロギングを有効にしてください。
バイナリログのフォーマットを確認します。
次の文を実行して、バイナリログのフォーマットを確認します。
SHOW variables LIKE "binlog_format";考えられる戻り値は次のとおりです:
ROW: バイナリログのフォーマットは ROW です。
STATEMENT: バイナリログのフォーマットは STATEMENT です。
MIXED: バイナリログのフォーマットは MIXED です。
重要DataWorks のリアルタイム同期は、MySQL サーバーのバイナリロギングに対して ROW フォーマットのみをサポートします。コマンドが ROW 以外の値を返す場合は、バイナリログのフォーマットを変更する必要があります。
フル行イメージロギングが有効になっているかどうかを確認します。
次の文を実行して、フル行イメージロギングが有効になっているかどうかを確認します。
SHOW variables LIKE "binlog_row_image";考えられる戻り値は次のとおりです:
FULL: バイナリログに対してフル行イメージロギングが有効になっています。
MINIMAL: バイナリログに対して最小行イメージロギングが有効になっています。フル行イメージロギングは有効になっていません。
重要DataWorks のリアルタイム同期は、フル行イメージロギングが有効になっている MySQL サーバーのみをサポートします。コマンドが FULL 以外の値を返す場合は、binlog_row_image 構成を変更する必要があります。
OSS からのバイナリログ読み取りの権限付与を構成する
MySQL データソースを追加する際に、[構成モード] を [Alibaba Cloud インスタンスモード] に設定し、RDS for MySQL インスタンスが DataWorks ワークスペースと同じリージョンにある場合、[OSS からのバイナリログ読み取りを有効にする] を有効にできます。この機能を有効にすると、DataWorks が RDS バイナリログにアクセスできない場合、OSS からバイナリログを取得しようとします。これにより、リアルタイム同期タスクが中断されるのを防ぎます。
[OSS からバイナリログにアクセスするための ID] を [Alibaba Cloud RAM ユーザー] または [Alibaba Cloud RAM ロール] に設定した場合、以下で説明するようにアカウント権限も構成する必要があります。
Alibaba Cloud RAM ユーザー
RAM コンソールにログインし、権限を付与したい RAM ユーザーを見つけます。
[アクション] 列で、[権限の追加] をクリックします。
以下の主要なパラメーターを構成し、[権限を付与] をクリックします。
[リソース範囲] をアカウントレベルに設定します。
[ポリシー] をシステムポリシーに設定します。
[ポリシー名] を
AliyunDataWorksAccessingRdsOSSBinlogPolicyに設定します。

Alibaba Cloud RAM ロール
RAM コンソールにログインし、RAM ロールを作成します。詳細については、「信頼できる Alibaba Cloud アカウントの RAM ロールを作成する」をご参照ください。
主要なパラメーター:
[プリンシパルタイプ] を Alibaba Cloud アカウントに設定します。
[プリンシパル名] を他の Alibaba Cloud アカウントに設定し、DataWorks ワークスペースを所有する Alibaba Cloud アカウントの ID を入力します。
[ロール名] をカスタム名に設定します。
作成した RAM ロールに権限を付与します。詳細については、「RAM ロールに権限を付与する」をご参照ください。
主要なパラメーター:
[ポリシー] をシステムポリシーに設定します。
[ポリシー名] を
AliyunDataWorksAccessingRdsOSSBinlogPolicyに設定します。
作成した RAM ロールの信頼ポリシーを変更します。詳細については、「RAM ロールの信頼ポリシーを変更する」をご参照ください。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "<DataWorks ユーザーの Alibaba Cloud アカウント ID>@di.dataworks.aliyuncs.com", "<DataWorks ユーザーの Alibaba Cloud アカウント ID>@dataworks.aliyuncs.com" ] } } ], "Version": "1" }
データソースの追加
Alibaba Cloud インスタンスモード
MySQL データベースが Alibaba Cloud RDS インスタンスである場合、Alibaba Cloud インスタンスモードでデータソースを作成できます。次の表にパラメーターを示します:
パラメーター | 説明 |
データソース名 | データソースの名前。名前はワークスペース内で一意である必要があります。ビジネスと環境を識別する名前を使用してください。例: |
構成モード | Alibaba Cloud インスタンスモードを選択します。詳細については、「シナリオ 1: インスタンスモード (現在の Alibaba Cloud アカウント)」および「シナリオ 2: インスタンスモード (他の Alibaba Cloud アカウント)」をご参照ください。 |
Alibaba Cloud アカウント | インスタンスが属するアカウントを選択します。[他の Alibaba Cloud アカウント] を選択した場合、クロスアカウント権限を構成する必要があります。詳細については、「クロスアカウント権限付与 (RDS、Hive、または Kafka)」をご参照ください。 他の Alibaba Cloud アカウントを選択した場合、次の情報を指定する必要があります:
|
リージョン | インスタンスが配置されているリージョン。 |
インスタンス | 接続するインスタンスの名前。 |
セカンダリインスタンスの構成 | データソースに 読み取り専用インスタンス (読み取り専用レプリカ) がある場合、タスクを構成する際に読み取り操作に読み取り専用レプリカを選択できます。これにより、プライマリデータベースへの干渉を防ぎ、そのパフォーマンスに影響を与えません。 |
インスタンスアドレス | 正しいインスタンスを選択した後、[最新のエンドポイントを取得] をクリックして、インスタンスのパブリックまたはプライベートエンドポイント、VPC、および vSwitch を表示します。 |
データベース | データソースがアクセスする必要のあるデータベースの名前。指定されたユーザーがデータベースにアクセスする権限を持っていることを確認してください。 |
ユーザー名/パスワード | MySQL データベースアカウントのユーザー名とパスワード。RDS インスタンスを使用する場合、インスタンスの [アカウント管理] セクションでアカウントを作成および管理できます。 |
OSS からのバイナリログ読み取りを有効にする | この機能を有効にすると、DataWorks が RDS バイナリログにアクセスできない場合、OSS からバイナリログを取得しようとします。これにより、リアルタイム同期タスクが中断されるのを防ぎます。構成の詳細については、「OSS からのバイナリログ読み取りの権限付与を構成する」をご参照ください。次に、権限付与構成に基づいて [OSS バイナリログアクセス ID] を設定します。 |
認証方法 | [認証なし] または [SSL 認証] を選択できます。[SSL 認証] を選択した場合、インスタンス自体で SSL 認証が有効になっている必要があります。証明書ファイルを準備し、[証明書ファイル管理] にアップロードします。 |
バージョン | MySQL サーバーにログインし、`SELECT VERSION()` コマンドを実行してバージョン番号を表示できます。 |
接続文字列モード
より柔軟性を高めるために、接続文字列モードでデータソースを作成することもできます。次の表にパラメーターを示します:
パラメーター | 説明 |
データソース名 | データソースの名前。名前はワークスペース内で一意である必要があります。ビジネスと環境を識別する名前を使用してください。例: |
構成モード | [接続文字列モード] を選択します。このモードは JDBC URL を使用してデータベースに接続します。 |
JDBC 接続文字列プレビュー | エンドポイントとデータベース名を入力すると、DataWorks はそれらを自動的に連結して JDBC URL をプレビュー用に作成します。 |
接続アドレス | ホスト IP: データベースホストの IP アドレスまたはドメイン名。データベースが Alibaba Cloud RDS インスタンスの場合、インスタンスの製品ページでエンドポイントを表示できます。 ポート: データベースポート。デフォルト値は 3306 です。 |
データベース名 | データソースがアクセスする必要のあるデータベースの名前。指定されたユーザーがデータベースにアクセスする権限を持っていることを確認してください。 |
ユーザー名/パスワード | MySQL データベースアカウントのユーザー名とパスワード。RDS インスタンスを使用する場合、インスタンスの [アカウント管理] セクションでアカウントを作成および管理できます。 |
バージョン | MySQL サーバーにログインし、`SELECT VERSION()` コマンドを実行してバージョン番号を表示できます。 |
認証方法 | [認証なし] または [SSL 認証] を選択できます。[SSL 認証] を選択した場合、インスタンス自体で SSL 認証が有効になっている必要があります。証明書ファイルを準備し、[認証ファイル管理] にアップロードします。 |
詳細パラメーター | パラメーター: パラメーターのドロップダウンリストをクリックし、connectTimeout などのサポートされているパラメーター名を選択します。 値: 3000 などの選択したパラメーターの値を入力します。 URL は自動的に jdbc:mysql://192.168.90.28:3306/test?connectTimeout=50000 に更新されます。 |
リソースグループがデータソースに接続できることを確認する必要があります。そうしないと、後続のタスクは実行できません。必要なネットワーク構成は、データソースのネットワーク環境と接続モードによって異なります。詳細については、「接続性をテストする」をご参照ください。
データ同期タスクの開発: MySQL 同期プロセスガイド
同期タスクの構成のエントリポイントと手順については、以下の構成ガイドをご参照ください。
単一テーブルのオフライン同期タスクの構成ガイド
手順については、「コードレス UI でオフライン同期タスクを構成する」および「コードエディタでオフライン同期タスクを構成する」をご参照ください。
コードエディタのパラメーターの完全なリストとスクリプトの例については、「付録: MySQL スクリプトの例とパラメーターの説明」をご参照ください。
単一テーブルのリアルタイム同期タスクの構成ガイド
手順については、「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。
データベースレベルの同期タスクの構成ガイド: オフライン、完全および増分 (リアルタイム)、およびシャード化 (リアルタイム)
手順については、「Data Integration で同期タスクを構成する」をご参照ください。
よくある質問
Data Integration の他の一般的な問題に関する詳細については、「Data Integration に関するよくある質問」をご参照ください。
付録: MySQL スクリプトの例とパラメーターの説明
コードエディタを使用してバッチ同期タスクを構成する
コードエディタを使用してバッチ同期タスクを構成する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタでタスクを構成する」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して構成する必要があるパラメーターについて説明しています。
Reader スクリプトの例
このセクションでは、単一データベース内の単一テーブルとシャード化されたテーブルの構成例を示します:
以下の JSON の例のコメントは、デモンストレーションのみを目的としています。スクリプトを構成する際には、コメントを削除する必要があります。
単一データベース内の単一テーブル
{ "type": "job", "version": "2.0",// バージョン番号。 "steps": [ { "stepType": "mysql",// プラグイン名。 "parameter": { "column": [// 列名。 "id" ], "connection": [ { "querySql": [ "select a,b from join1 c join join2 d on c.id = d.id;" ], "datasource": ""// データソース名。 } ], "where": "",// フィルター条件。 "splitPk": "",// シャードキー。 "encoding": "UTF-8"// エンコード形式。 }, "name": "Reader", "category": "reader" }, { "stepType": "stream", "parameter": {}, "name": "Writer", "category": "writer" } ], "setting": { "errorLimit": { "record": "0"// 許可されるエラーレコードの数。 }, "speed": { "throttle": true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、データ転送レートは制限されません。throttle が true に設定されている場合、データ転送レートは制限されます。 "concurrent": 1,// 同時ジョブの数。 "mbps": "12"// 最大データ転送レート。1 mbps = 1 MB/s。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }シャード化されたテーブル
説明シャーディングにより、MySQL Reader は同じスキーマを持つ複数の MySQL テーブルを選択できます。この文脈では、シャーディングとは、複数のソース MySQL テーブルから単一の宛先テーブルにデータを書き込むことを意味します。データベースレベルのシャーディングを構成するには、Data Integration でタスクを作成し、データベースレベルのシャーディング機能を選択できます。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "indexes": [ { "type": "unique", "column": [ "id" ] } ], "envType": 0, "useSpecialSecret": false, "column": [ "id", "buyer_name", "seller_name", "item_id", "city", "zone" ], "tableComment": "Test order table", "connection": [ { "datasource": "rds_dataservice", "table": [ "rds_table" ] }, { "datasource": "rds_workshop_log", "table": [ "rds_table" ] } ], "where": "", "splitPk": "id", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": {}, "name": "Writer", "category": "writer" }, { "name": "Processor", "stepType": null, "category": "processor", "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" } } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Reader スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。 | はい | なし |
table | データを同期するテーブルの名前。データ統合タスクは、1 つのテーブルからのみデータを読み取ることができます。 table パラメーターの高度な使用法として、範囲を構成する例を以下に示します:
説明 タスクは一致するすべてのテーブルを読み取ります。具体的には、これらのテーブルの column パラメーターで指定された列を読み取ります。テーブルまたは列が存在しない場合、タスクは失敗します。 | はい | なし |
column | 構成されたテーブルから同期する列名のコレクション。JSON 配列を使用してフィールド情報を記述します。デフォルトでは、すべての列が構成されます。例: [*]。
| はい | なし |
splitPk | MySQL Reader がデータを抽出する際に、splitPk パラメーターを指定すると、splitPk が表すフィールドに基づいてデータがシャード化されます。これにより、データ同期が同時タスクとして実行され、同期効率が向上します。
| いいえ | なし |
splitFactor | シャーディング係数。データ同期のシャード数を構成できます。複数の同時スレッドを構成した場合、データは `同時スレッド数 × splitFactor` の部分にシャード化されます。たとえば、同時スレッド数が 5 で splitFactor が 5 の場合、データは 5 × 5 = 25 の部分にシャード化され、5 つの同時スレッドによって処理されます。 説明 推奨される値の範囲は 1 から 100 です。値が大きすぎると、メモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 5 |
where | フィルター条件。多くのビジネスシナリオでは、当日のデータのみを同期したい場合があります。where 条件を
| いいえ | なし |
querySql (高度なモード。このパラメーターはコードレス UI ではサポートされていません。) | 一部のビジネスシナリオでは、`where` パラメーターではフィルター条件を十分に記述できない場合があります。このパラメーターを使用して、カスタムのフィルター SQL 文を定義できます。このパラメーターを構成すると、システムは `tables`、`columns`、および `splitPk` パラメーターを無視し、このパラメーターの内容を使用してデータをフィルター処理します。たとえば、複数テーブルの結合後にデータを同期するには、 説明 querySql パラメーターは大文字と小文字を区別します。たとえば、querysql と書くと、効果がありません。 | いいえ | なし |
useSpecialSecret | 複数のソースデータソースがある場合、各データソースのパスワードを使用するかどうかを指定します。有効な値:
複数のソースデータソースを構成し、各データソースが異なるユーザー名とパスワードを使用している場合、このパラメーターを true に設定して、各データソースのパスワードを使用できます。 | いいえ | false |
Writer スクリプトの例
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mysql",// プラグイン名。
"parameter": {
"postSql": [],// インポート後に実行される SQL 文。
"datasource": "",// データソース。
"column": [// 列名。
"id",
"value"
],
"writeMode": "insert",// 書き込みモード。insert、replace、または update に設定できます。
"batchSize": 1024,// 1 つのバッチで送信されるレコードの数。
"table": "",// テーブル名。
"nullMode": "skipNull",// null 値を処理するためのポリシー。
"skipNullColumn": [// null 値をスキップする列。
"id",
"value"
],
"preSql": [
"delete from XXX;"// インポート前に実行される SQL 文。
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// 許可されるエラーレコードの数。
"record": "0"
},
"speed": {
"throttle": true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、データ転送レートは制限されません。throttle が true に設定されている場合、データ転送レートは制限されます。
"concurrent": 1,// 同時ジョブの数。
"mbps": "12"// 最大データ転送レート。これにより、上流/下流のデータベースへの過剰な読み取り/書き込み圧力を防ぎます。1 mbps = 1 MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Writer スクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。 | はい | なし |
table | データを同期するテーブルの名前。 | はい | なし |
writeMode | インポートモード。INSERT INTO、ON DUPLICATE KEY UPDATE、または REPLACE INTO を選択できます。
| いいえ | insert |
nullMode | null 値を処理するためのポリシー。有効な値:
重要 このパラメーターを skipNull に設定すると、タスクは宛先のデフォルト値をサポートするためにデータを書き込むための SQL 文を動的に生成します。これにより、FLUSH 操作の数が増加し、同期速度が低下します。最悪の場合、各データレコードに対して FLUSH 操作が実行されます。 | いいえ | writeNull |
skipNullColumn | nullMode が skipNull に設定されている場合、このパラメーターに構成された列は強制的に フォーマット: | いいえ | デフォルトでは、タスクに構成されたすべての列。 |
column | データが書き込まれる宛先テーブルのフィールド。フィールドをカンマ (,) で区切ります。例: | はい | なし |
preSql | データ同期タスクが開始される前に実行される SQL 文。コードレス UI では、1 つの SQL 文のみを実行できます。コードエディタでは、複数の SQL 文を実行できます。たとえば、実行前にテーブルから古いデータをクリアできます: `TRUNCATE TABLE tablename`。 説明 複数の SQL 文に対するトランザクションはサポートされていません。 | いいえ | なし |
postSql | データ同期タスクが完了した後に実行される SQL 文。コードレス UI では、1 つの SQL 文のみを実行できます。コードエディタでは、複数の SQL 文を実行できます。たとえば、タイムスタンプを追加できます: 説明 複数の SQL 文に対するトランザクションはサポートされていません。 | いいえ | なし |
batchSize | 1 つのバッチで送信されるレコードの数。値を大きくすると、データ同期システムと MySQL 間のネットワークインタラクションが大幅に減少し、全体のスループットが向上します。この値が高すぎると、データ同期プロセスでメモリ不足 (OOM) エラーが発生する可能性があります。 | いいえ | 256 |
updateColumn | writeMode が update に設定されている場合、これらはプライマリキーまたは一意なインデックスの競合が発生したときに更新されるフィールドです。フィールドをカンマ (,) で区切ります。例: | いいえ | なし |