This topic describes how to create an OSS sink connector to synchronize data from a topic in an ApsaraMQ for Kafka instance to Object Storage Service (OSS).
Prerequisites
Before you begin, make sure that the following requirements are met:
The connector feature is enabled for your ApsaraMQ for Kafka instance. For more information, see Enable the connector feature.
A topic is created in the ApsaraMQ for Kafka instance. For more information, see Step 1: Create a topic.
An OSS bucket is created in the OSS console. For more information, see Create buckets.
Function Compute is activated. For more information, see Create a function in the Function Compute console.
Precautions
When you synchronize data from a topic in an ApsaraMQ for Kafka instance to an OSS bucket, make sure that the ApsaraMQ for Kafka instance and the OSS bucket reside in the same region and that Function Compute is available in that region. ApsaraMQ for Kafka first delivers your data to Function Compute, which then writes the data to OSS. For more information about the limits on connectors, see Limits.
OSS sink connectors export data by using Function Compute. Function Compute provides a free quota of resources. After you use up the free quota, you are charged for the Function Compute resources that you use based on the billing rules. For more information, see Billing overview.
Function Compute allows you to query the logs of function calls. For more information, see Configure logging.
ApsaraMQ for Kafka serializes messages into UTF-8-encoded strings for transfer. Binary data is not supported.
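Messages are transferred as UTF-8 strings, so a producer can check payloads before it sends them. The following Python sketch shows one possible pre-send check. The function name `to_utf8_payload` is hypothetical and is not part of any ApsaraMQ for Kafka SDK. The function encodes text values as UTF-8 and rejects byte payloads that do not decode as valid UTF-8.

```python
from typing import Union


def to_utf8_payload(value: Union[str, bytes, bytearray]) -> bytes:
    """Return value as UTF-8 bytes, rejecting payloads that are not UTF-8 text.

    Hypothetical pre-send check: str values are encoded as UTF-8, and
    bytes values are accepted only if they decode as UTF-8.
    """
    if isinstance(value, str):
        return value.encode("utf-8")
    if isinstance(value, (bytes, bytearray)):
        try:
            bytes(value).decode("utf-8")
        except UnicodeDecodeError as exc:
            # Arbitrary binary data cannot be represented as a UTF-8 string.
            raise ValueError("payload is binary (not valid UTF-8) and is not supported") from exc
        return bytes(value)
    raise TypeError(f"unsupported payload type: {type(value).__name__}")
```

For example, `to_utf8_payload('{"key": "test"}')` returns `b'{"key": "test"}'`. A payload such as `b"\xff\xfe"` raises `ValueError`.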
Create and deploy an OSS sink connector
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.
In the left-side navigation pane, click Connectors.
On the Connectors page, select the instance to which the connector belongs from the Select Instance drop-down list and click Create Connector.
In the Create Connector wizard, perform the following steps:
In the Configure Basic Information step, configure parameters and click Next. The following table describes the parameters.
Important: By default, Authorize to Create Service Linked Role is selected. ApsaraMQ for Kafka then handles the service-linked role based on the following rules:
If no service-linked role exists, ApsaraMQ for Kafka automatically creates one that allows the OSS sink connector to synchronize data from ApsaraMQ for Kafka to OSS.
If a service-linked role already exists, ApsaraMQ for Kafka does not create a new one.
For more information about service-linked roles, see Service-linked roles.
Parameter
Description
Example
Name
The name of the connector. Specify a connector name based on the following naming conventions:
The connector name must be 1 to 48 characters in length and can contain digits, lowercase letters, and hyphens (-). The name cannot start with a hyphen (-).
The name must be unique within an ApsaraMQ for Kafka instance.
The data synchronization task of the connector must use a consumer group whose name follows the connect-<Task name> format. If you have not created such a consumer group, ApsaraMQ for Kafka automatically creates one for you.
kafka-oss-sink
Instance
The ApsaraMQ for Kafka instance to which the connector belongs. By default, the name and ID of the instance are displayed.
demo alikafka_post-cn-st21p8vj****
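You can check the naming conventions for the Name parameter locally before you fill in the wizard. The following Python sketch is illustrative only. `is_valid_connector_name` and `task_consumer_group` are hypothetical helpers. The console cannot be queried here, so uniqueness is checked against a set of existing names that you supply. The sketch also assumes that the task name in the connect-<Task name> format is the connector name.

```python
import re
from typing import AbstractSet

# Documented rules: 1 to 48 characters; digits, lowercase letters, and
# hyphens only; the first character must not be a hyphen.
_NAME_PATTERN = re.compile(r"[0-9a-z][0-9a-z-]{0,47}")


def is_valid_connector_name(name: str, existing: AbstractSet[str] = frozenset()) -> bool:
    """Return True if name meets the naming rules and is not already in use."""
    if _NAME_PATTERN.fullmatch(name) is None:
        return False
    # Names must be unique within an instance; `existing` stands in for that instance.
    return name not in existing


def task_consumer_group(task_name: str) -> str:
    """Hypothetical helper: build a group name in the connect-<Task name> format."""
    return f"connect-{task_name}"
```

For example, `is_valid_connector_name("kafka-oss-sink")` returns `True`, and `task_consumer_group("kafka-oss-sink")` returns `"connect-kafka-oss-sink"`.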
In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, configure parameters, and then click Next. The following table describes the parameters.
Parameter
Description
Example
Data Source Topic
The name of the topic from which data is to be synchronized.
oss-test-input
Consumer Thread Concurrency
The number of concurrent consumer threads that are used to synchronize data from the source topic. Default value: 6. Valid values: 1, 2, 3, 6, and 12.
6
Consumer Offset
The offset from which message consumption starts. Valid values:
Earliest Offset: Message consumption starts from the earliest available offset.
Latest Offset: Message consumption starts from the latest offset.
Earliest Offset
VPC ID
The ID of the VPC in which the data synchronization task runs. Click Configure Runtime Environment to display the parameter. By default, the VPC ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed. You do not need to configure this parameter.
vpc-bp1xpdnd3l***
vSwitch ID
The ID of the vSwitch to which the source instance is connected. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the source ApsaraMQ for Kafka instance. By default, the vSwitch ID that you specified when you deployed the source ApsaraMQ for Kafka instance is displayed.
vsw-bp1d2jgg81***
Failure Handling Policy
Specifies whether to retain the subscription to a partition after a message in that partition fails to be sent. Click Configure Runtime Environment to display the parameter. Valid values:
Continue Subscription: retains the subscription to the partition in which an error occurs. A log entry is generated for the error.
Stop Subscription: stops the subscription to the partition in which an error occurs. A log entry is generated for the error.
Note: For more information, see Manage a connector.
For information about how to troubleshoot errors based on error codes, see Error codes.
Continue Subscription
Resource Creation Method
The method used to create the topics and the consumer group that are required by the OSS sink connector. Click Configure Runtime Environment to display the parameter. Valid values:
Auto
Manual
Auto
Connector Consumer Group
The name of the consumer group that is required by the OSS sink connector. Click Configure Runtime Environment to display the parameter. We recommend that you start the name with connect-cluster.
connect-cluster-kafka-oss-sink
Task Offset Topic
The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.
Topic: We recommend that you start the topic name with connect-offset.
Partitions: The number of partitions in the topic must be greater than 1.
Storage Engine: You must set the storage engine of the topic to Local Storage.
Note: You can set the storage engine to Local Storage only when you create the topic, and only on a Professional Edition ApsaraMQ for Kafka instance.
cleanup.policy: You must set the log cleanup policy for the topic to Compact.
connect-offset-kafka-oss-sink
Task Configuration Topic
The topic that is used to store the task configurations. Click Configure Runtime Environment to display the parameter.
Topic: We recommend that you start the topic name with connect-config.
Partitions: The topic can contain only one partition.
Storage Engine: You must set the storage engine of the topic to Local Storage.
Note: You can set the storage engine to Local Storage only when you create the topic, and only on a Professional Edition ApsaraMQ for Kafka instance.
cleanup.policy: You must set the log cleanup policy for the topic to Compact.
connect-config-kafka-oss-sink
Task Status Topic
The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter.
Topic: We recommend that you start the topic name with connect-status.
Partitions: We recommend that you set the number of partitions in the topic to 6.
Storage Engine: You must set the storage engine of the topic to Local Storage.
Note: You can set the storage engine to Local Storage only when you create the topic, and only on a Professional Edition ApsaraMQ for Kafka instance.
cleanup.policy: You must set the log cleanup policy for the topic to Compact.
connect-status-kafka-oss-sink
Dead-letter Queue Topic
The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create one topic and use it as both the dead-letter queue topic and the error data topic.
Topic: We recommend that you start the topic name with connect-error.
Partitions: We recommend that you set the number of partitions in the topic to 6.
Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage.
Note: You can set the storage engine to Local Storage only when you create the topic, and only on a Professional Edition ApsaraMQ for Kafka instance.
connect-error-kafka-oss-sink
Error Data Topic
The topic that is used to store the error data of the OSS sink connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create one topic and use it as both the dead-letter queue topic and the error data topic.
Topic: We recommend that you start the topic name with connect-error.
Partitions: We recommend that you set the number of partitions in the topic to 6.
Storage Engine: You can set the storage engine of the topic to Local Storage or Cloud Storage.
Note: You can set the storage engine to Local Storage only when you create the topic, and only on a Professional Edition ApsaraMQ for Kafka instance.
connect-error-kafka-oss-sink
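If you set Resource Creation Method to Manual, you can turn the requirements in the preceding table into a check that runs before you create the internal topics. The following Python sketch encodes only the rules stated in the table. The "error" role covers both the dead-letter queue topic and the error data topic. `TopicSpec`, `Rule`, and `check_internal_topic` are hypothetical names. The storage strings are assumed to match the console labels, and the cleanup policy uses Kafka's `cleanup.policy` values.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple


@dataclass
class TopicSpec:
    name: str
    partitions: int
    storage: str         # assumed labels: "Local Storage" or "Cloud Storage"
    cleanup_policy: str  # Kafka cleanup.policy value, for example "compact" or "delete"


class Rule(NamedTuple):
    prefix: str                           # recommended name prefix
    partitions_ok: Callable[[int], bool]  # hard requirement on the partition count
    recommended_partitions: Optional[int]
    needs_local_storage: bool
    needs_compact: bool


# Rules transcribed from the Configure Source Service table.
RULES: Dict[str, Rule] = {
    "offset": Rule("connect-offset", lambda n: n > 1, None, True, True),
    "config": Rule("connect-config", lambda n: n == 1, None, True, True),
    "status": Rule("connect-status", lambda n: n >= 1, 6, True, True),
    "error": Rule("connect-error", lambda n: n >= 1, 6, False, False),
}


def check_internal_topic(role: str, spec: TopicSpec) -> Tuple[List[str], List[str]]:
    """Return (errors, warnings): errors break a "must" rule, warnings miss a recommendation."""
    rule = RULES[role]
    errors: List[str] = []
    warnings: List[str] = []
    if not rule.partitions_ok(spec.partitions):
        errors.append(f"invalid partition count: {spec.partitions}")
    if rule.needs_local_storage and spec.storage != "Local Storage":
        errors.append("storage engine must be Local Storage")
    if rule.needs_compact and spec.cleanup_policy != "compact":
        errors.append("cleanup.policy must be compact")
    if not spec.name.startswith(rule.prefix):
        warnings.append(f"name should start with {rule.prefix}")
    if rule.recommended_partitions is not None and spec.partitions != rule.recommended_partitions:
        warnings.append(f"{rule.recommended_partitions} partitions are recommended")
    return errors, warnings
```

For example, `check_internal_topic("config", TopicSpec("connect-config-kafka-oss-sink", 1, "Local Storage", "compact"))` returns `([], [])`. The same topic with three partitions would report an error, because the task configuration topic can contain only one partition.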
In the Configure Destination Service step, select Object Storage Service as the destination service, set the parameters, and then click Create. The following table describes the parameters.
Parameter
Description
Example
Bucket Name
The name of the OSS bucket to which the data is to be synchronized.
bucket-test
AccessKey ID
The AccessKey ID of your Alibaba Cloud account.
yourAccessKeyID
AccessKey Secret
The AccessKey secret of your Alibaba Cloud account.
yourAccessKeySecret
Make sure that your Alibaba Cloud account is granted the following permissions, in line with the principle of least privilege:
{
  "Version": "1",
  "Statement": [
    {
      "Action": [
        "oss:GetObject",
        "oss:PutObject"
      ],
      "Resource": "*",
      "Effect": "Allow"
    }
  ]
}
Note: The AccessKey ID and AccessKey secret are passed to OSS as environment variables when the data synchronization task is created. After the task is created, ApsaraMQ for Kafka does not store the AccessKey ID or AccessKey secret of your Alibaba Cloud account.
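The permission policy for the AccessKey pair can be generated instead of copied by hand. The following Python sketch does this, and `build_oss_sink_policy` is a hypothetical helper. When it is called without a bucket, it reproduces the documented policy exactly, with Resource set to "*". Passing a bucket name narrows Resource to the objects in that bucket. The `acs:oss:*:*:<bucket>/*` resource format used for that narrowing is an assumption, so verify it against the RAM policy documentation before you rely on it.

```python
import json
from typing import Any, Dict, Optional


def build_oss_sink_policy(bucket: Optional[str] = None) -> Dict[str, Any]:
    """Build a RAM policy that allows only oss:GetObject and oss:PutObject.

    bucket=None reproduces the documented policy (Resource "*").
    A bucket name narrows Resource to that bucket's objects; the
    acs:oss:*:*:<bucket>/* format is an assumption to verify.
    """
    resource = "*" if bucket is None else f"acs:oss:*:*:{bucket}/*"
    return {
        "Version": "1",
        "Statement": [
            {
                "Action": ["oss:GetObject", "oss:PutObject"],
                "Resource": resource,
                "Effect": "Allow",
            }
        ],
    }


if __name__ == "__main__":
    # "examplebucket" is a placeholder bucket name.
    print(json.dumps(build_oss_sink_policy("examplebucket"), indent=2))
```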
After the connector is created, you can view the connector on the Connectors page.
Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
Send messages
After you deploy the OSS sink connector, send a message to the source topic in the ApsaraMQ for Kafka instance to test whether the message can be synchronized to OSS.
On the Connectors page, find the connector that you want to manage and click Test in the Actions column.
In the Send Message panel, configure the parameters to send a message for testing.
If you set the Sending Method parameter to Console, perform the following steps:
In the Message Key field, enter the message key. Example: demo.
In the Message Content field, enter the message content. Example: {"key": "test"}.
Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field. Example: 0. For information about how to query partition IDs, see View partition status.
If you do not want to send the test message to a specific partition, click No.
If you set the Sending Method parameter to Docker, run the Docker command that is provided in the "Run the Docker container to produce a sample message" section to send the test message.
If you set the Sending Method parameter to SDK, select an SDK for the required programming language or framework and an access method to send and subscribe to the test message.
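The SDK route carries the same fields as the Console form: a message key, message content, and an optional partition ID. The following Python sketch packages those fields as UTF-8 bytes and hands them to a producer. `SampleMessage`, `build_sample_message`, and `send_sample_message` are hypothetical names. The sketch assumes a producer whose `send` method accepts `topic`, `value`, `key`, and `partition` keyword arguments, as kafka-python's `KafkaProducer.send` does. It does not create the producer, so the endpoint and any authentication settings for your instance are not shown.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SampleMessage:
    key: bytes
    value: bytes
    partition: Optional[int] = None  # None leaves partition selection to the producer


def build_sample_message(key: str, content: str, partition: Optional[int] = None) -> SampleMessage:
    """Mirror the Console fields: Message Key, Message Content, and an optional Partition ID."""
    if partition is not None and partition < 0:
        raise ValueError("partition ID must be a non-negative integer")
    # Encode as UTF-8, because messages are transferred as UTF-8 strings.
    return SampleMessage(key.encode("utf-8"), content.encode("utf-8"), partition)


def send_sample_message(producer: Any, topic: str, message: SampleMessage) -> Any:
    """Send through any producer whose send() accepts topic, value, key, and partition
    keyword arguments (for example, kafka-python's KafkaProducer). Returns send()'s result."""
    return producer.send(topic, value=message.value, key=message.key, partition=message.partition)
```

With kafka-python, `send` returns a future. Calling `.get(timeout=10)` on that future blocks until the broker acknowledges the record. For example, `build_sample_message("demo", '{"key": "test"}', 0)` reproduces the Console example with partition 0.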
Verify the results
After you send a test message to the source topic in the ApsaraMQ for Kafka instance, you can check whether the message is synchronized to OSS on the Files page of the specified OSS bucket in the OSS console. For more information, see Overview.
If new objects appear in the OSS bucket, the data has been synchronized to OSS.
The data that is synchronized from ApsaraMQ for Kafka to OSS is in the following format:
[
{
"key":"123",
"offset":4,
"overflowFlag":true,
"partition":0,
"timestamp":1603779578478,
"topic":"Test",
"value":"1",
"valueSize":272687
}
]
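After you download an object from the bucket, you can inspect its records programmatically. The following Python sketch parses the JSON array format shown above, and `parse_synced_records` is a hypothetical helper. It assumes that `timestamp` is in epoch milliseconds, which is consistent with the sample value falling in October 2020. This topic does not define the meaning of `overflowFlag` and `valueSize`, so the sketch passes them through without interpreting them.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


def parse_synced_records(raw: str) -> List[Dict[str, Any]]:
    """Parse the JSON array that the connector writes to an OSS object.

    Fields are passed through as-is; only the timestamp is converted
    from (assumed) epoch milliseconds to a UTC datetime.
    """
    summaries = []
    for record in json.loads(raw):
        summaries.append({
            "topic": record["topic"],
            "partition": record["partition"],
            "offset": record["offset"],
            "key": record.get("key"),
            "value": record.get("value"),
            "value_size": record.get("valueSize"),
            "overflow": record.get("overflowFlag", False),
            "time": datetime.fromtimestamp(record["timestamp"] / 1000, tz=timezone.utc),
        })
    return summaries
```

For the sample record above, the sketch reports topic Test, partition 0, offset 4, and a UTC time on 2020-10-27.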
Related operations
You can configure the Function Compute resources that are required by the OSS sink connector based on your requirements.
On the Connectors page, find the connector that you created, click More in the Actions column, and then select .
You are redirected to the Function Compute console, where you can configure the resources as required.