This topic describes how to troubleshoot Blink and Flink issues when you use Hologres.
Background information
- Hologres performance
- Write performance
- Column-oriented table: InsertOrIgnore > InsertOrReplace > InsertOrUpdate
- Row-oriented table: InsertOrReplace = InsertOrUpdate > InsertOrIgnore
| Insert mode | Description |
|---|---|
| InsertOrIgnore | Discards the data that you want to write if the inserted primary key has the same value as the primary key in the result table. |
| InsertOrReplace | Updates the table based on the inserted primary key if the inserted primary key has the same value as the primary key in the result table. If the written data does not cover all columns, the null value is inserted into the columns that are not overwritten. |
| InsertOrUpdate | Updates the table based on the inserted primary key if the inserted primary key has the same value as the primary key in the result table. If the written data does not cover all columns, the columns that are not overwritten are not updated. |
- Point query performance
Row-oriented storage = Hybrid row-column storage > Column-oriented storage
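The insert mode of a Hologres result table is selected with the `mutatetype` parameter in the sink DDL. The following minimal sketch assumes a fully managed Flink sink table; the table name, columns, and placeholder credentials are illustrative:
```sql
CREATE TABLE holo_sink (
  id      INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED  -- A primary key is required for the update modes.
) with (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorreplace'  -- Or insertorignore / insertorupdate.
);
```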
- The following table describes the Blink, Flink (VVP), and open source Flink service types supported by Hologres. The Source table, Result table, and Dimension table columns describe the supported data storage modes.

| Service type | Source table | Result table | Dimension table | Binlog | Hologres catalog | Description |
|---|---|---|---|---|---|---|
| Fully managed Flink | Row-oriented storage and column-oriented storage are supported. | Row-oriented storage and column-oriented storage are supported. | We recommend that you use row-oriented storage. | Supported. | Supported. | None. |
| Blink in exclusive mode | Row-oriented storage and column-oriented storage are supported. | Row-oriented storage and column-oriented storage are supported. | We recommend that you use row-oriented storage. | Hologres V0.8 supports only row-oriented storage. Hologres V0.9 and later support both row-oriented storage and column-oriented storage. We recommend that you use row-oriented storage. | Not supported. | Blink in exclusive mode will be discontinued. We recommend that you use fully managed Flink. |
| Open source Flink V1.10 | Row-oriented storage and column-oriented storage are supported. | Row-oriented storage and column-oriented storage are supported. | None. | Not supported. | Not supported. | None. |
| Open source Flink V1.11 | Row-oriented storage and column-oriented storage are supported. | Row-oriented storage and column-oriented storage are supported. | We recommend that you use row-oriented storage. | Not supported. | Not supported. | The source code of the Hologres connector is available for open source Flink V1.11 and later. For more information, see alibabacloud-hologres-connectors. |
| Open source Flink V1.12 | Row-oriented storage and column-oriented storage are supported. | Row-oriented storage and column-oriented storage are supported. | We recommend that you use row-oriented storage. | Not supported. | Not supported. | None. |
- The following SQL code provides an example on how to map a Blink or Flink table to a Hologres table:
A Blink, Flink (VVP), or Flink SQL table is created in Flink and then mapped to a physical table based on parameter settings. Blink, Flink (VVP), and Flink SQL tables cannot be mapped to foreign tables in Hologres.
```sql
create table holo_source (
  `hg_binlog_lsn` BIGINT HEADER,
  `hg_binlog_event_type` BIGINT HEADER,
  `hg_binlog_timestamp_us` BIGINT HEADER,
  A int,
  B int,
  C timestamp
) with (
  type = 'hologres',
  'endpoint' = 'xxx.hologres.aliyuncs.com:80', -- The endpoint of the Hologres instance.
  'userName' = '',      -- The AccessKey ID of your Alibaba Cloud account.
  'password' = '',      -- The AccessKey secret of your Alibaba Cloud account.
  'dbName' = 'binlog',  -- The name of the database in the Hologres instance.
  'tableName' = 'test', -- The name of the table in the Hologres instance.
  'binlog' = 'true'
);
```
Troubleshoot the issue of slow real-time writes
Troubleshoot issues with written data
Issues with written data are typically caused by out-of-order data. For example, if records that share the same primary key are distributed across different Flink tasks, they cannot be written in their original order. Check the logic of your Flink SQL and make sure that the written data is shuffled based on the primary key of the destination Hologres table.
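As a sketch of the idea, the following Flink SQL deduplicates by primary key before the sink so that only the latest record per key is written; the table names `upstream_source` and `holo_sink` and their columns are hypothetical:
```sql
-- ROW_NUMBER is partitioned by the primary key id, so records that share a
-- key are shuffled to the same task, and only the latest record survives.
INSERT INTO holo_sink
SELECT id, val, ts
FROM (
  SELECT id, val, ts,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rn
  FROM upstream_source
) t
WHERE rn = 1;
```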
Troubleshoot queries in the dimension table
- JOIN operations for dimension tables and two data streams
In scenarios that involve data writes to Hologres, you must check whether the JOIN operations for dimension tables are correct. Make sure that you are not using JOIN operations for two data streams. The following code provides an example of a JOIN operation on a dimension table. If `proctime AS PROCTIME()` and `FOR SYSTEM_TIME AS OF T.proctime` are missing, the operation becomes a JOIN operation for two data streams.
```sql
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) with (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) with (
  'connector' = 'hologres',
  ...
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) with (
  'connector' = 'blackhole'
);

insert into blackhole_sink
select T.a, H.b
FROM datagen_source AS T
JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
```
- Queries in the dimension table
- Check the storage mode of the dimension table.
Check whether the storage mode of the dimension table is row-oriented storage, column-oriented storage, or hybrid row-column storage.
- Check whether the latency of queries in the dimension table is high.
Backpressure on the Flink or Blink join node is a common issue in the use of dimension tables and causes low throughput.
- Check the join mode of the Flink dimension table.
The JOIN operation of the Hologres dimension table connector in Flink can be performed in synchronous or asynchronous mode. The asynchronous mode performs better than the synchronous mode. The following code provides an example on how to enable the asynchronous mode:
```sql
CREATE TABLE hologres_dim (
  id INT,
  len INT,
  content VARCHAR
) with (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',       -- The name of the Hologres database.
  'tablename' = '<yourTablename>', -- The name of the Hologres dimension table to query.
  'username' = '<yourUsername>',   -- The AccessKey ID of your Alibaba Cloud account.
  'password' = '<yourPassword>',   -- The AccessKey secret of your Alibaba Cloud account.
  'endpoint' = '<yourEndpoint>',   -- The Virtual Private Cloud (VPC) endpoint of the Hologres instance.
  'async' = 'true'                 -- Enable the asynchronous mode.
);
```
- Check the query latency in the backend.
Check the real-time write latency metric.
- Check whether the dimension table is a column-oriented table. Point queries on column-oriented dimension tables incur high overhead in high-QPS scenarios.
- If the dimension table is a row-oriented table, high latency is typically caused by the high load of the instance. In this case, you must scale up the instance.
- Check whether the join key is the primary key.
Hologres connectors of VVR 4.x (Flink 1.13) and later allow queries that use Holo Client based on columns other than primary key columns. Such queries usually perform poorly and put a high load on the instance, especially if the table structure was not optimized when the table was created. In this case, optimize the table structure. A common practice is to set the join key as the distribution key, which enables shard pruning (see the sketch after this list).
- Check backpressure on the Blink node.
If no issues are identified after you perform the preceding checks on Hologres, slow queries may be caused by slow processing in Blink itself. In this case, check for backpressure on the sink node. If the sink is chained with other operators into a single node, backpressure is not directly visible, so you must check the components of the sink node separately. You can use the same method to check for backpressure on the join node. For more information, contact the Flink technical support personnel.
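The following Hologres DDL sketch shows the join-key practice referenced above; the table `dim_table` and its columns are placeholders. Note that the distribution key must be a subset of the primary key, so the join key `a` is modeled as part of a composite primary key:
```sql
BEGIN;
CREATE TABLE dim_table (
  a INT  NOT NULL,
  b TEXT NOT NULL,
  c TEXT,
  PRIMARY KEY (a, b)
);
-- Make the join key the distribution key so that lookups on a prune to a single shard.
CALL set_table_property('dim_table', 'distribution_key', 'a');
-- Row-oriented storage is recommended for high-QPS point lookups.
CALL set_table_property('dim_table', 'orientation', 'row');
COMMIT;
```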
Usage notes of connections
| Table type | Default number of connections (per concurrency of a Flink job) |
|---|---|
| Binlog source table | 0 |
| Source table generated by batch jobs | 1 |
| Dimension table | 3 (The value can be changed by using the connectionSize parameter.) |
| Result table | 3 (The value can be changed by using the connectionSize parameter.) |
- Calculation of connections
- Default setting
By default, the maximum number of connections that a job uses can be calculated by using the following formula:
Maximum number of connections = (Number of batch source tables × 1 + Number of dimension tables × connectionSize + Number of result tables × connectionSize) × Number of concurrent jobs
For example, a job has a source table that contains full and incremental data, two dimension tables, and three result tables, and all these tables use the default value of the `connectionSize` parameter. If the number of concurrent jobs is set to 5, the number of connections used is 80: (1 × 1 + 2 × 3 + 3 × 3) × 5 = 80.
- Connection reuse
. - Connection reuse
Realtime Compute for Apache Flink whose engine version is Flink 1.13-VVR 4.1.12 or later supports connection reuse. If dimension tables and result tables within a concurrent job are configured with the same value for the
connectionPoolName
parameter, the dimension tables and result tables use the same connection pool. In the preceding example, if two dimension tables and three result tables are configured with the same value for theconnectionPoolName
parameter, and theconnectionSize
is set to5
, the number of connections used is 30:(1 × 1 + 5) × 5 = 30
.Note The connection reuse mode is applicable to most scenarios. However, in scenarios with a large number of dimension tables or in scenarios without enabling the asynchronous mode and caching, synchronous point queries are frequently performed. This way, multi-table connection reuse may cause slow queries. In this case, you can configure connection reuse only for result tables. - Other scenarios
- Other scenarios
  - When a job is started, three to six connections need to be established to verify the metadata of a table. After the job runs as expected, the connections are released.
  - Fully managed Flink supports features such as Hologres catalogs, the CREATE TABLE AS statement (CTAS), and the CREATE DATABASE AS statement (CDAS). When you use these features, additional connections are used. By default, a job that uses Hologres catalogs consumes three more connections for DDL operations such as creating a table.
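The following minimal sketch shows the connection reuse configuration referenced above. The table skeletons are hypothetical, and the endpoint and credential parameters are elided with `...` as in the earlier examples:
```sql
CREATE TEMPORARY TABLE hologres_dim (
  a INT,
  b VARCHAR
) with (
  'connector' = 'hologres',
  ...,
  'connectionSize' = '5',         -- Size of the shared connection pool.
  'connectionPoolName' = 'pool1'  -- Tables with the same pool name share connections.
);

CREATE TEMPORARY TABLE hologres_sink (
  a INT,
  b VARCHAR
) with (
  'connector' = 'hologres',
  ...,
  'connectionSize' = '5',
  'connectionPoolName' = 'pool1'  -- Same pool as the dimension table.
);
```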
- Diagnosis of connections
If a job involves many tables and runs with high parallelism, a large number of connections are used, and the number of connections in Hologres may reach the upper limit. You can use the following methods to view and diagnose the current connections:
- Execute the following statement to query the active queries on the current instance by using the `pg_stat_activity` view in HoloWeb. For more information, see Query the pg_stat_activity view. In the `application_name` field, a value of `ververica-connector-hologres` indicates that the query uses the read and write connections of Realtime Compute for Apache Flink.
```sql
SELECT application_name, COUNT(1) AS count
FROM pg_stat_activity
WHERE backend_type = 'client backend'
AND application_name != 'hologres'
AND usename != 'holo_admin'
GROUP BY application_name;
```
- If the parallelism of a job is set to a larger value than necessary, the Monitoring Information tab of the instance details page in the Hologres console shows that the number of connections is large when the job starts and then decreases after the job has run for a period of time. The decrease occurs because idle connections are closed. This pattern indicates that the job does not need such a high parallelism or so many connections. In this case, reduce the parallelism or the value of the `connectionSize` parameter, or use the connection reuse mode.
Common errors and troubleshooting
- Error: `ERPC TIMEOUT` or `ERPC CONNECTION CLOSED`
  - Description: The error `com.alibaba.blink.store.core.rpc.RpcException: request xx UpsertRecordBatchRequest failed on final try 4, maxAttempts=4, errorCode=3, msg=ERPC_ERROR_TIMEOUT` is reported.
  - Cause: The TIMEOUT error may be caused by write failures resulting from heavy write workloads or busy clusters. Check whether the instance has heavy CPU loads. The `CONNECTION CLOSED` error may be reported when excessive loads on the backend node cause out-of-memory (OOM) errors or core dumps.
  - Solution: Retry the write operation. If the error persists, contact the Hologres technical support personnel.
- Error: `BackPresure Exceed Reject Limit`
  - Cause: Heavy write workloads in the backend prevent the system from flushing the memory table in time, which causes a write failure.
  - Solution: Ignore the error if it occurs only occasionally, or set the `rpcRetries = '100'` parameter to increase the number of write retries, as shown in the sketch that follows. If the error persists, contact the Hologres technical support personnel.
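A minimal sketch of the retry setting referenced in the solution above, using a Blink-style DDL skeleton with the connection parameters elided:
```sql
create table holo_sink (
  id int,
  content varchar
) with (
  type = 'hologres',
  ...,                  -- Endpoint and credential parameters omitted.
  'rpcRetries' = '100'  -- Increase the number of write retries.
);
```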
- Error: `The requested table name xxx mismatches the version of the table xxx from server/org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend. Caused by: java.net.SocketTimeoutException: Read timed out`
  - Cause: An ALTER TABLE statement was executed. As a result, the schema version of the Blink table is earlier than that on the server, and the maximum number of retries is reached.
  - Solution: Ignore the error if it occurs only occasionally. If the error persists, contact the Hologres technical support personnel.
- Error: `Failed to query table meta for table`
  - Cause: A foreign table is read, but Hologres connectors do not support read or write operations on foreign tables. Otherwise, the error is caused by an issue related to the metadata of the Hologres instance.
  - Solution: Contact the Hologres technical support personnel.
- Error: `Cloud authentication failed for access id`
  - Cause: The AccessKey pair is incorrect, or the user has not been added to the Hologres instance.
  - Solution:
    - Verify the AccessKey ID and AccessKey secret.
    - Use the AccessKey pair to log on to the HoloWeb console. If the AccessKey pair is incorrect, the same error is reported. If the user does not have permissions to manage the instance, `FATAL: role "ALIYUN$xxxx" does not exist` is reported. In this case, grant the required permissions to the user.
- No data can be queried when a dimension table is joined with another table.
  - Cause: The dimension table is a partitioned table. Dimension tables do not support partitioned tables.
  - Solution: Use standard tables as dimension tables.
- Error: `Modify record by primary key is not on this table`
  - Cause: An update insert mode is specified, but the Hologres result table does not contain a primary key.
  - Solution: Set a primary key, as shown in the sketch that follows.
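A minimal sketch of a Hologres result table with a primary key; the table and column names are placeholders:
```sql
BEGIN;
CREATE TABLE holo_result (
  id      INT NOT NULL,
  content TEXT,
  PRIMARY KEY (id)  -- Required when an update insert mode is specified.
);
COMMIT;
```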
- Error: `shard columns count is no match`
  - Cause: The written data does not include all distribution key columns. By default, the primary key columns are the distribution key columns.
  - Solution: Write data to all distribution key columns.
- Error: `Full row is required, but the column xxx is missing`
  - Cause: Data in a required column is missing. This error is reported in earlier Hologres versions.
  - Solution: Assign values to the required columns.
- The number of JDBC connections temporarily increases when VVP users read or write Hologres data.
  - Cause: The maximum number of JDBC connections allowed to read or write Hologres data other than binary logs by using Hologres connectors on VVP is reached. The maximum number of JDBC connections is calculated based on the following formula: Number of Hologres tables × Number of concurrent read or write operations × connectionSize. `connectionSize` is a parameter for VVP tables, and its default value is 3.
  - Solution: Plan an appropriate number of connections, and reduce the number of concurrent read or write operations or the value of the `connectionSize` parameter. If neither can be reduced, set `useRpcMode` to `true` to switch to the RPC mode, as shown in the sketch that follows.
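A minimal sketch of the `useRpcMode` switch referenced in the solution above; the DDL skeleton is hypothetical and the connection parameters are elided:
```sql
CREATE TEMPORARY TABLE hologres_sink (
  id      INT,
  content VARCHAR
) with (
  'connector' = 'hologres',
  ...,                   -- Endpoint and credential parameters omitted.
  'useRpcMode' = 'true'  -- Use the RPC mode instead of JDBC connections.
);
```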
- Blink and VVP users cannot connect to Hologres when they attempt to read or write Hologres data.
  - Cause: The network connection between the Blink or VVP cluster and the Hologres instance is slow, or the cluster cannot access the Internet.
  - Solution: Make sure that the Blink or VVP cluster resides in the same region as the Hologres instance and uses a VPC endpoint.
- Error: `Hologres rpc mode dimension table does not support one to many join`
  - Cause: The Blink or VVP dimension table in RPC mode is not a row-oriented table, or the field used to join the tables is not the primary key.
  - Solution: Use the JDBC mode, and use the column-oriented or hybrid row-column storage mode for the dimension table.
- DatahubClientException
  - Description: The error `Caused by: com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:503, requestId:null, errorCode:null, errorMessage:{"ErrorCode":"ServiceUnavailable","ErrorMessage":"Queue Full"}]` is reported.
  - Cause: The thread pool is full because a large number of binary log consumption jobs are restarted at the same time for a specific reason.
  - Solution: Consume binary logs in batches.
- Error occurs when reading data from datahub
  - Description: The error `Error occurs when reading data from datahub, msg: [httpStatus:500, requestId:xxx, errorCode:InternalServerError, errorMessage:Get binlog timeout.]` is reported.
  - Cause: Each binary log record contains a large amount of data, which causes the size of a single RPC request to exceed the upper limit.
  - Solution: Reduce the number of binary log records in each batch when each row contains many data fields or long character strings, as shown in the sketch that follows.
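As a sketch, the batch size of the binlog source can be lowered with the `binlogBatchReadSize` parameter; treat the parameter name and value as assumptions to verify against your connector version:
```sql
create table holo_binlog_source (
  `hg_binlog_lsn` BIGINT HEADER,
  A int,
  B int
) with (
  type = 'hologres',
  ...,                          -- Endpoint and credential parameters omitted.
  'binlog' = 'true',
  'binlogBatchReadSize' = '16'  -- Assumed parameter: read fewer binlog records per batch.
);
```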
- Error: `Caused by: java.lang.IllegalArgumentException: Column: created_time type does not match: flink row type: TIMESTAMP(6) WITH LOCAL TIME ZONE, hologres type: timestamp`
  - Cause: The Flink field is of the TIMESTAMP(6) data type, which is not supported by Hologres.
  - Solution: Change the data type of the field to TIMESTAMP.
- Error: `Caused by: org.postgresql.util.PSQLException: FATAL: Rejected by ip white list. db = xxx, usr=xxx, ip=xx.xx.xx.xx`
  - Cause: An IP address whitelist is configured for Hologres, and it does not contain the IP address that Flink uses to connect to Hologres.
  - Solution: Add the IP address that Flink uses to the IP address whitelist of Hologres. For more information, see Configure an IP address whitelist.