An Elasticsearch sink connector reads messages from a topic in your ApsaraMQ for Kafka instance and writes them to an Elasticsearch index. The connector uses Function Compute as an intermediary: it consumes messages from the source topic, passes them to a Function Compute function, and the function writes them to Elasticsearch through the Bulk API. Each message becomes a document in the target index, with metadata such as the topic, partition, offset, and timestamp.
Before you begin
Complete the following setup before you create the connector.
ApsaraMQ for Kafka
Enable the connector feature for your instance.
Create a topic to use as the data source.
Function Compute
Activate Function Compute. The connector creates and invokes a function on your behalf to export data.
Elasticsearch
Create an Elasticsearch cluster and index in the Elasticsearch console. Use Elasticsearch 7.0 or later: the Function Compute function uses version 7.7.0 of the Elasticsearch client, and the cluster version must be compatible with it.
Add the Function Compute endpoint CIDR block to the Elasticsearch whitelist. For initial testing, specify 0.0.0.0/0 to allow all IP addresses in the VPC, then restrict the range after you verify connectivity.
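Before tightening the whitelist from 0.0.0.0/0, you can sanity-check a candidate CIDR range locally. A minimal sketch using Python's standard ipaddress module; the addresses shown are placeholders, not real Function Compute IP addresses:

```python
import ipaddress

def cidr_covers(cidr: str, ip: str) -> bool:
    """Return True if the given IP address falls inside the CIDR block."""
    return ipaddress.ip_address(ip) in ipaddress.ip_network(cidr)

# 0.0.0.0/0 matches every IPv4 address, so connectivity tests always pass.
print(cidr_covers("0.0.0.0/0", "192.168.1.10"))       # True
# A narrower VPC range only admits addresses inside it.
print(cidr_covers("192.168.0.0/16", "192.168.1.10"))  # True
print(cidr_covers("192.168.0.0/16", "10.0.0.5"))      # False
```

Replace the placeholder ranges with the actual vSwitch CIDR blocks of your Function Compute service before updating the whitelist.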
Information to collect
Gather the following details before you start the wizard.
| Information | Where to find it | Example |
|---|---|---|
| Elasticsearch instance ID | Elasticsearch console | es-cn-oew1o67x0000**** |
| Elasticsearch endpoint (public or private) | Cluster basic information | es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com |
| Elasticsearch port | Cluster basic information | 9200 (HTTP/HTTPS) or 9300 (TCP) |
| Elasticsearch username and password | Set during cluster creation; reset if needed | elastic / ******** |
| Elasticsearch index name | Elasticsearch console | elastic_test |
| Source topic name | ApsaraMQ for Kafka console | elasticsearch-test-input |
Limits
The ApsaraMQ for Kafka instance and the Elasticsearch cluster must be in the same region.
ApsaraMQ for Kafka serializes messages as UTF-8 strings. Binary data is not supported.
If you specify the private endpoint of the Elasticsearch cluster, Function Compute cannot access it by default. To enable connectivity, configure the Function Compute service to use the same VPC and vSwitch as the Elasticsearch cluster. See Configure the Function Compute service.
For additional connector limits, see Limits.
Billing
The connector uses Function Compute to export data. Function Compute provides a free tier. Usage beyond the free tier is billed according to Function Compute billing.
Create and deploy the connector
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select your region.
In the left-side navigation pane, click Connectors.
On the Connectors page, select your instance from the Select Instance drop-down list and click Create Connector.
Step 1: Configure basic information
In the Configure Basic Information step, set the connector name and review the instance details.
| Parameter | Description | Example |
|---|---|---|
| Name | A unique name within the ApsaraMQ for Kafka instance. Use 1 to 48 characters: digits, lowercase letters, and hyphens (-). Cannot start with a hyphen. The connector automatically creates a consumer group named connect-<connector-name>. | kafka-elasticsearch-sink |
| Instance | Displays the name and ID of the current instance. | demo alikafka_post-cn-st21p8vj**** |
By default, Authorize to Create Service Linked Role is selected. ApsaraMQ for Kafka creates a service-linked role if one does not already exist.
Click Next.
Step 2: Configure the source service
In the Configure Source Service step, select Message Queue for Apache Kafka as the source service and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Data Source Topic | The topic from which data is consumed. | elasticsearch-test-input |
| Consumer Thread Concurrency | Number of concurrent consumer threads. Valid values: 1, 2, 3, 6, 12. Default: 6. | 6 |
| Consumer Offset | Where to start consuming. Earliest Offset reads from the beginning. Latest Offset reads only new messages. | Earliest Offset |
Click Configure Runtime Environment to expand additional parameters.
| Parameter | Description | Example |
|---|---|---|
| VPC ID | VPC of the source instance. Auto-populated; no change needed. | vpc-bp1xpdnd3l*** |
| vSwitch ID | vSwitch of the source instance. Must be in the same VPC. | vsw-bp1d2jgg81*** |
| Failure Handling Policy | Action to take when a message fails. Continue Subscription logs the error and keeps consuming. Stop Subscription logs the error and stops consumption for the partition. See Manage a connector for log details and Error codes for troubleshooting. | Continue Subscription |
| Resource Creation Method | Auto creates the required internal topics automatically. Manual lets you create them yourself. | Auto |
| Connector Consumer Group | Consumer group for the connector task. Format: connect-<connector-name>. | connect-kafka-elasticsearch-sink |
Internal topics (manual creation only)
If you set Resource Creation Method to Manual, create the following topics. All topics that require Local Storage are available only on Professional Edition instances.
| Parameter | Naming convention | Partitions | Storage engine | cleanup.policy |
|---|---|---|---|---|
| Task Offset Topic | connect-offset-* | More than 1 | Local Storage | Compact |
| Task Configuration Topic | connect-config-* | Exactly 1 | Local Storage | Compact |
| Task Status Topic | connect-status-* | 6 (recommended) | Local Storage | Compact |
| Dead-letter Queue Topic | connect-error-* | 6 (recommended) | Local Storage or Cloud Storage | -- |
| Error Data Topic | connect-error-* | 6 (recommended) | Local Storage or Cloud Storage | -- |
To save topic resources, use the same topic for both the dead-letter queue topic and the error data topic.
Click Next.
Step 3: Configure the destination service
In the Configure Destination Service step, select Elasticsearch as the destination service and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Elasticsearch Instance ID | The ID of the Elasticsearch cluster. | es-cn-oew1o67x0000**** |
| Endpoint | The public or private endpoint of the cluster. See View cluster basic information. | es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com |
| Port | 9200 for HTTP/HTTPS, or 9300 for TCP. | 9300 |
| Username | The Elasticsearch username. Default: elastic. Customize through X-Pack RBAC if needed. The account must have write permissions on the target index. | elastic |
| Password | The password set during cluster creation. Reset the password if you forgot it. | ******** |
| Index | The target Elasticsearch index name. | elastic_test |
- The username and password are passed to Function Compute as environment variables when the connector task is created. ApsaraMQ for Kafka does not store these credentials after task creation.
- The account must have permissions to write to the index, because messages are shipped through the Elasticsearch Bulk API.
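As an illustration of the Bulk API hand-off, the following sketch converts a consumed Kafka record into a bulk index action whose _source matches the msg_body document shape this connector produces (see the sample search response later in this topic). The helper name to_bulk_action and the record fields are assumptions for illustration; the real function is created and managed by the connector.

```python
import json

def to_bulk_action(record: dict, index: str) -> dict:
    """Wrap a consumed Kafka record in an Elasticsearch bulk "index" action.

    `record` is assumed to carry the topic/partition/offset/timestamp
    metadata plus the UTF-8 message key and value.
    """
    return {
        "_index": index,
        "_source": {
            "msg_body": {
                "key": record["key"],
                "value": record["value"],
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
                "timestamp": record["timestamp"],
            },
            "doc_as_upsert": True,
        },
    }

record = {
    "key": "demo",
    "value": json.dumps({"key": "test"}),
    "topic": "elasticsearch-test-input",
    "partition": 0,
    "offset": 42,
    "timestamp": 1616599282417,
}
action = to_bulk_action(record, "elastic_test")
print(action["_index"])  # elastic_test
```

With the Elasticsearch Python client (7.x), a list of such actions can then be shipped in one call with elasticsearch.helpers.bulk(client, actions).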
Click Create.
Deploy the connector
After creation, the connector appears on the Connectors page. Click Deploy in the Actions column to start the connector.
Configure the Function Compute service
After you deploy the connector, Function Compute automatically creates a service named kafka-service-<connector-name>-<random-string>. If the Elasticsearch cluster uses a private endpoint, configure the Function Compute service to use the same VPC and vSwitch as the Elasticsearch cluster.
On the Connectors page, find the connector. In the Actions column, choose More > Configure Function.
In the Function Compute console, locate the auto-created service and update the VPC and vSwitch settings to match the Elasticsearch cluster.
Verify the data flow
Send a test message to confirm that data flows from ApsaraMQ for Kafka to Elasticsearch.
Send a test message
On the Connectors page, find the connector and click Test in the Actions column.
In the Send Message panel, set Method of Sending to Console.
In the Message Key field, enter a key, for example, demo.
In the Message Content field, enter a JSON body, for example: {"key": "test"}.
For Send to Specified Partition, click Yes and enter a Partition ID (for example, 0) to target a specific partition, or click No to let the system assign one. To look up partition IDs, see View partition status.
You can also send test messages through Docker or an SDK. Select the corresponding option in the Method of Sending field and follow the on-screen instructions.
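Because the connector serializes messages as UTF-8 strings (see Limits), an SDK-sent message body must be a UTF-8 encoded JSON string. A hedged sketch using the open-source kafka-python client; the bootstrap address is a placeholder, and your instance may additionally require SASL settings:

```python
import json

# from kafka import KafkaProducer  # pip install kafka-python

def encode_message(content: dict) -> bytes:
    """Serialize the message body as a UTF-8 JSON string, per the connector's limits."""
    return json.dumps(content).encode("utf-8")

payload = encode_message({"key": "test"})
print(payload)  # b'{"key": "test"}'

# Uncomment to send; the endpoint below is a placeholder, not a real instance.
# producer = KafkaProducer(bootstrap_servers="alikafka-pre-cn-****:9092")
# producer.send("elasticsearch-test-input", key=b"demo", value=payload)
# producer.flush()
```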
Check the Elasticsearch index
Run the following query to search the target index:
GET /<index_name>/_search

Confirm that the response contains the message you sent. A successful response looks like this:

{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "product_****",
        "_type": "_doc",
        "_id": "TX3TZHgBfHNEDGoZ****",
        "_score": 1.0,
        "_source": {
          "msg_body": {
            "key": "test",
            "offset": 2,
            "overflowFlag": false,
            "partition": 2,
            "timestamp": 1616599282417,
            "topic": "dv****",
            "value": "test1",
            "valueSize": 8
          },
          "doc_as_upsert": true
        }
      }
    ]
  }
}
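The same check can be scripted by extracting the msg_body documents from the search response. A sketch that parses a response of the shape shown above; the extract_messages helper is an assumption for illustration, not part of the connector:

```python
import json

def extract_messages(search_response: dict) -> list:
    """Pull the msg_body documents out of an Elasticsearch search response."""
    return [hit["_source"]["msg_body"] for hit in search_response["hits"]["hits"]]

# A trimmed-down response with the same structure as the sample above.
response = json.loads("""
{"hits": {"total": {"value": 1, "relation": "eq"},
          "hits": [{"_index": "elastic_test", "_id": "TX3TZHgB****",
                    "_source": {"msg_body": {"key": "test", "topic": "dv****",
                                             "partition": 2, "offset": 2},
                                "doc_as_upsert": true}}]}}
""")
messages = extract_messages(response)
print(messages[0]["key"])  # test
```

With the Elasticsearch Python client (7.x), client.search(index="elastic_test") returns a dict of this same shape, so the helper works on live responses too.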
Troubleshooting
| Symptom | Possible cause | Resolution |
|---|---|---|
| Connector fails to write to Elasticsearch | Function Compute cannot reach the Elasticsearch cluster | Verify that the Function Compute service uses the same VPC and vSwitch as the Elasticsearch cluster. See Configure the Function Compute service. |
| Messages are not consumed | Incorrect consumer offset setting | Check the Consumer Offset setting. Use Earliest Offset to read all existing messages, or Latest Offset for new messages only. |
| Authentication errors | Invalid credentials or insufficient permissions | Confirm that the username and password are correct and that the account has write permissions on the target index. |
For Function Compute function call logs, see Configure the logging feature.