すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MySQL

最終更新日:Apr 21, 2026

MySQL データソースを使用すると、MySQL からのデータの読み取りと MySQL へのデータの書き込みが可能になります。このトピックでは、DataWorks データ統合の MySQL データ同期機能について説明します。

サポートされている MySQL のバージョン

  • バッチ同期

    MySQL 5.5.x、5.6.x、5.7.x、および 8.0.x をサポートし、Amazon RDS for MySQLAzure Database for MySQL、および Amazon Aurora MySQL と連携します。

    ビューからのデータ読み取りをサポートします。

  • リアルタイム同期

    データ統合は、リアルタイムサブスクリプションを使用して、MySQL 5.5.x、5.6.x、5.7.x、および 8.0.x のリアルタイム同期をサポートします。また、Amazon RDS for MySQLAzure Database for MySQL、および Amazon Aurora MySQL とも連携します。ただし、関数インデックスなど、MySQL 8.0.x で導入された機能はサポートしていません。

    重要

    DRDS データベースからデータを同期する必要がある場合は、MySQL データソースとして設定しないでください。代わりに、DRDS データソースとして直接設定してください。手順については、「DRDS データソースの設定」をご参照ください。

制限事項

リアルタイム同期

  • バージョン 5.6.x より前の MySQL 読み取り専用インスタンスからのデータ同期はサポートされていません。

  • 関数インデックスを含むテーブルの同期はサポートされていません。

  • XA ROLLBACK はサポートされていません。

    リアルタイム同期は、XA PREPARE 状態のデータを送信先に書き込みますが、XA ROLLBACK が発生した場合、準備されたデータはロールバックされません。このシナリオに対処するには、影響を受けるテーブルをリアルタイム同期タスクから手動で削除し、再度追加して同期を再開する必要があります。

  • MySQL サーバーのバイナリログ (binlog) の設定では、ROW フォーマットを使用する必要があります。

  • リアルタイム同期では、カスケード削除操作によって削除された関連テーブルのレコードは同期されません。

  • Amazon Aurora MySQL データベースの場合、AWS は Aurora MySQL 読み取り専用レプリカで binlog を有効にすることを許可していないため、プライマリ/ライターデータベースに接続する必要があります。リアルタイム同期タスクでは、増分更新を実行するために binlog が必要です。

  • オンライン DDL 変更の場合、リアルタイム同期は、Data Management (DMS) を使用して MySQL テーブルに列を追加 (Add Column) することにのみ対応しています。

  • MySQL からのストアドプロシージャの読み取りはサポートされていません。

オフライン同期

  • MySQL Reader プラグインを使用して複数のテーブルを同期する場合 (たとえば、データベースシャーディングのシナリオ)、同時実行タスクの数がテーブルの数を超えた場合にのみ、システムはテーブルを分割して並列処理を行います。それ以外の場合、システムはテーブルごとに 1 つのタスクを作成します。

  • MySQL からのストアドプロシージャの読み取りはサポートされていません。

サポートされているデータ型

すべての MySQL バージョンのデータ型の完全なリストについては、MySQL の公式ドキュメントをご参照ください。次の表は、MySQL 8.0.x を例として、主要なデータ型のサポート状況を示しています。

オフライン読み取り

オフライン書き込み

リアルタイム読み取り

リアルタイム書き込み

TINYINT

サポート

サポート

サポート

サポート

SMALLINT

サポート

サポート

サポート

サポート

INTEGER

サポート

サポート

サポート

サポート

BIGINT

サポート

サポート

サポート

サポート

FLOAT

サポート

サポート

サポート

サポート

DOUBLE

サポート

サポート

サポート

サポート

DECIMAL/NUMERIC

サポート

サポート

サポート

サポート

REAL

非サポート

非サポート

非サポート

非サポート

VARCHAR

サポート

サポート

サポート

サポート

JSON

サポート

サポート

サポート

サポート

TEXT

サポート

サポート

サポート

サポート

MEDIUMTEXT

サポート

サポート

サポート

サポート

LONGTEXT

サポート

サポート

サポート

サポート

VARBINARY

サポート

サポート

サポート

サポート

BINARY

サポート

サポート

サポート

サポート

TINYBLOB

サポート

サポート

サポート

サポート

MEDIUMBLOB

サポート

サポート

サポート

サポート

LONGBLOB

サポート

サポート

サポート

サポート

ENUM

サポート

サポート

サポート

サポート

SET

サポート

サポート

サポート

サポート

BOOLEAN

サポート

サポート

サポート

サポート

BIT

サポート

サポート

サポート

サポート

DATE

サポート

サポート

サポート

サポート

DATETIME

サポート

サポート

サポート

サポート

TIMESTAMP

サポート

サポート

サポート

サポート

TIME

サポート

サポート

サポート

サポート

YEAR

サポート

サポート

サポート

サポート

LINESTRING

非サポート

非サポート

非サポート

非サポート

POLYGON

非サポート

非サポート

非サポート

非サポート

MULTIPOINT

非サポート

非サポート

非サポート

非サポート

MULTILINESTRING

非サポート

非サポート

非サポート

非サポート

MULTIPOLYGON

非サポート

非サポート

非サポート

非サポート

GEOMETRYCOLLECTION

非サポート

非サポート

非サポート

非サポート

前提条件

DataWorks で MySQL データソースを追加する前に、このトピックの説明に従って MySQL 環境を準備し、同期タスクが正しく実行されるようにしてください。

MySQL バージョンの確認

データ統合は特定の MySQL バージョンをサポートしています。詳細については、「サポートされている MySQL のバージョン」セクションを参照して、ご利用のバージョンがサポートされていることを確認してください。次のステートメントを実行して、MySQL データベースのバージョンを確認します。

SELECT version();

アカウント権限の設定

DataWorks がデータソースにアクセスするために、専用の MySQL アカウントを作成することを推奨します。

  1. 任意:アカウントの作成

    詳細については、「MySQL アカウントの作成」をご参照ください。

  2. 権限の設定

    • バッチ同期

      • MySQL からデータを読み取るには、アカウントに同期対象テーブルに対する SELECT 権限が必要です。

      • MySQL にデータを書き込むには、アカウントに同期対象テーブルに対する INSERTDELETE、および UPDATE 権限が必要です。

    • リアルタイム同期

      アカウントには、データベースに対する SELECTREPLICATION SLAVE、および REPLICATION CLIENT 権限が必要です。

    次のコマンドを実行して、必要な権限を付与します。または、アカウントに SUPER 権限を付与することもできます。次のステートメントでは、'sync_account' をご利用の同期アカウントに置き換えてください。

    -- CREATE USER 'sync_account'@'%' IDENTIFIED BY ''; // 同期アカウントを作成し、パスワードを設定して、任意のホストからのログインを許可します。'%' ワイルドカードは任意のホストを表します。
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync_account'@'%'; // 同期アカウントに SELECT、REPLICATION SLAVE、REPLICATION CLIENT 権限を付与します。

    *.* 構文は、すべてのデータベースにあるすべてのテーブルに、指定された権限を付与します。 test データベースの user テーブルなど、特定のテーブルに権限を付与するには、次の文を実行します: GRANT SELECT, REPLICATION CLIENT ON test.user TO 'sync_account'@'%';

    説明

    REPLICATION SLAVE 権限はグローバル権限であり、特定のデータベースやテーブルに付与することはできません。

MySQL Binlog の有効化 (リアルタイム同期のみ)

データ統合は、MySQL Binlog をサブスクライブすることで、増分データをリアルタイムで同期します。リアルタイム同期タスクを設定する前に、Binlog サービスを有効にする必要があります。

重要
  • データベースは、Binlog ファイルがアクティブに使用されている間は削除できません。リアルタイム同期タスクで高いレイテンシーが発生した場合、ソースの Binlog が長期間保持され、大量のディスク領域を消費する可能性があります。タスクにレイテンシーアラートを設定し、データベースのディスク領域を監視することを推奨します。

  • Binlog ファイルは少なくとも 72 時間保持してください。これにより、タスクが失敗し、必要な Binlog ファイルが消去されたために障害点から再開できない場合のデータ損失を防ぎます。このような場合、完全バッチ同期タスクを使用して、欠落したデータをバックフィルする必要があります。

  1. Binlog が有効になっているか確認します。

    • 次のステートメントを実行して、Binlog が有効になっているか確認します。

      SHOW variables LIKE "log_bin";

      戻り値が ON の場合、Binlog が有効であることを示します。

    • データ同期にスタンバイデータベースを使用している場合は、次のステートメントを実行して Binlog が有効になっているか確認します。

      SHOW variables LIKE "log_slave_updates";

      戻り値が ON の場合、スタンバイデータベースで Binlog が有効であることを示します。

    返された結果が上記の結果と一致しない場合:

  2. Binlog のフォーマットを確認します。

    次のステートメントを実行して、Binlog のフォーマットを確認します。

    SHOW variables LIKE "binlog_format";

    戻り値は次のいずれかになります:

    • ROW:Binlog のフォーマットは ROW です。

    • STATEMENT:Binlog のフォーマットは STATEMENT です。

    • MIXED:Binlog のフォーマットは MIXED です。

    重要

    DataWorks のリアルタイム同期は、ROW Binlog フォーマットの MySQL サーバーのみをサポートします。戻り値が ROW でない場合は、Binlog のフォーマットを変更する必要があります。

  3. Binlog の完全行ロギングが有効になっているか確認します。

    次のステートメントを実行して、Binlog の完全行ロギングが有効になっているか確認します。

    SHOW variables LIKE "binlog_row_image";

    戻り値は次のいずれかになります:

    • FULL:Binlog の完全行ロギングが有効になっています。

    • MINIMAL:最小行ロギングが有効で、完全行ロギングは有効になっていません。

    重要

    DataWorks のリアルタイム同期は、Binlog の完全行ロギングが有効になっている MySQL サーバーのみをサポートします。戻り値が FULL でない場合は、binlog_row_image の設定を変更する必要があります。

OSS Binlog アクセスの権限付与

MySQL データソースを追加する際、設定モードApsaraDB for RDS に設定されており、RDS for MySQL インスタンスと DataWorks ワークスペースが同じリージョンにある場合、[OSS Binlog 読み取りを有効にする] を有効にできます。このオプションが有効な場合、データ統合は RDS Binlog にアクセスできないときに OSS から Binlog を読み取ろうとします。これにより、リアルタイム同期タスクの中断を防ぐことができます。

[OSS Binlog アクセス ID]Alibaba Cloud RAM User または Alibaba Cloud RAM Role に設定した場合は、次のセクションで説明するように権限付与も設定する必要があります。

  • RAM ユーザー

    1. Resource Access Management コンソールにログインし、[ユーザー] ページに移動します。次に、権限を付与する RAM ユーザーを見つけます。

    2. Operations 列で、[権限の追加] をクリックします。

    3. 次のパラメーターを設定し、[OK] をクリックします。

      • [スコープ]:Alibaba Cloud アカウント。

      • Permission Policy:システムポリシー。

      • Policy NameAliyunDataWorksAccessingRdsOSSBinlogPolicy

      image

  • RAM ロール

    1. Resource Access Management コンソールにログインし、RAM ロールを作成します。詳細については、「信頼できる Alibaba Cloud アカウントの RAM ロールを作成する」をご参照ください。

      主要なパラメーター:

      • [信頼できるエンティティの選択]:Alibaba Cloud アカウント。

      • [アカウントの選択]:他の Alibaba Cloud アカウント。DataWorks ワークスペースを所有する Alibaba Cloud アカウントの ID を入力します。

      • ロール名:カスタム名を入力します。

    2. RAM ロールに権限を付与します。詳細については、「RAM ロールに権限を付与する」をご参照ください。

      主要なパラメーター:

      • Permission Policy:システムポリシー。

      • Policy NameAliyunDataWorksAccessingRdsOSSBinlogPolicy

    3. RAM ロールの信頼ポリシーを変更します。詳細については、「RAM ロールの信頼ポリシーを変更する」をご参照ください。

      {
          "Statement": [
              {
                  "Action": "sts:AssumeRole",
                  "Effect": "Allow",
                  "Principal": {
                      "Service": [
                          "@di.dataworks.aliyuncs.com",

データソースの追加

Alibaba Cloud インスタンスモード

MySQL データベースが Alibaba Cloud RDS インスタンスで実行されている場合は、Alibaba Cloud インスタンスモードでデータソースを作成することを推奨します。次の表にパラメーターを示します。

パラメーター

説明

Data Source Name

データソース名は、ワークスペース内で一意である必要があります。ビジネスと環境を明確に識別できる名前を使用することを推奨します。例:rds_mysql_order_dev

設定モード

Alibaba Cloud インスタンスモードを選択します。設定モードの説明については、「シナリオ 1:インスタンスモード (現在のクラウドアカウント)」および「シナリオ 2:インスタンスモード (他のクラウドアカウント)」をご参照ください。

Alibaba Cloud Account

インスタンスを所有するクラウドアカウントを選択します。 Another Alibaba Cloud Account を選択した場合は、クロスアカウント権限を設定する必要があります。 詳細については、「クロスアカウント権限付与 (RDS、Hive、または Kafka)」をご参照ください。

他のクラウドアカウントを選択した場合は、次の情報を提供する必要があります:

  • 他のクラウドアカウントのプライマリアカウント ID:インスタンスを所有するプライマリアカウントの ID。

  • RAM ロール名:他のクラウドアカウントによって提供される RAM ロール。このロールには、ターゲットインスタンスにアクセスするための権限が必要です。

Region

インスタンスが配置されているリージョン。

Instance

接続するインスタンスを選択します。

Standby library settings

このデータソースに 読み取り専用インスタンス (スタンバイデータベース) がある場合、そこから読み取るようにジョブを設定できます。これにより、プライマリデータベースへのパフォーマンスへの影響を軽減できます。

Instance Address

正しいインスタンスを選択した後、[最新のエンドポイントを取得] をクリックして、インスタンスのパブリックネットワークまたはプライベートネットワークのエンドポイント、VPC、および vSwitch 情報を表示します。

Database

データソースがアクセスするデータベースの名前。指定されたユーザーがこのデータベースにアクセスする権限を持っていることを確認してください。

[ユーザー名/パスワード]

MySQL データベースのユーザー名とパスワード。RDS インスタンスを使用する場合、インスタンスの [アカウント管理] セクションでこれらの認証情報を作成および管理できます。

[OSS binlog 読み取りをサポート]

このオプションが有効な場合、DataWorks は RDS binlog にアクセスできないときに OSS から binlog を読み取ろうとします。これにより、リアルタイム同期ジョブの中断を防ぎます。詳細については、「OSS からの binlog 読み取り権限の設定」をご参照ください。また、権限付与の設定に基づいて [OSS binlog アクセス ID] を設定する必要があります。

Authentication Method

[認証なし] または [SSL 認証] を選択します。[SSL 認証] を選択した場合は、インスタンスで SSL 認証を有効にし、証明書ファイルを Authentication File Management にアップロードします。

Version

MySQL サーバーにログインし、SELECT VERSION() を実行してバージョンを確認します。

接続文字列モード

より柔軟性のある接続文字列モードでデータソースを作成することもできます。このモードのパラメーターを次の表に示します。

パラメーター

説明

Data Source Name

データソース名は、ワークスペース内で一意である必要があります。ビジネスと環境を明確に識別できる名前を使用することを推奨します。例:rds_mysql_order_dev

設定モード

User-created Data Store with Public IP Addresses を選択します。このモードでは、DataWorks は JDBC URL を使用してデータベースに接続します。

[接続文字列プレビュー]

接続エンドポイントとデータベース名を入力すると、DataWorks は対応する JDBC URL を自動的に生成します。

Connection Address

ホスト:データベースサーバーの IP アドレスまたはドメイン名を入力します。データベースが Alibaba Cloud RDS インスタンスの場合、インスタンスの詳細の [データベース接続] ページでエンドポイントを見つけることができます。

ポート:データベースのポート番号。デフォルトは 3306 です。

Database Name

データソースがアクセスするデータベースの名前。指定されたユーザーがこのデータベースにアクセスする権限を持っていることを確認してください。

[ユーザー名/パスワード]

MySQL データベースのユーザー名とパスワード。RDS インスタンスを使用する場合、インスタンスの [アカウント管理] セクションでこれらの認証情報を作成および管理できます。

Version

MySQL サーバーにログインし、SELECT VERSION() を実行してバージョンを確認します。

Authentication Method

[認証なし] または [SSL 認証] を選択します。[SSL 認証] を選択した場合は、インスタンスで SSL 認証を有効にし、証明書ファイルを Authentication File Management にアップロードします。

Advanced Parameters

パラメーター:パラメーターのドロップダウンリストをクリックし、connectTimeout などのサポートされているパラメーター名を選択します。

値:選択したパラメーターの値を入力します。例:3000。

URL は自動的に次のように組み立てられます:jdbc:mysql://192.168.90.28:3306/test?connectTimeout=50000

重要

データソースがネットワーク経由でリソースグループに到達できることを確認してください。そうでない場合、後続のジョブは失敗します。データソースのネットワーク環境と選択した接続モードに基づいてネットワーク接続を設定してください。詳細については、「接続性のテスト」をご参照ください。

MySQL データ同期タスク

同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。

単一テーブルのオフライン同期

単一テーブルのリアルタイム同期

詳細な手順については、「リアルタイム同期タスクの設定 (レガシー)」をご参照ください。

データベース全体の同期

詳細な手順については、「データベース全体のリアルタイム同期タスクの設定」をご参照ください。

よくある質問

その他の一般的な問題については、「データ統合のよくある質問」をご参照ください。

付録:MySQL スクリプトのデモとパラメーター

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明しています。

Reader スクリプトのデモ

このトピックでは、1 つのデータベース内の単一テーブルとシャーディングの設定例を示します:

説明

以下の JSON 例のコメントはデモンストレーション用であり、スクリプトを実行する前に削除する必要があります。

  • 単一データベース内の単一テーブル

    {
      "type": "job",
      "version": "2.0",//バージョン番号。
      "steps": [
        {
          "stepType": "mysql",//プラグイン名。
          "parameter": {
            "column": [//列名。
              "id"
            ],
            "connection": [
              {
                "querySql": [
                  "select a,b from join1 c join join2 d on c.id = d.id;"
                ],
                "datasource": ""//データソース名。
              }
            ],
            "where": "",//フィルター条件。
            "splitPk": "",//シャードキー。
            "encoding": "UTF-8"//エンコーディング形式。
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "stream",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "errorLimit": {
          "record": "0"//エラーレコード数。
        },
        "speed": {
          "throttle": true,//throttle が false に設定されている場合、mbps パラメーターは効果がなく、速度制限は無効になります。throttle が true に設定されている場合、速度制限は有効になります。
          "concurrent": 1,//ジョブの同時実行数。
          "mbps": "12"//速度制限レート。1 mbps = 1 MB/s。
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • シャーディング

    説明

    MySQL Reader はシャーディングをサポートしており、異なるデータソースにまたがる同じスキーマを持つ複数のテーブルからデータを読み取り、単一の送信先テーブルに書き込むことができます。データベース全体のシャーディングを設定するには、データ統合でタスクを作成し、全データベースシャーディング機能を選択する必要があります。

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "indexes": [
              {
                "type": "unique",
                "column": [
                  "id"
                ]
              }
            ],
            "envType": 0,
            "useSpecialSecret": false,
            "column": [
              "id",
              "buyer_name",
              "seller_name",
              "item_id",
              "city",
              "zone"
            ],
            "tableComment": "テスト注文テーブル",
            "connection": [
              {
                "datasource": "rds_dataservice",
                "table": [
                  "rds_table"
                ]
              },
              {
                "datasource": "rds_workshop_log",
                "table": [
                  "rds_table"
                ]
              }
            ],
            "where": "",
            "splitPk": "id",
            "encoding": "UTF-8"
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "odps",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        },
        {
          "name": "Processor",
          "stepType": null,
          "category": "processor",
          "copies": 1,
          "parameter": {
            "nodes": [],
            "edges": [],
            "groups": [],
            "version": "2.0"
          }
        }
      ],
      "setting": {
        "executeMode": null,
        "errorLimit": {
          "record": ""
        },
        "speed": {
          "concurrent": 2,
          "throttle": false
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }

Reader スクリプトのパラメーター

パラメーター

説明

必須

デフォルト

datasource

データソースの名前。この名前は、スクリプトモードで追加したデータソースの名前と一致する必要があります。

はい

なし

table

データの読み取り元となるテーブルの名前。1 つのデータ統合タスクで読み取れるのは、1 つの論理テーブルのみです。

table パラメーターを使用して高度なユースケースの範囲を設定する方法の例を次に示します:

  • 範囲を指定して、シャーディングされたテーブルから読み取ることができます。たとえば、'table_[0-99]''table_0''table_1''table_2' から 'table_99' までのデータを読み取ります。

  • テーブル名に 'table_000''table_001''table_002' から 'table_999' までの同じ長さの数値サフィックスがある場合、範囲を '"table":["table_00[0-9]","table_0[10-99]","table_[100-999]"]' として設定できます。

説明

タスクは、column パラメーターで指定された列を、一致するすべてのテーブルから読み取ります。テーブルまたは指定された列が存在しない場合、タスクは失敗します。

はい

なし

column

テーブルから同期する列を JSON 配列として指定します。すべての列を選択するには、[*] を使用します。

  • 列の選択:エクスポートする列のサブセットを選択できます。

  • 列の並べ替え:テーブルスキーマとは異なる順序で列をエクスポートできます。

  • 定数:MySQL SQL 形式を使用して定数を指定できます。例:["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id は通常の列名です。

    • table は予約語でもある列名です。

    • 1 は整数定数です。

    • 'mingya.wmy' は文字列定数です。単一引用符で囲む必要があることに注意してください。

    • null 値の処理:

      • " " は空の文字列を表します。

      • null は null 値を表します。

      • 'null' は文字列 'null' を表します。

    • to_char(a+1) は関数呼び出しまたは式の例です。

    • 2.3 は浮動小数点数です。

    • true はブール値です。

  • 同期する列を明示的に指定する必要があります。column パラメーターは空にできません。

はい

なし

splitPk

MySQL Reader がデータを抽出する際に splitPk を指定すると、splitPk で表されるフィールドをデータシャーディングに使用することを示します。これにより、データ統合は同時実行タスクを開始してデータを同期し、データ同期の効率を向上させます。

  • テーブルのプライマリキーを splitPk として使用することを推奨します。プライマリキーの値は通常、均等に分散されているため、シャード間のデータスキューを防ぐのに役立ちます。

  • 現在、splitPk は整数のデータ分割のみをサポートしており、文字列、浮動小数点数、日付などの他の型はサポートしていません。サポートされていない型を指定した場合、splitPk 機能は無視され、同期はシングルチャネルを使用して実行されます。

  • splitPk パラメーターを指定しない場合、たとえば splitPk パラメーターを提供しないか、splitPk の値を空のままにした場合、データ統合はシングルチャネルを使用してテーブルデータを同期します。

いいえ

なし

splitFactor

シャーディング係数。このパラメーターはデータシャードの数を制御します。シャードの総数は ジョブの同時実行数 × splitFactor として計算されます。たとえば、ジョブの同時実行数が 5 で splitFactor が 5 の場合、データは 25 のシャードに分割され、5 つの並行スレッドで処理されます。

説明

1 から 100 の値を使用することを推奨します。値が大きすぎると、メモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

5

where

フィルター条件。多くのビジネスシナリオでは、where 条件を gmt_create>$bizdate に設定することで、当日のデータのみを同期することがあります。

  • where 条件により、ビジネスデータの効率的な増分同期が可能になります。where ステートメントを指定しない場合、たとえば where キーまたはその値を提供しない場合、データ同期は完全データ同期として扱われます。

  • limit 10 のような条件は指定できません。これは MySQL SQL の WHERE 句の制約に準拠していません。

いいえ

なし

querySql (詳細モード;このパラメーターはコードレス UI ではサポートされていません)

一部のシナリオでは、where パラメーターだけではフィルター条件を記述するのに不十分な場合があります。このパラメーターを使用して、フィルタリング用のカスタム SQL クエリを定義できます。このパラメーターを設定すると、データ同期システムは tablescolumns、および splitPk パラメーターを無視し、このパラメーターの内容を直接使用してデータをフィルタリングします。たとえば、複数テーブル結合後にデータを同期するには、select a,b from table_a join table_b on table_a.id = table_b.id を使用します。querySql を設定すると、MySQL Reader は tablecolumnwhere、および splitPk パラメーターの設定を直接無視します。querySql パラメーターは tablecolumnwhere、および splitPk オプションよりも優先されます。datasource パラメーターは、ユーザー名やパスワードなどの情報を取得するために解析されます。

説明

querySql は大文字と小文字を区別します。たとえば、querysql は効果がありません。

いいえ

なし

useSpecialSecret

複数のソースデータソースが設定されている場合に、各データソースの個別のパスワードを使用するかどうかを指定します。有効な値:

  • true

  • false

異なる認証情報を持つ複数のソースデータソースを設定している場合は、このパラメーターを true に設定して、各個別のデータソースの認証情報を使用します。

いいえ

false

Writer スクリプトのデモ

{
  "type": "job",
  "version": "2.0",//バージョン番号。
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",//プラグイン名。
      "parameter": {
        "postSql": [],//実行後 SQL ステートメント。
        "datasource": "",//データソース。
        "column": [//列名。
          "id",
          "value"
        ],
        "writeMode": "insert",//書き込みモード。insert、replace、または update に設定できます。
        "batchSize": 1024,//バッチサイズ。
        "table": "",//テーブル名。
        "nullMode": "skipNull",//NULL 値の処理ポリシー。
        "skipNullColumn": [//NULL 値がスキップされる列。
          "id",
          "value"
        ],
        "preSql": [
          "delete from XXX;"//実行前 SQL ステートメント。
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {//エラーレコード数。
      "record": "0"
    },
    "speed": {
      "throttle": true,//throttle が false に設定されている場合、mbps パラメーターは効果がなく、速度制限は無効になります。throttle が true に設定されている場合、速度制限は有効になります。
      "concurrent": 1,//ジョブの同時実行数。
      "mbps": "12"//速度制限レート。ソース/送信先データベースへの過剰な読み取り/書き込み圧力を防ぐために、最大同期速度を制御します。1 mbps = 1 MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト

datasource

データソースの名前。この名前は、スクリプトモードで追加したデータソースの名前と一致する必要があります。

はい

なし

table

データが書き込まれる送信先テーブルの名前。

はい

なし

writeMode

書き込みモード。このパラメーターは INSERT INTOON DUPLICATE KEY UPDATE、および REPLACE INTO の動作をサポートします。

  • insert:このモードは INSERT INTO ステートメントに対応します。プライマリキーまたは一意なインデックスの競合が発生した場合、タスクは競合する行を書き込まず、ダーティデータとして扱います。

    スクリプトでこのモードを使用するには、writeModeinsert に設定します。

  • update:このモードは ON DUPLICATE KEY UPDATE ステートメントに対応します。競合が発生しない場合、動作は insert モードと同じです。競合が発生した場合、タスクは既存の行を新しい行の指定されたフィールドの値で更新します。

    スクリプトでこのモードを使用するには、writeModeupdate に設定します。

  • replace:このモードは REPLACE INTO ステートメントに対応します。競合が発生しない場合、動作は insert モードと同じです。競合が発生した場合、タスクは既存の行を削除して新しい行を挿入し、元の行のすべてのフィールドを置き換えます。

    スクリプトでこのモードを使用するには、writeModereplace に設定します。

いいえ

insert

nullMode

NULL 値の処理ポリシー。有効な値:

  • writeNull:ソースフィールドが NULL の場合、送信先フィールドに NULL 値が書き込まれます。

  • skipNull:ソースフィールドに NULL 値が含まれている場合、対応する送信先フィールドには値が書き込まれません。送信先フィールドには、デフォルト値が定義されていればその値が、デフォルト値が定義されていなければ NULL が設定されます。このパラメーターを設定する場合は、skipNullColumn パラメーターも設定する必要があります。

重要

skipNull を設定すると、タスクは送信先のデフォルト値をサポートするために SQL ステートメントを動的に生成します。これにより、FLUSH 操作の数が増加し、同期速度が低下します。最悪の場合、各行に対して FLUSH 操作が実行されます。

いいえ

writeNull

skipNullColumn

nullModeskipNull に設定されている場合、このパラメーターは、NULL 値を強制する代わりに、送信先列のデフォルト値が使用される列を指定します。

フォーマット:["c1","c2",...]。ここで、c1c2column パラメーターで指定された列のサブセットである必要があります。

いいえ

タスクに設定されたすべての列。

column

書き込む送信先列を、フィールドのカンマ区切りリストとして指定します。例:"column":["id","name","age"]。すべての列に順番に書き込むには、アスタリスク (*) を使用します。例:"column":["*"]

はい

なし

preSql

データ同期タスクが開始される前に実行される 1 つ以上の実行前 SQL ステートメント。コードレス UI は単一のステートメントのみをサポートしますが、スクリプトモードは複数をサポートします。たとえば、実行前に TRUNCATE TABLE tablename を使用してテーブルから古いデータをクリアできます。

説明

複数の SQL ステートメントにまたがるトランザクションはサポートされていません。

いいえ

なし

postSql

データ同期タスクが完了した後に実行される 1 つ以上の実行後 SQL ステートメント。コードレス UI は単一のステートメントのみをサポートしますが、スクリプトモードは複数をサポートします。たとえば、ALTER TABLE tablename ADD colname TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP のようなステートメントを使用してタイムスタンプを追加できます。

説明

複数の SQL ステートメントにまたがるトランザクションはサポートされていません。

いいえ

なし

batchSize

1 回のバッチでコミットするレコード数。バッチサイズを大きくすると、MySQL とのネットワークインタラクションが減少し、スループットが向上する可能性があります。ただし、値を高く設定しすぎると、メモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

256

updateColumn

writeModeupdate に設定されている場合、このパラメーターはプライマリキーまたは一意なインデックスの競合が発生したときに更新するフィールドを指定します。フィールドをカンマ区切りリストで指定します。例:"updateColumn":["name","age"]

いいえ

なし