ApsaraMQ for Kafka:Create Function Compute sink connectors

Last Updated: Mar 10, 2025

This topic describes how to create a sink connector in the ApsaraMQ for Kafka console to synchronize data from an ApsaraMQ for Kafka instance to a Function Compute function.

Prerequisites

A function is created in Function Compute. For more information, see Create a function.

What is Function Compute?

Function Compute is a fully managed, event-driven serverless computing service. It allows you to focus on writing and uploading code without the need to manage infrastructure resources such as servers. Function Compute provisions elastic computing resources to run your code in a reliable manner. For more information, see What is Function Compute?

What can Function Compute be used for?

  • You can use functions to process business messages and use the Function Compute platform to develop and run the message processing logic, for example, to process orders or execute tasks.

  • You can use functions to quickly process messages and perform data cleansing based on extract, transform, and load (ETL), as shown in the sketch after this list.

  • You can use Function Compute to dump messages to other downstream systems in a specific virtual private cloud (VPC).

  • You can use functions to connect a messaging system to another Alibaba Cloud service and deliver messages to that service.
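
The following minimal sketch illustrates the ETL-style cleansing use case. It assumes the standard handler signature of the Function Compute Python runtime; the message fields that it reads (topic, value) are illustrative assumptions, not a documented schema.

    # -*- coding: utf-8 -*-
    # Minimal sketch of a Function Compute handler that cleanses messages
    # routed from ApsaraMQ for Kafka.
    import json

    def handler(event, context):
        # In the Python runtime of Function Compute, event is passed as bytes.
        message = json.loads(event)
        # Hypothetical cleansing step: keep only the fields that the
        # downstream system needs. The keys are assumptions for illustration.
        cleansed = {
            "topic": message.get("topic"),
            "value": message.get("value"),
        }
        print(json.dumps(cleansed))
        return "OK"

You can upload a sketch like this as the function code and then replace the cleansing step with your own business logic.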

Create a sink connector

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  2. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  3. On the Tasks page, click Create Task.

  4. On the Create Task page, configure the Task Name and Description parameters and follow the on-screen instructions to configure other parameters. Then, click Save. The following section describes the parameters:

    • Task Creation

      1. In the Source step, set the Data Provider parameter to ApsaraMQ for Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following list describes the parameters:

        • Region: The region where the ApsaraMQ for Kafka instance resides. Example: China (Beijing).

        • ApsaraMQ for Kafka Instance: The ApsaraMQ for Kafka instance in which the messages that you want to route are produced. Example: MQ_INST_115964845466****_ByBeUp3p.

        • Topic: The topic on the ApsaraMQ for Kafka instance in which the messages that you want to route are produced. Example: topic.

        • Group ID: The name of the consumer group on the ApsaraMQ for Kafka instance. Use a dedicated consumer group to create the message routing source. Do not use a consumer group that is already in use. Otherwise, existing messages may fail to be sent or received. Example: GID_http_1.

        • Consumer Offset: The offset from which messages are consumed. Example: Latest Offset.

        • Network Configuration: The type of network over which you want to route messages. Example: Basic Network.

        • VPC: The ID of the virtual private cloud (VPC) in which the ApsaraMQ for Kafka instance is deployed. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet. Example: vpc-bp17fapfdj0dwzjkd****.

        • vSwitch: The ID of the vSwitch with which the ApsaraMQ for Kafka instance is associated. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet. Example: vsw-bp1gbjhj53hdjdkg****.

        • Security Group: The security group to which the ApsaraMQ for Kafka instance belongs. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet. Example: alikafka_pre-cn-7mz2****.

        • Messages: The maximum number of messages that can be sent in each function invocation. A request is sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. Example: 100.

        • Interval (Unit: Seconds): The interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified interval. Valid values: 0 to 15. The value 0 specifies that messages are sent immediately after aggregation. Example: 3.

      2. In the Filtering step, define a data pattern in the Pattern Content code editor to filter data. For more information, see Event patterns.

      3. In the Transformation step, specify a data cleansing method to implement data splitting, mapping, enrichment, and routing capabilities. For more information, see Use Function Compute to perform message cleansing.

      4. In the Sink step, set the Service Type parameter to Function Compute and follow the on-screen instructions to configure other parameters. The following list describes the parameters:

        • Function: The Function Compute function that you created. Example: test.

        • Version and Alias: Specifies whether events are delivered to a specific version or a specific alias of the function. Valid values: Specified Version and Specified Alias. Example: Specified Version.

        • Version: The version of the function. We recommend that you select the latest version. This parameter is required only if you set the Version and Alias parameter to Specified Version. Example: LATEST.

        • Alias: The alias of the function. This parameter is required only if you set the Version and Alias parameter to Specified Alias. Example: test.

        • Invocation Mode: The invocation mode. Valid values: Synchronous and Asynchronous. Example: Asynchronous.

        • Event Format: The format in which events are delivered to the downstream function. Valid values: Object (events are delivered as a single object) and ObjectList (events are delivered as an array of objects). Example: Object. For a sample handler, see the sketch after this procedure.

        • Event: The message content that EventBridge routes to the event target. EventBridge can extract data from a message by using JSONPath and route the specified content of the message to the event target. Valid values: Complete Data, Data Extraction, Fixed Value, and Template. Example: Complete Data.

    • Task Property

      Configure the retry policy that you want to use when events fail to be pushed and the method that you want to use to handle faults. For more information, see Retry policies and dead-letter queues.

  5. Go back to the Tasks page, find the Function Compute sink connector that you created, and then click Enable in the Actions column.

  6. In the Note message, click OK.

    The sink connector requires 30 to 60 seconds to be enabled. You can view the progress in the Status column on the Tasks page.
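
Before you enable the connector, make sure that your function can parse the events that the connector delivers. The following sketch assumes the Function Compute Python runtime and illustrative field handling; it shows how a handler might accept both settings of the Event Format parameter. The actual payload also depends on the Event parameter, for example, Complete Data or Data Extraction.

    # -*- coding: utf-8 -*-
    # Sketch of a handler that tolerates both Event Format settings:
    # Object delivers a single JSON object per invocation, whereas
    # ObjectList delivers a JSON array of aggregated messages, bounded
    # by the Messages and Interval parameters in the Source step.
    import json

    def handler(event, context):
        payload = json.loads(event)
        # Normalize: wrap a single object (Object format) into a list so
        # that the same loop also covers the ObjectList format.
        records = payload if isinstance(payload, list) else [payload]
        for record in records:
            # Placeholder for your business logic.
            print("received: %s" % json.dumps(record))
        return "processed %d records" % len(records)

Normalizing to a list keeps the same handler valid if you later change the Event Format parameter.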

Other operations

On the Tasks page, find the message outflow task that you want to manage and perform the following operations in the Actions column:

  • View the task details: Click Details in the Actions column. On the page that appears, view the basic information, properties, and monitoring metrics of the task.

  • Change the task configurations: Click Edit in the Actions column. In the Edit Task panel, change the configurations of the task.

  • Enable or disable the task: Click Enable or Pause in the Actions column. In the Note message, click OK.

  • Delete the task: Click Delete in the Actions column. In the Note message, click OK.