In the Message Queue for Apache Kafka console, you can query messages by offset or by time. If these queries do not meet your needs, you can use the message retrieval feature of Message Queue for Apache Kafka. This feature allows you to retrieve messages by topic partition, offset range, time range, and keyword in message keys and values. This topic describes how to enable the message retrieval feature and configure search conditions for a message retrieval task. It also describes how to suspend, resume, and delete a message retrieval task.

Prerequisites

A topic is created as the data source in your Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.

Background information

  • The message retrieval feature depends on both the connector feature of Message Queue for Apache Kafka and Tablestore. Message Queue for Apache Kafka uses connectors to send the messages in topics to tables in Tablestore. The indexing feature of Tablestore then provides the message retrieval capabilities.

    When you enable message retrieval, a connector that is used to synchronize data from Message Queue for Apache Kafka to Tablestore is automatically created. The name of the connector is in the following format: ots-ms-{Topic name}-{Six random characters}. You can view and manage this connector on the Message Retrieval page rather than on the Connectors page.

  • The first time you enable message retrieval, Message Queue for Apache Kafka automatically activates Tablestore and creates a Tablestore instance. For each topic that has message retrieval enabled, Message Queue for Apache Kafka automatically creates a table in that Tablestore instance. The automatically created Tablestore instance and tables use the following name formats:
    • Instance name: kfk-{Last 12 characters of the name of the Message Queue for Apache Kafka instance to which the topic belongs}
    • Table name: {Topic name}:kafka_topic_{Topic name}_{Six random characters}
  • When you enable message retrieval for a topic, Message Queue for Apache Kafka automatically creates four topics and two groups in the corresponding Message Queue for Apache Kafka instance to record the configurations and status of the connector. These topics and groups use the following name formats:
    • Topic that records the consumer offsets for the connector: connect-offset-{Connector name}
    • Topic that records the configurations of the connector: connect-config-{Connector name}
    • Topic that records the status of the connector: connect-status-{Connector name}
    • Topic that records dead-letter queues and error data: connect-error-{Connector name}
    • Consumer groups for the connector: connect-{Connector name} and connect-cluster-{Connector name}

    For more information, see Create a Tablestore sink connector.
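
The name formats above can be sketched in a few lines of Python. This is only an illustration for recognizing the automatically created resources, not the service's actual implementation. The function names are hypothetical, and the character set of the six random characters is not documented, so lowercase letters and digits are assumed.

```python
import random
import string


def random_suffix(length=6):
    """Return `length` random characters.

    Assumption: the real character set is not documented; lowercase
    letters and digits are used here purely for illustration.
    """
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.choice(alphabet) for _ in range(length))


def connector_name(topic, suffix):
    """Connector name format: ots-ms-{Topic name}-{Six random characters}."""
    return f"ots-ms-{topic}-{suffix}"


def tablestore_instance_name(kafka_instance_name):
    """Instance name format: kfk-{last 12 characters of the Kafka instance name}."""
    return f"kfk-{kafka_instance_name[-12:]}"


def connector_resources(connector):
    """Return the four topics and two groups created for a connector."""
    return {
        "topics": [
            f"connect-offset-{connector}",   # consumer offsets
            f"connect-config-{connector}",   # connector configurations
            f"connect-status-{connector}",   # connector status
            f"connect-error-{connector}",    # dead-letter queues and error data
        ],
        "groups": [
            f"connect-{connector}",
            f"connect-cluster-{connector}",
        ],
    }


if __name__ == "__main__":
    name = connector_name("test", random_suffix())
    print(name)
    print(connector_resources(name))
```

For example, `connector_name("test", "a1b2c3")` returns `ots-ms-test-a1b2c3`, and the topic that records its consumer offsets is `connect-offset-ots-ms-test-a1b2c3`.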

Billing

The message retrieval feature of Message Queue for Apache Kafka is in public preview. This feature is independent of Message Queue for Apache Kafka instances. You are not charged on the Message Queue for Apache Kafka side when you use the message retrieval feature. During public preview, you are also not charged for Tablestore instances and tables that are automatically created. Alibaba Cloud does not provide a service level agreement (SLA) for the message retrieval feature. For information about SLAs and pricing of other services that are required to use the message retrieval feature, see the documentation of the related services.

Precautions

  • The first time you enable message retrieval in a Message Queue for Apache Kafka instance, a Tablestore instance can be created only in the same region as the Message Queue for Apache Kafka instance. The message retrieval feature is available in multiple regions. For more information, see Supported regions.
  • The first time you enable message retrieval, Message Queue for Apache Kafka automatically creates a service-linked role named AliyunServiceRoleForAlikafkaConnector to allow you to use the connector feature. If this service-linked role already exists, Message Queue for Apache Kafka does not create it again. For more information, see Service-linked roles.
  • By default, you can enable message retrieval for up to three topics in the same Message Queue for Apache Kafka instance at the same time. A maximum of 10 message retrieval results can be displayed for each topic.
  • The Message Queue for Apache Kafka console can display a maximum of 1 KB of content for each retrieved message. If a retrieved message exceeds 1 KB in size, the excess content of the message is omitted. If you need to view the complete message, download the message.
  • Each field in a Tablestore table can store a maximum of 2 MB of data. Messages that exceed 2 MB in size cannot be synchronized to Tablestore and therefore cannot be retrieved on the Message Retrieval page in the Message Queue for Apache Kafka console.
  • Messages that are synchronized from a Message Queue for Apache Kafka instance to Tablestore are retained in Tablestore for the same period as the message retention period that is specified for the Message Queue for Apache Kafka instance. After a message exceeds this retention period, its indexes are automatically removed from Tablestore. For information about how to configure the message retention period in Message Queue for Apache Kafka, see Modify the message configuration.

    The data expiration policy in Tablestore may differ from that in Message Queue for Apache Kafka. As a result, the messages that a message retrieval task returns may differ from the messages that meet the specified search conditions.
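
The two size limits above can be combined into a quick client-side check. The following Python sketch is an illustration only. It assumes that 1 KB means 1,024 bytes and 2 MB means 2 × 1,024 × 1,024 bytes, and that the limits apply to the message value alone; the function name and the returned labels are hypothetical.

```python
# Assumption: binary units. The documentation does not state whether
# 1 KB and 2 MB are binary or decimal.
DISPLAY_LIMIT_BYTES = 1024              # console display limit per message
SYNC_LIMIT_BYTES = 2 * 1024 * 1024      # Tablestore field limit


def classify_message(value: bytes) -> str:
    """Describe how a message value of this size would be handled."""
    size = len(value)
    if size > SYNC_LIMIT_BYTES:
        # Too large to be synchronized to Tablestore, so it cannot be retrieved.
        return "not retrievable"
    if size > DISPLAY_LIMIT_BYTES:
        # Retrievable, but the console omits the excess; download to see it all.
        return "truncated in console"
    return "fully displayed"


if __name__ == "__main__":
    for payload in (b"x" * 100, b"x" * 5000, b"x" * (3 * 1024 * 1024)):
        print(len(payload), classify_message(payload))
```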

Enable message retrieval

After you enable message retrieval for a topic in a Message Queue for Apache Kafka instance, you can retrieve messages in the topic based on your business requirements.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Message Retrieval.
  4. On the Message Retrieval page, from the Select Instance drop-down list, select the instance that contains the topic for which you want to enable message retrieval, and then click Enable Message Retrieval.
  5. In the Enable Message Retrieval panel, configure the parameters, and then click OK.
    Note The first time you enable message retrieval in the instance, the message "You have not enabled the connector feature for the instance." appears after you click Enable Message Retrieval. Click OK in the message, and then configure the parameters in the Enable Message Retrieval panel.
    Table 1. Parameter description
    • Data Source Topic: The topic for which you want to enable message retrieval. Example: test.
    • Consumer Offset: The offset from which consumption starts. Default value: Latest Offset. Example: Latest Offset. Valid values:
      • Earliest Offset: Consumption starts from the earliest offset.
      • Latest Offset: Consumption starts from the latest offset.
    • Record Time: The time that is recorded for each retrieved message. Default value: Instant Time. Example: Native Time. Valid values:
      • Native Time: the message creation time that is recorded in Message Queue for Apache Kafka. This is the timestamp that the producer records when the message is sent, or the value of the timestamp field that you specify for the ProducerRecord object.
      • Instant Time: the time when the message is synchronized to Tablestore.

        If this value is used, the time when each message is synchronized to Tablestore, rather than the time when the message is produced, is used as the message time. Therefore, the message time in the search results may differ from the time when the message is produced.

    On the Message Retrieval page, you can view the topic for which you enabled the message retrieval feature.
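
The effect of the Record Time parameter can be illustrated with a short Python sketch. It is not how the service is implemented. The function and argument names are hypothetical, and timestamps are assumed to be milliseconds since the Unix epoch, which is how Apache Kafka represents record timestamps.

```python
from datetime import datetime, timezone


def message_time_ms(record_time, producer_timestamp_ms, synced_at_ms):
    """Pick the time that is stored for a message.

    record_time: "Native Time" or "Instant Time", as in the console.
    producer_timestamp_ms: timestamp carried by the Kafka record.
    synced_at_ms: time when the message was synchronized to Tablestore.
    """
    if record_time == "Native Time":
        return producer_timestamp_ms
    if record_time == "Instant Time":
        return synced_at_ms
    raise ValueError(f"unknown Record Time value: {record_time!r}")


def to_utc(timestamp_ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)


if __name__ == "__main__":
    produced = 1_700_000_000_000          # set by the producer
    synced = produced + 2_500             # synchronized 2.5 seconds later
    print(to_utc(message_time_ms("Native Time", produced, synced)))
    print(to_utc(message_time_ms("Instant Time", produced, synced)))
    # A producer timestamp of 0 maps to the start of the Unix epoch (1970).
    print(to_utc(message_time_ms("Native Time", 0, synced)))
```

With Native Time, a search by time range follows the producer's clock. With Instant Time, the same message is found by the time at which it reached Tablestore.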

Send messages for testing

After you enable message retrieval for a topic in a Message Queue for Apache Kafka instance, you can send messages to the topic to start the connector for data synchronization and test whether the message retrieval task is created.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Message Retrieval.
  4. On the Message Retrieval page, find the topic to which you want to send a test message and perform one of the following operations based on the status of the message retrieval task:
    • If the task is in a state other than Running and Suspended, click Test in the Actions column.
    • If the task is in the Running or Suspended state, choose More > Test in the Actions column.
  5. In the Start to Send and Consume Message panel, send a test message.
    1. In the Message Key field, enter the key of the test message, such as demo.
    2. In the Message Content field, enter the content of the test message, such as {"key": "test"}.
    3. Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
      • If you want to send the test message to a specific partition, click Yes and enter the partition ID, such as 0, in the Partition ID field. For information about how to query partition IDs, see View partition status.
      • If you do not want to send the test message to a specific partition, click No.

Retrieve messages

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Message Retrieval.
  4. On the Message Retrieval page, find the topic from which you want to retrieve messages, and click Search in the Actions column.
  5. In the Search panel, set search conditions. Select a search condition from the search condition drop-down list, click Add Search Condition, specify search information in the Value column, and then click OK.
    Search conditions:
    • Partition: The partition from which you want to retrieve messages.
    • Offset Range: The offset range of the messages that you want to retrieve.
    • Key, Value: The key or content of the messages that you want to retrieve.

      When you specify a search keyword, follow these rules:

      • Phrase matching is used as the search pattern. For example, if the message key is "Message Queue for Apache Kafka is a distributed, high-throughput, and scalable message queue service that is provided by Alibaba Cloud", you can set the search keyword to "distributed" or a combination of "Alibaba Cloud" and "distributed".
      • If the search keyword contains an asterisk (*) or a question mark (?), the wildcard matching pattern is used. The asterisk (*) matches any character string. The question mark (?) matches any single character. Matching is not case-sensitive. For example, if the message content is "AliKafkaTest001qaz", you can set the search keyword to "AliKafkaTe*".

        To improve retrieval efficiency, we recommend that you do not use keywords that start with a wildcard character. Make sure that strings that contain a wildcard character do not exceed 20 characters in length.

      For information about the usage and limits of tokenization, phrase matching, and wildcard matching, see Tokenization, Match phrase query, and Wildcard query.

    • Time Range: The time range of the messages that you want to retrieve. You can select a predefined time range within the last three days or specify a custom time range. The time range can be specified to the minute.
    • Actions: Click the Delete icon to delete this search condition.

      To delete all search conditions, click Clear All above the search condition list.

    Note If multiple search conditions are specified, messages that meet all the search conditions are retrieved.
    On the Topic Search page, view the retrieved messages.
    Table 2. Parameters included in the retrieval results and descriptions
    • Partition: The partition from which the message is retrieved.
    • Offset: The offset of the message.
    • Key: The key of the message. The key is converted to a string.
    • Value: The value of the message. The value is converted to a string to indicate the content of the message.
    • Created At: The time when the message was created or when the message was synchronized to Tablestore, depending on the Record Time setting that you specified when you enabled message retrieval.

      The time when the message was created refers to the timestamp that is recorded by the producer when the message is sent or the value of the timestamp field that you specify for the ProducerRecord object.

      Note
      • If a value is specified for the timestamp field, this value is displayed.
      • If no value is specified for the timestamp field, the system time when the message is sent is displayed.
      • A value in the format of 1970/x/x x:x:x indicates that the timestamp field is set to 0 or an invalid value.
    • Actions:
      • Click Download Key to download the key of the message.
      • Click Download Value to download the content of the message.

      Notice
      • The Message Queue for Apache Kafka console can display a maximum of 1 KB of content for each retrieved message. If a retrieved message exceeds 1 KB in size, the excess content of the message is omitted. If you need to view the complete message, download the message.
      • This operation is supported only by Message Queue for Apache Kafka instances of the Professional Edition.
      • You can download up to 10 MB of messages at a time. If the total size of the retrieved messages exceeds 10 MB, only the first 10 MB of message content can be downloaded.
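
The wildcard rules and the way multiple search conditions are combined can be approximated locally, for example to check a keyword before you enter it in the console. The following Python sketch is an approximation under stated assumptions, not the Tablestore implementation. It assumes that a wildcard keyword must match the whole key or value, and it treats the 20-character limit as a hard error. All function names are hypothetical.

```python
import re

MAX_WILDCARD_KEYWORD_LENGTH = 20  # limit stated for keywords that contain wildcards


def wildcard_to_regex(keyword):
    """Translate * (any string) and ? (any single character) to a regex."""
    parts = []
    for ch in keyword:
        if ch == "*":
            parts.append(".*")
        elif ch == "?":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    # Letters are not case-sensitive in wildcard matching.
    return re.compile("".join(parts), re.IGNORECASE | re.DOTALL)


def wildcard_match(keyword, text):
    """Return True if `text` matches the wildcard `keyword` in full."""
    if len(keyword) > MAX_WILDCARD_KEYWORD_LENGTH:
        raise ValueError("wildcard keyword exceeds 20 characters")
    return wildcard_to_regex(keyword).fullmatch(text) is not None


def matches_all(message, conditions):
    """A message is retrieved only if it meets every search condition."""
    return all(condition(message) for condition in conditions)


if __name__ == "__main__":
    message = {"partition": 0, "offset": 42, "value": "AliKafkaTest001qaz"}
    conditions = [
        lambda m: m["partition"] == 0,                         # Partition
        lambda m: 0 <= m["offset"] <= 100,                     # Offset Range
        lambda m: wildcard_match("AliKafkaTe*", m["value"]),   # Value keyword
    ]
    print(matches_all(message, conditions))
```

A keyword such as "*qaz" also matches in this sketch, but keywords that start with a wildcard character are not recommended because they reduce retrieval efficiency.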

View the details of a message retrieval task

After you enable message retrieval for a topic, a message retrieval task is automatically created. You can view the details of the task, such as the topics and groups that are automatically created in Message Queue for Apache Kafka and the names of the instance and the table that are automatically created in Tablestore. You can also navigate to the details page of the Tablestore table from the Task Details page.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Message Retrieval.
  4. On the Message Retrieval page, find the topic for which you want to view the details of the message retrieval task, and click Details in the Actions column.
    On the Task Details page, you can view the details about the message retrieval task that is created for the topic. You can also click Tablestore next to Destination Service in the Basic Information section to go to the table details page.

View consumption details

After you enable message retrieval for a topic, you can view the consumption progress of the online groups that subscribe to the topic in each partition of the topic. This helps you obtain information about the consumption and accumulation of messages.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Message Retrieval.
  4. On the Message Retrieval page, find the topic for which you want to view the consumption progress and click Consumption Progress in the Actions column.
    On the Consumption Details page, you can view the consumption status of the groups that subscribe to the topic in each partition of the topic.

Suspend a message retrieval task

If you want to temporarily stop a running message retrieval task for a topic, you can suspend the message retrieval task.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Message Retrieval.
  4. On the Message Retrieval page, find the topic for which you want to suspend the message retrieval task, and choose More > Suspend in the Actions column.
  5. In the message that appears, click OK.

Resume a message retrieval task

You can resume a suspended message retrieval task based on your business requirements.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Message Retrieval.
  4. On the Message Retrieval page, find the topic for which you want to resume the message retrieval task, and choose More > Enable in the Actions column.
  5. In the message that appears, click OK.

Delete a message retrieval task

After you delete a message retrieval task for a topic, the relevant Tablestore table and search index are also deleted. The topic no longer provides the message retrieval feature. If you want to use the message retrieval feature for the topic again, re-create a message retrieval task and wait for data synchronization to complete.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Message Retrieval.
  4. On the Message Retrieval page, find the topic for which you want to delete the message retrieval task and perform one of the following operations based on the task status:
    • If the task is in a state other than Running and Suspended, click Delete in the Actions column.
    • If the task is in the Running or Suspended state, choose More > Delete in the Actions column.
  5. In the message that appears, click OK.
    After the task is deleted, the topic no longer appears on the Message Retrieval page.