This topic describes how to use Alibaba Cloud Realtime Compute to process data and then import the data to Alibaba Cloud Elasticsearch (ES).

Prerequisites

Background information

Alibaba Cloud Realtime Compute is a Flink-based computing service provided by Alibaba Cloud. It supports multiple data sources and data consumption services, such as Kafka and Elasticsearch. You can use Alibaba Cloud Realtime Compute and Elasticsearch to meet business requirements in log retrieval scenarios.

Logs in Kafka or Log Service are processed by Realtime Compute with simple or complex Flink SQL statements and then imported to Alibaba Cloud Elasticsearch as the source data for search services. By combining the computing capability of Realtime Compute with the search capability of Alibaba Cloud Elasticsearch, you can process and search data in real time and transform your business into real-time services. In addition, Realtime Compute can be easily integrated with Alibaba Cloud Elasticsearch. The following example shows how to integrate the two services.

For example, assume that logs or data has been imported to Log Service, and you need to process the data before importing it to Alibaba Cloud Elasticsearch. The following figure shows the data consumption pipeline.

Realtime Compute and Alibaba Cloud Elasticsearch integration

Create a Realtime Compute job

  1. Log on to the Realtime Compute console and create a job.
  2. Write Flink SQL statements.
    1. Create a Log Service table.
      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',  
        endPoint ='<yourEndpoint>',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '<yourStartTime>',
        project ='<yourProjectName>',
        logStore ='<yourLogStoreName>',
        consumerGroup ='<yourConsumerGroupName>'
      );
      The WITH variables are described as follows:
      • <yourEndpoint>: The public endpoint of Alibaba Cloud Log Service. The endpoint is a URL used to access projects and internal log data in Log Service. For more information, see Service endpoint. For example, the endpoint of Log Service in China (Hangzhou) is http://cn-hangzhou.log.aliyuncs.com. Make sure that the endpoint starts with http://.
      • <yourAccessId>: The AccessKey ID used to access Log Service.
      • <yourAccessKey>: The AccessKey secret used to access Log Service.
      • <yourStartTime>: The time from which log data is consumed. The time specified by this variable must be earlier than the time at which you start the Realtime Compute job.
      • <yourProjectName>: The name of the Log Service project.
      • <yourLogStoreName>: The name of the Logstore in the project.
      • <yourConsumerGroupName>: The name of the Log Service consumer group.

      For more information about the WITH variables, see Create a Log Service source table.
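
      The following sketch shows what the DDL might look like with the placeholders filled in. All values below (the China (Hangzhou) endpoint, the project, Logstore, and consumer group names, and the startTime format) are hypothetical and only illustrate the expected format; replace them with your own Log Service resources and AccessKey pair.
      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',
        -- Hypothetical values for illustration only.
        endPoint ='http://cn-hangzhou.log.aliyuncs.com',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '2020-06-01 00:00:00',
        project ='my-log-project',
        logStore ='my-log-store',
        consumerGroup ='my-consumer-group'
      );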

    2. Create an Elasticsearch result table.
      Notice
      • Realtime Compute V3.2.2 and later versions support Elasticsearch result tables. Select a compatible version when you create the Realtime Compute job.
      • Elasticsearch result tables are based on the RESTful API and are therefore compatible with all Elasticsearch versions.
      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );
      The WITH variables are described as follows:
      • <instanceid>: The ID of the Elasticsearch instance. You can view the instance ID on the Basic Information page of the instance, for example, es-cn-45xxxxxxxxxxxxk1q.
      • <port>: The public network port of the Elasticsearch instance. You can view the port on the Basic Information page of the instance. The default port number is 9200.
      • <yourAccessId>: The username used to access the Elasticsearch instance and log on to the Kibana console. The default username is elastic.
      • <yourAccessSecret>: The password used to access the Elasticsearch instance and log on to the Kibana console. The password is specified when you create the Elasticsearch instance.
      • <yourIndex>: The index of the documents on the Elasticsearch instance. An index is similar to a database name. If no index has been created for the documents, create one first. For more information, see Create an index.
      • <yourTypeName>: The type of the index. A type is similar to a table name in a database. If no type has been specified for the index, specify one first. For more information, see Create an index.

      For more information about the WITH variables, see Create an Elasticsearch result table.

      • Elasticsearch can update documents based on their document IDs, which are taken from the PRIMARY KEY field. Only one field can be specified as the PRIMARY KEY field. After you specify the PRIMARY KEY field, its values are used as document IDs. If no PRIMARY KEY field is specified, document IDs are randomly generated. For more information, see Index API.
      • Elasticsearch supports multiple update modes. You can set the updateMode parameter to specify the update mode, as shown in the sketch after this list.
        • If updateMode=full, new documents overwrite existing documents.
        • If updateMode=inc, new values overwrite the existing values of the relevant fields.
      • All updates in Elasticsearch follow the UPSERT syntax, which means that data is inserted or updated.
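      The following sketch shows how the updateMode parameter might be added to the result table DDL above. The value inc is only one possible choice, and the rest of the WITH clause keeps the placeholders from the previous DDL.
      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>',
        -- Incremental mode: new values overwrite only the relevant fields.
        updateMode = 'inc'
      );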
    3. Create the data consumption logic and synchronize the data.
      INSERT INTO es_stream_sink
      SELECT 
        a,
        count(*) as cnt
      FROM sls_stream GROUP BY a;
  3. Submit and run the job.
    After you submit and run the job, the data stored in Log Service is aggregated and then imported to Elasticsearch. Realtime Compute also supports other compute operations. For example, you can create data views and user-defined extensions (UDXs). For more information, see Flink SQL overview.
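
    As a sketch of how a data view could fit into this job, the following hypothetical Flink SQL first filters the source data in a view and then aggregates the view into the Elasticsearch result table. The view name and the filter condition on column b are made up for illustration.
    CREATE VIEW filtered_stream AS
    SELECT a, b, c
    FROM sls_stream
    WHERE b > 0;

    INSERT INTO es_stream_sink
    SELECT
      a,
      COUNT(*) AS cnt
    FROM filtered_stream
    GROUP BY a;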

Summary

With Alibaba Cloud Realtime Compute and Elasticsearch, you can quickly build your own real-time search services. If more complex logic is required to import data to Alibaba Cloud Elasticsearch, use the custom sink feature of Realtime Compute. For more information, see Create a custom result table.