This topic describes how to use Alibaba Cloud Realtime Compute to process and synchronize data to an Alibaba Cloud Elasticsearch cluster.

Prerequisites

Background information

Alibaba Cloud Realtime Compute is a Flink-based service provided by Alibaba Cloud. It supports multiple input and output systems, such as Kafka and Elasticsearch. You can use Realtime Compute together with Elasticsearch to build log retrieval services.

Realtime Compute processes logs in Kafka or Log Service by using simple or complex Flink SQL statements. Then, Realtime Compute imports the processed logs into an Elasticsearch cluster as source data for search services. The powerful computing capabilities of Realtime Compute combined with the search capabilities of Elasticsearch allow you to process and search real-time data. This helps you transform your business into real-time services.

Realtime Compute can be easily integrated with Elasticsearch. Assume that logs or data records have been imported into Log Service, and the data needs to be processed before it is imported into an Elasticsearch cluster. The following figure shows the data consumption pipeline.
Figure: Integration of Realtime Compute with Elasticsearch

Create a Realtime Compute job

  1. Log on to the Realtime Compute development platform and create a job.
  2. Write Flink SQL statements.
    1. Create a source table for Log Service.
      CREATE TABLE sls_stream (
        a INT,
        b INT,
        c VARCHAR
      )
      WITH (
        type = 'sls',
        endPoint = '<yourEndpoint>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessKey>',
        startTime = '<yourStartTime>',
        project = '<yourProjectName>',
        logStore = '<yourLogStoreName>',
        consumerGroup = '<yourConsumerGroupName>'
      );
      The following parameters are used in the WITH clause:
      • <yourEndpoint>: The public endpoint of Alibaba Cloud Log Service, which is the URL used to access projects and log data in Log Service. For example, the endpoint of Log Service in the China (Hangzhou) region is http://cn-hangzhou.log.aliyuncs.com. Make sure that the endpoint starts with http://. For more information, see Service endpoint.
      • <yourAccessId>: The AccessKey ID that is used to access Log Service.
      • <yourAccessKey>: The AccessKey secret that is used to access Log Service.
      • <yourStartTime>: The start of the time range during which log data is consumed. Specify a time that is earlier than the time when you run the Realtime Compute job.
      • <yourProjectName>: The name of the Log Service project.
      • <yourLogStoreName>: The name of the Logstore in the project.
      • <yourConsumerGroupName>: The name of the Log Service consumer group.

      For more information about other parameters in the WITH part, see Create a Log Service source table.
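      Conceptually, each log record consumed from the Logstore is mapped onto the columns of the source table. The sketch below models that mapping in Python, assuming the record fields carry the same names (a, b, c) as the example schema; your actual log fields will differ.

      ```python
      # Model of how a raw Log Service record (string fields) maps onto
      # the sls_stream schema: a and b become INT, c stays VARCHAR.
      # Field names here mirror the example table and are assumptions.

      def to_sls_stream_row(record):
          """Convert one raw log record into an (a, b, c) row."""
          return (int(record["a"]), int(record["b"]), str(record["c"]))

      row = to_sls_stream_row({"a": "1", "b": "2", "c": "hello"})
      # row == (1, 2, "hello")
      ```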

    2. Create an Elasticsearch result table.
      Notice
      • Elasticsearch result tables are supported in Realtime Compute V3.2.2 and later. When you create a Realtime Compute job, select the correct version.
      • Elasticsearch result tables are based on the RESTful API. These result tables are compatible with all Elasticsearch versions.
      CREATE TABLE es_stream_sink (
        a INT,
        cnt BIGINT,
        PRIMARY KEY (a)
      )
      WITH (
        type = 'elasticsearch',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );
      The following parameters are used in the WITH clause:
      • <instanceid>: The ID of the Elasticsearch cluster, for example, es-cn-45xxxxxxxxxxxxk1q. You can obtain the ID on the Basic Information page of the cluster.
      • <port>: The public port of the Elasticsearch cluster. Default value: 9200. You can obtain the port on the Basic Information page of the cluster.
      • <yourAccessId>: The username that is used to access the Elasticsearch cluster. The username can also be used to log on to the Kibana console. Default value: elastic.
      • <yourAccessSecret>: The password that is used to access the Elasticsearch cluster. The password can also be used to log on to the Kibana console. It is specified when you create the Elasticsearch cluster.
      • <yourIndex>: The name of the index for the documents in the Elasticsearch cluster. An index is similar to a database. If no index exists for the documents, create one. For more information, see Create an index.
      • <yourTypeName>: The type of the index. A type is similar to a table in a database. If no type exists for the index, create one. For more information, see Create an index.

      For more information about other parameters in the WITH part, see Create an ElasticSearch result table.
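      If the index does not exist yet, it can be created with a single REST call against the cluster's public endpoint. The sketch below builds that request with the Python standard library only; the endpoint, index name, and password are placeholders, and the index settings body is a minimal assumption. The request is constructed but not sent, so you can inspect it before filling in real values.

      ```python
      # Build (but do not send) the PUT /<index> request that creates an
      # Elasticsearch index, authenticated with HTTP basic auth.
      # All concrete values below are placeholders, not real credentials.
      import base64
      import json
      import urllib.request

      def build_create_index_request(endpoint, index, username, password):
          body = json.dumps({"settings": {"number_of_shards": 1}}).encode("utf-8")
          token = base64.b64encode(f"{username}:{password}".encode()).decode()
          return urllib.request.Request(
              url=f"{endpoint}/{index}",
              data=body,
              method="PUT",
              headers={
                  "Content-Type": "application/json",
                  "Authorization": f"Basic {token}",
              },
          )

      req = build_create_index_request(
          "http://es-cn-45xxxxxxxxxxxxk1q.public.elasticsearch.aliyuncs.com:9200",
          "my_index", "elastic", "<yourPassword>")
      # urllib.request.urlopen(req) would send it once the placeholders are real.
      ```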

      Note
      • Elasticsearch supports document updates based on the PRIMARY KEY field. Only one field can be specified as the PRIMARY KEY field. If you specify the PRIMARY KEY field, values in the PRIMARY KEY field are used as document IDs. If the PRIMARY KEY field is not specified, the system generates random IDs for documents. For more information, see Index API.
      • Elasticsearch supports multiple update modes. You can set the updateMode parameter to specify the update mode.
        • If updateMode is set to full, new documents overwrite existing documents.
        • If updateMode is set to inc, new values overwrite existing values of the related fields.
      • All writes to the Elasticsearch result table, whether produced by INSERT or UPDATE statements, follow UPSERT semantics: a document is inserted if it does not exist and updated if it does.
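      The difference between the two update modes can be modeled in a few lines, treating documents as plain field-to-value mappings (a simplification of how Elasticsearch stores documents):

      ```python
      # Model of the full vs. inc update modes described above.

      def apply_update(existing, new, update_mode):
          if update_mode == "full":
              # The new document replaces the existing one entirely.
              return dict(new)
          if update_mode == "inc":
              # Only fields present in the new document are overwritten;
              # other existing fields are kept.
              merged = dict(existing)
              merged.update(new)
              return merged
          raise ValueError(f"unknown updateMode: {update_mode}")

      doc = {"a": 1, "cnt": 10, "note": "old"}
      print(apply_update(doc, {"cnt": 11}, "full"))  # {'cnt': 11}
      print(apply_update(doc, {"cnt": 11}, "inc"))   # {'a': 1, 'cnt': 11, 'note': 'old'}
      ```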
    3. Write the data consumption logic and synchronize the data.
      INSERT INTO es_stream_sink
      SELECT
        a,
        COUNT(*) AS cnt
      FROM sls_stream
      GROUP BY a;
  3. Submit and run the job.
    After you submit and run the job, data stored in Log Service is aggregated and then imported into an Elasticsearch cluster. Realtime Compute also supports other compute operations. For example, you can create user-defined extensions (UDXs). For more information, see Flink SQL overview.
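    Logically, the job above maintains a running count per key a and upserts each updated count into the result table, keyed by the PRIMARY KEY field (which becomes the document ID). A minimal Python model of that behavior, with a plain dict standing in for the Elasticsearch index:

    ```python
    # Model of the streaming GROUP BY count with upsert-by-primary-key.
    from collections import Counter

    def run_job(stream):
        counts = Counter()
        sink = {}  # stands in for the Elasticsearch index, keyed by doc ID
        for row in stream:
            a = row["a"]
            counts[a] += 1
            sink[a] = {"a": a, "cnt": counts[a]}  # upsert by document ID
        return sink

    result = run_job([{"a": 1}, {"a": 2}, {"a": 1}])
    # result == {1: {"a": 1, "cnt": 2}, 2: {"a": 2, "cnt": 1}}
    ```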

Summary

Realtime Compute and Elasticsearch allow you to quickly create your own real-time search services. If more complex logic is required to import data into an Elasticsearch cluster, use the user-defined sinks of Realtime Compute. For more information, see Create a custom result table.