當您的業務資料存放區在PolarDB-X(原DRDS升級版)中,需要進行全文檢索索引和語義分析時,可藉助Elasticsearch實現。本文介紹如何通過阿里雲Logstash,將PolarDB-X中的資料即時同步至Elasticsearch。
背景資訊
阿里雲Logstash是一款強大的資料收集和處理工具,提供了資料擷取、轉換、最佳化和輸出的能力。通過Logstash的logstash-input-jdbc外掛程式(預設已安裝,不可卸載),可批量查詢PolarDB-X中的資料並同步到Elasticsearch中。同時,logstash-input-jdbc外掛程式會定期對PolarDB-X中的資料進行輪詢查詢,並將自上次輪詢以來插入或更改的記錄同步到Elasticsearch。本方案適用於同步全量資料並接受秒級延遲的情境、批量查詢資料然後進行同步的情境。
前提條件
已建立PolarDB-X 1.0執行個體及資料庫、Elasticsearch執行個體、Logstash執行個體。建議您在同一專用網路下建立相關執行個體。
建立PolarDB-X 1.0執行個體及資料庫。具體操作,請參見建立PolarDB-X 1.0執行個體。
建立Elasticsearch執行個體。具體操作請參見建立Elasticsearch執行個體。本文建立的執行個體的版本為通用商業版6.7。
建立阿里雲Logstash執行個體。具體操作參見建立阿里雲Logstash執行個體。
說明您也可以使用公網環境的服務,前提是需要通過配置NAT Gateway實現與公網的連通。詳細資料,請參見配置NAT公網資料轉送。
使用限制
Elasticsearch中的_id欄位必須與資料庫中的id欄位相同。
該條件可以確保當您將資料庫中的記錄寫入Elasticsearch時,同步任務可在資料庫記錄與Elasticsearch文檔之間建立一個直接映射的關係。例如當您在資料庫中更新了某條記錄時,同步任務會覆蓋Elasticsearch中與更新記錄具有相同ID的文檔。
說明根據Elasticsearch內部原理,更新操作的本質是刪除舊文檔然後對新文檔進行索引,因此在Elasticsearch中覆蓋文檔的效率與更新操作的效率一樣高。
當您在資料庫中插入或者更新資料時,對應記錄必須有一個包含更新或插入時間的欄位。
Logstash每次對資料庫進行輪詢時,都會儲存其從資料庫中所讀取的最後一條記錄的更新或插入時間。在讀取資料時,Logstash僅讀取合格記錄,即該記錄的更新或插入時間需要晚於上一次輪詢中最後一條記錄的更新或插入時間。
重要logstash-input-jdbc外掛程式無法實現同步刪除。如果您要刪除Elasticsearch中的資料,需要在Elasticsearch中執行相關命令,手動刪除。
操作步驟
步驟一:環境準備
在PolarDB-X 1.0執行個體中建立表,並準備測試資料。
本文使用的建表語句如下。
CREATE table food( id int PRIMARY key AUTO_INCREMENT, name VARCHAR (32), insert_time DATETIME, update_time DATETIME );插入測試資料語句如下。
INSERT INTO food values(null,'巧克力',now(),now()); INSERT INTO food values(null,'優酪乳',now(),now()); INSERT INTO food values(null,'火腿腸',now(),now());在Elasticsearch執行個體中開啟自動建立索引功能。具體操作,請參見快速存取與配置。
在阿里雲Logstash執行個體中上傳與PolarDB-X資料庫版本相容的SQL JDBC驅動。具體操作,請參見配置擴充檔案。本文使用mysql-connector-java-5.1.35.jar驅動。
說明本文使用MySQL JDBC驅動串連PolarDB-X資料庫。您也可以使用PolarDB JDBC驅動,但對於一些高版本PolarDB-X資料庫,使用PolarDB JDBC驅動會有問題,建議您使用MySQL JDBC驅動。
在PolarDB-X資料庫白名單中加入阿里雲Logstash節點的IP地址(可在基本資料頁面擷取)。具體操作,請參見設定白名單。
步驟二:配置Logstash管道
- 進入Elasticsearch控制台的Logstash頁面。
- 進入目標執行個體。
- 在頂部功能表列處,選擇地區。
- 在Logstash執行個體中單擊目標執行個體ID。
- 單擊左側導覽列的管道管理。
單擊建立管道。
在建立管道任務頁面,輸入管道ID,並進行Config配置。
本文使用的Config配置如下。
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/ssd/1/share/<Logstash執行個體ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar" jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<資料庫名稱>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false" jdbc_user => "db_user" jdbc_password => "db_password" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select * from food where update_time >= :sql_last_value" schedule => "* * * * *" record_last_run => true last_run_metadata_path => "/ssd/1/<Logstash執行個體ID>/logstash/data/last_run_metadata_update_time.txt" clean_run => false tracking_column_type => "timestamp" use_column_value => true tracking_column => "update_time" } } filter { } output { elasticsearch { hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200" user => "elastic" password => "es_password" index => "drds_test" document_id => "%{id}" } }說明代碼中
<Logstash執行個體ID>需要替換為您建立的Logstash執行個體的ID。擷取方式,請參見執行個體列表概覽。表 1. input參數說明 參數
描述
jdbc_driver_class
JDBC Class配置。
jdbc_driver_library
指定JDBC串連MySQL驅動檔案。具體操作請參見配置擴充檔案。
jdbc_connection_string
設定資料庫串連的網域名稱、連接埠及資料庫。
jdbc_user
資料庫使用者名稱。
jdbc_password
資料庫密碼。
jdbc_paging_enabled
是否啟用分頁,預設false。
jdbc_page_size
分頁大小。
statement
指定SQL語句。
schedule
指定定時操作,
"* * * * *"表示每分鐘定時同步資料。record_last_run
是否記錄上次執行結果。如果為true,則會把上次執行到的tracking_column欄位的值記錄下來,儲存到last_run_metadata_path指定的檔案中。
last_run_metadata_path
指定最後已耗用時間檔案存放的地址。目前後端開放了/ssd/1/<Logstash執行個體ID>/logstash/data/路徑來儲存檔案。指定參數路徑後,Logstash會在對應路徑下自動組建檔案,但不支援查看檔案內容。
clean_run
是否清除last_run_metadata_path的記錄,預設為false。如果為true,那麼每次都要從頭開始查詢所有的資料庫記錄。
use_column_value
是否需要記錄某個column的值。
tracking_column_type
跟蹤列的類型,預設是numeric。
tracking_column
指定跟蹤列,該列必須是遞增的,一般是表的主鍵。
表 2. output參數說明 參數
說明
hosts
Elasticsearch執行個體的訪問地址,格式為http://<執行個體的私網地址>:9200。其中執行個體的私網地址可在基本資料頁面擷取,詳細資料請參見查看執行個體的基本資料。
user
訪問Elasticsearch執行個體的使用者名稱,預設為elastic。
password
對應使用者的密碼。elastic使用者的密碼在建立執行個體時設定,如果忘記可重設。重設密碼的注意事項和操作,請參見重設執行個體訪問密碼。
index
索引名稱。
document_id
文檔ID。設定為%{id},表示與來源資料庫中的ID欄位保持一致。
重要以上配置按照測試資料配置,在實際業務中,請按照業務需求進行合理配置。input外掛程式支援的其他配置選項,請參見官方Logstash Jdbc input plugin。
如果配置中有類似
last_run_metadata_path的參數,需要阿里雲Logstash服務提供檔案路徑。目前後端開放了/ssd/1/<Logstash執行個體ID>/logstash/data/路徑供您測試使用,且該目錄下的資料不會被刪除。因此在使用時,請確保磁碟有充足的使用空間。指定參數路徑後,Logstash會在對應路徑下自動組建檔案,但不支援查看檔案內容。為了提升安全性,如果在配置管道時使用了JDBC驅動,需要在
jdbc_connection_string參數後面添加allowLoadLocalInfile=false&autoDeserialize=false,否則當您在添加Logstash設定檔時,調度系統會拋出校正失敗的提示,例如jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<資料庫名稱>?allowLoadLocalInfile=false&autoDeserialize=false"。
更多Config配置,請參見Logstash設定檔說明。
單擊下一步,配置管道參數。

參數
說明
管道背景工作執行緒
並存執行管道的Filter和Output的背景工作執行緒數量。當事件出現積壓或CPU未飽和時,請考慮增大線程數,更好地使用CPU處理能力。預設值:執行個體的CPU核心數。
管道批大小
單個背景工作執行緒在嘗試執行Filter和Output前,可以從Input收集的最大事件數目。較大的管道批大小可能會帶來較大的記憶體開銷。您可以設定LS_HEAP_SIZE變數,來增大JVM堆大小,從而有效使用該值。預設值:125。
管道批延遲
建立管道事件批時,將過小的批指派給管道背景工作執行緒之前,要等候每個事件的時間長度,單位為毫秒。預設值:50ms。
隊列類型
用於事件緩衝的內部排隊模型。可選值:
MEMORY:預設值。基於記憶體的傳統隊列。
PERSISTED:基於磁碟的ACKed隊列(持久隊列)。
隊列最大位元組數
請確保該值小於您的磁碟總容量。預設值:1024 MB。
隊列檢查點寫入數
啟用持久性隊列時,在強制執行檢查點之前已寫入事件的最大數目。設定為0,表示無限制。預設值:1024。
警告配置完成後,需要儲存並部署才會生效。儲存並部署操作會觸發執行個體重啟,請在不影響業務的前提下,繼續執行以下步驟。
單擊儲存或者儲存並部署。
儲存:將管道資訊儲存在Logstash裡並觸發執行個體變更,配置不會生效。儲存後,系統會返回管道管理頁面。可在管道列表地區,單擊操作列下的立即部署,觸發執行個體重啟,使配置生效。
儲存並部署:儲存並且部署後,會觸發執行個體重啟,使配置生效。
步驟三:驗證結果
登入目標Elasticsearch執行個體的Kibana控制台。
具體操作,請參見登入Kibana控制台。
在左側導覽列,單擊Dev Tools(開發工具)。
在Console中,執行以下命令,查看同步成功的索引數量。
GET drds_test/_count { "query": {"match_all": {}} }運行成功後,結果如下。
{ "count" : 3, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 } }在表中更新並插入資料。
UPDATE food SET name='Chocolates',update_time=now() where id = 1; INSERT INTO food values(null,'雞蛋',now(),now());在Kibana控制台,查看更新後的資料。
查詢name為Chocolates的資料。
GET drds_test/_search { "query": { "match": { "name": "Chocolates" }} }返回結果如下。

查詢所有資料。
GET drds_test/_search { "query": { "match_all": {} } }返回結果如下。
