消息队列Kafka版控制台提供的按位点查询和按时间查询消息的功能无法满足您搜索消息的需求时,您可以使用消息队列Kafka版消息检索功能。消息检索支持按Topic分区、位点范围、时间范围以及消息Key和Value关键字检索。本文介绍如何开通消息检索、添加检索条件以及进行暂停、启用、删除管理操作。

前提条件

已为消息队列Kafka版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic

背景信息

  • 消息队列Kafka版消息检索借助消息队列Kafka版的Connector功能及表格存储(Tablestore)实现,通过Connector对Topic中的消息进行转储,发送到表格存储中的数据表中,由表格存储索引功能提供消息检索的能力。

    开通消息检索相当于自动化创建了一个消息队列Kafka版同步数据至表格存储的Connector任务,名称格式为:ots-ms-{Topic名称}-{6位随机字符},该Connector任务在消息检索页面显示和管理,而不在Connector 任务列表页面显示和管理。

  • 首次开通消息检索后,消息队列Kafka版自动为您开通表格存储服务,并创建表格存储实例和对应的数据表。每个开通消息检索的Topic会在表格存储中对应创建一张数据表。自动创建的实例和数据表名称格式如下:
    • 实例名称:kfk-{消息队列Kafka版实例名称后12位字符}
    • 数据表表名:{Topic名称}:kafka_topic_{Topic名称}_{6位随机字符}
  • 每个开通消息检索的Topic会在消息队列Kafka版实例中自动创建4个Topic和2个Group,用于记录任务配置和任务状态。名称格式如下:
    • 任务位点Topic:connect-offset-{任务名称}
    • 任务配置Topic:connect-config-{任务名称}
    • 任务状态Topic:connect-status-{任务名称}
    • 死信队列Topic/异常数据Topic:connect-error-{任务名称}
    • Connector消费组:connect-{任务名称}、connect-cluster-{任务名称}

    更多信息,请参见创建Tablestore Sink Connector

计费说明

消息队列Kafka版消息检索处于公测阶段,且独立于消息队列Kafka版实例,因此不会在消息队列Kafka版侧产生费用,自动创建的表格存储实例和数据表在公测期也不产生费用。同时,阿里云不承诺消息检索的SLA,使用消息检索所依赖的其他产品的SLA和费用说明请以对应产品为准。

注意事项

  • 首次开通消息检索时,仅会自动开通同地域下的表格存储服务。当前消息检索功能已支持在多个地域使用。具体信息,请参见开服地域
  • 首次开通消息检索时,消息队列Kafka版会为您自动创建服务关联角色AliyunServiceRoleForAlikafkaConnector,以便使用Connector功能。如果已创建服务关联角色,消息队列Kafka版不会重复创建。关于服务关联角色的更多信息,请参见服务关联角色
  • 单个实例默认最多同时支持3个Topic的消息检索。每个Topic的消息检索结果最多显示10条。
  • 检索到的每条消息在消息队列Kafka版控制台上最多显示1 KB的内容,超过1 KB的部分将自动截断。如需查看完整的消息内容,请下载相应的消息。
  • 目前表格存储不支持单个字段大于2 MB,超过2 MB的消息不会被同步,因此超过该大小的消息也无法在消息队列Kafka版控制台的消息检索页面检索出来。
  • 表格存储中的数据保留时长与消息队列Kafka版实例消息保留时长具有相同的数据生命周期(TTL),当数据超过消息保留时长时,将会自动清除并移除相关索引。关于消息队列Kafka版消息保留时长相关配置和说明信息,请参见变更消息配置

    由于表格存储数据过期策略与消息队列Kafka版并不完全一致,故最终检索到的数据请以实际获取的为准。

开通消息检索

开通某个实例下Topic的消息检索功能,以便于您根据需要对其Topic中消息进行检索。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击消息检索
  4. 消息检索页面,从选择实例的下拉列表选择需检索Topic消息所属的实例,然后单击开通消息检索
  5. 开通消息检索面板,填写开通参数,然后单击确定
    说明 如果您是首次在当前实例下开通消息检索功能,单击开通消息检索后,需在您尚未开通当前实例的 Connector 功能提示对话框,单击确认,然后再在开通消息检索面板填写参数。
    表 1. 开通参数说明
    参数 描述 示例值
    数据源 Topic 需要开通消息检索的Topic。 test
    消费初始位置 开始消费的位置。默认取值为最近位点。取值说明如下:
    • 最早位点:从最初位点开始消费。
    • 最近位点:从最新位点开始消费。
    最近位点
    记录时间 记录消息的时间,即检索消息时的时间范围。默认取值为即时时间。取值说明如下:
    • 原生时间消息队列Kafka版中记录的消息创建时间,即发送消息时,客户端自带的或是您指定的ProducerRecord中的消息创建时间。
    • 即时时间:数据同步至表格存储的时间。

      该值仅表示消息被同步至表格存储的时间,并不是消息的生产时间,因此搜索结果中的消息时间与消息的生产时间可能会不一致。

    原生时间
    消息检索页面您可以查看到刚开通搜索功能的Topic。

测试发送消息

开通消息检索后,您可以向消息队列Kafka版的数据源Topic发送消息,调度任务和测试消息检索是否创建成功。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击消息检索
  4. 消息检索页面,找到需要测试的目标Topic,根据任务状态在对应位置操作。
    • 如果任务处于运行中已暂停状态以外的其他状态,则在其操作列,单击测试
    • 如果任务处于运行中已暂停状态,则在其操作列,选择更多 > 测试
  5. 快速体验消息收发面板,发送测试消息。
    1. 消息 Key文本框中输入消息的Key值,例如demo。
    2. 消息内容文本框输入测试的消息内容,例如 {"key": "test"}。
    3. 设置发送到指定分区,选择是否指定分区。
      • 单击,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态
      • 单击,不指定分区。

搜索消息

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击消息检索
  4. 消息检索页面,找到目标Topic,在其操作列,单击搜索
  5. 搜索面板,设置搜索条件,在搜索项下拉列表中选择需要添加的搜索项,单击添加搜索项,添加搜索项并在列设置搜索信息,然后单击确定
    搜索项 说明
    分区 本次搜索Topic消息内容所在的分区。
    位点范围 本次搜索Topic消息内容的位点范围。
    Key 本次搜索Topic消息内容的消息Key或消息内容,即要匹配的Key或Value。

    检索关键词说明如下:

    • 搜索方式为短语匹配搜索。例如,消息Key是“消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。”,可以设置搜索关键词为“分布式”,或者“阿里云”和“分布式”组合。
    • 如果搜索关键字中包含星号(*)或问号(?),则采用通配符检索。星号(*)代表任意字符序列,问号(?)代表任意单个字符。英文字母不区分大小写。例如,消息内容是“AliKafkaTest001qaz”,可以设置关键词为“AliKafkaTe*”。

      不建议使用通配符作为起始关键字时,检索效率较低,且带有通配符的字符串长度不能超过20个字符。

    关于更多分词、短语匹配检索和通配符检索的使用和限制等详细内容,请参见分词短语匹配查询通配符查询

    Value
    时间范围 搜索Topic消息的时间范围。您可以设置近三天内的某个时间范围和自定义时间。 此处设置的搜索时间最小粒度为分钟。
    操作 单击删除:移除该搜索条件。

    如果需要移除全部条件,您可以单击搜索条件列表上方的移除所有条件

    说明 设置多个搜索条件时,取交集搜索。
    Topic搜索页面,展示检索的消息。
    表 2. 检索结果参数解释
    参数 描述
    分区 消息的Topic分区。
    位点 消息的所在的位点。
    Key 消息的键(已强制转化为String类型)。
    Value 消息的值(已强制转化为String类型),即消息的具体内容。
    消息创建时间 消息创建时间或消息同步至表格存储的时间。

    消息创建时间指发送消息时,客户端自带的或是您指定的ProducerRecord中的消息创建时间。

    说明
    • 如果配置了该字段,则按配置值显示。
    • 如果未配置该字段,则默认取消息发送时的系统时间。
    • 如果显示值为1970/x/x x:x:x,则说明发送时间配置为0或其他有误的值。
    操作
    • 单击下载 Key:下载消息的键值。
    • 单击下载 Value:下载消息的具体内容。
    注意
    • 查询到的每条消息在控制台上最多显示1 KB的内容,超过1 KB的部分将自动截断。如需查看完整的消息内容,请下载相应的消息。
    • 仅专业版支持下载消息。
    • 下载的消息最大为10 MB。如果消息超过10 MB,则只下载10 MB的内容。

查看消息检索任务详情

开通消息检索后,即可查看自动创建的Topic、Group、表格存储实例名称、表格存储数据表表名等详细信息,您也可以在详情中直接进入表格存储数据表。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击消息检索
  4. 消息检索页面,找到目标Topic,在其操作列,单击详情
    在任务详情页面您可以查看到目标Topic相关消息检索的详细信息。您也可以在基础信息区域目标服务后单击表格存储,即可进入数据表详情页面查看。

查看消费详情

您可以查看订阅当前Topic的在线Group在Topic各个分区的消费进度,了解消息的消费和堆积情况。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击消息检索
  4. 消息检索页面,找到需要查看消费进度的目标Topic,在其操作列,单击消费进度
    在消费详情页面,您可以查看Topic各分区的消费情况。

暂停消息检索任务

如果您需要暂时中止某个运行中的Topic消息检索,您可以暂停该Topic检索任务。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击消息检索
  4. 消息检索页面,找到目标Topic,在其操作列,选择更多 > 暂停
  5. 在弹出的对话框单击确认

启用消息检索任务

您可以根据需要重新启用某个已经暂停的消息检索任务。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击消息检索
  4. 消息检索页面,找到目标Topic,在其操作列,选择更多 > 启用
  5. 在弹出的对话框单击确认

删除消息检索任务

删除Topic的消息检索任务后,表格存储的数据表与多元索引会同步删除,该Topic不再提供消息检索功能。如果需要继续使用该Topic消息检索,请重新创建并等待数据同步。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击消息检索
  4. 消息检索页面,找到需要删除的目标Topic,根据任务状态在对应位置操作。
    • 如果任务处于运行中已暂停状态以外的其他状态,则在其操作列,单击删除
    • 如果任务处于运行中已暂停状态,则在其操作列,选择更多 > 删除
  5. 在弹出的对话框单击确认
    消息检索页面,您已看不到刚才已删除的Topic。