
ApsaraMQ for Kafka:Create an Elasticsearch sink connector

Last Updated: Mar 11, 2026

An Elasticsearch sink connector reads messages from a topic in your ApsaraMQ for Kafka instance and writes them to an Elasticsearch index. The connector uses Function Compute as an intermediary: it consumes messages from the source topic, passes them to a Function Compute function, and the function writes them to Elasticsearch through the Bulk API. Each message becomes a document in the target index, with metadata such as the topic, partition, offset, and timestamp.
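The mapping from a consumed message to an index document can be sketched as follows. The field names (`msg_body`, `doc_as_upsert`, and the metadata keys) are taken from the sample search response later in this topic; the helper itself is illustrative, not the connector's actual code.

```python
import json

def to_es_document(record):
    """Map a consumed Kafka record to an Elasticsearch document body.

    Hypothetical helper: the field names follow the sample _search
    response shown later in this topic, not an official schema.
    """
    value = record["value"]
    return {
        "msg_body": {
            "topic": record["topic"],
            "partition": record["partition"],
            "offset": record["offset"],
            "timestamp": record["timestamp"],
            "key": record["key"],
            "value": value,
            "valueSize": len(value.encode("utf-8")),
        },
        "doc_as_upsert": True,
    }

record = {
    "topic": "elasticsearch-test-input",
    "partition": 0,
    "offset": 2,
    "timestamp": 1616599282417,
    "key": "demo",
    "value": '{"key": "test"}',
}
print(json.dumps(to_es_document(record), indent=2))
```

Each such document then appears as one hit in the target index, as shown in the verification step at the end of this topic.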

Before you begin

Complete the following setup before you create the connector.

  • ApsaraMQ for Kafka

  • Function Compute

  • Elasticsearch

Information to collect

Gather the following details before you start the wizard.

Information | Where to find it | Example
Elasticsearch instance ID | Elasticsearch console | es-cn-oew1o67x0000****
Elasticsearch endpoint (public or private) | Cluster basic information | es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
Elasticsearch port | Cluster basic information | 9200 (HTTP/HTTPS) or 9300 (TCP)
Elasticsearch username and password | Set during cluster creation; reset if needed | elastic / ********
Elasticsearch index name | Elasticsearch console | elastic_test
Source topic name | ApsaraMQ for Kafka console | elasticsearch-test-input

Limits

  • The ApsaraMQ for Kafka instance and the Elasticsearch cluster must be in the same region.

  • ApsaraMQ for Kafka serializes messages as UTF-8 strings. Binary data is not supported.

  • If you specify the private endpoint of the Elasticsearch cluster, Function Compute cannot access it by default. To enable connectivity, configure the Function Compute service to use the same VPC and vSwitch as the Elasticsearch cluster. See Configure the Function Compute service.

  • For additional connector limits, see Limits.
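The UTF-8 limit above means binary payloads must be made text-safe before they are produced. A minimal check, with Base64 as one hypothetical workaround for binary data:

```python
import base64

def is_utf8_text(payload: bytes) -> bool:
    """Return True if the payload decodes as UTF-8.

    The connector serializes message values as UTF-8 strings, so raw
    binary payloads (for example, compressed or image bytes) should be
    encoded first, e.g. with Base64. Illustrative sketch only.
    """
    try:
        payload.decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False

raw = b"\x89PNG\x0d\x0a"      # binary bytes: not valid UTF-8
safe = base64.b64encode(raw)  # Base64 output is always ASCII, hence UTF-8
```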

Billing

The connector uses Function Compute to export data. Function Compute provides a free tier. Usage beyond the free tier is billed according to Function Compute billing.

Create and deploy the connector

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select your region.

  3. In the left-side navigation pane, click Connectors.

  4. On the Connectors page, select your instance from the Select Instance drop-down list and click Create Connector.

Step 1: Configure basic information

In the Configure Basic Information step, set the connector name and review the instance details.

Parameter | Description | Example
Name | A unique name within the ApsaraMQ for Kafka instance. Use 1 to 48 characters: digits, lowercase letters, and hyphens (-). The name cannot start with a hyphen. The connector automatically creates a consumer group named connect-<connector-name>. | kafka-elasticsearch-sink
Instance | Displays the name and ID of the current instance. | demo alikafka_post-cn-st21p8vj****
Important

By default, Authorize to Create Service Linked Role is selected. ApsaraMQ for Kafka creates a service-linked role if one does not already exist.

Click Next.
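The naming rule for the Name parameter can be expressed as a regular expression. This is an illustrative sketch of the documented constraints, not an official validator:

```python
import re

# 1 to 48 characters: digits, lowercase letters, and hyphens,
# and the name cannot start with a hyphen.
NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,47}$")

def is_valid_connector_name(name: str) -> bool:
    return NAME_RE.fullmatch(name) is not None

def consumer_group_for(name: str) -> str:
    # The connector creates this consumer group automatically.
    return f"connect-{name}"
```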

Step 2: Configure the source service

In the Configure Source Service step, select Message Queue for Apache Kafka as the source service and configure the following parameters.

Parameter | Description | Example
Data Source Topic | The topic from which data is consumed. | elasticsearch-test-input
Consumer Thread Concurrency | The number of concurrent consumer threads. Valid values: 1, 2, 3, 6, and 12. Default: 6. | 6
Consumer Offset | Where to start consuming. Earliest Offset reads from the beginning of the topic. Latest Offset reads only new messages. | Earliest Offset

Click Configure Runtime Environment to expand additional parameters.

Parameter | Description | Example
VPC ID | The VPC of the source instance. Auto-populated; no change needed. | vpc-bp1xpdnd3l***
vSwitch ID | The vSwitch of the source instance. Must be in the same VPC. | vsw-bp1d2jgg81***
Failure Handling Policy | The action to take when a message fails. Continue Subscription logs the error and keeps consuming. Stop Subscription logs the error and stops consumption of the partition. See Manage a connector for log details and Error codes for troubleshooting. | Continue Subscription
Resource Creation Method | Auto creates the required internal topics automatically. Manual lets you create them yourself. | Auto
Connector Consumer Group | The consumer group used by the connector task. Format: connect-<connector-name>. | connect-kafka-elasticsearch-sink

Internal topics (manual creation only)

If you set Resource Creation Method to Manual, create the following topics. Topics that use the Local Storage engine are available only on Professional Edition instances.

Parameter | Naming convention | Partitions | Storage engine | cleanup.policy
Task Offset Topic | connect-offset-* | More than 1 | Local Storage | Compact
Task Configuration Topic | connect-config-* | Exactly 1 | Local Storage | Compact
Task Status Topic | connect-status-* | 6 (recommended) | Local Storage | Compact
Dead-letter Queue Topic | connect-error-* | 6 (recommended) | Local Storage or Cloud Storage | --
Error Data Topic | connect-error-* | 6 (recommended) | Local Storage or Cloud Storage | --
Note

To save topic resources, use the same topic for both the dead-letter queue topic and the error data topic.

Click Next.

Step 3: Configure the destination service

In the Configure Destination Service step, select Elasticsearch as the destination service and configure the following parameters.

Parameter | Description | Example
Elasticsearch Instance ID | The ID of the Elasticsearch cluster. | es-cn-oew1o67x0000****
Endpoint | The public or private endpoint of the cluster. See View cluster basic information. | es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
Port | 9200 for HTTP/HTTPS, or 9300 for TCP. | 9300
Username | The Elasticsearch username. Default: elastic. You can create a custom account through X-Pack role-based access control. The account must have write permissions on the target index. | elastic
Password | The password set during cluster creation. Reset the password if you forget it. | ********
Index | The name of the target Elasticsearch index. | elastic_test
Note
  • The username and password are passed to Function Compute as environment variables when the connector task is created. ApsaraMQ for Kafka does not store these credentials after task creation.
  • The account must have permissions to write to the index, because messages are shipped through the Elasticsearch Bulk API.
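Because messages are shipped through the Bulk API, the request body the function sends is NDJSON: one action line plus one source line per document, terminated by a newline. A generic sketch of building such a body (the helper name and the example index are illustrative, not the connector's actual code):

```python
import json

def bulk_index_body(index, documents):
    """Build an Elasticsearch Bulk API request body (NDJSON).

    Each document needs an action line followed by a source line,
    and the whole body must end with a newline.
    """
    lines = []
    for doc in documents:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"

body = bulk_index_body("elastic_test", [{"msg_body": {"key": "test"}}])
# POST this body to the cluster's /_bulk endpoint with
# Content-Type: application/x-ndjson and basic authentication.
```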

Click Create.

Deploy the connector

After creation, the connector appears on the Connectors page. Click Deploy in the Actions column to start the connector.

Configure the Function Compute service

After you deploy the connector, Function Compute automatically creates a service named kafka-service-<connector-name>-<random-string>. If the Elasticsearch cluster uses a private endpoint, configure the Function Compute service to use the same VPC and vSwitch as the Elasticsearch cluster.

  1. On the Connectors page, find the connector. In the Actions column, choose More > Configure Function.

  2. In the Function Compute console, locate the auto-created service and update the VPC and vSwitch settings to match the Elasticsearch cluster.

Verify the data flow

Send a test message to confirm that data flows from ApsaraMQ for Kafka to Elasticsearch.

Send a test message

  1. On the Connectors page, find the connector and click Test in the Actions column.

  2. In the Send Message panel, set Method of Sending to Console.

  3. In the Message Key field, enter a key, for example, demo.

  4. In the Message Content field, enter a JSON body, for example:

       {"key": "test"}
  5. For Send to Specified Partition, click Yes and enter a Partition ID (for example, 0) to target a specific partition, or click No to let the system assign one. To look up partition IDs, see View partition status.

You can also send test messages through Docker or an SDK. Select the corresponding option in the Method of Sending field and follow the on-screen instructions.

Check the Elasticsearch index

  1. Log on to the Kibana console.

  2. Run the following query to search the target index:

       GET /<index_name>/_search
  3. Confirm that the response contains the message you sent. A successful response looks like this:

       {
         "took": 8,
         "timed_out": false,
         "_shards": {
           "total": 5,
           "successful": 5,
           "skipped": 0,
           "failed": 0
         },
         "hits": {
           "total": {
             "value": 1,
             "relation": "eq"
           },
           "max_score": 1.0,
           "hits": [
             {
               "_index": "product_****",
               "_type": "_doc",
               "_id": "TX3TZHgBfHNEDGoZ****",
               "_score": 1.0,
               "_source": {
                 "msg_body": {
                   "key": "test",
                   "offset": 2,
                   "overflowFlag": false,
                   "partition": 2,
                   "timestamp": 1616599282417,
                   "topic": "dv****",
                   "value": "test1",
                   "valueSize": 8
                 },
                 "doc_as_upsert": true
               }
             }
           ]
         }
       }
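To check the result programmatically rather than by eye, the _search response can be parsed for the msg_body payloads. A small sketch based on the response structure shown above:

```python
def extract_messages(search_response):
    """Pull the msg_body payloads out of an Elasticsearch _search response."""
    hits = search_response.get("hits", {}).get("hits", [])
    return [hit["_source"]["msg_body"] for hit in hits]

# Trimmed-down version of the sample response above.
sample = {
    "hits": {
        "total": {"value": 1, "relation": "eq"},
        "hits": [
            {"_source": {"msg_body": {"key": "test", "value": "test1"},
                         "doc_as_upsert": True}}
        ],
    }
}
print(extract_messages(sample))
```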

Troubleshooting

Symptom | Possible cause | Resolution
Connector fails to write to Elasticsearch | Function Compute cannot reach the Elasticsearch cluster | Verify that the Function Compute service uses the same VPC and vSwitch as the Elasticsearch cluster. See Configure the Function Compute service.
Messages are not consumed | Incorrect consumer offset setting | Check the Consumer Offset setting. Use Earliest Offset to read all existing messages, or Latest Offset for new messages only.
Authentication errors | Invalid credentials or insufficient permissions | Confirm that the username and password are correct and that the account has write permissions on the target index.

For Function Compute function call logs, see Configure the logging feature.