消息队列Kafka版作为事件源通过事件总线EventBridge函数计算集成后,通过消息队列Kafka版触发器(以下简称Kafka触发器)能够触发关联函数执行,通过函数可以对发布到消息队列Kafka版的消息进行自定义处理。本文介绍如何在函数计算控制台创建Kafka触发器、配置入口参数以及编写代码并测试代码。

功能简介

您在函数计算的控制台提交触发器创建请求之后,函数计算根据触发器的配置信息,将按照如下命名规则在事件总线EventBridge创建一条事件流:
  • 事件流名称:<服务名称>-<函数名称>-<触发器名称>
创建完成后,您可以在函数计算控制台查看触发器信息,同时也可以在事件总线EventBridge控制台查看自动创建的事件流。当指定事件源发布事件时,触发该触发器关联的函数执行一次。

前提条件

步骤一:创建Kafka触发器

  1. 登录函数计算控制台
  2. 在左侧导航栏,单击服务及函数
  3. 在顶部菜单栏,选择地域。
  4. 服务列表页面,找到目标服务,在其右侧操作列单击函数管理
  5. 函数管理页面,单击目标函数名称。
  6. 在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器
  7. 在创建触发器面板,填写相关信息。然后单击确定
    参数 操作 本文示例
    触发器类型 选择消息队列 Kafka 版 消息队列Kafka版
    名称 填写自定义的触发器名称。 kafka-trigger
    版本或别名 默认值为LATEST,如果您需要创建其他版本或别名的触发器,首先您需要在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见管理版本管理别名 LATEST
    Kafka 实例 选择已创建的消息队列Kafka版实例。 alikafka_pre-cn-i7m2t7t1****
    Topic 选择已创建的消息队列Kafka版实例的Topic。 topic1
    Group ID 选择已创建的消息队列Kafka版实例的Group ID。
    说明 请使用独立的Group ID来创建触发器,不要与已有的业务混用Group ID,否则会影响已有的消息收发。
    GID_group1
    消费位点 选择消息的消费位点,即消息队列Kafka版从事件总线开始拉取消息的位置。取值说明如下:
    • 最早位点:从最早位点开始消费。
    • 最新位点:从最新位点开始消费。
    最新位点
    网络配置 选择路由消息的网络类型,取值说明如下:
    • 默认网络:默认使用部署Kafka实例时选择的VPC IDvSwitch ID
    • 自建公网:需选择另外的专有网络VPC交换机安全组
    默认网络
    调用方式 选择函数调用方式,取值说明如下:
    • 同步调用:默认调用方式,事件触发函数执行,等待函数调用完成后,函数计算返回执行结果。更多信息,请参见同步调用
    • 异步调用:适用于调度延时较长的函数,事件触发函数执行后,函数计算立即返回响应结果并且确保函数至少被成功执行一次,但不会返回具体执行结果。更多信息,请参见功能概览
    同步调用

    创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

步骤二:配置函数入口参数

消息队列Kafka版事件源会以event的形式作为输入参数传递给函数,您可以手动将event传给函数模拟触发事件。

  1. 在函数详情页面,单击函数代码页签,然后单击xialatubiao图标,从下拉列表中,选择配置测试参数
  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定
    event格式如下所示:
    {
      "specversion": "1.0",
      "id": "8e215af8-ca18-4249-8645-f96c1026****",
      "source": "acs:alikafka",
      "type": "alikafka:Topic:Message",
      "subject": "acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
      "datacontenttype": "application/json; charset=utf-8",
      "time": "2022-06-23T02:49:51.589Z",
      "aliyunaccountid": "164901546557****",
      "data": {
        "topic": "****",
        "partition": 7,
        "offset": 25,
        "timestamp": 1655952591589,
        "headers": {
          "headers": [],
          "isReadOnly": false
        },
        "key": "keytest",
        "value": "hello kafka msg"
      }
    }

    CloudEvents规范中定义的参数解释,请参见事件概述

    data字段包含的参数解释如下表所示。

    参数 类型 示例值 描述
    topic String TopicName Topic的名称。
    partition Int 1 消息队列Kafka版的消费分区信息。
    offset Int 0 消息队列Kafka版的消息位点。
    timestamp String 1655952591589 开始消费时间戳。

步骤三:编写函数代码并测试

完成触发器创建后,您可以开始编写函数代码并测试以验证代码的正确性,在实际操作过程中当发生消息队列Kafka版事件时,触发器会自动触发函数的执行。

  1. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击保存并部署
    本文以Node.js函数代码为例。
    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event参数,对event进行处理。
      callback(null, 'return result');
    }
  2. 单击函数代码页签的测试函数
    执行完成后,您可以在函数代码页签的上方查看执行结果。

更多信息

除了函数计算控制台,您还可通过以下方式配置触发器:
  • 通过Serverless Devs工具配置触发器。更多操作,请参见Serverless Devs
  • 通过SDK配置触发器。更多操作,请参见SDK列表

如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理