全部產品
Search
文件中心

DataWorks:MySQL整庫離線同步至Elasticsearch

更新時間:Jun 19, 2025

Data Integration目前支援將MySQL、PolarDB、SQL Server等源頭的資料整庫離線同步至Elasticsearch。本文以MySQL為源端、Elasticsearch為目標端情境為例,為您介紹如何將MySQL整個資料庫的資料離線同步至Elasticsearch。

前提條件

操作步驟

一、選擇同步任務類型

  1. 進入Data Integration頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的Data Integration > Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration

  2. 在左側導覽列單擊同步任務,然後在頁面頂部單擊建立同步任務,進入同步任務的建立頁面,配置如下基本資料。

    • 資料來源和去向MySQLElasticsearch

    • 新任務名稱:自訂同步任務名稱。

    • 同步類型整庫離線同步至Elasticsearch

二、網路與資源配置

  1. 網路與資源配置地區,選擇同步任務所使用的資源群組。您可以為該任務分配任務資源佔用CU數。

  2. 來來源資料源選擇已添加的MySQL資料來源,去向資料來源選擇已添加的Elasticsearch資料來源後,單擊測試連通性

    image

  3. 確保來來源資料源與去向資料來源均連通成功後,單擊下一步

三、設定同步來源和規則

  1. 選擇需要同步的表。

    此步驟中,您可以在源端庫表地區選取項目源端資料來源下需要同步的表,並單擊image表徵圖,將其移動至右側已選庫表

    image

  2. 設定表名到索引名的映射規則。

    選擇源端資料來源中需要同步的庫和表後,同步任務預設將源端資料庫、資料表寫入目標端同名schema或同名表中,如果目標端不存在該schema或表,將會自動建立。同時,您可以通過設定表名到索引名的映射規則自訂最終寫入目標端的Schema或索引名稱。

    配置說明:

    • 源庫名和目標Schema名轉換規則:所有的該轉換規則都是針對原始庫名的轉換,轉換完成之後的結果可以使用${db_name_src_transed},在目標索引名規則中作為變數來使用。

      重要
      • 如果不使用目標索引名規則,則此規則將直接影響最終實際目標Schema名。

      • 如果使用目標索引名規則,則此規則不僅影響變數${db_name_src_transed}的值,還會影響最終實際目標Schema名。

    • 源表名和目標索引名轉換規則:所有的該轉換規則都是針對原始表名的轉換,轉換完成之後的結果可以使用${db_table_name_src_transed},在目標索引名規則中作為變數來使用。

      重要
      • 如果不使用目標索引名規則,則此規則將直接影響最終實際目標索引名。

      • 如果使用目標索引名規則,則此規則將隻影響變數${db_table_name_src_transed}的值,不直接影響最終實際目標索引名。最終目標索引名由目標索引名規則決定。

    • 目標索引名規則:可以使用內建的變數命名目標索引名。

      可以使用的內建變數有:

      • ${db_table_name_src_transed}:“源表名和目標索引名轉換規則”中的轉換完成之後的索引名。

      • ${db_name_src_transed}:“源庫名和目標Schema名轉換規則”中的轉換完成之後的目標Schema名。

      • ${ds_name_src}:來源資料源名。

    例如,將源端doc_首碼的庫名替換為pre_首碼、將源端table_01table_02table_03的表同步至一個名為my_table的索引中,最終為這個索引添加尾碼_post。應該做如下配置:

    image

  3. 單擊下一步,設定目標索引。

四、設定目標索引

單擊重新整理源表和Elasticsearch 索引映射將根據您在步驟三配置的設定表名到索引名的映射規則來產生目標索引,若步驟三未配置映射規則,將預設寫入與源表同名的目標索引,若目標端不存在該同名索引,將預設建立。同時,您可以修改同步主鍵、索引建立方式。

說明

目標表名將根據您在設定表名到索引名的映射規則階段配置的表名轉換規則自動轉換。

  1. 同步主鍵列,選擇主鍵替代方案。

    • 如果來源庫有主鍵,則同步資料時會直接使用該主鍵進行去重。

    • 如果來源庫沒有主鍵,則您需要單擊image表徵圖,自訂主鍵,即使用其他非主鍵的一個或幾個欄位的聯合,代替主鍵進行同步資料時進行去重判斷。

  2. 索引建立方式列,選擇自動建索引或使用已有索引。

    • 索引建立方式選擇自動建索引時,Elasticsearch索引名列顯示自動建立的Elasticsearch索引名。您可以單擊索引名稱,查看和修改索引相關屬性。

    • 索引建立方式選擇使用已有索引時,您可以在Elasticsearch索引名列對應的下拉式清單中選擇需要使用的索引名稱。

  3. 單擊下一步,設定同步規則。

五、同步規則設定

當前支援以下整庫離線同步方案,您可以按需選擇,不同同步方案,需配置的參數存在差異。

同步方案

描述

全量一次性同步後周期性增量

先將來來源資料源的所有資料全量同步至Elasticsearch,再按照指定的過濾條件和周期任務,後續每次執行任務時僅將增量資料同步至Elasticsearch中。

只全量一次性同步

只執行一次同步操作,將來來源資料源的所有資料,全量同步至Elasticsearch中。

只增量一次性同步

只執行一次同步操作,按照指定的過濾條件,將來來源資料源的增量資料同步至Elasticsearch中。

周期性全量同步

按照配置的周期任務,每次執行任務時都將來來源資料源的所有資料,全量同步至Elasticsearch中。

周期性增量同步處理

按照指定的過濾條件和周期任務,每次執行任務時僅將增量資料同步至Elasticsearch中。

全量一次性同步後周期增量

全量同步

參數

參數說明

寫入前清空對應的原有index(索引)

  • :寫入資料前會清空索引中原有的資料。

  • :寫入資料前不會清空索引中原有的資料。

重要

配置該參數為時,則會在寫入資料前刪除目標索引中所有的資料,請謹慎選擇。

寫入類型

  • 插入:預設值,直接向Elasticsearch的index中插入資料

  • 更新:插入時如果有相同的主鍵就更新資料,如果沒有就插入資料。

    說明

    更新時是先將原來一行的資料全部刪除後再插入。

每批次寫入的條數

每次批量寫入Elasticsearch的資料條數,即攢夠一定條數的資料後,一次性寫入Elasticsearch。預設為1000。您可以根據實際網路情況及資料量大小進行合理配置,減少不必要的網路開銷。

增量同步處理

寫入類型

  • 插入:預設值,直接向Elasticsearch的index中插入資料

  • 更新:插入時如果有相同的主鍵就更新資料,如果沒有就插入資料。

    說明

    更新時是先將原來一行的資料全部刪除後再插入。

每批次寫入的條數

每次批量寫入Elasticsearch的資料條數,即攢夠一定條數的資料後,一次性寫入Elasticsearch。預設為1000。您可以根據實際網路情況及資料量大小進行合理配置,減少不必要的網路開銷。

增量條件

您可通過WHERE語句對待同步的資料表進行過濾,且只需在增量條件框中填寫WHERE子句,無需寫WHERE關鍵字。同時,在寫WHERE子句時,您可以使用系統內建變數,例如使用${bdp.system.bizdate}指代業務日期、使用${bdp.system.cyctime}指代定時時間等。

周期設定

由於需要進行周期性調度,所以需要在此定義周期性調度任務時的相關屬性,包括調度周期生效日期暫停調度等。當前同步的調度配置與資料開發中節點的調度配置一致,參數詳情可參見節點調度

只全量一次性同步

全量同步

參數

參數說明

寫入前清空對應的原有index(索引)

  • :寫入資料前會清空索引中原有的資料。

  • :寫入資料前不會清空索引中原有的資料。

重要

配置該參數為時,則會在寫入資料前刪除目標索引中所有的資料,請謹慎選擇。

寫入類型

  • 插入:預設值,直接向Elasticsearch的index中插入資料

  • 更新:插入時如果有相同的主鍵就更新資料,如果沒有就插入資料。

    說明

    更新時是先將原來一行的資料全部刪除後再插入。

每批次寫入的條數

每次批量寫入Elasticsearch的資料條數,即攢夠一定條數的資料後,一次性寫入Elasticsearch。預設為1000。您可以根據實際網路情況及資料量大小進行合理配置,減少不必要的網路開銷。

只增量一次性同步

增量同步處理

參數

參數說明

寫入類型

  • 插入:預設值,直接向Elasticsearch的index中插入資料

  • 更新:插入時如果有相同的主鍵就更新資料,如果沒有就插入資料。

    說明

    更新時是先將原來一行的資料全部刪除後再插入。

每批次寫入的條數

每次批量寫入Elasticsearch的資料條數,即攢夠一定條數的資料後,一次性寫入Elasticsearch。預設為1000。您可以根據實際網路情況及資料量大小進行合理配置,減少不必要的網路開銷。

增量條件

您可通過WHERE語句對待同步的資料表進行過濾,且只需在增量條件框中填寫WHERE子句,無需寫WHERE關鍵字。同時,在寫WHERE子句時,您可以使用系統內建變數,例如使用${bdp.system.bizdate}指代業務日期、使用${bdp.system.cyctime}指代定時時間等。

周期性全量同步

全量同步

參數

參數說明

寫入前清空對應的原有index(索引)

  • :寫入資料前會清空索引中原有的資料。

  • :寫入資料前不會清空索引中原有的資料。

重要

配置該參數為時,則會在寫入資料前刪除目標索引中所有的資料,請謹慎選擇。

寫入類型

  • 插入:預設值,直接向Elasticsearch的index中插入資料

  • 更新:插入時如果有相同的主鍵就更新資料,如果沒有就插入資料。

    說明

    更新時是先將原來一行的資料全部刪除後再插入。

每批次寫入的條數

每次批量寫入Elasticsearch的資料條數,即攢夠一定條數的資料後,一次性寫入Elasticsearch。預設為1000。您可以根據實際網路情況及資料量大小進行合理配置,減少不必要的網路開銷。

周期設定

由於需要進行周期性調度,所以需要在此定義周期性調度任務時的相關屬性,包括調度周期生效日期暫停調度等。當前同步的調度配置與資料開發中節點的調度配置一致,參數詳情可參見節點調度

周期性增量同步處理

增量同步處理

參數

參數說明

寫入類型

  • 插入:預設值,直接向Elasticsearch的index中插入資料

  • 更新:插入時如果有相同的主鍵就更新資料,如果沒有就插入資料。

    說明

    更新時是先將原來一行的資料全部刪除後再插入。

每批次寫入的條數

每次批量寫入Elasticsearch的資料條數,即攢夠一定條數的資料後,一次性寫入Elasticsearch。預設為1000。您可以根據實際網路情況及資料量大小進行合理配置,減少不必要的網路開銷。

增量條件

您可通過WHERE語句對待同步的資料表進行過濾,且只需在增量條件框中填寫WHERE子句,無需寫WHERE關鍵字。同時,在寫WHERE子句時,您可以使用系統內建變數,例如使用${bdp.system.bizdate}指代業務日期、使用${bdp.system.cyctime}指代定時時間等。

周期設定

由於需要進行周期性調度,所以需要在此定義周期性調度任務時的相關屬性,包括調度周期生效日期暫停調度等。當前同步的調度配置與資料開發中節點的調度配置一致,參數詳情可參見節點調度

配置完成後,單擊下一步,設定運行資源。

六、設定運行資源

根據上一步選擇的不同同步方案,此步驟需設定的運行資源存在差異。同步任務將分別建立全量離線同步任務和增量離線同步任務,您可以配置任務名稱及任務執行所使用的資源群組(全量離線任務資源群組、增量離線任務資源群組、調度資源群組),同時,您可通過進階配置修改Data Integration提供的任務期望最大並發數同步速率容忍髒資料來源端讀取支援最大串連數等進階參數。

七、執行同步任務

  1. 完成所有配置後,單擊頁面底部的完成配置

  2. Data Integration > 同步任務介面,找到已建立的同步任務,單擊操作列的提交執行

  3. 單擊工作清單中對應任務的名稱/ID,查看任務的詳細執行過程。

同步任務營運

查看任務運行狀態

建立完成同步任務後,您可以在同步任務頁面查看當前已建立的同步工作清單及各個同步任務的基本資料。image

您可以在操作列單擊執行詳情,進入任務詳情頁,查看任務執行情況。

image