To build a log retrieval system, you can use Realtime Compute for Apache Flink to process log data and write it to Elasticsearch for searching. This topic uses Alibaba Cloud Simple Log Service (SLS) as an example to describe this process.
Prerequisites
Before you begin, complete the following tasks:
Activate Realtime Compute for Apache Flink and create a project.
Create an Alibaba Cloud Elasticsearch instance.
For more information, see Create an Alibaba Cloud Elasticsearch instance.
Activate SLS, and create a project and a Logstore.
For more information, see Activate Alibaba Cloud Simple Log Service, Manage projects, and Create a basic Logstore.
Background
Realtime Compute for Apache Flink is a Flink-based service that is officially supported by Alibaba Cloud. It supports various source and sink systems, such as Kafka and Elasticsearch. The combination of Realtime Compute for Apache Flink and Elasticsearch meets the requirements of typical log retrieval scenarios.
Flink processes logs from systems such as Kafka or Simple Log Service, using simple or complex computations, and writes the results to Elasticsearch for searching. Combining the stream computing capabilities of Flink with the search capabilities of Elasticsearch lets you transform and query business data in real time, which helps you move your business to real-time services.
Realtime Compute for Apache Flink provides a simple way to connect to Elasticsearch. For example, if your business logs or data are written to Simple Log Service and must be processed before they are written to Elasticsearch for searching, you can use the following pipeline: logs flow from Simple Log Service into Realtime Compute for Apache Flink for processing, and the results are written to Elasticsearch.
Procedure
Log on to the Realtime Compute for Apache Flink console.
Create a Realtime Compute for Apache Flink job.
For more information, see the Job Development > Development section of the Blink SQL Development Guide in the Alibaba Cloud Blink Exclusive Mode (Phased-Out for Alibaba Cloud) document.
Write the Flink SQL.
Create a source table for Simple Log Service.
CREATE TABLE sls_stream (
  a INT,
  b INT,
  c VARCHAR
) WITH (
  type = 'sls',
  endPoint = '<yourEndpoint>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessKey>',
  startTime = '<yourStartTime>',
  project = '<yourProjectName>',
  logStore = '<yourLogStoreName>',
  consumerGroup = '<yourConsumerGroupName>'
);
The following table describes the parameters in the WITH clause.
Parameter: Description
endPoint: The public endpoint of Alibaba Cloud Simple Log Service, which is the URL used to access the project and its log data. For more information, see Endpoints. For example, the endpoint for Simple Log Service in the China (Hangzhou) region is http://cn-hangzhou.log.aliyuncs.com. The endpoint must start with http://.
accessId: Your AccessKey ID.
accessKey: Your AccessKey secret.
startTime: The point in time from which logs are consumed. When you start the Flink job, the start time that you select must be later than the time specified by this parameter.
project: The name of the Simple Log Service project.
logStore: The name of the Logstore in the project.
consumerGroup: The name of the consumer group for Simple Log Service.
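The parameter rules above can be checked before the DDL is submitted. The following Python snippet is a minimal sketch, not part of Realtime Compute for Apache Flink: the helper names validate_sls_params and render_with_clause are hypothetical, and treating every listed parameter as required is an assumption made for illustration. It only enforces what the table states, namely that the endpoint starts with http://.

```python
# Parameter names come from the WITH clause of sls_stream. Treating all of
# them as required is an assumption made for this sketch.
SLS_PARAMS = ("endPoint", "accessId", "accessKey", "startTime",
              "project", "logStore", "consumerGroup")


def validate_sls_params(params):
    """Return a list of problems found in the WITH-clause values."""
    problems = [f"missing {name}" for name in SLS_PARAMS if not params.get(name)]
    endpoint = params.get("endPoint", "")
    if endpoint and not endpoint.startswith("http://"):
        problems.append("endPoint must start with http://")
    return problems


def render_with_clause(params):
    """Render the WITH clause text, refusing values that fail validation."""
    problems = validate_sls_params(params)
    if problems:
        raise ValueError("; ".join(problems))
    entries = ["type = 'sls'"]
    entries += [f"{name} = '{params[name]}'" for name in SLS_PARAMS]
    return "WITH (\n  " + ",\n  ".join(entries) + "\n)"


# Placeholder values, as in the DDL; replace them with your own settings.
example = {
    "endPoint": "http://cn-hangzhou.log.aliyuncs.com",
    "accessId": "<yourAccessId>",
    "accessKey": "<yourAccessKey>",
    "startTime": "<yourStartTime>",
    "project": "<yourProjectName>",
    "logStore": "<yourLogStoreName>",
    "consumerGroup": "<yourConsumerGroupName>",
}
print(render_with_clause(example))
```

The sketch does not escape quotation marks inside values, so it is suitable only for trusted configuration values.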
Create an Elasticsearch sink table.
Important: Realtime Compute for Apache Flink V3.2.2 and later support Elasticsearch sink tables. When you create a Flink job, select a supported version.
The Elasticsearch sink table is implemented using REST APIs and is compatible with all Elasticsearch versions.
CREATE TABLE es_stream_sink (
  a INT,
  cnt BIGINT,
  PRIMARY KEY(a)
) WITH (
  type = 'elasticsearch-7',
  endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>'
);
The following table describes the parameters in the WITH clause.
Parameter: Description
endPoint: The public endpoint of your Alibaba Cloud Elasticsearch instance, in the format http://<instanceid>.public.elasticsearch.aliyuncs.com:9200. You can obtain the endpoint from the basic information page of the instance. For more information, see View the basic information of an instance.
accessId: The username used to access the Alibaba Cloud Elasticsearch instance. The default username is elastic.
accessKey: The password for the user. The password for the elastic user is set when you create the instance. If you forget the password, you can reset it. For more information about the precautions and steps for resetting the password, see Reset the access password of an instance.
index: The index name. If you have not created an index, create one first. For more information, see Beginner's guide: From instance creation to data retrieval. You can also enable automatic index creation. For more information, see Configure YML parameters.
typeName: The index type. For Elasticsearch instances of V7.0 or later, this must be _doc.
Note: Elasticsearch supports updating documents using a primary key. Only one field can be specified as the PRIMARY KEY. After you specify the PRIMARY KEY, the value of the PRIMARY KEY field is used as the document ID. If a PRIMARY KEY is not specified, the system randomly generates a document ID. For more information, see Index API.
Elasticsearch supports multiple update modes. You can specify the mode using the updateMode parameter in the WITH clause:
If updateMode=full, new documents completely overwrite existing documents.
If updateMode=inc, Elasticsearch updates the corresponding fields based on the input field values.
All updates in Elasticsearch default to UPSERT semantics, which means INSERT or UPDATE.
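The difference between the two update modes is easier to see on a small example. The following Python snippet is a minimal sketch that models an index as an in-memory dictionary. It does not call Elasticsearch, the function name write_document and the source field are hypothetical, and it assumes that updateMode=inc leaves fields that are absent from the input unchanged.

```python
def write_document(index, primary_key_value, fields, update_mode):
    """Apply one sink record to an in-memory stand-in for an index."""
    # The PRIMARY KEY value is used as the document ID.
    doc_id = str(primary_key_value)
    if update_mode == "full":
        # The new document replaces the stored document entirely.
        index[doc_id] = dict(fields)
    elif update_mode == "inc":
        # Only the supplied fields change; other stored fields are kept.
        # setdefault gives UPSERT behavior when the document does not exist.
        index.setdefault(doc_id, {}).update(fields)
    else:
        raise ValueError(f"unsupported updateMode: {update_mode}")
    return index[doc_id]


index = {}
# First write for document ID "1": behaves as an INSERT.
write_document(index, 1, {"a": 1, "cnt": 5, "source": "sls"}, "full")

# inc: cnt changes, while a and source keep their stored values.
write_document(index, 1, {"cnt": 6}, "inc")
after_inc = dict(index["1"])

# full: the stored document is overwritten, so source disappears.
write_document(index, 1, {"a": 1, "cnt": 7}, "full")
after_full = dict(index["1"])

print(after_inc)
print(after_full)
```

In both modes the index holds a single document for the key, because every write targets the same document ID.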
Write the business logic to process and synchronize the data.
INSERT INTO es_stream_sink
SELECT a, COUNT(*) AS cnt
FROM sls_stream
GROUP BY a;
Publish and start the job.
After you publish and start the job, data from Simple Log Service is aggregated and then written to Alibaba Cloud Elasticsearch.
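To make the result of the job concrete, the following Python snippet is a simplified model of what the query computes. It is a sketch only: it does not reflect how Flink executes the job, the function name run_count_job and the sample rows are hypothetical, and the sink is an in-memory dictionary keyed by document ID.

```python
from collections import Counter


def run_count_job(rows, sink):
    """Model INSERT INTO es_stream_sink SELECT a, COUNT(*) ... GROUP BY a."""
    counts = Counter()
    for row in rows:
        key = row["a"]
        counts[key] += 1
        # PRIMARY KEY(a): the document ID is the value of a, so each new
        # count replaces the previous document for that key (UPSERT).
        sink[str(key)] = {"a": key, "cnt": counts[key]}
    return sink


# Sample log rows with the columns of sls_stream (a, b, c).
rows = [
    {"a": 1, "b": 10, "c": "x"},
    {"a": 2, "b": 20, "c": "y"},
    {"a": 1, "b": 30, "c": "z"},
]
sink = run_count_job(rows, {})
print(sink)
```

Each distinct value of a ends up as one document whose cnt field holds the latest count, which is why the sink table declares a as its PRIMARY KEY.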
More information
You can use Realtime Compute for Apache Flink with Elasticsearch to quickly create a real-time search pipeline. If you have more complex requirements for writing data to Elasticsearch, you can use the custom sink feature of Realtime Compute for Apache Flink.