
ApsaraMQ for Kafka:Create an Elasticsearch sink connector

Last Updated: Mar 11, 2026

Create a sink connector to continuously export messages from an ApsaraMQ for Kafka topic to an Elasticsearch index. The connector reads messages from Kafka, encodes them in the format you specify, and writes them to Elasticsearch through Function Compute.

Before you begin

Complete the general prerequisites for connectors. See Prerequisites.

Gather the following information before you start:

  - ApsaraMQ for Kafka instance ID: Overview page in the ApsaraMQ for Kafka console
  - Source topic name: Topics page in the ApsaraMQ for Kafka console
  - Consumer group ID: Groups page in the ApsaraMQ for Kafka console
  - Elasticsearch cluster ID: Instances page in the Elasticsearch console
  - Elasticsearch logon name and password: Credentials set during Elasticsearch cluster creation
  - Elasticsearch index name: Kibana console of the Elasticsearch cluster

Step 1: Prepare Elasticsearch resources

  1. Create an Elasticsearch instance and an index in the Elasticsearch console. See Getting started.

  2. If the connector accesses Elasticsearch through a Virtual Private Cloud (VPC), add the CIDR block of the Function Compute endpoint to the IP address whitelist of the Elasticsearch cluster. See Configure a public or private IP address whitelist for an Elasticsearch cluster.

Step 2: Create the connector

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where your instance resides.

  2. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  3. On the Tasks page, click Create Task.

  4. On the Create Task page, set Task Name and Description, then configure the source, filtering, transformation, and sink settings described in the following sections.

Configure the source

In the Source step, set Data Provider to Message Queue for Apache Kafka and configure the following parameters. Click Next Step when finished.

Region
  Region where the ApsaraMQ for Kafka instance resides.
  Example: China (Hangzhou)

Message Queue for Apache Kafka Instance
  ID of the instance that produces the messages to export.
  Example: alikafka_post-cn-9hdsbdhd****

Topic
  Topic that contains the messages to export.
  Example: guide-sink-topic

Group ID
  Consumer group used by the connector. Quickly Create -- the system creates a group with an ID in the format GID_EVENTBRIDGE_xxx. Use Existing Group -- select an existing group that is not in use. Selecting a group already in use affects its existing message subscriptions.
  Example: Use Existing Group

Consumer Offset
  Starting position for message consumption. Latest Offset -- start from the most recent message. Earliest Offset -- start from the oldest available message.
  Example: Latest Offset

Network Configuration
  Select Internet for cross-border data transmission. Select Basic Network in all other cases.
  Example: Basic Network

Data Format
  Encoding applied to binary message data before delivery. Json (default) -- encodes binary data as JSON using UTF-8 and places it in the payload. Text -- encodes binary data as strings using UTF-8 and places it in the payload. Binary -- encodes binary data as strings using Base64 and places it in the payload.
  Example: Json

Messages
  Maximum number of messages sent per function invocation. Messages are sent only when the backlog reaches this value. Valid values: 1 to 10,000.
  Example: 2000

Interval (Unit: Seconds)
  Interval at which aggregated messages are sent to Function Compute. Valid values: 0 to 15 seconds. A value of 0 sends messages immediately after aggregation.
  Example: 3
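The three Data Format options can be sketched in a few lines of Python. This is an illustration of the encodings described above, not the connector's actual implementation; the envelope shape with a payload field is an assumption based on the parameter descriptions:

```python
import base64
import json

def encode_payload(raw: bytes, data_format: str) -> dict:
    """Illustrative sketch of the Json / Text / Binary encodings
    applied to binary message data before delivery."""
    if data_format == "Json":
        # Decode the bytes as UTF-8 and parse them as JSON.
        return {"payload": json.loads(raw.decode("utf-8"))}
    if data_format == "Text":
        # Decode the bytes as a UTF-8 string.
        return {"payload": raw.decode("utf-8")}
    if data_format == "Binary":
        # Encode the raw bytes as a Base64 string.
        return {"payload": base64.b64encode(raw).decode("ascii")}
    raise ValueError(f"unknown data format: {data_format}")

msg = b'{"product_id": "12345"}'
print(encode_payload(msg, "Json"))    # {'payload': {'product_id': '12345'}}
print(encode_payload(msg, "Binary"))  # Base64-encoded string in the payload
```

Note that Json requires the message body to be valid UTF-8 JSON; Binary is the safe choice for arbitrary byte content.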

Configure filtering and transformation

  1. In the Filtering step, define a data pattern to filter messages. See Message filtering.

  2. In the Transformation step, specify a data cleansing method to process messages through splitting, mapping, enrichment, or dynamic routing. See Use Function Compute to perform message cleansing.
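To give a feel for how a data pattern filters messages, the sketch below implements simplified exact-value matching: a message passes when every field in the pattern appears in the message with one of the listed values. The exact pattern semantics are defined in Message filtering; this simplified model is an assumption for illustration:

```python
def matches(pattern: dict, event: dict) -> bool:
    """Simplified exact-value pattern matching: every pattern field
    must appear in the event with one of the allowed values."""
    for key, allowed in pattern.items():
        if isinstance(allowed, dict):
            # Nested pattern: recurse into the corresponding event field.
            if not isinstance(event.get(key), dict):
                return False
            if not matches(allowed, event[key]):
                return False
        else:
            # Leaf pattern: the event value must be one of the allowed values.
            if event.get(key) not in allowed:
                return False
    return True

pattern = {"data": {"product_id": ["12345", "67890"]}}
print(matches(pattern, {"data": {"product_id": "12345", "price": 99.99}}))  # True
print(matches(pattern, {"data": {"product_id": "99999"}}))                  # False
```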

Configure the sink

In the Sink step, set Service Type to Elasticsearch (acs.elasticSearch) and configure the following parameters.

Elasticsearch Cluster
  ID of the Elasticsearch cluster to write to.
  Example: es-cn-pe336j0gj001e****

Cluster Logon Name
  Username for the Elasticsearch cluster. The default is elastic.
  Example: elastic

Cluster Logon Password
  Password set during Elasticsearch cluster creation.
  Example: ******

Index Name
  Target index in Elasticsearch. Accepts a string constant or a JSONPath expression (such as $.data.key) to extract the index name dynamically from the message.
  Example: product_info

Document Type
  Document type for indexing. Accepts a string constant or a JSONPath expression. Available only for Elasticsearch versions earlier than 7. Default: _doc.
  Example: _doc

Document
  Scope of the event data to deliver. Select Complete Event to send the full message, or Partial Event to send specific fields extracted by a JSONPath rule.
  Example: Complete Event

Network Settings
  Network path for message delivery. VPC -- deliver through a VPC. Internet -- deliver over the public internet.
  Example: Internet

VPC
  VPC to which the Elasticsearch instance belongs. Required when Network Settings is set to VPC.
  Example: vpc-bp17fapfdj0dwzjkd****

vSwitch
  vSwitch to which the Elasticsearch instance belongs. Required when Network Settings is set to VPC.
  Example: vsw-bp1gbjhj53hdjdkg****

Security Group
  Security group of the Elasticsearch instance. Required when Network Settings is set to VPC.
  Example: test_group
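The Index Name parameter accepts either a string constant or a JSONPath expression such as $.data.key. The sketch below shows how such an expression could resolve against a message; it supports only dot-separated field access, which is an assumption for illustration -- the connector's actual JSONPath support may be richer:

```python
def resolve_index_name(expr: str, message: dict):
    """Resolve a simple dot-notation JSONPath like $.data.key against a
    message. Anything not starting with "$." is treated as a constant."""
    if not expr.startswith("$."):
        # String constant: use it as the index name directly.
        return expr
    value = message
    for field in expr[2:].split("."):
        # Walk one level deeper for each dot-separated field.
        value = value[field]
    return value

message = {"data": {"key": "product_info", "body": {"product_id": "12345"}}}
print(resolve_index_name("$.data.key", message))   # product_info
print(resolve_index_name("product_info", message)) # product_info (constant)
```

Dynamic index names are useful when one topic carries documents destined for several indexes, e.g. routing by a tenant or category field in the message.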

Configure the task property

Configure the retry policy for failed event delivery and the fault handling method. See Retry policies and dead-letter queues.

Save and verify

Click Save. On the Tasks page, find the connector. When the Status column changes from Starting to Running, the connector is active and processing messages.

Step 3: Verify the data flow

After the connector starts running, send a test message through the ApsaraMQ for Kafka console and confirm it arrives in Elasticsearch.

  1. On the Tasks page, find the connector and click the source topic name in the Event Source column.

  2. On the Topic Details page, click Send Message.

  3. In the Start to Send and Consume Message panel, enter a test message and click OK. For example, enter the following JSON payload:

       {
         "product_id": "12345",
         "product_name": "Example Product",
         "price": 99.99
       }


  4. Log on to the Elasticsearch console and open Kibana for the target cluster. See Getting started.

  5. In the Kibana console, run the following command to confirm that the test message was written to the index. Replace <index-name> with the name of the index you created in Step 1:

       GET /<index-name>/_search

     A successful response contains the test message in the hits array:

       {
         "hits": {
           "total": { "value": 1 },
           "hits": [
             {
               "_source": {
                 "product_id": "12345",
                 "product_name": "Example Product",
                 "price": 99.99
               }
             }
           ]
         }
       }

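If you prefer to check the result programmatically rather than by eye, the search response can be parsed with a short script. The field names below follow the standard Elasticsearch search response shape, and the hit content is this guide's test message:

```python
import json

def find_product(search_response: dict, product_id: str):
    """Return the _source of the first hit whose product_id matches,
    or None if the test message has not arrived in the index yet."""
    for hit in search_response.get("hits", {}).get("hits", []):
        source = hit.get("_source", {})
        if source.get("product_id") == product_id:
            return source
    return None

# The response body returned by GET /<index-name>/_search in Kibana.
response = json.loads("""
{
  "hits": {
    "total": {"value": 1},
    "hits": [
      {"_source": {"product_id": "12345",
                   "product_name": "Example Product",
                   "price": 99.99}}
    ]
  }
}
""")

doc = find_product(response, "12345")
print(doc is not None and doc["product_name"] == "Example Product")  # True
```

If the document does not appear, allow a few seconds for the connector's aggregation interval to elapse and for the index to refresh before searching again.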