すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MySQL データソース

最終更新日:Nov 25, 2025

MySQL データソースは、MySQL との間で読み書きを行うための双方向チャネルを提供します。このトピックでは、DataWorks の MySQL に対するデータ同期機能について説明します。

サポートされている MySQL のバージョン

  • オフラインでの読み書き:

    MySQL 5.5.x、MySQL 5.6.x、MySQL 5.7.x、および MySQL 8.0.x をサポートします。Amazon RDS for MySQLAzure MySQL、および Amazon Aurora MySQL と互換性があります。

    オフライン同期はビューからのデータ読み取りをサポートします。

  • リアルタイムでの読み取り:

    Data Integration は、MySQL バイナリログをサブスクライブすることで、MySQL データをリアルタイムで読み取ります。リアルタイム同期は、MySQL 5.5.x、5.6.x、5.7.x、および 8.0.x でサポートされています。バージョン 8.0.x については、関数インデックスなどの新機能を除くすべての機能がサポートされています。Amazon RDS for MySQLAzure MySQL、および Amazon Aurora MySQL と互換性があります。

    重要

    DRDS for MySQL データベースからデータを同期するには、MySQL データソースとして構成しないでください。詳細については、「DRDS データソースを構成する」をご参照ください。

制限

リアルタイム読み取り

  • MySQL 読み取り専用インスタンスからのデータ同期はサポートされていません。

  • 関数インデックスを含むテーブルの同期はサポートされていません。

  • XA ROLLBACK はサポートされていません。

    XA PREPARE 状態のトランザクション内のデータについては、リアルタイム同期がデータを宛先に書き込みます。XA ROLLBACK が発生した場合、リアルタイム同期はこのデータをロールバックしません。このシナリオを処理するには、リアルタイム同期タスクからテーブルを手動で削除し、再度追加して同期を再開する必要があります。

  • MySQL サーバーのバイナリロギングでは、ROW フォーマットのみがサポートされています。

  • リアルタイム同期は、カスケード削除操作によって削除された関連テーブルのレコードを同期しません。

  • Amazon Aurora MySQL データベースの場合、プライマリまたはライターデータベースインスタンスに接続する必要があります。これは、AWS が Aurora MySQL の読み取り専用レプリカでバイナリロギング機能を有効にすることを許可していないためです。リアルタイム同期タスクでは、増分更新を実行するためにバイナリログが必要です。

  • オンライン DDL 操作のリアルタイム同期は、Data Management DMS を使用した MySQL テーブルへの列の追加 (Add Column) のみをサポートします。

  • MySQL からのストアドプロシージャの読み取りはサポートされていません。

オフライン読み取り

  • MySQL Reader プラグインがシャード化されたデータベース内の複数のテーブルからデータを同期する場合、デフォルトではシャード化されたタスクの数はテーブルの数と等しくなります。単一のテーブルでシャーディングを有効にするには、同時タスクの数をテーブルの数より大きい値に設定する必要があります。

  • MySQL からのストアドプロシージャの読み取りはサポートされていません。

サポートされているデータの型

各 MySQL バージョンのデータ型の完全なリストについては、MySQL の公式ドキュメントをご参照ください。次の表は、例として MySQL 8.0.x でサポートされている主要なデータ型をリストしています。

データの型

オフライン読み取り (MySQL Reader)

オフライン書き込み (MySQL Writer)

リアルタイム読み取り

リアルタイム書き込み

TINYINT

サポートされています

サポートされています

サポートされています

サポートされています

SMALLINT

サポートされています

サポートされています

サポートされています

サポートされています

INTEGER

サポートされています

サポートされています

サポートされています

サポートされています

BIGINT

サポートされています

サポートされています

サポートされています

サポートされています

FLOAT

サポートされています

サポートされています

サポートされています

サポートされています

DOUBLE

サポートされています

サポートされています

サポートされています

サポートされています

DECIMAL/NUMERIC

サポートされています

サポートされています

サポートされています

サポートされています

REAL

サポートされていません

サポートされていません

サポートされていません

サポートされていません

VARCHAR

サポートされています

サポートされています

サポートされています

サポートされています

JSON

サポートされています

サポートされています

サポートされています

サポートされています

TEXT

サポートされています

サポートされています

サポートされています

サポートされています

MEDIUMTEXT

サポートされています

サポートされています

サポートされています

サポートされています

LONGTEXT

サポートされています

サポートされています

サポートされています

サポートされています

VARBINARY

サポートされています

サポートされています

サポートされています

サポートされています

BINARY

サポートされています

サポートされています

サポートされています

サポートされています

TINYBLOB

サポートされています

サポートされています

サポートされています

サポートされています

MEDIUMBLOB

サポートされています

サポートされています

サポートされています

サポートされています

LONGBLOB

サポートされています

サポートされています

サポートされています

サポートされています

ENUM

サポートされています

サポートされています

サポートされています

サポートされています

SET

サポートされています

サポートされています

サポートされています

サポートされています

BOOLEAN

サポートされています

サポートされています

サポートされています

サポートされています

BIT

サポートされています

サポートされています

サポートされています

サポートされています

DATE

サポートされています

サポートされています

サポートされています

サポートされています

DATETIME

サポートされています

サポートされています

サポートされています

サポートされています

TIMESTAMP

サポートされています

サポートされています

サポートされています

サポートされています

TIME

サポートされています

サポートされています

サポートされています

サポートされています

YEAR

サポートされています

サポートされています

サポートされています

サポートされています

LINESTRING

サポートされていません

サポートされていません

サポートされていません

サポートされていません

POLYGON

サポートされていません

サポートされていません

サポートされていません

サポートされていません

MULTIPOINT

サポートされていません

サポートされていません

サポートされていません

サポートされていません

MULTILINESTRING

サポートされていません

サポートされていません

サポートされていません

サポートされていません

MULTIPOLYGON

サポートされていません

サポートされていません

サポートされていません

サポートされていません

GEOMETRYCOLLECTION

サポートされていません

サポートされていません

サポートされていません

サポートされていません

準備

DataWorks で MySQL データソースを構成する前に、後続のタスクが期待どおりに実行されるように、このセクションで説明する準備を完了してください。

次のセクションでは、MySQL データを同期するために必要な準備について説明します。

MySQL バージョンの確認

Data Integration には MySQL に対する特定のバージョン要件があります。詳細については、「サポートされている MySQL のバージョン」セクションをご参照ください。MySQL データベースで次の文を実行して、現在のバージョンを確認できます。

SELECT version();

アカウント権限の構成

DataWorks がデータソースにアクセスするための専用の MySQL アカウントを作成します。手順は次のとおりです。

  1. オプション: アカウントを作成します。

    詳細については、「MySQL アカウントを作成する」をご参照ください。

  2. 権限を構成します。

    • オフライン

      オフライン同期

      • MySQL からデータを読み取るには、アカウントにターゲットテーブルに対する SELECT 権限が必要です。

      • MySQL にデータを書き込むには、アカウントにターゲットテーブルに対する INSERTDELETE、および UPDATE 権限が必要です。

    • リアルタイム同期

      リアルタイム同期の場合、アカウントにはデータベースに対する SELECTREPLICATION SLAVE、および REPLICATION CLIENT 権限が必要です。

    次のコマンドを実行してアカウントに権限を付与するか、SUPER 権限を直接付与できます。次の文では、'sync_account' を作成したアカウントに置き換えてください。

    -- CREATE USER 'sync_account'@'%' IDENTIFIED BY 'password'; //同期アカウントを作成し、パスワードを設定し、任意のホストからデータベースにログインできるようにします。パーセント記号 (%) は任意のホストを示します。
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync_account'@'%'; // データベースに対する SELECT、REPLICATION SLAVE、および REPLICATION CLIENT 権限を同期アカウントに付与します。

    *.* は、すべてデータベースのすべてのテーブルに対する指定された権限をアカウントに付与します。ターゲットデータベースの特定のテーブルに権限を付与することもできます。たとえば、test データベースの user テーブルに `SELECT` および `REPLICATION CLIENT` 権限を付与するには、次の文を実行します: GRANT SELECT, REPLICATION CLIENT ON test.user TO 'sync_account'@'%';

    説明

    REPLICATION SLAVE はグローバル権限です。この権限をターゲットデータベースの特定のテーブルに付与することはできません。

MySQL バイナリロギングを有効にする (リアルタイム同期のみ)

Data Integration は、MySQL バイナリログをサブスクライブすることで、増分データのリアルタイム同期を実行します。DataWorks で同期タスクを構成する前に、MySQL バイナリロギングサービスを有効にする必要があります。手順は次のとおりです:

重要
  • バイナリログが消費されている場合、データベースはそれを削除できません。リアルタイム同期タスクで高いレイテンシーが発生すると、ソースデータベース上のバイナリログが長時間保持される可能性があります。タスクにレイテンシーアラートを構成し、データベースのディスク領域を監視してください。

  • バイナリログは少なくとも 72 時間保持する必要があります。これにより、タスクが失敗し、バイナリログが削除されたために以前のオフセットにリセットできない場合のデータ損失を防ぎます。この場合、データをバックフィルするために完全なオフライン同期を実行する必要があります。

  1. バイナリロギングが有効になっているかどうかを確認します。

    • 次の文を実行して、バイナリロギングが有効になっているかどうかを確認します。

      SHOW variables LIKE "log_bin";

      コマンドが ON を返した場合、バイナリロギングは有効になっています。

    • セカンダリデータベースからデータを同期する場合、次の文を実行してバイナリロギングが有効になっているかどうかを確認することもできます。

      SHOW variables LIKE "log_slave_updates";

      コマンドが ON を返した場合、セカンダリデータベースでバイナリロギングが有効になっています。

    結果が一致しない場合:

  2. バイナリログのフォーマットを確認します。

    次の文を実行して、バイナリログのフォーマットを確認します。

    SHOW variables LIKE "binlog_format";

    考えられる戻り値は次のとおりです:

    • ROW: バイナリログのフォーマットは ROW です。

    • STATEMENT: バイナリログのフォーマットは STATEMENT です。

    • MIXED: バイナリログのフォーマットは MIXED です。

    重要

    DataWorks のリアルタイム同期は、MySQL サーバーのバイナリロギングに対して ROW フォーマットのみをサポートします。コマンドが ROW 以外の値を返す場合は、バイナリログのフォーマットを変更する必要があります。

  3. フル行イメージロギングが有効になっているかどうかを確認します。

    次の文を実行して、フル行イメージロギングが有効になっているかどうかを確認します。

    SHOW variables LIKE "binlog_row_image";

    考えられる戻り値は次のとおりです:

    • FULL: バイナリログに対してフル行イメージロギングが有効になっています。

    • MINIMAL: バイナリログに対して最小行イメージロギングが有効になっています。フル行イメージロギングは有効になっていません。

    重要

    DataWorks のリアルタイム同期は、フル行イメージロギングが有効になっている MySQL サーバーのみをサポートします。コマンドが FULL 以外の値を返す場合は、binlog_row_image 構成を変更する必要があります。

OSS からのバイナリログ読み取りの権限付与を構成する

MySQL データソースを追加する際に、[構成モード][Alibaba Cloud インスタンスモード] に設定し、RDS for MySQL インスタンスが DataWorks ワークスペースと同じリージョンにある場合、[OSS からのバイナリログ読み取りを有効にする] を有効にできます。この機能を有効にすると、DataWorks が RDS バイナリログにアクセスできない場合、OSS からバイナリログを取得しようとします。これにより、リアルタイム同期タスクが中断されるのを防ぎます。

[OSS からバイナリログにアクセスするための ID][Alibaba Cloud RAM ユーザー] または [Alibaba Cloud RAM ロール] に設定した場合、以下で説明するようにアカウント権限も構成する必要があります。

  • Alibaba Cloud RAM ユーザー

    1. RAM コンソールにログインし、権限を付与したい RAM ユーザーを見つけます。

    2. [アクション] 列で、[権限の追加] をクリックします。

    3. 以下の主要なパラメーターを構成し、[権限を付与] をクリックします。

      • [リソース範囲] をアカウントレベルに設定します。

      • [ポリシー] をシステムポリシーに設定します。

      • [ポリシー名]AliyunDataWorksAccessingRdsOSSBinlogPolicy に設定します。

      image

  • Alibaba Cloud RAM ロール

    1. RAM コンソールにログインし、RAM ロールを作成します。詳細については、「信頼できる Alibaba Cloud アカウントの RAM ロールを作成する」をご参照ください。

      主要なパラメーター:

      • [プリンシパルタイプ] を Alibaba Cloud アカウントに設定します。

      • [プリンシパル名] を他の Alibaba Cloud アカウントに設定し、DataWorks ワークスペースを所有する Alibaba Cloud アカウントの ID を入力します。

      • [ロール名] をカスタム名に設定します。

    2. 作成した RAM ロールに権限を付与します。詳細については、「RAM ロールに権限を付与する」をご参照ください。

      主要なパラメーター:

      • [ポリシー] をシステムポリシーに設定します。

      • [ポリシー名]AliyunDataWorksAccessingRdsOSSBinlogPolicy に設定します。

    3. 作成した RAM ロールの信頼ポリシーを変更します。詳細については、「RAM ロールの信頼ポリシーを変更する」をご参照ください。

      {
          "Statement": [
              {
                  "Action": "sts:AssumeRole",
                  "Effect": "Allow",
                  "Principal": {
                      "Service": [
                          "<DataWorks ユーザーの Alibaba Cloud アカウント ID>@di.dataworks.aliyuncs.com",
                          "<DataWorks ユーザーの Alibaba Cloud アカウント ID>@dataworks.aliyuncs.com"
                      ]
                  }
              }
          ],
          "Version": "1"
      }

データソースの追加

Alibaba Cloud インスタンスモード

MySQL データベースが Alibaba Cloud RDS インスタンスである場合、Alibaba Cloud インスタンスモードでデータソースを作成できます。次の表にパラメーターを示します:

パラメーター

説明

データソース名

データソースの名前。名前はワークスペース内で一意である必要があります。ビジネスと環境を識別する名前を使用してください。例: rds_mysql_order_dev

構成モード

Alibaba Cloud インスタンスモードを選択します。詳細については、「シナリオ 1: インスタンスモード (現在の Alibaba Cloud アカウント)」および「シナリオ 2: インスタンスモード (他の Alibaba Cloud アカウント)」をご参照ください。

Alibaba Cloud アカウント

インスタンスが属するアカウントを選択します。[他の Alibaba Cloud アカウント] を選択した場合、クロスアカウント権限を構成する必要があります。詳細については、「クロスアカウント権限付与 (RDS、Hive、または Kafka)」をご参照ください。

他の Alibaba Cloud アカウントを選択した場合、次の情報を指定する必要があります:

  • 他の Alibaba Cloud アカウント ID: インスタンスが属する Alibaba Cloud アカウントの ID。

  • 権限付与のための RAM ロール: 他の Alibaba Cloud アカウントによって提供される RAM ロール。このロールには、宛先インスタンスを使用する権限が必要です。

リージョン

インスタンスが配置されているリージョン。

インスタンス

接続するインスタンスの名前。

セカンダリインスタンスの構成

データソースに 読み取り専用インスタンス (読み取り専用レプリカ) がある場合、タスクを構成する際に読み取り操作に読み取り専用レプリカを選択できます。これにより、プライマリデータベースへの干渉を防ぎ、そのパフォーマンスに影響を与えません。

インスタンスアドレス

正しいインスタンスを選択した後、[最新のエンドポイントを取得] をクリックして、インスタンスのパブリックまたはプライベートエンドポイント、VPC、および vSwitch を表示します。

データベース

データソースがアクセスする必要のあるデータベースの名前。指定されたユーザーがデータベースにアクセスする権限を持っていることを確認してください。

ユーザー名/パスワード

MySQL データベースアカウントのユーザー名とパスワード。RDS インスタンスを使用する場合、インスタンスの [アカウント管理] セクションでアカウントを作成および管理できます。

OSS からのバイナリログ読み取りを有効にする

この機能を有効にすると、DataWorks が RDS バイナリログにアクセスできない場合、OSS からバイナリログを取得しようとします。これにより、リアルタイム同期タスクが中断されるのを防ぎます。構成の詳細については、「OSS からのバイナリログ読み取りの権限付与を構成する」をご参照ください。次に、権限付与構成に基づいて [OSS バイナリログアクセス ID] を設定します。

認証方法

[認証なし] または [SSL 認証] を選択できます。[SSL 認証] を選択した場合、インスタンス自体で SSL 認証が有効になっている必要があります。証明書ファイルを準備し、[証明書ファイル管理] にアップロードします。

バージョン

MySQL サーバーにログインし、`SELECT VERSION()` コマンドを実行してバージョン番号を表示できます。

接続文字列モード

より柔軟性を高めるために、接続文字列モードでデータソースを作成することもできます。次の表にパラメーターを示します:

パラメーター

説明

データソース名

データソースの名前。名前はワークスペース内で一意である必要があります。ビジネスと環境を識別する名前を使用してください。例: rds_mysql_order_dev

構成モード

[接続文字列モード] を選択します。このモードは JDBC URL を使用してデータベースに接続します。

JDBC 接続文字列プレビュー

エンドポイントとデータベース名を入力すると、DataWorks はそれらを自動的に連結して JDBC URL をプレビュー用に作成します。

接続アドレス

ホスト IP: データベースホストの IP アドレスまたはドメイン名。データベースが Alibaba Cloud RDS インスタンスの場合、インスタンスの製品ページでエンドポイントを表示できます。

ポート: データベースポート。デフォルト値は 3306 です。

データベース名

データソースがアクセスする必要のあるデータベースの名前。指定されたユーザーがデータベースにアクセスする権限を持っていることを確認してください。

ユーザー名/パスワード

MySQL データベースアカウントのユーザー名とパスワード。RDS インスタンスを使用する場合、インスタンスの [アカウント管理] セクションでアカウントを作成および管理できます。

バージョン

MySQL サーバーにログインし、`SELECT VERSION()` コマンドを実行してバージョン番号を表示できます。

認証方法

[認証なし] または [SSL 認証] を選択できます。[SSL 認証] を選択した場合、インスタンス自体で SSL 認証が有効になっている必要があります。証明書ファイルを準備し、[認証ファイル管理] にアップロードします。

詳細パラメーター

パラメーター: パラメーターのドロップダウンリストをクリックし、connectTimeout などのサポートされているパラメーター名を選択します。

値: 3000 などの選択したパラメーターの値を入力します。

URL は自動的に jdbc:mysql://192.168.90.28:3306/test?connectTimeout=50000 に更新されます。

重要

リソースグループがデータソースに接続できることを確認する必要があります。そうしないと、後続のタスクは実行できません。必要なネットワーク構成は、データソースのネットワーク環境と接続モードによって異なります。詳細については、「接続性をテストする」をご参照ください。

データ同期タスクの開発: MySQL 同期プロセスガイド

同期タスクの構成のエントリポイントと手順については、以下の構成ガイドをご参照ください。

単一テーブルのオフライン同期タスクの構成ガイド

単一テーブルのリアルタイム同期タスクの構成ガイド

手順については、「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。

データベースレベルの同期タスクの構成ガイド: オフライン、完全および増分 (リアルタイム)、およびシャード化 (リアルタイム)

手順については、「Data Integration で同期タスクを構成する」をご参照ください。

よくある質問

Data Integration の他の一般的な問題に関する詳細については、「Data Integration に関するよくある質問」をご参照ください。

付録: MySQL スクリプトの例とパラメーターの説明

コードエディタを使用してバッチ同期タスクを構成する

コードエディタを使用してバッチ同期タスクを構成する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタでタスクを構成する」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して構成する必要があるパラメーターについて説明しています。

Reader スクリプトの例

このセクションでは、単一データベース内の単一テーブルとシャード化されたテーブルの構成例を示します:

説明

以下の JSON の例のコメントは、デモンストレーションのみを目的としています。スクリプトを構成する際には、コメントを削除する必要があります。

  • 単一データベース内の単一テーブル

    {
      "type": "job",
      "version": "2.0",// バージョン番号。
      "steps": [
        {
          "stepType": "mysql",// プラグイン名。
          "parameter": {
            "column": [// 列名。
              "id"
            ],
            "connection": [
              {
                "querySql": [
                  "select a,b from join1 c join join2 d on c.id = d.id;"
                ],
                "datasource": ""// データソース名。
              }
            ],
            "where": "",// フィルター条件。
            "splitPk": "",// シャードキー。
            "encoding": "UTF-8"// エンコード形式。
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "stream",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "errorLimit": {
          "record": "0"// 許可されるエラーレコードの数。
        },
        "speed": {
          "throttle": true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、データ転送レートは制限されません。throttle が true に設定されている場合、データ転送レートは制限されます。
          "concurrent": 1,// 同時ジョブの数。
          "mbps": "12"// 最大データ転送レート。1 mbps = 1 MB/s。
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • シャード化されたテーブル

    説明

    シャーディングにより、MySQL Reader は同じスキーマを持つ複数の MySQL テーブルを選択できます。この文脈では、シャーディングとは、複数のソース MySQL テーブルから単一の宛先テーブルにデータを書き込むことを意味します。データベースレベルのシャーディングを構成するには、Data Integration でタスクを作成し、データベースレベルのシャーディング機能を選択できます。

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "indexes": [
              {
                "type": "unique",
                "column": [
                  "id"
                ]
              }
            ],
            "envType": 0,
            "useSpecialSecret": false,
            "column": [
              "id",
              "buyer_name",
              "seller_name",
              "item_id",
              "city",
              "zone"
            ],
            "tableComment": "Test order table",
            "connection": [
              {
                "datasource": "rds_dataservice",
                "table": [
                  "rds_table"
                ]
              },
              {
                "datasource": "rds_workshop_log",
                "table": [
                  "rds_table"
                ]
              }
            ],
            "where": "",
            "splitPk": "id",
            "encoding": "UTF-8"
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "odps",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        },
        {
          "name": "Processor",
          "stepType": null,
          "category": "processor",
          "copies": 1,
          "parameter": {
            "nodes": [],
            "edges": [],
            "groups": [],
            "version": "2.0"
          }
        }
      ],
      "setting": {
        "executeMode": null,
        "errorLimit": {
          "record": ""
        },
        "speed": {
          "concurrent": 2,
          "throttle": false
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }

Reader スクリプトパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。

はい

なし

table

データを同期するテーブルの名前。データ統合タスクは、1 つのテーブルからのみデータを読み取ることができます。

table パラメーターの高度な使用法として、範囲を構成する例を以下に示します:

  • 範囲を構成して、シャード化されたテーブルからデータを読み取ることができます。たとえば、'table_[0-99]' は、'table_0''table_1''table_2' から 'table_99' までのデータを読み取ることを指定します。

  • テーブルに 'table_000''table_001''table_002' から 'table_999' までの同じ長さの数値サフィックスがある場合、パラメーターを '"table":["table_00[0-9]","table_0[10-99]","table_[100-999]"]' に設定できます。

説明

タスクは一致するすべてのテーブルを読み取ります。具体的には、これらのテーブルの column パラメーターで指定された列を読み取ります。テーブルまたは列が存在しない場合、タスクは失敗します。

はい

なし

column

構成されたテーブルから同期する列名のコレクション。JSON 配列を使用してフィールド情報を記述します。デフォルトでは、すべての列が構成されます。例: [*]。

  • 列のトリミング: エクスポートする列のサブセットを選択できます。

  • 列の並べ替え: テーブルスキーマとは異なる順序で列をエクスポートできます。

  • 定数構成: MySQL SQL 構文形式に従う必要があります。例: ["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id は通常の列名です。

    • table は予約語である列名です。

    • 1 は整数定数です。

    • 'mingya.wmy' は文字列定数です。単一引用符で囲む必要があることに注意してください。

    • null について:

      • " " は空の文字列を示します。

      • null は null 値を示します。

      • 'null' は文字列 "null" を示します。

    • to_char(a+1) は文字列の長さを計算する関数です。

    • 2.3 は浮動小数点数です。

    • true はブール値です。

  • column パラメーターは、同期する列のコレクションを明示的に指定する必要があります。空にすることはできません。

はい

なし

splitPk

MySQL Reader がデータを抽出する際に、splitPk パラメーターを指定すると、splitPk が表すフィールドに基づいてデータがシャード化されます。これにより、データ同期が同時タスクとして実行され、同期効率が向上します。

  • splitPk にはテーブルのプライマリキーを使用することをお勧めします。プライマリキーは通常、均等に分散されているため、シャード内のデータホットスポットを防ぐのに役立ちます。

  • 現在、splitPk は整数データ型のシャーディングのみをサポートしています。文字列、浮動小数点数、日付はサポートしていません。サポートされていないデータ型を指定した場合、splitPk 関数は無視され、同期には単一のチャネルが使用されます。

  • splitPk を指定しない場合 (splitPk を提供しない、または splitPk の値を空にする場合を含む)、データ同期は単一のチャネルを使用してテーブルデータを同期します。

いいえ

なし

splitFactor

シャーディング係数。データ同期のシャード数を構成できます。複数の同時スレッドを構成した場合、データは `同時スレッド数 × splitFactor` の部分にシャード化されます。たとえば、同時スレッド数が 5 で splitFactor が 5 の場合、データは 5 × 5 = 25 の部分にシャード化され、5 つの同時スレッドによって処理されます。

説明

推奨される値の範囲は 1 から 100 です。値が大きすぎると、メモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

5

where

フィルター条件。多くのビジネスシナリオでは、当日のデータのみを同期したい場合があります。where 条件を gmt_create>$bizdate に設定できます。

  • where 条件は、効率的な増分同期に使用できます。where 文を指定しない場合、または where のキーまたは値が空の場合、すべてのデータが同期されます。

  • where 条件を `limit 10` に設定しないでください。これは MySQL SQL の WHERE 句の制約に準拠していません。

いいえ

なし

querySql (高度なモード。このパラメーターはコードレス UI ではサポートされていません。)

一部のビジネスシナリオでは、`where` パラメーターではフィルター条件を十分に記述できない場合があります。このパラメーターを使用して、カスタムのフィルター SQL 文を定義できます。このパラメーターを構成すると、システムは `tables`、`columns`、および `splitPk` パラメーターを無視し、このパラメーターの内容を使用してデータをフィルター処理します。たとえば、複数テーブルの結合後にデータを同期するには、select a,b from table_a join table_b on table_a.id = table_b.id を使用します。querySql を構成すると、MySQL Reader は `table`、`column`、`where`、および `splitPk` の構成を無視します。querySql パラメーターは、tablecolumnwhere、および splitPk パラメーターよりも優先度が高くなります。datasource パラメーターは、ユーザー名やパスワードなどの情報を解析するために使用されます。

説明

querySql パラメーターは大文字と小文字を区別します。たとえば、querysql と書くと、効果がありません。

いいえ

なし

useSpecialSecret

複数のソースデータソースがある場合、各データソースのパスワードを使用するかどうかを指定します。有効な値:

  • true

  • false

複数のソースデータソースを構成し、各データソースが異なるユーザー名とパスワードを使用している場合、このパラメーターを true に設定して、各データソースのパスワードを使用できます。

いいえ

false

Writer スクリプトの例

{
  "type": "job",
  "version": "2.0",// バージョン番号。
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",// プラグイン名。
      "parameter": {
        "postSql": [],// インポート後に実行される SQL 文。
        "datasource": "",// データソース。
        "column": [// 列名。
          "id",
          "value"
        ],
        "writeMode": "insert",// 書き込みモード。insert、replace、または update に設定できます。
        "batchSize": 1024,// 1 つのバッチで送信されるレコードの数。
        "table": "",// テーブル名。
        "nullMode": "skipNull",// null 値を処理するためのポリシー。
        "skipNullColumn": [// null 値をスキップする列。
          "id",
          "value"
        ],
        "preSql": [
          "delete from XXX;"// インポート前に実行される SQL 文。
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {// 許可されるエラーレコードの数。
      "record": "0"
    },
    "speed": {
      "throttle": true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、データ転送レートは制限されません。throttle が true に設定されている場合、データ転送レートは制限されます。
      "concurrent": 1,// 同時ジョブの数。
      "mbps": "12"// 最大データ転送レート。これにより、上流/下流のデータベースへの過剰な読み取り/書き込み圧力を防ぎます。1 mbps = 1 MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Writer スクリプトパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。

はい

なし

table

データを同期するテーブルの名前。

はい

なし

writeMode

インポートモード。INSERT INTOON DUPLICATE KEY UPDATE、または REPLACE INTO を選択できます。

  • insert into: プライマリキーまたは一意なインデックスの競合が発生した場合、競合する行は書き込まれず、ダーティデータとして扱われます。

    コードエディタでタスクを構成する場合、writeModeinsert に設定します。

  • on duplicate key update: プライマリキーまたは一意なインデックスの競合が発生しない場合、動作は insert into と同じです。競合が発生した場合、既存の行の指定されたフィールドの値が新しい行の値に置き換えられます。

    コードエディタでタスクを構成する場合、writeModeupdate に設定します。

  • replace into: プライマリキーまたは一意なインデックスの競合が発生しない場合、動作は insert into と同じです。競合が発生した場合、既存の行が削除され、新しい行が挿入されます。新しい行は既存の行のすべてのフィールドを置き換えます。

    コードエディタでタスクを構成する場合、writeModereplace に設定します。

いいえ

insert

nullMode

null 値を処理するためのポリシー。有効な値:

  • writeNull: ソースフィールドに NULL 値がある場合、宛先フィールドに NULL 値が書き込まれます。

  • skipNull: ソースフィールドに NULL 値がある場合、宛先フィールドは書き込まれません。宛先フィールドにデフォルト値が定義されている場合、そのデフォルト値が使用されます。デフォルト値が定義されていない場合、列の値は NULL になります。このパラメーターを構成する場合、skipNullColumn パラメーターも構成する必要があります。

重要

このパラメーターを skipNull に設定すると、タスクは宛先のデフォルト値をサポートするためにデータを書き込むための SQL 文を動的に生成します。これにより、FLUSH 操作の数が増加し、同期速度が低下します。最悪の場合、各データレコードに対して FLUSH 操作が実行されます。

いいえ

writeNull

skipNullColumn

nullModeskipNull に設定されている場合、このパラメーターに構成された列は強制的に NULL として書き込まれません。対応する列のデフォルト値が優先的に使用されます。

フォーマット: ["c1","c2",...]、ここで c1c2column パラメーターのサブセットでなければなりません。

いいえ

デフォルトでは、タスクに構成されたすべての列。

column

データが書き込まれる宛先テーブルのフィールド。フィールドをカンマ (,) で区切ります。例: "column":["id","name","age"]。すべての列に順番に書き込むには、アスタリスク (*) を使用します。例: "column":["*"]

はい

なし

preSql

データ同期タスクが開始される前に実行される SQL 文。コードレス UI では、1 つの SQL 文のみを実行できます。コードエディタでは、複数の SQL 文を実行できます。たとえば、実行前にテーブルから古いデータをクリアできます: `TRUNCATE TABLE tablename`。

説明

複数の SQL 文に対するトランザクションはサポートされていません。

いいえ

なし

postSql

データ同期タスクが完了した後に実行される SQL 文。コードレス UI では、1 つの SQL 文のみを実行できます。コードエディタでは、複数の SQL 文を実行できます。たとえば、タイムスタンプを追加できます: ALTER TABLE tablename ADD colname TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

説明

複数の SQL 文に対するトランザクションはサポートされていません。

いいえ

なし

batchSize

1 つのバッチで送信されるレコードの数。値を大きくすると、データ同期システムと MySQL 間のネットワークインタラクションが大幅に減少し、全体のスループットが向上します。この値が高すぎると、データ同期プロセスでメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

256

updateColumn

writeModeupdate に設定されている場合、これらはプライマリキーまたは一意なインデックスの競合が発生したときに更新されるフィールドです。フィールドをカンマ (,) で区切ります。例: "updateColumn":["name","age"]

いいえ

なし