This topic provides answers to some frequently asked questions about the upstream and downstream storage of Realtime Compute for Apache Flink.

How do I obtain JSON data by using fully managed Flink?

  • For more information about how to obtain common JSON data, see JSON Format.
  • If you want to obtain nested JSON data, define the JSON object as a ROW type in the DDL statement for the source table, define the keys that you want to obtain in the DDL statement for the result table, and reference the nested keys in the DML statement to obtain their values. Sample code:
    • Test data
      {
          "a":"abc",
          "b":1,
          "c":{
              "e":["1","2","3","4"],
              "f":{"m":"567"}
          }
      }
    • DDL syntax that is used to create a source table
      CREATE TEMPORARY TABLE `kafka_table` (
        `a` VARCHAR,
         b int,
        `c` ROW<e ARRAY<VARCHAR>,f ROW<m VARCHAR>>  --c specifies a JSON object, which corresponds to ROW in fully managed Flink. e specifies a JSON list, which corresponds to ARRAY. 
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'xxx',
        'properties.bootstrap.servers' = 'xxx',
        'properties.group.id' = 'xxx',
        'format' = 'json',
        'scan.startup.mode' = 'xxx'
      );
    • DDL syntax that is used to create a result table
      CREATE TEMPORARY TABLE `sink` (
       `a` VARCHAR,
        b INT,
        e VARCHAR,
        `m` varchar
      ) WITH (
        'connector' = 'print',
        'logger' = 'true'
      );
    • DML statement
      INSERT INTO `sink`
        SELECT 
        `a`,
        b,
        c.e[1], -- Array indexes in fully managed Flink start from 1. This example obtains the first element of the array. If you want to obtain the entire array, remove [1]. 
        c.f.m
      FROM `kafka_table`;
    • Test results

Fully managed Flink is connected to Message Queue for Apache Kafka, but cannot read data from or write data to Message Queue for Apache Kafka. What do I do?

  • Cause

    If a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to Message Queue for Apache Kafka, the Kafka client, which is the Kafka connector of fully managed Flink, obtains the endpoints of the Kafka brokers instead of the address of the proxy. In this case, fully managed Flink cannot read data from or write data to Message Queue for Apache Kafka even if the connection to the proxy is established.

    The process in which fully managed Flink connects to Message Queue for Apache Kafka consists of the following steps:
    1. The Kafka client obtains the metadata of the Kafka broker, including the endpoints of all Kafka brokers.
    2. Fully managed Flink uses the endpoints of Kafka brokers that are obtained by the Kafka client to read data from or write data to Message Queue for Apache Kafka.
  • Troubleshooting
    To troubleshoot this issue, you can perform the following steps to check whether a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to Message Queue for Apache Kafka:
    1. Use the ZooKeeper command-line tool zkCli.sh or zookeeper-shell.sh to log on to the ZooKeeper service that is used by your Message Queue for Apache Kafka cluster.
    2. Run a command based on the information about your Message Queue for Apache Kafka cluster to obtain the metadata of your Kafka brokers.
      In most cases, you can run the get /brokers/ids/0 command to obtain the metadata of a Kafka broker. You can obtain the endpoint of Message Queue for Apache Kafka from the endpoints field.
    3. Use commands such as ping or telnet to test the connectivity between fully managed Flink and the endpoint in the endpoints field.

      If the connectivity test fails, a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to Message Queue for Apache Kafka.

  • Solution
    • Set up a direct connection between fully managed Flink and Message Queue for Apache Kafka, instead of using a forwarding mechanism such as a proxy or a port mapping. This way, fully managed Flink can directly connect to Message Queue for Apache Kafka by using the endpoint in the endpoints field of the Kafka broker metadata.
    • Contact O&M staff of Message Queue for Apache Kafka to set the advertised.listeners property of Kafka brokers to the forwarding address. This way, the metadata of the Kafka brokers obtained by the Kafka client contains the forwarding address.
      Note Only Kafka 0.10.2.0 and later allow you to add the forwarding address to the advertised.listeners property of Kafka brokers.

    For more information about this issue, see KIP-103: Separation of Internal and External traffic and Kafka network connection issues.

After the data of a Message Queue for Apache Kafka source table is calculated by using event time-based window functions, no data output is returned. Why?

  • Problem description

    After the data of a Message Queue for Apache Kafka source table is processed by using event time-based window functions, no data output is returned.

  • Cause

    No watermark is generated because a partition of the Message Queue for Apache Kafka source table has no data. As a result, no data is returned after event time-based window functions are used to calculate the data of a Message Queue for Apache Kafka source table.

  • Solution
    1. Make sure that all partitions of the Message Queue for Apache Kafka source table contain data.
    2. Enable the idleness detection feature for the source. In the upper-right corner of the deployment details page, click Configure. On the right side of the Draft Editor page, click the Advanced tab. In the panel that appears, add the following configuration to the Additional Configuration section and save the changes for the configuration to take effect.
      table.exec.source.idle-timeout: 5s
      For more information about the table.exec.source.idle-timeout parameter, see Configuration.

What is the purpose of the commit offset mechanism in fully managed Flink?

Fully managed Flink commits the read offset to Message Queue for Apache Kafka each time a checkpoint is generated. If checkpointing is disabled or the checkpoint interval is too large, you may fail to query the read offset in the Message Queue for Apache Kafka console.
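
For example, if you want the read offset to be committed to Message Queue for Apache Kafka more frequently, you can shorten the checkpoint interval. The following setting is only a sketch and assumes that you add it in the Additional Configuration section of the Advanced tab, in the same way as the other configuration examples in this topic:
  execution.checkpointing.interval: 60s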

Why does the error message "timeout expired while fetching topic metadata" appear even if a network connection is established between fully managed Flink and Message Queue for Apache Kafka?

Fully managed Flink may be unable to read data from Message Queue for Apache Kafka even if a network connection is established between fully managed Flink and Message Queue for Apache Kafka. To ensure that both services are connected and the data of Message Queue for Apache Kafka can be read, you must use the endpoint that is described in the cluster metadata returned by Kafka brokers during bootstrapping. For more information, see Flink-cannot-connect-to-Kafka. To check the network connection, perform the following operations:
  1. Use zkCli.sh or zookeeper-shell.sh to log on to the ZooKeeper service that is used by your Message Queue for Apache Kafka cluster.
  2. Run the ls /brokers/ids command to obtain the IDs of all Kafka brokers.
  3. Run the get /brokers/ids/{your_broker_id} command to view the metadata of the Kafka broker.

    You can obtain the endpoint from the endpoints field and the security protocol of each listener from listener_security_protocol_map. A sample of the broker metadata is shown after these steps.

  4. Check whether fully managed Flink can connect to the endpoint.

    If the endpoint contains a domain name, configure the DNS service for fully managed Flink. For more information about how to configure the DNS service for fully managed Flink, see How do I resolve the domain name of the service on which a Flink deployment depends?.
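
  The following sample shows what the broker metadata returned in step 3 may look like. The values are only illustrative, and the exact fields vary with the Kafka version:
  {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.XX.XX:9092"],"jmx_port":-1,"host":"192.168.XX.XX","timestamp":"1650000000000","port":9092,"version":4}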

How does the Kafka connector parse nested JSON-formatted data?

To parse the following JSON-formatted data by using the Kafka connector, declare the data field as ARRAY<ROW<cola VARCHAR, colb VARCHAR>>. This field is an array of the ROW type and contains two child fields of the VARCHAR data type. Then, expand the array, for example by using a user-defined table-valued function (UDTF).
{"data":[{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"}]})

How do I connect to a Kafka cluster for which security information is configured?

  1. Add the security configurations related to encryption and authentication to the parameters in the WITH clause of the DDL statement for the Kafka table. For more information about the security configurations, see SECURITY. The following examples show the configurations.
    Note You must add the properties. prefix to the security configurations.
    • Configure a Kafka table to use PLAIN as the SASL mechanism and provide the Java Authentication and Authorization Service (JAAS) configuration.
      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
      );
    • Use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.
      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_SSL',
        /* Configure SSL. */
        /* Configure the path of the CA certificate truststore provided by the server. */
        'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
        'properties.ssl.truststore.password' = 'test1234',
        /* Configure the path of the private key file keystore if client authentication is required. */
        'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
        'properties.ssl.keystore.password' = 'test1234',
        /* Configure SASL. */
        /* Configure SCRAM-SHA-256 as the SASL mechanism. */
        'properties.sasl.mechanism' = 'SCRAM-SHA-256',
        /* Configure JAAS. */
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
      );
      Note
      • If properties.sasl.mechanism is set to SCRAM-SHA-256, set properties.sasl.jaas.config to org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule.
      • If properties.sasl.mechanism is set to PLAIN, set properties.sasl.jaas.config to org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule.
  2. Upload all required files, such as a certificate, public key file, or private key file, in Additional Dependencies for the deployment.

    The files that are uploaded are stored in the /flink/usrlib directory. For more information about how to upload a file in Additional Dependencies, see Publish a draft.

    Note If the authentication mechanism for the username and password on your Kafka broker is SASL_SSL, but the authentication mechanism on the client is SASL_PLAINTEXT, the OutOfMemory exception is reported during deployment verification. In this case, you must change the authentication mechanism on the client.

What do I do if a service latency occurs when data is read from a Kafka source table?

  • Problem description
    The value of the currentEmitEventTimeLag metric indicates a latency of more than 50 years when data is read from the Kafka source table.
  • Troubleshooting
    1. Check whether the deployment is a JAR deployment or an SQL deployment.

      If the deployment is a JAR deployment, check whether the Kafka dependency in the Project Object Model (POM) file is the built-in dependency of fully managed Flink. If the POM file references the open source Apache Kafka dependency instead, no latency-related metrics are displayed.

    2. Check whether data is inserted into all partitions of the Kafka source table in real time.
    3. Check whether the timestamp of metadata on a Kafka message is 0 or null.
      The data latency of the Kafka source table is calculated by subtracting the timestamp on a Kafka message from the current time. If the message does not contain the timestamp, the value of the currentEmitEventTimeLag metric indicates a latency of more than 50 years. You can use one of the following methods to identify the issue:
      • For an SQL deployment, you can define a metadata column to obtain the timestamp on a Kafka message. For more information, see Create a Message Queue for Apache Kafka source table.
        CREATE TEMPORARY TABLE sk_flink_src_user_praise_rt (
            `timestamp` BIGINT,
            `kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp',  -- The timestamp from the Kafka message metadata. The column name kafka_ts is only an example and must not clash with the payload field. 
            ts as to_timestamp (
              from_unixtime (`timestamp`, 'yyyy-MM-dd HH:mm:ss')
            ),
            watermark for ts as ts - interval '5' second
          ) WITH (
            'connector' = 'kafka',
            'topic' = '',
            'properties.bootstrap.servers' = '',
            'properties.group.id' = '',
            'format' = 'json',
            'scan.startup.mode' = 'latest-offset',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true'
          );
      • Write a simple Java program and use KafkaConsumer to read Kafka messages for testing.

How do I resume a deployment of fully managed Flink that fails to run after a DataHub topic is split or scaled in?

If a DataHub topic that is being read by fully managed Flink is split or scaled in, the deployment of fully managed Flink fails and cannot automatically resume. If you want to resume the deployment, you must cancel the deployment and then start it again.

Can I delete a DataHub topic that is being consumed?

No, you cannot delete or recreate DataHub topics that are being consumed.

What do the endPoint and tunnelEndpoint parameters mean? What happens if the two parameters are incorrectly configured?

For more information about the endPoint and tunnelEndpoint parameters, see Endpoints. If these two parameters are incorrectly configured in a virtual private cloud (VPC), the following task errors may occur:
  • If the endPoint parameter is incorrectly configured, the task stops when the progress reaches 91%.
  • If the tunnelEndpoint parameter is incorrectly configured, the task fails to run.

How do full MaxCompute source tables and incremental MaxCompute source tables read data from MaxCompute?

Full MaxCompute source tables and incremental MaxCompute source tables read data from MaxCompute by using MaxCompute Tunnel. Therefore, the read speed is limited by the bandwidth of MaxCompute Tunnel.

If MaxCompute is referenced as a data source, can the full MaxCompute source table or incremental MaxCompute source table read the data that is appended to an existing partition or table after a deployment of fully managed Flink starts?

No, the new data cannot be read. If new data is appended to the partition or the table that is read or is being read by the full MaxCompute source table or incremental MaxCompute source table after a deployment of fully managed Flink starts, the data cannot be read and a failover may be triggered.

Full MaxCompute source tables and incremental MaxCompute source tables use ODPS DOWNLOAD SESSION to read data from a partition or table. When you create a download session, the MaxCompute server creates an index file that contains the data mappings that exist at that point in time. Subsequent data reading is performed based on these data mappings. Therefore, in most cases, the data that is appended to a MaxCompute table or to a partition in the table after you create a download session cannot be read. If new data is written to the MaxCompute source table, the following issues occur:
  • If new data is written to a table or to a partition in the table while data is being read by using MaxCompute Tunnel, the following error is returned: ErrorCode=TableModified,ErrorMessage=The specified table has been modified since the download initiated.
  • Data accuracy cannot be guaranteed. New data that is written after the Tunnel download session is closed is not read. If a deployment is recovered from a failover or is resumed, the data may not be correct. For example, existing data may be read again, but the new data may not be completely read.

After I suspend a deployment for a full MaxCompute source table or incremental MaxCompute source table, can I change the parallelism of deployments and resume the deployment that I suspended?

No, you cannot change the parallelism and resume a deployment that you suspended. Based on the parallelism, the MaxCompute source table determines which data in which partitions each parallel task reads and records the consumption progress of each parallel task in state data. This way, data reading can continue from the most recent read position after a deployment is suspended and then resumed or after a deployment is recovered from a failover.

If you suspend a deployment for the full MaxCompute source table or an incremental MaxCompute source table, change the parallelism of deployments, and then resume the deployment, some data may be repeatedly read and some data may not be read.

Why is the data in the partitions before the start offset also read when you set the start offset of a deployment to 2019-10-11 00:00:00?

The start offset is valid only for Message Queue data sources, such as DataHub. You cannot apply the start offset in MaxCompute source tables. After you start a deployment of fully managed Flink, fully managed Flink reads data by using the following methods:
  • For a partitioned table, fully managed Flink reads data from all existing partitions.
  • For a non-partitioned table, fully managed Flink reads all existing data.

What do I do if an incremental MaxCompute source table detects a new partition and some data is not written to the partition?

No mechanism is provided to check whether the data in a partition is complete. If an incremental MaxCompute source table detects a new partition, the source table immediately reads data from the partition. If the incremental MaxCompute source table reads data from the ds partition of the MaxCompute partitioned table T, we recommend that you write data by using the following method: Do not create the partition in advance. Instead, execute the Insert overwrite table T partition (ds='20191010') ... statement. If the statement succeeds, the partition and its data appear at the same time.
Important We recommend that you do not use the following method to write data: Create a partition, such as ds=20191010, and then write data to the partition. If the incremental MaxCompute source table consumes table T and detects the new partition ds=20191010, the source table immediately reads data from the new partition. If the data in the partition is still incomplete at that time, some data may not be read.

What do I do if the error message "ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'" appears?

  • Problem description
    An error message appears on the Failover page or on the TaskManager.log page when a deployment is running. Error details:
    ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
  • Cause

    The user identity information specified in the MaxCompute DDL statements cannot be used to access MaxCompute.

  • Solution

    Use an Alibaba Cloud account, a RAM user, or a RAM role to authenticate the user identity. For more information, see User authentication.

How do I perform window aggregation if watermarks are not supported?

If you need to perform window aggregation on a MySQL CDC source table, you can convert time fields into window values and use GROUP BY to aggregate the window values. For example, if you want to calculate the number of orders and sales per minute in a store, you can use the following code:
SELECT shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm'), COUNT(*), SUM(price) FROM order_mysql_cdc GROUP BY shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm')

Can the MySQL CDC connector be used as a connector only for source tables?

Yes. The MySQL CDC connector can be used to read full data and incremental data from a table in the MySQL database. Therefore, the MySQL CDC connector can be used as a connector only for source tables. However, the ApsaraDB RDS for MySQL connector can be used as a connector for dimension tables or result tables.

When I restart a deployment, does the connector for the MySQL CDC source table consume data from the binary log file position at which the deployment is canceled or from the binary log file position at which the deployment is configured to start?

When you restart a deployment, you can configure the startup policy based on your business requirements. If you set Starting Strategy to NONE in the Deployment Starting Configuration dialog box, the connector for the MySQL CDC source table re-consumes data from the binary log file position at which the deployment is configured to start. If you set Starting Strategy to Latest State in the Deployment Starting Configuration dialog box, the connector for the MySQL CDC source table consumes data from the binary log file position at which the deployment is canceled.

For example, a deployment is configured to start from the binary log file position {file=mysql-bin.01, position=40} and the deployment is canceled after it runs for a period of time. In this case, data is consumed at the binary log file position {file=mysql-bin.01, position=210}. If you set Starting Strategy to NONE in the Deployment Starting Configuration dialog box, the connector for the MySQL CDC source table re-consumes data from the binary log file position {file=mysql-bin.01, position=40}. If you set Starting Strategy to Latest State in the Deployment Starting Configuration dialog box, the connector for the MySQL CDC source table consumes data from the binary log file position {file=mysql-bin.01, position=210}.

Note When you restart a deployment, make sure that the required binary log file is not deleted from the server due to expiration. Otherwise, an error is returned.

How does the connector for a MySQL CDC source table work? How does a MySQL CDC source table affect a database?

If the scan.startup.mode parameter in the WITH clause for a MySQL CDC source table is set to initial, the connector for the MySQL CDC source table connects to the MySQL database by using a Java Database Connectivity (JDBC) driver, executes a SELECT statement to read full data, and then records the binary log file position. The default value of the scan.startup.mode parameter is initial. After full data reading is complete, the connector reads incremental data from the binary log file at the binary log file position that is recorded.

During full data reading, the query load in the MySQL database may increase because the SELECT statement is executed to query data. During incremental data reading, the binlog client is used to connect to the MySQL database to read binary log data. If the number of data tables that are used increases, excessive connections may exist. You can run the following MySQL command to query the maximum number of connections:
show variables like '%max_connections%';

How do I enable the system to skip the snapshot stage and read only the change data?

To enable the system to skip the snapshot stage and read only the change data, you can configure the scan.startup.mode parameter in the WITH clause. You can set this parameter to latest-offset, earliest-offset, specific-offset, or timestamp based on your business requirements. For more information, see the description of the scan.startup.mode parameter in the "Parameters in the WITH clause" section in Create a MySQL CDC source table.
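
The following DDL statement is a minimal sketch of this configuration. The connector name, column definitions, and connection values are assumptions that you need to adjust to your VVR version:
CREATE TEMPORARY TABLE mysql_cdc_source (
  id BIGINT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xxx',
  'port' = '3306',
  'username' = 'xxx',
  'password' = 'xxx',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'scan.startup.mode' = 'latest-offset'  -- Skip the snapshot stage and read only the change data from the latest binary log position. 
);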

How does the system read data from a MySQL database on which sharding is performed?

For example, a MySQL database contains multiple tables, such as user_00, user_02, and user_99, after sharding, and the schemas of these tables are the same. In this scenario, you can use the table-name option to specify a regular expression that matches multiple tables to read. For example, you can set table-name to user_.* to monitor all tables whose names start with user_. The database-name option also supports regular expressions, which you can use in the same way to match multiple sharded databases.
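
The following DDL statement is a minimal sketch of this configuration. The column definitions, database name pattern, and connection values are assumptions:
CREATE TEMPORARY TABLE user_source (
  id BIGINT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xxx',
  'port' = '3306',
  'username' = 'xxx',
  'password' = 'xxx',
  'database-name' = 'user_db[0-9]+',  -- A regular expression that matches all sharded databases. 
  'table-name' = 'user_.*'  -- A regular expression that matches all sharded tables whose names start with user_. 
);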

What do I do if the data reading efficiency is low and backpressure exists when full data is read from tables?

If the sink processes data at a low speed, backpressure may occur and slow down the source. In this case, check whether the sink has backpressure. If the sink has backpressure, resolve the backpressure issue on the sink first by using one of the following methods:
  • Increase the parallelism.
  • Enable aggregation optimization features, such as miniBatch.

How do I determine whether full data synchronization is complete in a MySQL CDC deployment?

  • You can determine whether full data synchronization is complete in the deployment based on the value of the currentEmitEventTimeLag metric on the Metrics tab of the Deployments page.
    The currentEmitEventTimeLag metric reflects the difference between the time at which the source sends a data record to the sink and the time at which the data record is generated in the database. This metric is used to measure the delay from the time when the data is generated in the database to the time when the data leaves the source.
    Description of the value of the currentEmitEventTimeLag metric:
    • If the value of currentEmitEventTimeLag is less than or equal to 0, the full data synchronization in the MySQL CDC deployment is not complete.
    • If the value of currentEmitEventTimeLag is greater than 0, the MySQL CDC deployment completes full data synchronization and starts to read binary log data.
  • Check whether the TaskManager logs of the MySQL CDC source table contain the message "BinlogSplitReader is created". If this message appears, full data has been read.

What do I do if incremental data cannot be synchronized after full data synchronization is complete?

  • After full data synchronization is complete, the deployment automatically switches to the incremental data synchronization phase. If full data is read by multiple parallel tasks at the same time, the system waits for one more checkpoint before the deployment enters the incremental data synchronization phase. This ensures that full data is written to the sink before incremental data is read, which guarantees data accuracy.

    However, if the checkpointing interval that you specified is excessively large, the system needs to wait for a period of time to start incremental data synchronization. For example, if you set the checkpointing interval to 20 minutes, the system waits for 20 minutes after the full data synchronization is complete.

    To avoid this issue, we recommend that you specify a valid checkpointing interval based on your business requirements.

  • If you use a read-only ApsaraDB RDS for MySQL V5.6 instance, the binary log files that are provided by the instance are simplified and do not contain data. As a result, the MySQL CDC connector cannot read incremental data. In this case, we recommend that you use an instance that can process write requests or upgrade the ApsaraDB RDS for MySQL instance to a version later than V5.6.

What do I do if the load on the MySQL database becomes high due to multiple MySQL CDC deployments?

The connector for a MySQL CDC source table needs to connect to a database to read binary log data. If the number of source tables increases, the load on the database also increases. To reduce the load on the database, you can synchronize data from a MySQL CDC source table to a Message Queue for Apache Kafka result table and consume data in the result table. This way, MySQL CDC deployments do not rely on the reading of binary log data. For more information, see Synchronize data from all tables in a MySQL database to Kafka.

If the load on the database becomes high due to data synchronization by using the CREATE TABLE AS statement, you can merge multiple deployments that use the CREATE TABLE AS statement into one deployment to run. If the configurations of the deployments that use the CREATE TABLE AS statement are the same, you can configure the same server ID for each MySQL CDC source table to reuse the data source and reduce the load on the database. For more information, see Example 4: execution of multiple CREATE TABLE AS statements.

What do I do if the error message "com.github.shyiko.mysql.binlog.network.ServerException" appears?

  • Problem description
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
  • Cause

    The MySQL CDC connector records the binary log file position before fully managed Flink starts to read full data. After fully managed Flink reads full data, it reads incremental data from the recorded binary log file position. In most cases, this error occurs because the MySQL server has already purged the binary log file at the recorded position before the full data reading is complete.

  • Solution
    Check the rules that are used to delete MySQL binary log files, including the rules that are related to the time, the storage space, and the number of files. We recommend that you retain binary log files for more than one day. For more information about binary log files of ApsaraDB RDS for MySQL, see Delete the binary log files of an ApsaraDB RDS for MySQL instance.
    Note In Ververica Runtime (VVR) 4.0.8 and later, fully managed Flink can read full data in parallel deployments by using a MySQL CDC connector. This way, full data reading is accelerated and the probability of the preceding error is reduced.

What do I do if the error message "The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'" appears?

  • Problem description
    When a syntax check is performed on a MySQL CDC source table in VVR 4.0.X, the following error message appears:
    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
  • Cause
    No primary key is defined in the DDL statement that is used to create the MySQL CDC source table. In VVR 6.0.X and in VVR 4.0.8 and later, data can be sharded based on the primary key to support data reading in multiple parallel deployments.
    Note In VVR versions earlier than VVR 4.0.8, MySQL CDC source tables support only data reading in a single deployment.
  • Solution
    • If you want to read data from a MySQL database in multiple parallel deployments in VVR 6.0.X or in VVR 4.0.8 or later, declare a primary key in the DDL statement, as shown in the sketch after this list.
    • If you use a VVR version earlier than VVR 4.0.8 and you do not want to read data from a MySQL database in multiple parallel deployments, add the scan.incremental.snapshot.enabled parameter to the WITH clause of the DDL statement and set this parameter to false. This way, you do not need to define a primary key.
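
    The following DDL statement is a minimal sketch of the first solution. The table, column, and connection values are assumptions:
      CREATE TEMPORARY TABLE mysql_cdc_source (
        `order_id` BIGINT,
        `order_status` VARCHAR,
        PRIMARY KEY (`order_id`) NOT ENFORCED  -- The primary key allows data to be sharded and read in multiple parallel deployments. 
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'xxx',
        'port' = '3306',
        'username' = 'xxx',
        'password' = 'xxx',
        'database-name' = 'xxx',
        'table-name' = 'xxx'
        /* For VVR versions earlier than 4.0.8, you can instead omit the primary key and add 'scan.incremental.snapshot.enabled' = 'false'. */
      );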

What do I do if the error message "java.io.EOFException: SSL peer shut down incorrectly" appears?

  • Problem description
    Caused by: java.io.EOFException: SSL peer shut down incorrectly
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302]
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302]
        at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?]
        at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?]
        at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?]
        at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?]
        at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?]
        ... 14 more
  • Cause

    In MySQL V8.0.27, the MySQL database is connected over SSL by default. However, the JDBC-based connection that is used by the connector cannot access the MySQL database over SSL. As a result, the error message appears.

  • Solution
    • If you can upgrade your VVR version to VVR 6.0.2 or later, add the setting 'jdbc.properties.useSSL' = 'false' to the WITH clause of the MySQL CDC table to resolve this issue.
    • If the table is declared only as a dimension table, set the connector parameter to rds in the WITH clause of the table and add the setting characterEncoding=utf-8&useSSL=false to the url parameter. Example:
      'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'

What do I do if the error message "com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master" appears?

  • Problem description
    Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
        at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[?:?]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
    Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx, the last byte read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146) ~[?:?]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx, the last byte read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
  • Cause

    When the connector for a MySQL CDC source table reads data, make sure that a server ID is configured for each parallelism and that each server ID is unique. If the server ID that is used by the MySQL CDC source table conflicts with the server ID that is used by another CDC source table in the same deployment, a CDC source table in another deployment, or a data synchronization tool, this error is returned.

  • Solution

    Specify a globally unique server ID for each parallelism of the MySQL CDC source table. For more information, see Precautions for MySQL CDC source tables.
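
    The following DDL statement is a sketch that shows how to specify a range of server IDs so that each parallelism uses a unique ID. The ID range and connection values are only examples:
      CREATE TEMPORARY TABLE mysql_cdc_source (
        id BIGINT,
        name VARCHAR,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'xxx',
        'port' = '3306',
        'username' = 'xxx',
        'password' = 'xxx',
        'database-name' = 'xxx',
        'table-name' = 'xxx',
        'server-id' = '5400-5408'  -- A range that provides a unique server ID for each of up to nine parallel tasks. 
      );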

What do I do if the error message "org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.xxx', pos=xxx, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed" appears?

  • Problem description
    Caused by: org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.xxx', pos=xxx, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed
        at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133)
        at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • Cause

    The binary log file from which data is read is deleted from the MySQL server.

    A binary log file is deleted due to one of the following reasons: 1. The retention period that is configured for the binary log files is excessively short. 2. The deployment processes data more slowly than binary logs are generated, and the processing lag exceeds the retention period of the MySQL binary log files. As a result, the binary log files on the MySQL server are deleted and the binary log file position that is being read becomes invalid.

  • Solution

    If the deployment processing speed is always lower than the binary log generation speed, you can increase the retention period of binary log data. You can also check the status of the deployment. If backpressure exists in your deployment, you can optimize the deployment to reduce backpressure and accelerate the data consumption of the source table.

    If the status of the deployment is normal, the binary log data may have been deleted due to other operations that are performed on the database. As a result, the recorded binary log position can no longer be accessed. You can determine the cause of the deletion of the binary log data based on the information about the MySQL database.

What do I do if the error message "EventDataDeserializationException: Failed to deserialize data of EventHeaderV4 .... Caused by: java.net.SocketException: Connection reset" appears?

  • Problem description
    EventDataDeserializationException: Failed to deserialize data of EventHeaderV4 .... Caused by: java.net.SocketException: Connection reset
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:304)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:227)
        at io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:252)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:934)
    ... 3 more
    Caused by: java.io.EOFException
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:192)
        at java.io.InputStream.read(InputStream.java:170)
        at java.io.InputStream.skip(InputStream.java:224)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:301)
        ... 6 more
  • Causes
    • A network issue occurs.
    • Backpressure exists in the deployment. In this case, the source cannot pull data and the binlog client remains idle. If the binlog client remains idle for longer than the timeout that is configured on the MySQL server, the MySQL server closes the idle connection to reduce the number of connections. As a result, the preceding error occurs.
  • Solution
    If the error is caused by a network issue, you can increase the values of network-related parameters of the MySQL server.
    set global slave_net_timeout = 120; (Default value: 30s)
    set global thread_pool_idle_timeout = 120;

    If the error is caused by backpressure in the deployment, you can optimize the deployment to reduce the backpressure or increase the values of network-related parameters.

When output data is written to an ApsaraDB RDS result table, is a new data record inserted into the table, or is the result table updated based on the primary key value?

If a primary key is defined in the DDL statement, the connector executes the INSERT INTO tablename(field1, field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1, field2=value2, field3=value3, ...; statement to write data. If the primary key value does not exist in the table, a new record is inserted. If the primary key value already exists, the existing record is updated. If no primary key is defined in the DDL statement, the connector executes the INSERT INTO statement to insert data.
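
The following DDL sketch shows a result table with a primary key, which triggers the ON DUPLICATE KEY UPDATE behavior described above. The option names follow the rds connector convention and the connection values are placeholders; check the connector documentation for your VVR version:
CREATE TEMPORARY TABLE rds_sink (
  `id` BIGINT,
  `score` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED  -- With a primary key, existing rows are updated. Without a primary key, rows are only inserted. 
) WITH (
  'connector' = 'rds',
  'url' = 'jdbc:mysql://xxx:3306/test',
  'tableName' = 'xxx',
  'userName' = 'xxx',
  'password' = 'xxx'
);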

How do I perform GROUP BY operations by using the unique index of an ApsaraDB RDS result table?

  • If you want to use the unique index of an ApsaraDB RDS result table to perform GROUP BY operations, you must declare the unique index in the GROUP BY clause in your deployment of fully managed Flink.
  • An ApsaraDB RDS table has only one auto-increment primary key, which cannot be declared as a primary key in a deployment of fully managed Flink.

Why is the INT UNSIGNED data type that is supported by MySQL physical tables, such as ApsaraDB RDS for MySQL physical tables or AnalyticDB for MySQL physical tables, declared as another data type in Flink SQL?

To ensure data precision, the Java Database Connectivity (JDBC) driver of MySQL converts the received data into a larger data type based on the data type of the received data. For example, the MySQL JDBC driver converts the received data of the INT UNSIGNED type into the Java LONG type, which is mapped to the BIGINT type of Flink SQL. Similarly, the driver converts the received data of the BIGINT UNSIGNED type into the Java BIGINTEGER type, which is mapped to the DECIMAL(20, 0) type of Flink SQL.
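
For example, for a MySQL physical table that contains an INT UNSIGNED column and a BIGINT UNSIGNED column, the corresponding Flink SQL declaration looks similar to the following sketch. The table and column names are assumptions:
CREATE TEMPORARY TABLE mysql_dim (
  `id` BIGINT,              -- INT UNSIGNED in the MySQL physical table. 
  `big_id` DECIMAL(20, 0),  -- BIGINT UNSIGNED in the MySQL physical table. 
  `name` VARCHAR
) WITH (
  ...
);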

What do I do if the error message "Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1" appears?

  • Problem description
    Caused by: java.sql.BatchUpdateException: Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1
    at sun.reflect.GeneratedConstructorAccessor59.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
    at com.mysql.cj.util.Util.getInstance(Util.java:167)
    at com.mysql.cj.util.Util.getInstance(Util.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeBatch(DruidPooledPreparedStatement.java:565)
    at com.alibaba.ververica.connectors.rds.sink.RdsOutputFormat.executeSql(RdsOutputFormat.java:488)
    ... 15 more
  • Cause

    The data contains emojis, which cannot be parsed during data encoding.

  • Solution

    When you use a JDBC driver to connect to a MySQL database, you must add the character set UTF-8 to the JDBC URL of the MySQL database, such as jdbc:mysql://<internalEndpoint>/<databaseName>?characterEncoding=UTF-8. For more information, see Using Character Sets and Unicode.
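
    For example, in the WITH clause of the result table, the url option would look similar to the following line. The endpoint and database name are placeholders:
    'url' = 'jdbc:mysql://<internalEndpoint>/<databaseName>?characterEncoding=UTF-8'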

What do I do if a deadlock occurs when data is written to an ApsaraDB RDS for MySQL database (RDS/TDDL)?

  • Problem description
    When data is written to an ApsaraDB RDS for MySQL database that uses the Taobao Distributed Data Layer (TDDL) or RDS connector, a deadlock occurs.
    Note Realtime Compute for Apache Flink uses relational databases, such as ApsaraDB RDS for MySQL, to store result data. The TDDL and RDS connectors are used. If Realtime Compute for Apache Flink frequently writes data to an ApsaraDB RDS for MySQL table, deadlocks may occur.
  • Example of how a deadlock occurs
    For example, an INSERT operation acquires two locks, A and B, in sequence. A is a range lock, and two transactions, T1 and T2, are involved. The table schema is (id (auto-increment primary key), nid (unique key)). T1 contains two statements: insert(null,2) and insert(null,1). T2 contains one statement: insert(null,2).
    1. At the t time point, the first statement in T1 is executed. T1 has two locks (A,B).
    2. At the t+1 time point, the statement in T2 is executed. Lock A is required to lock (-inf,2]. The ranges of the two transactions have an inclusion relationship. T2 cannot use Lock A until Lock A is released by T1.
    3. At the t+2 time point, the second statement in T1 is executed. Lock A is required to lock (-inf,1]. This range is included in (-inf,2]. Therefore, T2 is required to release the lock. T1 cannot use Lock A until Lock A is released by T2.
    When T1 and T2 are waiting for each other to release the lock, a deadlock occurs.
  • Differences in database engine locks between RDS/TDDL and Tablestore
    • RDS/TDDL: The row lock in InnoDB locks an index rather than a record. In this case, if the same index key is used when you access records in different rows, a lock conflict may occur, and the data in the entire range that is covered by the lock cannot be updated.
    • Tablestore: Only a single row is locked. This does not affect the update of other data.
  • Solution

    In scenarios with high queries per second (QPS) or transactions per second (TPS) or highly parallel write operations, use Tablestore as the result table. We recommend that you do not use TDDL or ApsaraDB RDS as the result table for your Flink deployment.

    If you want to use a relational database such as ApsaraDB RDS for MySQL, take note of the following points:
    • Make sure that your deployment is not affected by read and write operations from other systems.
    • If the amount of data in your deployment is small, perform a single-concurrent write operation. In scenarios with high QPS or TPS or high concurrency, the write performance deteriorates.
    • Do not use a unique key (UniqueKey) if possible. If you write data to a table that has a unique key, a deadlock may occur. If the table must contain a unique key, order the columns of the unique key in descending order of distinctiveness. This significantly reduces the probability of deadlocks. For example, you can place a highly distinctive value, such as an MD5 hash, before a low-distinctiveness column such as day_time (20171010).
    • Shard databases and tables based on your business requirements to avoid writing data to a single table. For more information about the implementation, contact the database administrator.

Can I retract the updated data from an ApsaraDB for ClickHouse result table?

If a primary key is defined in the DDL statement that is used to create the ApsaraDB for ClickHouse result table of fully managed Flink, you can retract the updated data. However, ApsaraDB for ClickHouse is a column-oriented database management system that is used for online analytical processing (OLAP), and its performance for UPDATE and DELETE operations is low. If a primary key is defined in the DDL statement, fully managed Flink uses the ALTER TABLE UPDATE or DELETE statement to update or delete data. As a result, the data processing performance significantly decreases.

How do I view print data results in the console of fully managed Flink?

If you want to view print data results in the console of fully managed Flink, you can use one of the following methods:
  • Log on to the console of fully managed Flink to view data:
    1. In the left-side navigation pane, choose Applications > Deployments.
    2. Click the name of the desired deployment.
    3. Click the Logs tab.
    4. Click the Running Logs tab. Then, select the instance of the deployment that is running from the Selected Job drop-down list.
    5. On the Task Managers tab, click the value in the Path, ID column.
    6. Click the Logs tab to view print data results.
  • Go to Flink web UI:
    1. In the left-side navigation pane, choose Applications > Deployments.
    2. Click the name of the desired deployment.
    3. On the Overview tab, click Flink UI.
    4. In the left-side navigation pane of Apache Flink Dashboard, click Task Managers.
    5. On the Task Managers page, click the value in the Path, ID column.
    6. On the page that appears, click the Logs tab to view the print data results.

What do I do if no data can be found when I join a dimension table with another table?

Check whether the schema type and name in the DDL statement are the same as the schema type and name in the physical table.

What is the difference between max_pt() and max_pt_with_done()?

When the values of the partition parameter are sorted in alphabetical order, max_pt() returns the largest partition. max_pt_with_done() returns the largest partition, in alphabetical order, that has a corresponding partition with the .done suffix. In this example, the following partitions are used:
  • ds=20190101
  • ds=20190101.done
  • ds=20190102
  • ds=20190102.done
  • ds=20190103
The following example shows the difference between max_pt() and max_pt_with_done():
  • `partition`='max_pt_with_done()' returns the ds=20190102 partition.
  • `partition`='max_pt()' returns the ds=20190103 partition.
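
The following WITH clause sketch shows how the partition option is typically specified for a MaxCompute source table or dimension table. The option names follow the MaxCompute connector and the other values are placeholders; check the connector documentation for your VVR version:
CREATE TEMPORARY TABLE odps_source (
  id BIGINT,
  name VARCHAR
) WITH (
  'connector' = 'odps',
  'endPoint' = 'xxx',
  'project' = 'xxx',
  'tableName' = 'xxx',
  'accessId' = 'xxx',
  'accessKey' = 'xxx',
  'partition' = 'max_pt()'  -- Read the largest partition. Use max_pt_with_done() to read the largest partition that has a corresponding .done partition. 
);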

No data is entered in a partition of the upstream Kafka connector. As a result, the watermark cannot move forward and the window output is delayed. What do I do?

For example, five partitions exist in the upstream Kafka connector and two new data entries are entered into Kafka every minute, but some of the partitions do not receive data entries in real time. If a partition does not receive any data, no watermark is generated for that partition. As a result, the overall watermark cannot move forward, the window cannot end at the earliest opportunity, and the result cannot be generated in real time.

In this case, you must configure an idleness timeout. If a partition does not receive data within the timeout period, the partition is marked as temporarily idle and excluded from the calculation of the watermark. When data arrives in the partition again, the partition is included in the calculation of the watermark again. For more information, see Configuration.

To configure a timeout period, perform the following steps: In the upper-right corner of the deployment details page, click Draft. On the right side of the Draft Editor page, click the Advanced tab. In the panel that appears, add the following code to the Additional Configuration section.
table.exec.source.idle-timeout: 1s

What do I do if the error message "IllegalArgumentException: timeout value is negative" appears?

  • Problem description
    The error message IllegalArgumentException: timeout value is negative is returned for the deployment.
  • Cause

    If no new Message Queue for Apache RocketMQ message is consumed for a period of time in the deployment, the MetaQSource thread enters the sleep mode. The sleep duration is specified by the pullIntervalMs parameter. However, the default value of this parameter is -1. If the default value is used, the error is returned for the deployment.

  • Solution

    Change the value of the pullIntervalMs parameter to a non-negative value.
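
    For example, assuming that the parameter is set in the WITH clause of the Message Queue for Apache RocketMQ source table, the setting would look similar to the following line. The value is only an example:
    'pullIntervalMs' = '500'  -- The pull interval in milliseconds. The value must be non-negative. 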

What do I do if the error message "BackPressure Exceed reject Limit" appears?

  • Problem description
    The error message BackPressure Exceed reject Limit is returned when data is written to a Hologres result table.
  • Cause

    The write load in Hologres is high.

  • Solution

    Provide the instance information to Hologres technical support engineers to perform an upgrade.

What do I do if the error message "remaining connection slots are reserved for non-replication superuser connections" appears?

  • Problem description
    Caused by: com.alibaba.hologres.client.exception.HoloClientWithDetailsException: failed records 1, first:Record{schema=org.postgresql.model.TableSchema@188365, values=[f06b41455c694d24a18d0552b8b0****, com.chot.tpfymnq.meta, 2022-04-02 19:46:40.0, 28, 1, null], bitSet={0, 1, 2, 3, 4}},first err:[106]FATAL: remaining connection slots are reserved for non-replication superuser connections
        at com.alibaba.hologres.client.impl.Worker.handlePutAction(Worker.java:406) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.run(Worker.java:118) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
        ... 1 more
    Caused by: com.alibaba.hologres.org.postgresql.util.PSQLException: FATAL: remaining connection slots are reserved for non-replication superuser connections
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.readStartupMessages(QueryExecutorImpl.java:2665) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.<init>(QueryExecutorImpl.java:147) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:273) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51) ~[?:?]
        at com.alibaba.hologres.org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:240) ~[?:?]
        at com.alibaba.hologres.org.postgresql.Driver.makeConnection(Driver.java:478) ~[?:?]
        at com.alibaba.hologres.org.postgresql.Driver.connect(Driver.java:277) ~[?:?]
        at java.sql.DriverManager.getConnection(DriverManager.java:674) ~[?:1.8.0_302]
        at java.sql.DriverManager.getConnection(DriverManager.java:217) ~[?:1.8.0_302]
        at com.alibaba.hologres.client.impl.ConnectionHolder.buildConnection(ConnectionHolder.java:122) ~[?:?]
        at com.alibaba.hologres.client.impl.ConnectionHolder.retryExecute(ConnectionHolder.java:195) ~[?:?]
        at com.alibaba.hologres.client.impl.ConnectionHolder.retryExecute(ConnectionHolder.java:184) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.doHandlePutAction(Worker.java:460) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.handlePutAction(Worker.java:389) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.run(Worker.java:118) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
        ... 1 more
  • Cause

    The number of connections exceeds the upper limit.

  • Solution
    • View the app_name of the connections on each frontend (FE) node and check the number of Hologres client connections that are used by the Flink connector, as shown in the query sketch after this list.
    • Check whether other deployments are connecting to Hologres.
    • Release specific connections. For more information, see Manage connections.
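    A minimal query sketch for the first check, assuming only that Hologres exposes the standard PostgreSQL pg_stat_activity view. It groups the current connections by application_name so that you can compare how many connections the Flink connector holds with the connections held by other clients.
    SELECT application_name, COUNT(*) AS connection_count  --Connections held by each client, including the Flink connector. 
    FROM pg_stat_activity
    GROUP BY application_name
    ORDER BY connection_count DESC;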

What do I do if the error message "Caused by: java.lang.IllegalArgumentException: Listener with name jobmaster already registered" appears?

  • Problem description
    A JAR deployment cannot be submitted to the session cluster, and the following error message appears:
    Caused by: java.lang.IllegalArgumentException: Listener with name jobmaster already registered
    2022-05-23T18:39:32.646390412+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.LeaderElectionRunner.lambda$registerListener$5(LeaderElectionRunner.java:148) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.646394309+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.LeaderElectionRunner.inLockScope(LeaderElectionRunner.java:242) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.646397276+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.LeaderElectionRunner.registerListener(LeaderElectionRunner.java:142) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.646399845+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.KubernetesHaLeaderElectionService.lambda$start$0(KubernetesHaLeaderElectionService.java:58) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.64640333+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.KubernetesHaLeaderElectionService.inLockScope(KubernetesHaLeaderElectionService.java:145) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.646406583+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.KubernetesHaLeaderElectionService.start(KubernetesHaLeaderElectionService.java:55) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
  • Cause

    The VVR version of the deployment is earlier than VVR 4.X, such as vvr-2.1.4-flink-1.11, and an issue exists in the Kubernetes high availability (HA) client of these versions.

  • Solution

    If the deployment recovers after the session cluster is restarted, you can ignore this issue. Otherwise, upgrade your VVR version to VVR 4.X or later.

What do I do if the error message "org.apache.flink.util.SerializedThrowable" appears?

  • Problem description
    When a JAR deployment is running, the error message "org.apache.flink.util.SerializedThrowable" appears in the JobManager logs. If the tolerance for failed checkpoints is not configured, checkpointing may fail and the deployment is restarted.
    Caused by: org.apache.flink.util.SerializedThrowable
      at org.apache.flink.fs.osshadoop.StsFetcherCredentialsProvider.getStsCredential(StsFetcherCredentialsProvider.java:79) ~[?:?]
      at org.apache.flink.fs.osshadoop.StsFetcherCredentialsProvider.getCredentials(StsFetcherCredentialsProvider.java:53) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.createDefaultContext(OSSOperation.java:166) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:114) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.getObjectMetadata(OSSObjectOperation.java:458) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:579) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:569) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.getObjectMetadata(AliyunOSSFileSystemStore.java:277) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:256) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.create(AliyunOSSFileSystem.java:112) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
      at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:167) ~[?:?]
      at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:43) ~[?:?]
      at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170) ~[flink-dist_2.11-1.13-vvr-4.0.8-SNAPSHOT.jar:1.13-vvr-4.0.8-SNAPSHOT]
      at com.alibaba.flink.statebackend.FlinkFileSystemWrapper.create(FlinkFileSystemWrapper.java:94) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.<init>(GeminiFileOutputViewImpl.java:79) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.<init>(GeminiFileOutputViewImpl.java:69) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.snapshot.PendingSnapshotOperatorCoordinator.writeMeta(PendingSnapshotOperatorCoordinator.java:396) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.snapshot.PendingSnapshotOperatorCoordinator.acknowledgeAsyncSnapshot(PendingSnapshotOperatorCoordinator.java:366) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.snapshot.SnapshotUploaderImpl.run(SnapshotUploaderImpl.java:222) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.executor.GeminiEventExecutor.lambda$execute$1(GeminiEventExecutor.java:178) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.executor.GeminiEventExecutor.doEventInQueue(GeminiEventExecutor.java:107) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.executor.GeminiEventExecutor.run(GeminiEventExecutor.java:88) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.11-1.13-vvr-4.0.8-SNAPSHOT.jar:1.13-vvr-4.0.8-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.11-1.13-vvr-4.0.8-SNAPSHOT.jar:1.13-vvr-4.0.8-SNAPSHOT]
      ... 1 more
  • Cause

    An issue may occur in Object Storage Service (OSS).

  • Solution

    We recommend that you configure the execution.checkpointing.tolerable-failed-checkpoints parameter to prevent a deployment failure when checkpointing fails.
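    For example, you can add a line similar to the following to the Additional Configuration section on the Advanced tab. The value 5 is only an illustration; choose the number of tolerable consecutive checkpoint failures that fits your deployment.
    execution.checkpointing.tolerable-failed-checkpoints: 5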

What do I do if the error message "Failed to create the job graph for the job: 4b12318d861041ccb14d6e32bae9cec7 (message = 0:0-0:0, Translating the JobGraph for this deployment failed before. Please delete the JobGraph before requesting a new translation" appears?

  • Problem description
    Failed to create the job graph for the job: 4b12318d861041ccb14d6e32bae9**** (message = 0:0-0:0, Translating the JobGraph for this deployment failed before. Please delete the JobGraph before requesting a new translation.
    Error message: org.apache.flink.table.sqlserver.api.utils.FlinkSQLException: Session '7beccb7bcc161704b285974e0ae93957' does not exist.
        at org.apache.flink.table.sqlserver.session.SessionManager.getSession(SessionManager.java:121)
        at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.configureSession(FlinkSqlServiceImpl.java:312)
        at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.configureSession(FlinkSqlServiceImpl.java:299)
        at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:3187)
        at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  • Cause

    The custom connector is not uploaded to OSS.

  • Solution

    Upload the custom connector to OSS to make the deployment run properly. For more information, see Manage custom connectors.

What do I do if the error message "Caused by: com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException: the logs is 8785684 bytes which is larger than MAX_BATCH_SIZE_IN_BYTES 8388608" appears?

  • Problem description
    Caused by: com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException: the logs is 8785684 bytes which is larger than MAX_BATCH_SIZE_IN_BYTES 8388608
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.ensureValidLogSize(LogAccumulator.java:249)
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.doAppend(LogAccumulator.java:103)
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.append(LogAccumulator.java:84)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:385)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:308)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:211)
    at com.alibaba.ververica.connectors.sls.sink.SLSOutputFormat.writeRecord(SLSOutputFormat.java:100)
  • Cause

    A single row of log data that is written to Log Service exceeds 8 MB. As a result, no more data can be written to Log Service.

  • Solution

    Change the start offset to skip the abnormally large data. For more information, see Start a deployment.

What do I do if the error message "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints" appears?

  • Cause

    The checkpoint timeout period is excessively short. The error message appears if the checkpointing fails several times.

  • Solution

    In the Additional Configuration section on the Advanced tab, set the execution.checkpointing.timeout parameter to a larger value to increase the checkpoint timeout period.
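    For example, the following line sets the checkpoint timeout to 30 minutes. The value is only an illustration; tune it based on how long the checkpoints of your deployment actually take.
    execution.checkpointing.timeout: 30min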