This topic describes how to create an Elasticsearch sink connector to synchronize data from a data source topic in your Message Queue for Apache Kafka instance to an index of your Elasticsearch instance.
Prerequisites
- Message Queue for Apache Kafka
  - The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
  - A data source topic is created in the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.
- Function Compute
  - Function Compute is activated. For more information, see Activate Function Compute.
- Elasticsearch
  - An instance and an index are created in the Elasticsearch console. For more information, see Quick start.
  - The CIDR block of the Function Compute endpoint that you use is added to the whitelist of the Elasticsearch instance. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
Note:
- The version of the Elasticsearch client that Function Compute uses is 7.7.0. To maintain compatibility, create an Elasticsearch instance of version 7.0 or later. You can check the server version with the sketch after this note.
- When you configure the whitelist, you can specify 0.0.0.0/0 as the CIDR block, which allows the Elasticsearch instance to be accessed from all IP addresses in the virtual private cloud (VPC) that you use. After you verify that access works, narrow the CIDR block as needed.
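The following is a minimal sketch of such a version check, assuming the elasticsearch Python client (7.x) is installed; the endpoint, username, and password are placeholders that you must replace with the values of your own Elasticsearch instance.

```python
# Minimal sketch: check the Elasticsearch server version before you proceed.
# The endpoint and credentials below are placeholders, not real values.
from elasticsearch import Elasticsearch  # pip install "elasticsearch>=7,<8"

es = Elasticsearch(
    ["http://es-cn-example.elasticsearch.aliyuncs.com:9200"],  # placeholder endpoint
    http_auth=("elastic", "your_password"),                    # placeholder credentials
)

version = es.info()["version"]["number"]
print(f"Elasticsearch server version: {version}")
# The connector's Function Compute runtime uses a 7.7.0 client, so the
# server should be version 7.0 or later.
assert int(version.split(".")[0]) >= 7, "Create an instance of version 7.0 or later."
```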
Usage notes
- To synchronize data from Message Queue for Apache Kafka to Elasticsearch, the Message Queue for Apache Kafka instance that contains the data source topic and the Elasticsearch instance must be in the same region. Message Queue for Apache Kafka first synchronizes the data to Function Compute. Then, Function Compute synchronizes the data to Elasticsearch. For information about limits on connectors, see Limits.
- Elasticsearch sink connectors are provided based on Function Compute. Function Compute provides you with a free quota. If your usage exceeds the free quota, you are charged for the excess usage based on the billing policies of Function Compute. For more information, see Billing.
- Function Compute allows you to query the logs of function invocations so that you can troubleshoot issues in a timely manner. For more information, see Configure logging.
- Message Queue for Apache Kafka serializes messages into UTF-8-encoded strings for message transfer and does not support the BINARY data type. See the sketch after this list for one way to handle binary payloads.
- By default, Function Compute cannot access the Elasticsearch instance if you specify the private endpoint of the instance for the Elasticsearch sink connector. To ensure network connectivity, configure the related Function Compute service in the Function Compute console with the same VPC and vSwitch as those of the Elasticsearch instance. For more information, see Update a Service.
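Because messages are transferred as UTF-8-encoded strings, one common workaround for binary payloads is to Base64-encode them before you produce them. The following is a minimal sketch, assuming the kafka-python client; the endpoint and topic name are placeholders, not values from this topic.

```python
# Minimal sketch: Base64-encode a binary payload into a UTF-8 string before
# producing it, because messages are transferred as UTF-8 strings and the
# BINARY data type is not supported.
import base64
import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="alikafka-example:9092",        # placeholder endpoint
    value_serializer=lambda v: v.encode("utf-8"),     # send values as UTF-8 strings
)

raw_bytes = b"\x00\x01binary-payload\xff"             # example binary data
payload = json.dumps({"data": base64.b64encode(raw_bytes).decode("ascii")})
producer.send("your-source-topic", value=payload)     # placeholder topic name
producer.flush()
```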
Create and deploy an Elasticsearch sink connector
- Log on to the Message Queue for Apache Kafka console.
- In the Resource Distribution section of the Overview page, select the region where your instance resides.
- In the left-side navigation pane, click Connectors.
- On the Connectors page, click Create Connector.
- Complete the Create Connector wizard.
- Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
Configure the related Function Compute service
After you create and deploy the Elasticsearch sink connector in the Message Queue for Apache Kafka console, Function Compute automatically creates a service for the connector and names it in the kafka-service-&lt;Connector name&gt;-&lt;Random string&gt; format.
Send a test message
You can send a message to the data source topic in Message Queue for Apache Kafka to test whether the data can be synchronized to Elasticsearch.
- On the Connectors page, find the connector that you created, and click Test in the Actions column.
- In the Send Message panel, configure the parameters as prompted to send a test message by using one of the following methods:
- Set the Method of Sending parameter to Console.
  - In the Message Key field, enter the key of the test message, such as demo.
  - In the Message Content field, enter the content of the test message, such as {"key": "test"}.
  - Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
    - If you want to send the test message to a specific partition, click Yes and enter the partition ID, such as 0, in the Partition ID field. For more information about how to query partition IDs, see View partition status.
    - If you do not want to send the test message to a specific partition, click No.
- Set the Method of Sending parameter to Docker and run the Docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
- Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message. A minimal Python sketch follows this list.
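For reference, the following is a minimal sketch of the SDK method in Python, assuming the kafka-python client and the same test message as the console example (key demo, content {"key": "test"}). The endpoint and topic name are placeholders, and the access method that you select may require additional SASL or SSL client settings.

```python
# Minimal sketch: send the test message from the console example
# (key "demo", content {"key": "test"}) with the kafka-python client.
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(bootstrap_servers="alikafka-example:9092")  # placeholder endpoint
producer.send(
    "your-source-topic",       # placeholder data source topic
    key=b"demo",
    value=b'{"key": "test"}',
    partition=0,               # optional: send to a specific partition
)
producer.flush()
```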
Verify the results
After you send a message to the data source topic in your Message Queue for Apache Kafka instance, log on to the Kibana console and run the GET /&lt;index_name&gt;/_search command to query the Elasticsearch index and check whether the data is synchronized. A response similar to the following is returned. A sketch for running the same query with a client follows the sample response.
```json
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_****",
        "_type" : "_doc",
        "_id" : "TX3TZHgBfHNEDGoZ****",
        "_score" : 1.0,
        "_source" : {
          "msg_body" : {
            "key" : "test",
            "offset" : 2,
            "overflowFlag" : false,
            "partition" : 2,
            "timestamp" : 1616599282417,
            "topic" : "dv****",
            "value" : "test1",
            "valueSize" : 8
          },
          "doc_as_upsert" : true
        }
      }
    ]
  }
}
```
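If you prefer to verify outside Kibana, the following is a minimal sketch that runs an equivalent query with the elasticsearch Python client (7.x); the endpoint, credentials, and index name are placeholders for your own values.

```python
# Minimal sketch: run the same search outside Kibana with the
# elasticsearch Python client (7.x). All connection values below
# are placeholders.
from elasticsearch import Elasticsearch  # pip install "elasticsearch>=7,<8"

es = Elasticsearch(
    ["http://es-cn-example.elasticsearch.aliyuncs.com:9200"],  # placeholder endpoint
    http_auth=("elastic", "your_password"),                    # placeholder credentials
)

resp = es.search(index="your_index_name", body={"query": {"match_all": {}}})
for hit in resp["hits"]["hits"]:
    # Each synchronized Kafka message is stored in the msg_body field.
    print(hit["_source"]["msg_body"])
```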