An AnalyticDB sink connector synchronizes data from a source topic in an ApsaraMQ for Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL table.
Before you begin
Make sure the general connector prerequisites are met. For details, see Prerequisites.
Gather the following information before you start:
| Item | Details |
|---|---|
| ApsaraMQ for Kafka instance ID | The instance that produces messages |
| Source topic name | The topic that contains the messages to route |
| Consumer group ID | A dedicated consumer group for this connector. Do not reuse an existing consumer group, or in-flight messages may be lost |
| AnalyticDB instance | An AnalyticDB for MySQL or AnalyticDB for PostgreSQL instance with a database, a database account, and a target table |
| Database credentials | Username and password for the AnalyticDB database account |
| Network IDs | Virtual Private Cloud (VPC), vSwitch, and security group IDs (if your connector uses VPC networking) |
If you have not created your AnalyticDB resources yet, start with Step 1: Create AnalyticDB resources.
Step 1: Create AnalyticDB resources
Create the AnalyticDB cluster or instance, a database account, a database, and a target table. Choose the instructions for your AnalyticDB engine:
AnalyticDB for MySQL: In the AnalyticDB for MySQL console, create a cluster, a database account, a database, and the target table.
AnalyticDB for PostgreSQL: In the AnalyticDB for PostgreSQL console, create an instance, a database account, a database, and the target table.
The examples in this topic use an AnalyticDB for MySQL database named adb_sink_database and a table named adb_sink_table.
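If you are creating the table from scratch, the following DDL is a minimal sketch for the AnalyticDB for MySQL example. The two-column schema is an assumption that matches the sample message used later in this topic; adapt the column types and the distribution key to your own workload.

```sql
-- Illustrative DDL for the example resources in this topic.
-- The schema (userid, source) is assumed from the sample message
-- shown later; adjust types and the distribution key as needed.
CREATE DATABASE adb_sink_database;

CREATE TABLE adb_sink_database.adb_sink_table (
  userid VARCHAR(64),
  source VARCHAR(64)
) DISTRIBUTED BY HASH(userid);
```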
Step 2: Create and enable the connector
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region of your ApsaraMQ for Kafka instance.
In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.
On the Tasks page, click Create Task.
Configure the source. In the Source step, set Data Provider to ApsaraMQ for Kafka and configure the following parameters. Then, click Next Step.
| Parameter | Description | Example |
|---|---|---|
| Region | The region of the ApsaraMQ for Kafka instance. | China (Beijing) |
| ApsaraMQ for Kafka Instance | The instance that produces the messages to route. | MQ_INST_115964845466\*\*\*\*\_ByBeUp3p |
| Topic | The source topic that contains the messages to route. | topic |
| Group ID | A consumer group dedicated to this connector. Do not reuse a consumer group that is already in use; otherwise, existing messages may fail to be sent or received. | GID_http_1 |
| Consumer Offset | The offset from which to start consuming messages. | Latest Offset |
| Network Configuration | The network type for message routing. | Basic Network |
| VPC | The VPC ID. Required only when Network Configuration is set to Self-managed Internet. | vpc-bp17fapfdj0dwzjkd\*\*\*\* |
| vSwitch | The vSwitch ID. Required only when Network Configuration is set to Self-managed Internet. | vsw-bp1gbjhj53hdjdkg\*\*\*\* |
| Security Group | The security group ID. Required only when Network Configuration is set to Self-managed Internet. | alikafka_pre-cn-7mz2\*\*\*\* |
| Messages | The maximum number of messages per batch push. A push is triggered only when the backlog reaches this value. Valid values: 1 to 10000. | 100 |
| Interval (Unit: Seconds) | The interval (in seconds) at which aggregated messages are pushed to the sink. Valid values: 0 to 15. Set to 0 to send messages immediately after aggregation. | 3 |

Configure filtering. In the Filtering step, define an event pattern in the Pattern Content code editor to filter messages before they reach the sink, as in the example below. For details, see Event patterns.
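For example, the following pattern is a minimal sketch (see Event patterns for the full matching syntax) that passes only events whose source attribute equals acs:alikafka, which is the value set on messages forwarded from the source topic:

```json
{
  "source": [
    "acs:alikafka"
  ]
}
```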
Configure transformation. In the Transformation step, specify a data cleansing method to process messages. Supported operations include splitting, mapping, enrichment, and dynamic routing. For details, see Data cleansing.
Configure the sink. In the Sink step, set Service Type to AnalyticDB and configure the following parameters. Then, click Save.
| Parameter | Description | Example |
|---|---|---|
| Instance Type | The AnalyticDB engine type. Valid values: AnalyticDB for MySQL, AnalyticDB for PostgreSQL. | AnalyticDB for MySQL |
| AnalyticDB Instance ID | The ID of the AnalyticDB instance. | gp-bp10uo5n536wd\*\*\*\* |
| Database Name | The name of the target database. | adb_sink_database |
| Table Name | The name of the target table. | adb_sink_table |
| Data Mapping | JSONPath rules that map Kafka message fields to table columns. See Data mapping format for details. | $.data.value.userid |
| Database Username | The username of the database account. | user |
| Database Password | The password of the database account. | \*\*\*\*\*\* |
| Network Settings | The network type for delivering messages to AnalyticDB. VPC: deliver through a VPC. Internet: deliver over the public internet. | VPC |
| VPC | The VPC ID. Required only when Network Settings is set to VPC. | vpc-bp17fapfdj0dwzjkd\*\*\*\* |
| vSwitch | The vSwitch ID. Required only when Network Settings is set to VPC. Important: Add the CIDR block of the selected vSwitch to the IP address whitelist of your AnalyticDB instance. For details, see Configure an IP address whitelist. | vsw-bp1gbjhj53hdjdkg\*\*\*\* |
| Security Group | The security group ID. Required only when Network Settings is set to VPC. | test_group |

Enable the connector. Go back to the Tasks page, find the AnalyticDB sink connector, and click Enable in the Actions column. Click OK in the confirmation dialog. The connector takes 30 to 60 seconds to start. Track the progress in the Status column.
Data mapping format
When Data Format is set to Json in the Source step, each message forwarded from ApsaraMQ for Kafka has the following structure:
```json
{
  "data": {
    "topic": "demo-topic",
    "partition": 0,
    "offset": 2,
    "timestamp": 1739756629123,
    "headers": {
      "headers": [],
      "isReadOnly": false
    },
    "key": "adb-sink-k1",
    "value": {
      "userid": "xiaoming",
      "source": "shanghai"
    }
  },
  "id": "7702ca16-f944-4b08-***-***-0-2",
  "source": "acs:alikafka",
  "specversion": "1.0",
  "type": "alikafka:Topic:Message",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2025-02-17T01:43:49.123Z",
  "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic",
  "aliyunaccountid": "1******6789"
}
```

To map a message field to a table column, write a JSONPath expression that targets the field. Each Data Mapping rule pairs a column name in the target table with the JSONPath expression that extracts the value for that column.
Example mapping:
| Table column | JSONPath expression | Extracted value |
|---|---|---|
| userid | $.data.value.userid | xiaoming |
| source | $.data.value.source | shanghai |
| topic | $.data.topic | demo-topic |
| offset | $.data.offset | 2 |
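Conceptually, this mapping makes the connector write a row equivalent to the following statement for the sample message. The statement is illustrative only and assumes the target table has all four mapped columns:

```sql
-- Hypothetical equivalent of what the connector writes for the
-- sample message, given the four mapping rules above.
INSERT INTO adb_sink_table (userid, source, topic, `offset`)
VALUES ('xiaoming', 'shanghai', 'demo-topic', 2);
```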
Step 3: Verify the connector
After the connector is enabled, send a test message and check that it arrives in the AnalyticDB table.
On the Tasks page, find the connector and click the source topic name in the Event Source column.
On the Topic Details page, click Send Message.
In the Start to Send and Consume Message panel, enter a JSON message that contains values for every column in the target table, then click OK.
Note: The message body must be a JSON object whose field names match the column names in the target table. The connector writes each matching field to the corresponding column.
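For example, if the target table has the userid and source columns used throughout this topic, you can send the following message body:

```json
{
  "userid": "xiaoming",
  "source": "shanghai"
}
```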

On the Tasks page, click the destination instance name in the Event Target column.
In the upper-right corner of the Basic Information page, click Log on to Database.
In the Data Management (DMS) console, run the following query to check the data:

```sql
SELECT * FROM adb_sink_table;
```

If the connector is working correctly, the query returns the values from the test message.
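For the sample message sent earlier, the result looks similar to the following (illustrative; assumes the two-column table from the earlier examples):

```
+----------+----------+
| userid   | source   |
+----------+----------+
| xiaoming | shanghai |
+----------+----------+
```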