This topic describes how to troubleshoot Blink and Flink issues when you use Hologres.

Background information

  • Hologres performance
    • Write performance
      • Column-oriented table: InsertOrIgnore > InsertOrReplace > InsertOrUpdate
      • Row-oriented table: InsertOrReplace = InsertOrUpdate > InsertOrIgnore
      The following describes the insert modes:
      • InsertOrIgnore: Discards the data that you want to write if the inserted primary key has the same value as an existing primary key in the result table.
      • InsertOrReplace: Updates the row whose primary key has the same value as the inserted primary key. If the written data does not cover all columns, null values are inserted into the columns that are not written.
      • InsertOrUpdate: Updates the row whose primary key has the same value as the inserted primary key. If the written data does not cover all columns, the columns that are not written are left unchanged.
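      The insert mode of a result table is typically selected with the connector's mutatetype parameter. The following sketch assumes the Blink/VVP Hologres connector, in which insertorignore is the default value; verify the parameter name against your connector version:

      ```sql
      -- Sketch: a Hologres result table whose insert mode is InsertOrUpdate.
      -- 'mutatetype' values assume the Blink/VVP Hologres connector.
      CREATE TABLE holo_sink (
        id   INT,
        name VARCHAR,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector'  = 'hologres',
        'dbname'     = '<yourDbname>',
        'tablename'  = '<yourTablename>',
        'username'   = '<yourAccessKeyId>',
        'password'   = '<yourAccessKeySecret>',
        'endpoint'   = '<yourEndpoint>',
        'mutatetype' = 'insertorupdate'  -- or 'insertorignore' (default) / 'insertorreplace'
      );
      ```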
    • Point query performance

      Row-oriented storage = Hybrid row-column storage > Column-oriented storage

  • The following describes the Blink, Flink (VVP), and open source Flink service types supported by Hologres:
    • Fully managed Flink
      • Source table: Row-oriented and column-oriented storage are supported.
      • Result table: Row-oriented and column-oriented storage are supported.
      • Dimension table: Supported. We recommend that you use row-oriented storage.
      • Binlog: Supported.
      • Hologres catalog: Supported.
    • Blink in exclusive mode
      • Source table: Row-oriented and column-oriented storage are supported.
      • Result table: Row-oriented and column-oriented storage are supported.
      • Dimension table: Supported. We recommend that you use row-oriented storage.
      • Binlog: Hologres V0.8 supports only row-oriented storage. Hologres V0.9 and later support both row-oriented and column-oriented storage. We recommend that you use row-oriented storage.
      • Hologres catalog: Not supported.

      Blink in exclusive mode will be discontinued. We recommend that you use fully managed Flink.

    • Open source Flink V1.10
      • Source table: Row-oriented and column-oriented storage are supported.
      • Result table: Row-oriented and column-oriented storage are supported.
      • Dimension table: None.
      • Binlog: Not supported.
      • Hologres catalog: Not supported.
    • Open source Flink V1.11
      • Source table: Row-oriented and column-oriented storage are supported.
      • Result table: Row-oriented and column-oriented storage are supported.
      • Dimension table: Supported. We recommend that you use row-oriented storage.
      • Binlog: Not supported.
      • Hologres catalog: Not supported.

      The source code of the Hologres connector is available for open source Flink V1.11 and later. For more information, see alibabacloud-hologres-connectors.

    • Open source Flink V1.12
      • Source table: Row-oriented and column-oriented storage are supported.
      • Result table: Row-oriented and column-oriented storage are supported.
      • Dimension table: Supported. We recommend that you use row-oriented storage.
      • Binlog: Not supported.
      • Hologres catalog: Not supported.
  • The following SQL code provides an example on how to map a Blink or Flink table to a Hologres table:
    create table holo_source (
      `hg_binlog_lsn` BIGINT HEADER,
      `hg_binlog_event_type` BIGINT HEADER,
      `hg_binlog_timestamp_us` BIGINT HEADER,
      A int,
      B int,
      C timestamp
    ) with (
      type = 'hologres',
      'endpoint' = 'xxx.hologres.aliyuncs.com:80',   -- The endpoint of the Hologres instance.
      'userName' = '',                               -- The AccessKey ID of your Alibaba Cloud account.
      'password' = '',                               -- The AccessKey secret of your Alibaba Cloud account.
      'dbName' = 'binlog',                           -- The name of the database in the Hologres instance.
      'tableName' = 'test',                          -- The name of the table in the Hologres instance.
      'binlog' = 'true'
    );
    A Blink, Flink (VVP), or Flink SQL table is created in Flink and then mapped to a physical table based on parameter settings. Blink, Flink (VVP), or Flink SQL tables cannot be mapped to foreign tables in Hologres.

Troubleshoot the issue of slow real-time writes

  1. Verify write-related configurations.
    Verify the following configurations:
    • The storage modes of the destination table, including row-oriented storage, column-oriented storage, and hybrid row-column storage.
    • The insert modes, including InsertOrIgnore, InsertOrUpdate, and InsertOrReplace.
    • The table group and shard count of the destination table.
  2. Check the real-time write latency metric.
    If the average write latency reaches 100 milliseconds or even several seconds, the backend server has reached a write performance bottleneck. In this case, the following issues may exist:
    • If the column-oriented table uses the InsertOrUpdate mode and the write operation involves a large amount of traffic, the CPU load on the instance and the write latency may be high.

      Solution: Use a row-oriented table, or use hybrid row-column storage for instances of Hologres V1.1 and later.

    • When you use CloudMonitor to query the CPU load of the instance, you find that the CPU utilization approaches 100% even though the column-oriented table is not being updated. This issue may occur because of a high queries per second (QPS) rate or a large amount of written data.

      Solution: Scale up the instance.

    • Continuous executions of the INSERT INTO SELECT FROM statement trigger batch writes, which blocks real-time writes.

      Solution: Perform real-time writes, or perform batch writes in smaller quantities.
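
      For example, a large INSERT INTO ... SELECT backfill can be split into smaller batches by filtering on a column that partitions the data (the ds column below is hypothetical):

      ```sql
      -- Instead of one large batch write that can block real-time writes:
      --   INSERT INTO target SELECT * FROM source;
      -- write one smaller slice at a time (ds is a hypothetical date column):
      INSERT INTO target SELECT * FROM source WHERE ds = '20230101';
      INSERT INTO target SELECT * FROM source WHERE ds = '20230102';
      ```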

  3. Check whether data skew exists.
    Execute the following SQL statement to check whether data skew exists:
    select hg_shard_id, count(1) from t1 group by hg_shard_id;
    Solution: If data skew exists, change the distribution key so that data is evenly distributed across shards.
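    The distribution key of an existing table cannot be changed in place, so the usual practice is to recreate the table with a better distribution key and re-import the data. A sketch, assuming a hypothetical evenly distributed column uid:

    ```sql
    -- Recreate the table with uid as the distribution key; set_table_property
    -- for 'distribution_key' must run in the same transaction as CREATE TABLE.
    BEGIN;
    CREATE TABLE t1_new (uid BIGINT, event TEXT, PRIMARY KEY (uid));
    CALL set_table_property('t1_new', 'distribution_key', 'uid');
    COMMIT;
    INSERT INTO t1_new SELECT * FROM t1;  -- re-import the skewed table's data
    ```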
  4. Check whether heavy workloads exist in backend clusters.
    If no issues are identified after you perform the preceding checks, a sudden drop in write performance is typically caused by heavy workloads in backend clusters. In this case, contact the technical support personnel. For more information, see Obtain online support for Hologres.
  5. Check whether backpressure exists in Blink or Flink.
    If no issues are identified after you perform the preceding checks on Hologres, slow real-time writes may be caused by the slow operation of Blink or Flink. In this case, you must check backpressure on the sink node. If only one sink node is used, you cannot check whether backpressure exists. You must check the components of the sink node. For more information, contact the Flink technical support personnel.

Troubleshoot issues with written data

Issues with written data are typically caused by out-of-order data. For example, if data that uses the same primary key is distributed in different Flink tasks, the data cannot be written in the original order. You must check the logic of Flink SQL and make sure that the written data is shuffled based on the primary key of the destination Hologres table.

Troubleshoot queries in the dimension table

  • JOIN operations for dimension tables and two data streams
    In scenarios that involve data writes to Hologres, you must check whether the JOIN operations for dimension tables are correct. You must make sure that you are not using JOIN operations for two data streams. The following code provides an example of the JOIN operations for dimension tables. If proctime AS PROCTIME() and hologres_dim FOR SYSTEM_TIME AS are missing, the operations become JOIN operations for two data streams.
    CREATE TEMPORARY TABLE datagen_source (
       a INT,
       b BIGINT,
       c STRING,
       proctime AS PROCTIME()
    ) with (
       'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE hologres_dim (
       a INT, 
       b VARCHAR, 
       c VARCHAR
    ) with (
       'connector' = 'hologres',
       ...
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
       a INT,
       b STRING
    ) with (
       'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
  • Queries in the dimension table
    1. Check the storage mode of the dimension table.

      Check whether the storage mode of the dimension table is row-oriented storage, column-oriented storage, or hybrid row-column storage.

    2. Check whether the latency of queries in the dimension table is high.
      Backpressure on the Flink or Blink join node is a common issue in the use of dimension tables and causes low throughput.
      1. Check the join mode of the Flink dimension table.
        The JOIN operation of the Hologres dimension table connector in Flink can be performed in synchronous or asynchronous mode. The asynchronous mode usually delivers higher throughput than the synchronous mode. The following code provides an example on how to enable the asynchronous mode:
        CREATE TABLE hologres_dim(
         id INT,
         len INT,
         content VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',  -- The name of the Hologres database. 
          'tablename'='<yourTablename>', -- The name of the Hologres table to which you want to write data. 
          'username'='<yourUsername>', -- The AccessKey ID of your Alibaba Cloud account. 
          'password'='<yourPassword>', -- The AccessKey secret of your Alibaba Cloud account. 
          'endpoint'='<yourEndpoint>',  -- The Virtual Private Cloud (VPC) endpoint of the Hologres instance. 
          'async' = 'true'              -- Enable the asynchronous mode.
        );
      2. Check the query latency in the backend.
        Check the real-time write latency metric.
        • Check whether the dimension table is a column-oriented table. Column-oriented dimension tables require high overheads in scenarios with a high QPS.
        • If the dimension table is a row-oriented table, high latency is typically caused by the high load of the instance. In this case, you must scale up the instance.
    3. Check whether the join key is the primary key.

      Hologres connectors of VVR 4.x (Flink 1.13) and later allow Holo Client queries based on columns other than primary key columns. Such queries are usually slow and put a high load on the instance, and the tables are often not optimized for this access pattern when they are created. You must optimize the table structure. A common practice is to set the join key as the distribution key, which enables shard pruning.
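      For example, if the dimension table is joined on column a, the table can be created with a as the distribution key so that each point query is pruned to a single shard. The column names below are illustrative:

      ```sql
      BEGIN;
      CREATE TABLE holo_dim (
        a INT PRIMARY KEY,  -- the join key
        b TEXT,
        c TEXT
      );
      -- Row-oriented storage suits high-QPS point queries; setting the join
      -- key as the distribution key enables shard pruning.
      CALL set_table_property('holo_dim', 'orientation', 'row');
      CALL set_table_property('holo_dim', 'distribution_key', 'a');
      COMMIT;
      ```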

    4. Check backpressure on the Blink node.

      If no issues are identified after you perform the preceding checks on Hologres, slow queries may be caused by the slow operation of Blink. In this case, you must check backpressure on the sink node. If only one sink node is used, you cannot check whether backpressure exists. You must check the components of the sink node. You can use the same method to check backpressure on the join node. For more information, contact the Flink technical support personnel.

Usage notes of connections

By default, Hologres connectors that use a Java Database Connectivity (JDBC) driver occupy a certain number of JDBC connections. The default number varies based on the table type.
The following list describes the default number of connections for each table type (per degree of parallelism of the Flink job):
  • Binlog source table: 0
  • Source table generated by batch jobs: 1
  • Dimension table: 3 (the value can be changed by using the connectionSize parameter)
  • Result table: 3 (the value can be changed by using the connectionSize parameter)
  • Calculation of connections
    • Default setting

      By default, the maximum number of connections that a job uses can be calculated by using the following formula:

      Maximum number of connections = (Number of batch source tables × 1 + Number of dimension tables × connectionSize + Number of result tables × connectionSize) × Number of concurrent jobs

      For example, a job has a source table containing full and incremental data, two dimension tables, and three result tables, and all these tables use the default value of the connectionSize parameter. If the number of concurrent jobs is set to 5, the number of connections used is 80: (1 × 1 + 2 × 3 + 3 × 3) × 5 = 80.
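
      The formula can be checked with a small script (a sketch; connectionSize defaults to 3):

      ```python
      def max_connections(batch_sources, dim_tables, result_tables,
                          parallelism, connection_size=3):
          """Upper bound on JDBC connections for one Flink job in default mode."""
          return (batch_sources * 1
                  + dim_tables * connection_size
                  + result_tables * connection_size) * parallelism

      # The example above: 1 batch source, 2 dimension tables, 3 result tables,
      # parallelism 5, default connectionSize.
      print(max_connections(1, 2, 3, 5))  # 80
      ```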

    • Connection reuse

      Realtime Compute for Apache Flink whose engine version is Flink 1.13-VVR 4.1.12 or later supports connection reuse. If dimension tables and result tables within a concurrent job are configured with the same value for the connectionPoolName parameter, the dimension tables and result tables use the same connection pool. In the preceding example, if two dimension tables and three result tables are configured with the same value for the connectionPoolName parameter, and the connectionSize is set to 5, the number of connections used is 30: (1 × 1 + 5) × 5 = 30.

      Note The connection reuse mode is applicable to most scenarios. However, when a job uses a large number of dimension tables without enabling the asynchronous mode and caching, synchronous point queries are performed frequently, and sharing one connection pool across many tables may slow down queries. In this case, configure connection reuse only for result tables.
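
      Connection reuse is enabled by giving the tables the same value for the connectionPoolName parameter in their WITH clauses. A sketch, assuming Flink 1.13-VVR 4.1.12 or later:

      ```sql
      -- The dimension table and the result table share one pool named 'pool1'.
      CREATE TEMPORARY TABLE hologres_dim (
        a INT,
        b VARCHAR
      ) WITH (
        'connector' = 'hologres',
        'dbname' = '<yourDbname>',
        'tablename' = '<yourDimTable>',
        'username' = '<yourAccessKeyId>',
        'password' = '<yourAccessKeySecret>',
        'endpoint' = '<yourEndpoint>',
        'connectionSize' = '5',
        'connectionPoolName' = 'pool1'
      );

      CREATE TEMPORARY TABLE hologres_sink (
        a INT,
        b VARCHAR
      ) WITH (
        'connector' = 'hologres',
        'dbname' = '<yourDbname>',
        'tablename' = '<yourSinkTable>',
        'username' = '<yourAccessKeyId>',
        'password' = '<yourAccessKeySecret>',
        'endpoint' = '<yourEndpoint>',
        'connectionSize' = '5',
        'connectionPoolName' = 'pool1'   -- same name, so the pool is shared
      );
      ```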
    • Other scenarios
      • When a job is started, three to six connections need to be established to verify the metadata of a table. After the job runs as expected, the connections are released.
      • Fully managed Flink supports features such as Hologres catalogs, the CREATE TABLE AS statement (CTAS), and the CREATE DATABASE AS statement (CDAS). When you use these features, connections are used. By default, a job that uses Hologres catalogs consumes three more connections for DDL operations such as creating a table.
  • Diagnosis of connections
    If a job involves many tables and runs with high parallelism, it uses a large number of connections, and the number of connections in Hologres may reach the upper limit. You can use the following methods to view and diagnose the current connections:
    • Execute the following statement to query the active queries on the current instance by using the pg_stat_activity view in HoloWeb. For more information, see Query the pg_stat_activity view. In the application_name field, if the value of a query is ververica-connector-hologres, it indicates that the query uses the read and write connections of Realtime Compute for Apache Flink.
      SELECT application_name, COUNT(1) AS COUNT
      FROM
        pg_stat_activity
      WHERE
        backend_type = 'client backend'
        AND application_name != 'hologres'
        AND usename != 'holo_admin'
      GROUP BY application_name;
    • If the parallelism of a job is set to a larger value than needed, the number of connections is large when the job starts and then decreases after the job runs for a period of time, because idle connections are closed. You can view this trend on the Monitoring Information tab of the instance details page in the Hologres console. This pattern indicates that the job does not need that much parallelism or that many connections. In this case, reduce the parallelism or the value of the connectionSize parameter, or use the connection reuse mode.

Common errors and troubleshooting

  • ERPC TIMEOUT or ERPC CONNECTION CLOSED
    • Description: The error com.alibaba.blink.store.core.rpc.RpcException: request xx UpsertRecordBatchRequest failed on final try 4, maxAttempts=4, errorCode=3, msg=ERPC_ERROR_TIMEOUT is reported.
    • Cause: The TIMEOUT error may be caused by write failures resulting from heavy workloads on writes or busy clusters. You can check whether the instance has heavy CPU loads. The CONNECTION CLOSED error may be reported when excessive loads on the backend node cause out-of-memory (OOM) errors or core dumps.
    • Solution: Retry the write operation. If the error persists, contact the Hologres technical support personnel.
  • Error: BackPresure Exceed Reject Limit.
    • Cause: Heavy workloads on writes in the backend prevent the system from clearing the data of the memory table. In this case, a write failure occurs.
    • Solution: Ignore the error if it occurs only occasionally, or use the rpcRetries = '100' parameter setting to increase the number of write retries. If the error persists, contact the Hologres technical support personnel.
  • Error: The requested table name xxx mismatches the version of the table xxx from server/org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.Caused by: java.net.SocketTimeoutException: Read timed out.
    • Cause: An ALTER TABLE statement is executed. As a result, the schema version of the Blink table is earlier than that on the server and the maximum number of retries is reached.
    • Solution: Ignore the error if it occurs only occasionally. If the error persists, contact the Hologres technical support personnel.
  • Error: Failed to query table meta for table.
    • Cause: A foreign table is read. Hologres connectors do not support read or write operations on foreign tables. Alternatively, the error may be caused by an issue with the metadata of the Hologres instance.
    • Solution: Contact the Hologres technical support personnel.
  • Error: Cloud authentication failed for access id.
    • Cause: The AccessKey pair is incorrect, or no user is added to the Hologres instance.
    • Solution:
      • Verify the AccessKey ID and AccessKey secret.
      • Use the AccessKey pair to log on to the HoloWeb console. If the AccessKey pair is incorrect, the same error is reported. If the user does not have permissions to manage the instance, FATAL: role "ALIYUN$xxxx" does not exist is reported. You must grant permissions to the user.
  • No data can be queried when a dimension table is joined with another table.
    • Cause: The dimension table is a partitioned table. Dimension tables do not support partitioned tables.
    • Solution: Use standard tables as dimension tables.
  • Error: Modify record by primary key is not on this table.
    • Cause: An update mode is specified, but the Hologres result table does not contain a primary key.
    • Solution: Set a primary key.
  • Error: shard columns count is no match.
    • Cause: Not all distribution columns are written. By default, primary key columns are distribution columns.
    • Solution: Write all distribution columns.
  • Error: Full row is required, but the column xxx is missing.
    • Cause: Data in a required column is missing. This error is reported in earlier Hologres versions.
    • Solution: Assign values to the required columns.
  • The number of JDBC connections temporarily increases when VVP users read or write Hologres data.
    • Cause: Hologres connectors in VVP read and write Hologres data (other than binary logs) over JDBC connections, and the number of JDBC connections has reached the upper limit. The maximum number of JDBC connections is calculated based on the following formula: Number of Hologres tables × Parallelism of read or write operations × connectionSize. connectionSize is a parameter of VVP tables, and its default value is 3.
    • Solution: Plan the number of connections appropriately, and reduce the parallelism of read/write operations or the value of the connectionSize parameter. If neither can be reduced, set useRpcMode to true to switch to the RPC mode.
  • Blink and VVP users cannot connect to Hologres when they attempt to read or write Hologres data.
    • Cause: The Blink or VVP cluster is slow or unable to access the Internet.
    • Solution: Make sure that the Blink or VVP cluster resides in the same region as the Hologres instance and uses a VPC endpoint.
  • Error: Hologres rpc mode dimension table does not support one to many join.
    • Cause: The Blink or VVP dimension table in RPC mode is not a row-oriented table, or the field used to join tables is not the primary key.
    • Solution: Use the JDBC mode and use the column-oriented or hybrid row-column storage mode for the dimension table.
  • DatahubClientException
    • Description: The error Caused by: com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:503, requestId:null, errorCode:null, errorMessage:{"ErrorCode":"ServiceUnavailable","ErrorMessage":"Queue Full"}] is reported.
    • Cause: The thread pool is full because a large number of binary log consumption jobs are simultaneously restarted for a specific reason.
    • Solution: Consume binary logs in batches.
  • Error occurs when reading data from datahub
    • Description: The error Error occurs when reading data from datahub, msg: [httpStatus:500, requestId:xxx, errorCode:InternalServerError, errorMessage:Get binlog timeout.] is reported.
    • Cause: Each binary log contains a large amount of data, which makes the size of each RPC request exceed the upper limit.
    • Solution: Reduce the number of binary logs in each batch when a large number of data fields and long character strings exist in each row.
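    For example, the batch size can be lowered in the source table's WITH clause. The binlogBatchReadSize parameter name below is an assumption; check it against your connector version:

    ```sql
    -- Reduce the number of binlog records fetched per RPC request for tables
    -- with many fields or long strings per row.
    CREATE TABLE holo_binlog_source (
      a INT,
      b VARCHAR
    ) WITH (
      'connector' = 'hologres',
      'dbname' = '<yourDbname>',
      'tablename' = '<yourTablename>',
      'username' = '<yourAccessKeyId>',
      'password' = '<yourAccessKeySecret>',
      'endpoint' = '<yourEndpoint>',
      'binlog' = 'true',
      'binlogBatchReadSize' = '256'  -- assumed parameter name; default is larger
    );
    ```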
  • Error: Caused by: java.lang.IllegalArgumentException: Column: created_time type does not match: flink row type: TIMESTAMP(6) WITH LOCAL TIME ZONE, hologres type: timestamp.
    • Cause: A Flink field is of the TIMESTAMP(6) data type, which is not supported by Hologres.
    • Solution: Change the data type to TIMESTAMP.
  • Error: Caused by: org.postgresql.util.PSQLException: FATAL: Rejected by ip white list. db = xxx, usr=xxx, ip=xx.xx.xx.xx.
    • Cause: An IP address whitelist is configured in Hologres, which does not contain the IP address used by Flink to connect to Hologres.
    • Solution: Add the IP address used by Flink to the IP address whitelist of Hologres. For more information, see Configure an IP address whitelist.