
Hologres: FAQ about Blink and Flink issues

Last Updated: Apr 23, 2024

This topic provides answers to some frequently asked questions about Blink and Flink when you use Hologres.

Background information

  • Hologres performance

    • Write performance

      • Column-oriented table: InsertOrIgnore > InsertOrReplace > InsertOrUpdate

      • Row-oriented table: InsertOrReplace = InsertOrUpdate > InsertOrIgnore

      The insert modes work as follows:

      • InsertOrIgnore: Discards a written record if its primary key value already exists in the result table.

      • InsertOrReplace: Replaces the entire row whose primary key matches the written record. Columns that the written data does not cover are set to null.

      • InsertOrUpdate: Updates the row whose primary key matches the written record. Columns that the written data does not cover are left unchanged.
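      The insert mode of a Hologres result table is selected with the connector's mutatetype parameter, which requires a primary key on the Hologres table. A minimal sketch in the Blink-style DDL used later in this topic (endpoint, credentials, and table names are placeholders):

      create table holo_sink (
        id BIGINT,
        val VARCHAR
      ) with (
        type = 'hologres',
        'endpoint' = 'xxx.hologres.aliyuncs.com:80',
        'dbName' = 'mydb',
        'tableName' = 'test',
        'userName' = '<yourAccessKeyId>',
        'password' = '<yourAccessKeySecret>',
        'mutatetype' = 'insertorupdate'  -- insertorignore (default), insertorreplace, or insertorupdate.
      );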

    • Point query performance

      Row-oriented storage = Hybrid row-column storage > Column-oriented storage

  • Support for the Blink, Realtime Compute for Apache Flink (VVP), and open source Flink service types

    • Fully managed Flink

      • Source table: Row-oriented storage and column-oriented storage are supported.

      • Result table: Row-oriented storage and column-oriented storage are supported.

      • Dimension table: Row-oriented storage is recommended.

      • Binlog: Supported.

      • Hologres catalog: Supported.

    • Blink in exclusive mode

      • Source table: Row-oriented storage and column-oriented storage are supported.

      • Result table: Row-oriented storage and column-oriented storage are supported.

      • Dimension table: Row-oriented storage is recommended.

      • Binlog: Hologres V0.8 supports only row-oriented storage. Hologres V0.9 and later support row-oriented storage and column-oriented storage; row-oriented storage is recommended.

      • Hologres catalog: Not supported.

      • Note: Blink in exclusive mode will be discontinued. We recommend that you use fully managed Flink.

    • Open source Flink V1.10

      • Source table: Row-oriented storage and column-oriented storage are supported.

      • Result table: Row-oriented storage and column-oriented storage are supported.

      • Dimension table: Not supported.

      • Binlog: Not supported.

      • Hologres catalog: Not supported.

    • Open source Flink V1.11 and later

      • Source table: Row-oriented storage and column-oriented storage are supported.

      • Result table: Row-oriented storage and column-oriented storage are supported.

      • Dimension table: Row-oriented storage is recommended.

      • Binlog: Not supported.

      • Hologres catalog: Not supported.

      • Note: The source code of the Hologres connector is available in open source Flink V1.11 and later. For more information, see alibabacloud-hologres-connectors.

  • Sample SQL statement for mapping a Blink or Flink table to a Hologres table:

    create table holo_source (
      `hg_binlog_lsn` BIGINT HEADER,
      `hg_binlog_event_type` BIGINT HEADER,
      `hg_binlog_timestamp_us` BIGINT HEADER,
      A int,
      B int,
      C timestamp
    ) with (
      type = 'hologres',
      'endpoint' = 'xxx.hologres.aliyuncs.com:80',   -- The endpoint of the Hologres instance. 
      'userName' = '',                               -- The AccessKey ID of your Alibaba Cloud account. 
      'password' = '',                               -- The AccessKey secret of your Alibaba Cloud account. 
      'dbName' = 'binlog',                           -- The name of the database in the Hologres instance. 
      'tableName' = 'test',                          -- The name of the table in the Hologres instance. 
      'binlog' = 'true'
    );

    A Blink, Flink (VVP), or Flink SQL table is created in Flink and mapped to a physical table in Hologres based on parameter settings. Blink, Flink (VVP), or Flink SQL tables cannot be mapped to foreign tables in Hologres.

Troubleshoot the issue of slow real-time writes

  1. Verify write-related configurations.

    Verify the following configurations:

    • The storage mode of the destination table, such as row-oriented storage, column-oriented storage, or hybrid row-column storage.

    • The insert mode, such as InsertOrIgnore, InsertOrUpdate, or InsertOrReplace.

    • The table group and shard count of the destination table.

  2. Check the real-time write latency metric.

    If the average write latency reaches 100 milliseconds or even several seconds, the backend server has reached its write performance bottleneck. The following issues may be the cause:

    • If the column-oriented table uses the InsertOrUpdate mode and the write operation involves a large amount of traffic, the CPU load on the instance and the write latency are high.

      Solution: Use a row-oriented table or, if your Hologres instance is V1.1 or later, a hybrid row-column storage table. See the sketch after this list.

    • The CPU utilization of the instance, as monitored by CloudMonitor, approaches 100% even though writes to the column-oriented table involve no updates. This is typically caused by high queries per second (QPS) or a large volume of written data.

      Solution: Scale up the instance.

    • Continuous execution of INSERT INTO ... SELECT ... FROM statements triggers batch writes, which block real-time writes.

      Solution: Convert batch writes into real-time writes, or perform batch writes during off-peak hours.
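
    A minimal sketch of creating a hybrid row-column table in Hologres V1.1 or later (table and column names are hypothetical; orientation is a table property that must be set in the same transaction that creates the table):

    BEGIN;
    CREATE TABLE t_new (
      id BIGINT NOT NULL,
      val TEXT,
      PRIMARY KEY (id)
    );
    CALL set_table_property('t_new', 'orientation', 'row,column');
    COMMIT;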

  3. Check whether data skew exists.

    Execute the following SQL statement to check whether data skew exists:

    SELECT hg_shard_id, count(1) FROM t1 GROUP BY hg_shard_id ORDER BY hg_shard_id;

    Solution: If data skew exists, change the distribution key to balance data distribution.
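
    The distribution key is fixed at table creation, so rebalancing typically means recreating the table with a more even key and reloading the data. A sketch, with hypothetical table and column names (the distribution key must be a subset of the primary key):

    BEGIN;
    CREATE TABLE t1_new (
      uid BIGINT NOT NULL,
      event_time TIMESTAMPTZ,
      PRIMARY KEY (uid, event_time)
    );
    CALL set_table_property('t1_new', 'distribution_key', 'uid');
    COMMIT;
    INSERT INTO t1_new SELECT * FROM t1;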

  4. Check whether heavy workloads exist in backend clusters.

    If no issues are identified after you perform the preceding checks, a sudden drop in write performance is typically caused by heavy workloads in backend clusters. In this case, contact technical support. For more information, see Obtain online support for Hologres.

  5. Check whether backpressure exists in Blink or Flink.

    If no issues are identified after you perform the preceding checks on Hologres, slow real-time writes may be caused by slow processing in Blink or Flink. In this case, check for backpressure on the sink node. If the sink is chained into a single node, backpressure cannot be observed directly; split the node into its component operators and check each of them. For more information, contact Flink technical support.

Troubleshoot issues with written data

Issues with written data are typically caused by out-of-order records. For example, if records that share the same primary key are distributed across different Flink tasks, they may arrive in a different order than they were produced. Check the logic of your Flink SQL and make sure that the written data is shuffled based on the primary key of the destination Hologres table.
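
Declaring the primary key on the Flink sink table lets the planner key the stream by that column before writing. A minimal sketch with hypothetical names (connection parameters omitted):

    CREATE TEMPORARY TABLE holo_sink (
       id BIGINT,
       name STRING,
       PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'hologres',
       ...
    );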

Troubleshoot queries in the dimension table

  • JOIN operations for dimension tables and two data streams

    In scenarios that read data from Hologres, check that dimension-table JOIN syntax is used where intended and that you are not accidentally writing a regular join of two data streams. The following code shows a dimension-table JOIN for Hologres. If proctime AS PROCTIME() and FOR SYSTEM_TIME AS OF T.proctime are missing, the query becomes a join of two streams.

    CREATE TEMPORARY TABLE datagen_source (
       a INT,
       b BIGINT,
       c STRING,
       proctime AS PROCTIME()
    ) with (
       'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE hologres_dim (
       a INT, 
       b VARCHAR, 
       c VARCHAR
    ) with (
       'connector' = 'hologres',
       ...
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
       a INT,
       b STRING
    ) with (
       'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
  • Queries in the dimension table

    1. Check the storage mode of the dimension table.

      Check whether the storage mode of the dimension table is row-oriented storage, column-oriented storage, or hybrid row-column storage.

    2. Check whether the latency of queries in the dimension table is high.

      Backpressure on the Flink or Blink join node is a common issue in the use of dimension tables and causes low throughput.

      1. Check the join mode of the Flink dimension table.

        Joins against Hologres dimension tables in Flink can run in synchronous or asynchronous mode. The asynchronous mode usually delivers higher throughput. The following code provides an example on how to enable the asynchronous mode:

        CREATE TABLE hologres_dim (
          id INT,
          len INT,
          content VARCHAR
        ) with (
          'connector' = 'hologres',
          'dbname' = '<yourDbname>',        -- The name of the Hologres database. 
          'tablename' = '<yourTablename>',  -- The name of the Hologres dimension table. 
          'username' = '<yourUsername>',    -- The AccessKey ID of your Alibaba Cloud account. 
          'password' = '<yourPassword>',    -- The AccessKey secret of your Alibaba Cloud account. 
          'endpoint' = '<yourEndpoint>',    -- The Virtual Private Cloud (VPC) endpoint of the Hologres instance. 
          'async' = 'true'                  -- Enable the asynchronous mode. 
        );
      2. Check the query latency in the backend.

        Check the real-time write latency metric.

        • Check whether the dimension table is a column-oriented table. Column-oriented dimension tables incur high overhead under high QPS.

        • If the dimension table is a row-oriented table, high latency is typically caused by the high load of the instance. In this case, you must scale up the instance.

    3. Check whether the join key is the primary key.

      Hologres connectors of VVR 4.x (Flink 1.13) and later allow queries through Holo Client on columns other than primary key columns. Such queries usually perform poorly and drive up the instance load, especially if the table structure was not optimized when the table was created. A common practice is to configure the join key as the distribution key, which enables shard pruning. See the following sketch.
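
      For example, if the dimension table is joined on column a, a sketch of creating it with a as the distribution key (table and column names are hypothetical):

      BEGIN;
      CREATE TABLE holo_dim (
        a INT NOT NULL,
        b TEXT,
        PRIMARY KEY (a)
      );
      CALL set_table_property('holo_dim', 'distribution_key', 'a');
      COMMIT;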

    4. Check backpressure on the Blink node.

      If no issues are identified after you perform the preceding checks on Hologres, slow queries may be caused by slow processing in Blink. In this case, check for backpressure on the sink node. If the sink is chained into a single node, backpressure cannot be observed directly; split the node into its component operators and check each of them. Use the same method to check for backpressure on the join node. For more information, contact Flink technical support.

Usage notes of connections

By default, Hologres connectors use the Java Database Connectivity (JDBC) driver, and each table occupies a certain number of JDBC connections. The default number of connections varies based on the table type, as described in the following list.

Default number of connections for each pod in a Flink job, by table type:

  • Binlog source table: 0

  • Source table for batch reading: 1

  • Dimension table: 3 (the value can be changed by using the connectionSize parameter)

  • Result table: 3 (the value can be changed by using the connectionSize parameter)

  • Calculation of the number of connections

    • Default setting

      By default, the maximum number of connections that a job can use is calculated by using the following formula:

      Maximum number of connections = (Number of source tables × 1 + Number of dimension tables × connectionSize + Number of result tables × connectionSize) × Job parallelism

      For example, a job has a source table that contains full and incremental data, two dimension tables, and three result tables, and all these tables use the default value of the connectionSize parameter. If the job parallelism is 5, the number of connections used is 80, which is calculated by using the following formula: (1 × 1 + 2 × 3 + 3 × 3) × 5 = 80.

    • Connection reuse

      Realtime Compute for Apache Flink whose engine version is Flink 1.13-VVR 4.1.12 or later supports connection reuse. If dimension tables and result tables within the same pod of a job are configured with the same value for the connectionPoolName parameter, the dimension tables and result tables use the same connection pool. In the preceding example, if the two dimension tables and three result tables are configured with the same value for the connectionPoolName parameter, and connectionSize is set to 5 for these tables, the number of connections used is 30, which is calculated by using the following formula: (1 × 1 + 5) × 5 = 30.
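
      A minimal sketch of sharing one connection pool between a dimension table and a result table (table names are hypothetical; other connection parameters are omitted):

      CREATE TEMPORARY TABLE holo_dim (
        a INT,
        b VARCHAR
      ) WITH (
        'connector' = 'hologres',
        ...
        'connectionPoolName' = 'pool1',
        'connectionSize' = '5'
      );

      CREATE TEMPORARY TABLE holo_sink (
        a INT,
        b VARCHAR,
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector' = 'hologres',
        ...
        'connectionPoolName' = 'pool1',
        'connectionSize' = '5'
      );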

      Note

      The connection reuse mode is applicable to most scenarios. However, if a job has a large number of dimension tables, or if the asynchronous mode and caching are disabled so that synchronous point queries are frequent, connection reuse across multiple tables may slow down queries. In this case, configure connection reuse only for result tables.

    • Other scenarios

      • When a job is started, three to six connections need to be established to verify the metadata of a table. After the job runs as expected, the connections are released.

      • Fully managed Flink supports features such as Hologres catalogs, the CREATE TABLE AS statement (CTAS), and the CREATE DATABASE AS statement (CDAS). When you use these features, connections are used. By default, a job that uses Hologres catalogs consumes three more connections for data definition language (DDL) operations such as creating a table.

  • Diagnosis of connections

    If a job has many tables and a high degree of parallelism, it uses a large number of connections, and the number of connections in Hologres may reach the upper limit. You can use the following methods to inspect and diagnose the current connections:

    • Query the active queries on the current instance by using the pg_stat_activity view in HoloWeb. For more information, see Query the pg_stat_activity view. In the application_name field, a value of ververica-connector-hologres indicates that the query uses the read and write connections of Realtime Compute for Apache Flink.

      SELECT application_name, COUNT(1) AS count
      FROM
        pg_stat_activity
      WHERE
        backend_type = 'client backend'
        AND application_name != 'hologres'
        AND usename != 'holo_admin'
      GROUP BY application_name;
    • If the parallelism of a job is set to a large value, many connections are opened when the job starts, and the number decreases after the job has run for a period of time. You can observe this on the Monitoring Information tab of the instance details page in the Hologres console. The decrease occurs because idle connections are closed, which indicates that the job does not need that much parallelism or that many connections. Configure an appropriate number of connections: reduce the job parallelism or the value of the connectionSize parameter, or use the connection reuse mode.

FAQ

What do I do if the error message ERPC TIMEOUT or ERPC CONNECTION CLOSED is reported?

  • Description: The error message com.alibaba.blink.store.core.rpc.RpcException: request xx UpsertRecordBatchRequest failed on final try 4, maxAttempts=4, errorCode=3, msg=ERPC_ERROR_TIMEOUT is reported.

  • Cause: The issue may be caused by write failures resulting from highly concurrent write operations or overloaded clusters. You can check whether the instance has high CPU loads. The CONNECTION CLOSED error may be reported when excessive loads on the backend node cause out-of-memory (OOM) errors or core dumps.

  • Solution: Retry the write operation. If the error persists, contact the Hologres technical support.

What do I do if the error message BackPresure Exceed Reject Limit is reported?

  • Cause: Write workloads in the backend are heavy, and the system cannot flush the memory table in a timely manner. As a result, writes fail.

  • Solution: Ignore the error if it is reported only occasionally, or add 'rpcRetries' = '100' to the sink configuration to increase the number of write retries. If the error persists, contact Hologres technical support.

What do I do if the error message The requested table name xxx mismatches the version of the table xxx from server or the error message org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend. Caused by: java.net.SocketTimeoutException: Read timed out is reported?

  • Cause: An ALTER TABLE statement is executed. As a result, the schema version of the Blink table is earlier than that on the server and the maximum number of retries is reached.

  • Solution: Ignore the error if it occurs only occasionally. If the error persists, contact the Hologres technical support.

What do I do if the error message Failed to query table meta for table is reported?

  • Cause: A foreign table is being read, but Hologres connectors do not support read or write operations on foreign tables. Alternatively, the error may be caused by an issue with the metadata of the Hologres instance.

  • Solution: Contact the Hologres technical support.

What do I do if the error message Cloud authentication failed for access id is reported?

  • Cause: The AccessKey pair is incorrect, or no user is added to the Hologres instance.

  • Solution:

    • Verify the AccessKey ID and AccessKey secret. In most cases, this error is reported if the AccessKey secret is incorrect or contains spaces.

    • Use the AccessKey pair to log on to the HoloWeb console. If the AccessKey pair is invalid, the same error is reported. If the user does not have permissions on the instance, FATAL: role "ALIYUN$xxxx" does not exist is reported. In this case, grant the required permissions to the user.

What do I do if no data can be queried after a dimension table is joined with another table?

  • Cause: The dimension table is a partitioned table. Hologres does not allow partitioned tables to serve as dimension tables.

  • Solution: Use standard tables as dimension tables.

What do I do if the error message Modify record by primary key is not on this table is reported?

  • Cause: An update mode is specified, but no primary key is configured for the Hologres result table.

  • Solution: Configure a primary key.

What do I do if the error message shard columns count is no match is reported?

  • Cause: Not all distribution key columns are written. By default, primary key columns are distribution key columns.

  • Solution: Write all distribution key columns.

What do I do if the error message Full row is required, but the column xxx is missing is reported?

  • Cause: Data in a required column is missing. This error is reported in earlier Hologres versions.

  • Solution: Assign values to the required columns.

What do I do if the number of JDBC connections sharply increases when VVP users read data from or write data to Hologres?

  • Cause: The number of JDBC connections that Hologres connectors in VVP use to read data from or write data to Hologres (excluding binary logs) has reached the upper limit. The maximum number of JDBC connections is calculated based on the following formula: Number of Hologres tables × Read/write parallelism × connectionSize. The connectionSize parameter is set on VVP tables, and its default value is 3.

  • Solution: Plan an appropriate number of connections, and reduce the read/write parallelism or the value of the connectionSize parameter. If neither can be reduced, set useRpcMode to true to switch to the RPC mode.

What do I do if Blink and VVP users cannot connect to Hologres when they attempt to read data from or write data to Hologres?

  • Cause: The Blink or VVP cluster cannot access the Internet, or its access to the Internet is slow.

  • Solution: Make sure that the Blink or VVP cluster resides in the same region as the Hologres instance and uses a VPC endpoint.

What do I do if the error message Hologres rpc mode dimension table does not support one to many join is reported?

  • Cause: The Blink or VVP dimension table in RPC mode is not a row-oriented table, or the field used to join tables is not the primary key.

  • Solution: Use the JDBC mode and use the column-oriented or hybrid row-column storage mode for the dimension table.

What do I do if the error message DatahubClientException is reported?

  • Description: The error message Caused by: com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:503, requestId:null, errorCode:null, errorMessage:{"ErrorCode":"ServiceUnavailable","ErrorMessage":"Queue Full"}] is reported.

  • Cause: The thread pool is full because a large number of binary log consumption jobs are simultaneously restarted for a specific reason.

  • Solution: Consume binary logs in batches.

What do I do if the error message Error occurs when reading data from datahub is reported?

  • Description: The error message Error occurs when reading data from datahub, msg: [httpStatus:500, requestId:xxx, errorCode:InternalServerError, errorMessage:Get binlog timeout.] is reported.

  • Cause: Each binary log contains a large amount of data, which makes the size of each RPC request exceed the upper limit.

  • Solution: Reduce the number of binary logs in each batch when a large number of data fields and long character strings exist in each row.

What do I do if the error message Caused by: java.lang.IllegalArgumentException: Column: created_time type does not match: flink row type: TIMESTAMP(6) WITH LOCAL TIME ZONE, hologres type: timestamp is reported?

  • Cause: A Flink field is of the TIMESTAMP(6) data type, which is not supported by Hologres.

  • Solution: Change the data type to TIMESTAMP.

What do I do if the error message Caused by: org.postgresql.util.PSQLException: FATAL: Rejected by ip white list. db = xxx, usr=xxx, ip=xx.xx.xx.xx is reported?

  • Cause: An IP address whitelist configured in Hologres does not contain the IP address that is used by Flink to connect to Hologres.

  • Solution: Add the IP address used by Flink to the IP address whitelist of Hologres. For more information, see Configure an IP address whitelist.

What do I do if the error message Caused by: java.lang.RuntimeException: shaded.hologres.com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:400, requestId:xx, errorCode:TableVersionExpired, errorMessage:The specified table has been modified, please refresh cursor and try again is reported?

  • Cause: DDL operations are performed on the source table, and the table version changes. As a result, data consumption fails.

  • Solution: Upgrade the version of Realtime Compute for Apache Flink to 4.0.16 or later. After the upgrade, data consumption retries are made in case of failures.

When a job that consumes binary logs starts, an error is reported to indicate that the shard ID does not exist. What do I do?

  • Cause: The number of shards in the table whose data you want to consume changes after you rename the table or perform other operations on the table. However, the shard information of the original table is used when the job recovers from the checkpoint.

  • Solution: After operations such as table recreation, the binary log consumption offset stored in the checkpoint is invalid. Start the job without the saved state.

What do I do if the error message ERROR,22021,"invalid byte sequence for encoding ""UTF8"": 0x00" is reported?

  • Cause: When you perform a point query in a dimension table, the primary key of the STRING data type contains non-UTF-8 encoded characters. As a result, the SQL statement fails to be executed.

  • Solution: Process the upstream dirty data.
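
    One possible cleanup on the Flink side, assuming the dirty STRING column is named c (a sketch; REGEXP_REPLACE is a built-in Flink SQL function, and the '\x00' pattern is interpreted by the Java regex engine as the NUL character):

    SELECT REGEXP_REPLACE(c, '\x00', '') AS c
    FROM source_table;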

What do I do if the error message hologres.org.postgresql.util.PSQLException: ERROR: syntax error is reported?

  • Cause: A slot must be specified when you consume binary logs in JDBC mode. This error may occur if the name of the created slot contains unsupported characters. A slot name can contain only lowercase letters, digits, and underscores (_).

  • Solution: Recreate the slot with a valid name, or use the automatic slot creation feature of VVR 6.0.7.

What do I do if the error message create table hologres.hg_replication_progress failed is reported?

  • Cause: When binary logs are consumed in JDBC mode, Hologres creates the hg_replication_progress table in the current database if the table does not exist. The creation fails because the number of shards in the instance has reached the upper limit.

  • Solution: Clean up databases that you no longer need.

What do I do if a job hangs during runtime and the thread dump shows that it is stuck in the Class.forName phase while the JDBC driver is loaded?

  • Cause: Some static initialization operations are performed when the JDBC driver is loaded on JDK 8. A race condition may occur during multi-thread loading.

  • Solution: Retry the operation, or use a connector of Realtime Compute for Apache Flink VVR 6.0.7.

What do I do if the error message "no table is defined in publication" or "The table xxx has no slot named xxx" is reported when I consume binary logs in JDBC mode?

  • Cause: When you drop a table and create another table with the same name, the publication that is bound to the old table is not dropped.

  • Solution: Execute the select * from pg_publication where pubname not in (select pubname from pg_publication_tables); statement in Hologres to find publications that were not dropped. Execute the drop publication xx; statement to drop them. Then, restart the job.

What do I do if the error message "permission denied for database" is reported when I publish a job?

  • Cause: The required permissions are not granted to your account when you want to consume binary logs in JDBC mode in Hologres V1.3 or Hologres V2.0.

  • Solution: We recommend that you upgrade your Hologres instance to V2.1 and use a connector that runs on Ververica Runtime (VVR) 8.0.5 or later. This way, you can consume binary logs by using an account that is granted the read-only permission. If the upgrade is not allowed, grant the required permissions based on Limits in the "Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time" topic.

What do I do if the error message "table writer init failed: Fail to fetch table meta from sm" is reported?

  • Cause: Data is written to the table after the truncate or rename operation is performed on the table.

  • Solution: This issue occasionally occurs and can be ignored. It is automatically resolved after a job failover. In Hologres V2.1.1 to V2.1.14, the replay cache time of FE nodes was increased; if you execute a DDL statement and then a data manipulation language (DML) statement on a table, the DDL replay slows down, and the probability of similar exceptions may increase. We recommend that you upgrade your Hologres instance to the latest minor version of V2.1.

What do I do if an error message similar to "java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter" is reported when I use connector dependencies to develop DataStream deployments in an on-premises environment?