This topic describes how to create an Elasticsearch sink connector to synchronize data from a source topic in your ApsaraMQ for Kafka instance to an index of your Elasticsearch cluster.

Prerequisites

Before you synchronize data, make sure that the following services are activated and configured:
  • ApsaraMQ for Kafka
  • Function Compute
  • Elasticsearch
    Note
    • The version of the Elasticsearch client that is used by Function Compute is 7.7.0. To ensure compatibility, create an Elasticsearch cluster of version 7.0 or later.
    • When you configure the whitelist of the Elasticsearch cluster, you can specify 0.0.0.0/0 as the CIDR block, which allows access from all IP addresses in the virtual private cloud (VPC) that you use. After you verify that access works, narrow the CIDR block as needed.

Usage notes

  • To synchronize data from ApsaraMQ for Kafka to Elasticsearch, the ApsaraMQ for Kafka instance that contains the source topic and the Elasticsearch cluster must reside in the same region. ApsaraMQ for Kafka first synchronizes the data to Function Compute, and Function Compute then synchronizes the data to Elasticsearch. For information about the limits on connectors, see Limits.
  • Elasticsearch sink connectors export data by using Function Compute. Function Compute provides a certain amount of resources for free. When you use up this free quota, you are charged for the Function Compute resources that you use based on the billing rules. For more information, see Billing overview.
  • Function Compute allows you to query the logs of function calls to troubleshoot issues. For more information, see Configure the logging feature.
  • ApsaraMQ for Kafka serializes messages into UTF-8-encoded strings for transfer and does not support binary data.
  • If you specify the private endpoint of the Elasticsearch cluster for the Elasticsearch sink connector, Function Compute cannot access the Elasticsearch cluster by default. To ensure network connectivity, you must specify the same VPC and vSwitch as those of the Elasticsearch cluster for the related Function Compute service in the Function Compute console. For more information, see Update a service.

Create and deploy an Elasticsearch sink connector

  1. Log on to the ApsaraMQ for Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. In the left-side navigation pane, click Connectors.
  4. On the Connectors page, select the instance in which the data source topic resides from the Select Instance drop-down list and click Create Connector.
  5. Perform the following operations to complete the Create Connector wizard.
    1. In the Configure Basic Information step, configure parameters and click Next. The following table describes the parameters.
      Important By default, Authorize to Create Service Linked Role is selected. This means that ApsaraMQ for Kafka will create a service-linked role based on the following rules:
      • If you have not created a service-linked role, ApsaraMQ for Kafka automatically creates a service-linked role for you to use the Elasticsearch sink connector to synchronize data from ApsaraMQ for Kafka to Elasticsearch.
      • If a service-linked role is available, ApsaraMQ for Kafka does not create a new role.
      For more information about service-linked roles, see Service-linked roles.
      Parameter: Name
      Description: The name of the connector. Specify a connector name based on the following naming conventions:
      • The name must be 1 to 48 characters in length and can contain digits, lowercase letters, and hyphens (-). The name cannot start with a hyphen (-).
      • The name must be unique within an ApsaraMQ for Kafka instance.
      The data synchronization task of the connector must use a consumer group that is named in the connect-<Task name> format. If you have not created such a consumer group, ApsaraMQ for Kafka automatically creates one for you.
      Example: kafka-elasticsearch-sink

      Parameter: Instance
      Description: The ApsaraMQ for Kafka instance. By default, the name and ID of the instance are displayed.
      Example: demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, configure parameters, and then click Next. The following table describes the parameters.
      Parameter: Data Source Topic
      Description: The name of the topic from which data is to be synchronized.
      Example: elasticsearch-test-input

      Parameter: Consumer Thread Concurrency
      Description: The number of concurrent consumer threads that are used to synchronize data from the source topic. Valid values: 1, 2, 3, 6, and 12. Default value: 6.
      Example: 6

      Parameter: Consumer Offset
      Description: The consumer offset from which you want message consumption to start. Valid values:
      • Earliest Offset: Message consumption starts from the earliest consumer offset.
      • Latest Offset: Message consumption starts from the latest consumer offset.
      Example: Earliest Offset

      Parameter: VPC ID
      Description: The ID of the VPC in which the source instance is deployed. Click Configure Runtime Environment to display the parameter. By default, the VPC ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed. You do not need to configure this parameter.
      Example: vpc-bp1xpdnd3l***

      Parameter: vSwitch ID
      Description: The ID of the vSwitch to which the source instance is connected. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the source ApsaraMQ for Kafka instance. The default value is the vSwitch ID that you specified when you deployed the instance.
      Example: vsw-bp1d2jgg81***

      Parameter: Failure Handling Policy
      Description: Specifies whether to retain the subscription to a partition after a message sending error occurs in that partition. Click Configure Runtime Environment to display the parameter. Valid values:
      • Continue Subscription: retains the subscription. A log entry is generated for the error.
      • Stop Subscription: stops the subscription. A log entry is generated for the error.
      Note
      • For information about how to view log information, see Manage a connector.
      • For information about how to troubleshoot errors based on error codes, see Error codes.
      Example: Continue Subscription

      Parameter: Resource Creation Method
      Description: The method that is used to create the topics and consumer group that are required by the Elasticsearch sink connector. Click Configure Runtime Environment to display the parameter. Valid values:
      • Auto
      • Manual
      Example: Auto

      Parameter: Connector Consumer Group
      Description: The consumer group that is used by the data synchronization task of the connector. Click Configure Runtime Environment to display the parameter. The name of this consumer group must be in the connect-<Task name> format.
      Example: connect-kafka-elasticsearch-sink

      Parameter: Task Offset Topic
      Description: The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-offset.
      • Partitions: The number of partitions in the topic must be greater than 1.
      • Storage Engine: You must set the storage engine of the topic to Local Storage.
        Note You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      • cleanup.policy: You must set the log cleanup policy for the topic to Compact.
      Example: connect-offset-kafka-elasticsearch-sink

      Parameter: Task Configuration Topic
      Description: The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-config.
      • Partitions: The topic can contain only one partition.
      • Storage Engine: You must set the storage engine of the topic to Local Storage.
        Note You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      • cleanup.policy: You must set the log cleanup policy for the topic to Compact.
      Example: connect-config-kafka-elasticsearch-sink

      Parameter: Task Status Topic
      Description: The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-status.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: You must set the storage engine of the topic to Local Storage.
        Note You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      • cleanup.policy: You must set the log cleanup policy for the topic to Compact.
      Example: connect-status-kafka-elasticsearch-sink

      Parameter: Dead-letter Queue Topic
      Description: The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create one topic and use it as both the dead-letter queue topic and the error data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage.
        Note You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      Example: connect-error-kafka-elasticsearch-sink

      Parameter: Error Data Topic
      Description: The topic that is used to store the error data of the sink connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create one topic and use it as both the dead-letter queue topic and the error data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage.
        Note You can set the storage engine to Local Storage only when you create a topic for an ApsaraMQ for Kafka instance of the Professional Edition.
      Example: connect-error-kafka-elasticsearch-sink
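      For reference, the partition and cleanup.policy requirements above map to standard Kafka topic settings. The following Python snippet is a minimal sketch that creates a compacted task offset topic with the open-source kafka-python client, assuming a placeholder bootstrap endpoint. It is only an illustration: the Local Storage engine is specific to ApsaraMQ for Kafka and is configured in the console, not through the client, and the Auto resource creation method creates these topics for you.

      from kafka.admin import KafkaAdminClient, NewTopic

      # Placeholder bootstrap endpoint of the ApsaraMQ for Kafka instance.
      admin = KafkaAdminClient(bootstrap_servers="<your-endpoint>:9092")

      # Task offset topic: more than one partition, compact cleanup policy.
      topic = NewTopic(
          name="connect-offset-kafka-elasticsearch-sink",
          num_partitions=6,
          replication_factor=3,
          topic_configs={"cleanup.policy": "compact"},
      )
      admin.create_topics([topic])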
    3. In the Configure Destination Service step, select Elasticsearch as the destination service, configure parameters, and then click Create. The following table describes the parameters.
      Parameter: Elasticsearch Instance ID
      Description: The ID of the Elasticsearch cluster.
      Example: es-cn-oew1o67x0000****

      Parameter: Endpoint
      Description: The public or private endpoint of the Elasticsearch cluster. For more information, see View the basic information of a cluster.
      Example: es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com

      Parameter: Port
      Description: The public or private port that is used to access the Elasticsearch cluster. Valid values:
      • 9200: for HTTP and HTTPS
      • 9300: for TCP
      For more information, see View the basic information of a cluster.
      Example: 9300

      Parameter: Username
      Description: The username that is used to log on to the Kibana console. Default value: elastic. You can also specify a custom username. For more information, see Use the RBAC mechanism provided by Elasticsearch X-Pack to implement access control.
      Example: elastic

      Parameter: Password
      Description: The password that is used to log on to the Kibana console. The password of the elastic user is specified when you create the Elasticsearch cluster. If you forget the password, you can reset it. For more information, see Reset the access password for an Elasticsearch cluster.
      Example: ********

      Parameter: Index
      Description: The name of the Elasticsearch index.
      Example: elastic_test

      Note
      • The username and password are used to initialize Elasticsearch objects. To ship messages by using the bulk API, make sure that the account has permissions to write to the index.
      • The username and password are passed to the functions in Function Compute as environment variables when ApsaraMQ for Kafka creates the data export task. After the task is created, ApsaraMQ for Kafka does not save the username or password.
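      The note above refers to Elasticsearch bulk writes with doc_as_upsert semantics, which also appear in the verification output at the end of this topic. The following Python snippet is a minimal client-side sketch of such a write, assuming the elasticsearch-py 7.x client and placeholder endpoint, credentials, and document ID. It illustrates the write pattern and the required index permissions; it is not the connector's actual implementation.

      from elasticsearch import Elasticsearch, helpers

      # Placeholder endpoint and credentials. HTTP access uses port 9200.
      es = Elasticsearch(
          hosts=[{"host": "es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com", "port": 9200}],
          http_auth=("elastic", "<your-password>"),
      )

      # A bulk update with doc_as_upsert creates the document if it does
      # not exist and updates it otherwise. The account must have write
      # permissions on the index for this call to succeed.
      actions = [{
          "_op_type": "update",
          "_index": "elastic_test",
          "_id": "demo-id",  # hypothetical document ID
          "doc": {"msg_body": {"key": "test", "value": "test1"}},
          "doc_as_upsert": True,
      }]
      helpers.bulk(es, actions)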
      After the connector is created, you can view the connector on the Connectors page.
  6. Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.

Configure the related Function Compute service

After you create and deploy the Elasticsearch sink connector in the ApsaraMQ for Kafka console, Function Compute automatically creates a Function Compute service for the connector and names the service in the kafka-service-<Connector_name>-<Random string> format.

  1. On the Connectors page, find the connector that you created. In the Actions column of the connector, choose More > Configure Function.
    You are redirected to the Function Compute console.
  2. In the Function Compute console, find the automatically created service and configure a VPC and vSwitch for the service. Make sure that the VPC and vSwitch are the same as those specified for your Elasticsearch cluster. For more information, see Update a service.

Send messages

You can send a message to the source topic in your ApsaraMQ for Kafka instance to test whether the data can be synchronized to Elasticsearch.

  1. On the Connectors page, find the connector that you want to use and click Test in the Actions column.
  2. In the Send Message panel, configure the required parameters to send a test message.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the message. For example, you can enter demo as the key of the message.
      2. In the Message Content field, enter the content of the message. For example, you can enter {"key": "test"} as the content of the message.
      3. Configure the Send to Specified Partition parameter to specify whether to send the message to a specified partition.
        • If you want to send the message to a specified partition, click Yes and enter the partition ID in the Partition ID field. For example, you can enter 0 as the partition ID. For information about how to query partition IDs, see View partition status.
        • If you do not want to send the message to a specified partition, click No.
    • Set the Method of Sending parameter to Docker and run the Docker commands that are provided in the Run the Docker container to produce a sample message section to send a test message.
    • Set the Method of Sending parameter to SDK and click the link to the topic that describes how to obtain and use the SDK that you want to use. Then, use the SDK to send and consume a test message. ApsaraMQ for Kafka provides topics that describe how to use SDKs for different programming languages and connection types. A minimal producer sketch follows this list.
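    For example, the following Python snippet is a minimal producer sketch that uses the open-source kafka-python client. The bootstrap endpoint is a placeholder, and any SASL or SSL settings that your instance requires are omitted; the SDK topics linked above describe the exact connection configuration.

    from kafka import KafkaProducer

    # Placeholder bootstrap endpoint. Keys and values are sent as UTF-8
    # strings because ApsaraMQ for Kafka does not support binary data.
    producer = KafkaProducer(
        bootstrap_servers="<your-endpoint>:9092",
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: v.encode("utf-8"),
    )
    producer.send("elasticsearch-test-input", key="demo", value='{"key": "test"}')
    producer.flush()
    producer.close()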

Verify the results

After you send a message to the source topic in your ApsaraMQ for Kafka instance, log on to the Kibana console and run the GET /<index_name>/_search command to view the Elasticsearch index and verify whether the data is synchronized.
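For example, with the index name that is used in this topic, run the following command in the Kibana console:

GET /elastic_test/_search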

The following sample response shows data that is synchronized from ApsaraMQ for Kafka to Elasticsearch.
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_****",
        "_type" : "_doc",
        "_id" : "TX3TZHgBfHNEDGoZ****",
        "_score" : 1.0,
        "_source" : {
          "msg_body" : {
            "key" : "test",
            "offset" : 2,
            "overflowFlag" : false,
            "partition" : 2,
            "timestamp" : 1616599282417,
            "topic" : "dv****",
            "value" : "test1",
            "valueSize" : 8
          },
          "doc_as_upsert" : true
        }
      }
    ]
  }
}