您可以手动调整实时计算Flink版的作业参数、资源参数和上下游参数,提升实时计算Flink版作业的性能。

手动配置调优概述

手动配置调优的内容主要有3种类型:
  • 上下游参数调优:对作业中上下游存储的参数进行调优。
  • 作业参数调优:对作业中miniBatch等参数进行调优。
  • 资源参数调优:对作业中Operator的并发数(Parallelism)、CPU(Core)和堆内存(Heap Memory)等参数进行调优。

下面对以上3类调优进行介绍。参数调优后将生成新的配置,作业需要先停止再启动或者先暂停再恢复后才能使用新的配置,启动新配置的方法请参见重新启用新的配置

上下游参数调优

实时计算Flink版每条数据均会触发上下游存储的读写,因此会对上下游存储形成存储压力。设置batchsize,批量读写上下游存储数据可以降低上下游存储的压力。支持batchsize参数的上下游存储如下表所示。
上下游 参数 说明 设置参数值
DataHub源表 batchReadSize 单次读取的条数 可选,默认值为10。
DataHub结果表 batchSize 单次写入的条数 可选,默认值为300。
日志服务LOG源表 batchGetSize 单次读取logGroup的条数 可选,默认值为100。
分析型数据库MySQL版2.0结果表 batchSize 一次批量写入的条数 可选,默认值为1000。
云数据库RDS版结果表 batchSize 一次批量写入的条数 可选,默认值为4096。
说明 DDL的WITH参数列中增加batchSize相关参数,即可完成数据的批量读写设置。例如,batchReadSize='<number>'

作业参数调优

miniBatch设置仅适用于优化GROUP BY。Flink SQL流模式下,每来一条数据都会执行State操作,I/O消耗较大。设置miniBatch后,同一个Key的一批数据只访问一次State,且只输出最新的一条数据,既减少了State的访问,也减少了下游的数据更新。miniBatch设置说明如下:
  • 新增加作业参数后,建议您停止作业,再启动作业。
  • 更新作业参数值后,建议您暂停作业,再恢复作业。
# 3.2及以上版本开启window miniBatch方法(3.2及以上版本默认不开启window miniBatch)。
sql.exec.mini-batch.window.enabled=true
# excatly-once语义。
blink.checkpoint.mode=EXACTLY_ONCE
# checkpoint间隔时间,单位毫秒。
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# 实时计算Flink版2.0及以上版本使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒。
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# 实时计算Flink版2.0及以上版本开启5秒的microbatch(窗口函数不需要设置该参数)。
blink.microBatch.allowLatencyMs=5000
# 表示整个Job允许的延迟。
blink.miniBatch.allowLatencyMs=5000
# 双流join节点优化参数。
blink.miniBatch.join.enabled=true
# 单个Batch的size。
blink.miniBatch.size=20000
# local优化,实时计算Flink版2.0及以上版本默认已经开启,1.6.4版本需要手动开启。
blink.localAgg.enabled=true
# 实时计算Flink版2.0及以上版本开启partial优化,解决count distinct效率低问题。
blink.partialAgg.enabled=true
# union all优化。
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# GC优化(源表为SLS时,不能设置该参数)。
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# 时区设置。
blink.job.timeZone=Asia/Shanghai

资源调优

下文通过示例为您介绍资源调优方法:
  1. 分析过程
    1. 通过Job的拓扑图查看到2号的Task节点的输入队列已达到100%,造成上游1号节点的数据堆积,输出队列造成数据堆积。
    2. 单击目标Task节点(本示例为2号Task节点)。
    3. SubTask List,查看In Queue100%的行。
    4. 单击目标行左侧的LOG0
    5. Metrics Graph页签,查看CPU和MEM的使用情况。
  2. 性能调优
    1. 单击作业编辑页面右侧的资源配置,进入调优窗口。
    2. 对目标Operator或者Group进行参数修改。
      • 单个Operator参数修改
        1. GROUP框内,单击右上角的加号(+)。
        2. 鼠标悬停至目标Operator框内。
        3. 单击目标Operator右侧的铅笔图标图标。
        4. 修改Operator数据页面,修改参数。
      • GROUP批量参数修改
        1. 鼠标悬停至GROUP框内。
        2. 单击GROUP右侧的铅笔图标图标。
        3. 批量修改Operator数据页面,修改参数。
    3. 完成配置参数修改后,单击资源配置右上角的配置信息操作 > 应用当前配置
      • 如果增加了Group的资源配置后,作业的运行效率提升不明显,需要按照以下顺序分析问题:
        1. 该节点是否存在数据倾斜现象。
        2. 复杂运算的Operator节点(例如,GROUP BY、WINDOW和JOIN等)的子节点是否存在异常。
      • Operator子节点拆解方法:
        1. 单击目标Operator。
        2. 修改chainingStrategy参数值为HEAD
          如果chainingStrategy已设置为HEAD,需要修改后一个Operator的chainingStrategy参数值为HEADchainingStrategy参数说明如下。
          参数 说明
          ALWAYS 算子会尽可能地链接在一起。为了优化性能,通常需要让算子尽可能链接在一起,同时增加并发度。
          NEVER 算子不会和上下游的算子链接在一起。
          HEAD 算子不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
  3. 资源参数的配置原则和建议
    • 作业的资源配置建议为:core:heap_memory=1:4,即1个核对应4 GB内存。例如:
      • core参数值为1CU,heap memory参数值为3 GB,则最终资源分配结果的是1CU+4G
      • core参数值为1CU,heap memory参数值为5 GB,则最终资源分配结果的是1.25CU+5G
      说明
      • Operator的总core=parallelism*core
      • Operator的总heap_memory=parallelism*heap_memory
      • Group中的core取各个Operator中最大值,memory取各个Operator中memory之和。
    • parallelism
      • Source节点
        Source的个数和上游Partition数量有关。例如,Source的个数是16,Source的并发数可以为16、8或4等,不得超过16。
        说明 Source的并发数不能大于Source的Shard数。
      • 中间节点
        根据预估的QPS计算:
        • QPS低的任务,中间处理节点数和Source并发数一样。
        • QPS高的任务,中间处理节点数配置为比Source并发数大,例如,64、128或256。
      • Sink节点
        并发度和下游存储的Partition数有关,一般是下游Partition个数的2~3倍。
        说明 如果配置过大会导致输入超时或失败。例如,下游Sink节点个数是16,建议Sink的并发数最大为48。
    • core

      默认值为0.1,根据实际CPU使用配置,建议配置为0.25。

    • heap_memory

      堆内存,默认为值为256(单位为MB),请根据实际的内存使用状况进行配置。

    • state_size
      存在GROUP BY、JOIN、OVER和WINDOW的Task节点中需要配置参数state_size1,表示该Operator会使用state功能,作业会为该Operator申请额外的内存。state_size默认值为0。
      说明 如果state_size参数值不设置为1,则作业可能运行失败。

重新启用新的配置

完成配置后,建议您采用暂停再恢复作业的方式使新配置生效,而不是停止再启动作业的方式使新配置生效。因为停止作业后,作业状态会被清除,从而可能导致计算结果不一致。
说明
  • 暂停再恢复:仅更改资源配置、调整WITH参数大小或调整作业参数大小。
  • 停止再启动:需要更改SQL逻辑、更改作业版本、增加WITH参数或增加作业参数。

完成作业重启或恢复后,可以在运维页面,运行信息页签下的Vertex拓扑中查看新的配置是否生效。

  • 暂停再恢复步骤如下:
    1. 上线作业。上线步骤参见上线,详情请参见上线配置方式选择使用上次资源配置(手动资源配置)
    2. 运维页面,单击目标作业操作列下的暂停
    3. 运维页面,单击目标作业操作列下的恢复
    4. 恢复作业运行,单击按最新配置恢复resume
  • 停止再启动步骤如下:
    1. 停止作业。
      1. 登录实时计算控制台
      2. 单击页面顶部的运维
      3. 运维,单击目标作业操作列下的停止
    2. 启动作业。
      1. 登录实时计算控制台
      2. 单击页面顶部的运维
      3. 运维,单击目标作业操作列下的启动
      4. 启动作业页面,单击指定数据读取数据时间(即指定启动位点)文本框。Trigger
      5. 指定读取数据时间(启动位点),单击确定
        说明 启动位点表示从数据源表中读取数据的时间点:
        • 选择当前时间:表示从当前时间开始读取数据。
        • 选择历史时间:表示从历史时间点开始读取数据,通常用于回追历史数据。

相关参数说明

  • Global

    isChainingEnabled:是否启用Chain策略,默认为true。不需要修改。

  • Nodes
    参数 说明 能否修改
    id 节点ID号,自动生成,ID号唯一。 不能
    uid 节点UID号,用于计算Operator ID,如果不设置,则使用ID。 不能
    pact 节点类型,例如,Data Source、Operator或Data Sink等。 不能
    name 节点名称,可自定义。
    slotSharingGroup 请保持默认配置。 不能
    chainingStrategy ChainingStrategy用来定义算子链接的策略。当一个算子和上游算子链接在一起,表示它们会运行在同一个线程,合并为一个有多个运行步骤的算子。ChainingStrategy支持以下3种方式:
    • ALWAYS:算子会尽可能的链接在一起。为了优化性能,通常最佳实践是让算子尽可能链接在一起,同时增加并发度。
    • NEVER:算子不会和上下游的算子链接在一起。
    • HEAD:算子不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
    parallelism 并发数,默认值为1,可以根据实际数据量增大并发数。
    core CPU,默认为0.1,可以根据实际CPU使用情况进行配置。建议设置为0.25。
    heap_memory 堆内存,默认为256(单位为MB),可以根据实际内存使情况进行配置。
    direct_memory JVM(Java虚拟机)堆外内存,默认0(单位为MB)。 能,但不建议您修改该参数。
    native_memory JVM(Java虚拟机)堆外内存,JNI(Java Native Interface)中使用,默认为0。如果需要,可以配置为10,主要提供给statebackend使用。 能,但不建议您修改该参数。
  • Chain
    Flink SQL任务是一个DAG图,由多个节点(Operator)组成,部分上下游的节点在运行时可以合成为一个节点,称为Chain操作。Chain操作后的节点总CPU为所有节点CPU的最大值,总内存为所有节点内存的总和。多节点合成一个节点可以有效地减少网络传输,降低成本。
    说明
    • 并发数相同的节点才能进行Chain操作。
    • Group By节点不能进行Chain操作。