This topic describes how to use Alibaba Cloud Realtime Compute to process and synchronize data to an Alibaba Cloud Elasticsearch cluster.
Prerequisites
- Realtime Compute is activated. A project is created.
For more information, see Activate Realtime Compute and create a project.
- Alibaba Cloud Elasticsearch is activated.
For more information, see Create an Elasticsearch cluster.
- Alibaba Cloud Log Service is activated. A project and a Logstore are created. Log Service is used as an example in this topic.
Alibaba Cloud Realtime Compute is a Flink-based service provided by Alibaba Cloud. It supports multiple input and output systems, such as Kafka and Elasticsearch. You can use Realtime Compute and Elasticsearch to meet the requirements of log retrieval.
Realtime Compute processes logs in Kafka or Log Service by using simple or complex Flink SQL statements, and then imports the processed logs into an Elasticsearch cluster as source data for search services. The powerful computing capabilities of Realtime Compute combined with the search capability of Elasticsearch allow you to process and search for data in real time, which helps you transform your business into real-time services.
Create a Realtime Compute job
- Log on to the Realtime Compute development platform and create a job.
- Write Flink SQL statements.
- Create a source table for Log Service.
CREATE TABLE sls_stream (
    a INT,
    b INT,
    c VARCHAR
) WITH (
    type = 'sls',
    endPoint = '<yourEndpoint>',
    accessId = '<yourAccessId>',
    accessKey = '<yourAccessKey>',
    startTime = '<yourStartTime>',
    project = '<yourProjectName>',
    logStore = '<yourLogStoreName>',
    consumerGroup = '<yourConsumerGroupName>'
);
The following table describes the parameters in the WITH clause.
Parameter: Description
- <yourEndpoint>: The public endpoint of Alibaba Cloud Log Service. The endpoint is a URL that is used to access projects and log data in Log Service. For more information, see Service endpoint. For example, the endpoint of Log Service in the China (Hangzhou) region is http://cn-hangzhou.log.aliyuncs.com. Make sure that the endpoint starts with http://.
- <yourAccessId>: The AccessKey ID that is used to access Log Service.
- <yourAccessKey>: The AccessKey secret that is used to access Log Service.
- <yourStartTime>: The beginning of the time range during which log data is consumed. The time specified by this parameter must be earlier than the time at which you run the Realtime Compute job.
- <yourProjectName>: The name of the Log Service project.
- <yourLogStoreName>: The name of the Logstore in the project.
- <yourConsumerGroupName>: The name of the Log Service consumer group.
For more information about other parameters in the WITH part, see Create a Log Service source table.
- Create an Elasticsearch result table.
- Elasticsearch result tables are supported in Realtime Compute V3.2.2 and later. When you create a Realtime Compute job, select the correct version.
- Elasticsearch result tables are based on the RESTful API. These result tables are compatible with all Elasticsearch versions.
CREATE TABLE es_stream_sink (
    a INT,
    cnt BIGINT,
    PRIMARY KEY (a)
) WITH (
    type = 'elasticsearch',
    endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
    accessId = '<yourAccessId>',
    accessKey = '<yourAccessSecret>',
    index = '<yourIndex>',
    typeName = '<yourTypeName>'
);
The following table describes the parameters in the WITH clause.
Parameter: Description
- <instanceid>: The ID of the Elasticsearch cluster. You can query the cluster ID on the Basic Information page of the cluster.
- <port>: The public port of the Elasticsearch cluster. You can query the public port on the Basic Information page of the cluster.
- <yourAccessId>: The username that is used to access the Elasticsearch cluster. You can also use it to log on to the Kibana console. Default value: elastic.
- <yourAccessSecret>: The password that is used to access the Elasticsearch cluster. You can also use it to log on to the Kibana console. The password is specified when you create the Elasticsearch cluster.
- <yourIndex>: The name of the index for the documents in the Elasticsearch cluster. An index is similar to a database. If no index is created for the documents, create one. For more information, see Create an index.
- <yourTypeName>: The type of the index. A type is similar to a table in a database. If no type is created for the index, create one. For more information, see Create an index.
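If the index and type do not exist yet, one way to create them is through the Elasticsearch REST API. The sketch below only builds the JSON request body for a hypothetical index named my_index with a type named my_type (both names are illustrative, not from this topic), with a mapping that matches the result table columns a and cnt. Sending the request to the cluster is shown as a comment, because the endpoint and credentials are placeholders.

```python
import json

# Hypothetical names for illustration; replace them with your own index and type.
index_name = "my_index"
type_name = "my_type"

# Mapping that mirrors the result table columns: a (INT) and cnt (BIGINT -> long).
# Elasticsearch 6.x and earlier accept a custom type name; 7.x and later remove types.
index_body = {
    "mappings": {
        type_name: {
            "properties": {
                "a": {"type": "integer"},
                "cnt": {"type": "long"},
            }
        }
    }
}

# The actual call would be an HTTP PUT against the cluster, for example:
#   PUT http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>/my_index
# with index_body as the JSON payload and the elastic username and password.
print(json.dumps(index_body))
```

The mapping types integer and long line up with the Flink SQL column types INT and BIGINT, so aggregated counts are not truncated when they are written to the index.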
For more information about other parameters in the WITH part, see Create an Elasticsearch result table.
Note
- Elasticsearch supports document updates based on the PRIMARY KEY field. Only one field can be specified as the PRIMARY KEY field. If you specify the PRIMARY KEY field, its values are used as document IDs. If the PRIMARY KEY field is not specified, the system generates random IDs for documents. For more information, see Index API.
- Elasticsearch supports multiple update modes. You can set the updateMode parameter to specify the update mode. If updateMode is set to full, new documents overwrite existing documents. If updateMode is set to inc, new values overwrite the existing values of only the related fields.
- All updates in Elasticsearch are performed by using statements that follow the UPSERT syntax, such as INSERT or UPDATE.
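The difference between the two update modes can be illustrated with a small simulation in plain Python (not the service API): documents are stored in a dictionary keyed by the PRIMARY KEY value, the way Elasticsearch keys them by document ID.

```python
def upsert(store, doc, key_field, update_mode):
    """Upsert doc into store, a dict keyed by the primary-key value.

    update_mode='full': the new document replaces the existing one entirely.
    update_mode='inc' : only the fields present in the new document overwrite
                        the matching fields of the existing document.
    """
    key = doc[key_field]
    if update_mode == "full" or key not in store:
        store[key] = dict(doc)
    else:  # 'inc': merge field by field
        store[key].update(doc)
    return store

docs = {}
upsert(docs, {"a": 1, "cnt": 10, "note": "first"}, "a", "inc")
upsert(docs, {"a": 1, "cnt": 25}, "a", "inc")   # keeps "note", updates "cnt"
print(docs[1])  # {'a': 1, 'cnt': 25, 'note': 'first'}

upsert(docs, {"a": 1, "cnt": 30}, "a", "full")  # replaces the whole document
print(docs[1])  # {'a': 1, 'cnt': 30}
```

In inc mode the field note survives the second write because only cnt was present in the new document; in full mode the entire stored document is replaced.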
- Create the data consumption logic and synchronize the data.
INSERT INTO es_stream_sink
SELECT a, COUNT(*) AS cnt
FROM sls_stream
GROUP BY a;
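The effect of this statement on a stream can be sketched in plain Python: each incoming Log Service record updates a running count per value of a, and every update is emitted as an upsert into the result table keyed by a, the PRIMARY KEY field. The sample records below are made up for illustration.

```python
from collections import Counter

# Hypothetical records from the source stream; only field "a" matters to the query.
stream = [{"a": 1}, {"a": 2}, {"a": 1}, {"a": 1}, {"a": 2}]

counts = Counter()   # running state of SELECT a, COUNT(*) ... GROUP BY a
es_stream_sink = {}  # result table, keyed by the PRIMARY KEY field "a"

for record in stream:
    counts[record["a"]] += 1
    # Each change to a group emits an upsert keyed by "a" (the document ID),
    # so the sink always holds the latest count per key.
    es_stream_sink[record["a"]] = {"a": record["a"], "cnt": counts[record["a"]]}

print(es_stream_sink)  # {1: {'a': 1, 'cnt': 3}, 2: {'a': 2, 'cnt': 2}}
```

Because the sink is keyed by the PRIMARY KEY, repeated emissions for the same group overwrite the previous document instead of appending duplicates.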
- Submit and run the job.
After you submit and run the job, data stored in Log Service is aggregated and then imported into an Elasticsearch cluster. Realtime Compute also supports other compute operations. For example, you can create user-defined extensions (UDXs). For more information, see Flink SQL overview.
Realtime Compute and Elasticsearch allow you to quickly create your own real-time search services. If more complex logic is required to import data into an Elasticsearch cluster, use the user-defined sinks of Realtime Compute. For more information, see Create a custom result table.