すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MySQL データベース全体の Elasticsearch へのオフライン同期

最終更新日:Nov 09, 2025

データ統合は、MySQL、PolarDB、SQL Server などのソースから Elasticsearch へのデータベース全体のオフライン同期をサポートしています。このトピックでは、MySQL をソース、Elasticsearch を宛先として使用し、MySQL データベース全体をオフラインモードで Elasticsearch に同期する方法について説明します。

前提条件

手順

ステップ 1: 同期タスクタイプを選択する

  1. データ統合ページに移動します。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ統合] > [データ統合] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ統合へ移動] をクリックします。

  2. 左側のナビゲーションウィンドウで [同期タスク] をクリックし、ページの上部にある [同期タスクの作成] をクリックして同期タスク作成ページに移動します。次の基本情報を構成します:

    • データソースと宛先: MySQLElasticsearch

    • 新しいタスク名: 同期タスクのカスタム名。

    • 同期タイプ: データベース全体の Elasticsearch へのオフライン同期

ステップ 2: ネットワークとリソースを構成する

  1. [ネットワークとリソース] セクションで、同期タスクの [リソースグループ] を選択します。[タスクリソース使用量] に計算ユニット (CU) の数を割り当てることができます。

  2. [ソースデータソース] で、追加した MySQL データソースを選択します。[宛先データソース] で、追加した Elasticsearch データソースを選択します。次に、[接続性のテスト] をクリックします。

    image

  3. ソースと宛先の両方のデータソースが接続されていることを確認したら、[次へ] をクリックします。

ステップ 3: 同期ソースとルールを構成する

  1. 同期するテーブルを選択します。

    このステップでは、[ソースデータベースのテーブル] セクションでソースデータベースから同期するテーブルを選択し、image アイコンをクリックして右側の [選択したテーブル] セクションに移動できます。

    image

  2. テーブル名からインデックス名へのマッピングルールを設定します。

    [テーブル名からインデックス名へのマッピングルールの設定] を使用して、宛先に書き込まれるスキーマ名またはインデックス名をカスタマイズすることもできます。ソースデータベースとテーブルを選択すると、デフォルトでは、データはソースデータベースおよびテーブルと同じ名前の宛先スキーマおよびテーブルに書き込まれます。そのような宛先スキーマまたはテーブルが存在しない場合、システムは宛先にスキーマまたはテーブルを自動的に作成します。

    構成の説明:

    • ソースデータベース名と宛先スキーマ名の変換ルール: すべての変換ルールは、元のデータベース名に適用されます。変換が完了したら、その結果を [宛先インデックス名ルール] の変数 ${db_name_src_transed} として使用できます。

      重要
      • [宛先インデックス名ルール] を使用しない場合、このルールは最終的な実際の宛先スキーマ名に直接影響します。

      • [宛先インデックス名ルール] を使用する場合、このルールは変数 ${db_name_src_transed} の値に影響するだけでなく、最終的な実際の宛先スキーマ名にも影響します。

    • ソーステーブル名と宛先インデックス名の変換ルール: すべての変換ルールは、元のテーブル名に適用されます。変換が完了したら、その結果を [宛先インデックス名ルール] の変数 ${db_table_name_src_transed} として使用できます。

      重要
      • [宛先インデックス名ルール] を使用しない場合、このルールは最終的な実際の宛先インデックス名に直接影響します。

      • [宛先インデックス名ルール] を使用する場合、このルールは変数 ${db_table_name_src_transed} の値にのみ影響し、最終的な実際の宛先インデックス名には直接影響しません。最終的な宛先インデックス名は、[宛先インデックス名ルール] によって決定されます。

    • 宛先インデックス名ルール: 組み込み変数を使用して、宛先インデックスに名前を付けることができます。

      次の組み込み変数が利用可能です:

      • ${db_table_name_src_transed}: 「ソーステーブル名と宛先インデックス名の変換ルール」での変換後のインデックス名。

      • ${db_name_src_transed}: 「ソースデータベース名と宛先スキーマ名の変換ルール」での変換後の宛先スキーマ名。

      • ${ds_name_src}: ソースデータソース名。

    たとえば、ソースデータベース名のプレフィックス doc_ をプレフィックス pre_ に置き換え、ソーステーブル table_01table_02table_03my_table という名前のインデックスに同期し、最後にこのインデックスにサフィックス _post を追加するには、次のように構成する必要があります:

    image

  3. [次へ] をクリックして宛先インデックスを構成します。

ステップ 4: 宛先インデックスを構成する

[ソーステーブルと Elasticsearch インデックスマッピングの更新] をクリックして、ステップ 3 で構成した [テーブル名からインデックス名へのマッピングルールの設定] に基づいて宛先インデックスを生成します。ステップ 3 でマッピングルールを構成しなかった場合、デフォルトでは、データはソーステーブルと同じ名前の宛先インデックスに書き込まれます。そのような宛先インデックスが存在しない場合、システムはデフォルトで 1 つ作成します。同期プライマリキーとインデックス作成方法を変更することもできます。

説明

宛先テーブル名は、[テーブル名からインデックス名へのマッピングルールの設定] で構成したテーブル名変換ルールに基づいて自動的に変換されます。

  1. [同期プライマリキー] 列で、プライマリキーの置換ソリューションを選択します。

    • ソースデータベースのテーブルにプライマリキーがある場合、システムはデータ同期中にプライマリキーに基づいて重複データを削除します。

    • ソースデータベースのテーブルにプライマリキーがない場合は、image アイコンをクリックしてプライマリキーをカスタマイズする必要があります。これは、1 つ以上の非プライマリキーフィールドを複合キーとして使用してプライマリキーを置き換え、同期中に重複データを削除することを意味します。

  2. [インデックス作成方法] 列で、インデックスを自動的に作成するか、既存のインデックスを使用するかを選択します。

    • [インデックス作成方法][インデックスを自動的に作成] に設定されている場合、[Elasticsearch インデックス名] 列には自動的に作成された Elasticsearch インデックス名が表示されます。インデックスの名前をクリックして、インデックスに関連するパラメーターの値を変更できます。

    • [インデックス作成方法][既存のインデックスを使用] に設定されている場合、[Elasticsearch インデックス名] 列のドロップダウンリストから使用するインデックス名を選択できます。

  3. [次へ] をクリックして同期ルールを構成します。

ステップ 5: 同期ルールを構成する

データベース全体のオフライン同期ソリューションとして、以下がサポートされています。ニーズに応じてソリューションを選択できます。ソリューションごとに必要なパラメーターが異なります。

同期ソリューション

説明

初回全量同期後の周期的増分同期

このメソッドを選択すると、システムはまずすべてのデータを Elasticsearch に同期します。その後、指定されたフィルター条件とスケジューリングサイクルに基づいてシステムがソリューションを実行するたびに、ソースの増分データのみを Elasticsearch に同期します。

初回全量同期のみ

このメソッドを選択した場合、同期操作を 1 回実行するだけで、ソースのすべてのデータを Elasticsearch に同期できます。

初回増分同期のみ

このメソッドを選択した場合、同期操作を 1 回実行するだけで、指定されたフィルター条件に基づいてソースの増分データを Elasticsearch に同期できます。

周期的全量同期

このメソッドを選択した場合は、バッチ同期ソリューションのスケジューリングサイクルを指定する必要があります。その後、指定されたスケジューリングサイクルに基づいてシステムがソリューションを実行するたびに、ソースのすべてのデータを Elasticsearch に同期します。

周期的増分同期

このメソッドを選択すると、指定されたフィルター条件とスケジューリングサイクルに基づいてシステムがソリューションを実行するたびに、ソースの増分データのみを Elasticsearch に同期します。

初回全量同期後の周期的増分同期

全量同期

パラメーター

説明

書き込み前に元のインデックスをクリア

  • はい: 新しいデータを書き込む前に、システムはインデックス内の元のデータをクリアします。

  • いいえ: 新しいデータを書き込む前に、システムはインデックス内の元のデータをクリアしません。

重要

このパラメーターを [はい] に設定すると、新しいデータが書き込まれる前に宛先インデックスのすべてのデータが削除されます。このオプションを選択するときは注意してください。

書き込みタイプ

  • 挿入: これはデフォルト値です。システムはデータを Elasticsearch インデックスに直接挿入します。

  • 更新: 同じプライマリキーを持つレコードが存在する場合、システムはレコードを更新します。それ以外の場合、システムはレコードを挿入します。

    説明

    システムがレコードを更新するとき、まず元のレコード全体を削除してから、新しいレコードを挿入します。

バッチサイズ

一度に Elasticsearch に書き込むことができるデータレコードの数。デフォルト値は 1000 です。実際のネットワーク状況と同期したいデータ量に基づいてこのパラメーターを構成し、ネットワークのオーバーヘッドを削減できます。

増分同期

書き込みタイプ

  • 挿入: これはデフォルト値です。システムはデータを Elasticsearch インデックスに直接挿入します。

  • 更新: 同じプライマリキーを持つレコードが存在する場合、システムはレコードを更新します。それ以外の場合、システムはレコードを挿入します。

    説明

    システムがレコードを更新するとき、まず元のレコード全体を削除してから、新しいレコードを挿入します。

バッチサイズ

一度に Elasticsearch に書き込むことができるデータレコードの数。デフォルト値は 1000 です。実際のネットワーク状況と同期したいデータ量に基づいてこのパラメーターを構成し、ネットワークのオーバーヘッドを削減できます。

増分条件

WHERE 句を使用して、同期するテーブルをフィルターできます。[増分条件] フィールドに WHERE 句を入力するだけで、WHERE キーワードは不要です。WHERE 句を記述する際には、${bdp.system.bizdate} (データタイムスタンプ) や ${bdp.system.cyctime} (スケジュールされた時刻) などの組み込みシステム変数を使用できます。

スケジューリング設定

定期的なスケジューリングが必要なため、スケジューリングサイクル有効日スケジューリングの一時停止など、定期的なスケジューリングタスクに関連するプロパティを定義する必要があります。現在の同期タスクのスケジューリング構成は、Data Development のノードのスケジューリング構成と同じです。パラメーターの詳細については、「ノードのスケジューリング」をご参照ください。

初回全量同期のみ

全量同期

パラメーター

説明

書き込み前に元のインデックスをクリア

  • はい: 新しいデータを書き込む前に、システムはインデックス内の元のデータをクリアします。

  • いいえ: 新しいデータを書き込む前に、システムはインデックス内の元のデータをクリアしません。

重要

このパラメーターを [はい] に設定すると、新しいデータが書き込まれる前に宛先インデックスのすべてのデータが削除されます。このオプションを選択するときは注意してください。

書き込みタイプ

  • 挿入: これはデフォルト値です。システムはデータを Elasticsearch インデックスに直接挿入します。

  • 更新: 同じプライマリキーを持つレコードが存在する場合、システムはレコードを更新します。それ以外の場合、システムはレコードを挿入します。

    説明

    システムがレコードを更新するとき、まず元のレコード全体を削除してから、新しいレコードを挿入します。

バッチサイズ

一度に Elasticsearch に書き込むことができるデータレコードの数。デフォルト値は 1000 です。実際のネットワーク状況と同期したいデータ量に基づいてこのパラメーターを構成し、ネットワークのオーバーヘッドを削減できます。

初回増分同期のみ

増分同期

パラメーター

説明

書き込みタイプ

  • 挿入: これはデフォルト値です。システムはデータを Elasticsearch インデックスに直接挿入します。

  • 更新: 同じプライマリキーを持つレコードが存在する場合、システムはレコードを更新します。それ以外の場合、システムはレコードを挿入します。

    説明

    システムがレコードを更新するとき、まず元のレコード全体を削除してから、新しいレコードを挿入します。

バッチサイズ

一度に Elasticsearch に書き込むことができるデータレコードの数。デフォルト値は 1000 です。実際のネットワーク状況と同期したいデータ量に基づいてこのパラメーターを構成し、ネットワークのオーバーヘッドを削減できます。

増分条件

WHERE 句を使用して、同期するテーブルをフィルターできます。[増分条件] フィールドに WHERE 句を入力するだけで、WHERE キーワードは不要です。WHERE 句を記述する際には、${bdp.system.bizdate} (データタイムスタンプ) や ${bdp.system.cyctime} (スケジュールされた時刻) などの組み込みシステム変数を使用できます。

周期的全量同期

全量同期

パラメーター

説明

書き込み前に元のインデックスをクリア

  • はい: 新しいデータを書き込む前に、システムはインデックス内の元のデータをクリアします。

  • いいえ: 新しいデータを書き込む前に、システムはインデックス内の元のデータをクリアしません。

重要

このパラメーターを [はい] に設定すると、新しいデータが書き込まれる前に宛先インデックスのすべてのデータが削除されます。このオプションを選択するときは注意してください。

書き込みタイプ

  • 挿入: これはデフォルト値です。システムはデータを Elasticsearch インデックスに直接挿入します。

  • 更新: 同じプライマリキーを持つレコードが存在する場合、システムはレコードを更新します。それ以外の場合、システムはレコードを挿入します。

    説明

    システムがレコードを更新するとき、まず元のレコード全体を削除してから、新しいレコードを挿入します。

バッチサイズ

一度に Elasticsearch に書き込むことができるデータレコードの数。デフォルト値は 1000 です。実際のネットワーク状況と同期したいデータ量に基づいてこのパラメーターを構成し、ネットワークのオーバーヘッドを削減できます。

スケジューリング設定

定期的なスケジューリングが必要なため、スケジューリングサイクル有効日スケジューリングの一時停止など、定期的なスケジューリングタスクに関連するプロパティを定義する必要があります。現在の同期タスクのスケジューリング構成は、Data Development のノードのスケジューリング構成と同じです。パラメーターの詳細については、「ノードのスケジューリング」をご参照ください。

周期的増分同期

増分同期

パラメーター

説明

書き込みタイプ

  • 挿入: これはデフォルト値です。システムはデータを Elasticsearch インデックスに直接挿入します。

  • 更新: 同じプライマリキーを持つレコードが存在する場合、システムはレコードを更新します。それ以外の場合、システムはレコードを挿入します。

    説明

    システムがレコードを更新するとき、まず元のレコード全体を削除してから、新しいレコードを挿入します。

バッチサイズ

一度に Elasticsearch に書き込むことができるデータレコードの数。デフォルト値は 1000 です。実際のネットワーク状況と同期したいデータ量に基づいてこのパラメーターを構成し、ネットワークのオーバーヘッドを削減できます。

増分条件

WHERE 句を使用して、同期するテーブルをフィルターできます。[増分条件] フィールドに WHERE 句を入力するだけで、WHERE キーワードは不要です。WHERE 句を記述する際には、${bdp.system.bizdate} (データタイムスタンプ) や ${bdp.system.cyctime} (スケジュールされた時刻) などの組み込みシステム変数を使用できます。

スケジューリング設定

定期的なスケジューリングが必要なため、スケジューリングサイクル有効日スケジューリングの一時停止など、定期的なスケジューリングタスクに関連するプロパティを定義する必要があります。現在の同期タスクのスケジューリング構成は、Data Development のノードのスケジューリング構成と同じです。パラメーターの詳細については、「ノードのスケジューリング」をご参照ください。

構成が完了したら、[次へ] をクリックしてランタイムリソースを構成します。

ステップ 6: ランタイムリソースを構成する

前のステップで選択した同期ソリューションに基づいて、このステップでは異なるランタイムリソースを構成する必要があります。同期タスクは、全量オフライン同期タスクと増分オフライン同期タスクの両方を作成します。タスクの実行に使用されるタスク名とリソースグループ (全量オフラインタスクリソースグループ、増分オフラインタスクリソースグループ、およびスケジューリングリソースグループ) を構成できます。また、[詳細設定][最大並列実行数][同期レート][ダーティデータ許容度][ソースがサポートする最大接続数] など、Data Integration が提供する高度なパラメーターを変更することもできます。

ステップ 7: 同期タスクを実行する

  1. すべての構成が完了したら、ページの下部にある [設定の完了] をクリックします。

  2. [データ統合] > [同期タスク] ページで、作成した同期タスクを見つけ、[操作] 列の [送信して実行] をクリックします。

  3. [タスクリスト] をクリックし、対応するタスクの [名前/ID] をクリックして、タスクの詳細な実行手順を表示します。

同期タスクの O&M 操作を実行する

同期タスクのステータスを表示する

同期タスクを作成した後、[同期タスク] ページで作成された同期タスクのリストと各タスクの基本情報を表示できます。image

[操作] 列の [実行詳細] をクリックしてタスク詳細ページに移動し、タスクの実行ステータスを表示できます。

image