本文通过案例为您介绍如何使用ETL实现实时订单分析。

应用场景

为满足企业处理实时数据的需求,ETL提供了流式数据抽取、加工和加载功能,能够高效整合海量实时数据,支持拖拽式操作和低代码开发方式,帮助企业轻松完成商业决策分析、报表提速、实时数据计算等。企业在数字化转型过程中,涉及实时数据处理的应用场景如下:
  • 多区域或异构数据实时集中:将多地域或者异构数据实时存储至同一数据库中,便于企业中心化高效管理及决策支持。
  • 报表提速:帮助客户构建实时报表体系,不仅大幅提升报表产出效率,还能支持更多实时分析场景,满足了企业数字化转型阶段对报表产出效率的高要求。
  • 实时计算场景:对业务侧产生的流数据实时清洗处理,形成特征值、标签支持在线业务计算模型(画像、风控、推荐等)或实时大屏等流计算场景。

案例背景

本案例将为您演示如何使用流式ETL功能,将实时交易数据(订单号、客户ID、产品/商品编码、交易金额、交易时间)与业务维度数据(产品编码、产品单价、产品名称等)相结合,并将满足过滤条件的数据(如统计单笔超3000的实时交易信息)实时集中至数据仓库,实现交易数据的多维分析(如产品维度、客户维度等)。您还可根据业务需要,借助工具实现可视化大屏,洞察动态数据。

实现流程

任务配置流程

警告 为确保成功配置和运行ETL任务,请您在配置前仔细阅读并遵循前提条件注意事项
配置流程概览
步骤 说明
准备工作 将实时交易数据、业务维度数据存储在源表中,并根据业务需求创建目标表。
说明 本案例中实时交易、业务维度表、目标表均存储在RDS MySQL中。
步骤一:配置源库信息 将实时交易数据配置为流表,业务维度数据配置为维表。
步骤二:配置表JOIN 将维表和流表数据关联成一张宽表。
步骤三:配置表记录过滤 配置过滤条件(单笔金额需超过3000元),筛选宽表中的数据。
步骤四:配置目标库信息 将加工后的数据实时加载至目标表中。
步骤五:预检查并启动任务 预检查并启动ETL任务,执行以上配置。

准备工作

在配置ETL任务前,您需将实时交易数据和业务维度数据分别作为流表和维表存储在源RDS MySQL数据库中。

并根据业务需求,在目标RDS MySQL数据库中建表。

说明 本案例中实时交易数据表,业务维度数据表,目标表的具体建表语句如下。
create table test_orders(
   order_id bigint not null COMMENT '订单ID',
     user_id bigint not null comment '用户ID',
     product_id bigint not null comment '产品ID',
   total_price decimal(15,2) not null COMMENT '订单总额',
   order_date TIMESTAMP  not null COMMENT '订单日期',
   PRIMARY KEY (order_id))
CREATE table product (
      product_id bigint not null comment '产品ID',
      product_name varchar(20) comment '产品名称',
      product_price decimal(15,2) not null comment '产品单价')
create table test_orders(
 order_id bigint not null COMMENT '订单ID',
 user_id bigint not null comment '用户ID',
 product_id bigint not null comment '产品ID',
 total_price decimal(15,2) not null COMMENT '订单总额',
 order_date TIMESTAMP  not null
 COMMENT '订单日期',
 product_id_2 bigint not null comment '产品ID',
 product_name varchar(20) comment '产品名称',
 product_price decimal(15,2) not null comment '产品单价',
 PRIMARY KEY (order_id))

步骤一:配置源库信息

  1. 进入ETL任务的列表页面
    说明 您也可以通过如下步骤,在DMS数据管理服务的控制台上配置ETL任务。
    • 登录DMS数据管理服务
    • 在顶部菜单栏中,单击传输与加工(DTS) > 数据加工
    • 单击流式加工页签。
    • 单击左上角的新增数据流,在新增数据流对话框中,您需在数据流名称配置ETL任务名称,选择加工方式流式加工
    • 单击确认
  2. 在页面左上角,选择ETL任务所属地域。
    说明 当前仅支持在华东1(杭州)、华北2(北京)和华北3(张家口)配置ETL任务,请您根据需求选择其中一种。
  3. 在页面左上角,单击创建任务
  4. 创建ETL任务页面,根据页面信息完成ETL任务配置。
  5. 执行如下操作,配置流表和维表信息。
    1. 配置流表信息
      1. 在页面左侧的输入/维表区域选择MySQL,并将其拖拽至页面右侧画布的空白区域。
      2. 单击画布中的MySQL,在输入/维表 MySQL(流表)页面,配置流表信息。
      3. 设置以下参数,配置节点信息。流表_节点配置
        参数 说明
        请输入数据源名称 DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
        区域 选择源库所在地域。
        数据库连接模版 选择连接源表所属数据库实例的模板名称。您也可以单击右侧的新建连接模板进行新建,新建方法,请参见新建连接模板
        节点类型 选择源表的类型为 流表维表

        本案例中选择为流表
        转换格式 ETL在处理数据时会将流转换为动态表,在该动态表上进行持续查询(即动态表会被INSERT、UPDATE、DELETE操作持续更改),产生一个新的动态表。最终写入目标库时,再将新的动态表会转化为流。当新的动态表转化为流时,您需要指定转化格式,对动态表前后更改信息进行编码:
        • Upsert流:动态表中的数据支持通过INSERT、UPDATE和DELETE操作修改,当转换为流时,会将INSERT和UPDATE操作编码为upsert message,将DELETE操作编码为delete message。
          说明 该编码方式要求动态表具有唯一键(可能是复合的)。
        • Append-Only流: 动态表中的数据仅支持INSERT操作修改,当转换为流时仅需发送INSERT的数据。
        库表选择 选择源表中需转换的库表。
      4. 节点配置完成后,页面会自动切换至输出字段页签,您可根据业务需要,在页签的列名称列勾选需要的字段。
      5. 本案例中选择为流表,需要单击时间属性页签,并设置对应参数。流表_时间属性
        参数 说明
        选择事件时间Watermark字段 选择流表中的一个时间字段。一般流表会定义时间字段,代表数据产生的时间,通常为具有业务含义的时间戳(比如ordertime)。
        事件时间Watermark延迟时间 输入数据延迟的最大容忍时间。

        应用场景是,由于数据并不一定按照实际产生顺序,达到ETL等待处理,可能会出现延迟情况。如果数据一直延迟未到,ETL不能无限制地等待延迟的数据,因此需要建立延迟时间来处理乱序数据。比如10:00的数据已到达,但是9:59的数据还未到达,则ETL只会等待至“10:00+延迟时间”。如果9:59的数据在“10:00+延迟时间”后到达,则ETL会抛弃该数据。

        处理时间ProcTime 处理时间为ETL处理数据时的本地时间。您需要自定义一个列名,ETL会在该列保存数据处理的本地时间。处理时间主要用于算子运算,如时态JOIN会用该处理时间去关联普通表的最新版本。
      说明 MySQL流表中显示配置成功,则表示流表信息配置完成。
    2. 配置维表信息
      1. 在页面左侧的输入/维表区域选择MySQL,并将其拖拽至页面右侧的空白区域。
      2. 单击画布中的MySQL,在输入/维表 MySQL(维表)页面,配置维表信息。
      3. 设置以下参数,配置节点信息。维表_节点配置
        参数 说明
        请输入数据源名称 DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
        区域 选择源库所在地域。
        数据库连接模版 选择连接源表所属数据库实例的模板名称。您也可以单击右侧的新建连接模板进行新建,新建方法,请参见新建连接模板
        节点类型 选择源表的类型为 流表 还是 维表

        本案例中选择为维表
        库表选择 选择源表中需转换的库表。
      4. 节点配置完成后,页面会自动切换至输出字段页签,您可根据业务需要,在页签的列名称列勾选需要的字段。
      说明 MySQL维表中显示配置成功,则表示维表信息配置完成。

步骤二:配置表JOIN

  1. 在页面左侧的转换区域选择表JOIN,并将其拖拽至页面右侧的空白区域。
  2. 将鼠标指针移动至已完成配置的流表和维表上,单击圆点拉出连接线,分别将流表和维表与表JOIN连接起来。
    连接源表与join
  3. 单击画布中的表JOIN,在转换 表JOIN页面,根据页面信息配置转换组件。
    1. 节点配置页签,设置以下参数,配置节点信息。
      join的节点配置
      区域 参数 说明
      转换名称 请输入转换名称 DTS会自动生成一个转换组件名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
      Join配置 Join类型符左边的表 选择放置在JOIN类型符左边的表,作为主表。本案例中选择为流表。
      时态Join时间属性(不选择为普通Join) 选择使用时态JOIN时,流表关联时态表的时间属性。如不输入,则默认使用普通JOIN。本案例中选择为基于处理时间ProcTime
      说明
      • 时态表,也称动态表,是指基于表的(参数化)视图概念,根据时间记录数据变更历史,分为版本表(可显示数据的历史版本)和普通表(仅显示数据的最新版本)。
      • 时态JOIN要求流表定义时间属性,右表要有主键;如右表是维表,则您所设置的Join的条件需包含维表的主键。
      • 基于事件时间Watermark:使用流表的事件时间去关联版本表对应的版本。
      • 基于处理时间ProcTime:使用流表的处理时间去关联普通表的最新版本。
      选择Join操作 选择Join操作方式。本案例中选择为Inner Join
      • Inner Join:数据为两张表的交集。
      • Left Join:在左表中获取所有数据,在右表中获取两张表的交集。
      • Right Join:在左表中获取两张表的交集,在右表中获取所有数据。
      Join条件 +新增条件 单击+新增条件,选择JOIN的条件字段。
      说明 等号(=)左侧为JOIN后新表的左表字段,右侧为JOIN后新表的右表字段。
  4. 完成JOIN条件配置后,单击输出字段页签。根据实际需要,在列名称列勾选需要的字段。
说明表join中显示配置成功,则表示表JOIN组件配置完成。

步骤三:配置表记录过滤

  1. 在页面左侧的转换区域选择表记录过滤,并将其拖拽至页面右侧的空白区域。
  2. 将鼠标指针移动至已完成配置的表JOIN上,单击圆点拉出连接线,连接表JOIN和表记录过滤
  3. 单击画布中的表记录过滤,在转换 表记录过滤页面,根据页面信息配置转换组件。
    连接join和过滤组件
    1. 转换名称区域,输入转换名称。
      说明 DTS会自动生成一个转换组件名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    2. Where条件区域,您可以选择以下任意一种方法配置WHERE条件。
      • 直接输入需要的WHERE条件,比如输入total_price>3000.00,表示过滤JOIN后表中total_price字段的值大于3000.00的数据。
      • 单击字段输入操作符区域中的选项配置WHERE条件。
说明表记录过滤中显示配置成功,则表示表记录过滤组件配置完成。

步骤四:配置目标库信息

  1. 在页面左侧的输出区域选择MySQL,并将其拖拽至页面右侧空白区域。
  2. 将鼠标指针移动至已完成配置的表记录过滤上,单击圆点拉出连接线,连接表记录过滤和MySQL
  3. 输出 MySQL页面,根据页面信息配置目标库信息。
    1. 设置以下参数,配置节点信息。
      目标库信息
      参数 说明
      请输入数据源名称 DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
      区域 选择目标库所在地域。
      数据库连接模版 选择连接源表所属数据库实例的模板名称。您也可以单击右侧的新建连接模板进行新建,新建方法,请参见新建连接模板
      表映射 选择目标库通过转换处理后需要存储的目标表。

      选择目标表区域,单击目标表。

  4. 根据业务需要,在列名称列勾选需要的参数。
说明目标MySQL中显示配置成功,则表示目标库配置完成。

步骤五:预检查并启动任务

  1. 配置完成后,单击生成Flink SQL校验,ETL将生成Flink SQL并进行校验。
  2. 检验完成后,您可单击查看ETL校验详情,在弹跳框中,查看Flink SQL生成结果和SQL语句。确认无误后,单击关闭
    说明 如校验失败,您可以根据生成结果显示的失败原因进行修复。
  3. 单击下一步保存任务并预检查。当预检查通过后,DTS才能开始ETL任务。如果预检查失败,请单击检查失败项后的查看详情,根据提示信息修复后,重新进行预检查。
  4. 预检查完成后,单击页面下方的下一步购买
  5. 购买页面,选择链路规格计算资源,阅读并勾选数据传输(按量付费)服务条款公测协议条款
    说明 公测期间,每个用户可以免费创建并使用两个ETL实例。
  6. 单击购买并启动,ETL任务正式开始。

任务运行结果

本案例中,ETL任务的启动后(以8月1日为例),如实时交易数据表test_orders中更新的数据满足过滤条件(total_priceid>3000.00,即总交易额大于3000.00),则该数据会同步至目标表test_orders_new中。

图 1. 实时交易数据表test_orders
目标表test_orders_new
图 2. 目标表test_orders_new
业务数据表test_orders

后续步骤

如您需实现目标表中的数据可视化,推荐使用数据管理DMS的数据可视化等工具 ,支撑分析趋势、增长对比等分析场景。更多信息,请参见DMS的数据可视化工具