
Elasticsearch: Use Realtime Compute for Apache Flink to process and sync data to Elasticsearch

Last Updated:Dec 09, 2025

To build a log retrieval system, you can use Realtime Compute for Apache Flink to process log data and write it to Elasticsearch for searching. This topic uses Alibaba Cloud Simple Log Service (SLS) as an example to describe this process.

Prerequisites

You have completed the following tasks:

Background

Realtime Compute for Apache Flink is a Flink-based service that is officially supported by Alibaba Cloud. It supports various source and sink systems, such as Kafka and Elasticsearch. The combination of Realtime Compute for Apache Flink and Elasticsearch meets the requirements of typical log retrieval scenarios.

Logs from systems such as Kafka or Simple Log Service are processed by Flink with simple or complex computations, and the results are written to Elasticsearch for searching. By combining the computing capabilities of Flink with the search capabilities of Elasticsearch, you can implement real-time data transformation and querying for your business and transition to real-time services.

Realtime Compute for Apache Flink provides a simple way to connect to Elasticsearch. For example, if your business logs or data are written to Simple Log Service and require processing before being written to Elasticsearch for searching, you can use the pipeline shown in the following figure.

Figure: Flink + Elasticsearch data link

Procedure

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Create a Realtime Compute for Apache Flink job.

    For more information, see Job Development > Development in the Blink SQL Development Guide of the Alibaba Cloud Blink Exclusive Mode (Phased Out for Alibaba Cloud) documentation.

  3. Write the Flink SQL.

    1. Create a source table for Simple Log Service.

      CREATE TABLE sls_stream (
        a INT,
        b INT,
        c VARCHAR
      ) WITH (
        type = 'sls',
        endPoint = '<yourEndpoint>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessKey>',
        startTime = '<yourStartTime>',
        project = '<yourProjectName>',
        logStore = '<yourLogStoreName>',
        consumerGroup = '<yourConsumerGroupName>'
      );

      The following list describes the parameters in the WITH clause.

      • endPoint: The public endpoint of Alibaba Cloud Simple Log Service, which is the URL used to access the project and its log data. The endpoint must start with http://. For example, the endpoint for the China (Hangzhou) region is http://cn-hangzhou.log.aliyuncs.com. For more information, see Endpoints.

      • accessId: Your AccessKey ID.

      • accessKey: Your AccessKey secret.

      • startTime: The time from which log consumption starts. The start time that you select when you run the Flink job must be later than this value.

      • project: The name of the Simple Log Service project.

      • logStore: The name of the Logstore in the project.

      • consumerGroup: The name of the consumer group for Simple Log Service.
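      For example, a filled-in source table for a project in the China (Hangzhou) region might look like the following sketch. The project, Logstore, and consumer group names and the credentials are hypothetical placeholders; replace them with your own values.

      ```sql
      -- Hypothetical example: all names and credentials below are placeholders.
      CREATE TABLE sls_stream (
        a INT,
        b INT,
        c VARCHAR
      ) WITH (
        type = 'sls',
        -- Public endpoint for the China (Hangzhou) region; must start with http://.
        endPoint = 'http://cn-hangzhou.log.aliyuncs.com',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessKey>',
        -- Consumption starts here; the job start time must be later than this value.
        startTime = '2025-01-01 00:00:00',
        project = 'my-log-project',
        logStore = 'my-logstore',
        consumerGroup = 'flink-es-consumer'
      );
      ```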

    2. Create an Elasticsearch sink table.

      Important
      • Realtime Compute for Apache Flink version 3.2.2 and later supports Elasticsearch sink tables. When you create a Flink job, select a supported version.

      • The Elasticsearch sink table is implemented using REST APIs and is compatible with all Elasticsearch versions.

      CREATE TABLE es_stream_sink (
        a INT,
        cnt BIGINT,
        PRIMARY KEY (a)
      ) WITH (
        type = 'elasticsearch-7',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );

      The following list describes the parameters in the WITH clause.

      • endPoint: The public endpoint of your Alibaba Cloud Elasticsearch instance, in the format http://<instanceid>.public.elasticsearch.aliyuncs.com:9200. You can obtain it on the basic information page of the instance. For more information, see View the basic information of an instance.

      • accessId: The username used to access the Alibaba Cloud Elasticsearch instance. Default value: elastic.

      • accessKey: The password of the user. The password of the elastic user is specified when you create the instance. If you forget the password, you can reset it. For more information about the precautions and procedure for resetting the password, see Reset the access password of an instance.

      • index: The name of the index. If you have not created an index, create one first. For more information, see Beginner's guide: From instance creation to data retrieval. You can also enable automatic index creation. For more information, see Configure YML parameters.

      • typeName: The type of the index. For Elasticsearch instances of V7.0 or later, this value must be _doc.

      Note
      • Elasticsearch supports updating documents using a primary key. Only one field can be specified as the PRIMARY KEY. After you specify the PRIMARY KEY, the value of the PRIMARY KEY field is used as the document ID. If a PRIMARY KEY is not specified, the system randomly generates a document ID. For more information, see Index API.

      • Elasticsearch supports multiple update modes. You can specify the mode using the updateMode parameter in the WITH clause:

        • If updateMode=full, new documents completely overwrite existing documents.

        • If updateMode=inc, Elasticsearch updates only the fields that are present in the input; other fields of the document remain unchanged.

      • All updates in Elasticsearch default to UPSERT semantics, which means INSERT or UPDATE.
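      For example, the following sink definition is a sketch, based on the parameters described above, that enables incremental updates. The endpoint, index, and password are placeholders.

      ```sql
      -- Sketch: a sink table with incremental (partial) document updates enabled.
      CREATE TABLE es_stream_sink (
        a INT,
        cnt BIGINT,
        PRIMARY KEY (a)        -- the value of a is used as the document ID
      ) WITH (
        type = 'elasticsearch-7',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:9200',
        accessId = 'elastic',
        accessKey = '<yourPassword>',
        index = '<yourIndex>',
        typeName = '_doc',     -- required for Elasticsearch V7.0 or later
        updateMode = 'inc'     -- 'full' would overwrite the whole document instead
      );
      ```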

    3. Write the business logic to process and synchronize the data.

      INSERT INTO es_stream_sink
      SELECT
        a,
        COUNT(*) AS cnt
      FROM sls_stream
      GROUP BY a;

  4. Publish and start the job.

    After you publish and start the job, data from Simple Log Service is aggregated and then written to Alibaba Cloud Elasticsearch.
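If your business requires more than a single global aggregation, Blink SQL also supports windowed aggregations such as TUMBLE. The following sketch counts records per key per minute; it assumes that the source table additionally declares a processing-time attribute, for example a computed column `ts AS PROCTIME()`, which the template above does not include.

```sql
-- Sketch: per-minute counts, assuming sls_stream declares ts AS PROCTIME().
INSERT INTO es_stream_sink
SELECT
  a,
  COUNT(*) AS cnt
FROM sls_stream
GROUP BY
  TUMBLE(ts, INTERVAL '1' MINUTE),
  a;
```

Because the sink's PRIMARY KEY is a, each new window result upserts the document for that key, so the index always reflects the latest window.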

More information

You can use Realtime Compute for Apache Flink with Elasticsearch to quickly create a real-time search pipeline. If you have more complex requirements for writing data to Elasticsearch, you can use the custom sink feature of Realtime Compute for Apache Flink.