全部产品
Search
文档中心

数据传输服务 DTS:使用ETL分析实时订单

更新时间:Feb 22, 2024

本文通过案例为您介绍如何使用ETL实现实时订单分析。

应用场景

为满足企业处理实时数据的需求,ETL提供了流式数据抽取、加工和加载功能,能够高效整合海量实时数据,支持拖拽式操作和低代码开发方式,帮助企业轻松完成商业决策分析、报表提速、实时数据计算等。企业在数字化转型过程中,涉及实时数据处理的应用场景如下:

  • 多区域或异构数据实时集中:将多地域或者异构数据实时存储至同一数据库中,便于企业中心化高效管理及决策支持。

  • 报表提速:帮助客户构建实时报表体系,不仅大幅提升报表产出效率,还能支持更多实时分析场景,满足了企业数字化转型阶段对报表产出效率的高要求。

  • 实时计算场景:对业务侧产生的流数据实时清洗处理,形成特征值、标签支持在线业务计算模型(画像、风控、推荐等)或实时大屏等流计算场景。

案例背景

本案例将为您演示如何使用流式ETL功能,将实时交易数据(订单号、客户ID、产品/商品编码、交易金额、交易时间)与业务维度数据(产品编码、产品单价、产品名称等)相结合,并将满足过滤条件的数据(如统计单笔超3000的实时交易信息)实时集中至数据仓库,实现交易数据的多维分析(如产品维度、客户维度等)。您还可根据业务需要,借助工具实现可视化大屏,洞察动态数据。

实现流程

任务配置流程

警告

为确保成功配置和运行ETL任务,请您在配置前仔细阅读并遵循前提条件注意事项

配置流程概览

步骤

说明

准备工作

将实时交易数据、业务维度数据存储在源表中,并根据业务需求创建目标表。

说明

本案例中实时交易、业务维度表、目标表均存储在RDS MySQL中。

步骤一:配置源库信息

将实时交易数据配置为流表,业务维度数据配置为维表。

步骤二:配置表JOIN

将维表和流表数据关联成一张宽表。

步骤三:配置表记录过滤

配置过滤条件(单笔金额需超过3000),筛选宽表中的数据。

步骤四:配置目标库信息

将加工后的数据实时加载至目标表中。

步骤五:预检查并启动任务

预检查并启动ETL任务,执行以上配置。

准备工作

在配置ETL任务前,您需将实时交易数据和业务维度数据分别作为流表和维表存储在源RDS MySQL数据库中。

并根据业务需求,在目标RDS MySQL数据库中建表。

说明

本案例中实时交易数据表,业务维度数据表,目标表的具体建表语句如下。

建表语句

实时交易数据

create table test_orders(
   order_id bigint not null COMMENT '订单ID',
     user_id bigint not null comment '用户ID',
     product_id bigint not null comment '产品ID',
   total_price decimal(15,2) not null COMMENT '订单总额',
   order_date TIMESTAMP  not null COMMENT '订单日期',
   PRIMARY KEY (order_id))

业务维度数据

CREATE table product (
      product_id bigint not null comment '产品ID',
      product_name varchar(20) comment '产品名称',
      product_price decimal(15,2) not null comment '产品单价')

目标表

create table test_orders(
 order_id bigint not null COMMENT '订单ID',
 user_id bigint not null comment '用户ID',
 product_id bigint not null comment '产品ID',
 total_price decimal(15,2) not null COMMENT '订单总额',
 order_date TIMESTAMP  not null
 COMMENT '订单日期',
 product_id_2 bigint not null comment '产品ID',
 product_name varchar(20) comment '产品名称',
 product_price decimal(15,2) not null comment '产品单价',
 PRIMARY KEY (order_id))

步骤一:配置源库信息

  1. 进入ETL任务的列表页面。

    1. 登录DMS数据管理服务

    2. 在顶部菜单栏中,单击集成与开发(DTS)

    3. 在左侧导航栏,选择数据集成 > 流式ETL

  2. 单击左上角的新增数据流,在新增数据流对话框中,您需在数据流名称配置ETL任务名称,选择开发方式DAG

  3. 单击确认

  4. 执行如下操作,配置流表和维表信息。

    1. 配置流表信息

      1. 页面左侧,将输入/维表 MySQL节点拖拽至页面右侧画布的空白区域。

      2. 单击画布区域的输入/维表 MySQL-1,根据页面信息配置源库信息。

      3. 设置以下参数,配置节点信息。配置源库信息_节点配置

        参数

        说明

        请输入数据源名称

        DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

        区域

        选择源库所在地域。

        说明

        当前仅支持在华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、华南3(广州)和中国香港创建ETL任务。

        实例列表

        选择源库所在数据库实例的实例名称。您也可以单击下方的新建实例进行新建,新建方法请参见DMS支持的数据库

        节点类型

        选择源表的类型,本案例中选择为流表

        • 流表:实时发生变化的表,可以关联一个维表,用于数据关联查询。

        • 维表:更新不频繁(非实时更新)的表,一般用于结合实时数据拼装成宽表进行数据分析。

        转换格式

        ETL在处理数据时会将流转换为动态表,在该动态表上进行持续查询(即动态表会被INSERT、UPDATE、DELETE操作持续更改),产生一个新的动态表。最终写入目标库时,再将新的动态表会转化为流。当新的动态表转化为流时,您需要指定转化格式,对动态表前后更改信息进行编码:

        • Upsert流:动态表中的数据支持通过INSERT、UPDATE和DELETE操作修改,当转换为流时,会将INSERT和UPDATE操作编码为upsert message,将DELETE操作编码为delete message。

          说明

          该编码方式要求动态表具有唯一键(可能是复合的)。

        • Append-Only流: 动态表中的数据仅支持INSERT操作修改,当转换为流时仅需发送INSERT的数据。

        库表选择

        选择源表中需转换的库表。

      4. 节点配置完成后,页面会自动切换至输出字段页签,您可根据业务需要,在页签的列名称列勾选需要的字段。

      5. 本案例中选择为流表,需要单击时间属性页签,并设置对应参数。配置源库信息_时间属性

        参数

        说明

        选择事件时间Watermark字段

        选择流表中的一个时间字段。一般流表会定义时间字段,代表数据产生的时间,通常为具有业务含义的时间戳(比如ordertime)。

        事件时间Watermark延迟时间

        输入数据延迟的最大容忍时间。

        应用场景是,由于数据并不一定按照实际产生顺序,达到ETL等待处理,可能会出现延迟情况。如果数据一直延迟未到,ETL不能无限制地等待延迟的数据,因此需要建立延迟时间来处理乱序数据。比如10:00的数据已到达,但是9:59的数据还未到达,则ETL只会等待至“10:00+延迟时间”。如果9:59的数据在“10:00+延迟时间”后到达,则ETL会抛弃该数据。

        处理时间ProcTime

        处理时间为ETL处理数据时的本地时间。您需要自定义一个列名,ETL会在该列保存数据处理的本地时间。处理时间主要用于算子运算,如时态JOIN会用该处理时间去关联普通表的最新版本。

      说明

      完成配置的源库右侧不显示配置源库信息_感叹号时,说明配置完成。

    2. 配置维表信息

      1. 页面左侧,将输入/维表 MySQL节点拖拽至页面右侧画布的空白区域。

      2. 单击画布区域的输入/维表 MySQL-2,根据页面信息配置源库信息。

      3. 设置以下参数,配置节点信息。维表_节点配置

        参数

        说明

        请输入数据源名称

        DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

        区域

        选择源库所在地域。

        实例列表

        选择源库所在数据库实例的实例名称。您也可以单击下方的新建实例进行新建,新建方法请参见DMS支持的数据库

        节点类型

        选择源表的类型,本案例中选择为维表

        库表选择

        选择源表中需转换的库表。

      4. 节点配置完成后,页面会自动切换至输出字段页签,您可根据业务需要,在页签的列名称列勾选需要的字段。

      说明

      完成配置的源库右侧不显示配置源库信息_感叹号时,说明配置完成。

步骤二:配置表JOIN

  1. 在页面左侧,将表 Join节点拖拽至页面右侧画布的空白区域。

  2. 将鼠标指针移动至已完成配置的流表和维表上,单击圆点拉出连接线,分别将流表和维表与表 Join-1连接起来。

  3. 单击画布区域的表 Join-1,根据页面信息配置转换组件。

    1. 节点配置页签,设置以下参数,配置节点信息。

      Join_节点配置

      区域

      参数

      说明

      转换名称

      请输入转换名称

      DTS会自动生成一个转换组件名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

      Join配置

      Join类型符左边的表

      选择放置在JOIN类型符左边的表,作为主表。本案例中选择为流表。

      时态Join时间属性(不选择为普通Join)

      选择使用时态JOIN时,流表关联时态表的时间属性。如不输入,则默认使用普通JOIN。本案例中选择为基于处理时间ProcTime

      说明
      • 时态表,也称动态表,是指基于表的(参数化)视图概念,根据时间记录数据变更历史,分为版本表(可显示数据的历史版本)和普通表(仅显示数据的最新版本)。

      • 时态JOIN要求流表定义时间属性,右表要有主键;如右表是维表,则您所设置的Join的条件需包含维表的主键。

      • 基于事件时间Watermark:使用流表的事件时间去关联版本表对应的版本。

      • 基于处理时间ProcTime:使用流表的处理时间去关联普通表的最新版本。

      选择Join操作

      选择Join操作方式。本案例中选择为Inner Join

      • Inner Join:数据为两张表的交集。

      • Left Join:在左表中获取所有数据,在右表中获取两张表的交集。

      • Right Join:在左表中获取两张表的交集,在右表中获取所有数据。

      Join条件

      +新增条件

      单击+新增条件,选择JOIN的条件字段。

      说明

      等号(=)左侧为JOIN后新表的左表字段,右侧为JOIN后新表的右表字段。

  4. 完成Join条件配置后,单击输出字段页签。根据实际需要,在列名称列勾选需要的字段。

说明

完成配置的转换组件右侧不显示配置源库信息_感叹号时,说明配置完成。

步骤三:配置表记录过滤

  1. 在页面左侧,将表记录过滤节点拖拽至页面右侧画布的空白区域。

  2. 将鼠标指针移动至已完成配置的表 Join-1上,单击圆点拉出连接线,连接表 Join-1表记录过滤-1

  3. 单击画布区域的表记录过滤-1,根据页面信息配置转换组件。

    1. 转换名称区域,输入转换名称。

      说明

      DTS会自动生成一个转换组件名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    2. where条件区域,您可以选择以下任意一种方法配置WHERE条件。

      • 直接输入需要的WHERE条件,比如输入total_price>3000.00,表示过滤JOIN后表中total_price字段的值大于3000.00的数据。

      • 单击字段输入操作符区域中的选项配置WHERE条件。

说明

完成配置的转换组件右侧不显示配置源库信息_感叹号时,说明配置完成。

步骤四:配置目标库信息

  1. 在页面左侧,将输出 MySQL节点拖拽至页面右侧画布的空白区域。

  2. 将鼠标指针移动至已完成配置的表记录过滤-1上,单击圆点拉出连接线,连接表记录过滤-1输出 MySQL-1

  3. 单击画布区域的输出 MySQL-1页面,根据页面信息配置目标库信息。

    1. 设置以下参数,配置节点信息。

      目标库_节点配置

      参数

      说明

      请输入数据源名称

      DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

      区域

      选择目标库所在地域。

      说明

      当前仅支持在华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、华南3(广州)和中国香港创建ETL任务。

      实例列表

      选择目标库所在数据库的实例名称。您也可以单击下方的新建实例进行新建,新建方法请参见DMS支持的数据库

      表映射

      选择目标库通过转换处理后需要存储的目标表。

      选择目标表区域,单击目标表。

  4. 根据业务需要,在列名称列勾选需要的参数。

说明

完成配置的目标库右侧不显示配置源库信息_感叹号时,说明配置完成。

步骤五:预检查并启动任务

  1. 配置完成后,单击生成Flink SQL校验,ETL将生成Flink SQL并进行校验。

  2. 检验完成后,您可单击查看ETL校验详情,在弹跳框中,查看Flink SQL生成结果和SQL语句。确认无误后,单击关闭

    说明

    如校验失败,您可以根据生成结果显示的失败原因进行修复。

  3. 单击下一步保存任务并预检查。当预检查通过后,DTS才能开始ETL任务。如果预检查失败,请单击检查失败项后的查看详情,根据提示信息修复后,重新进行预检查。

  4. 预检查完成后,单击页面下方的下一步购买

  5. 购买页面,选择链路规格计算资源,阅读并勾选数据传输(按量付费)服务条款公测协议条款

    说明

    公测期间,每个用户可以免费创建并使用两个ETL实例。

  6. 单击购买并启动,ETL任务正式开始。

任务运行结果

本案例中,ETL任务的启动后(以8月1日为例),如实时交易数据表test_orders中更新的数据满足过滤条件(total_priceid>3000.00,即总交易额大于3000.00),则该数据会同步至目标表test_orders_new中。

图 1. 实时交易数据表test_orders目标表test_orders_new

图 2. 目标表test_orders_new业务数据表test_orders