
ApsaraMQ for Kafka: Create an OSS sink connector

Last Updated: Mar 27, 2025

This topic describes how to create an OSS sink connector to synchronize data from a topic in an ApsaraMQ for Kafka instance to Object Storage Service (OSS).

Prerequisites

Before you begin, make sure that the following requirements are met:

  • The connector feature is enabled for the ApsaraMQ for Kafka instance.

  • A data source topic is created on the instance.

  • OSS is activated, and a bucket is created.

  • Function Compute is activated.

Precautions

  • When you synchronize data from a topic in an ApsaraMQ for Kafka instance to an OSS bucket, make sure that the ApsaraMQ for Kafka instance and the OSS bucket reside in the same region and that Function Compute is available in that region. ApsaraMQ for Kafka first synchronizes the data to Function Compute, which then synchronizes the data to OSS. For more information about the limits on connectors, see Limits.

  • OSS sink connectors export data by using Function Compute. Function Compute provides a free resource quota. After you use up the free quota, you are charged for the Function Compute resources that you use based on the billing rules of Function Compute. For more information, see Billing overview.

  • Function Compute allows you to query the logs of function calls. For more information, see Configure logging.

  • ApsaraMQ for Kafka serializes messages into UTF-8-encoded strings for transfer. Binary data is not supported. If your payloads are binary, encode them as strings before you produce the messages, as shown in the following sketch.
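
    The following minimal sketch shows one way to work within this constraint: encode a binary payload as Base64 inside a UTF-8 string before you produce the message. It assumes the kafka-python library; the endpoint and topic are placeholders.

    import base64
    import json

    from kafka import KafkaProducer

    # Placeholder endpoint; use the endpoint of your ApsaraMQ for Kafka instance.
    producer = KafkaProducer(
        bootstrap_servers="alikafka-pre-cn-xxxx-1-vpc.alikafka.aliyuncs.com:9092",
        value_serializer=lambda v: v.encode("utf-8"),  # messages must be UTF-8 strings
    )

    binary_payload = b"\x00\x01\x02"
    message = json.dumps({"data": base64.b64encode(binary_payload).decode("ascii")})
    producer.send("oss-test-input", value=message)
    producer.flush()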

Create and deploy an OSS sink connector

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. In the left-side navigation pane, click Connectors.

  4. On the Connectors page, select the instance to which the connector belongs from the Select Instance drop-down list and click Create Connector.

  5. In the Create Connector wizard, perform the following steps:

    1. In the Configure Basic Information step, configure the parameters and click Next. The following list describes the parameters.

      Important

      By default, Authorize to Create Service Linked Role is selected. This means that ApsaraMQ for Kafka will create a service-linked role based on the following rules:

      • If no service-linked role is available, ApsaraMQ for Kafka automatically creates a service-linked role for you to use the OSS sink connector to synchronize data from ApsaraMQ for Kafka to OSS.

      • If a service-linked role is available, ApsaraMQ for Kafka does not create a new role.

      For more information about service-linked roles, see Service-linked roles.

      Name

      The name of the connector. Specify a connector name based on the following naming conventions (see the validation sketch below):

      • The connector name must be 1 to 48 characters in length and can contain digits, lowercase letters, and hyphens (-). The name cannot start with a hyphen (-).

      • The name must be unique within an ApsaraMQ for Kafka instance.

      The data synchronization task of the connector must use a consumer group that is named in the connect-Task name format. If no such consumer group exists, ApsaraMQ for Kafka automatically creates one for you.

      Example: kafka-oss-sink

      Instance

      The ApsaraMQ for Kafka instance. By default, the name and ID of the instance are displayed.

      Example: demo alikafka_post-cn-st21p8vj****
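
      The naming conventions above can be expressed as a simple pattern. The following check is illustrative only and assumes Python; it is not part of the console:

      import re

      # Hypothetical helper: validate a connector name against the rules above.
      # 1 to 48 characters; digits, lowercase letters, and hyphens; no leading hyphen.
      NAME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9-]{0,47}$")

      def is_valid_connector_name(name: str) -> bool:
          return bool(NAME_PATTERN.fullmatch(name))

      assert is_valid_connector_name("kafka-oss-sink")
      assert not is_valid_connector_name("-kafka-oss-sink")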

    2. In the Configure Source Service step, select ApsaraMQ for Kafka as the source service, configure the parameters, and then click Next. The following list describes the parameters.

      Data Source Topic

      The name of the topic from which data is synchronized.

      Example: oss-test-input

      Consumer Thread Concurrency

      The number of concurrent consumer threads that are used to synchronize data from the data source topic. Valid values: 1, 2, 3, 6, and 12. Default value: 6.

      Example: 6

      Consumer Offset

      The consumer offset from which message consumption starts. Valid values:

      • Earliest Offset: Message consumption starts from the earliest consumer offset.

      • Latest Offset: Message consumption starts from the latest consumer offset.

      Example: Earliest Offset

      VPC ID

      The ID of the virtual private cloud (VPC) in which the data synchronization task runs. Click Configure Runtime Environment to display this parameter. By default, the VPC ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed. You do not need to change this parameter.

      Example: vpc-bp1xpdnd3l***

      vSwitch ID

      The ID of the vSwitch to which the source instance is connected. Click Configure Runtime Environment to display this parameter. The vSwitch must be deployed in the same VPC as the source ApsaraMQ for Kafka instance. By default, the vSwitch ID that you specified when you deployed the source instance is displayed.

      Example: vsw-bp1d2jgg81***

      Failure Handling Policy

      Specifies whether to retain the subscription to a partition after a message in the partition fails to be sent. Click Configure Runtime Environment to display this parameter. Valid values:

      • Continue Subscription: retains the subscription to the partition in which the error occurs. A log entry is generated for the error.

      • Stop Subscription: stops the subscription to the partition in which the error occurs. A log entry is generated for the error.

      Example: Continue Subscription

      Resource Creation Method

      The method that is used to create the topics and consumer group that are required by the OSS sink connector. Click Configure Runtime Environment to display this parameter. If you select Manual, create the resources with the configurations described below (see the sketch that follows this parameter list). Valid values:

      • Auto

      • Manual

      Example: Auto

      Connector Consumer Group

      The name of the consumer group that is used by the OSS sink connector. Click Configure Runtime Environment to display this parameter. We recommend that you start the name with connect-cluster.

      Example: connect-cluster-kafka-oss-sink

      Task Offset Topic

      The topic that is used to store consumer offsets. Click Configure Runtime Environment to display this parameter.

      • Topic: We recommend that you start the topic name with connect-offset.

      • Partitions: The number of partitions in the topic must be greater than 1.

      • Storage Engine: You must set the storage engine of the topic to Local Storage.

        Note

        You can set the storage engine to Local Storage only when you create a topic on a Professional Edition ApsaraMQ for Kafka instance.

      • cleanup.policy: You must set the log cleanup policy of the topic to Compact.

      Example: connect-offset-kafka-oss-sink

      Task Configuration Topic

      The topic that is used to store the task configurations. Click Configure Runtime Environment to display this parameter.

      • Topic: We recommend that you start the topic name with connect-config.

      • Partitions: The topic can contain only one partition.

      • Storage Engine: You must set the storage engine of the topic to Local Storage.

        Note

        You can set the storage engine to Local Storage only when you create a topic on a Professional Edition ApsaraMQ for Kafka instance.

      • cleanup.policy: You must set the log cleanup policy of the topic to Compact.

      Example: connect-config-kafka-oss-sink

      Task Status Topic

      The topic that is used to store the task status. Click Configure Runtime Environment to display this parameter.

      • Topic: We recommend that you start the topic name with connect-status.

      • Partitions: We recommend that you set the number of partitions in the topic to 6.

      • Storage Engine: You must set the storage engine of the topic to Local Storage.

        Note

        You can set the storage engine to Local Storage only when you create a topic on a Professional Edition ApsaraMQ for Kafka instance.

      • cleanup.policy: You must set the log cleanup policy of the topic to Compact.

      Example: connect-status-kafka-oss-sink

      Dead-letter Queue Topic

      The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display this parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic.

      • Topic: We recommend that you start the topic name with connect-error.

      • Partitions: We recommend that you set the number of partitions in the topic to 6.

      • Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage.

        Note

        You can set the storage engine to Local Storage only when you create a topic on a Professional Edition ApsaraMQ for Kafka instance.

      Example: connect-error-kafka-oss-sink

      Error Data Topic

      The topic that is used to store the error data of the OSS sink connector. Click Configure Runtime Environment to display this parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic.

      • Topic: We recommend that you start the topic name with connect-error.

      • Partitions: We recommend that you set the number of partitions in the topic to 6.

      • Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage.

        Note

        You can set the storage engine to Local Storage only when you create a topic on a Professional Edition ApsaraMQ for Kafka instance.

      Example: connect-error-kafka-oss-sink
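
      If you set Resource Creation Method to Manual, you must create the topics above yourself. The following sketch shows roughly equivalent Apache Kafka topic settings and assumes the kafka-python library; the endpoint and replication factor are placeholders, and the Local Storage engine choice is made in the ApsaraMQ for Kafka console rather than through this API.

      from kafka.admin import KafkaAdminClient, NewTopic

      # Placeholder endpoint; use the endpoint of your ApsaraMQ for Kafka instance.
      admin = KafkaAdminClient(
          bootstrap_servers="alikafka-pre-cn-xxxx-1-vpc.alikafka.aliyuncs.com:9092",
      )

      admin.create_topics([
          # Task offset topic: more than 1 partition, compact cleanup policy.
          NewTopic("connect-offset-kafka-oss-sink", num_partitions=12,
                   replication_factor=3, topic_configs={"cleanup.policy": "compact"}),
          # Task configuration topic: exactly 1 partition, compact cleanup policy.
          NewTopic("connect-config-kafka-oss-sink", num_partitions=1,
                   replication_factor=3, topic_configs={"cleanup.policy": "compact"}),
          # Task status topic: 6 partitions recommended, compact cleanup policy.
          NewTopic("connect-status-kafka-oss-sink", num_partitions=6,
                   replication_factor=3, topic_configs={"cleanup.policy": "compact"}),
          # Dead-letter queue and error data topic: 6 partitions recommended.
          NewTopic("connect-error-kafka-oss-sink", num_partitions=6,
                   replication_factor=3),
      ])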

    3. In the Configure Destination Service step, select Object Storage Service as the destination service, configure the parameters, and then click Create. The following list describes the parameters.

      Bucket Name

      The name of the OSS bucket to which data is synchronized.

      Example: bucket-test

      AccessKey ID

      The AccessKey ID of your Alibaba Cloud account.

      Example: yourAccessKeyID

      AccessKey Secret

      The AccessKey secret of your Alibaba Cloud account.

      Example: yourAccessKeySecret

      Make sure that your Alibaba Cloud account is granted the following permissions according to the principle of least privilege:

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "oss:GetObject",
                      "oss:PutObject"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
      Note

      The AccessKey ID and AccessKey secret are passed to Function Compute as environment variables when the data synchronization task is created. After the task is created, ApsaraMQ for Kafka does not store the AccessKey ID or AccessKey secret of your Alibaba Cloud account.
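
      Before you create the connector, you can optionally verify that the AccessKey pair allows the two required operations. The following minimal sketch assumes the oss2 Python SDK; the endpoint and bucket name are placeholders.

      import oss2

      # Placeholders: substitute your AccessKey pair, region endpoint, and bucket name.
      auth = oss2.Auth("yourAccessKeyID", "yourAccessKeySecret")
      bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "bucket-test")

      # The policy above must allow both oss:PutObject and oss:GetObject.
      bucket.put_object("connector-permission-check.txt", b"ok")
      print(bucket.get_object("connector-permission-check.txt").read())  # b'ok'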

      After the connector is created, you can view the connector on the Connectors page.

  6. Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.

Send messages

After you deploy the OSS sink connector, send a message to the source topic in the ApsaraMQ for Kafka instance to test whether the message can be synchronized to OSS.

  1. On the Connectors page, find the connector that you want to manage and click Test in the Actions column.

  2. In the Send Message panel, configure the parameters to send a message for testing.

    • If you set the Sending Method parameter to Console, perform the following steps:

      1. In the Message Key field, enter the message key. Example: demo.

      2. In the Message Content field, enter the message content. Example: {"key": "test"}.

      3. Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.

        • If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field. Example: 0. For information about how to query partition IDs, see View partition status.

        • If you do not want to send the test message to a specific partition, click No.

    • If you set the Sending Method parameter to Docker, run the Docker command in the Run the Docker container to produce a sample message section to send the test message.

    • If you set the Sending Method parameter to SDK, select an SDK for the programming language or framework that you use and an access method, and then use the SDK to send and subscribe to the test message. A minimal producer sketch follows this list.
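
    The following sketch illustrates the SDK path. It assumes the kafka-python library and a VPC endpoint; the endpoint is a placeholder, and the key and value mirror the console example above.

      from kafka import KafkaProducer

      # Placeholder endpoint; use the endpoint of your ApsaraMQ for Kafka instance.
      producer = KafkaProducer(
          bootstrap_servers="alikafka-pre-cn-xxxx-1-vpc.alikafka.aliyuncs.com:9092",
          key_serializer=lambda k: k.encode("utf-8"),
          value_serializer=lambda v: v.encode("utf-8"),
      )
      producer.send("oss-test-input", key="demo", value='{"key": "test"}')
      producer.flush()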

Verify the results

After you send a test message to the source topic in the ApsaraMQ for Kafka instance, you can check whether the message is synchronized to OSS on the Files page of the specified OSS bucket in the OSS console. For more information, see Overview.

If new objects are generated in the OSS bucket, the data has been synchronized to OSS.
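
You can also check for new objects programmatically. The following sketch assumes the oss2 Python SDK; the endpoint and bucket name are placeholders.

    import oss2

    # Placeholders: substitute your AccessKey pair, region endpoint, and bucket name.
    auth = oss2.Auth("yourAccessKeyID", "yourAccessKeySecret")
    bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "bucket-test")

    # List the objects in the bucket; new entries indicate synchronized data.
    for obj in oss2.ObjectIterator(bucket):
        print(obj.key, obj.last_modified)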


The data that is synchronized from ApsaraMQ for Kafka to OSS is in the following format:

[
    {
        "key":"123",
        "offset":4,
        "overflowFlag":true,
        "partition":0,
        "timestamp":1603779578478,
        "topic":"Test",
        "value":"1",
        "valueSize":272687
    }
]
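
As shown above, each object contains a JSON array of message records, and the value field holds the UTF-8 message body. The following sketch parses such an object after you download it; the body string below reuses the example record above.

    import json

    # Assumes `body` holds the content of a synchronized OSS object in the format above.
    body = """[{"key":"123","offset":4,"overflowFlag":true,"partition":0,
    "timestamp":1603779578478,"topic":"Test","value":"1","valueSize":272687}]"""

    for record in json.loads(body):
        print(record["topic"], record["partition"], record["offset"], record["value"])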

Related operations

You can configure the Function Compute resources that are required by the OSS sink connector based on your requirements.

On the Connectors page, find the connector that you created, click More in the Actions column, and then select Configure Function.

You are redirected to the Function Compute console, where you can configure the resources as required.