消息队列RocketMQ版作为事件源通过事件总线EventBridge函数计算集成后,通过消息队列RocketMQ版触发器(以下简称RocketMQ触发器)能够触发关联函数执行,通过函数可以对发布到消息队列RocketMQ版中的消息进行自定义处理。本文介绍如何在函数计算控制台创建RocketMQ触发器、配置函数入口参数和编写代码并测试。

功能简介

您在函数计算的控制台提交触发器创建请求之后,函数计算会根据触发器的配置信息,自动创建事件总线EventBridge相关的资源。目前提供事件模型事件流模型两种消息推送模型,每个模型创建的资源如下:
创建完成后,您可以在函数计算控制台查看触发器信息,同时也可以在事件总线EventBridge控制台查看自动创建的资源信息。当源消息队列RocketMQ版实例中有消息入队时,将会触发函数计算执行,不同消息推送模型支持的参数内容也不同。
  • 事件模型:每次会将单个消息作为事件参数传入函数中,事件遵循CloudEvents规范。消息内容和CloudEvents的关系,请参见参数内容
  • 事件流模型:根据您的攒批配置,将一个或多个消息事件以批的形式推送到函数中进行处理,适合端到端的流式数据处理场景。

注意事项

  • 作为触发源的消息队列RocketMQ版的实例必须和函数计算的函数在相同的地域。
  • 创建的自定义总线以及事件规则的数量超过上限后,将无法再创建事件模型的RocketMQ触发器。
  • 创建的事件流数量超过上限后,将无法再创建事件流模型的RocketMQ触发器。
在单个阿里云账号单个地域维度下,创建触发器涉及的资源数量的限制如下所示。
限制项 资源上限
自定义事件总线数量 10
每个自定义事件总线下创建的事件规则数量 10
事件流数量 30

前提条件

步骤一:创建触发器

  1. 登录函数计算控制台,在左侧导航栏,单击服务及函数
  2. 在顶部菜单栏,选择地域,然后在服务列表页面,单击目标服务。
  3. 函数管理页面,单击目标函数名称。
  4. 在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器
  5. 在创建触发器面板,填写相关信息。然后单击确定
    高级配置的更多信息,请参见EventBridge触发器高级功能
    配置项类型 配置项 操作 本文示例
    基础配置 触发器类型 选择消息队列 RocketMQ 版 消息队列 RocketMQ 版
    名称 填写自定义的触发器名称。 rocketmq-trigger
    版本或别名 默认值为LATEST,如果您需要创建其他版本或别名的触发器,首先需要在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见管理版本管理别名 LATEST
    RocketMQ 实例 选择已创建的消息队列RocketMQ版的实例。 MQ_INST_164901546557****_BX7****
    Topic 选择已创建的消息队列RocketMQ版实例的Topic。 topic1
    Tag 填写消息过滤标签。

    只有收到包含此处设置的过滤标签字符串的消息时,才会触发函数执行。

    tag
    Group ID 选择已创建的消息队列RocketMQ版实例的Group ID。 GID_group1
    消费位点 选择消息的消费位点,即消息队列RocketMQ版从事件总线开始拉取消息的位置。取值说明如下。
    • 最新位点:从最新位点开始消费。
    • 最早位点:从最早位点开始消费。
    • 指定时间戳:从指定时间戳开始消费。
    最新位点
    高级配置 调用方式 选择函数调用方式。
    取值说明如下。
    • 同步调用:默认调用方式,事件触发函数执行,等待函数调用完成后,函数计算返回执行结果。更多信息,请参见同步调用
    • 异步调用:适用于调度延时较长的函数,事件触发函数执行后,函数计算立即返回响应结果并且确保函数至少被成功执行一次,但不会返回具体执行结果。更多信息,请参见功能概览
    同步调用
    消息推送模型 消息数据推送到函数计算时的底层应用模型。
    取值说明如下。
    • 事件模型:每次会将单个消息作为事件参数传入函数中,事件遵循CloudEvents规范。消息内容和CloudEvents的关系,请参见步骤二:配置函数入口参数
    • 事件流模型:会根据您的攒批配置将一个或多个消息事件以批的形式推送到函数中进行处理,适合端到端的流式数据处理场景。
    事件模型
    批量推送

    仅当您选择事件流模型时,可以配置此参数。

    可帮您批量聚合多个事件,当批量推送条数批量推送间隔两者条件达到其一时即会触发批量推送。

    例如,您设置的推送条数为100条,间隔时间为15秒,在10秒内消息条数已达到100条,那么该次推送则不会等15秒后再推送。

    开启
    批量推送条数 一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为[1,500]。 1
    批量推送间隔 调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。 1
    重试策略

    仅当您选择事件流模型时,可以配置此参数。

    消息推送失败时的重试策略。

    取值说明如下。
    • 退避重试:重试3次,每次重试的间隔时间是10秒到20秒之间的随机值。
    • 指数衰减重试:重试176次,每次重试的间隔时间指数递增至512秒,总计重试时间为1天;每次重试的具体间隔为:1、2、4、8、...512秒。
    退避重试
    容错策略

    仅当您选择事件流模型时,可以配置此参数。

    当错误发生时的处理方式。

    取值说明如下。
    • 允许容错:允许异常容错,当异常发生时不会阻塞执行,超过重试策略后会根据配置将消息投递至死信队列或直接丢弃。
    • 禁止容错:不允许容错,当异常发生并超过重试策略配置时会阻塞执行。
    允许容错
    死信队列

    仅当您选择事件流模型时,可以配置此参数。

    未处理或超过重试次数的消息可以发送到指定队列。该配置不启用时,超过重试策略后消息将被丢弃。

    开启
    队列类型 死信队列的类型。
    取值如下:
    • 消息服务 MNS
    • 消息队列 RocketMQ 版
    消息服务 MNS
    队列名称 死信队列的名称。 test-queue

    创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

步骤二:配置函数入口参数

消息队列RocketMQ版事件源会以event的形式作为输入参数传递给函数,您可以手动将event传给函数模拟触发事件,测试函数代码是否正确。

  1. 在函数详情页面,单击函数代码页签,然后单击xialatubiao图标,从下拉列表中,选择配置测试参数
  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定
    事件模型的event格式如下所示。
    {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
    }
    事件流模型的event格式如下所示。
    [
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        },
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        }
    ]
    data字段包含的参数解释如下表所示。关于CloudEvents规范中定义的参数解释,请参见事件概述
    参数 类型 示例值 描述
    topic String TopicName Topic名称。
    systemProperties Map 系统属性。
    MIN_OFFSET Int 0 最低位点。
    TRACE_ON Boolean true 是否有消息轨迹。取值说明如下:
    • true:有消息轨迹。
    • false:无消息轨迹。
    MAX_OFFSET Int 8 最高位点。
    MSG_REGION String cn-hangzhou 发送消息的地域。
    KEYS String systemProperties.KEYS 过滤属性。
    CONSUME_START_TIME Long 1628577790396 开始消费时间。单位:毫秒。
    UNIQ_KEY String AC14C305069E1B28CDFA3181CDA2**** 消息唯一键。
    TAGS String systemProperties.TAGS 过滤属性。
    INSTANCE_ID String MQ_INST_123456789098****_BXhFHryi 实例ID。
    userProperties Map 用户属性。
    body String TEST 消息内容。

步骤三:编写函数代码并测试

完成触发器创建后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当消息队列RocketMQ版事件通过事件总线EventBridge投递到函数计算时,触发器会自动触发函数的执行。

  1. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击部署代码
    本文以Node.js函数代码为例。
    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event参数,对event进行处理。
      callback(null, 'return result');
    }
  2. 单击函数代码页签的测试函数
    执行完成后,您可以在函数代码页签的上方查看执行结果。

更多信息

如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理