This topic describes how to create an OSS sink connector to synchronize data from a topic in your Message Queue for Apache Kafka instance to Object Storage Service (OSS).

Prerequisites

Before you export data, make sure that the following operations are completed:

  • The connector feature is enabled for the Message Queue for Apache Kafka instance.
  • A data source topic is created in the Message Queue for Apache Kafka instance.
  • A destination bucket is created in the OSS console.

Usage notes

  • To synchronize data from Message Queue for Apache Kafka to OSS, the Message Queue for Apache Kafka instance that contains the data source topic and the destination OSS bucket must be in the same region. Message Queue for Apache Kafka first synchronizes the data to Function Compute. Then, Function Compute synchronizes the data to OSS. For more information about limits on connectors, see Limits.
  • OSS sink connectors are provided based on Function Compute. Function Compute provides you with a free quota. If your usage exceeds the free quota, you are charged for the excess based on the billing rules of Function Compute. For more information about the billing, see Billing.
  • Function Compute allows you to query the logs of function calls. For more information, see Configure Log Service resources and view function execution logs.
  • Message Queue for Apache Kafka serializes messages into UTF-8-encoded strings for message transfer. Message Queue for Apache Kafka does not support the BINARY data type.
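Because messages are transferred as UTF-8-encoded strings and the BINARY type is not supported, raw binary payloads cannot be produced directly. One common workaround, sketched below as an assumption rather than a product feature, is to base64-encode binary data into a UTF-8-safe JSON envelope before you produce the message; the envelope format and function names are illustrative.

```python
import base64
import json


def encode_binary_payload(raw: bytes) -> str:
    """Wrap binary data in a UTF-8-safe JSON envelope.

    Hypothetical workaround: the connector transfers messages as UTF-8
    strings, so raw binary must be encoded before it is produced.
    """
    return json.dumps({
        "encoding": "base64",
        "data": base64.b64encode(raw).decode("ascii"),
    })


def decode_binary_payload(text: str) -> bytes:
    """Reverse the envelope after the record lands in OSS."""
    envelope = json.loads(text)
    if envelope.get("encoding") != "base64":
        raise ValueError("unexpected envelope encoding")
    return base64.b64decode(envelope["data"])
```

Any downstream consumer of the OSS objects must apply the matching decode step.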

Create and deploy an OSS sink connector

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Connectors.
  5. On the Connectors page, click Create Connector.
  6. In the Create Connector wizard, perform the following steps:
    1. In the Configure Basic Information step, set the parameters that are described in the following table, and click Next.
      Notice By default, Authorize to Create Service Linked Role is selected. This means that Message Queue for Apache Kafka will create a service-linked role based on the following rules:
      • If no service-linked role is created, Message Queue for Apache Kafka automatically creates a service-linked role for you to use the OSS sink connector to synchronize data from Message Queue for Apache Kafka to OSS.
      • If you have created a service-linked role, Message Queue for Apache Kafka does not create it again.
      For more information about service-linked roles, see Service-linked roles.
      • Name: The name of the connector. Take note of the following rules when you specify a connector name:
        • The connector name can be up to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
        • The connector name must be unique within the Message Queue for Apache Kafka instance.
        The data synchronization task of the connector must use a consumer group that is named in the connect-<Task name> format. If you have not manually created such a consumer group, the system automatically creates one for you.
        Example: kafka-oss-sink
      • Instance: The name and ID of the instance are displayed by default. Example: demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select Message Queue for Apache Kafka for Data Source, set the parameters that are described in the following table, and then click Next.
      • Data Source Topic: The name of the topic from which data is to be synchronized. Example: oss-test-input
      • Consumer Thread Concurrency: The number of concurrent consumer threads that are used to synchronize data from the data source topic. Valid values: 6 and 12. Default value: 6. Example: 6
      • Consumer Offset: The offset from which consumption starts. Valid values:
        • Earliest Offset: Consumption starts from the earliest offset.
        • Latest Offset: Consumption starts from the latest offset.
        Example: Earliest Offset
      • VPC ID: The ID of the virtual private cloud (VPC) in which the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The default value is the VPC ID that you specified when you deployed the Message Queue for Apache Kafka instance. You do not need to change the value. Example: vpc-bp1xpdnd3l***
      • vSwitch ID: The ID of the vSwitch based on which the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the Message Queue for Apache Kafka instance. The default value is the vSwitch ID that you specified when you deployed the Message Queue for Apache Kafka instance. Example: vsw-bp1d2jgg81***
      • Failure Handling Policy: Specifies whether to retain the subscription to a partition after a message in that partition fails to be sent. Click Configure Runtime Environment to display the parameter. Valid values:
        • Continue Subscription: retains the subscription to the partition where the error occurs and prints the logs.
        • Stop Subscription: stops the subscription to the partition where the error occurs and prints the logs.
        Note
        • For more information about how to view the connector logs, see View connector logs.
        • For more information about how to troubleshoot errors based on error codes, see Error codes.
        • To resume the subscription to a partition where an error occurred, submit a ticket to the technical support of Message Queue for Apache Kafka.
        Example: Continue Subscription
      • Resource Creation Method: The method that is used to create the topics and consumer group that are required by the OSS sink connector. Click Configure Runtime Environment to display the parameter. Valid values:
        • Auto
        • Manual
        Example: Auto
      • Connector Consumer Group: The name of the consumer group that is used by the connector. Click Configure Runtime Environment to display the parameter. We recommend that you start the name of this consumer group with connect-cluster. Example: connect-cluster-kafka-oss-sink
      • Task Offset Topic: The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.
        • Topic: the name of the topic. We recommend that you start the name with connect-offset.
        • Partitions: the number of partitions in the topic. Set the value to a number greater than 1.
        • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
        • cleanup.policy: the log cleanup policy for the topic. Set the value to compact.
        Example: connect-offset-kafka-oss-sink
      • Task Configuration Topic: The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter.
        • Topic: the name of the topic. We recommend that you start the name with connect-config.
        • Partitions: the number of partitions in the topic. Set the value to 1.
        • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
        • cleanup.policy: the log cleanup policy for the topic. Set the value to compact.
        Example: connect-config-kafka-oss-sink
      • Task Status Topic: The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter.
        • Topic: the name of the topic. We recommend that you start the name with connect-status.
        • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
        • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
        • cleanup.policy: the log cleanup policy for the topic. Set the value to compact.
        Example: connect-status-kafka-oss-sink
      • Dead-letter Queue Topic: The topic that is used to store the error data of the connector framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use the same topic as both the dead-letter queue topic and the error data topic.
        • Topic: the name of the topic. We recommend that you start the name with connect-error.
        • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
        • Storage Engine: the storage engine of the topic. Valid values: Local Storage and Cloud Storage.
        Example: connect-error-kafka-oss-sink
      • Error Data Topic: The topic that is used to store the error data of the connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use the same topic as both the dead-letter queue topic and the error data topic.
        • Topic: the name of the topic. We recommend that you start the name with connect-error.
        • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
        • Storage Engine: the storage engine of the topic. Valid values: Local Storage and Cloud Storage.
        Example: connect-error-kafka-oss-sink
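If you set Resource Creation Method to Manual, you typically create the internal topics in the Message Queue for Apache Kafka console with the partition counts and cleanup.policy values listed above. As a purely programmatic sketch, the topic requirements can be captured like this; the kafka-python package, the bootstrap endpoint, and the replication factor of 3 are assumptions, and the topic names reuse the examples above.

```python
# Internal topics that the connector requires, per the parameter table above.
# cleanup.policy is None where the table does not mandate compaction.
internal_topics = [
    {"name": "connect-offset-kafka-oss-sink", "partitions": 6, "cleanup.policy": "compact"},
    {"name": "connect-config-kafka-oss-sink", "partitions": 1, "cleanup.policy": "compact"},
    {"name": "connect-status-kafka-oss-sink", "partitions": 6, "cleanup.policy": "compact"},
    {"name": "connect-error-kafka-oss-sink", "partitions": 6, "cleanup.policy": None},
]

if __name__ == "__main__":
    # Hypothetical creation with the open source kafka-python package
    # (pip install kafka-python); the endpoint is a placeholder.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers="alikafka-endpoint:9092")
    admin.create_topics([
        NewTopic(
            t["name"],
            num_partitions=t["partitions"],
            replication_factor=3,  # assumed; match your cluster
            topic_configs=(
                {"cleanup.policy": t["cleanup.policy"]}
                if t["cleanup.policy"] else None
            ),
        )
        for t in internal_topics
    ])
```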
    3. In the Configure Destination Service step, select OSS for Destination Service, set the parameters that are described in the following table, and then click Create.
      • Bucket Name: The name of the destination OSS bucket. Example: bucket_test
      • AccessKey ID: The AccessKey ID of your Alibaba Cloud account. Example: LTAI4GG2RGAjppjK********
      • AccessKey Secret: The AccessKey secret of your Alibaba Cloud account. Example: WbGPVb5rrecVw3SQvEPw6R********

      Make sure that your Alibaba Cloud account is granted at least the following permissions:

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "oss:GetObject",
                      "oss:PutObject"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
      Note

      When Message Queue for Apache Kafka creates the data synchronization task, the AccessKey ID and AccessKey secret are passed to Function Compute as environment variables. After the task is created, Message Queue for Apache Kafka does not store the AccessKey ID or AccessKey secret of your Alibaba Cloud account.

      After the connector is created, you can view it on the Connectors page.
  7. After the connector is created, go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.

Send a test message

After you deploy the OSS sink connector, you can send a message to the data source topic in Message Queue for Apache Kafka to test whether the message can be synchronized to OSS.

  1. On the Connectors page, find the connector that you created, and click Test in the Actions column.
  2. In the Send Message panel, set the parameters or use the method as prompted to send a test message.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the test message, such as demo.
      2. In the Message Content field, enter the content of the test message, such as {"key": "test"}.
      3. Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
        1. If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field, such as 0. For more information about how to query partition IDs, see View partition status.
        2. If you do not want to send the test message to a specific partition, click No.
    • Set the Method of Sending parameter to Docker and run the Docker commands that are provided in the Run the Docker container to produce a sample message section to send the test message.
    • Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message.
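The console, Docker, and SDK methods above all produce the same kind of record. As a minimal SDK-style sketch, the following Python code builds the example key demo and value {"key": "test"} as UTF-8 bytes and, when run against a real endpoint, sends them to the example topic oss-test-input with the open source kafka-python package. The endpoint is a placeholder, not a value from this topic.

```python
import json


def build_test_message(key="demo", payload=None):
    """Serialize a key/value pair to UTF-8 bytes, matching the console example."""
    if payload is None:
        payload = {"key": "test"}
    return key.encode("utf-8"), json.dumps(payload).encode("utf-8")


if __name__ == "__main__":
    # Hypothetical send with kafka-python (pip install kafka-python);
    # replace the bootstrap endpoint with your instance's endpoint.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="alikafka-endpoint:9092")
    key, value = build_test_message()
    producer.send("oss-test-input", key=key, value=value, partition=0)
    producer.flush()
```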

Verify the results

After you send a test message to the data source topic in Message Queue for Apache Kafka, you can check whether the message is synchronized to OSS on the Files page of the destination OSS bucket in the OSS console. For more information, see Overview.

If new objects are generated in the OSS bucket, the data is synchronized to OSS, as shown in the following figure.

[Figure: new objects on the Files page of the destination OSS bucket]
The data that is synchronized from Message Queue for Apache Kafka to OSS is in the following format:
[
    {
        "key":"123",
        "offset":4,
        "overflowFlag":true,
        "partition":0,
        "timestamp":1603779578478,
        "topic":"Test",
        "value":"1",
        "valueSize":272687
    }
]
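The object body can be parsed with any JSON library. In the sample above, overflowFlag is true while value is only "1" even though valueSize is 272687, which suggests that overflowFlag marks records whose full message body was too large to be written inline; treat that reading as an assumption. A minimal parser that separates such records:

```python
import json

# Sample object body in the format shown above.
SAMPLE = """[
    {
        "key":"123",
        "offset":4,
        "overflowFlag":true,
        "partition":0,
        "timestamp":1603779578478,
        "topic":"Test",
        "value":"1",
        "valueSize":272687
    }
]"""


def parse_synced_object(body):
    """Split synchronized records into complete records and records
    flagged as overflowed, whose stored value may be truncated."""
    records = json.loads(body)
    complete = [r for r in records if not r.get("overflowFlag")]
    overflowed = [r for r in records if r.get("overflowFlag")]
    return complete, overflowed
```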

What to do next

You can configure the Function Compute resources that are required by the OSS sink connector based on actual needs.

On the Connectors page, find the connector that you created, click More in the Actions column, and then select Configure Function.
You are redirected to the Function Compute console, where you can configure the resources as required.