This tutorial shows you how to build a log retrieval pipeline that reads data from Alibaba Cloud Simple Log Service (SLS), processes it with Realtime Compute for Apache Flink using Flink SQL, and writes the results to Elasticsearch for full-text search.
Prerequisites
Before you begin, ensure that you have:
Activated Realtime Compute for Apache Flink and created a project. For more information, see the Job Development > Development section of the Blink SQL Development Guide in the Alibaba Cloud Blink Exclusive Mode (Phased-Out for Alibaba Cloud) document.
Created an Alibaba Cloud Elasticsearch instance. For more information, see Create an Alibaba Cloud Elasticsearch instance.
Activated SLS and created a project and a Logstore. For more information, see Activate Alibaba Cloud Simple Log Service, Manage projects, and Create a basic Logstore.
How it works
Log data from SLS streams into Realtime Compute for Apache Flink. Flink applies your SQL transformations and then writes the processed records to Elasticsearch through the Elasticsearch REST API. Elasticsearch indexes each record as a document, which makes it searchable in near real time.

The Elasticsearch sink connector is implemented using REST APIs and is compatible with all Elasticsearch versions. Realtime Compute for Apache Flink version 3.2.2 and later supports Elasticsearch sink tables.
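To make the indexing step concrete, here is a greatly simplified Python model of what it means for a record to become a searchable document: each document's field values are tokenized, and the terms are added to an inverted index that maps each term to the IDs of the documents containing it. The `ToyIndex` class and its crude tokenizer are hypothetical illustrations only. Elasticsearch (built on Lucene) uses configurable analyzers, segments, and a periodic refresh, none of which are modeled here.

```python
import re
from collections import defaultdict


def tokenize(text: str) -> list[str]:
    # Lowercase and split on runs of non-alphanumeric characters; a crude
    # stand-in for an Elasticsearch analyzer.
    return [t for t in re.split(r"[^0-9a-z]+", text.lower()) if t]


class ToyIndex:
    """Document store plus an inverted index (term -> set of doc IDs)."""

    def __init__(self) -> None:
        self.docs = {}                  # doc ID -> document
        self.postings = defaultdict(set)  # term -> set of doc IDs

    def _terms(self, doc: dict) -> set[str]:
        # Index every field value as text.
        return {term for value in doc.values() for term in tokenize(str(value))}

    def index(self, doc_id: str, doc: dict) -> None:
        # Re-indexing an existing ID replaces the old document, so its old
        # terms are removed from the postings first.
        old = self.docs.get(doc_id)
        if old is not None:
            for term in self._terms(old):
                self.postings[term].discard(doc_id)
        self.docs[doc_id] = doc
        for term in self._terms(doc):
            self.postings[term].add(doc_id)

    def search(self, query: str) -> set[str]:
        # IDs of documents that contain every query term (AND semantics).
        terms = tokenize(query)
        if not terms:
            return set()
        result = set(self.postings.get(terms[0], set()))
        for term in terms[1:]:
            result &= self.postings.get(term, set())
        return result


idx = ToyIndex()
idx.index("1", {"a": 1, "c": "disk full on host-a"})
idx.index("2", {"a": 2, "c": "host-b rebooted"})
print(idx.search("host disk"))  # {'1'}
```

Because the lookup goes from term to document IDs, a query never scans every stored record, which is the property that makes full-text search over large log volumes practical.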
Set up the Flink SQL pipeline
Step 1: Create a Flink job
Log on to the Realtime Compute for Apache Flink console.
Create a Realtime Compute for Apache Flink job. For more information, see the Job Development > Development section of the Blink SQL Development Guide in the Alibaba Cloud Blink Exclusive Mode (Phased-Out for Alibaba Cloud) document.
Step 2: Create the SLS source table
Define a source table that reads from SLS:
CREATE TABLE sls_stream (
  a INT,
  b INT,
  c VARCHAR
) WITH (
  type = 'sls',
  endPoint = '<yourEndpoint>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessKey>',
  startTime = '<yourStartTime>',
  project = '<yourProjectName>',
  logStore = '<yourLogStoreName>',
  consumerGroup = '<yourConsumerGroupName>'
);
Replace the placeholders with your actual values:
| Parameter | Required | Description |
| endPoint | Required | Public endpoint of SLS. |
| accessId | Required | Your AccessKey ID. |
| accessKey | Required | Your AccessKey secret. |
| startTime | Required | The point in time from which to start consuming logs. When you start the job, the start time you select must be later than this value. |
| project | Required | Name of the SLS project. |
| logStore | Required | Name of the Logstore in the project. |
| consumerGroup | Required | Name of the SLS consumer group. |
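Rather than editing credentials into the DDL by hand, you might render it from a template. The following sketch uses a hypothetical `render_ddl` helper that replaces every `<name>` placeholder, such as `<yourAccessId>`, with a supplied value and fails loudly if any placeholder is left unfilled. It assumes the SQL text contains no other `<word>` sequences.

```python
import re

# Matches placeholders such as <yourAccessId> or <instanceid>.
# Assumption: no other <word> sequences appear in the SQL text.
PLACEHOLDER = re.compile(r"<([A-Za-z]+)>")


def render_ddl(template: str, values: dict[str, str]) -> str:
    """Replace each <name> placeholder with values[name].

    Raises KeyError if a placeholder has no supplied value, so an
    unfilled placeholder never reaches the job.
    """
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value supplied for placeholder <{name}>")
        return values[name]

    return PLACEHOLDER.sub(substitute, template)


template = "project = '<yourProjectName>',\nlogStore = '<yourLogStoreName>'"
print(render_ddl(template, {"yourProjectName": "my-project",
                            "yourLogStoreName": "app-logs"}))
```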
Step 3: Create the Elasticsearch sink table
Define a sink table that writes to Elasticsearch:
CREATE TABLE es_stream_sink (
  a INT,
  cnt BIGINT,
  PRIMARY KEY (a)
) WITH (
  type = 'elasticsearch-7',
  endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
  accessId = '<yourUsername>',
  accessKey = '<yourPassword>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>'
);
Replace the placeholders with your actual values:
| Parameter | Required | Default | Description |
| endPoint | Required | N/A | Public endpoint of your Elasticsearch instance, in the format http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>. |
| accessId | Required | … | Username for the Elasticsearch instance. The default username is … |
| accessKey | Required | N/A | Password for the user, set when the instance is created. If you forget the password, see Reset the access password of an instance. |
| index | Required | N/A | Index name. Create the index before running the job, or enable automatic index creation. For more information, see Beginner's guide: From instance creation to data retrieval and Configure YML parameters. |
| typeName | Required | N/A | Index type. For Elasticsearch instances of V7.0 or later, this must be … |
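The sink parameters map naturally onto an Elasticsearch REST call. The sketch below uses only the Python standard library to build, but not send, an Elasticsearch 7.x Index API request from `endPoint`, `accessId` (username), `accessKey` (password), and `index`. It is an illustration of the REST API under stated assumptions, not the connector's implementation: the function name `build_index_request` and the example host are hypothetical, and the exact requests the connector issues are not documented in this tutorial.

```python
import base64
import json
import urllib.parse
import urllib.request
from typing import Optional


def build_index_request(end_point: str, access_id: str, access_key: str,
                        index: str, doc: dict,
                        doc_id: Optional[str] = None) -> urllib.request.Request:
    """Build, but do not send, an Elasticsearch 7.x Index API request."""
    base = f"{end_point.rstrip('/')}/{urllib.parse.quote(index, safe='')}/_doc"
    if doc_id is None:
        # POST /<index>/_doc: Elasticsearch generates the document ID.
        url, method = base, "POST"
    else:
        # PUT /<index>/_doc/<id>: create or replace the document with this ID.
        url, method = f"{base}/{urllib.parse.quote(str(doc_id), safe='')}", "PUT"
    # accessId/accessKey are the Elasticsearch username/password, sent here
    # with HTTP Basic authentication.
    token = base64.b64encode(f"{access_id}:{access_key}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=json.dumps(doc).encode("utf-8"),
        method=method,
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
    )


req = build_index_request(
    "http://es-cn-example.public.elasticsearch.aliyuncs.com:9200",  # hypothetical host
    "my_user", "my_password", "sls_agg", {"a": 1, "cnt": 2}, doc_id="1")
print(req.get_method(), req.full_url)
# Sending it would be urllib.request.urlopen(req); omitted here.
```

Passing `doc_id` corresponds to a sink with a primary key; omitting it lets Elasticsearch assign the ID.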
Step 4: Write the business logic
Insert the aggregated results into the sink table:
INSERT INTO es_stream_sink
SELECT
  a,
  COUNT(*) AS cnt
FROM sls_stream
GROUP BY a;
Step 5: Publish and start the job
Publish and start the job. The job continuously reads data from SLS, counts the records for each value of field a, and writes the results to Elasticsearch.
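To see why the sink table declares a primary key for this query, the following Python sketch mimics the Step 4 aggregation record by record. Each incoming record increments the running count for its key `a`, and the new row is upserted into a dict that stands in for the Elasticsearch index, keyed by the primary key value. The function name is hypothetical, and emitting one update per record is a simplifying assumption; Flink may buffer or combine updates before they reach the sink.

```python
from collections import Counter


def aggregate_and_upsert(records, index):
    """Apply SELECT a, COUNT(*) AS cnt ... GROUP BY a one record at a time.

    After each record, the updated row for its key is upserted into `index`
    (a dict standing in for the Elasticsearch index) under the primary key
    value. Returns the rows emitted, in order.
    """
    counts = Counter()
    emitted = []
    for record in records:
        key = record["a"]
        counts[key] += 1
        row = {"a": key, "cnt": counts[key]}
        index[str(key)] = row  # document ID = primary key value
        emitted.append(row)
    return emitted


es_index = {}
stream = [{"a": 1, "b": 10, "c": "x"},
          {"a": 2, "b": 20, "c": "y"},
          {"a": 1, "b": 30, "c": "z"}]
updates = aggregate_and_upsert(stream, es_index)
print(len(updates), es_index)
```

Three updates produce only two documents, because the second update for a = 1 overwrites the first. Without a primary key, each update would land as a separate document, which makes append mode a poor fit for an updating aggregation like this one.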
Key handling and update modes
How Flink writes documents to Elasticsearch depends on whether a primary key is defined in the sink table DDL.
| Mode | Primary key defined | Document ID |
| Upsert | Yes | Value of the primary key field |
| Append | No | Randomly generated by the system |
Only one field can be specified as PRIMARY KEY.
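The difference between the two modes comes down to how the document ID is chosen. In the sketch below, a hypothetical `write` function uses the primary key value as the ID in upsert mode and a random `uuid4` value in append mode. The UUID is only a stand-in; the format of the IDs the system generates is not specified here.

```python
import uuid
from typing import Optional


def write(index: dict, row: dict, primary_key: Optional[str]) -> str:
    """Store one row in `index` and return the document ID used."""
    if primary_key is not None:
        # Upsert mode: the primary key value is the ID, so a later row with
        # the same key replaces the earlier document.
        doc_id = str(row[primary_key])
    else:
        # Append mode: a fresh random ID (uuid4 as a stand-in), so every
        # row becomes a new document.
        doc_id = uuid.uuid4().hex
    index[doc_id] = dict(row)
    return doc_id


upsert_index, append_index = {}, {}
for row in [{"a": 1, "cnt": 1}, {"a": 1, "cnt": 2}]:
    write(upsert_index, row, primary_key="a")
    write(append_index, row, primary_key=None)
print(len(upsert_index), len(append_index))  # 1 2
```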
In upsert mode, the updateMode parameter controls how existing documents are updated. It supports two behaviors:
Full overwrite: incoming records completely replace existing documents.
Partial update: only the fields present in the incoming record are updated; other fields remain unchanged.
By default, all writes use upsert semantics: a document is inserted if no document with the same ID exists, and updated otherwise. For more information, see Index API.
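The two update behaviors can be sketched as a merge rule. In the hypothetical `apply_update` function below, the `partial` flag stands in for the `updateMode` setting, whose actual values are not shown here: `partial=False` replaces the stored document, and `partial=True` changes only the incoming fields. The merge is shallow and does not model how Elasticsearch handles nested objects.

```python
from typing import Optional


def apply_update(existing: Optional[dict], incoming: dict, partial: bool) -> dict:
    """Return the stored document after an upsert of `incoming`.

    partial=False: full overwrite; the incoming record replaces the document.
    partial=True: shallow merge; only fields in the incoming record change.
    If nothing is stored yet, the incoming record is inserted either way.
    """
    if existing is None or not partial:
        return dict(incoming)
    merged = dict(existing)
    merged.update(incoming)
    return merged


stored = {"a": 1, "cnt": 5, "note": "from another writer"}
incoming = {"a": 1, "cnt": 6}
print(apply_update(stored, incoming, partial=False))  # {'a': 1, 'cnt': 6}
print(apply_update(stored, incoming, partial=True))
```

The partial behavior matters when other writers add fields to the same documents: a full overwrite would silently drop their fields.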
What's next
To implement custom write logic beyond what the built-in Elasticsearch connector supports, use the custom sink feature of Realtime Compute for Apache Flink.