This topic describes how to use Alibaba Cloud Logstash to synchronize data from a PolarDB-X 1.0 database to Alibaba Cloud Elasticsearch for full-text search and semantic analytics.
How it works
Alibaba Cloud Logstash uses the logstash-input-jdbc plug-in to periodically poll PolarDB-X 1.0 for records that were inserted or updated since the last poll. The plug-in is installed on all Logstash clusters by default and cannot be removed.
For incremental sync to work correctly, two conditions must be met:
-
ID field mapping: The _id field in Elasticsearch must match the id field in PolarDB-X 1.0. This mapping lets the sync pipeline overwrite the correct Elasticsearch document when a record is updated in PolarDB-X 1.0.
-
Timestamp field: Every record in the source table must include a field that stores the insertion or update time. The plug-in tracks this field to decide which records to include in each poll. Only records whose timestamp is later than the last tracked value are synced, or later than or equal to it if the SQL statement uses >=.
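The following Python sketch models one polling cycle in memory to show how the two conditions work together. It is a simplified illustration, not the plug-in's implementation. The Row class, the table list, and the index dictionary are hypothetical stand-ins for the food table and the Elasticsearch index used later in this topic. Starting from the Unix epoch when no value has been recorded yet is an assumption of the sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

# Assumption for this sketch: with nothing recorded yet, start from the Unix epoch.
EPOCH = datetime(1970, 1, 1)


@dataclass
class Row:
    """Hypothetical stand-in for one row of the food table."""
    id: int
    name: str
    update_time: datetime


def poll(table, sql_last_value):
    """Mimic: SELECT * FROM food WHERE update_time >= :sql_last_value."""
    hits = [r for r in table if r.update_time >= sql_last_value]
    return sorted(hits, key=lambda r: r.update_time)


def sync_once(table, index, sql_last_value):
    """Index polled rows by id and return the next :sql_last_value."""
    rows = poll(table, sql_last_value)
    for row in rows:
        # document_id => "%{id}": a row always maps to the same _id, so a
        # re-read or updated row overwrites its document instead of duplicating it.
        index[str(row.id)] = {"id": row.id, "name": row.name,
                              "update_time": row.update_time}
    # use_column_value => true: remember the largest update_time seen so far.
    return rows[-1].update_time if rows else sql_last_value


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    table = [Row(1, "Chocolates", t0), Row(2, "Yogurt", t0)]
    index = {}
    last = sync_once(table, index, EPOCH)  # first poll picks up every row
    # An update that moves update_time forward, plus a new row
    table[0] = Row(1, "Dark chocolates", t0 + timedelta(minutes=1))
    table.append(Row(3, "Egg", t0 + timedelta(minutes=1)))
    last = sync_once(table, index, last)
    # An update that does not move update_time forward is never picked up
    table[1] = Row(2, "Greek yogurt", t0)
    last = sync_once(table, index, last)
    print(len(index), index["1"]["name"], index["2"]["name"])
    # prints: 3 Dark chocolates Yogurt
```

The last step in the demo shows why the timestamp field matters. If an UPDATE leaves update_time unchanged, the row stays behind :sql_last_value and is not read again.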
Use this approach if you need to sync full data with a latency of a few seconds, or if you need to query and sync specific records at scheduled intervals.
Prerequisites
Before you begin, ensure that you have:
-
A PolarDB-X 1.0 instance with a database created
-
An Alibaba Cloud Elasticsearch cluster (this example uses V6.7, Standard Edition)
-
An Alibaba Cloud Logstash cluster
Deploy all three resources in the same virtual private cloud (VPC) to minimize network latency and avoid additional configuration.
-
To create a PolarDB-X 1.0 instance and database, see Create a PolarDB-X 1.0 instance.
-
To create an Elasticsearch cluster, see Create an Alibaba Cloud Elasticsearch cluster.
-
To create a Logstash cluster, see Create an Alibaba Cloud Logstash cluster.
To collect data from the Internet or transfer data to the Internet via Logstash, configure a Network Address Translation (NAT) gateway. See Configure a NAT gateway for data transmission over the Internet.
Limitations
-
Deletions are not synced: The logstash-input-jdbc plug-in cannot propagate deletions from PolarDB-X 1.0 to Elasticsearch. To handle deleted records, use one of the following strategies:
  -
  Soft delete (recommended): Add an is_deleted field to your table. When a record is logically deleted, set is_deleted to true and update the record's timestamp field so that the next poll picks up the change. The plug-in syncs this change to Elasticsearch, where you can filter out soft-deleted documents in queries.
  -
  External deletion: Ensure that any system responsible for deleting records in PolarDB-X 1.0 also runs the corresponding delete command directly on the Elasticsearch cluster.
-
ID field requirement: The _id field in Elasticsearch must match the id field in PolarDB-X 1.0. The pipeline needs this to overwrite the correct document when a record is updated. In Elasticsearch, an overwrite is equivalent to deleting the original document and indexing the updated one, so it is as efficient as a native update operation.
-
Timestamp field requirement: Every inserted or updated record must include a field that captures the insertion or update time. Records without this field are not included in incremental sync.
-
Tracking column order: The column specified as tracking_column must have values in ascending order. A row written with a value older than the last tracked value is skipped.
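With the soft-delete strategy, every search must exclude logically deleted documents. The Python sketch below builds such a query body as a plain dictionary. You can send it to the _search endpoint or paste it into Kibana Dev Tools as JSON. The helper name is hypothetical. The sketch assumes that is_deleted reaches Elasticsearch as a boolean. If the field is indexed as 0/1 instead, compare against 1.

```python
import json


def exclude_soft_deleted(query):
    """Wrap an Elasticsearch query so that soft-deleted documents are skipped.

    Assumes the is_deleted field from the soft-delete strategy is a boolean.
    """
    return {
        "query": {
            "bool": {
                "must": [query],
                # must_not drops documents whose is_deleted is true; documents
                # that lack the field entirely are still returned.
                "must_not": [{"term": {"is_deleted": True}}],
            }
        }
    }


if __name__ == "__main__":
    body = exclude_soft_deleted({"match": {"name": "Chocolates"}})
    print(json.dumps(body, indent=2))
```

Wrapping the original query in a bool query leaves the caller's scoring logic unchanged and applies the exclusion as a separate clause.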
Synchronize data from PolarDB-X 1.0 to Elasticsearch
Step 1: Set up the source and destination
-
Create a table in PolarDB-X 1.0 and insert test data. The following example creates a food table:
CREATE TABLE food (
  id INT PRIMARY KEY AUTO_INCREMENT,
  name VARCHAR(32),
  insert_time DATETIME,
  update_time DATETIME
);
Key columns:
-
id: The primary key. Its value is mapped to the _id field in Elasticsearch. The sync pipeline needs it to identify and overwrite documents correctly.
-
update_time: The tracking timestamp. The logstash-input-jdbc plug-in uses this field to detect changes since the last poll, so incremental sync requires it. This table definition does not update the column automatically, so set it to the current time (for example, NOW()) in every INSERT and UPDATE statement.
-
insert_time: Records when the row was first created. Sync does not require it, but it is useful for auditing.
Insert sample data:
INSERT INTO food VALUES (null, 'Chocolates', NOW(), NOW());
INSERT INTO food VALUES (null, 'Yogurt', NOW(), NOW());
INSERT INTO food VALUES (null, 'Ham sausage', NOW(), NOW());
-
Enable the Auto Indexing feature for your Elasticsearch cluster. For details, see Access and configure an Elasticsearch cluster.
-
Upload a MySQL JDBC driver to the Logstash cluster. The driver version must be compatible with your PolarDB-X 1.0 instance. This example uses mysql-connector-java-5.1.35. For upload instructions, see Configure third-party libraries.
Note: A MySQL JDBC driver is recommended for connecting to PolarDB-X 1.0. A PolarDB JDBC driver may not work with PolarDB-X 2.0.
-
Add the IP addresses of the Logstash cluster nodes to the IP address whitelist of your PolarDB-X 1.0 instance. Find the node IP addresses on the Basic Information page of the Logstash cluster in the Elasticsearch console. For whitelisting instructions, see Set an IP address whitelist.
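Before you continue, you can optionally confirm that every Logstash node IP is covered by the whitelist. The following Python sketch is a hypothetical offline helper, not a console feature. It reports which node IPs no whitelist entry covers. It assumes that each entry is either a single IPv4/IPv6 address or a CIDR block. The addresses in the example are placeholders.

```python
import ipaddress


def missing_from_whitelist(node_ips, whitelist_entries):
    """Return the node IPs that no whitelist entry covers.

    A bare address such as "192.168.1.10" is treated as a single-host network.
    """
    networks = [ipaddress.ip_network(entry.strip(), strict=False)
                for entry in whitelist_entries]
    return [ip for ip in node_ips
            if not any(ipaddress.ip_address(ip) in net for net in networks)]


if __name__ == "__main__":
    nodes = ["192.168.1.10", "192.168.2.20"]  # placeholder Logstash node IPs
    whitelist = ["192.168.1.0/24"]            # placeholder whitelist entry
    print(missing_from_whitelist(nodes, whitelist))  # prints: ['192.168.2.20']
```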
Step 2: Configure a Logstash pipeline
-
Go to the Logstash Clusters page of the Alibaba Cloud Elasticsearch console.
-
In the top navigation bar, select the region where your cluster resides.
-
On the Logstash Clusters page, find your cluster and click its ID.
-
In the left-side navigation pane, click Pipelines.
-
On the Pipelines page, click Create Pipeline.
-
On the Create Task page, enter a pipeline ID in the Pipeline ID field. Then enter the following configuration in the Config Settings field. Replace <Logstash cluster ID>, <Database name>, the endpoints, and the credentials with your actual values. To find your Logstash cluster ID, see Overview of the Logstash Clusters page.
Important: For security, always append allowLoadLocalInfile=false&autoDeserialize=false to jdbc_connection_string when you use a JDBC driver. Without these parameters, the pipeline configuration check fails when you save.
Input plug-in parameters
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| jdbc_driver_class | String | None | The JDBC driver class. For MySQL, use com.mysql.jdbc.Driver. |
| jdbc_driver_library | String | None | The path to the JDBC driver file on the Logstash cluster. The backend provides paths in the /ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/ format. For details, see Configure third-party libraries. |
| jdbc_connection_string | String | None | The JDBC connection string, including the endpoint, port number, and database name of the PolarDB-X 1.0 instance. |
| jdbc_user | String | None | The username for accessing the PolarDB-X 1.0 database. |
| jdbc_password | String | None | The password for accessing the PolarDB-X 1.0 database. |
| jdbc_paging_enabled | Boolean | false | Whether to enable pagination for query results. |
| jdbc_page_size | Integer | 100000 | The number of records per page when pagination is enabled. |
| statement | String | None | The SQL statement used to query records. Use :sql_last_value as a placeholder for the last tracked value. The example uses >= so that records inserted or updated at exactly the last tracked timestamp are also read, which minimizes data loss at poll boundaries. Re-reading boundary records is harmless because they overwrite the same document. |
| schedule | String | None | A cron expression that controls the polling interval. * * * * * runs the query every minute. |
| record_last_run | Boolean | true | Whether to persist the last run value. When true, :sql_last_value from the last query is stored in the file at last_run_metadata_path. This is the tracking_column value when use_column_value is true. |
| last_run_metadata_path | String | None | The file path where the last run value is stored. Use a path in the /ssd/1/<Logstash cluster ID>/logstash/data/ format provided by Alibaba Cloud Logstash. Logstash creates the file automatically. You cannot view the file contents. The system does not delete the data in this path, so make sure your disk has enough space. |
| clean_run | Boolean | false | Whether to ignore the last run value and start from the first record in the database. Set to true to force a full resync. |
| use_column_value | Boolean | false | Whether to use the value of tracking_column as :sql_last_value. When false, :sql_last_value is the timestamp of the last query execution. |
| tracking_column_type | String | numeric | The data type of the tracking column. Valid values: numeric, timestamp. |
| tracking_column | String | None | The column to track for incremental sync. Values must be in ascending order. Set this to the column that stores the insertion or update time (for example, update_time). |

Output plug-in parameters
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| hosts | String | None | The internal endpoint URL of the Elasticsearch cluster, in the format http://<internal endpoint>:9200. Find the endpoint on the Basic Information page of the cluster. For details, see View the basic information of a cluster. |
| user | String | elastic | The username for accessing the Elasticsearch cluster. |
| password | String | None | The password for the elastic account. If you forget it, you can reset it. For details, see Reset the access password for an Elasticsearch cluster. |
| index | String | None | The name of the Elasticsearch index where synced data is stored. |
| document_id | String | None | The document ID in Elasticsearch. Set to %{id} to use the id field from PolarDB-X 1.0, so that each database record maps to exactly one Elasticsearch document. |

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar"
    jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    jdbc_user => "db_user"
    jdbc_password => "db_password"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    statement => "select * from food where update_time >= :sql_last_value"
    schedule => "* * * * *"
    record_last_run => true
    last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
    clean_run => false
    tracking_column_type => "timestamp"
    use_column_value => true
    tracking_column => "update_time"
  }
}
filter {
}
output {
  elasticsearch {
    hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200"
    user => "elastic"
    password => "es_password"
    index => "drds_test"
    document_id => "%{id}"
  }
}

For the full list of input plug-in parameters, see Logstash JDBC input plugin. For general pipeline configuration syntax, see Logstash configuration files.
-
Click Next to configure pipeline parameters.
Warning: Saving and deploying the pipeline triggers a restart of the Logstash cluster. Verify that a restart does not affect your services before you proceed.
| Parameter | Default | Description |
| --- | --- | --- |
| Pipeline Workers | Number of vCPUs | The number of worker threads that run filter and output plug-ins in parallel. Increase this value if events are backlogged or CPU resources are underutilized. |
| Pipeline Batch Size | 125 | The maximum number of events a worker thread collects from input plug-ins before it runs filter and output plug-ins. Larger values increase throughput but consume more memory. To accommodate larger batches, increase the JVM heap size by using the LS_HEAP_SIZE variable. |
| Pipeline Batch Delay | 50 ms | The wait time before a small batch is assigned to a pipeline worker thread. |
| Queue Type | MEMORY | The internal queue model for buffering events. MEMORY uses a memory-based queue. PERSISTED uses a disk-based persistent queue with ACK support. |
| Queue Max Bytes | 1024 MB | The maximum queue size. Must be less than the total disk capacity. |
| Queue Checkpoint Writes | 1024 | The number of events written before a checkpoint is enforced (persistent queues only). Set to 0 for no limit. |
-
Click Save or Save and Deploy.
-
Save: Stores the pipeline configuration and triggers a cluster change, but does not activate the pipeline. To activate it, go to the Pipelines page, find the pipeline, and click Deploy in the Actions column.
-
Save and Deploy: Saves and immediately restarts the Logstash cluster to activate the pipeline.
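Two settings in this step are easy to get wrong by hand. The first is the security parameters on jdbc_connection_string. The second is the memory impact of Pipeline Workers combined with Pipeline Batch Size. The following Python sketch is a hypothetical offline helper, not part of Alibaba Cloud Logstash. It builds a connection string in the same shape as the example configuration, checks an existing one for the two required parameters, and computes the in-flight event bound. Logstash's tuning guidance describes that bound as workers multiplied by batch size. The host, database name, and vCPU count in the demo are placeholders.

```python
from urllib.parse import urlencode

# Parameters this topic requires on every jdbc_connection_string.
REQUIRED_SECURITY_PARAMS = {"allowLoadLocalInfile": "false",
                            "autoDeserialize": "false"}


def build_jdbc_url(host, database, port=3306, extra=None):
    """Build a MySQL JDBC URL for PolarDB-X 1.0 with the security parameters last."""
    params = {"useUnicode": "true", "characterEncoding": "utf-8",
              "useSSL": "false"}
    params.update(extra or {})
    # Applied last so that `extra` cannot switch the security parameters off.
    params.update(REQUIRED_SECURITY_PARAMS)
    return f"jdbc:mysql://{host}:{port}/{database}?{urlencode(params)}"


def has_security_params(url):
    """Return True if the URL sets both security parameters to false."""
    query = url.partition("?")[2]
    pairs = dict(p.split("=", 1) for p in query.split("&") if "=" in p)
    return all(pairs.get(k) == v for k, v in REQUIRED_SECURITY_PARAMS.items())


def max_inflight_events(workers, batch_size=125):
    """Upper bound on events held by worker threads: workers x batch size."""
    if workers < 1 or batch_size < 1:
        raise ValueError("workers and batch_size must be positive")
    return workers * batch_size


if __name__ == "__main__":
    url = build_jdbc_url("drdshbga51x6****.drds.aliyuncs.com", "mydb")
    print(url)
    print(has_security_params(url))  # prints: True
    # Pipeline Workers defaults to the vCPU count; 4 vCPUs is an assumed value.
    print(max_inflight_events(4))    # prints: 500
```

The pipeline configuration check that runs when you save remains the authoritative validation. This helper only catches the missing parameters earlier.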
-
Step 3: Verify the result
-
Log on to the Kibana console of the Elasticsearch cluster. For details, see Log on to the Kibana console.
-
In the left-side navigation pane, click Dev Tools.
-
On the Console tab, run the following query to check that the three sample records were synced:
GET drds_test/_count
{
  "query": {"match_all": {}}
}
Expected response:
{
  "count": 3,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}
-
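Update an existing record and insert a new one in PolarDB-X 1.0. The UPDATE statement keeps the name unchanged but sets update_time to the current time, which is what causes the plug-in to read the row again on the next poll:
UPDATE food SET name='Chocolates', update_time=NOW() WHERE id = 1;
INSERT INTO food VALUES (null, 'Egg', NOW(), NOW());
-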
After the next polling cycle completes (within one minute), verify the updates in Kibana. Query the updated record:
GET drds_test/_search
{
  "query": {
    "match": { "name": "Chocolates" }
  }
}
If the command runs successfully, the result shown in the following figure is returned. The hit is the document whose _id is 1, with its updated update_time.
Query all records to confirm that the new row was added:
GET drds_test/_search
{
  "query": { "match_all": {} }
}
If the command runs successfully, the result shown in the following figure is returned. The response contains four documents, including Egg.
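You can also script this check instead of repeating it in Kibana. The following Python sketch waits until the _count result reaches an expected value. After the steps above, that value is 4: the three sample rows plus Egg. The sketch only parses responses. fetch is a hypothetical callable you supply, for example one that sends GET drds_test/_count to the cluster endpoint with the elastic credentials. The 120-second default timeout is an assumption chosen to cover one cycle of the * * * * * schedule with some slack.

```python
import json
import time


def parse_count(response_text):
    """Return "count" from a _count response; raise if any shard failed."""
    body = json.loads(response_text)
    failed = body["_shards"]["failed"]
    if failed > 0:
        raise RuntimeError(f"{failed} shard(s) failed")
    return body["count"]


def wait_for_count(fetch, expected, timeout_s=120.0, interval_s=10.0,
                   sleep=time.sleep, now=time.monotonic):
    """Call fetch() until the count equals `expected` or the timeout expires.

    fetch is a caller-supplied (hypothetical) function that sends
    GET drds_test/_count and returns the raw JSON response body.
    """
    deadline = now() + timeout_s
    while True:
        if parse_count(fetch()) == expected:
            return True
        if now() >= deadline:
            return False
        sleep(interval_s)


if __name__ == "__main__":
    # Simulated responses: before and after the next polling cycle.
    before = '{"count": 3, "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}}'
    after = before.replace('"count": 3', '"count": 4')
    responses = iter([before, after])
    print(wait_for_count(lambda: next(responses), 4, sleep=lambda s: None))  # prints: True
```

The sleep and now parameters are injectable so that the loop can be tested without real waiting. In normal use, keep the defaults.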
