Elasticsearch: Use Logstash to synchronize data from PolarDB-X (DRDS) to Elasticsearch

Last Updated: Mar 26, 2026

This topic describes how to use Alibaba Cloud Logstash to synchronize data from a PolarDB-X 1.0 database to Alibaba Cloud Elasticsearch for full-text search and semantic analytics.

How it works

Alibaba Cloud Logstash uses the logstash-input-jdbc plug-in to periodically poll PolarDB-X 1.0 for records that were inserted or updated since the last poll. The plug-in is installed on all Logstash clusters by default and cannot be removed.

For incremental sync to work correctly, two conditions must be met:

  1. ID field mapping: The _id field in Elasticsearch must match the id field in PolarDB-X 1.0. This mapping lets the sync pipeline overwrite the correct Elasticsearch document when a record is updated in PolarDB-X 1.0.

  2. Timestamp field: Every record in the source table must include a field that stores the insertion or update time. The plug-in tracks this field to determine which records to include in each poll—only records with a timestamp later than the previous poll are synced.

Use this approach if you need to sync full data with a latency of a few seconds, or if you need to query and sync specific records at scheduled intervals.
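For example, if the pipeline tracks an update_time column, each poll effectively runs a query like the following sketch. The table name and the literal timestamp are placeholders; at run time, the plug-in substitutes the last tracked value for :sql_last_value.

    -- Illustrative only: what one incremental poll amounts to
    SELECT * FROM your_table
    WHERE update_time >= '2026-03-26 10:00:00'; -- the last tracked value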

Prerequisites

Before you begin, ensure that you have:

  • A PolarDB-X 1.0 instance with a database created

  • An Alibaba Cloud Elasticsearch cluster (this example uses V6.7, Standard Edition)

  • An Alibaba Cloud Logstash cluster

Deploy all three resources in the same virtual private cloud (VPC) to minimize network latency and avoid additional configuration.

Note

To collect data from the Internet or transfer data to the Internet via Logstash, configure a Network Address Translation (NAT) gateway. See Configure a NAT gateway for data transmission over the Internet.

Limitations

  • Deletions are not synced: The logstash-input-jdbc plug-in cannot propagate deletions from PolarDB-X 1.0 to Elasticsearch. To handle deleted records, use one of the following strategies:

    • Soft delete (recommended): Add an is_deleted field to your table. When a record is logically deleted, set is_deleted to true. The plug-in syncs this change to Elasticsearch, where you can filter out soft-deleted documents in queries. A sketch of this approach follows this list.

    • External deletion: Ensure that any system responsible for deleting records in PolarDB-X 1.0 also runs the corresponding delete command on the Elasticsearch cluster directly.

  • ID field requirement: The _id field in Elasticsearch must match the id field in PolarDB-X 1.0. This is required for the pipeline to correctly overwrite documents when records are updated. In Elasticsearch, an overwrite is equivalent to deleting the original document and indexing the updated one—this is as efficient as a native update operation.

  • Timestamp field requirement: Every inserted or updated record must include a field that captures the insertion or update time. Records without this field are not included in incremental sync.

  • Tracking column order: The column specified as tracking_column must have values in ascending order.
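The following is a minimal sketch of the soft-delete approach, using the food table and drds_test index created later in this topic. The is_deleted column is an assumption for illustration.

    -- One-time schema change: add a logical-deletion flag
    ALTER TABLE food ADD COLUMN is_deleted TINYINT(1) NOT NULL DEFAULT 0;

    -- "Delete" a record: bumping update_time ensures the next poll syncs the change
    UPDATE food SET is_deleted = 1, update_time = NOW() WHERE id = 1;

In Elasticsearch, exclude soft-deleted documents at query time:

    GET drds_test/_search
    {
      "query": {
        "bool": {
          "must_not": [
            { "term": { "is_deleted": 1 } }
          ]
        }
      }
    }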

Synchronize data from PolarDB-X 1.0 to Elasticsearch

Step 1: Set up the source and destination

  1. Create a table in PolarDB-X 1.0 and insert test data. The following example creates a food table:

    CREATE TABLE food(
      id INT PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(32),
      insert_time DATETIME,
      update_time DATETIME
    );

    Key columns:

    • id: The primary key. Its value is mapped to the _id field in Elasticsearch. Required for the sync pipeline to correctly identify and overwrite documents.

    • update_time: The tracking timestamp. The logstash-input-jdbc plug-in uses this field to detect changes since the last poll. Required for incremental sync to work.

    • insert_time: Records when the row was first created. Not required for sync, but useful for auditing.

    Insert sample data:

    INSERT INTO food VALUES(null, 'Chocolates', NOW(), NOW());
    INSERT INTO food VALUES(null, 'Yogurt', NOW(), NOW());
    INSERT INTO food VALUES(null, 'Ham sausage', NOW(), NOW());
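    Optionally, confirm the rows and their auto-generated IDs. These id values (1 to 3 in this example) become the _id values of the corresponding Elasticsearch documents:

    SELECT id, name, update_time FROM food;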
  2. Enable the Auto Indexing feature for your Elasticsearch cluster. For details, see Access and configure an Elasticsearch cluster.

  3. Upload a MySQL JDBC driver to the Logstash cluster. The driver version must be compatible with your PolarDB-X 1.0 instance. This example uses mysql-connector-java-5.1.35. For upload instructions, see Configure third-party libraries.

    Note

    A MySQL JDBC driver is recommended for connecting to PolarDB-X 1.0, which is compatible with the MySQL protocol. A PolarDB-specific JDBC driver may not be compatible.

  4. Add the IP addresses of the Logstash cluster nodes to the IP address whitelist of your PolarDB-X 1.0 instance. Find the node IP addresses on the Basic Information page of the Logstash cluster in the Elasticsearch console. For whitelisting instructions, see Set an IP address whitelist.

Step 2: Configure a Logstash pipeline

  1. Go to the Logstash Clusters page of the Alibaba Cloud Elasticsearch console.

  2. In the top navigation bar, select the region where your cluster resides.

  3. On the Logstash Clusters page, find your cluster and click its ID.

  4. In the left-side navigation pane, click Pipelines.

  5. On the Pipelines page, click Create Pipeline.

  6. On the Create Task page, enter a pipeline ID in the Pipeline ID field, then enter the following configuration in the Config Settings field. Replace <Logstash cluster ID>, <Database name>, and the credentials with your actual values. To find your Logstash cluster ID, see Overview of the Logstash Clusters page.

    Important

    For security, always append allowLoadLocalInfile=false&autoDeserialize=false to jdbc_connection_string when using a JDBC driver. Without these parameters, the pipeline configuration check fails when you save.

    Input plug-in parameters

    • jdbc_driver_class (String): The JDBC driver class. For MySQL, use com.mysql.jdbc.Driver.

    • jdbc_driver_library (String): The path to the JDBC driver file on the Logstash cluster. The backend provides paths in the /ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/ format. For details, see Configure third-party libraries.

    • jdbc_connection_string (String): The JDBC connection string, including the endpoint, port number, and database name of the PolarDB-X 1.0 instance.

    • jdbc_user (String): The username for accessing the PolarDB-X 1.0 database.

    • jdbc_password (String): The password for accessing the PolarDB-X 1.0 database.

    • jdbc_paging_enabled (Boolean; default: false): Whether to enable pagination for query results.

    • jdbc_page_size (Integer): The number of records per page when pagination is enabled.

    • statement (String): The SQL statement used to query records. Use :sql_last_value as a placeholder for the last tracked value. The example uses >= to include records inserted or updated at exactly the last tracked timestamp, minimizing data loss at poll boundaries.

    • schedule (String): A cron expression that controls the polling interval. * * * * * runs the query every minute.

    • record_last_run (Boolean; default: true): Whether to persist the last run value. When true, the value of tracking_column from the last query is stored in the file at last_run_metadata_path.

    • last_run_metadata_path (String): The file path where the last run value is stored. Use a path in the /ssd/1/<Logstash cluster ID>/logstash/data/ format provided by Alibaba Cloud Logstash. Logstash creates the file automatically. You cannot view the file contents. The system does not delete the data in this path, so make sure your disk has enough space.

    • clean_run (Boolean; default: false): Whether to ignore the last run value and start from the first record in the database. Set to true to force a full resync.

    • use_column_value (Boolean; default: false): Whether to use the value of tracking_column as :sql_last_value. When false, :sql_last_value is the timestamp of the last query execution.

    • tracking_column_type (String; default: numeric): The data type of the tracking column. Valid values: numeric, timestamp.

    • tracking_column (String): The column to track for incremental sync. Values must be in ascending order. Set this to the column that stores the insertion or update time (for example, update_time).
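    For reference, if you track a strictly increasing numeric column instead, such as the auto-increment id, the relevant settings would look like the following sketch. Note the use of > rather than >=, because id values are unique and never reissued. This variant captures inserts only; updates to existing rows do not change id, so they are not re-synced.

    use_column_value => true
    tracking_column_type => "numeric"
    tracking_column => "id"
    statement => "select * from food where id > :sql_last_value"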

    Output plug-in parameters

    • hosts (String): The internal endpoint URL of the Elasticsearch cluster, in the format http://<internal endpoint>:9200. Find the endpoint on the Basic Information page of the cluster. For details, see View the basic information of a cluster.

    • user (String; default: elastic): The username for accessing the Elasticsearch cluster.

    • password (String): The password for the elastic account. If forgotten, you can reset it. For details, see Reset the access password for an Elasticsearch cluster.

    • index (String): The name of the Elasticsearch index where synced data is stored.

    • document_id (String): The document ID in Elasticsearch. Set to %{id} to use the id field from PolarDB-X 1.0, so each database record maps to exactly one Elasticsearch document.

    The following sample configuration combines these parameters:
    input {
      jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar"
        jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
        jdbc_user => "db_user"
        jdbc_password => "db_password"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        statement => "select * from food where update_time >= :sql_last_value"
        schedule => "* * * * *"
        record_last_run => true
        last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
        clean_run => false
        tracking_column_type => "timestamp"
        use_column_value => true
        tracking_column => "update_time"
      }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200"
        user => "elastic"
        password => "es_password"
        index => "drds_test"
        document_id => "%{id}"
      }
    }

    For the full list of input plug-in parameters, see Logstash JDBC input plugin. For general pipeline configuration syntax, see Logstash configuration files.
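    Before you deploy, you can sanity-check the SQL statement directly against the database by substituting a literal timestamp for :sql_last_value. On the first run (or when clean_run is true), the tracked value starts at the Unix epoch, so the first poll is roughly equivalent to:

    select * from food where update_time >= '1970-01-01 00:00:00';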

  7. Click Next to configure pipeline parameters.

    Warning

    Saving and deploying the pipeline triggers a restart of the Logstash cluster. Verify that a restart does not affect your services before proceeding.

    • Pipeline Workers (default: number of vCPUs): The number of worker threads that run filter and output plug-ins in parallel. Increase this value if events are backlogged or CPU resources are underutilized.

    • Pipeline Batch Size (default: 125): The maximum number of events a worker thread collects from input plug-ins before running filter and output plug-ins. Larger values increase throughput but consume more memory. To accommodate larger batches, increase the JVM heap size using the LS_HEAP_SIZE variable.

    • Pipeline Batch Delay (default: 50 ms): The wait time before assigning a small batch to a pipeline worker thread.

    • Queue Type (default: MEMORY): The internal queue model for buffering events. MEMORY uses a memory-based queue. PERSISTED uses a disk-based persistent queue with ACK support.

    • Queue Max Bytes (default: 1024 MB): The maximum queue size. Must be less than total disk capacity.

    • Queue Checkpoint Writes (default: 1024): The number of events written before a checkpoint is enforced (for persistent queues only). Set to 0 for no limit.


  8. Click Save or Save and Deploy.

    • Save: Stores the pipeline configuration and triggers a cluster change, but does not activate the pipeline. To activate it, go to the Pipelines page, find the pipeline, and click Deploy in the Actions column.

    • Save and Deploy: Saves and immediately restarts the Logstash cluster to activate the pipeline.

Step 3: Verify the result

  1. Log on to the Kibana console of the Elasticsearch cluster. For details, see Log on to the Kibana console.

  2. In the left-side navigation pane, click Dev Tools.

  3. On the Console tab, run the following query to check that the three sample records were synced:

    GET drds_test/_count
    {
      "query": {"match_all": {}}
    }

    Expected response:

    {
      "count": 3,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      }
    }
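    You can also spot-check the ID mapping. Because document_id is set to %{id}, each document's _id should equal the database id. The following query (illustrative) looks up the document synced from the row with id 1:

    GET drds_test/_search
    {
      "query": {
        "terms": { "_id": ["1"] }
      }
    }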
  4. Update an existing record and insert a new one in PolarDB-X 1.0:

    UPDATE food SET name='Chocolates', update_time=NOW() WHERE id = 1;
    INSERT INTO food VALUES(null, 'Egg', NOW(), NOW());
  5. After the next polling cycle completes (within one minute), verify the updates in Kibana. Query the updated record:

    GET drds_test/_search
    {
      "query": {
        "match": {
          "name": "Chocolates"
        }
      }
    }

    If the update was synced, the response contains the document whose name is Chocolates. Next, query all records to confirm the new row was added:

    GET drds_test/_search
    {
      "query": {
        "match_all": {}
      }
    }

    If the sync succeeded, the response contains four documents, including the new Egg record.
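    To double-check the total, you can rerun the earlier count query; after the new row is synced, the count increases from 3 to 4:

    GET drds_test/_count
    {
      "query": {"match_all": {}}
    }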

FAQ

FAQ about data transfer by using Logstash