当您通过函数计算消费日志数据时,在不同场景您可以选择使用日志服务提供的函数模板或自定义函数。本文介绍如何构建一个自定义函数。

函数Event

函数Event即运行函数计算的入口参数,格式为一个JSON Object序列化后字符串。

  • 参数说明
    参数 说明
    jobName 日志服务ETL Job名称。函数计算服务上的日志服务触发器对应一个日志服务的ETL Job。
    taskId 对于一个ETL Job,taskId是某一次确定性的函数调用标识。
    cursorTime 本次函数调用包括的数据中,最后一条日志到达日志服务的服务器端的unix_timestamp。
    source 该字段由日志服务生成,日志服务根据ETL Job定义的任务间隔定时触发函数执行,source字段是函数Event中的重要组成部分,定义了本次函数调用的消费范围。
    • endpoint:Project所属地域的服务入口,详情请参见服务入口
    • projectName:Project名称。
    • logstoreName:Logstore名称。
    • shardId:Logstore下的某一个确定Shard。
    • beginCursor:需要从Shard的什么位置开始消费数据。
    • endCursor:需要消费Shard数据到什么位置。
      说明 Shard对应的[beginCursor,endCursor)是一个左闭右开区间。
    parameter JSON Object类型,在您创建触发器函数配置时设置。自定义日志服务ETL函数运行时解析该字段,可以获取到函数所需要的运行参数。详情请参见创建触发器
  • 示例
    {
        "source": {
            "endpoint": "http://cn-shanghai-intranet.log.aliyuncs.com", 
            "projectName": "fc-****************", 
            "logstoreName": "demo", 
            "shardId": 0, 
            "beginCursor": "MTUwNTM5MDI3NTY1ODcwNzU2Ng==", 
            "endCursor": "MTUwNTM5MDI3NTY1ODcwNzU2OA=="
        }, 
        "parameter": {
            ...
        }, 
        "jobName": "fedad35f51a2a97b466da57fd71f315f539d2234", 
        "taskId": "9bc06c96-e364-4f41-85eb-b6e579214ae4",
        "cursorTime": 1511429883
    }

    在函数调试时候,您可以调用GetCursor接口获取cursor并按上述示例构建一个函数Event用于测试。

函数开发

您可以通过Java、Python、Node.js等多种语言实现函数开发,日志服务提供了相应Runtime的SDK,以便您在函数中进行集成,详情请参见SDK参考

以下内容以Java 8 Runtime为例,介绍如何开发日志服务ETL函数。关于Java 8函数编程细节,详情请参见函数计算服务Java编程指南

  • Java函数模板

    目前,日志服务提供了基于Java8 Runtime的自定义数模板,您可以在这基础上完成自定义需求的实现。

    模板已实现以下功能:
    • 函数Event中source、taskId、jobName字段的解析。
    • 根据source中定义的数据源,通过Log Service Java SDK获取数据,并对每一批数据调用processData接口进行处理。
    在模板中,您还需要实现以下功能:
    • 函数Event中parameter字段的解析,通过UserDefinedFunctionParameter.java实现。
    • 函数内针对数据的自定义业务逻辑,通过UserDefinedFunction.java的processData接口实现。
    • 为您的函数取一个可以描述功能的名字,替换UserDefinedFunction
  • processData接口实现

    您可以在processData内完成对一批数据的消费、加工、投递。例如在LogstoreReplication中,实现了将数据从一个Logstore中读出后写到另一个Logstore。

    说明
    • processData处理数据成功则返回true,处理数据遇到异常无法重试成功则返回false,但此时函数还会继续运行下去,且日志服务判定这是一次成功的ETL任务,会忽略其中处理异常的数据。
    • 如果遇到致命错误或业务逻辑上认为有异常需要提前终止函数执行,会通过throw Exception方式跳出函数运行,日志服务可以据此判断函数运行异常,并会按照ETL Job设置的规则重新调用函数执行。
    • 如果Shard流量较大,请为函数配置足够的内存规格,以避免函数OOM导致异常终止。
    • 如果在函数内执行耗时操作或者Shard流量较大,请您设置较短的函数触发间隔和较长的函数运行超时时间。
    • 请您为函数服务配置足够的权限,例如在函数内写OSS就需要配置OSS写权限。

ETL日志

  • ETL调度日志

    调度日志记录ETL任务开始时间、结束时间、任务是否成功以及成功返回的信息。如果ETL任务出错会生成ETL出错日志,并向系统管理员发送报警邮件或短信。请您在创建触发器时设置触发器日志Logstore,并为该Logstore开启并配置索引,详情请参见开启并配置索引

    对于函数执行结果的统计,可以通过函数返回,例如在Java8 Runtime函数的outputStream中体现。日志服务提供的函数模板会写出一个JSON Object序列化后的字符串,该字符串将记录在ETL任务调度日志中,方便您进行统计、查询。

  • ETL过程日志

    这一部分日志是在ETL执行过程中每执行一步记录的关键点和错误信息,包括某一步骤的开始和结束时间、初始化动作完成情况、模块出错信息等。ETL过程日志的意义是随时可以感知ETL运行情况,如果发生错误,可以及时通过过程日志查找原因。

    您可以通过context.getLogger()记录过程日志并存放在日志服务指定Project的Logstore中,建议您为该Logstore开启索引查询功能。