本文为您介绍使用Hologres过程中关于Blink和Flink的常见问题。

基本概念

  • Hologres性能
    • 写入性能
      • 列存表: InsertOrIgnore > InsertOrReplace > InsertOrUpdate
      • 行存表: InsertOrReplcae = InsertOrUpdate > InsertOrIgnore
      参数说明
      InsertOrIgnore结果表有主键,实时写入时如果主键重复,丢弃后到的数据。
      InsertOrReplace结果表有主键,实时写入时如果主键重复,按照主键更新,如果写入的一行数据不包含所有列,缺失的列的数据补Null。
      InsertOrUpdate结果表有主键,实时写入时如果主键重复,按照主键更新,如果写入的一行数据不包含所有列,缺失的列不更新。
    • 点查性能

      行存 = 行列混存 > 列存。

  • Blink、Flink(VVP)、开源Flink支持情况
    产品形态数据存储类型描述
    源表结果表维表BinlogHologres Catalog
    Flink全托管支持行存储及列存储。支持行存储及列存储。建议使用行存储。支持支持
    Blink独享支持行存储及列存储。支持行存储及列存储。建议使用行存储。Hologres V0.8版本只支持行存储,V0.9及以上版本支持行存储及列存储。建议使用行存储。不支持

    已开始逐步下线,推荐使用阿里云Flink全托管。

    开源Flink1.10支持行存储及列存储。支持行存储及列存储。不支持不支持
    开源Flink1.11及以上支持行存储及列存储。支持行存储及列存储。建议使用行存储。不支持不支持从开源Flink1.11版本开始,Hologres代码已开源。详细内容请参见GitHub
  • Blink、Flink 映射Hologres的SQL示例如下。
    create table holo_source(
    'hg_binlog_lsn' BIGINT HEADER,
    'hg_binlog_event_type' BIGINT HEADER,
    'hg_binlog_timestamp_us' BIGINT HEADER,
    A int,
    B int,
    C timestamp )
    with (
    type = 'hologres',
    'endpoint' = 'xxx.hologres.aliyuncs.com:80',   --Hologres实例的Endpoint。
    'userName' = '',                               --当前阿里云账号的AccessKey ID。
    'password' = '',                               --当前阿里云账号的AccessKey Secret。
    'dbName' = 'binlog',                           --Hologres实例的数据库名称。
    'tableName' ='test'                            --Hologres实例的表名称。
    'binlog' = 'true',
    );
    Blink、VVP、Flink SQL,都是在Flink侧声明一张表,然后根据参数映射至Hologres的一张具体的物理表,所以不支持映射至外部表。

实时写入慢问题排查流程

  1. 确认写入相关配置
    需要确认以下配置信息。
    • 目标表的存储格式,包括行存表、列存表和行列共存表。
    • Insert模式,包括InsertOrIgnore、InsertOrUpdate和InsertOrReplace。
    • 目标表的Table Group及Shard Count。
  2. 查看监控指标的实时写入延迟
    如果平均写入延迟偏高,在百毫秒甚至秒级别,通常便是后端达到了写入瓶颈,这时候可能会存在如下问题。
    • 使用了列存表的InsertOrUpdate,即局部更新,且流量较高,这种情况下会导致实例的CPU负载和写入延迟偏高。

      解决方法:建议更换表的类型,使用行存表,如果您的实例是V1.1及以上版本可以选择行列混存表。

    • 云监控查看实例的CPU负载,如果CPU水位接近100%,但没有列存表的局部更新,那么通常情况下是由于高QPS的查询,或者本身写入量较高导致的。

      解决方法:扩容Hologres实例。

    • 确认是否有不断的Insert into select from命令,触发了该表的BulkLoad写入,当前BulkLoad写入会阻塞实时写入。

      解决方法:将BulkLoad写入转换成实时写入,或者错峰执行。

  3. 确认是否有数据倾斜
    使用如下SQL命令查看是否有数据倾斜。
    SELECT hg_shard_id, count(1) FROM t1 GROUP BY hg_shard_id ORDER BY hg_shard_id;
    解决方法:修改Distribution Key,使数据分布更加均衡。
  4. 确认后端是否有压力
    如果以上步骤排查完没有问题,写入性能突然下降,一般情况是后端集群压力比较大,存在瓶颈。请联系技术支持人员确认情况,详情请参见如何获取更多的在线支持?
  5. 查看Blink/Flink侧的反压情况
    上述步骤排查完后,发现Hologres侧没有明显的异常,通常情况下是客户端慢了,也就是Blink/Flink侧本身就慢了,这时候确认是否是Sink节点反压了。如果作业只有一个节点,就无法看出是否反压了,这时候可以将Sink节点单独拆开再观察。具体请联系Flink技术支持。

写入数据有问题排查流程

这种情况通常是由于数据乱序引起的,比如相同主键的数据分布在不同的Flink Task上,写入的时候无法保证顺序。需要确认Flink SQL的逻辑,最后写出到Hologres的时候,是否按照Hologres表的主键进行Shuffle了。

维表查询问题排查流程

  • 维表Join和双流Join
    对于读Hologres的场景,需要首先确认用户是否使用对了维表Join,是否错将双流Join当成维表Join来使用了。以下是Hologres作为维表的使用示例,如果少了proctime AS PROCTIME() hologres_dim FOR SYSTEM_TIME AS两处关键字,则会变成双流Join。
    CREATE TEMPORARY TABLE datagen_source (
       a INT,
       b BIGINT,
       c STRING,
       proctime AS PROCTIME()
    ) with (
       'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE hologres_dim (
       a INT, 
       b VARCHAR, 
       c VARCHAR
    ) with (
       'connector' = 'hologres',
       ...
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
       a INT,
       b STRING
    ) with (
       'connector' = 'blackhole'
    );
    
    insert into blackhole_sink select T.a,H.b
    FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
  • 维表查询
    1. 确认维表存储格式

      确认维表的存储格式是行存表、列存表还是行列共存。

    2. 维表查询延迟高
      维表的使用,最常见的问题就是Flink/Blink侧用户反馈Join节点有反压,导致整个作业的吞吐上不去。
      1. 确认Flink维表Join的模式
        当前Hologres Flink Connector的维表Join功能支持同步和异步模式两种,异步模式性能要优于同步模式,具体需要看Flink SQL进行区分,以下是一个开启异步维表查询功能的SQL示例。
        CREATE TABLE hologres_dim(
         id INT,
         len INT,
         content VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',  --Hologres的数据库名称。
          'tablename'='<yourTablename>',  --Hologres用于接收数据的表名称。
          'username'='<yourUsername>',  --当前阿里云账号的AccessKey ID。
          'password'='<yourPassword>',  --当前阿里云账号的AccessKey Secret。
          'endpoint'='<yourEndpoint>'  --当前Hologres实例VPC网络的Endpoint。
          'async' = 'true'--异步模式
        );
      2. 确认后端查询延迟
        查看监控指标的实时写入延迟:
        • 确认是否是列存表在做维表,列存表的维表在高QPS场景下开销很高。
        • 如果是行存表,且延迟高,通常情况下是实例整体负载较高导致的,需要进行扩容。
    3. 确认Join的Key是否是Hologres表的主键

      自VVR 4.x (Flink 1.13) 版本开始,Hologres Connector基于Holo Client实现了Hologres表的非主键查询,这种情况通常性能会比较差、实例负载也比较高,尤其是建表没有特别优化过的情况。这时候需要引导优化表结构,最常见的就是将Join的key设置成Distribution Key,这样就能实现Shard Pruning。

    4. 查看Blink侧的反压情况

      如果上述步骤排查完成,发现Hologres侧没有明显的异常,通常情况下是客户端慢了,也就是Blink侧本身就慢了,这时候可以确认是否是Sink节点反压了。如果作业只有一个节点,就无法看出是否反压了,这时候可以将Sink节点单独拆开再观察。同样可以排查是否是Join节点导致的反压。具体请联系Flink技术支持排查。

连接数使用说明

Hologres Connector默认采用JDBC模式,会占用一定的JDBC连接数,不同类型的表默认连接数使用情况如下表。
表类型默认连接数(Flink作业的每个并发)
Binlog源表0
批量源表1
维表3(可以通过connectionSize参数调整)
结果表3(可以通过connectionSize参数调整)
  • 连接数计算方法
    • 默认情况

      默认情况下,作业使用的最大连接数可以通过如下公式计算:

      最大连接数 = ( 批量源表数 * 1 + 维表数 * connectionSize + 结果表数 * connectionSize )* 作业并发

      例如某作业有一张全增量源表、两张维表和三张结果表,都使用默认的connectionSize参数值,作业并发设置为5,则最终使用的连接数为:(1 * 1 + 2 * 3 + 3 * 3) * 5 = 80

    • 连接复用

      实时计算1.13-vvr-4.1.12及以上版本支持连接复用。一个作业的同一个并发内,相同connectionPoolName的维表和结果表会使用同一个连接池。默认情况示例中,如果两张维表和三张结果表都配置了相同的connectionPoolName,并适当调大connectionSize5,则最终使用的连接数为(1 * 1 + 5) * 5 = 30

      说明 连接复用模式适用大多数场景,但部分场景比如维表数量较多、没有启用异步也没有开启缓存时,会非常频繁的进行同步的点查,此时多表连接复用可能导致查询变慢,这种情况可以只为结果表配置连接复用。
    • 其他使用连接的场景
      • 作业启动过程中,需要建立连接用于表元数据的验证等工作,可能会暂时使用3至6个连接,作业正常运行后会释放。
      • Flink全托管支持Hologres Catalog、CTAS以及CDAS等功能,使用这些功能也会占用连接数。默认情况下,一个使用Catalog的作业,会多占用三个连接,用于建表等DDL操作。
  • 连接数使用诊断
    当作业的表数量较多、作业并发较高时,会占用大量的连接数,甚至出现将Hologres总连接数占满的情况,通过以下方式对当前连接数的使用进行了解和诊断。
    • 使用如下命令在HoloWeb中通过pg_stat_activity表查看当前的活跃Query,详情请参见查询pg_stat_activity视图信息。其中application_name字段中值为ververica-connector-hologres的Query代表来自实时计算Flink的读写连接。
      SELECT application_name, COUNT (1) AS COUNT
      FROM
        pg_stat_activity
      WHERE
        backend_type = 'client backend'
        AND application_name != 'hologres'
        AND usename != 'holo_admin'
      GROUP BY application_name;
    • 一些时候作业并发设置的过大, 在Hologres管理控制台监控告警页表现如下:作业刚启动的时候连接数很高,运行一段时间之后连接数下降。这是因为很多连接处于空闲状态被关闭了,此现象表明作业实际上不需要如此大的并发或连接数,应该合理规划任务连接数、降低并发度或connectionSize参数值,或者使用连接复用模式。

常见报错

  • ERPC TIMEOUT或者ERPC CONNECTION CLOSED
    • 报错现象:出现com.alibaba.blink.store.core.rpc.RpcException: request xx UpsertRecordBatchRequest failed on final try 4, maxAttempts=4, errorCode=3, msg=ERPC_ERROR_TIMEOUT报错。
    • 可能原因:写入时压力过大写入失败或者集群比较繁忙,可以观察Hologres实例的CPU负载是否打满。CONNECTION CLOSED可能是负载过大导致后端节点挂掉了,出现OOM(Out Of Memory)或者Coredump。
    • 解决方法:请先重试写入,如果不能恢复请找Hologres技术支持人员排查原因。
  • 报错:BackPresure Exceed Reject Limit
    • 可能原因:通常是Hologres后端写入压力过大,导致Memtable来不及刷盘导致写入失败。
    • 解决方法:如偶发失败可忽略该问题,或者Sink加上参数rpcRetries = '100' 来调大写入重试次数。如果一直报该错误,请联系Hologres技术支持人员确认后端实例状态。
  • 报错:The requested table name xxx mismatches the version of the table xxx from server/org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.Caused by: java.net.SocketTimeoutException: Read timed out
    • 可能原因:通常是用户做了Alter Table导致Blink写入所带表的Schema版本号低于Server端版本号导致的,并且超过了客户端的重试次数。
    • 解决方法:如偶发报错可忽略该问题。如果一直报该错误,请联系Hologres技术支持人员。
  • 报错:Failed to query table meta for table
    • 可能原因:一种可能是用户读写了一张Hologres的外部表,Hologres Connector不支持读写外部表。如果不是,可能是Hologres实例 Meta出现了问题。
    • 解决方法:请联系Hologres技术支持人员。
  • 报错:Cloud authentication failed for access id
    • 可能原因:该报错通常是用户配置的AccesKey信息不对,或者用户没有添加账号至Hologres实例。
    • 解决方法:
      • 请检查当前账户的AccessKey ID和AccessKey Secret填写是否正确,一般是AccessKey Secret错误或者有空格。
      • 检查不出原因可以用当前AccesKey连接HoloWeb(使用账号密码方式登录),在测试联通性时看报错是什么,还是一样的报错说明AccesKey有问题,如果报错为FATAL:role“ALIYUN$xxxx“does not exist说明账号没有实例的权限,需要管理员给该账号授予权限。
  • Hologres维表Join不到数据。
    • 可能原因:Hologres维表使用了分区表,Hologres维表暂不支持分区表。
    • 解决方法:请将分区表转为普通表。
  • 报错:Modify record by primary key is not on this table
    • 可能原因:实时写入的时候选择了更新模式,但是Hologres的结果表没有主键。
    • 解决方法:请设置主键。
  • 报错:shard columns count is no match
    • 可能原因:写入Hologres的时候,没有写入完整的Distribution Key的列(默认是主键)。
    • 解决方法:请写入完整的Distribution Key列。
  • 报错:Full row is required, but the column xxx is missing
    • 可能原因:Hologres老版本的报错信息,通常是用户没有写某列数据,而那一列是不能为空的。
    • 解决方法:请为不能为空的列赋值。
  • VVP用户读写Hologres导致JDBC连接数暴涨。
    • 可能原因:VVP Hologres Connector读写Hologres(除了Binlog),采用JDBC模式,最大占用读写Hologres表数量*并发度 * connectionSize(VVP表的参数,默认为3)个连接。
    • 解决方法:合理规划任务连接数,降低并发度或者connectionSize。如无法调低并发度或connectionSize,可以为表设置参数useRpcMode = 'true' 切回至Rpc模式。
  • Blink/VVP用户读写Hologres报错显示无法连接Hologres。
    • 可能原因:Blink/VVP集群默认访问公网很慢或者无法访问。
    • 解决方法:需要保证和Hologres实例在相同Region,且使用VPC的Endpoint。
  • 报错:Hologres rpc mode dimension table does not support one to many join
    • 可能原因:Blink和VVP的RPC模式维表必须是行存表,且Join的字段必须是主键,报错的原因往往是以上两个条件不满足
    • 解决方法:建议使用JDBC模式,且维表使用行存表或者行列共存表。
  • DatahubClientException
    • 报错现象:出现Caused by: com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:503, requestId:null, errorCode:null, errorMessage:{"ErrorCode":"ServiceUnavailable","ErrorMessage":"Queue Full"}]报错。
    • 可能原因:大量消费Binlog作业由于某种原因同时重启导致线程池被占满。
    • 解决方法:分批进行消费Binlog作业。
  • Error occurs when reading data from datahub
    • 报错现象:出现Error occurs when reading data from datahub, msg: [httpStatus:500, requestId:xxx, errorCode:InternalServerError, errorMessage:Get binlog timeout.]报错。
    • 可能原因:Binlog每条数据太大,乘上攒批之后,每个RPC请求的大小超过最大限制。
    • 解决方法:在每行数据字段较多且有很长的字符串等字段时,可以减小攒批配置。
  • 报错:Caused by: java.lang.IllegalArgumentException: Column: created_time type does not match: flink row type: TIMESTAMP(6) WITH LOCAL TIME ZONE, hologres type: timestamp
    • 可能原因:在Flink中字段使用了TIMESTAMP(6)类型,当前不支持映射至Hologres。
    • 解决方法:修改字段类型为TIMESTAMP
  • 报错:Caused by: org.postgresql.util.PSQLException: FATAL: Rejected by ip white list. db = xxx, usr=xxx, ip=xx.xx.xx.xx
    • 可能原因:在Hologres中设置了IP白名单,但是白名单中未包含Flink访问Hologres的IP地址,所以Flink访问Hologres时被阻止。
    • 解决方法:在Hologres的IP白名单中增加Flink的IP,详情请参见IP白名单