A sink connector synchronizes data from one ApsaraMQ for Kafka instance to another through EventBridge. The connector consumes messages from a source topic, applies optional filtering and transformation, and delivers them to a destination topic on another instance.
How it works
The connector moves messages between the two instances in three stages:
EventBridge consumes messages from a topic on the source instance.
Messages pass through optional filtering and transformation steps.
EventBridge delivers the processed messages to a topic on the destination instance.
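The three stages above can be sketched as a small in-memory pipeline. This is an illustration only, not how EventBridge is implemented: the `run_pipeline` function, the message shape, and the list-based topics are all hypothetical stand-ins.

```python
from typing import Callable, Iterable, Optional

Message = dict  # hypothetical message shape: {"key": ..., "value": ...}


def run_pipeline(
    source: Iterable[Message],
    deliver: Callable[[Message], None],
    keep: Optional[Callable[[Message], bool]] = None,
    transform: Optional[Callable[[Message], Message]] = None,
) -> int:
    """Consume from source, optionally filter and transform, then deliver.

    Returns the number of messages delivered.
    """
    delivered = 0
    for message in source:
        # Optional filtering: drop messages that do not match.
        if keep is not None and not keep(message):
            continue
        # Optional transformation: reshape the message in transit.
        if transform is not None:
            message = transform(message)
        deliver(message)
        delivered += 1
    return delivered


# In-memory stand-ins for the source and destination topics.
source_topic = [
    {"key": "a", "value": {"level": "info", "text": "started"}},
    {"key": "b", "value": {"level": "debug", "text": "noise"}},
    {"key": "c", "value": {"level": "info", "text": "stopped"}},
]
destination_topic: list = []

count = run_pipeline(
    source_topic,
    destination_topic.append,
    keep=lambda m: m["value"]["level"] == "info",
    transform=lambda m: {"key": m["key"], "value": m["value"]["text"]},
)
```

Both the filter and the transformation are optional arguments, mirroring the optional steps in the task configuration.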
Prerequisites
Before you begin, make sure that you have:
Two ApsaraMQ for Kafka instances in the Running state. For more information, see Step 2: Purchase and deploy an instance.
EventBridge activated and the required permissions granted to a Resource Access Management (RAM) user. For more information, see Activate EventBridge and grant permissions to a RAM user.
Step 1: Create a task
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your source ApsaraMQ for Kafka instance resides.
In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.
On the Tasks page, click Create Task.
Enter a Task Name and Description, then proceed with the following configuration steps.
Step 2: Configure the source
In the Source step, set Data Provider to ApsaraMQ for Kafka and configure the following parameters. Click Next Step when finished.
Required parameters
Parameter | Description | Example |
Region | The region of the source ApsaraMQ for Kafka instance. | China (Beijing) |
ApsaraMQ for Kafka Instance | The instance that produces the messages to route. | MQ_INST_115964845466\*\*\*\*\_ByBeUp3p |
Topic | The topic that contains the messages to route. | topic |
Group ID | A dedicated consumer group on the source instance. See the warning below. | GID_http_1 |
Consumer Offset | The offset from which to start consuming messages. | Latest Offset |
Network Configuration | The network type for routing messages. | Basic Network |
Warning: Use a dedicated consumer group for this connector. If you reuse a consumer group that other consumers already use, those consumers may fail to send and receive messages as expected.
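The reason a shared consumer group is risky is that Kafka divides a topic's partitions among the members of a group, so each message goes to only one member. The following sketch illustrates that effect with a simplified round-robin assignment. The member names and the `assign_partitions` helper are hypothetical, and real Kafka assignors are more involved.

```python
def assign_partitions(partitions: list, members: list) -> dict:
    """Spread partitions across group members round-robin (simplified)."""
    assignment = {member: [] for member in members}
    for index, partition in enumerate(partitions):
        assignment[members[index % len(members)]].append(partition)
    return assignment


partitions = [0, 1, 2, 3, 4, 5]

# Dedicated group: the connector is the only member and reads every partition.
dedicated = assign_partitions(partitions, ["connector"])

# Shared group: the connector and an existing application split the partitions,
# so each of them sees only part of the topic.
shared = assign_partitions(partitions, ["connector", "existing-app"])
```

With a shared group, neither the connector nor the existing application receives the full message stream.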
Conditional parameters (Self-managed Internet only)
The following parameters are required only when Network Configuration is set to Self-managed Internet.
Parameter | Description | Example |
VPC | The ID of the Virtual Private Cloud (VPC) where the ApsaraMQ for Kafka instance is deployed. | vpc-bp17fapfdj0dwzjkd\*\*\*\* |
vSwitch | The ID of the vSwitch associated with the instance. | vsw-bp1gbjhj53hdjdkg\*\*\*\* |
Security Group | The security group that the instance belongs to. | alikafka\_pre-cn-7mz2\*\*\*\* |
Batching parameters
Parameter | Description | Valid values | Example |
Messages | The maximum number of messages sent in each batch. A batch is sent only when the number of backlogged messages reaches this value. | 1 to 10,000 | 100 |
Interval (Unit: Seconds) | The interval at which aggregated messages are sent to the destination instance. Set to 0 to send immediately after aggregation. | 0 to 15 | 3 |
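As a rough illustration of the batching parameters, the sketch below validates the documented ranges and shows how a backlog is cut into full batches once the Messages threshold is reached. The `BatchConfig` class and `drain_full_batches` function are hypothetical names, and the sketch does not model how the Interval timer interacts with the threshold.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BatchConfig:
    messages: int          # valid range per the table: 1 to 10,000
    interval_seconds: int  # valid range per the table: 0 to 15

    def __post_init__(self) -> None:
        if not 1 <= self.messages <= 10_000:
            raise ValueError("Messages must be between 1 and 10,000")
        if not 0 <= self.interval_seconds <= 15:
            raise ValueError("Interval must be between 0 and 15 seconds")


def drain_full_batches(backlog: list, config: BatchConfig) -> list:
    """Remove and return full batches; leftover messages stay in the backlog."""
    batches = []
    while len(backlog) >= config.messages:
        batches.append(backlog[: config.messages])
        del backlog[: config.messages]
    return batches


config = BatchConfig(messages=100, interval_seconds=3)
backlog = list(range(250))
batches = drain_full_batches(backlog, config)
```

With the example values from the table, a backlog of 250 messages yields two batches of 100, and the remaining 50 messages wait in the backlog.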
Step 3: Configure filtering (optional)
In the Filtering step, define a data pattern in the Pattern Content code editor to filter messages before they reach the destination. Only messages that match the pattern are forwarded.
For more information, see Event patterns.
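To give a feel for pattern-based filtering, here is a minimal sketch of exact-value matching, assuming a pattern is a nested object whose leaf values are lists of accepted values. The `matches` function and the event fields (`data`, `topic`, `partition`, `value`) are hypothetical. The pattern syntax that EventBridge actually supports is described in Event patterns and offers more operators than this sketch.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Return True if every field in the pattern matches the event.

    Leaf pattern values are lists of accepted values; nested dicts recurse.
    """
    for field, expected in pattern.items():
        if field not in event:
            return False
        actual = event[field]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


# Hypothetical pattern: forward only messages from topic "orders", partition 0 or 1.
pattern = {"data": {"topic": ["orders"], "partition": [0, 1]}}

events = [
    {"data": {"topic": "orders", "partition": 0, "value": "a"}},
    {"data": {"topic": "orders", "partition": 2, "value": "b"}},
    {"data": {"topic": "audit", "partition": 0, "value": "c"}},
]

forwarded = [e["data"]["value"] for e in events if matches(pattern, e)]
```

Only the first event satisfies both conditions, so it is the only one forwarded.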
Step 4: Configure transformation (optional)
In the Transformation step, specify a data cleansing method to process messages in transit. Supported operations include splitting, mapping, enrichment, and dynamic routing.
For more information, see Data cleansing.
Step 5: Configure the sink
In the Sink step, set Service Type to ApsaraMQ for Kafka and configure the following parameters.
Parameter | Description | Example |
Instance ID | The destination ApsaraMQ for Kafka instance. | test |
Topic | The topic on the destination instance. | test |
Acknowledgment Mode | The acknowledgment mode for received messages. Valid values: None, LeaderOnly, All. | None |
Message Value | How EventBridge extracts the message value using JSONPath. Valid values: Complete Data, Data Extraction, Fixed Value, Template. | Data Extraction: |
Message Key | How EventBridge extracts the message key using JSONPath. Valid values: Empty, Data Extraction, Fixed Value, Template. | Data Extraction: |
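The Data Extraction option can be pictured as resolving a JSONPath expression against the event. The sketch below handles only simple paths of the form `$.a.b`, and the `extract` helper and the event fields are hypothetical. It is not a full JSONPath implementation.

```python
def extract(event: dict, path: str):
    """Resolve a simple JSONPath of the form $.a.b.c against a dict."""
    if path == "$":
        return event  # the complete event
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    current = event
    for part in path[2:].split("."):
        current = current[part]
    return current


# Hypothetical event shape, used only for illustration.
event = {"data": {"key": "order-42", "value": {"status": "paid"}}}

message_key = extract(event, "$.data.key")
message_value = extract(event, "$.data.value")
```

In this sketch, the path `$` returns the whole event, which is roughly what the Complete Data option conveys, while a narrower path selects a single field for the key or value.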
Step 6: Configure task properties
Set up the retry policy and dead-letter queue. These settings control how failed deliveries are handled.
For more information, see Retry policies and dead-letter queues.
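The interplay between a retry policy and a dead-letter queue can be sketched as follows. This is a generic illustration with exponential backoff, not the policy that EventBridge applies: the retry limit, the delays, and the `deliver_with_retry` function are all hypothetical.

```python
from typing import Callable


def deliver_with_retry(
    message: dict,
    send: Callable[[dict], None],
    dead_letters: list,
    max_retries: int = 3,           # hypothetical limit
    base_delay_seconds: float = 1,  # hypothetical starting delay
    sleep: Callable[[float], None] = lambda seconds: None,  # no-op by default
) -> bool:
    """Try to send; back off exponentially; dead-letter after the last failure."""
    for attempt in range(max_retries + 1):
        try:
            send(message)
            return True
        except Exception:
            if attempt < max_retries:
                # Double the wait before each new attempt.
                sleep(base_delay_seconds * 2 ** attempt)
    # All attempts failed: keep the message for later inspection.
    dead_letters.append(message)
    return False


attempts = {"count": 0}


def flaky_send(message: dict) -> None:
    """Fail twice, then succeed."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("destination unavailable")


def broken_send(message: dict) -> None:
    """Always fail."""
    raise ConnectionError("destination unavailable")


dead_letters: list = []
delays: list = []
ok = deliver_with_retry({"value": "a"}, flaky_send, dead_letters, sleep=delays.append)
failed = deliver_with_retry({"value": "b"}, broken_send, dead_letters)
```

The `sleep` parameter is injected so the example runs instantly. Pass `time.sleep` to wait for real between attempts.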
After you finish all configuration steps, click Save.
Step 7: Enable the connector
Go back to the Tasks page and find the connector you created.
In the Actions column, click Enable.
In the Note message, click OK.
The connector takes 30 to 60 seconds to be enabled. Monitor the Status column on the Tasks page to track progress.
Manage the connector
On the Tasks page, find the connector and use the Actions column to perform these operations:
Operation | Description |
Details | View basic information, properties, and monitoring metrics. |
Edit | Modify the configuration in the Edit Task panel. |
Enable or Pause | Enable or pause the connector. Click OK in the Note message to confirm. |
Delete | Remove the connector. Click OK in the Note message to confirm. |