全部产品
Search
文档中心

大数据开发治理平台 DataWorks:日志服务通过数据集成投递数据

更新时间:Apr 18, 2024

本文将以LogHub数据同步至MaxCompute为例,为您介绍如何通过数据集成功能同步LogHub数据至数据集成已支持的目的端数据源(例如MaxCompute、OSS、Tablestore、RDBMS和DataHub等)。

前提条件

背景信息

日志服务支持以下数据同步场景:

  • 跨地域的LogHub与MaxCompute等数据源的数据同步。

  • 不同阿里云账号下的LogHub与MaxCompute等数据源间的数据同步。

  • 同一阿里云账号下的LogHub与MaxCompute等数据源间的数据同步。

  • 公共云与金融云账号下的LogHub与MaxCompute等数据源间的数据同步。

以B账号进入数据集成配置同步任务,将A账号的LogHub数据同步至B账号的MaxCompute为例,跨阿里云账号的特别说明如下:

  1. 使用A账号的AccessKey ID和AccessKey Secret创建LogHub数据源。

    此时B账号可以同步A账号下所有日志服务项目的数据。

  2. 使用A账号下的RAM用户A1的AccessKey ID和AccessKey Secret创建LogHub数据源。

    • A账号为RAM用户A1赋予日志服务的通用权限,即AliyunLogFullAccessAliyunLogReadOnlyAccess,详情请参见创建RAM用户及授权

      说明

      为RAM账号授予AliyunLogFullAccessAliyunLogReadOnlyAccess系统策略后,RAM账号可以查询主账号下的所有日志服务。

    • A账号给RAM用户A1赋予日志服务的自定义权限。

      主账号A进入RAM控制台 > 权限管理 > 权限策略页面,单击创建权限策略

      相关的授权请参见简介概览

      根据下述策略进行授权后,B账号通过RAM用户A1只能同步日志服务project_name1以及project_name2的数据。

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "log:Get*",
                      "log:List*",
                      "log:CreateConsumerGroup",
                      "log:UpdateConsumerGroup",
                      "log:DeleteConsumerGroup",
                      "log:ListConsumerGroup",
                      "log:ConsumerGroupUpdateCheckPoint",
                      "log:ConsumerGroupHeartBeat",
                      "log:GetConsumerGroupCheckPoint"
                  ],
                  "Resource": [
                      "acs:log:*:*:project/project_name1",
                      "acs:log:*:*:project/project_name1/*",
                      "acs:log:*:*:project/project_name2",
                      "acs:log:*:*:project/project_name2/*"
                  ],
                  "Effect": "Allow"
              }
          ]
      }

新建LogHub数据源

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 单击左侧导航栏中的数据源

  3. 数据源列表页面,单击新增数据源

  4. 新增数据源对话框中,选择数据源类型为LogHub

  5. 填写新增LogHub数据源对话框中的配置。

    参数

    描述

    数据源名称

    数据源名称必须以字母、数字、下划线组合,且不能以数字和下划线开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    LogHub Endpoint

    LogHub的Endpoint,格式为http://example.com。详情请参见服务入口

    Project

    输入项目名称。

    AccessKey ID

    访问密钥中的AccessKey ID,您可以进入控制台的用户信息管理页面进行复制。

    AccessKey Secret

    访问密钥中的AccessKey Secret,相当于登录密码。

  6. 单击测试连通性

  7. 连通性测试通过后,单击完成

新建离线同步节点

  1. 数据源页面,单击左上角的image图标,选择全部产品 > DataStudio(数据开发)

  2. 数据开发页面,鼠标悬停至新建图标,单击业务流程

  3. 新建业务流程对话框中,输入业务流程名称描述,单击新建

  4. 展开业务流程,右键单击数据集成,选择新建节点 > 离线同步

  5. 新建节点对话框中,输入节点名称,并选择路径

  6. 单击确认,进入离线节点编辑页面。

通过向导模式配置同步任务

  1. 在离线节点编辑页面,选择数据来源和数据去向。

    数据来源

    参数

    描述

    数据来源

    选择LogHub

    数据源名称

    选择以添加的日志服务数据源名称。

    资源组

    选择独享数据集成资源组。

    数据去向

    选择MaxCompute。

    数据源名称

    选择以添加的MaxCompute数据源名称。

  2. 测试网络连通性,数据来源和数据去向均可连通后,点击下一步

  3. 配置数据来源与数据去向具体同步的表等信息。

    image

    数据来源参数说明:

    参数

    描述

    Logstore

    目标日志库的名称。

    日志开始时间

    数据消费的开始时间位点,为时间范围(左闭右开)的左边界,为yyyyMMddHHmmss格式的时间字符串(例如20180111013000)。该参数可以和DataWorks的调度时间参数配合使用。

    日志结束时间

    数据消费的结束时间位点,为时间范围(左闭右开)的右边界,为yyyyMMddHHmmss格式的时间字符串(例如20180111013010)。该参数可以和DataWorks的调度时间参数配合使用。

    批量条数

    一次读取的数据条数,默认为256。

    说明

    您可以进行数据预览,此处仅选择LogHub中的几条数据展现在预览框。由于您在进行同步任务时,会指定开始时间和结束时间,会导致预览结果和实际的同步结果不一致。

  4. 选择字段的映射关系。

  5. 通道控制中配置同步速率脏数据策略等参数。

  6. 单击右侧调度配置,配置重跑属性调度资源组以及依赖的上游节点等参数。

    说明

    依赖的上游节点配置为使用工作空间根节点

  7. 确认当前节点的配置无误后,单击左上角的image

  8. 运行离线同步节点。

    您可以通过以下两种方式运行离线同步节点:

    • 直接运行(一次性运行)

      单击节点编辑页面工具栏中的image图标,直接在页面运行。

      说明

      运行之前需要配置自定义参数的具体取值。

    • 调度运行

      1. 单击右侧调度配置,设置时间属性,配置调度周期。

        image

      2. 单击节点编辑页面工具栏中的image图标,然后单击image图标,提交离线同步节点至调度系统,调度系统会根据配置的属性,从第2天开始自动定时运行。

通过脚本模式配置离线同步节点

  1. 成功创建离线同步节点后,单击工具栏中的转换脚本

    转换脚本

  2. 单击提示对话框中的确认,即可进入脚本模式进行开发。

  3. 单击工具栏中的导入模板

    导入模板

  4. 导入模板对话框中,选择从来源端的LogHub数据源同步至目标端的ODPS数据源的导入模板,单击确认

  5. 导入模板后,根据自身需求编辑代码,示例脚本如下。

    {
        "type": "job",
        "version": "1.0",
        "configuration": {
            "reader": {
                "plugin": "loghub",
                "parameter": {
                    "datasource": "loghub_lzz",//数据源名,需要和您添加的数据源名一致。
                    "logstore": "logstore-ut2",//目标日志库的名字,LogStore是日志服务中日志数据的采集、存储和查询单元。
                    "beginDateTime": "${startTime}",//数据消费的开始时间位点,为时间范围(左闭右开)的左边界。
                    "endDateTime": "${endTime}",//数据消费的结束时间位点,为时间范围(左闭右开)的右边界。
                    "batchSize": 256,//一次读取的数据条数,默认为256。
                    "splitPk": "",
                    "column": [
                        "key1",
                        "key2",
                        "key3"
                    ]
                }
            },
            "writer": {
                "plugin": "odps",
                "parameter": {
                    "datasource": "odps_source",//数据源名,需要和您添加的数据源名一致。
                    "table": "test",//目标表名。
                    "truncate": true,
                    "partition": "",//分区信息。
                    "column": [//目标列名。
                        "key1",
                        "key2",
                        "key3"
                    ]
                }
            },
            "setting": {
                "speed": {
                    "mbps": 8,//作业速率上限,单位MB/s。
                    "concurrent": 7//并发数。
                }
            }
        }
    }