
Elasticsearch: Use Logstash to synchronize data from PolarDB-X 1.0 to Elasticsearch in real time

Last Updated: May 22, 2026

If your business data is stored in PolarDB-X 1.0 and you want to perform full-text searches and semantic analytics on the data, you can use Alibaba Cloud Elasticsearch and Alibaba Cloud Logstash. This topic describes how to use Alibaba Cloud Logstash to synchronize data from PolarDB-X 1.0 to Alibaba Cloud Elasticsearch in real time.

Background information

Alibaba Cloud Logstash is a powerful tool for data collection and processing. It can ingest, transform, optimize, and output data. The logstash-input-jdbc plugin is installed by default and cannot be removed. It queries data from PolarDB-X in batches and synchronizes the data to Elasticsearch. The plugin also polls PolarDB-X periodically to synchronize records that were inserted or updated since the previous poll. As a result, this solution covers both the initial full synchronization and later incremental synchronization. The latency of incremental synchronization depends mainly on the polling interval set by the schedule parameter. In the example in this topic, that interval is one minute.
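The following Python sketch simulates this polling mechanism. It is not the plugin's implementation. An in-memory SQLite table stands in for the PolarDB-X 1.0 table, and a dict keyed by document ID stands in for the Elasticsearch index. poll() is a hypothetical helper that mimics the statement, tracking_column, and document_id settings of the pipeline configured in Step 2. The timestamps are made-up values. The 1970-01-01 starting value is an assumption about how timestamp tracking is initialized.

```python
import sqlite3


def create_food_table(conn):
    """Create a SQLite stand-in for the food table used in this topic."""
    conn.execute(
        "CREATE TABLE food (id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "name TEXT, insert_time TEXT, update_time TEXT)"
    )


def poll(conn, index, sql_last_value):
    """Run one polling cycle; return (rows read, new sql_last_value)."""
    rows = conn.execute(
        "SELECT id, name, update_time FROM food "
        "WHERE update_time >= ? ORDER BY update_time ASC",
        (sql_last_value,),
    ).fetchall()
    for row_id, name, update_time in rows:
        # Mimics document_id => "%{id}": a row whose ID is already indexed
        # overwrites the existing document instead of adding a new one.
        index[str(row_id)] = {"id": row_id, "name": name, "update_time": update_time}
        # Mimics use_column_value/tracking_column: keep the last row's value.
        sql_last_value = update_time
    return len(rows), sql_last_value


conn = sqlite3.connect(":memory:")
create_food_table(conn)
index = {}  # stand-in for the drds_test index, keyed by document ID
last = "1970-01-01 00:00:00"  # assumed starting value for timestamp tracking

t0, t1 = "2026-01-01 00:00:00", "2026-01-01 00:01:00"  # hypothetical times
for food in ("chocolate", "yogurt", "ham sausage"):
    conn.execute(
        "INSERT INTO food (name, insert_time, update_time) VALUES (?, ?, ?)",
        (food, t0, t0),
    )
read, last = poll(conn, index, last)
print(read, len(index))  # 3 3

conn.execute("UPDATE food SET name = 'Chocolates', update_time = ? WHERE id = 1", (t1,))
conn.execute(
    "INSERT INTO food (name, insert_time, update_time) VALUES ('egg', ?, ?)", (t1, t1)
)
read, last = poll(conn, index, last)
print(read, len(index), index["1"]["name"])  # 4 4 Chocolates
```

The second poll reads four rows, not two. The statement uses >=, so rows whose update_time equals the recorded value are read again. This is harmless because each row overwrites the document that has the same ID.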

Prerequisites

A PolarDB-X 1.0 instance, an Alibaba Cloud Elasticsearch cluster, and an Alibaba Cloud Logstash cluster are created. In addition, a database is created in the PolarDB-X 1.0 instance. We recommend that you create the PolarDB-X 1.0 instance, Elasticsearch cluster, and Logstash cluster in the same virtual private cloud (VPC).

Limits

  • The values of the _id field in the Elasticsearch cluster must be the same as the values of the id field in the PolarDB-X 1.0 database.

    This condition ensures that the data synchronization task can establish a mapping between data records in the PolarDB-X 1.0 database and documents in the Elasticsearch cluster. If you update a data record in the PolarDB-X 1.0 database, the data synchronization task uses the updated data record to overwrite the document that has the same ID in the Elasticsearch cluster.

    Note

    Internally, Elasticsearch performs an update by marking the original document as deleted and indexing a new document. Therefore, overwriting a document is as efficient as updating it.

  • Each data record in the PolarDB-X 1.0 database must contain a field that indicates when the record was inserted or last updated.

    Each time the logstash-input-jdbc plugin polls the database, it records the value of this field for the last data record that it retrieves. In the next poll, the plugin substitutes the recorded value for :sql_last_value in the SQL statement. Only the data records inserted or updated since the previous poll are then synchronized.

    Important

    If you delete a data record from the PolarDB-X 1.0 database, the logstash-input-jdbc plugin does not delete the document that has the same ID from the Elasticsearch cluster. You must delete such documents manually in the Elasticsearch cluster, for example, by sending a delete request for each affected document ID.
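Deletions are not synchronized. One way to find orphaned documents is to compare the id values that remain in the table with the _id values in the index. The following Python sketch makes several assumptions. You have already collected both ID sets (how you collect them is not shown). The IDs are numeric. The cluster runs Elasticsearch 7.x or later, where a single document is deleted with DELETE <index>/_doc/<_id>. The helper name stale_delete_commands is hypothetical.

```python
def stale_delete_commands(db_ids, es_ids, index_name):
    """Return Kibana console DELETE requests for orphaned documents.

    db_ids: id values still present in the table (for example, integers).
    es_ids: _id values currently in the index (strings).
    """
    live = {str(i) for i in db_ids}
    # Assumes numeric IDs so that the output is sorted in numeric order.
    stale = sorted(set(es_ids) - live, key=int)
    return [f"DELETE {index_name}/_doc/{doc_id}" for doc_id in stale]


# Hypothetical example: the row whose id is 2 was deleted from the food table.
commands = stale_delete_commands([1, 3, 4], ["1", "2", "3", "4"], "drds_test")
for command in commands:
    print(command)  # DELETE drds_test/_doc/2
```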

Procedure

Step 1: Make preparations

  1. Create a table in your PolarDB-X 1.0 instance and populate it with test data.

    In this example, the following statement is used to create a table:

    CREATE TABLE food (
      id INT PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(32),
      insert_time DATETIME,
      update_time DATETIME
    );

    The following statements are used to insert data into the table:

    INSERT INTO food VALUES (NULL, 'chocolate', NOW(), NOW());
    INSERT INTO food VALUES (NULL, 'yogurt', NOW(), NOW());
    INSERT INTO food VALUES (NULL, 'ham sausage', NOW(), NOW());
  2. Enable the Auto Indexing feature for the Elasticsearch cluster. For more information, see Access and configure an Elasticsearch cluster.

  3. In the Logstash cluster, upload a JDBC driver whose version is compatible with the version of the PolarDB-X 1.0 database. For more information, see Configure third-party libraries. In this example, the mysql-connector-java-5.1.35 driver is used.

    Note

    In this example, a MySQL JDBC driver is used to connect to the PolarDB-X 1.0 database. You can also use a PolarDB JDBC driver to connect to the PolarDB-X 1.0 database. However, for a PolarDB-X 2.0 database, a PolarDB JDBC driver may not work. We recommend that you use a MySQL JDBC driver.

  4. Obtain the IP addresses of the nodes in the Logstash cluster on the Basic Information page of the Logstash cluster in the Elasticsearch console. Then, add the IP addresses to the IP address whitelist of the PolarDB-X 1.0 instance. For more information, see Set an IP address whitelist.
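Before you configure the pipeline, you may want to confirm that the whitelist of the PolarDB-X 1.0 instance covers every Logstash node IP address. The following Python sketch uses the standard ipaddress module. The IP addresses are hypothetical, and the sketch assumes that each whitelist entry is either a single IP address or a CIDR block.

```python
import ipaddress


def uncovered_nodes(node_ips, whitelist):
    """Return the node IPs that no whitelist entry (IP or CIDR block) covers."""
    # strict=False lets a bare IP such as "10.0.0.1" become a /32 network.
    networks = [ipaddress.ip_network(entry, strict=False) for entry in whitelist]
    return [
        ip
        for ip in node_ips
        if not any(ipaddress.ip_address(ip) in net for net in networks)
    ]


# Hypothetical Logstash node IPs and whitelist entries.
missing = uncovered_nodes(
    ["192.168.0.10", "192.168.0.11", "192.168.1.5"],
    ["192.168.0.0/24", "10.0.0.1"],
)
print(missing)  # ['192.168.1.5']
```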

Step 2: Configure a Logstash pipeline

  1. Go to the Logstash Clusters page.

  2. Navigate to the target cluster.

    1. In the top navigation bar, select the region where the cluster resides.

    2. On the Logstash Clusters page, find the cluster and click its ID.

  3. In the left-side navigation pane, click Pipelines.
  4. Click Create Pipeline.

  5. On the Create page, enter a Pipeline ID and configure the pipeline in the Config Settings field.

    In this example, the following configurations are entered in the Config Settings field:

    input {
      jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar"
        jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Your database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
        jdbc_user => "db_user"
        jdbc_password => "db_password"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        statement => "select * from food where update_time >= :sql_last_value order by update_time asc"
        schedule => "* * * * *"
        record_last_run => true
        last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
        clean_run => false
        tracking_column_type => "timestamp"
        use_column_value => true
        tracking_column => "update_time"
      }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200"
        user => "elastic"
        password => "es_password"
        index => "drds_test"
        document_id => "%{id}"
      }
    }
    Note

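    In the code, replace <Logstash cluster ID> with the ID of your Logstash cluster. For more information, see View cluster list overview. Also replace <Your database name>, the database endpoint, db_user, db_password, the Elasticsearch endpoint, and es_password with your own values.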

    Table 1. Parameters in the input part

    • jdbc_driver_class: The class of the JDBC driver.

    • jdbc_driver_library: The path of the JDBC driver file. For more information, see Configure third-party libraries.

    • jdbc_connection_string: The JDBC connection string that is used to connect to the PolarDB-X 1.0 database. The string contains the endpoint, port number, and name of the database.

    • jdbc_user: The username that is used to access the PolarDB-X 1.0 database.

    • jdbc_password: The password that is used to access the PolarDB-X 1.0 database.

    • jdbc_paging_enabled: Specifies whether to enable paging. Default value: false.

    • jdbc_page_size: The number of entries per page.

    • statement: The SQL statement that queries the data to synchronize. The :sql_last_value placeholder is replaced with the value that was recorded in the previous run.

    • schedule: The schedule of the synchronization task, in cron-like syntax. "* * * * *" means that data is synchronized every minute.

    • record_last_run: Specifies whether to record the last execution result. If this parameter is set to true, the value of tracking_column in the last execution result is stored in the file specified by the last_run_metadata_path parameter.

    • last_run_metadata_path: The path of the file that stores the value recorded in the last run. The path must be provided by the Alibaba Cloud Logstash service and uses the /ssd/1/<Logstash cluster ID>/logstash/data/ format. Logstash automatically generates a file at the specified path, but you cannot view the data in the file.

    • clean_run: Specifies whether to discard the value recorded in the file specified by last_run_metadata_path. Default value: false. If this parameter is set to true, the recorded value is reset each time the pipeline starts, so all data in the table is synchronized again.

    • use_column_value: Specifies whether to use the value of the column specified by tracking_column as :sql_last_value. If this parameter is set to false, the time of the last query execution is used instead.

    • tracking_column_type: The type of the tracked column. Valid values: numeric and timestamp. Default value: numeric.

    • tracking_column: The column whose values are tracked. The values in this column must increase as records are inserted or updated. This example uses the update_time column so that both inserted and updated records are synchronized. You can also track an auto-increment primary key, but then only newly inserted records are detected.

    Table 2. Parameters in the output part

    • hosts: The private endpoint of the Alibaba Cloud Elasticsearch cluster, in the http://<Private endpoint of the cluster>:9200 format. You can obtain the private endpoint on the Basic Information page of the cluster. For more information, see View the basic information of a cluster.

    • user: The username that is used to access the Elasticsearch cluster. The default username is elastic.

    • password: The password of the elastic account. You specify this password when you create the Elasticsearch cluster. If you forget the password, you can reset it. For more information about the procedure and precautions, see Reset the access password for an Elasticsearch cluster.

    • index: The name of the index in the Elasticsearch cluster.

    • document_id: The IDs of documents in the Elasticsearch cluster. Set this parameter to %{id}. With this setting, the ID of each document is the same as the id value of the corresponding data record in the PolarDB-X 1.0 database.

    Important
    • The preceding configurations are based on test data. You can configure the pipeline based on your business requirements. For more information about other parameters supported by the input plug-in, see Logstash Jdbc input plugin.

    • If your configuration includes parameters such as last_run_metadata_path, the file path must be provided by the Alibaba Cloud Logstash service. The system provides the /ssd/1/<Logstash cluster ID>/logstash/data/ path for testing purposes. Data in this directory is not deleted. Ensure that you have sufficient disk space. Logstash automatically creates a file at the specified path, but you cannot view its content.

    • For enhanced security, if you use a JDBC driver in your pipeline configuration, you must append allowLoadLocalInfile=false&autoDeserialize=false to the jdbc_connection_string parameter. Otherwise, the scheduling system reports a validation failure when you add the Logstash configuration file. For example: jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Your database name>?allowLoadLocalInfile=false&autoDeserialize=false".

    For more information about how to configure parameters in the Config Settings field, see Logstash configuration files.

  6. Click Next step and configure the pipeline parameters.

    • Pipeline Workers: The number of worker threads that run the filter and output stages of the pipeline in parallel. If events are backing up or the CPU is underutilized, consider increasing this value to improve performance. Default value: the number of CPU cores of the instance.

    • Pipeline Batch Size: The maximum number of events that a single worker thread collects from inputs before it runs filters and outputs. A larger batch size is generally more efficient but increases memory overhead. To use a larger batch size effectively, you may need to increase the JVM heap size by setting the LS_HEAP_SIZE variable. Default value: 125.

    • Pipeline Batch Delay: The time, in milliseconds, to wait for each event before an undersized batch is dispatched to a pipeline worker thread. Default value: 50 ms.

    • Queue Type: The internal queuing model for event buffering. Valid values:

      • MEMORY: A traditional in-memory queue. This is the default value.

      • PERSISTED: A disk-based persistent queue.

    • Queue Max Bytes: The maximum amount of data that the queue can store, in MB. The value must be an integer from 1 to 2^53 - 1. Default value: 1024. Note: Make sure that this value is less than your total disk capacity.

    • Queue Checkpoint Writes: The maximum number of events that can be written before a checkpoint is forced when the persistent queue is enabled. A value of 0 indicates no limit. Default value: 1024.

    Warning

    After configuration, you must save and deploy the settings for them to take effect. This action triggers an instance restart. Proceed only if this restart will not impact your business.

  7. Click Save or Save and Deploy.

    • Save: Saves the pipeline configuration but does not apply it. After saving, you are returned to the Pipelines page. In the Pipelines section, you can click Deploy Now in the Actions column to restart the instance and apply the configuration.

    • Save and Deploy: Saves and deploys the configuration, restarting the instance to apply the changes.
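You can check the security requirement for jdbc_connection_string locally before you save the pipeline. The following Python sketch is a hypothetical helper, not the validator that Alibaba Cloud Logstash runs. It performs an exact, case-sensitive check that allowLoadLocalInfile and autoDeserialize are both set to false in the query string of the URL used in Step 2. If a parameter appears more than once, the sketch uses its last occurrence, which is an assumption of this sketch.

```python
from urllib.parse import parse_qs

# Parameters that must appear in the JDBC URL with the value "false".
REQUIRED = {"allowLoadLocalInfile": "false", "autoDeserialize": "false"}


def missing_security_params(jdbc_url):
    """Return the required parameters that are absent or not set to false."""
    # Everything after the first "?" is the query string of the JDBC URL.
    query = parse_qs(jdbc_url.partition("?")[2])
    return sorted(
        name
        for name, expected in REQUIRED.items()
        # parse_qs returns lists; use the last occurrence of each parameter.
        if query.get(name, [None])[-1] != expected
    )


url = (
    "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Your database name>"
    "?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    "&allowLoadLocalInfile=false&autoDeserialize=false"
)
print(missing_security_params(url))  # []
print(missing_security_params("jdbc:mysql://host:3306/db?useSSL=false"))
# ['allowLoadLocalInfile', 'autoDeserialize']
```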

Step 3: Verify the result

  1. Log on to the Kibana console of the Elasticsearch cluster.

    For more information, see Log on to the Kibana console.

  2. In the left-side navigation pane, click Dev Tools.

  3. In the Console, run the following command to view the number of synchronized documents.

    GET drds_test/_count
    {
      "query": {"match_all": {}}
    }

    If the command is successfully run, the following result is returned:

    {
      "count" : 3,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      }
    }
  4. Update data in the table and insert data into the table.

    UPDATE food SET name = 'Chocolates', update_time = NOW() WHERE id = 1;
    INSERT INTO food VALUES (NULL, 'egg', NOW(), NOW());
  5. View the updated and inserted data.

    • Query the data record in which the value of name is Chocolates.

      GET drds_test/_search
      {
        "query": {
          "match": {
            "name": "Chocolates"
          }
        }
      }

      If the command is successfully run, the result shown in the following figure is returned.

      Figure: Returned result

    • Query all data.

      GET drds_test/_search
      {
        "query": {
          "match_all": {}
        }
      }

      If the command is successfully run, the result shown in the following figure is returned.

      Figure: Returned result
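To script the count check in Step 3, you can compare the count returned by GET drds_test/_count with the number of rows in the food table, for example, the result of SELECT COUNT(*) FROM food. The following Python sketch is a hypothetical helper that takes both values as input. It does not connect to either service. Deleted rows are not synchronized, so the document count can exceed the row count after rows are deleted. Incremental changes also appear only after the next scheduled poll.

```python
import json


def count_matches(count_response_body, table_row_count):
    """Check a _count response against the row count of the source table."""
    response = json.loads(count_response_body)
    # If any shard failed, the count may be incomplete, so do not trust it.
    if response["_shards"]["failed"] != 0:
        return False
    return response["count"] == table_row_count


# The _count response shown in Step 3, taken after the initial synchronization.
body = """
{
  "count" : 3,
  "_shards" : {"total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0}
}
"""
print(count_matches(body, 3))  # True
print(count_matches(body, 4))  # False
```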

FAQ

FAQ about data transfer by using Logstash