ApsaraMQ for Kafka: Create an AnalyticDB sink connector

Last Updated: Mar 11, 2026

An AnalyticDB sink connector reads messages from an ApsaraMQ for Kafka topic and writes them to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database. The connector uses Function Compute to transfer data between services within the same region.

How it works

The data flows through three components:

  1. ApsaraMQ for Kafka reads messages from the data source topic.

  2. Function Compute receives the messages and writes them to the destination database.

  3. AnalyticDB stores the data in the specified table.
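
The write in step 2 can be pictured with a short sketch. This is not the code of the function that ApsaraMQ for Kafka deploys, which this topic does not show. It assumes that each message is a flat JSON object whose keys are column names, and that the destination accepts MySQL-style backtick quoting and `%s` placeholders (AnalyticDB for PostgreSQL would need double-quoted identifiers). The helper name `build_insert` is hypothetical.

```python
import json


def build_insert(table: str, message_value: str) -> tuple[str, list]:
    """Turn one JSON message into a parameterized INSERT statement.

    Hypothetical helper for illustration; not the deployed function's code.
    """
    record = json.loads(message_value)
    if not isinstance(record, dict) or not record:
        raise ValueError("message must be a non-empty JSON object")
    columns = list(record)  # JSON keys become column names
    column_list = ", ".join(f"`{c}`" for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT INTO `{table}` ({column_list}) VALUES ({placeholders})"
    return sql, [record[c] for c in columns]


sql, params = build_insert("user", '{"id": 1, "name": "alice"}')
print(sql)
print(params)
```

The values travel as parameters rather than being formatted into the SQL string, so message content cannot alter the statement. Column names are interpolated directly and would still need to be checked against the table schema in real code.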

Internally, the connector uses five topics (for offsets, configuration, status, dead-letter queue, and error data) and one consumer group. You can create these resources automatically or manually.

Prerequisites

Before you begin, make sure that you have:

Gather the following information for the connector configuration:

| Information | Description | Example |
| --- | --- | --- |
| Data source topic name | The Kafka topic to export data from | adb-test-input |
| AnalyticDB instance ID | The destination database instance ID | am-bp139yqk8u1ik**** |
| Database name | The destination database | adb_demo |
| Table name | The destination table | user |
| Database username | The database login username | adbmysql |
| Database password | The database login password | ******** |

Usage notes

  • Data export is limited to the same region. Cross-region export is not supported. For more information, see Limits.

  • Function Compute provides a free resource quota. Usage beyond the free quota is billed according to Function Compute pricing.

  • ApsaraMQ for Kafka serializes messages as UTF-8-encoded strings. Binary data is not supported.

  • If the destination database uses a private endpoint, configure the same virtual private cloud (VPC) and vSwitch for the Function Compute service. Otherwise, Function Compute cannot reach the database. For more information, see Update a service.

  • ApsaraMQ for Kafka automatically creates a service-linked role when you create a connector, if one does not already exist.

  • To troubleshoot function execution issues, use Function Compute logging. For more information, see Configure logging.
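
Because messages are handled as UTF-8-encoded strings, a producer can check its payloads before sending them. The following is a minimal sketch using only the Python standard library. The function names `encode_message` and `is_utf8_text` are illustrative and are not part of any ApsaraMQ for Kafka SDK.

```python
import json


def encode_message(record: dict) -> bytes:
    """Serialize a record as a UTF-8-encoded JSON string."""
    # ensure_ascii=False keeps non-ASCII characters as real UTF-8 text
    return json.dumps(record, ensure_ascii=False).encode("utf-8")


def is_utf8_text(payload: bytes) -> bool:
    """Return True if the payload decodes cleanly as UTF-8."""
    try:
        payload.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True


payload = encode_message({"name": "café"})
print(is_utf8_text(payload))
print(is_utf8_text(b"\xff\xfe\x00"))
```

A payload that fails this check is binary data and should be converted to text (for example, Base64) before it is sent to the data source topic.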

Set up the connector

To set up the connector:

  1. (Optional) Create the required topics and consumer group

  2. Create and deploy the connector

  3. Configure Function Compute networking

  4. Add the VPC CIDR block to the AnalyticDB whitelist

  5. Verify the data export

Step 1: (Optional) Create the required topics and consumer group

To have ApsaraMQ for Kafka create these resources automatically, skip this step and set Resource Creation Method to Auto in Step 2.
Important

If your ApsaraMQ for Kafka instance runs major version 0.10.2, topics that require the Local Storage engine must be created automatically. Manual creation of Local Storage topics is not supported in this version.

The connector requires five internal topics and one consumer group. Create them manually only if you need specific configurations.

Required topics

| Topic | Name prefix | Partitions | Storage engine | Log cleanup policy |
| --- | --- | --- | --- | --- |
| Task offset | connect-offset | Greater than 1 | Local Storage | Compact |
| Task configuration | connect-config | 1 | Local Storage | Compact |
| Task status | connect-status | 6 (recommended) | Local Storage | Compact |
| Dead-letter queue | connect-error | 6 (recommended) | Local Storage or Cloud Storage | Any |
| Error data | connect-error | 6 (recommended) | Local Storage or Cloud Storage | Any |
Tip: The dead-letter queue topic and the error data topic can share the same topic to conserve resources.
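
If you create the topics manually, it can help to check a planned configuration against the requirements above before you open the console. The sketch below encodes the table as data and reports mismatches. It is an illustration only: `TOPIC_SPECS` and `check_topic` are hypothetical names, and a partition count that is only recommended (6) is treated as optional.

```python
# Requirements copied from the "Required topics" table.
# Each entry: (name prefix, partition rule, allowed engines, required cleanup or None)
TOPIC_SPECS = {
    "offset": ("connect-offset", lambda p: p > 1, {"Local Storage"}, "Compact"),
    "config": ("connect-config", lambda p: p == 1, {"Local Storage"}, "Compact"),
    "status": ("connect-status", lambda p: p >= 1, {"Local Storage"}, "Compact"),
    "dead_letter": ("connect-error", lambda p: p >= 1,
                    {"Local Storage", "Cloud Storage"}, None),
    "error_data": ("connect-error", lambda p: p >= 1,
                   {"Local Storage", "Cloud Storage"}, None),
}


def check_topic(kind, name, partitions, engine, cleanup):
    """Return a list of problems; an empty list means the topic fits the table."""
    prefix, partitions_ok, engines, required_cleanup = TOPIC_SPECS[kind]
    problems = []
    if not name.startswith(prefix):
        problems.append(f"name should start with {prefix}")
    if not partitions_ok(partitions):
        problems.append(f"unexpected partition count: {partitions}")
    if engine not in engines:
        problems.append(f"storage engine {engine} is not allowed")
    if required_cleanup is not None and cleanup != required_cleanup:
        problems.append(f"log cleanup policy should be {required_cleanup}")
    return problems


print(check_topic("config", "connect-config-kafka-adb-sink", 6,
                  "Local Storage", "Compact"))
```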

Create a topic

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region of your instance.

Important

You must create topics in the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if the producers and consumers of messages run on an ECS instance deployed in the China (Beijing) region, the topic must also be created in the China (Beijing) region.

  3. On the Instances page, click the instance name.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, click Create Topic.

  6. In the Create Topic panel, configure the following parameters and click OK.

| Parameter | Description |
| --- | --- |
| Name | The topic name. Use the naming prefix from the table above (for example, connect-offset-kafka-adb-sink). |
| Partitions | The number of partitions. See the required values in the table above. |
| Storage Engine | The storage engine type. Available only on Professional Edition instances. Standard Edition instances use Cloud Storage by default. Options: Cloud Storage (Alibaba Cloud disks, 3-replica distributed storage with low latency and high reliability) or Local Storage (Apache Kafka ISR algorithm, 3-replica distributed storage). Standard (High Write) instances support only Cloud Storage. |
| Message Type | The message ordering guarantee. Normal Message (default for Cloud Storage): partition order may not be preserved during broker failures. Partitionally Ordered Message (default for Local Storage): order is preserved during broker failures, but affected partitions are unavailable until restored. |
| Log Cleanup Policy | Required when Storage Engine is Local Storage. Delete (default): retains messages by retention period; deletes the oldest messages when storage exceeds 85%. Compact: retains only the latest value per key. Required for connector internal topics that use Kafka Connect. |
| Description | An optional description. |
| Tag | Optional tags. |

Important

Log-compacted topics can only be used in specific cloud-native components, such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos.

  7. Repeat the preceding steps to create all five required topics.

Create the consumer group

The connector requires a consumer group named connect-<connector-name> (for example, connect-kafka-adb-sink).

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region of your instance.

  3. On the Instances page, click the instance name.

  4. In the left-side navigation pane, click Groups.

  5. On the Groups page, click Create Group.

  6. In the Create Group panel, enter the group name in the Group ID field, add an optional description and tags, and then click OK.
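
The names used in this step follow a regular pattern, so they can be generated from the connector name. The sketch below is based on the prefixes and examples in this topic (such as connect-offset-kafka-adb-sink). Appending the connector name to each topic prefix is the convention the examples use, not a documented requirement, and the function name `connector_resource_names` is hypothetical.

```python
def connector_resource_names(connector_name: str) -> dict:
    """Derive the consumer group and internal topic names for a connector."""
    return {
        "consumer_group": f"connect-{connector_name}",
        "task_offset_topic": f"connect-offset-{connector_name}",
        "task_configuration_topic": f"connect-config-{connector_name}",
        "task_status_topic": f"connect-status-{connector_name}",
        # The dead-letter queue and error data topics can share one topic.
        "dead_letter_queue_topic": f"connect-error-{connector_name}",
        "error_data_topic": f"connect-error-{connector_name}",
    }


names = connector_resource_names("kafka-adb-sink")
for role, name in names.items():
    print(f"{role}: {name}")
```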

Step 2: Create and deploy the connector

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region of your instance.

  3. On the Instances page, click the instance name.

  4. In the left-side navigation pane, click Connectors.

  5. On the Connectors page, click Create Connector.

  6. Complete the Create Connector wizard:

Configure basic information

| Parameter | Description | Example |
| --- | --- | --- |
| Name | The connector name. 1-48 characters: digits, lowercase letters, and hyphens (-). Cannot start with a hyphen. Must be unique within the instance. | kafka-adb-sink |
| Instance | The ApsaraMQ for Kafka instance. Displays the instance name and ID. | demo alikafka_post-cn-st21p8vj**** |

Click Next.
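
The naming rule for the connector can be checked before you enter the name in the console. The following sketch expresses the rule from the table as a regular expression. It covers only the character and length rules; uniqueness within the instance can only be checked by the service. The function name `is_valid_connector_name` is illustrative.

```python
import re

# 1-48 characters: digits, lowercase letters, and hyphens; no leading hyphen.
_CONNECTOR_NAME = re.compile(r"[a-z0-9][a-z0-9-]{0,47}")


def is_valid_connector_name(name: str) -> bool:
    """Return True if the name satisfies the documented character rules."""
    return _CONNECTOR_NAME.fullmatch(name) is not None


for candidate in ["kafka-adb-sink", "-kafka-adb-sink", "Kafka_ADB"]:
    print(candidate, is_valid_connector_name(candidate))
```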

Configure source service

Select Message Queue for Apache Kafka as the source service and configure the following parameters.

| Parameter | Description | Example |
| --- | --- | --- |
| Data Source Topic | The topic to export data from. | adb-test-input |
| Consumer Thread Concurrency | The number of concurrent consumer threads. Default: 6. Valid values: 1, 2, 3, 6, 12. | 6 |
| Consumer Offset | Where to start consuming. Earliest Offset: start from the beginning. Latest Offset: start from the most recent message. | Earliest Offset |

Click Configure Runtime Environment to expand advanced settings.

| Parameter | Description | Example |
| --- | --- | --- |
| VPC ID | The VPC for the export task. Defaults to the VPC of the ApsaraMQ for Kafka instance. | vpc-bp1xpdnd3l*** |
| vSwitch ID | The vSwitch for the export task. Must be in the same VPC as the instance. | vsw-bp1d2jgg81*** |
| Failure Handling Policy | Action on message send failure. Continue Subscription: continue consuming and log the error. Stop Subscription: stop consuming and log the error. For log details, see Manage a connector. For error codes, see Error codes. | Continue Subscription |
| Resource Creation Method | How to create the required internal topics and consumer group. Auto: ApsaraMQ for Kafka creates them automatically. Manual: use the resources created in Step 1. | Auto |
| Connector Consumer Group | The consumer group for the connector. Format: connect-<connector-name>. | connect-kafka-adb-sink |
| Task Offset Topic | Stores consumer offsets. Name prefix: connect-offset. Partitions: greater than 1. Storage engine: Local Storage. Cleanup policy: Compact. | connect-offset-kafka-adb-sink |
| Task Configuration Topic | Stores task configurations. Name prefix: connect-config. Partitions: 1. Storage engine: Local Storage. Cleanup policy: Compact. | connect-config-kafka-adb-sink |
| Task Status Topic | Stores task status. Name prefix: connect-status. Partitions: 6 (recommended). Storage engine: Local Storage. Cleanup policy: Compact. | connect-status-kafka-adb-sink |
| Dead-letter Queue Topic | Stores Kafka Connect framework error data. Name prefix: connect-error. Partitions: 6 (recommended). Storage engine: Local Storage or Cloud Storage. Can share a topic with the error data topic. | connect-error-kafka-adb-sink |
| Error Data Topic | Stores sink connector error data. Name prefix: connect-error. Partitions: 6 (recommended). Storage engine: Local Storage or Cloud Storage. Can share a topic with the dead-letter queue topic. | connect-error-kafka-adb-sink |

Click Next.

Configure destination service

Select AnalyticDB as the destination service and configure the following parameters.

| Parameter | Description | Example |
| --- | --- | --- |
| Instance Type | The destination database type: AnalyticDB for MySQL or AnalyticDB for PostgreSQL. | AnalyticDB for MySQL |
| AnalyticDB Instance ID | The destination instance ID. | am-bp139yqk8u1ik**** |
| Database Name | The destination database. | adb_demo |
| Table Name | The destination table for exported data. | user |
| Database Username | The database login username. | adbmysql |
| Database Password | The database login password. To reset a forgotten password: for AnalyticDB for MySQL, reset it in the AnalyticDB for MySQL console. For AnalyticDB for PostgreSQL, go to Account Management and click Reset Password. | ******** |

The database username and password are passed to Function Compute as environment variables when ApsaraMQ for Kafka creates the export task. ApsaraMQ for Kafka does not store these credentials after task creation.

Click Create.

  7. On the Connectors page, find the connector and click Deploy in the Actions column.

Step 3: Configure Function Compute networking

After deployment, Function Compute automatically creates a service (named kafka-service-<connector_name>-<random_string>) and a function (named fc-adb-<random_string>) for the connector.

  1. On the Connectors page, find the connector and click Configure Function in the Actions column. This opens the Function Compute console.

  2. In the Function Compute console, locate the automatically created service and configure the VPC and vSwitch to match the destination database. For more information, see Update a service.
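
If the account contains many Function Compute services, the naming pattern kafka-service-<connector_name>-<random_string> can be used to narrow down which one belongs to the connector. The sketch below filters a list of service names that you have already obtained (for example, copied from the console). It makes no Function Compute API calls, and the service names in the example are invented.

```python
import re


def find_connector_services(connector_name: str, service_names: list) -> list:
    """Return the service names that match kafka-service-<connector_name>-<suffix>."""
    pattern = re.compile(rf"kafka-service-{re.escape(connector_name)}-.+")
    return [name for name in service_names if pattern.fullmatch(name)]


# Hypothetical service names for illustration only.
services = [
    "kafka-service-kafka-adb-sink-a1b2c3",
    "kafka-service-orders-sink-x9y8z7",
    "image-resize-service",
]
matches = find_connector_services("kafka-adb-sink", services)
print(matches)
```

The format of the random string is not documented here, so the match is a prefix match: a connector named kafka-adb would also match services created for kafka-adb-sink. Confirm the result in the console before you change any network settings.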

Step 4: Add the VPC CIDR block to the AnalyticDB whitelist

Add the CIDR block of the VPC configured in Function Compute to the AnalyticDB whitelist. To find the CIDR block, open the vSwitch page of the VPC console and locate the row that matches the VPC and vSwitch of the Function Compute service.
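
Before you save the whitelist, you can confirm that a CIDR block covers the addresses you expect. The following sketch uses the `ipaddress` module from the Python standard library. The CIDR blocks and IP addresses are example values, not values taken from your VPC.

```python
import ipaddress


def cidr_contains(cidr: str, ip: str) -> bool:
    """Return True if the IP address falls inside the CIDR block."""
    return ipaddress.ip_address(ip) in ipaddress.ip_network(cidr)


def is_subnet(inner_cidr: str, outer_cidr: str) -> bool:
    """Return True if inner_cidr lies entirely within outer_cidr."""
    inner = ipaddress.ip_network(inner_cidr)
    outer = ipaddress.ip_network(outer_cidr)
    return inner.subnet_of(outer)


# Example values only: a /16 VPC block and a /24 vSwitch block inside it.
print(cidr_contains("192.168.0.0/16", "192.168.1.25"))
print(is_subnet("192.168.1.0/24", "192.168.0.0/16"))
```

If the address that Function Compute uses falls outside the whitelisted block, the function cannot connect to the database and the export fails.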

Step 5: Verify the data export

Send test messages

Important

Message content must be valid JSON. Each JSON key maps to a column name in the destination table, and each value maps to the column data. Verify that the JSON keys match the column names in your destination table before sending test messages.
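
A test message can be checked against the destination table before it is sent. The sketch below parses the message and reports keys that have no matching column. The column set is supplied by you (for example, copied from the table definition). The function name `check_message` and the example columns are hypothetical.

```python
import json


def check_message(message_value: str, table_columns: set) -> list:
    """Return a list of problems; an empty list means the message looks usable."""
    try:
        record = json.loads(message_value)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(record, dict):
        return ["top-level JSON value must be an object"]
    unknown = sorted(set(record) - set(table_columns))
    if unknown:
        return ["keys with no matching column: " + ", ".join(unknown)]
    return []


# Hypothetical columns of the destination table.
columns = {"id", "name", "age"}
print(check_message('{"id": 1, "name": "alice"}', columns))
print(check_message('{"key": "test"}', columns))
```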

  1. On the Connectors page, find the connector and click Test in the Actions column.

  2. In the Send Message panel, select a Sending Method:

    • Console:

      1. In the Message Key field, enter a key (for example, demo).

      2. In the Message Content field, enter JSON content (for example, {"key": "test"}).

      3. For Send to Specified Partition, select Yes and enter a Partition ID (for example, 0) to target a specific partition, or select No to let ApsaraMQ for Kafka assign the partition. For information about partition IDs, see View partition status.

    • Docker: Run the Docker command shown in the Run the Docker container to produce a sample message section.

    • SDK: Select an SDK for your programming language and an access method to send the test message.

Verify the result

After sending messages, check the destination table to confirm the data was exported:

  1. Log on to the AnalyticDB for MySQL console or the AnalyticDB for PostgreSQL console.

  2. Connect to the destination database.

  3. In the SQLConsole of the Data Management Service 5.0 console, open the destination table and confirm that the exported data is present.

What's next