全部产品
Search
文档中心

实时计算Flink版:分场景排错指引

更新时间:Jan 19, 2024

本文为您介绍实时计算Flink版分场景排错指引方面的常见问题,包括权限、运维和数据异常等问题。

不小心删除了角色或者变更了授权策略,导致Flink全托管服务不可用怎么办?

您可以按照以下步骤重新进行自动化授权。

  1. 确认您已删除了名称为AliyunStreamAsiDefaultRole的RAM角色。详情请参见删除RAM角色

    重要

    您需要将对应角色的所有权限策略解除授权后,此角色才能删除成功。

  2. 删除名称为FlinkServerlessStack和FlinkOnAckStack资源栈,详情请参见删除资源栈

    • FlinkServerlessStack:Flink全托管的ROS资源栈统一名称。

    • FlinkOnAckStack:容器服务ACK的ROS资源栈统一名称。

  3. 删除名称为AliyunStreamAsiDefaultRolePolicy的RAM权限策略。详情请参见删除自定义权限策略

  4. 在实时计算控制台,重新自动化授权,详情请参见开通流程

JobManager没有运行起来,如何快速定位问题?

JobManager没有运行起来即无法进入Flink UI页面。此时,您可以通过以下操作进行问题定位:

  1. 作业运维页面,单击目标作业名称。

  2. 单击运行事件页签。

  3. 通过快捷键搜索error,获取异常信息。

    • Windows系统:Ctrl+F

    • Mac系统:Command+F

    示例

Python作业,如果Checkpoint慢怎么办?

  • 问题原因

    Python算子内部有一定的缓存,在进行Checkpoint时,需要将缓存中的数据全部处理完。因此,如果Python UDF的性能较差,则会导致Checkpoint时间变长,从而影响作业执行。

  • 解决方案

    将缓存调小,您需要在其他配置中设置以下参数,具体操作请参见如何配置作业运行参数?

    python.fn-execution.bundle.size:默认值为100000,单位是条数。
    python.fn-execution.bundle.time:默认值为1000,单位是毫秒。

    参数的详细信息请参见Flink Python配置

报错:Invalid versionName string

  • 报错详情

    作业启动时报错Invalid versionName string。SQL作业不受影响,1.11.3及以下版本的Flink引擎JAR或PYTHON作业会受影响。

  • 报错原因

    创建Session模式的JAR或PYTHON作业时,没有设置Flink引擎版本。

  • 解决方案

    在作业开发页面,设置Flink引擎版本后,重新上线启动作业。

    对于SDK方式创建的Session作业,建议您升级依赖的SDK common包到1.0.21版本,并在创建Artifact时,设置Flink引擎版本。详细信息如下所示:

    • common包

      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>ververica-common</artifactId>
        <version>1.0.21</version>
      </dependency>
    • Artifact设置

      com.ververica.common.model.deployment.Artifact.SqlScriptArtifact#setVersionName
      com.ververica.common.model.deployment.Artifact.JarArtifact#setVersionName
      重要

      versionName取值须和Session集群上的Flink引擎版本保持一致。

如何定位Flink无法读取源数据的问题?

当Flink无法读取源数据时,建议从以下几个方面进行排查并处理:

  • 检查上游存储和Flink全托管之间网络是否连通。

    Flink全托管仅支持访问相同地域、相同VPC下的存储。如果您有访问跨VPC存储资源或者通过公网访问Flink全托管的特殊需求,请查看以下文档:

  • 检查上游存储中是否已配置了白名单。

    上游存储中需要配的产品有Kafka和ES。您可以按照以下步骤配置白名单:

    1. 获取Flink全托管虚拟交换机的网段。

      获取方法请参见设置白名单

    2. 在上游存储中配置Flink全托管白名单。

      上游存储中配置白名单的方法,请参见对应DDL文档的前提条件中的文档链接,例如Kafka源表前提条件

  • 检查DDL中定义的字段类型、字段顺序和字段大小写是否和物理表一致。

    上游存储支持的字段类型和Flink全托管支持的字段类型可能不完全一致,但存在一定的映射关系。您需要按照DDL定义的字段类型映射关系一对一匹配,详情请参见对应DDL文档类型映射文档,例如日志服务SLS源表类型映射

  • 查看源表Taskmanager.log日志中是否有异常信息。

    如果有异常报错,请先按照报错提示处理问题。查看源表Taskmanager.log日志的操作如下:

    1. 在作业运维页面,单击目标作业名称。

    2. 作业总览页签,单击Source节点。

    3. SubTasks页签操作列,单击Open TaskManager Log PageTM日志

    4. logs页签,查看日志信息。

      在当前页面查找最后一个Caused by信息,即第一个Failover中的Cause by信息,往往是导致作业异常的根因,根据该根因的提示信息,可以快速定位作业异常的原因。

如何定位Flink无法将数据写入到结果表的问题?

当Flink无法将数据写入到结果表时,建议从以下几个方面进行排查并处理:

  • 确认下游存储和Flink全托管之间网络是否连通。

    Flink全托管仅支持访问相同地域、相同VPC下的存储。如果您有访问跨VPC存储资源或者通过公网访问Flink全托管的特殊需求,请查看以下文档:

  • 确认下游存储中是否已配置了白名单。

    下游存储中需要配置白名单的产品包括RDS MYSQL、Kafka、ES、云原生数据仓库AnalyticDB MySQL版3.0、Hbase、Redis和Clickhouse。您可以按照以下步骤配置白名单:

    1. 获取Flink全托管虚拟交换机的网段。

      获取方法请参见设置白名单

    2. 在下游存储中配置Flink全托管白名单。

      下游存储中配置白名单的方法,请参见对应DDL文档的前提条件中的文档链接,例如RDS Mysql结果表前提条件

  • 确认DDL中定义的字段类型、字段顺序和字段大小写是否和物理表一致。

    下游存储支持的字段类型和Flink全托管支持的字段类型可能不完全一致,但存在一定的映射关系。您需要按照DDL定义的字段类型映射关系一对一匹配,详情请参见对应DDL文档类型映射文档,例如日志服务SLS结果表类型映射

  • 确认数据是否被中间节点过滤了,例如WHERE、JOIN和窗口等。

    具体请查看Vertex拓扑图上每个计算节点数据输入和输出情况。例如WHERE节点输入为5,输出为0,则代表被WHERE节点过滤了,因此下游存储中无数据写入。

  • 确认下游存储中设置的输出条件相关参数的默认值是否合适。

    如果您的数据源的数据量较小,但结果表DDL定制中设置的输出条件的默认值较大,会导致一直达不到输出条件,而无法下发数据至下游存储。此时,您需要将输出条件相关参数的默认值改小。常见的下游存储中的输出条件参数情况如下表所示。

    输出条件

    参数

    涉及的下游存储

    一次批量写入的条数。

    batchSize

    • DataHub

    • Tablestore

    • MongoDB

    • Phoenix5

    • RDS MYSQL

    • 云原生数据仓库AnalyticDB MySQL版3.0

    • ClickHouse

    • InfluxDB

    每次批量写入数据的最大数据条数。

    batchCount

    DataHub

    Odps tunnel writer缓冲区Flush间隔。

    flushIntervalMs

    MaxCompute

    写入HBase前,内存中缓存的数据量(字节)大小。

    sink.buffer-flush.max-size

    Hbase

    写入HBase前,内存中缓存的数据条数。

    sink.buffer-flush.max-rows

    Hbase

    将缓存数据周期性写入到HBase的间隔,可以控制写入HBase的延迟。

    sink.buffer-flush.interval

    Hbase

    Hologres Sink节点数据攒批的最大值。

    jdbcWriteBatchSize

    Hologres

  • 确认窗口是否因为乱序而导致数据无法输出。

    假如,Flink全托管一开始就流入一条2100年的未来数据,它的Watermark为2100年,系统会默认2100年前的数据已被处理完,只会处理比2100年大的数据。而后续流入的2021年的正常数据因为Watermark小于2100年而被丢弃。直到出现大于2100年的数据流入Flink全托管,则会触发窗口关闭而输出数据,否则就会导致结果表一直没有数据输出。

    您可以通过Print Sink或者Log4j的方式确认数据源中是否存在乱序的数据,详情请参见print结果表配置作业日志输出。找到乱序数据后,您可以过滤或者采取延迟触发窗口计算的方式处理乱序的数据。

  • 确认是否因为个别并发没有数据而导致数据无法输出。

    如果作业为多并发,但个别并发没有数据流入Flink全托管,则它的Watermark就为1970年0点0分,而多个并发的Watermark取最小值,因此就永远没有满足窗口结束的Watermark,就不能触发窗口结束而输出数据。

    此时,您需要检查您上游的Vertex拓扑图的Subtask每个并发是不是都有数据流入。如果有个别并发无数据,建议调整作业并发数小于等于源表Shard数,从而保证所有并发都有数据。

  • 确认Kafka的某个分区是否无数据而导致数据无法输出。

    如果Kafka某个分区没有数据,则会影响Watermark的产生,从而导致Kafka源表数据基于Event Time的窗口后,不能输出数据。解决方案请参见为什么Kafka源表数据基于Event Time的窗口后,不能输出数据?

如何定位数据丢失的问题?

数据经过JOIN、WHERE或窗口等节点时,数据量减少是正常现象,这是因条件限制被过滤或JOIN不上。但如果您的数据丢失异常,建议从以下几个方面进行排查并处理:

  • 确认维表Cache缓存策略是否有问题。

    如果维表DDL中Cache缓存策略设置的有问题,则会导致维表的数据没有被拉取到,从而导致数据丢失。此时建议检查并修改作业Cache策略。作业Cache策略详情请参见各维表的Cache策略,例如HBase维表Cache参数

  • 确认函数使用方法是否不正确。

    如果您在作业中使用了to_timestamp_tz、date_format等函数,而函数的使用方法不对,导致数据转化出问题,数据被丢失。

    此时,您可以通过Print Sink或者Log4j的方式,单独将使用的函数的信息打印到日志中,确认函数的使用方法是否正确。详情请参见print结果表配置作业日志输出

  • 确认数据是否乱序。

    如果作业中存在乱序的数据,这些乱序的数据的Watermark不在新窗口的开窗和关窗时间范围内,导致这些数据被丢弃。例如下图中11秒的数据在16秒进入15~20秒的窗口,而它的Watermark为11,会被系统认为是迟到数据,从而导致被丢弃。乱序

    通常丢失的数据都是一个窗口的,您可以通过Print Sink或者Log4j的方式确认数据源中是否存在乱序的数据。详情请参见print结果表配置作业日志输出

    找到乱序数据后,可以根据乱序的程度,合理的设置Watermark,采取延迟触发窗口计算的方式处理乱序的数据。例如该示例中,可以定义Watermark生成策略为Watermark = Event time -5s,从而让乱序的数据可以被正确的处理。建议以整天整时整分开窗口求聚合,否则数据乱序严重,增加offset后还是会有数据丢失问题。