すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MySQL データソース

最終更新日:Jun 09, 2025

DataWorks は、MySQL データソースからデータを読み取ったり、MySQL データソースにデータを書き込んだりする際に使用できる MySQL Reader と MySQL Writer を提供しています。このトピックでは、MySQL データソースとのデータ同期機能について説明します。

サポートされている MySQL のバージョン

  • バッチデータの読み取りと書き込み:

    MySQL 5.5.X、MySQL 5.6.X、MySQL 5.7.X、および MySQL 8.0.X。Amazon RDS for MySQLAzure MySQL、およびAmazon Aurora MySQL データソースと互換性があります。

    バッチ同期の際にビューのデータを読み取ることができます。

  • リアルタイムデータの読み取り:

    MySQL からのデータのリアルタイム同期は、MySQL バイナリログへのリアルタイム サブスクリプションに基づいて実行され、MySQL 5.5.X、MySQL 5.6.X、MySQL 5.7.X、および MySQL 8.0.X でのみサポートされています。リアルタイム同期機能は、関数インデックスなど、MySQL 8.0.X の新機能とは互換性がありません。Amazon RDS for MySQLAzure MySQL、およびAmazon Aurora MySQL データソースと互換性があります。

    重要

    MySQL を実行する DRDS データソースを MySQL データソースとして構成することはできません。MySQL を実行する DRDS データソースからデータをリアルタイムで同期する場合は、「DRDS データソースを追加する」のトピックを参照して、DRDS データソースを追加し、そのデータソースのリアルタイム同期タスクを構成できます。

制限事項

リアルタイムデータの読み取り

  • Data Integration は、MySQL Reader を使用した読み取り専用 MySQL インスタンスからのデータのリアルタイム同期をサポートしていません。

  • Data Integration は、関数インデックスを含むテーブルからのデータのリアルタイム同期をサポートしていません。

  • Data Integration は、XA ROLLBACK 文が実行されたデータのリアルタイム同期をサポートしていません。

    XA PREPARE 文が実行されたトランザクションデータについては、リアルタイム同期機能を使用して、データを宛先に同期できます。後で XA ROLLBACK 文がデータに対して実行された場合、データへのロールバックの変更は宛先に同期されません。同期するテーブルに XA ROLLBACK 文が実行されたテーブルが含まれている場合は、XA ROLLBACK 文が実行されたテーブルをリアルタイム同期タスクから削除し、削除したテーブルを再度追加してデータを同期する必要があります。

  • MySQL からのデータのリアルタイム同期は、行形式のバイナリログのみをサポートしています。

  • Data Integration は、カスケード削除操作が実行された関連テーブルのデータレコードのリアルタイム同期をサポートしていません。

  • Amazon Aurora MySQL データベースからデータを読み取る場合は、データ書き込みのために Amazon Aurora MySQL データベースをプライマリデータベースに接続する必要があります。これは、AWS では Amazon Aurora MySQL の読み取り専用レプリカでバイナリロギング機能を有効にすることができないためです。バイナリロギング機能は、リアルタイム同期タスクが増分データを同期するために必要です。

  • MySQL テーブルに対するすべてのオンライン DDL 操作のうち、Data Management (DMS) を使用して実行される列の追加操作のみがリアルタイムで同期できます。

  • MySQL からストアドプロシージャを読み取ることはできません。

バッチデータの読み取り

  • シャーディングなどのマルチテーブル同期シナリオで MySQL Reader を使用して MySQL からデータを同期する場合、テーブルを分割するには、指定した並列スレッド数がテーブル数よりも大きい必要があります。指定した並列スレッド数がテーブル数以下の場合、テーブルは分割されず、実際に使用される並列スレッド数は同期されるテーブル数と同じになります。

  • MySQL からストアドプロシージャを読み取ることはできません。

サポートされているデータ型

各 MySQL バージョンのすべてのデータ型については、MySQL 公式ドキュメントを参照してください。次の表は、MySQL 8.0.X の主なデータ型のサポート状況を示しています。

データ型

バッチデータ読み取り用 MySQL Reader

バッチデータ書き込み用 MySQL Writer

リアルタイムデータ読み取り用 MySQL Reader

リアルタイムデータ書き込み用 MySQL Writer

TINYINT

image

image

image

image

SMALLINT

image

image

image

image

INTEGER

image

image

image

image

BIGINT

image

image

image

image

FLOAT

image

image

image

image

DOUBLE

image

image

image

image

DECIMAL/NUMBERIC

image

image

image

image

REAL

image

image

image

image

VARCHAR

image

image

image

image

JSON

image

image

image

image

TEXT

image

image

image

image

MEDIUMTEXT

image

image

image

image

LONGTEXT

image

image

image

image

VARBINARY

image

image

image

image

BINARY

image

image

image

image

TINYBLOB

image

image

image

image

MEDIUMBLOB

image

image

image

image

LONGBLOB

image

image

image

image

ENUM

image

image

image

image

SET

image

image

image

image

BOOLEAN

image

image

image

image

BIT

image

image

image

image

DATE

image

image

image

image

DATETIME

image

image

image

image

TIMESTAMP

image

image

image

image

TIME

image

image

image

image

YEAR

image

image

image

image

LINESTRING

image

image

image

image

POLYGON

image

image

image

image

MULTIPOINT

image

image

image

image

MULTILINESTRING

image

image

image

image

MULTIPOLYGON

image

image

image

image

GEOMETRYCOLLECTION

image

image

image

image

データ同期の前に MySQL 環境を準備する

DataWorks を使用して MySQL データソースとの間でデータを同期する前に、MySQL 環境を準備する必要があります。これにより、データ同期タスクを構成し、MySQL データソースとの間で想定どおりにデータを同期できるようになります。MySQL データソースとの間でデータを同期するための MySQL 環境を準備する方法は次のとおりです。

MySQL データベースのバージョンを確認する

Data Integration には、MySQL のバージョンに関する特定の要件があります。サポートされている MySQL のバージョンを参照して、MySQL データベースのバージョンが要件を満たしているかどうかを確認できます。次の文を実行して、MySQL データベースのバージョンを確認できます。

SELECT version();

必要な権限を持つアカウントを準備する

DataWorks が MySQL データベースにアクセスするためのアカウントを計画して作成することをお勧めします。このようなアカウントを準備するには、次の手順を実行します。

  1. オプション。 アカウントを作成します。

    詳細については、「MySQL データベースにアクセスするためのアカウントを作成する」をご参照ください。

  2. アカウントに必要な権限を付与します。

    • バッチ同期

      バッチデータの読み取りとバッチデータの書き込みには、異なる権限が必要です。

      • バッチデータの読み取り: アカウントには SELECT 権限が必要です。

      • バッチデータの書き込み: アカウントには INSERTDELETE、および UPDATE 権限が必要です。

    • リアルタイム同期

      アカウントには、MySQL データベースに対する SELECTREPLICATION SLAVE、および REPLICATION CLIENT 権限が必要です。

    次の文を実行して、アカウントに権限を付与できます。または、アカウントに SUPER 権限を付与することもできます。文中の Account for data synchronization を、作成したアカウントに置き換えます。

    -- CREATE USER 'Account for data synchronization'@'%' IDENTIFIED BY 'Password'; //データ同期に使用できるアカウントを作成し、パスワードを指定します。このようにして、任意のホストからアカウントとパスワードを使用してデータベースにアクセスできます。 % はホストを示します。
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'Account for data synchronization'@'%'; // アカウントに SELECT、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与します。

    *.* は、すべてのデータベースのすべてのテーブルに対する上記の権限がアカウントに付与されていることを示します。データベース内の特定のテーブルに対する上記の権限をアカウントに付与することもできます。たとえば、test データベースの user テーブルに対する上記の権限をアカウントに付与するには、次の文を実行します: GRANT SELECT, REPLICATION CLIENT ON test.user TO 'Account for data synchronization'@'%';

    説明

    REPLICATION SLAVE 権限はグローバル権限です。データベース内の特定のテーブルに対するこの権限をアカウントに付与することはできません。

リアルタイム同期のシナリオでのみバイナリロギング機能を有効にする

MySQL からの増分データのリアルタイム同期は、MySQL バイナリログのリアルタイム サブスクリプションに基づいて実行されます。MySQL から増分データを同期するためにリアルタイム同期タスクを構成する前に、バイナリロギング機能を有効にする必要があります。バイナリロギング機能を有効にするには、次の手順を実行します。

重要
  • リアルタイム同期タスクが MySQL データソースのバイナリログからデータを同期している場合、関連データベースからバイナリログを削除することはできません。MySQL データソースのリアルタイム同期タスクでレイテンシが発生した場合、タスクがバイナリログからデータを読み取るのに長時間かかる場合があります。タスクに適切なアラートルールを構成して、タスクでレイテンシが発生したときにシステムがアラート通知を送信できるようにする必要があります。さらに、関連データベースのディスク容量にも注意を払う必要があります。

  • バイナリログは少なくとも 72 時間保持する必要があります。バイナリログの保持期間が短い場合、失敗した同期タスクを再実行する前にバイナリログがクリアされる可能性があります。この場合、同期タスクが失敗する前の位置にバイナリログのオフセットをリセットすることはできません。これにより、データが失われます。データが失われた場合は、完全データのバッチ同期のみを実行して、不足しているデータを補足できます。

  1. バイナリロギング機能が有効になっているかどうかを確認します。

    • 次の文を実行して、バイナリロギング機能が有効になっているかどうかを確認します。

      SHOW variables LIKE "log_bin";

      返された結果に ON が表示されている場合、バイナリロギング機能は有効になっています。

    • セカンダリ MySQL データベースから増分データを同期する場合、次の文を実行して、セカンダリ MySQL データベースでバイナリロギング機能が有効になっているかどうかを確認します。

      SHOW variables LIKE "log_slave_updates";

      返された結果に ON が表示されている場合、セカンダリ MySQL データベースでバイナリロギング機能は有効になっています。

    返された結果に ON が表示されていない場合:

  2. バイナリログのフォーマットをクエリします。

    次の文を実行して、バイナリログのフォーマットをクエリします。

    SHOW variables LIKE "binlog_format";

    システムは次のいずれかの結果を返す場合があります。

    • ROW: バイナリログのフォーマットは row です。

    • STATEMENT: バイナリログのフォーマットは statement です。

    • MIXED: バイナリログのフォーマットは mixed です。

    重要

    MySQL からのデータのリアルタイム同期では、row フォーマットのバイナリログのみがサポートされています。バイナリログのフォーマットが row でない場合は、フォーマットを row に変更します。

  3. binlog_row_image パラメータの設定をクエリします。

    次の文を実行して、binlog_row_image パラメータの設定をクエリします。

    SHOW variables LIKE "binlog_row_image";

    システムは次のいずれかの結果を返す場合があります。

    • FULL: before イメージと after イメージの両方のすべての列をログに記録します。

    • MINIMAL: before イメージまたは after イメージの特定の列のみをログに記録します。

    重要

    注: MySQL からのデータのリアルタイム同期は、binlog_row_image パラメーターに指定された FULL 値のみをサポートします。 binlog_row_image パラメーターの値が FULL でない場合は、値をFULLに変更します。

OSS からのバイナリログの読み取りの承認

MySQL データソースを追加するときに、[構成モード] パラメータを [Alibaba Cloud インスタンスモード] に設定し、リージョンパラメータを現在の DataWorks ワークスペースが存在するリージョンに設定した場合、[OSS からのバイナリログの読み取りを有効にする] をオンにすることができます。このスイッチをオンにすると、DataWorks は ApsaraDB RDS for MySQL からバイナリログを読み取れない場合に Object Storage Service (OSS) からバイナリログを取得しようとします。これにより、リアルタイム同期タスクが中断されるのを防ぎます。

[OSS からバイナリログにアクセスするための ID] パラメータを [Alibaba Cloud RAM ユーザー] または [Alibaba Cloud RAM ロール] に設定した場合は、RAM ユーザーまたは RAM ロールに権限を付与するために、次の操作も実行する必要があります。

  • Alibaba Cloud RAM ユーザー

    1. RAM コンソールの [ユーザー] ページに移動します。権限を付与する RAM ユーザーを見つけます。

    2. [アクション] 列の [権限の追加] をクリックします。

    3. [権限の付与] パネルでパラメータを構成し、[権限の付与] をクリックします。主なパラメータ:

      • [リソーススコープ]: アカウントを選択します。

      • [ポリシー]: ドロップダウンリストから [システムポリシー] を選択します。

      • 検索ボックスに AliyunDataWorksAccessingRdsOSSBinlogPolicy と入力してポリシーを見つけ、[選択済みポリシー] 領域にポリシーを追加します。

      image

  • Alibaba Cloud RAM ロール

    1. RAM コンソールの [ロール] ページに移動します。[ロール] ページで RAM ロールを作成します。詳細については、「信頼できる Alibaba Cloud アカウントの RAM ロールを作成する」をご参照ください。

      主なパラメータ:

      • [プリンシパルタイプ]: このパラメータを [クラウドアカウント] に設定します。

      • [プリンシパル名]: このパラメータを [その他のアカウント] に設定し、表示されるフィールドに DataWorks ワークスペースが属する Alibaba Cloud アカウントの ID を入力します。

      • [ロール名]: カスタムロール名を指定します。

    2. 作成した RAM ロールに権限を付与します。詳細については、「RAM ロールに権限を付与する」をご参照ください。

      主なパラメータ:

      • [ポリシー]: ドロップダウンリストから [システムポリシー] を選択します。

      • AliyunDataWorksAccessingRdsOSSBinlogPolicy ポリシーを RAM ロールにアタッチします。

    3. 作成した RAM ロールの信頼ポリシーを変更します。詳細については、「RAM ロールの信頼ポリシーを編集する」をご参照ください。

      {
          "Statement": [
              {
                  "Action": "sts:AssumeRole",
                  "Effect": "Allow",
                  "Principal": {
                      "Service": [
                          "<DataWorks ユーザーの Alibaba Cloud アカウントの ID>@cdp.aliyuncs.com",
                          "<DataWorks ユーザーの Alibaba Cloud アカウントの ID>@dataworks.aliyuncs.com"
                      ]
                  }
              }
          ],
          "Version": "1"
      }

データソースを追加する

DataWorks で同期タスクを開発する前に、データソースを追加および管理するの手順に従って、必要なデータソースを DataWorks に追加する必要があります。 DataWorks コンソールでパラメーターのヒントを表示して、データソースを追加するときのパラメーターの意味を理解できます。

ネットワーク環境設定に基づいて接続モードを選択します。

シナリオ 1:VPC 経由の接続(推奨)

仮想プライベートクラウド(VPC)は、低レイテンシとより安全なデータ伝送を特長としています。インターネットアクセスのための追加構成は必要ありません。

  • シナリオ:MySQL インスタンスは、使用するサーバーレス リソースグループと同じ VPC にデプロイされています。

  • サポートされているモード:

    • [Alibaba Cloud インスタンスモード]:使用するリソースグループと同じ VPC にデプロイされている MySQL インスタンスを選択します。システムは自動的に接続情報を取得します。手動構成は必要ありません。

    • [接続文字列モード]:インスタンスの内部エンドポイントとポート番号を手動で指定します。

シナリオ 2:インターネット経由の接続

インターネット経由のデータ伝送にはリスクがあります。インターネット経由で接続を確立する場合は、ホワイトリストや IP アドレスベースの認証などのセキュリティポリシーを構成することをお勧めします。

  • シナリオ:インターネット経由で MySQL インスタンスにアクセスします。たとえば、リージョン間またはオンプレミス環境の MySQL インスタンスにアクセスします。

  • サポートされているモード:

    • [接続文字列モード]:インスタンスのパブリックエンドポイントとポート番号を手動で指定します。MySQL インスタンスでインターネットアクセスが有効になっていることを確認してください。

説明

デフォルトでは、サーバーレス リソースグループはインターネットにアクセスできません。サーバーレス リソースグループを使用してインターネット経由で MySQL インスタンスにアクセスする場合は、リソースグループが関連付けられている VPC にNAT ゲートウェイを構成する必要があります。これにより、リソースグループは NAT ゲートウェイに関連付けられた Elastic IP アドレス(EIP)を使用して、インターネット経由で MySQL インスタンスにアクセスできます。

データ同期タスクを開発する

同期タスクの開始点と構成手順については、以下の構成ガイドを参照してください。

単一テーブルのデータを同期するためのバッチ同期タスクを構成する

単一テーブルのデータを同期するためのリアルタイム同期タスクを構成する

構成手順の詳細については、「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。

データベース内のすべてのデータのバッチ同期、データベース内の完全データまたは増分データのリアルタイム同期、およびシャーディングデータベース内のシャーディングテーブルからのデータのリアルタイム同期を実装するための同期設定を構成する

構成手順の詳細については、「Data Integration で同期タスクを構成する」をご参照ください。

よくある質問

詳細については、「Data Integration に関するよくある質問」をご参照ください。

接続パラメータ

コードエディタを使用してバッチ同期タスクを設定する

コードエディタを使用してバッチ同期タスクを設定する場合、統一スクリプト形式の要件に基づいて、スクリプトに関連パラメーターを設定する必要があります。詳細については、「コードエディタを使用してバッチ同期タスクを設定する」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要のあるパラメーターについて説明しています。

MySQL Reader のコード

次のサンプルコードは、シャーディングされていないテーブルからデータを読み取る同期タスクを構成する方法と、シャーディングされたテーブルからデータを読み取る同期タスクを構成する方法の例を示しています。

説明

JSON の例に含まれるコメントは、主要なパラメーターの定義を示すためにのみ使用されています。パラメーターを構成する際は、コメントを削除してください。

  • シャーディングされていないテーブルからデータを読み取る同期タスクを構成する

    {
      "type": "job",
      "version": "2.0",// バージョン番号。
      "steps": [
        {
          "stepType": "mysql",// プラグイン名。
          "parameter": {
            "column": [// 列の名前。
              "id"
            ],
            "connection": [
              {
                "querySql": [
                  "select a,b from join1 c join join2 d on c.id = d.id;"
                ],
                "datasource": ""// データソースの名前。
              }
            ],
            "where": "",// WHERE 句。
            "splitPk": "",// シャードキー。
            "encoding": "UTF-8"// エンコード形式。
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "stream",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "errorLimit": {
          "record": "0"// ダーティデータレコードの最大許容数。
        },
        "speed": {
          "throttle": true,// スロットルを有効にするかどうかを指定します。値 false はスロットルが無効であることを示し、値 true はスロットルが有効であることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
          "concurrent": 1,// 並列スレッドの最大数。
          "mbps": "12"// 最大転送速度。単位: MB/秒。
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • シャーディングされたテーブルからデータを読み取る同期タスクを構成する

    説明

    シャーディングのシナリオでは、シャーディングされた MySQL テーブルの同じスキーマを持つ複数のテーブルシャードからデータを読み取り、同じ宛先テーブルにデータを書き込むことができます。 データベースシャードからすべてのデータを同期する場合は、Data Integration でデータ同期タスクを作成し、必要な同期方法を選択できます。

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "indexes": [
              {
                "type": "unique",
                "column": [
                  "id"
                ]
              }
            ],
            "envType": 0,
            "useSpecialSecret": false,
            "column": [
              "id",
              "buyer_name",
              "seller_name",
              "item_id",
              "city",
              "zone"
            ],
            "tableComment": "テスト注文テーブル",
            "connection": [
              {
                "datasource": "rds_dataservice",
                "table": [
                  "rds_table"
                ]
              },
              {
                "datasource": "rds_workshop_log",
                "table": [
                  "rds_table"
                ]
              }
            ],
            "where": "",
            "splitPk": "id",
            "encoding": "UTF-8"
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "odps",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        },
        {
          "name": "Processor",
          "stepType": null,
          "category": "processor",
          "copies": 1,
          "parameter": {
            "nodes": [],
            "edges": [],
            "groups": [],
            "version": "2.0"
          }
        }
      ],
      "setting": {
        "executeMode": null,
        "errorLimit": {
          "record": ""
        },
        "speed": {
          "concurrent": 2,
          "throttle": false
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }

MySQL Reader のコードのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディターを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを読み取るテーブルの名前。各同期タスクは、1 つのテーブルからのデータの同期にのみ使用できます。

シャーディングテーブルの場合、table パラメーターを使用して、データを読み取るパーティションを指定できます。例:

  • table パラメーターを 'table_[0-99]' に設定します。この値は、MySQL Reader がシャーディングテーブルのパーティション 'table_0' から 'table_99' までデータを読み取ることを示します。

  • table パラメーターを '"table":["table_00[0-9]","table_0[10-99]","table_[100-999]"]' に設定します。この値は、MySQL Reader がシャーディングテーブルのパーティション 'table_000' から 'table_999' までデータを読み取ることを示します。すべてのパーティション名の数値サフィックスの長さが同じ場合にのみ、このメソッドを使用できます。

説明

MySQL Reader は、column パラメーターで指定された列から、table パラメーターで指定されたパーティションのデータを読み取ります。指定されたパーティションまたは列が存在しない場合、同期タスクは失敗します。

はい

デフォルト値なし

column

データを読み取る列の名前。JSON 配列で名前を指定します。デフォルト値は [*] で、ソーステーブルのすべての列を示します。

  • 読み取る特定の列を選択できます。

  • 列の順序を変更できます。これは、ソーステーブルのスキーマで指定された順序とは異なる順序で列を指定できることを示します。

  • 定数がサポートされています。列名は、["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"] など、MySQL でサポートされている SQL 構文に準拠して配置する必要があります。

    • id: 列名。

    • table: 予約キーワードを含む列の名前。

    • 1: 整数定数。

    • 'mingya.wmy': 文字列定数。単一引用符 (') で囲みます。

    • null:

      • " " は空の文字列を示します。

      • null は null 値を示します。

      • 'null' は文字列 null を示します。

    • to_char(a+1): 文字列の長さを計算するために使用される関数式。

    • 2.3: 浮動小数点定数。

    • true: ブール値。

  • column パラメーターは、データを読み取るすべての列を明示的に指定する必要があります。このパラメーターを空にすることはできません。

はい

デフォルト値なし

splitPk

MySQL Reader がデータを読み取るときに データシャーディング に使用される フィールド。このパラメーターを指定すると、ソーステーブルはこのパラメーターの値に基づいてシャーディングされます。Data Integration は、並列スレッドを実行してデータを読み取ります。これにより、データの同期をより効率的に行うことができます。

  • splitPk パラメーターをテーブルのプライマリキー列の名前に設定することをお勧めします。プライマリキー列に基づいてデータを異なるシャードに均等に分散させることができ、特定のシャードにのみ集中的に分散されるのを防ぐことができます。

  • splitPk パラメーターは、整数データ型のデータのみのシャーディングをサポートしています。 splitPk パラメーターを、文字列、浮動小数点、日付データ型などのサポートされていないデータ型のフィールドに設定すると、このパラメーターの設定は無視され、単一スレッドを使用してデータが読み取られます。

  • splitPk パラメーターが指定されていないか空の場合、単一スレッドを使用してデータが読み取られます。

いいえ

デフォルト値なし

where

WHERE 句。たとえば、このパラメーターを gmt_create>$bizdate に設定して、現在の日付に生成されたデータを読み取ることができます。

  • WHERE 句を使用して増分データを読み取ることができます。 where パラメーターが指定されていないか空の場合、MySQL Reader はすべてのデータを読み取ります。

  • where パラメーターを limit 10 に設定しないでください。この値は、SQL WHERE 句に対する MySQL の制約に準拠していません。

いいえ

デフォルト値なし

querySql (高度なパラメーター。コードエディターでのみ使用可能)

絞り込んだデータフィルタリングに使用する SQL 文。このパラメーターを設定すると、データはこのパラメーターの値のみに基づいてフィルタリングされます。たとえば、複数のテーブルを結合してデータ同期を行う場合は、このパラメーターを select a,b from table_a join table_b on table_a.id = table_b.id に設定します。 querySql パラメーターの優先度は、tablecolumnwheresplitPk パラメーターの優先度よりも高くなります。 querySql パラメーターを設定すると、MySQL Reader は table、column、where、splitPk パラメーターの設定を無視します。システムは、datasource パラメーターで指定されたデータソースのユーザー名やパスワードなどの情報を querySql パラメーターから解析します。

説明

querySql パラメーターの名前では大文字と小文字が区別されます。たとえば、querysql は有効になりません。

いいえ

デフォルト値なし

useSpecialSecret

複数のデータソースからデータを同期する場合に、各データソースのアクセスパスワードを使用するかどうかを指定します。有効な値:

  • true

  • false

複数のデータソースを追加し、データソースへのアクセスに使用するユーザー名とパスワードが異なる場合は、このパラメーターを true に設定して、対応するデータソースへのアクセスにユーザー名とパスワードを個別に使用できます。

いいえ

false

MySQL Writer のコード

{
  "type": "job",
  "version": "2.0",// バージョン番号。
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",// プラグイン名。
      "parameter": {
        "postSql": [],// 同期タスクの実行後に実行する SQL 文。
        "datasource": "",// データソースの名前。
        "column": [// 列の名前。
          "id",
          "value"
        ],
        "writeMode": "insert",// 書き込みモード。有効な値:insert、replace、update。
        "batchSize": 1024,// 一度に書き込むデータレコードの数。
        "table": "",// テーブルの名前。
        "nullMode": "skipNull",// NULL 値を処理するためのポリシー。
        "skipNullColumn": [// NULL 値をスキップする必要がある列を指定します。
          "id",
          "value"
        ],
        "preSql": [
          "delete from XXX;"// 同期タスクの実行前に実行する SQL 文。
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {// 許容されるダーティデータレコードの最大数。
      "record": "0"
    },
    "speed": {
      "throttle": true,// スロットルを有効にするかどうかを指定します。値 false はスロットルが無効になっていることを示し、値 true はスロットルが有効になっていることを示します。 mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
      "concurrent": 1,// 並列スレッドの最大数。
      "mbps": "12"// 最大転送速度。単位:MB/秒。ソースでの重い読み取りワークロードやデスティネーションでの重い書き込みワークロードを防ぐために、最大転送速度を指定できます。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

MySQL Writer のコードのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。

はい

デフォルト値なし

table

データを書き込むテーブルの名前。

はい

デフォルト値なし

writeMode

書き込みモード。 insert intoon duplicate key updatereplace into の各モードがサポートされています。

  • insert into: プライマリキーの競合または一意なインデックスの競合が発生した場合、競合する行にデータを書き込むことができず、これらの行に書き込まれなかったデータはダーティデータと見なされます。

    コードエディタを使用して同期タスクを設定する場合は、writeModeinsert に設定します。

  • on duplicate key update: プライマリキーの競合または一意なインデックスの競合が発生しない場合、データはこのパラメーターを insert に設定した場合と同じ方法で処理されます。競合が発生した場合、元の行の指定されたフィールドが新しい行に置き換えられ、データが MySQL に書き込まれます。

    コードエディタを使用して同期タスクを設定する場合は、writeModeupdate に設定します。

  • replace into: プライマリキーの競合または一意なインデックスの競合が発生しない場合、データはこのパラメーターを insert に設定した場合と同じ方法で処理されます。競合が発生した場合、元の行が削除され、新しい行が挿入されます。これは、元の行のすべてのフィールドが置き換えられることを示します。

    コードエディタを使用して同期タスクを設定する場合は、writeModereplace に設定します。

いいえ

insert

nullMode

NULL 値を処理するためのポリシー。有効な値:

  • writeNull: ソースの列の値が NULL の場合、NULL 値がデスティネーションの対応する列に書き込まれます。

  • skipNull: ソースの列の値が NULL の場合、NULL 値はデスティネーションの対応する列に書き込まれません。デスティネーションの列にデフォルト値が指定されている場合は、デフォルト値が使用されます。デフォルト値が指定されていない場合は、NULL が使用されます。 nullMode パラメーターを skipNull に設定する場合は、skipNullColumn パラメーターも設定する必要があります。

重要

このパラメーターを skipNull に設定すると、データを書き込むために使用される SQL 文が動的に連結され、デスティネーションでのデフォルト値がサポートされます。これにより、フラッシュの回数が増加し、データ同期が遅くなります。最悪の場合、各データレコードが 1 回フラッシュされます。

いいえ

writeNull

skipNullColumn

NULL 値をスキップする必要がある列を指定します。 nullMode パラメーターを skipNull に設定すると、NULL 値はこのパラメーターで指定された列に強制的に書き込まれません。列に指定されたデフォルト値が優先的に使用されます。

このパラメーターは、["c1","c2",...] の形式で設定します。 c1c2 は、column パラメーターで指定された列のサブセットに置き換える必要があります。

いいえ

タスクに設定されているすべての列

column

データを書き込む列の名前。 "column":["id","name","age"] のように、名前をコンマ (,) で区切ります。デスティネーションテーブルのすべての列にデータを書き込む場合は、"column":["*"] のように、このパラメーターをアスタリスク (*) に設定します。

はい

デフォルト値なし

preSql

同期タスクが実行される前に実行する SQL 文。コードレス UI では 1 つの SQL 文のみを実行でき、コードエディタでは複数の SQL 文を実行できます。たとえば、同期タスクが実行される前に、TRUNCATE TABLE tablename 文を実行して古いデータを削除できます。

説明

複数の SQL 文を設定した場合、文は同じトランザクション内で実行されません。

いいえ

デフォルト値なし

postSql

同期タスクが実行された後に実行する SQL 文。コードレス UI では 1 つの SQL 文のみを実行でき、コードエディタでは複数の SQL 文を実行できます。たとえば、同期タスクが実行された後に、ALTER TABLE tablename ADD colname TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP 文を実行してタイムスタンプを追加できます。

説明

複数の SQL 文を設定した場合、文は同じトランザクション内で実行されません。

いいえ

デフォルト値なし

batchSize

一度に書き込むデータレコードの数。ビジネス要件に基づいて、このパラメーターを適切な値に設定します。これにより、Data Integration と MySQL の間の相互作用が大幅に削減され、スループットが向上します。このパラメーターを過度に大きな値に設定すると、データ同期の際にメモリ不足 (OOM) エラーが発生する可能性があります。

いいえ

256

updateColumn

プライマリキーの競合または一意なインデックスの競合が発生した場合に更新される列の名前。このパラメーターは、writeMode パラメーターが on duplicate key update に設定されている場合にのみ有効になります。複数の列名はコンマ (,) で区切ります。例: "updateColumn":["name","age"]

いいえ

デフォルト値なし