This topic describes how to create an Elasticsearch sink connector to synchronize data from a data source topic in your Message Queue for Apache Kafka instance to an index of your Elasticsearch instance.

Prerequisites

Before you synchronize data, make sure that the following requirements are met:
  • Message Queue for Apache Kafka
    • The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
    • A data source topic is created in the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.
  • Function Compute
    • Function Compute is activated.
  • Elasticsearch
    Note
    • The version of the Elasticsearch client used by Function Compute is 7.7.0. To maintain compatibility, you must create an Elasticsearch instance of version 7.0 or later. For a quick way to check the version, see the sketch after this list.
    • When you configure the whitelist, you can specify 0.0.0.0/0 as the CIDR block, which allows the Elasticsearch instance to be accessed from all IP addresses in the virtual private cloud (VPC) that you use. After you verify that access works, narrow the CIDR block as needed.
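
Before you create the connector, you can confirm that the Elasticsearch instance is reachable and meets the version requirement. The following is a minimal sketch that uses the official Python client for Elasticsearch 7.x; the endpoint, REST port, and credentials are placeholders that you must replace with the values of your own instance.

from elasticsearch import Elasticsearch

# Placeholders: replace with the endpoint, REST port, and credentials of
# your Elasticsearch instance.
es = Elasticsearch(
    ["es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com"],
    port=9200,
    http_auth=("elastic", "your_password"),
)

# info() returns basic cluster information, including the server version.
version = es.info()["version"]["number"]
print("Elasticsearch version:", version)
assert int(version.split(".")[0]) >= 7, "Create an instance of version 7.0 or later."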

Usage notes

  • To synchronize data from Message Queue for Apache Kafka to Elasticsearch, the Message Queue for Apache Kafka instance that contains the data source topic and the Elasticsearch instance must be in the same region. Message Queue for Apache Kafka first synchronizes the data to Function Compute. Then, Function Compute synchronizes the data to Elasticsearch. For information about limits on connectors, see Limits.
  • Elasticsearch sink connectors are provided based on Function Compute. Function Compute provides you with a free quota. If your usage exceeds the free quota, you are charged for the excess usage based on the billing policies of Function Compute. For more information, see Billing.
  • Function Compute allows you to query the logs of function invocations so that you can troubleshoot issues at the earliest opportunity. For more information, see Configure logging.
  • Message Queue for Apache Kafka serializes messages into UTF-8-encoded strings for message transfer. Message Queue for Apache Kafka does not support the BINARY data type. For an example of what this means when you produce messages, see the sketch after this list.
  • By default, if you specify the private endpoint of the Elasticsearch instance for the Elasticsearch sink connector, Function Compute cannot access the Elasticsearch instance. To ensure network connectivity, you must specify the same VPC and vSwitch as those of the Elasticsearch instance for the related Function Compute service in the Function Compute console. For more information, see Update a Service.
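
The UTF-8 constraint in the preceding notes means that structured payloads must be serialized to text before they are produced. A minimal sketch in Python:

import json

# Structured payloads must be serialized to a UTF-8-encoded string, for
# example as JSON text, before they are sent.
payload = {"key": "test"}
value = json.dumps(payload).encode("utf-8")
print(value)  # b'{"key": "test"}'

# Raw binary payloads are not supported. Bytes that are not valid UTF-8
# cannot be synchronized as-is and must be re-encoded as text first.
try:
    b"\xff\xfe".decode("utf-8")
except UnicodeDecodeError as err:
    print("not valid UTF-8:", err)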

Create and deploy an Elasticsearch sink connector

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Connectors.
  4. On the Connectors page, click Create Connector.
  5. Complete the Create Connector wizard.
    1. In the Configure Basic Information step, set the parameters that are described in the following table and click Next.
      Notice By default, Authorize to Create Service Linked Role is selected. This means that Message Queue for Apache Kafka will create the required service-linked role based on the following rules:
      • If you have not created the required service-linked role, Message Queue for Apache Kafka automatically creates the service-linked role for you to use the Elasticsearch sink connector to synchronize data from Message Queue for Apache Kafka to Elasticsearch.
      • If you have created the required service-linked role, Message Queue for Apache Kafka does not create it again.
      For more information, see Service-linked roles.
      Parameter Description Example
      Name The name of the connector. Take note of the following rules when you specify a connector name:
      • The connector name must be 1 to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
      • Each connector name must be unique within a Message Queue for Apache Kafka instance.

      The data synchronization task of the connector must use a consumer group that is named in the connect-<task name> format. If you have not created such a consumer group, Message Queue for Apache Kafka automatically creates one for you.

      kafka-elasticsearch-sink
      Instance The information about the Message Queue for Apache Kafka instance. By default, the name and ID of the instance are displayed. demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, set the parameters that are described in the following table, and then click Next.
      Parameter Description Example
      Data Source Topic The name of the data source topic from which data is to be synchronized. elasticsearch-test-input
      Consumer Thread Concurrency The number of concurrent consumer threads used to synchronize data from the data source topic. Default value: 6. Valid values:
      • 1
      • 2
      • 3
      • 6
      • 12
      6
      Consumer Offset The offset where consumption starts. Valid values:
      • Earliest Offset: Consumption starts from the earliest offset.
      • Latest Offset: Consumption starts from the latest offset.
      Earliest Offset
      VPC ID The ID of the VPC where the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The default value is the VPC ID that you specified when you deployed the Message Queue for Apache Kafka instance. You do not need to change the value. vpc-bp1xpdnd3l***
      vSwitch ID The ID of the vSwitch where the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the Message Queue for Apache Kafka instance. The default value is the vSwitch ID that you specified when you deployed the Message Queue for Apache Kafka instance. vsw-bp1d2jgg81***
      Failure Handling Policy Specifies whether to retain the subscription to the partition where an error occurs after the relevant message fails to be sent. Click Configure Runtime Environment to display the parameter. Valid values:
      • Continue Subscription: retains the subscription to the partition where an error occurs and returns the logs.
      • Stop Subscription: stops the subscription to the partition where an error occurs and returns the logs.
      Note
      • For information about how to view the connector logs, see View connector logs.
      • For information about how to troubleshoot errors based on error codes, see Error codes.
      • To resume the subscription to the partition where an error occurs, submit a ticket to the technical support of Message Queue for Apache Kafka.
      Continue Subscription
      Resource Creation Method The method that is used to create the topics and consumer group required by the Elasticsearch sink connector. Click Configure Runtime Environment to display the parameter. Valid values:
      • Auto
      • Manual: Create the required topics and consumer group in advance. For an illustration of the topic settings, see the sketch after this table.
      Auto
      Connector Consumer Group The name of the consumer group that is required by the data synchronization task of the connector. Click Configure Runtime Environment to display the parameter. The name of this consumer group must be in the connect-<task name> format. connect-kafka-elasticsearch-sink
      Task Offset Topic The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-offset.
      • Partitions: The number of partitions in the topic must be greater than 1.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      connect-offset-kafka-elasticsearch-sink
      Task Configuration Topic The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-config.
      • Partitions: The topic can contain only one partition.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      connect-config-kafka-elasticsearch-sink
      Task Status Topic The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-status.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      connect-status-kafka-elasticsearch-sink
      Dead-letter Queue Topic The topic that is used to store the error data of the connector framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
      connect-error-kafka-elasticsearch-sink
      Error Data Topic The topic that is used to store the error data of the connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
      connect-error-kafka-elasticsearch-sink
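
    If you set Resource Creation Method to Manual, the preceding topics and consumer group must exist before you deploy the connector; you would normally create them in the Message Queue for Apache Kafka console. Purely as an illustration of the required settings, the following sketch creates the task configuration topic with the kafka-python admin client. The bootstrap address is a placeholder, and console-only options such as the Local Storage engine cannot be set this way.

    from kafka.admin import KafkaAdminClient, NewTopic

    # Placeholder: replace with the default endpoint of your instance.
    admin = KafkaAdminClient(bootstrap_servers="alikafka-pre-cn-****:9092")

    # Task configuration topic: one partition, compact cleanup policy.
    admin.create_topics([
        NewTopic(
            name="connect-config-kafka-elasticsearch-sink",
            num_partitions=1,
            replication_factor=3,
            topic_configs={"cleanup.policy": "compact"},
        )
    ])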
    3. In the Configure Destination Service step, select Elasticsearch as the destination service, set the parameters that are described in the following table, and then click Create.
      Parameter Description Example
      Elasticsearch Instance ID The ID of the Elasticsearch instance. es-cn-oew1o67x0000****
      Endpoint The public or private endpoint of the Elasticsearch instance. For more information, see View the basic information of a cluster. es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
      Port The public or private port used to access the Elasticsearch instance. Valid values:
      • 9200: for HTTP and HTTPS
      • 9300: for TCP

      For more information, see View the basic information of a cluster.

      9300
      Username The username that is used to log on to the Kibana console. Default value: elastic. You can also customize the username. For more information, see Use the RBAC mechanism provided by Elasticsearch X-Pack to implement access control. elastic
      Password The password that is used to log on to the Kibana console. The password of the elastic user is specified when you create the Elasticsearch instance. If you forget the password, you can reset it. For more information, see Reset the access password for an Elasticsearch cluster. ********
      Index The name of the Elasticsearch index. elastic_test
      Note
      • The username and password are used to initialize the Elasticsearch client. Messages are shipped by using bulk requests, so make sure that the account has write permissions on the index. For a quick permission check, see the sketch that follows this procedure.
      • The username and password are passed to functions of Function Compute as environment variables when Message Queue for Apache Kafka creates a synchronization task. After the task is created, Message Queue for Apache Kafka does not save the username or password.
      After the connector is created, you can view it on the Connectors page.
  6. Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
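
As noted in the previous step, the account that you specify must have write permissions on the index. A quick way to check before you deploy is to index and then delete a probe document with the Python client; the connection values are placeholders, as in the earlier sketch.

from elasticsearch import Elasticsearch

# Placeholder connection values, as in the earlier sketch.
es = Elasticsearch(
    ["es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com"],
    port=9200,
    http_auth=("elastic", "your_password"),
)

# Indexing fails with an authorization error if the account lacks write
# permissions on the index.
es.index(index="elastic_test", id="permission-probe", body={"probe": True})
es.delete(index="elastic_test", id="permission-probe")
print("write permissions confirmed")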

Configure the related Function Compute service

After you create and deploy the Elasticsearch sink connector in the Message Queue for Apache Kafka console, Function Compute automatically creates a Function Compute service for the connector and names the service in the kafka-service-<Connector name>-<Random string> format.

  1. On the Connectors page, find the connector that you created. In the Actions column of the connector, choose More > Configure Function.
    You are redirected to the Function Compute console.
  2. In the Function Compute console, find the automatically created service and specify a VPC and a vSwitch for the service. Make sure that the VPC and vSwitch are the same as those specified for your Elasticsearch instance. For more information, see Update a Service.

Send a test message

You can send a message to the data source topic in Message Queue for Apache Kafka to test whether the data can be synchronized to Elasticsearch.

  1. On the Connectors page, find the connector that you created, and click Test in the Actions column.
  2. In the Send Message panel, set the parameters or use the method as prompted to send a test message.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the test message, such as demo.
      2. In the Message Content field, enter the content of the test message, such as {"key": "test"}.
      3. Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
        • If you want to send the test message to a specific partition, click Yes and enter the partition ID, such as 0, in the Partition ID field. For more information about how to query partition IDs, see View partition status.
        • If you do not want to send the test message to a specific partition, click No.
    • Set the Method of Sending parameter to Docker and run the Docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
    • Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message.
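
If you use the SDK method, the test message from the console example can also be produced programmatically. The following is a minimal sketch with the kafka-python library for a VPC endpoint with default PLAINTEXT access; the bootstrap address is a placeholder, and access over a public endpoint additionally requires SASL settings.

import json
from kafka import KafkaProducer

# Placeholder: replace with the default endpoint of your instance.
producer = KafkaProducer(
    bootstrap_servers="alikafka-pre-cn-****:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Same key and content as in the console example above.
future = producer.send("elasticsearch-test-input", key="demo", value={"key": "test"})
metadata = future.get(timeout=10)  # Wait for the broker to acknowledge the send.
print("sent to partition", metadata.partition, "at offset", metadata.offset)
producer.flush()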

Verify the results

After you send a message to the data source topic in your Message Queue for Apache Kafka instance, log on to the Kibana console and run the GET /<index_name>/_search command to view the Elasticsearch index and verify whether the data is synchronized.
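
The same search can also be issued outside Kibana, for example with the Python client. A minimal sketch, again with placeholder connection values:

from elasticsearch import Elasticsearch

# Placeholder connection values, as in the earlier sketches.
es = Elasticsearch(
    ["es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com"],
    port=9200,
    http_auth=("elastic", "your_password"),
)

# Equivalent to running GET /elastic_test/_search in the Kibana console.
response = es.search(index="elastic_test")
for hit in response["hits"]["hits"]:
    print(hit["_source"])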

The following code provides an example of the data synchronized from Message Queue for Apache Kafka to Elasticsearch.
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_****",
        "_type" : "_doc",
        "_id" : "TX3TZHgBfHNEDGoZ****",
        "_score" : 1.0,
        "_source" : {
          "msg_body" : {
            "key" : "test",
            "offset" : 2,
            "overflowFlag" : false,
            "partition" : 2,
            "timestamp" : 1616599282417,
            "topic" : "dv****",
            "value" : "test1",
            "valueSize" : 8
          },
          "doc_as_upsert" : true
        }
      }
    ]
  }
}