An AnalyticDB sink connector reads messages from an ApsaraMQ for Kafka topic and writes them to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database. The connector uses Function Compute to transfer data between services within the same region.
How it works
The data flows through three components:
ApsaraMQ for Kafka reads messages from the data source topic.
Function Compute receives the messages and writes them to the destination database.
AnalyticDB stores the data in the specified table.
Internally, the connector uses five topics (for offsets, configuration, status, dead-letter queue, and error data) and one consumer group. You can create these resources automatically or manually.
Prerequisites
Before you begin, make sure that you have:
An ApsaraMQ for Kafka instance with the connector feature enabled
A data source topic in the ApsaraMQ for Kafka instance
An AnalyticDB destination database:
AnalyticDB for MySQL: A cluster, a database account, a client connection, and a database
AnalyticDB for PostgreSQL: An instance, a database account, and a client connection
Gather the following information for the connector configuration:
| Information | Description | Example |
|---|---|---|
| Data source topic name | The Kafka topic to export data from | adb-test-input |
| AnalyticDB instance ID | The destination database instance ID | am-bp139yqk8u1ik**** |
| Database name | The destination database | adb_demo |
| Table name | The destination table | user |
| Database username | The database login username | adbmysql |
| Database password | The database login password | ******** |
Usage notes
Data export is limited to the same region. Cross-region export is not supported. For more information, see Limits.
Function Compute provides a free resource quota. Usage beyond the free quota is billed according to Function Compute pricing.
ApsaraMQ for Kafka serializes messages as UTF-8-encoded strings. Binary data is not supported.
If the destination database uses a private endpoint, configure the same virtual private cloud (VPC) and vSwitch for the Function Compute service. Otherwise, Function Compute cannot reach the database. For more information, see Update a service.
ApsaraMQ for Kafka automatically creates a service-linked role when you create a connector, if one does not already exist.
To troubleshoot function execution issues, use Function Compute logging. For more information, see Configure logging.
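Because messages are handled as UTF-8 strings, it can help to check a payload before producing it. The following is a minimal sketch; the helper name `is_utf8_text` is hypothetical and not part of any ApsaraMQ for Kafka SDK.

```python
def is_utf8_text(payload: bytes) -> bool:
    """Return True if the payload decodes cleanly as UTF-8 text."""
    try:
        payload.decode("utf-8")
    except UnicodeDecodeError:
        # Raw binary data usually contains byte sequences that are not valid UTF-8.
        return False
    return True
```

A payload that fails this check is likely binary data, which the connector does not support.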
Set up the connector
To set up the connector, complete the following steps.
Step 1: (Optional) Create the required topics and consumer group
To have ApsaraMQ for Kafka create these resources automatically, skip this step and set Resource Creation Method to Auto in Step 2.
If your ApsaraMQ for Kafka instance runs major version 0.10.2, topics that require the Local Storage engine must be created automatically. Manual creation of Local Storage topics is not supported in this version.
The connector requires five internal topics and one consumer group. Create them manually only if you need specific configurations.
Required topics
| Topic | Name prefix | Partitions | Storage engine | Log cleanup policy |
|---|---|---|---|---|
| Task offset | connect-offset | Greater than 1 | Local Storage | Compact |
| Task configuration | connect-config | 1 | Local Storage | Compact |
| Task status | connect-status | 6 (recommended) | Local Storage | Compact |
| Dead-letter queue | connect-error | 6 (recommended) | Local Storage or Cloud Storage | Any |
| Error data | connect-error | 6 (recommended) | Local Storage or Cloud Storage | Any |
Tip: To conserve resources, you can use a single topic as both the dead-letter queue topic and the error data topic.
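The naming pattern can be sketched as a small helper. This is an illustration only: the guide requires just the name prefix, and the `<prefix>-<connector-name>` suffix convention is an assumption taken from the examples in this guide (such as connect-offset-kafka-adb-sink). The function name `connector_resource_names` is hypothetical.

```python
def connector_resource_names(connector_name: str) -> dict:
    """Derive internal resource names for a connector.

    Assumption: each name is <prefix>-<connector-name>, following the
    examples in this guide. Only the prefix is stated as a requirement.
    """
    # The dead-letter queue and error data topics may be the same topic.
    error_topic = f"connect-error-{connector_name}"
    return {
        "consumer_group": f"connect-{connector_name}",
        "task_offset_topic": f"connect-offset-{connector_name}",
        "task_config_topic": f"connect-config-{connector_name}",
        "task_status_topic": f"connect-status-{connector_name}",
        "dead_letter_queue_topic": error_topic,
        "error_data_topic": error_topic,
    }
```

For the connector name kafka-adb-sink, this yields the same names that appear as examples in the wizard tables in Step 2.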
Create a topic
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region of your instance.
You must create topics in the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if the producers and consumers of messages run on an ECS instance deployed in the China (Beijing) region, the topic must also be created in the China (Beijing) region.
On the Instances page, click the instance name.
In the left-side navigation pane, click Topics.
On the Topics page, click Create Topic.
In the Create Topic panel, configure the following parameters and click OK.
| Parameter | Description |
|---|---|
| Name | The topic name. Use the naming prefix from the table above (for example, connect-offset-kafka-adb-sink). |
| Partitions | The number of partitions. See the required values in the table above. |
| Storage Engine | The storage engine type. Available only on Professional Edition instances. Standard Edition instances use Cloud Storage by default. Options: Cloud Storage (Alibaba Cloud disks, 3-replica distributed storage with low latency and high reliability) or Local Storage (Apache Kafka ISR algorithm, 3-replica distributed storage). Standard (High Write) instances support only Cloud Storage. |
| Message Type | The message ordering guarantee. Normal Message (default for Cloud Storage): partition order may not be preserved during broker failures. Partitionally Ordered Message (default for Local Storage): order is preserved during broker failures, but affected partitions are unavailable until restored. |
| Log Cleanup Policy | Required when Storage Engine is Local Storage. Delete (default): retains messages by retention period; deletes the oldest messages when storage exceeds 85%. Compact: retains only the latest value per key. Required for connector internal topics that use Kafka Connect. Important: Log-compacted topics can only be used in specific cloud-native components, such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos. |
| Description | An optional description. |
| Tag | Optional tags. |
Repeat these steps until you have created all of the required topics.
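If you create the topics manually, a quick local check against the Required topics table can catch mistakes before you reference the topics in Step 2. The sketch below only encodes the rules from that table; the function name `check_internal_topic` and its argument layout are hypothetical, and it does not call any ApsaraMQ for Kafka API.

```python
from typing import List, Optional

LOCAL = "Local Storage"
CLOUD = "Cloud Storage"


def check_internal_topic(name: str, partitions: int, engine: str,
                         cleanup: Optional[str] = None) -> List[str]:
    """Return a list of problems; an empty list means the topic matches the table."""
    problems = []
    if name.startswith("connect-offset"):
        if partitions <= 1:
            problems.append("task offset topic needs more than 1 partition")
        needs_compacted_local = True
    elif name.startswith("connect-config"):
        if partitions != 1:
            problems.append("task configuration topic needs exactly 1 partition")
        needs_compacted_local = True
    elif name.startswith("connect-status"):
        # 6 partitions is a recommendation, so it is not enforced here.
        needs_compacted_local = True
    elif name.startswith("connect-error"):
        # Dead-letter queue and error data topics accept either engine and any policy.
        needs_compacted_local = False
        if engine not in (LOCAL, CLOUD):
            problems.append("unknown storage engine")
    else:
        return [f"unrecognized name prefix: {name}"]

    if needs_compacted_local:
        if engine != LOCAL:
            problems.append("storage engine must be Local Storage")
        if cleanup != "Compact":
            problems.append("log cleanup policy must be Compact")
    return problems
```

Returning a list of problems rather than a boolean makes it easier to report every mismatch for a topic at once.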
Create the consumer group
The connector requires a consumer group named connect-<connector-name> (for example, connect-kafka-adb-sink).
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region of your instance.
On the Instances page, click the instance name.
In the left-side navigation pane, click Groups.
On the Groups page, click Create Group.
In the Create Group panel, enter the group name in the Group ID field, add an optional description and tags, and then click OK.
Step 2: Create and deploy the connector
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region of your instance.
On the Instances page, click the instance name.
In the left-side navigation pane, click Connectors.
On the Connectors page, click Create Connector.
Complete the Create Connector wizard:
Configure basic information
| Parameter | Description | Example |
|---|---|---|
| Name | The connector name. 1-48 characters: digits, lowercase letters, and hyphens (-). Cannot start with a hyphen. Must be unique within the instance. | kafka-adb-sink |
| Instance | The ApsaraMQ for Kafka instance. Displays the instance name and ID. | demo alikafka_post-cn-st21p8vj**** |
Click Next.
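The naming rules for the connector can be checked locally before you open the wizard. This is a minimal sketch of those rules as a regular expression; the function name `is_valid_connector_name` is hypothetical, and uniqueness within the instance still has to be confirmed in the console.

```python
import re

# 1-48 characters; digits, lowercase letters, and hyphens; must not start with a hyphen.
_CONNECTOR_NAME = re.compile(r"[a-z0-9][a-z0-9-]{0,47}")


def is_valid_connector_name(name: str) -> bool:
    """Check the character and length rules from the Name parameter description."""
    return _CONNECTOR_NAME.fullmatch(name) is not None
```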
Configure source service
Select Message Queue for Apache Kafka as the source service and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Data Source Topic | The topic to export data from. | adb-test-input |
| Consumer Thread Concurrency | The number of concurrent consumer threads. Default: 6. Valid values: 1, 2, 3, 6, 12. | 6 |
| Consumer Offset | Where to start consuming. Earliest Offset: start from the beginning. Latest Offset: start from the most recent message. | Earliest Offset |
Click Configure Runtime Environment to expand advanced settings.
| Parameter | Description | Example |
|---|---|---|
| VPC ID | The VPC for the export task. Defaults to the VPC of the ApsaraMQ for Kafka instance. | vpc-bp1xpdnd3l*** |
| vSwitch ID | The vSwitch for the export task. Must be in the same VPC as the instance. | vsw-bp1d2jgg81*** |
| Failure Handling Policy | Action on message send failure. Continue Subscription: continue consuming and log the error. Stop Subscription: stop consuming and log the error. For log details, see Manage a connector. For error codes, see Error codes. | Continue Subscription |
| Resource Creation Method | How to create the required internal topics and consumer group. Auto: ApsaraMQ for Kafka creates them automatically. Manual: use the resources created in Step 1. | Auto |
| Connector Consumer Group | The consumer group for the connector. Format: connect-<connector-name>. | connect-kafka-adb-sink |
| Task Offset Topic | Stores consumer offsets. Name prefix: connect-offset. Partitions: greater than 1. Storage engine: Local Storage. Cleanup policy: Compact. | connect-offset-kafka-adb-sink |
| Task Configuration Topic | Stores task configurations. Name prefix: connect-config. Partitions: 1. Storage engine: Local Storage. Cleanup policy: Compact. | connect-config-kafka-adb-sink |
| Task Status Topic | Stores task status. Name prefix: connect-status. Partitions: 6 (recommended). Storage engine: Local Storage. Cleanup policy: Compact. | connect-status-kafka-adb-sink |
| Dead-letter Queue Topic | Stores Kafka Connect framework error data. Name prefix: connect-error. Partitions: 6 (recommended). Storage engine: Local Storage or Cloud Storage. Can share a topic with the error data topic. | connect-error-kafka-adb-sink |
| Error Data Topic | Stores sink connector error data. Name prefix: connect-error. Partitions: 6 (recommended). Storage engine: Local Storage or Cloud Storage. Can share a topic with the dead-letter queue topic. | connect-error-kafka-adb-sink |
Click Next.
Configure destination service
Select AnalyticDB as the destination service and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Instance Type | The destination database type: AnalyticDB for MySQL or AnalyticDB for PostgreSQL. | AnalyticDB for MySQL |
| AnalyticDB Instance ID | The destination instance ID. | am-bp139yqk8u1ik**** |
| Database Name | The destination database. | adb_demo |
| Table Name | The destination table for exported data. | user |
| Database Username | The database login username. | adbmysql |
| Database Password | The database login password. To reset a forgotten password: for AnalyticDB for MySQL, reset it in the AnalyticDB for MySQL console. For AnalyticDB for PostgreSQL, go to Account Management and click Reset Password. | ******** |
The database username and password are passed to Function Compute as environment variables when ApsaraMQ for Kafka creates the export task. ApsaraMQ for Kafka does not store these credentials after task creation.
Click Create.
On the Connectors page, find the connector and click Deploy in the Actions column.
Step 3: Configure Function Compute networking
After deployment, Function Compute automatically creates a service (named kafka-service-<connector_name>-<random_string>) and a function (named fc-adb-<random_string>) for the connector.
On the Connectors page, find the connector and click Configure Function in the Actions column. This opens the Function Compute console.
In the Function Compute console, locate the automatically created service and configure the VPC and vSwitch to match the destination database. For more information, see Update a service.
Step 4: Add the VPC CIDR block to the AnalyticDB whitelist
Add the CIDR block of the VPC configured in Function Compute to the AnalyticDB whitelist. You can find the CIDR block on the vSwitch page of the VPC console, in the row that matches the VPC and vSwitch of the Function Compute service.
AnalyticDB for MySQL: Configure the whitelist in the AnalyticDB for MySQL console. For more information, see Configure an IP address whitelist.
AnalyticDB for PostgreSQL: Configure the whitelist in the AnalyticDB for PostgreSQL console. For more information, see Configure an IP address whitelist.
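To confirm that a whitelist actually covers the Function Compute CIDR block, you can compare the entries offline with Python's standard `ipaddress` module. This is a sketch under assumptions: the function name `whitelist_covers` is hypothetical, and you copy the whitelist entries and CIDR block from the consoles yourself.

```python
import ipaddress
from typing import Iterable


def whitelist_covers(whitelist_entries: Iterable[str], fc_cidr: str) -> bool:
    """Return True if any whitelist entry contains the whole fc_cidr block."""
    target = ipaddress.ip_network(fc_cidr)
    for entry in whitelist_entries:
        # strict=False accepts entries written with host bits set, such as 10.0.0.5/8.
        network = ipaddress.ip_network(entry, strict=False)
        # subnet_of raises TypeError across IP versions, so compare versions first.
        if network.version == target.version and target.subnet_of(network):
            return True
    return False
```

A whitelist entry that is broader than the vSwitch CIDR block also passes this check, because every address in the block falls inside it.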
Step 5: Verify the data export
Send test messages
Message content must be valid JSON. Each JSON key maps to a column name in the destination table, and each value maps to the column data. Verify that the JSON keys match the column names in your destination table before sending test messages.
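The key-to-column check can be sketched as follows. The function name `check_message` and the column list you pass in are assumptions for illustration; the sketch only parses the message and reports keys that have no matching column, and it does not reproduce how the connector itself writes rows.

```python
import json
from typing import Iterable, List, Tuple


def check_message(message: str, table_columns: Iterable[str]) -> Tuple[dict, List[str]]:
    """Parse a message and return (row, keys_without_a_matching_column)."""
    row = json.loads(message)  # raises json.JSONDecodeError on invalid JSON
    if not isinstance(row, dict):
        raise ValueError("message must be a JSON object that maps column names to values")
    unknown = sorted(set(row) - set(table_columns))
    return row, unknown
```

An empty list of unknown keys means every key in the message has a column of the same name in the list you supplied.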
On the Connectors page, find the connector and click Test in the Actions column.
In the Send Message panel, select a Sending Method:
Console:
In the Message Key field, enter a key (for example, demo).
In the Message Content field, enter JSON content (for example, {"key": "test"}).
For Send to Specified Partition, select Yes and enter a Partition ID (for example, 0) to target a specific partition, or select No to let ApsaraMQ for Kafka assign the partition. For information about partition IDs, see View partition status.
Docker: Run the Docker command shown in the Run the Docker container to produce a sample message section.
SDK: Select an SDK for your programming language and an access method to send the test message.
Verify the result
After sending messages, check the destination table to confirm the data was exported:
Log on to the AnalyticDB for MySQL console or the AnalyticDB for PostgreSQL console.
Connect to the destination database.
In the SQLConsole of the Data Management Service 5.0 console, open the destination table and confirm that the exported data is present.
What's next
Manage a connector: View connector status, pause, resume, or delete connectors.
Configure logging: Set up Function Compute logs to troubleshoot data export issues.