本文为您介绍Hudi结果表的背景信息、使用限制、 DDL定义、WITH参数和示例。

背景信息

  • 什么是Hudi
    Hudi的定义、特性及典型场景详情如下表所示。
    类别详情
    定义Apache Hudi是一种开源的数据湖表格式框架。Hudi基于对象存储或者HDFS组织文件布局,保证ACID,支持行级别的高效更新和删除,从而降低数据ETL开发门槛。同时该框架还支持自动管理及合并小文件,保持指定的文件大小,从而在处理数据插入和更新时,不会创建过多的小文件,引发查询端性能降低,避免手动监控和重写小文件的运维负担。结合Flink、Presto、Spark等计算引擎进行数据入湖和计算分析,常用来支持DB入湖加速、增量数据实时消费和数仓回填等需求。详情请参见Apache Hudi
    特性
    • 支持ACID:支持ACID语义,提供事务的线性隔离级别。
    • 支持UPSERT语义:UPSERT语义即就是INSERT和UPDATE两种语义的合并。在UPSERT语义时,如果记录不存在则插入;如果记录存在则更新。通过INSERT INTO语法可以大幅简化开发代码的复杂度,提升效率。
    • 支持Data Version:通过时间旅行(Time Travel)特性,提供任意时间点的数据版本历史,便于数据运维,提升数据质量。
    • 支持Schema Evolution:支持动态增加列,类型变更等Schema操作。
    典型场景
    • DB入湖加速

      相比昂贵且低效的传统批量加载和Merge,Hudi提供超大数据集的实时流式更新写入。通过实时的ETL,您可以直接将CDC(change data capture)数据写入数据湖,供下游业务使用。典型案例为采用Flink MySQL CDC Connector将RDBMS(MySQL)的Binlog写入Hudi表。

    • 增量ETL

      通过增量拉取的方式获取Hudi中的变更数据流,相对离线ETL调度,实时性更好且更轻量。典型场景是增量拉取在线服务数据到离线存储中,通过Flink引擎写入Hudi表,借助Presto或Spark引擎实现高效的OLAP分析。

    • 消息队列

      在小体量的数据场景下,Hudi也可以作为消息队列替代Kafka,简化应用开发架构。

    • 数仓回填(backfill)

      针对历史全量数据进行部分行、列的更新场景,通过数据湖极大减少计算资源消耗,提升了端到端的性能。典型案例是Hive场景下全量和增量的打宽。

  • 全托管Flink集成Hudi功能优势
    相比开源社区Hudi,全托管Flink平台集成Hudi具有的功能优势详情如下表所示。
    功能优势详情
    平台侧与Flink全托管集成,免运维Flink全托管内置Hudi Connector,降低运维复杂度,提供SLA保障。
    完善的数据连通性对接多个阿里云大数据计算分析引擎,数据与计算引擎解耦,可以在Flink、Spark、Presto或Hive间无缝流转。
    深度打磨DB入湖场景与Flink CDC Connector联动,降低开发门槛。
    提供企业级特性包括集成DLF统一元数据视图、自动且轻量化的表结构变更。
    内置阿里云OSS存储,低成本存储,弹性扩展数据以开放的Parquet、Avro格式存储在阿里云OSS,存储计算分离,资源灵活弹性扩展。
  • CDC数据同步导入示意图
    CDC数据保存了完整的数据库变更,您可以通过以下任意一种方式将数据导入Hudi:
    • 对接CDC Format,消费Kafka数据的同时导入Hudi。

      支持debezium-json、canal-json和maxwell-json三种格式,该方式优点是可扩展性强,缺点是需要依赖Kafka和Debezium数据同步工具。

    • 通过Flink-CDC-Connector直接对接DB的Binlog,将数据导入Hudi。

      该方式优点是轻量化组件依赖少。

    说明
    • 如果无法保证上游数据顺序,则需要指定write.precombine.field字段。
    • 在CDC场景下,需要开启changelog模式,即changelog.enabled设为true。

使用限制

  • 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持Hudi Connector。
  • 文件系统仅支持HDFS或阿里云OSS服务。
  • 不支持以Session模式提交作业。

DDL定义

CREATE TEMPORARY TABLE hudi_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'table.type' = 'COPY_ON_WRITE',
  'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'accessKeyId' = '<yourAccessKeyId>',
  'accessKeySecret' = '<yourAccessKeySecret>' ,
  'hive_sync.enable'='true',
  'hive_sync.db'='<db name>',
  'hive_sync.table' = '<table name>',
  'hive_sync.mode' = 'hms',
  'dlf.catalog.region' = 'cn-hangzhou',
  'dlf.catalog.endpoint' = 'dlf-vpc.cn-hangzhou.aliyuncs.com'
);

WITH参数

  • 基础参数
    参数说明是否必选备注
    connector结果表类型。固定值为hudi
    table.type表类型。参数取值如下:
    • COPY_ON_WRITE:使用Parquet列式存储,每次更新数据,创建一个新的base文件。
    • MERGE_ON_READ:使用Parquet列式和Avro行式存储,更新操作将会被写入delta日志文件,异步合并Parquet列式文件生成新版本文件。
    path表存储路径。支持阿里云OSS和HDFS两种路径。例如oss://<bucket name>/tablehdfs://<ip>:<port>/table
    oss.endpoint阿里云对象存储服务OSS的Endpoint。如果使用OSS作为存储,则必需填写。参数取值请详情参见访问域名和数据中心
    accessKeyId阿里云账号的AccessKey ID。如果使用OSS作为存储,则必需填写。获取方法请参见获取AccessKey
    accessKeySecret阿里云账号的AccessKey Secret。如果使用OSS作为存储,则必需填写。获取方法请参见获取AccessKey
    hive_sync.enable是否开启同步元数据到Hive功能。参数取值如下:
    • true:开启。
    • false:不开启。
    hive_sync.modeHive数据同步模式。参数取值如下:
    • hms(推荐值):采用Hive Metastore或者DLF Catalog时,需要设置为hms。
    • jdbc(默认值):采用jdbc Catalog时,需要设置为jdbc。
    hive_sync.db同步到Hive的数据库名称。无。
    hive_sync.table同步到Hive的表名称。无。
    dlf.catalog.regionDLF服务的地域名。
    说明
    • 仅当hive_sync.mode设置为hms时,dlf.catalog.region参数设置才生效。
    • 请和dlf.catalog.endpoint选择的地域保持一致。
    dlf.catalog.endpointDLF服务的Endpoint。
    说明
    • 仅当hive_sync.mode设置为hms时,dlf.catalog.endpoint参数设置才生效。
    • 推荐您为dlf.catalog.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com。
    • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?
    write.operation写入操作模式。参数取值如下:
    • insert模式:数据追加写。
    • upsert模式(默认值):数据更新。
    • bulk_insert模式:批量写入。
    write.precombine.field版本字段,基于此字段的大小来判断消息是否进行更新。默认为ts字段,如果没有ts字段则为处理顺序。
    index.type索引类型。参数取值如下:
    • FLINK_STATE(默认): 使用Flink的状态存储后端作index存储,使用后作业支持跨partition更新,支持bucket动态扩容。
    • BUCKET: 使用hash算法路由消息到bucekt,使用后Flink 不支持跨partition更新,只支持固定的bucket数。

      Flink支持通过hoodie.bucket.index.hash.field参数指定hash字段,默认是主键字段,也可以指定主键的子集。

    说明
    • 在数据量每秒超过2万QPS时,建议您该参数的取值选择为BUCKET。
    • Flink支持通过hoodie.bucket.index.num.bucket参数指定bucket数,默认值为4。
  • 高阶参数
    • 内存参数
      说明
      • 所有的内存参数单位都是MB。
      • 影响内存的三个因素包括TaskManager的数量和内存配置、write task的并发和每个write task能够分配到的内存。因此建议您确认每个write task能够分配到的内存,再考虑相关的内存参数设置。
      参数说明默认值备注
      write.task.max.size一个write task的最大可用内存。1024预留给每个write task的内存buffer大小为write.task.max.size参数值减去compaction.max_memory参数值的差值。

      当write task的内存buffer达到阈值后,会将内存的数据落盘。

      您需要关注TM分配给每个write task的内存,保证每个write task能够分配到 write.task.max.size所配置的大小。例如TM的内存是4 GB,运行了2个StreamWriteFunction,那每个write function能分配到2 GB。此时你需要预留一些buffer,因为网络buffer,TM上其他类型task,例如BucketAssignFunctio也会消耗一些内存。

      compaction.max_memory合并文件时的最大可用内存。100如果是在线合并,资源充足时可以调大该参数,例如调为1 GB。
      您需要关注compaction内存的变化。因为compaction.max_memory控制了每个compaction task读log时可以利用的内存大小。在内存资源充足时,有以下建议:
      • 如果是MOR表,可以将compaction.max_memory参数值调大些。
      • 如果是COW表,可以将write.task.max.size和write.merge.max_memory参数值同时调大。
      write.merge.max_memoryCOW写操作,会有增量文件和全量文件数据合并的过程,增量的数据会缓存在内存中,该参数控制使用的堆内存大小。100通常您不需要设置该参数,保持默认值即可。
    • 并发参数
      参数说明默认值备注
      write.tasks写任务(write)的并发读。每个write顺序写1~N个buckets。4增加并发不影响小文件个数。
      write.bucket_assign.tasksbucket assigner的并发。1增加该参数值会导致bucket数量增加,即增加小文件数。
      write.index_bootstrap.tasksIndex bootstrap算子的并发。增加并发可以加快bootstrap阶段的效率,bootstrap阶段会阻塞checkpoint,因此需要设置多一些的checkpoint失败容忍次数。不显式指定该参数时,默认使用Flink算子的并行度。仅在index.bootstrap.enabled为true时生效。
    • 在线合并参数
      参数说明默认值备注
      compaction.tasks在线合并算子的并发。4在线合并会消耗计算资源。
      compaction.trigger.strategy合并策略。num_commits支持以下四种策略:
      • num_commits:根据commits数量合并。
      • time_elapsed:根据时间合并。
      • num_and_time:根据数据量和时间合并。
      • num_or_time:根据数量或时间合并。
      compaction.delta_commitsdelta commits文件的个数。5取值范围是整数,建议不超过20个。默认值的含义为5个delta文件触发一次合并。
      compaction.delta_seconds在线合并的时间间隔。3600单位为秒。
      compaction.target_io每个压缩任务的IO上限。500(GB)无。
  • Changelog模式参数

    Hudi支持保留消息的所有变更,对接Flink引擎的后,实现全链路近实时数仓生产。Hudi的MOR表以行存格式保留消息的所有变更,通过流读MOR表可以消费到所有的变更记录。此时,您需要开启Changelog模式,changelog.enabled设置为true。

    开启changelog.enabled参数后,支持消费所有变更。异步的合并任务会将中间变更合并成1条。所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的buffer时间可以预留一定的时间buffer给reader。例如调整合并的compaction.delta_commits和compaction.delta_seconds参数。
    参数说明是否必填备注
    changelog.enabled是否开启Changelog模式。
    参数取值如下:
    • true:开启Changelog模式。
    • false(默认值):关闭Changelog模式。关闭Changelog模式时,即支持UPSERT语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被合并。
    说明 流读取会展示每次变更,批读只会展示合并后的变更结构。
  • 批量导入参数
    如果存量数据来源于其他数据源,则可以使用批量导入功能,快速将存量数据转换为Hudi表格式。需要注意以下几点:
    • 批量导入省去avro的序列化以及数据的合并过程,后续不会再有去重操作,数据的唯一性需要您自己来保证。
    • write.operation参数在Batch Execuiton Mode下执行更高效,Batch模式默认会按照分区排序输入消息再写入Hudi,避免不同文件频繁切换处理导致性能下降。
    • 通过write.tasks参数指定bulk_insert write task的并发,并发的数量会影响到小文件的数量。
    其中涉及的WITH参数如下表所示。
    参数说明是否必填备注
    write.tasks写任务(write)的并发读。每个write顺序写1~N个buckets。默认值为4。
    write.bulk_insert.shuffle_by_partition是否将数据按照partition字段打散,再通过write task写入。默认值为true。
    警告 开启该参数将减少小文件的数量,但可能存在数据倾斜的风险。
    write.bulk_insert.sort_by_partition是否将数据按照partition字段排序再写入。默认值为true。当一个Write Task写多个partition,开启该参数可以减少小文件数量。
    write.sort.memorysort算子的可用管理内存。默认值为128,单位为MB。
  • 全量接增量参数
    如果您已经有全量的离线Hudi表,需要导入离线数据后,再把新增数据写入表中,并且保证数据不重复,则可以把index.bootstrap.enabled参数设置为true,开启全量接增量功能。
    说明 如果发现耗时太长,则可以在写入全量数据时调大资源。全量数据写完后再写新数据时,可以将资源调小或者开启写入限流参数。
    参数说明是否必填备注
    index.bootstrap.enabled是否开启全量接增量功能。开启后存量表的索引数据一次性被加载到state中。参数取值如下:
    • true:开启。
    • false(默认值):关闭。
    index.partition.regex设置正则表达式进行分区筛选。默认加载全部分区。
  • 写入限流参数
    写入量吞吐高,Partition随机引发的严重乱序,容易导致写入性能退化,出现吞吐毛刺的问题。此时您可以开启限流功能,保证写入流量平稳。例如,在同步百亿数量级全量和增量数据到Kafka后,再通过Flink流式消费的方式将库表数据直接导成Hudi格式的库表,这会导致Kafka中存在大量的存量和增量数据,下游读数据的压力很大,所以此时可以使用该参数进行限流。
    参数说明是否必填备注
    write.rate.limit每秒处理数据最大条数。默认值为0(条/s),代表不作限制。
  • Append模式参数
    参数说明是否必填备注
    write.insert.cluster是否在写入时合并小文件。参数取值如下:
    • true:写入时合并小文件。
      重要 开启该参数后,每次写入会优先合并之前的小文件,不会去重,吞吐会受影响。
    • false(默认值):写入时不合并小文件。
    说明 COW表的INSERT模式默认为Append,在此模式下,文件不会有合并操作,但您可以启动小文件合并功能后进行合并。

示例

  • 示例一:写Hudi结果表
    本示例为您介绍如何通过MySQL CDC流式读取数据然后写入Hudi表。
    1. 创建OSS Bucket。

      详情请参见创建存储空间

    2. 作业开发页面,在目标作业文本编辑区域,编写SQL设计流作业。
      CREATE TEMPORARY TABLE datagen(
        uuid    BIGINT,
        data  STRING,
        ts TIMESTAMP(3)
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE hudi_sink (
        uuid BIGINT,
        data STRING,
        ts TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',           
        'oss.endpoint' = '<yourOSSEndpoint>',                     
        'accessKeyId' = '<yourAccessKeyId>',                    
        'accessKeySecret' = '<yourAccessKeySecret>',                    
        'path' = 'oss://<yourOSSBucket>/<自定义存储位置>', 
        'table.type' = 'COPY_ON_WRITE'                           
      );
      
      INSERT INTO hudi_sink SELECT * from datagen;
    3. 在作业开发页面右侧高级配置面版中,引擎版本配置为vvr-4.0.15-flink-1.13。引擎版本
    4. 单击验证
    5. 单击上线
    6. 作业运维页面,单击目标作业名称操作列的启动
    7. 在OSS控制台查看写入的测试数据。

      等第一次Checkpoint完成之后,您将能看到写入的测试数据了。

  • 示例二:MySQL CDC数据入湖
    本示例为您介绍如何通过MySQL的CDC源表读取数据然后写入Hudi表。
    1. 创建OSS Bucket。

      详情请参见创建存储空间

    2. 作业开发页面,在目标作业文本编辑区域,编写SQL设计流作业。
      CREATE TEMPORARY TABLE mysql_src (
        id BIGINT,
        name STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '<yourRDSHostName>',
        'port' = '3306',
        'username' = '<yourRDSUserName>',
        'password' = '<yourRDSPassword>',
        'database-name' = 'user_db.*', -- 正则匹配多个分库。
        'table-name' = 'user.*'   -- 正则匹配多张分表。
      );
      
      CREATE TEMPORARY TABLE hudi_sink (
        id BIGINT PRIMARY KEY NOT ENFORCED,
        name STRING
      ) WITH (
        'connector' = 'hudi',
        'oss.endpoint' = '<yourOSSEndpoint>',
        'accessKeyId' = '<yourAccessKeyId>',
        'accessKeySecret' = '<yourAccessKeySecret>',
        'path' = 'oss://<yourOSSBucekt>/<Path to Table>/',
        'table.type' = 'MERGE_ON_READ'
      );
      
      INSERT INTO hudi_sink SELECT * FROM mysql_src;
    3. 在作业开发页面右侧高级配置面版中,引擎版本配置为vvr-4.0.15-flink-1.13
    4. 单击验证
    5. 单击上线
    6. 作业运维页面,单击目标作业名称操作列的启动
      作业上线后,您可以在作业总览页面,查看作业的Vertex图,了解作业运行过程。上线成功
    7. 在OSS控制台查看写入的测试数据。

      等第一次Checkpoint完成之后,您将能看到写入的测试数据了。