すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MySQL データベース全体を Elasticsearch に同期する

最終更新日:Mar 07, 2026

Data Integration を使用すると、MySQL ソースから Elasticsearch 宛てに、データベース全体をリアルタイムで同期できます。本トピックでは、MySQL をソース、Elasticsearch をデスティネーションとして、完全同期と増分同期(変更データキャプチャ:CDC)を組み合わせたリアルタイム同期タスクの設定方法を説明します。

前提条件

タスクの設定

ステップ 1:同期タスクの作成

  1. Data Integration ページに移動します。

    DataWorks コンソール にログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、Data Integration > Data Integration を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、Go to Data Integration をクリックします。

  2. 左側のナビゲーションウィンドウで、Synchronization Task をクリックします。ページ上部で、Create Synchronization Task をクリックし、タスク情報を設定します:

    • Source Type: ソースとして MySQL を選択します。

    • Destination Type: デスティネーションとして Elasticsearch を選択します。

    • Specific TypeReal-time synchronization for an entire database を選択します。

    • Synchronization Mode

      • Schema Migration: デスティネーションに一致するインデックス構造(インデックスやフィールドマッピングなど)を自動的に作成しますが、データは含みません。

      • Full Synchronization(任意): 指定されたソーステーブルからすべての既存データをデスティネーションにコピーします。

      • Incremental Sync(任意): 完全同期完了後、ソースからデスティネーションへ、データの変更(挿入、更新、削除)を継続的にキャプチャして同期します。

ステップ 2:データソースおよび計算リソースの設定

  1. Source には、ご利用の MySQL データソースを選択します。Destination には、ご利用の Elasticsearch データソースを選択します。

ステップ 3:同期ソリューションの設定

2. デスティネーションインデックスのマッピング

操作

説明

Refresh

システムが選択したソーステーブルを自動的に一覧表示しますが、デスティネーションインデックスの属性を確認するには、マッピングをリフレッシュする必要があります。

  • 同期対象のテーブルを選択し、[一括リフレッシュマッピング] をクリックします。

  • デスティネーションインデックス名: デスティネーションインデックス名は、Customize Mapping Rules for Destination Table Names に基づいて自動生成されます。デフォルト値は ${source_database_name}_${table_name} です。この名前のインデックスがデスティネーションに存在しない場合、システムが自動的に作成します。

Custom Mapping Rule for Destination Index Name(任意)

システムにはデフォルトのインデックス命名ルールがあります:${source_database_name}_${table_name}。また、Customize Mapping Rules for Destination Table Names 列にある Edit ボタンをクリックすることで、カスタムルールを追加することもできます。

  • ルール名: ルールに名前を付けます。ビジネス上の目的を明確に反映する名前を付けることを推奨します。

  • デスティネーションインデックス名: image アイコンをクリックし、Manual Input および Built-in Variable のオプションを組み合わせることで、デスティネーションインデックス名を構築します。サポートされる変数には、ソースデータソース名、ソースデータベース名、ソーステーブル名が含まれます。

  • 組み込み変数の編集: 組み込み変数に対して文字列変換を適用できます。

この機能は、以下のシナリオをサポートします:

  1. プレフィックスまたはサフィックスの追加: 定数を設定することで、ソーステーブル名にプレフィックスまたはサフィックスを追加します。

    ルール設定

    結果

    image

    image

  2. 統一的な文字列置換: ソーステーブル名に含まれる文字列 dev_prd_ に置換します。

    ルール設定

    結果

    image

    image

  3. 複数テーブルからのデータを単一インデックスに書き込む: デスティネーションインデックス名を定数に設定します。

    ルール設定

    結果

    image

    image

フィールド型マッピングの編集(任意)

システムは、ソース型デスティネーション型 の間でデフォルトのマッピングを提供します。表の右上隅にある Edit Mapping of Field Data Types をクリックすることで、ソースからデスティネーションへのフィールド型マッピングをカスタマイズできます。設定後、Apply and Refresh Mapping をクリックします。

フィールド型マッピングを編集する際は、変換ルールが正しいことを確認してください。誤ったルールは型変換失敗を引き起こし、不正なデータ(Dirty Data)を生成してタスク実行に影響を与える可能性があります。

デスティネーションインデックスの編集(任意)

システムは、インデックス名マッピングルールを使用して、新しいデスティネーションインデックスを作成するか、同じ名前の既存のインデックスを再利用します。

DataWorks は、ソーステーブルスキーマに基づいてデスティネーションインデックス構造を自動的に生成します。ほとんどの場合、手動での介入は必要ありません。

デスティネーションインデックスのステータスが 作成予定 の場合、その構造に新しいフィールドを追加できます。次の手順を実行します:

  1. デスティネーションインデックスへのフィールドの追加

    • 単一のインデックスにフィールドを追加するには、Destination Index Name 列の image.png アイコンをクリックし、Statement Used to Create Index を編集してフィールドを追加します。

      • Dynamic Mapping Status: データ同期中にソーステーブルから検出された新規フィールドをデスティネーションインデックスに追加するかどうかを制御します。有効な値は以下のとおりです:

        • true: ソーステーブルで新規フィールドが検出された場合、そのフィールドはデスティネーションインデックスに追加され、検索可能になります。これがデフォルト値です。

        • false: ソーステーブルで新規フィールドが検出された場合、そのフィールドはデスティネーションインデックスに追加されますが、検索可能にはなりません。

        • strict: ソーステーブルで新規フィールドが検出された場合、そのフィールドの同期は拒否され、エラーが発生します。エラーの詳細はログで確認できます。

        • runtime: ソーステーブルで新規フィールドが検出された場合、そのフィールドはデスティネーションインデックスマッピングに追加されません。代わりに、クエリ時にランタイムフィールドとして扱われ、スクリプト計算および検索で使用できます。

        ダイナミックマッピングの詳細については、「dynamic」をご参照ください。

      • Shards および Replica Shards: インデックスのプライマリシャード数およびレプリカシャード数です。インデックスは複数のシャードに分割され、異なる Elasticsearch ノードに分散されることで、分散検索を可能にし、クエリ効率を向上させます。詳細については、「基本概念」をご参照ください。

        説明

        Shards および Replica Shards のパラメーターは、インデックス作成後に変更できません。デフォルト値は 1 です。

    • 一括でのフィールド追加: 同期対象のすべてのテーブルを選択し、表の下部で Batch Modify > Destination Index Structure - Batch Add Fields を選択します。

Value assignment

ソースフィールド名がデスティネーションのフィールド名と一致する場合は、自動的にマッピングされます。前項で追加した新規フィールドおよびデスティネーションインデックスのプロパティについては、手動で値を割り当てる必要があります。次の手順を実行します:

  • 単一インデックスへの値の割り当て: Value assignment 列の Configuration ボタンをクリックして値を割り当てます。

  • 一括での値の割り当て: リストの下部で、Batch Modify > Value assignment を選択して、複数のデスティネーションインデックスにわたって同一フィールドに値を割り当てます。

定数または変数を割り当てることができます。Value Type メニューでタイプを切り替えます。以下のオプションがサポートされています:

  • デスティネーションインデックスフィールド

    • 手動割り当てabc のような定数値を入力します。

    • ソースフィールド: ソーステーブルフィールドの値をデスティネーションフィールド値として使用します。フィールド値または時刻値のいずれかを選択できます。

      • フィールド値: ソースフィールドの値を直接デスティネーションに書き込みます。

      • 時刻値: ソースフィールドに時刻値が含まれている場合、異なるフォーマットに基づいて処理し、抽出された値を Destination Format で整形できます。

        • 時刻文字列: 「2018-10-23 02:13:56」や「2021/05/18」などのように時刻を表す文字列です。ソースおよびデスティネーションの時刻フォーマットを指定することで、文字列を時刻オブジェクトに解析します。上記の例では、yyyy-MM-dd HH:mm:ssyyyy/MM/dd のようなフォーマットを使用できます。

        • 時間オブジェクト: ソース値が既に DateDatetime などの時間型である場合に、このタイプを選択します。

        • Unix タイムスタンプ(秒): 秒単位のタイムスタンプです。10 桁のタイムスタンプ形式の数字または文字列(例:1610529203"1610529203")もサポートされます。

        • Unix タイムスタンプ(ミリ秒): ミリ秒単位のタイムスタンプです。13 桁のタイムスタンプ形式の数字または文字列(例:1610529203002"1610529203002")もサポートされます。

    • 変数の選択: システムが提供する変数を値のソースとして選択します。

    • 関数: ソースフィールドに対して簡単な変換を実行したうえで、その値を割り当てます。使用方法の詳細については、「関数式を使用してデスティネーションフィールドに値を割り当てる」をご参照ください。

  • デスティネーションインデックスプロパティの割り当て: 同期に使用されるデスティネーションインデックスのプライマリキーに値を割り当てます。複数のソースフィールドを連結して複合プライマリキーを作成できます。結果として得られる値が一意であることを確認してください。

ソース分割カラム

ソース分割カラムでは、ソーステーブルからフィールドを選択するか、Not Split を選択できます。同期タスク実行時に、このフィールドに基づいてタスクが複数のタスクに分割され、データを同時かつバッチ単位で読み取ります。

テーブルのプライマリキーをソース分割カラムとして使用します。文字列、浮動小数点、日付型はサポートされていません。

現在、ソース分割カラムは MySQL ソースのみでサポートされています。

完全同期のスキップ

タスクで完全同期を有効化している場合、特定のテーブルについてこれを選択的に無効化できます。これは、そのテーブルの完全データが他の手段で既にデスティネーションに同期済みである場合に便利です。

Full condition

完全同期フェーズ中にソースデータにフィルターを適用します。WHERE キーワードを除いた WHERE 句の内容のみを入力します。

Configure DML Rule

DML メッセージ処理により、ソースからキャプチャされた変更データ(InsertUpdateDelete)を、デスティネーションへの書き込み前にフィルター処理および制御できます。このルールは、増分同期フェーズでのみ適用されます。

ステップ 4:高度な設定

高度なパラメーターの設定

タスクを微調整し、カスタム同期要件を満たすために、Advanced Parameters タブで高度なパラメーターを変更します。

  1. ページ右上隅の Advanced Settings をクリックして、高度なパラメーター設定ページに移動します。

  2. 記載されている説明に従って、パラメーター値を変更します。

  3. AI を活用した設定も可能です。タスクの並列度の調整などの自然言語コマンドを入力すると、大規模言語モデルが推奨されるパラメーター値を生成します。その後、AI 生成のパラメーターを採用するかどうかを選択できます。

    image

重要

これらのパラメーターを変更する場合は、その目的を十分に理解している必要があります。誤った設定は、タスクの遅延、他のタスクをブロックする過剰なリソース消費、データ損失、その他の予期しない問題を引き起こす可能性があります。

ステップ 7:その他の設定

アラーム設定

1. アラームの追加

image

(1) Create Rule をクリックして、アラームルールを設定します。

Alert Reason を設定して、タスクの Business delayフェイルオーバーTask statusDDL NotificationTask Resource Utilization を監視します。指定されたしきい値に基づいて、CRITICAL または WARNING のアラームレベルを設定できます。

  • Configure Advanced Parameters を設定することで、アラームメッセージの送信間隔を制御し、アラート疲労およびメッセージのバックログを防止できます。

  • Business delayTask status、または Task Resource Utilization をアラーム理由として選択した場合、復旧通知を有効化して、タスクが正常に戻った際に受信者に通知することもできます。

(2) アラームルールの管理

作成済みのアラームルールについては、アラームスイッチを使用して、アラームルールを有効にするかどうかを制御できます。アラームレベルに応じて、特定の受信者にアラームを送信します。

2. アラームの表示

タスクリストで More > Configure Alert Rule を展開して、アラームイベントページに移動し、発生したアラーム情報を表示します。

リソースグループの設定

タスクで使用されるリソースグループおよびその設定は、インターフェイス右上隅の Configure Resource Group パネルで管理できます。

1. リソースグループの表示および切り替え

  • Configure Resource Group をクリックして、タスクに現在バインドされているリソースグループを表示します。

  • リソースグループを変更するには、ここから別の利用可能なリソースグループに切り替えます。

2. リソースの調整および「リソース不足」エラーのトラブルシューティング

  • タスクログに Please confirm whether there are enough resources... というメッセージが表示される場合、現在のリソースグループの利用可能な計算ユニット(CU)が、タスクの起動または実行に十分ではありません。 Configure Resource Group パネルで、タスクが占有する CU 数を増やすことで、より多くの計算リソースを割り当てられます。

推奨リソース設定については、「Data Integration 推奨 CU 数」をご参照ください。実際の状況に応じて設定を調整してください。

高度なパラメーター設定

カスタム同期要件に応じて、ConfigureAdvanced Settings 列でクリックして、高度なパラメーターを変更します。

  1. インターフェイス右上隅の Advanced Settings をクリックして、高度なパラメーター設定ページに移動します。

  2. プロンプトに従ってパラメーター値を変更します。各パラメーターの意味は、パラメーター名の後に説明されています。

重要

問題(タスクの遅延、他のタスクをブロックする過剰なリソース消費、データ損失など)を防ぐため、パラメーターを変更する前に、その内容を十分に理解してください。

タスクの管理

タスクの編集

  1. Data Integration > Synchronization Task ページで、作成済みの同期タスクを見つけます。Operation 列で More をクリックし、Edit をクリックしてタスク情報を編集します。手順は、新規タスクの設定と同様です。

  2. 実行中でない タスクの場合、設定を直接変更し、保存して本番環境にデプロイすることで、変更を適用できます。

  3. Running タスクの場合、Start immediately after deployment を選択せずに編集およびデプロイを行うと、アクションボタンが Apply Updates に変わります。このボタンをクリックすることで、変更が本番環境に反映されます。

  4. [Apply Update] をクリックすると、システムは順次以下のアクションを実行します:StopPublishRestart

    • 変更内容が テーブルの追加または既存テーブルの切り替え を含む場合:

      更新の適用時にチェックポイントを選択することはできません。確認後、システムは新規テーブルに対して Schema Migration および Full Data Initialization を実行します。完全初期化が完了すると、他の元のテーブルとともに増分同期を開始します。

    • その他の情報を変更する場合:

      更新の適用時にチェックポイントを選択できます。確認後、タスクは指定されたチェックポイントから再開されます。チェックポイントを指定しない場合、最後に記録されたチェックポイントから再開されます。

    変更されていないテーブルには影響はありません。更新および再起動後、それらは最後に記録されたチェックポイントから再開されます。

タスクの表示

同期タスクを作成した後、同期タスクページで、作成済みのすべてのタスクおよびその基本情報を一覧表示できます。

  • 操作列では、同期タスクを Start または Stop できます。「More」から、EditView などのその他の操作を実行できます。image

  • 実行中のタスクでは、Execution Overview で基本ステータスを確認できます。概要内の特定の領域をクリックすると、詳細な実行情報を表示できます。image

    MySQL から Elasticsearch へのリアルタイムデータベース同期タスクは、以下の 3 つのステップで構成されます:

    • スキーマ移行: デスティネーションインデックスの作成方法(既存インデックス/自動作成インデックス)を含みます。インデックスが自動作成される場合、DDL 文が表示されます。

    • 完全データ初期化: 同期対象のテーブル、その進行状況、書き込まれた行数に関する情報を含みます。

    • リアルタイム同期: リアルタイム進行状況、DDL レコード、DML レコード、アラート情報などのリアルタイム同期統計を含みます。

同期タスクの再実行

一部の特殊なケースでは、ソースにテーブルを追加または削除する場合、または宛先テーブルのスキーマまたは名前を変更する場合、同期タスクの [操作] 列で [詳細] をクリックし、その後 [再実行] をクリックして、変更後にタスクを再実行できます。再実行プロセス中、同期タスクは、新しく追加されたテーブルから宛先へのデータのみ、またはスキーマまたは名前が変更された宛先テーブルへのマップされたソーステーブルからのデータのみを同期します。

  • タスクの構成を変更せずに同期タスクを再実行し、完全同期と増分同期を再度実行するには、「操作」列で「その他」をクリックし、次に[再実行]をクリックします。

  • タスクへのテーブルの追加または削除後に同期タスクを再実行するには、変更後に [完了] をクリックします。この場合、同期タスクの操作列に [更新を適用] が表示されます。[更新を適用] をクリックすると、同期タスクの再実行がトリガーされます。再実行プロセス中、同期タスクは新しく追加されたテーブルのデータを宛先に同期します。元のテーブルのデータは再度同期されません。

チェックポイントの再開

ユースケース

タスクの開始または再起動時にチェックポイントを手動でリセットすることは、以下のシナリオで有用です:

  • タスクの回復およびデータの継続: タスクが中断された場合、中断時刻を新しい開始チェックポイントとして手動で指定することで、正確にその時点からデータを再開できます。

  • データのトラブルシューティングおよびロールバック: 同期後に欠落または異常なデータが見つかった場合、問題が発生する前の時点までチェックポイントをロールバックして、問題のあるデータを再処理および修正できます。

  • 主要なタスク構成変更: デスティネーションインデックス構造やフィールドマッピングなどのタスク構成を大幅に変更した後、新しい構成下でのデータ精度を確保するために、チェックポイントを特定の時点にリセットすることを推奨します。

手順

Start をクリックすると、ポップアップウィンドウで Whether to reset the site を選択します:

  • チェックポイントをリセットしない: タスクは最後の停止時刻(最後のチェックポイント)から再開されます。

  • チェックポイントをリセットして時刻を選択: タスクは指定された時刻から開始されます。選択した時刻が、ソースのバイナリログ(binlog)で利用可能な最も早い時刻よりも前になっていないことを確認してください。

重要

チェックポイントが無効または存在しないというエラーが発生した場合、以下の解決策を試してください:

  • チェックポイントのリセット: リアルタイム同期タスクを開始する際に、チェックポイントをリセットし、ソースデータベースで利用可能な最も早い時刻を選択します。

  • ログ保持期間の調整: データベースのチェックポイントが期限切れになっている場合、データベース設定でログ保持期間(例:7 日間)を延長することを検討してください。

  • データの再同期: データが失われている場合、新しい完全同期を実行するか、欠落したデータを手動で同期するためにオフライン同期タスクを設定することを検討してください。

よくある質問

リアルタイムデータベース同期に関するよくある質問については、「Data Integration よくある質問」および「Data Integration のエラー」をご参照ください。