If you want to build a log retrieval system, you can use Alibaba Cloud Realtime Compute for Apache Flink to compute log data and import the processed data into Alibaba Cloud Elasticsearch for searches. This topic uses log data in Log Service to describe the detailed procedure.
- Activate Realtime Compute and create a project.
For more information, see Activate Realtime Compute for Apache Flink and create a project.
- Create an Elasticsearch cluster.
For more information, see Create an Alibaba Cloud Elasticsearch cluster.
- Activate Log Service, and create a project and a Logstore.
Realtime Compute for Apache Flink is a Flink-based service provided by Alibaba Cloud. It supports various input and output systems, such as Kafka and Elasticsearch. You can use Realtime Compute for Apache Flink and Elasticsearch to retrieve logs.
Realtime Compute for Apache Flink processes logs in Kafka or Log Service by using simple or complex Flink SQL statements. Then, it imports the processed logs into an Elasticsearch cluster as source data for searches. The computing capabilities of Realtime Compute for Apache Flink and the search capabilities of Elasticsearch allow you to process and search for data in real time. This helps you transform your business into real-time services.
- Log on to the Realtime Compute development platform.
- Create a Realtime Compute job.
For more information, see Create a job.
- Write Flink SQL statements.
- Create a source table for Log Service.
    CREATE TABLE sls_stream (
      a INT,
      b INT,
      c VARCHAR
    ) WITH (
      type = 'sls',
      endPoint = '<yourEndpoint>',
      accessId = '<yourAccessId>',
      accessKey = '<yourAccessKey>',
      startTime = '<yourStartTime>',
      project = '<yourProjectName>',
      logStore = '<yourLogStoreName>',
      consumerGroup = '<yourConsumerGroupName>'
    );
The following table describes the parameters in the WITH part.
| Parameter | Description |
| --- | --- |
| endPoint | The URL that is used to access projects and logs in Log Service. For example, the URL that is used to access Log Service in the China (Hangzhou) region is http://cn-hangzhou.log.aliyuncs.com. Make sure that the URL starts with http://. For more information, see Endpoints. |
| accessId | The AccessKey ID of your Alibaba Cloud account. |
| accessKey | The AccessKey secret of your Alibaba Cloud account. |
| startTime | The time when logs start to be consumed. When you run a Realtime Compute for Apache Flink job, specify a time point that is later than the start time specified by this parameter. |
| project | The name of the Log Service project. |
| logStore | The name of the Logstore in the project. |
| consumerGroup | The name of the Log Service consumer group. |
For more information about other parameters in the WITH part, see Create a Log Service source table.
- Create an Elasticsearch result table.
- Elasticsearch result tables are supported in Realtime Compute V3.2.2 and later. When you create a Realtime Compute for Apache Flink job, select a valid version.
- Elasticsearch result tables are based on RESTful APIs and are compatible with all Elasticsearch versions.
    CREATE TABLE es_stream_sink (
      a INT,
      cnt BIGINT,
      PRIMARY KEY(a)
    ) WITH (
      type = 'elasticsearch',
      endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
      accessId = '<yourAccessId>',
      accessKey = '<yourAccessSecret>',
      index = '<yourIndex>',
      typeName = '<yourTypeName>'
    );
The following table describes the parameters in the WITH part.
| Parameter | Description |
| --- | --- |
| endPoint | The URL that is used to access the Elasticsearch cluster over the Internet. Specify the URL in the format of http://<instanceid>.public.elasticsearch.aliyuncs.com:9200. You can obtain the public endpoint of the cluster from the Basic Information page of the cluster. For more information, see View the basic information of a cluster. |
| accessId | The username that is used to access the Elasticsearch cluster. The default username is elastic. |
| accessKey | The password that is used to access the Elasticsearch cluster. The password of the elastic account is specified when you create the cluster. If you forget the password, you can reset it. For more information about the procedure and precautions for resetting a password, see Reset the access password for an Elasticsearch cluster. |
| index | The name of the destination index. If no indexes are created in the Elasticsearch cluster, create one first. For more information, see Step 3: Create an index. You can also enable the Auto Indexing feature for the Elasticsearch cluster to automatically create indexes. For more information, see Configure the YML file. |
| typeName | The type of the destination index. The index type of Elasticsearch clusters of V7.0 or later must be _doc. |
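If you create the destination index yourself instead of relying on Auto Indexing, the request could look like the following sketch. It assumes an Elasticsearch cluster of V7.0 or later (mappings declared without a type name) and maps the two columns of es_stream_sink, a and cnt, to the integer and long field types. The helper name build_create_index_request, the chosen field types, and all argument values are illustrative assumptions, not part of the documented procedure. The sketch only builds the HTTP request and does not send it.

```python
import base64
import json
import urllib.request


def build_create_index_request(endpoint, index, username, password):
    """Build (but do not send) a PUT request that creates an index.

    Assumption: the cluster runs Elasticsearch V7.0 or later, where
    mappings are declared without a type name.
    """
    mapping = {
        "mappings": {
            "properties": {
                "a": {"type": "integer"},  # corresponds to column `a INT`
                "cnt": {"type": "long"},   # corresponds to column `cnt BIGINT`
            }
        }
    }
    # HTTP basic authentication with the cluster username and password.
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return urllib.request.Request(
        url=f"{endpoint.rstrip('/')}/{index}",
        data=json.dumps(mapping).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="PUT",
    )
```

To send the request, pass the returned object to urllib.request.urlopen after you replace the arguments with the endpoint, index name, and credentials of your own cluster.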
For more information about other parameters in the WITH part, see Create an Elasticsearch result table.
Note
- Elasticsearch allows you to update documents based on the PRIMARY KEY field. Only one field can be specified as the PRIMARY KEY field. If you specify the PRIMARY KEY field, values of the PRIMARY KEY field are used as document IDs. If the PRIMARY KEY field is not specified, the system generates random IDs for documents. For more information, see Index API.
- Elasticsearch supports multiple update modes. You can set the updateMode parameter to specify the update mode. If updateMode is set to full, new documents overwrite existing documents. If updateMode is set to inc, new values overwrite existing values of the related fields.
- All updates in Elasticsearch are performed by using INSERT or UPDATE statements that follow the UPSERT syntax.
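The difference between the two update modes in the note above can be pictured with a small in-memory model. This is only a sketch of the described semantics, not the connector's implementation: the dictionary stands in for an index keyed by document ID, and the upsert function and the note field are hypothetical names introduced for illustration.

```python
def upsert(index, doc_id, new_doc, update_mode="full"):
    """Apply one upsert to an in-memory stand-in for an index (a dict)."""
    if update_mode == "full":
        # The new document replaces the existing document entirely.
        index[doc_id] = dict(new_doc)
    elif update_mode == "inc":
        # Only the fields present in the new document are overwritten.
        merged = dict(index.get(doc_id, {}))
        merged.update(new_doc)
        index[doc_id] = merged
    else:
        raise ValueError(f"unsupported update mode: {update_mode!r}")
    return index[doc_id]


existing = {"a": 1, "cnt": 5, "note": "survives only in inc mode"}

full_index = {1: dict(existing)}
print(upsert(full_index, 1, {"a": 1, "cnt": 6}, "full"))
# {'a': 1, 'cnt': 6}

inc_index = {1: dict(existing)}
print(upsert(inc_index, 1, {"a": 1, "cnt": 6}, "inc"))
# {'a': 1, 'cnt': 6, 'note': 'survives only in inc mode'}
```

In both cases the document ID is the value of the PRIMARY KEY field, so repeated writes for the same key update one document instead of creating new ones.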
- Create data consumption logic and synchronize data.
    INSERT INTO es_stream_sink
    SELECT a, COUNT(*) AS cnt
    FROM sls_stream
    GROUP BY a;
- Submit and run the job.
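To see what the job writes to the index, the aggregation in the INSERT statement (a row count per value of a, written to a sink whose primary key is a) can be simulated in a few lines. This is a conceptual sketch only: simulate_count_job and the sample rows are hypothetical, and the dictionary stands in for the Elasticsearch index with document IDs taken from the primary key.

```python
from collections import Counter


def simulate_count_job(rows):
    """Mimic SELECT a, COUNT(*) AS cnt ... GROUP BY a on a stream of rows.

    Each incoming row updates the running count for its key, and the
    latest result is upserted into the sink under that key.
    """
    counts = Counter()
    sink = {}
    for row in rows:
        key = row["a"]
        counts[key] += 1
        # Upsert: the document whose ID is `key` is overwritten in place.
        sink[key] = {"a": key, "cnt": counts[key]}
    return sink


sample_rows = [
    {"a": 1, "b": 10, "c": "x"},
    {"a": 2, "b": 20, "c": "y"},
    {"a": 1, "b": 30, "c": "z"},
]
print(simulate_count_job(sample_rows))
# {1: {'a': 1, 'cnt': 2}, 2: {'a': 2, 'cnt': 1}}
```

Because the sink is keyed by a, the index holds one document per distinct value of a, and its cnt field grows as more logs arrive.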
Realtime Compute for Apache Flink and Elasticsearch allow you to quickly create your own real-time search services. If more complex logic is required to import data into an Elasticsearch cluster, use the user-defined sinks of Realtime Compute for Apache Flink. For more information, see Create a custom result table.
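After the job is running, you can query the destination index to retrieve the results. The following sketch builds a search request that looks up the document for one value of a by using a term query. The helper name build_search_request and the argument values are assumptions for illustration. Authentication is omitted for brevity, and the request is only constructed, not sent.

```python
import json
import urllib.request


def build_search_request(endpoint, index, a_value):
    """Build (but do not send) a _search request for one value of `a`."""
    query = {"query": {"term": {"a": a_value}}}
    return urllib.request.Request(
        url=f"{endpoint.rstrip('/')}/{index}/_search",
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Add an Authorization header with the cluster username and password before you send the request with urllib.request.urlopen.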