
Elasticsearch: Use Realtime Compute for Apache Flink to process data and sync it to Elasticsearch

Last Updated: Mar 27, 2026

This tutorial shows you how to build a log retrieval pipeline that reads data from Alibaba Cloud Simple Log Service (SLS), processes it with Realtime Compute for Apache Flink using Flink SQL, and writes the results to Elasticsearch for full-text search.

Prerequisites

Before you begin, ensure that you have:

How it works

Log data from SLS streams into Realtime Compute for Apache Flink. Flink applies your SQL transformations, then writes the processed records to Elasticsearch over its REST API. Elasticsearch indexes each record as a document, making it immediately searchable.

Figure: Flink + Elasticsearch data pipeline

Note

The Elasticsearch sink connector is implemented using REST APIs and is compatible with all Elasticsearch versions. Realtime Compute for Apache Flink version 3.2.2 and later supports Elasticsearch sink tables.
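The following Python sketch makes the REST-based write path concrete. It shows how processed rows could be serialized for the Elasticsearch _bulk endpoint, which accepts newline-delimited pairs of lines: an action line followed by a document line.

This is an assumption for illustration only. This document does not specify which REST calls the connector actually sends, and to_bulk_body, my_index, and the sample row are hypothetical.

```python
import json


def to_bulk_body(index, records, id_field=None):
    """Serialize records as an NDJSON body for the Elasticsearch _bulk API.

    Hypothetical helper for illustration; not the connector's implementation.
    """
    lines = []
    for record in records:
        action = {"_index": index}
        if id_field is not None:
            # Use the key value as the document ID so repeated keys overwrite.
            action["_id"] = str(record[id_field])
        # Each document line is preceded by an action line.
        lines.append(json.dumps({"index": action}))
        lines.append(json.dumps(record))
    # The _bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


body = to_bulk_body("my_index", [{"a": 1, "cnt": 3}], id_field="a")
print(body, end="")
```

When id_field is omitted, the action line carries no _id and Elasticsearch generates a document ID itself.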

Set up the Flink SQL pipeline

Step 1: Create a Flink job

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Create a job. For more information, see Job Development > Development in the Blink SQL Development Guide of the Alibaba Cloud Blink Exclusive Mode (Phased-Out for Alibaba Cloud) documentation.

Step 2: Create the SLS source table

Define a source table that reads from SLS:

-- Source table: reads log entries from an SLS Logstore.
CREATE TABLE sls_stream (
  a INT,
  b INT,
  c VARCHAR
) WITH (
  type = 'sls',
  endPoint = '<yourEndpoint>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessKey>',
  startTime = '<yourStartTime>',
  project = '<yourProjectName>',
  logStore = '<yourLogStoreName>',
  consumerGroup = '<yourConsumerGroupName>'
);

Replace the placeholders with your actual values:

| Parameter | Required | Description |
|---|---|---|
| endPoint | Yes | The public endpoint of SLS, for example, http://cn-hangzhou.log.aliyuncs.com. The value must start with http://. For the full list of endpoints, see Endpoints. |
| accessId | Yes | Your AccessKey ID. |
| accessKey | Yes | Your AccessKey secret. |
| startTime | Yes | The point in time from which log consumption starts. When you start the job, the start time you select must be later than this value. |
| project | Yes | The name of the SLS project. |
| logStore | Yes | The name of the Logstore in the project. |
| consumerGroup | Yes | The name of the SLS consumer group. |
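Before you start the job, you may want to sanity-check the values you plan to use. The following Python sketch is a hypothetical pre-flight check. It encodes two rules from the table: every option is required, and endPoint must start with http://.

check_sls_options is illustrative only and is not part of any Alibaba Cloud SDK.

```python
REQUIRED_SLS_OPTIONS = (
    "endPoint", "accessId", "accessKey", "startTime",
    "project", "logStore", "consumerGroup",
)


def check_sls_options(options):
    """Return a list of problems in the WITH options of an SLS source table.

    Hypothetical pre-flight check based on the SLS parameter table.
    """
    # Every option in the table is required and must be non-empty.
    problems = [
        f"missing required option: {name}"
        for name in REQUIRED_SLS_OPTIONS
        if not options.get(name)
    ]
    # The endpoint must use the http:// scheme.
    endpoint = options.get("endPoint", "")
    if endpoint and not endpoint.startswith("http://"):
        problems.append("endPoint must start with http://")
    return problems


options = {
    "endPoint": "https://cn-hangzhou.log.aliyuncs.com",
    "accessId": "<yourAccessId>",
    "accessKey": "<yourAccessKey>",
    "project": "<yourProjectName>",
    "logStore": "<yourLogStoreName>",
    "consumerGroup": "<yourConsumerGroupName>",
}
for problem in check_sls_options(options):
    print(problem)
# missing required option: startTime
# endPoint must start with http://
```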

Step 3: Create the Elasticsearch sink table

Define a sink table that writes to Elasticsearch:

-- Sink table: PRIMARY KEY (a) makes the sink write in upsert mode, using a as the document ID.
CREATE TABLE es_stream_sink (
  a INT,
  cnt BIGINT,
  PRIMARY KEY (a)
) WITH (
  type = 'elasticsearch-7',
  endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
  accessId = '<yourUserName>',
  accessKey = '<yourPassword>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>'
);

Replace the placeholders with your actual values:

| Parameter | Required | Default | Description |
|---|---|---|---|
| endPoint | Yes | N/A | The public endpoint of your Elasticsearch instance, in the format http://<instanceid>.public.elasticsearch.aliyuncs.com:9200. You can find it on the basic information page of the instance. For more information, see View the basic information of an instance. |
| accessId | Yes | elastic | The username used to access the Elasticsearch instance. The default username is elastic. |
| accessKey | Yes | N/A | The password of the user, which you set when you create the instance. If you forget the password, see Reset the access password of an instance. |
| index | Yes | N/A | The index name. Create the index before you run the job, or enable automatic index creation. For more information, see Beginner's guide: From instance creation to data retrieval and Configure YML parameters. |
| typeName | Yes | N/A | The index type. For Elasticsearch instances of V7.0 or later, set this to _doc. |
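The following Python sketch collects the table's rules in one place and renders them in the style of the DDL above. It applies three rules:

- The endpoint format includes port 9200.
- The default username is elastic.
- typeName is _doc for V7.0 and later.

es_sink_options, render_with_clause, the instance ID es-cn-example, and my_index are hypothetical names used only for illustration.

```python
def es_sink_options(instance_id, password, index, *, port=9200,
                    username="elastic", es_major_version=7, type_name=None):
    """Assemble WITH options for the Elasticsearch sink table.

    Hypothetical helper encoding the rules in the sink parameter table.
    """
    if es_major_version >= 7:
        # V7.0 and later: the index type must be _doc.
        if type_name not in (None, "_doc"):
            raise ValueError("typeName must be _doc for Elasticsearch V7.0 or later")
        type_name = "_doc"
    elif type_name is None:
        raise ValueError("typeName is required for versions earlier than V7.0")
    return {
        "endPoint": f"http://{instance_id}.public.elasticsearch.aliyuncs.com:{port}",
        "accessId": username,  # defaults to the elastic user
        "accessKey": password,
        "index": index,
        "typeName": type_name,
    }


def render_with_clause(options):
    """Render the options as key = 'value' lines, as in the DDL."""
    return ",\n".join(f"  {key} = '{value}'" for key, value in options.items())


opts = es_sink_options("es-cn-example", "<yourPassword>", "my_index")
print(render_with_clause(opts))
```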

Step 4: Write the business logic

Insert the aggregated results into the sink table:

-- Count log entries per value of a; each new count updates the document whose ID is a.
INSERT INTO es_stream_sink
SELECT
  a,
  COUNT(*) AS cnt
FROM sls_stream
GROUP BY a;
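The query above is a continuous aggregation: every new log entry changes the count for its key. The following Python sketch simulates this on a small in-memory list.

It assumes that one updated row is emitted per input row. Flink optimizations such as mini-batching may combine updates. running_counts and the sample rows are hypothetical.

```python
from collections import Counter


def running_counts(rows):
    """Simulate SELECT a, COUNT(*) AS cnt FROM sls_stream GROUP BY a.

    Yields the updated (a, cnt) row after each input row; the sink upserts
    each one by its primary key a.
    """
    counts = Counter()
    for row in rows:
        counts[row["a"]] += 1
        yield {"a": row["a"], "cnt": counts[row["a"]]}


rows = [
    {"a": 1, "b": 10, "c": "x"},
    {"a": 2, "b": 20, "c": "y"},
    {"a": 1, "b": 30, "c": "z"},
]
updates = list(running_counts(rows))
print(updates)  # [{'a': 1, 'cnt': 1}, {'a': 2, 'cnt': 1}, {'a': 1, 'cnt': 2}]
```

Because a is the primary key of es_stream_sink, the third update overwrites the document for a = 1 instead of adding a new one. The index therefore ends up with one document per distinct value of a.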

Step 5: Publish and start the job

Publish and start the job. The job continuously reads log data from SLS, counts the records for each value of a, and writes the results to Elasticsearch.
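To confirm that documents are arriving, you can query the index through the Elasticsearch REST API. The following Python sketch only builds a POST request to the _search endpoint with a term query on field a; it does not send it.

The endpoint, index name, and credentials are placeholders, and HTTP Basic authentication with the instance username and password is assumed.

```python
import base64
import json
import urllib.request


def build_search_request(endpoint, index, username, password, a_value):
    """Build (but do not send) a _search request for documents where a == a_value."""
    query = {"query": {"term": {"a": a_value}}}
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{endpoint}/{index}/_search",
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
        method="POST",
    )


req = build_search_request(
    "http://es-cn-example.public.elasticsearch.aliyuncs.com:9200",  # placeholder
    "my_index", "elastic", "<yourPassword>", 1)
# To run the query against a live instance: urllib.request.urlopen(req)
```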

Key handling and update modes

How Flink writes documents to Elasticsearch depends on whether a primary key is defined in the sink table DDL.

| Mode | Trigger | Document ID |
|---|---|---|
| Upsert | A PRIMARY KEY is defined | The value of the primary key field |
| Append | No PRIMARY KEY is defined | Randomly generated by the system |

Important

Only one field can be specified as PRIMARY KEY.
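The following Python sketch simulates the difference between the two modes, using a dict as a stand-in for an index. It assumes that each write stores the whole record under a document ID.

write_document is hypothetical. uuid4 stands in for whatever random ID the system actually generates.

```python
import uuid


def write_document(index, record, primary_key=None):
    """Store record in a dict that stands in for an Elasticsearch index.

    With primary_key (upsert mode) the key value is the document ID, so a
    later record with the same key replaces the earlier one. Without it
    (append mode) a random ID is generated, so every record is kept.
    """
    if primary_key is not None:
        doc_id = str(record[primary_key])
    else:
        doc_id = uuid.uuid4().hex
    index[doc_id] = dict(record)
    return doc_id


upsert_index, append_index = {}, {}
for record in ({"a": 1, "cnt": 1}, {"a": 1, "cnt": 2}):
    write_document(upsert_index, record, primary_key="a")
    write_document(append_index, record)

print(upsert_index)       # {'1': {'a': 1, 'cnt': 2}}
print(len(append_index))  # 2
```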

In upsert mode, the updateMode parameter controls how existing documents are updated:

| updateMode value | Behavior |
|---|---|
| full | Incoming records completely overwrite existing documents. |
| inc | Only the fields present in the incoming record are updated; other fields remain unchanged. |

By default, all writes use upsert semantics: a document is inserted if it does not exist and updated if it does. For more information, see Index API.
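The following Python sketch models the two updateMode values on plain dicts. In Elasticsearch terms, full behaves like re-indexing a whole document, and inc behaves like a partial-document update. Whether the connector uses exactly those REST calls is an assumption, and apply_update is hypothetical.

```python
def apply_update(existing, incoming, update_mode="full"):
    """Return the document stored after an upsert.

    full: the incoming record replaces the existing document.
    inc:  only fields present in the incoming record are changed.
    If no document exists yet, both modes insert the incoming record.
    """
    if existing is None or update_mode == "full":
        return dict(incoming)
    if update_mode == "inc":
        merged = dict(existing)
        merged.update(incoming)
        return merged
    raise ValueError(f"unknown updateMode: {update_mode}")


doc = {"a": 1, "cnt": 2, "tag": "old"}
print(apply_update(doc, {"a": 1, "cnt": 3}, "full"))  # {'a': 1, 'cnt': 3}
print(apply_update(doc, {"a": 1, "cnt": 3}, "inc"))   # {'a': 1, 'cnt': 3, 'tag': 'old'}
```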

What's next

To implement custom write logic beyond what the built-in Elasticsearch connector supports, use the custom sink feature of Realtime Compute for Apache Flink.