Realtime Compute for Apache Flink:FAQ about upstream and downstream storage

Last Updated: Mar 05, 2024

This topic provides answers to some frequently asked questions about the upstream and downstream storage of Realtime Compute for Apache Flink.

How do I obtain JSON data by using fully managed Flink?

  • For more information about how to obtain common JSON data, see JSON Format.

  • If you want to obtain nested JSON data, declare the JSON objects as ROW types in the DDL statement of the source table, declare the keys that you want to obtain as columns in the DDL statement of the result table, and reference the nested keys in the DML statement to extract their values. Sample code:

    • Test data

      {
          "a":"abc",
          "b":1,
          "c":{
              "e":["1","2","3","4"],
              "f":{"m":"567"}
          }
      }
    • DDL syntax that is used to create a source table

      CREATE TEMPORARY TABLE `kafka_table` (
        `a` VARCHAR,
         b int,
        `c` ROW<e ARRAY<VARCHAR>,f ROW<m VARCHAR>>  --c specifies a JSON object, which corresponds to ROW in fully managed Flink. e specifies a JSON list, which corresponds to ARRAY. 
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'xxx',
        'properties.bootstrap.servers' = 'xxx',
        'properties.group.id' = 'xxx',
        'format' = 'json',
        'scan.startup.mode' = 'xxx'
      );
    • DDL syntax that is used to create a result table

      CREATE TEMPORARY TABLE `sink` (
       `a` VARCHAR,
        b INT,
        e VARCHAR,
        `m` varchar
      ) WITH (
        'connector' = 'print',
        'logger' = 'true'
      );
    • DML statement

      INSERT INTO `sink`
        SELECT 
        `a`,
        b,
        c.e[1], -- Array indexes in fully managed Flink start from 1. This example obtains the first element of the array. To obtain the entire array, remove [1]. 
        c.f.m
      FROM `kafka_table`;
    • Test results

      For the sample test data, the deployment prints a=abc, b=1, e=1, and m=567.

Fully managed Flink is connected to ApsaraMQ for Kafka, but cannot read data from or write data to ApsaraMQ for Kafka. What do I do?

  • Cause

    If a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to ApsaraMQ for Kafka, the Kafka client obtains the endpoints of Kafka brokers instead of the address of the proxy. The Kafka client is the Kafka connector of fully managed Flink. In this case, fully managed Flink cannot read data from or write data to ApsaraMQ for Kafka even if fully managed Flink is connected to ApsaraMQ for Kafka.

    The process in which the Kafka client of fully managed Flink connects to Kafka brokers consists of the following steps:

    1. The Kafka client obtains the metadata of the Kafka broker, including the endpoints of all Kafka brokers.

    2. Fully managed Flink uses the endpoints of Kafka brokers that are obtained by the Kafka client to read data from or write data to ApsaraMQ for Kafka.

  • Troubleshooting

    To troubleshoot this issue, you can perform the following steps to check whether a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to ApsaraMQ for Kafka:

    1. Use the ZooKeeper command-line tool zkCli.sh or zookeeper-shell.sh to log on to the ZooKeeper service that is used by your ApsaraMQ for Kafka cluster.

    2. Run a command based on the information about your ApsaraMQ for Kafka cluster to obtain the metadata of your Kafka brokers.

      In most cases, you can run the get /brokers/ids/0 command to obtain the metadata of Kafka brokers. You can obtain the endpoint of ApsaraMQ for Kafka from the endpoints field.

    3. Use commands such as ping or telnet to test the connectivity between fully managed Flink and the endpoint in the endpoints field.

      If the connectivity test fails, a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to ApsaraMQ for Kafka.

  • Solution

    • Set up a direct connection between fully managed Flink and ApsaraMQ for Kafka, instead of using a forwarding mechanism such as a proxy or a port mapping. This way, fully managed Flink can directly connect to ApsaraMQ for Kafka by using the endpoint in the endpoints field of the Kafka broker metadata.

    • Contact O&M staff of ApsaraMQ for Kafka to set the advertised.listeners property of Kafka brokers to the forwarding address. This way, the metadata of the Kafka brokers obtained by the Kafka client contains the forwarding address.

      Note

      Only Kafka 0.10.2.0 and later allow you to add the forwarding address to the advertised.listeners property of Kafka brokers.

    For more information about this issue, see KIP-103: Separation of Internal and External traffic and Kafka network connection issues.

After the data of an ApsaraMQ for Kafka source table is calculated by using event time-based window functions, no data output is returned. Why?

  • Problem description

    After the data of an ApsaraMQ for Kafka source table is processed by using event time-based window functions, no data output is returned.

  • Cause

    No watermark is generated because a partition of the ApsaraMQ for Kafka source table has no data. As a result, no data is returned after event time-based window functions are used to calculate the data of an ApsaraMQ for Kafka source table.

  • Solution

    1. Make sure that all partitions of the ApsaraMQ for Kafka source table contain data.

    2. Enable the idleness detection feature for source data. Add the following code to the Other Configuration field in the Parameters section of the Configuration tab, and save the changes for the configuration to take effect. For more information, see How do I configure parameters for deployment running?

      table.exec.source.idle-timeout: 5

      For more information about the table.exec.source.idle-timeout parameter, see Configuration.

What is the purpose of the commit offset mechanism in fully managed Flink?

Fully managed Flink commits the read offset to ApsaraMQ for Kafka each time a checkpoint is generated. If checkpointing is disabled or the checkpoint interval is too large, you may fail to query the read offset in the ApsaraMQ for Kafka console.

Why does the error message "timeout expired while fetching topic metadata" appear even if a network connection is established between fully managed Flink and ApsaraMQ for Kafka?

Fully managed Flink may be unable to read data from ApsaraMQ for Kafka even if a network connection is established between fully managed Flink and ApsaraMQ for Kafka. To ensure that both services are connected and the data of ApsaraMQ for Kafka can be read, you must use the endpoint that is described in the cluster metadata returned by Kafka brokers during bootstrapping. For more information, see Flink-cannot-connect-to-Kafka. To check the network connection, perform the following operations:

  1. Use zkCli.sh or zookeeper-shell.sh to log on to the ZooKeeper service that is used by the ApsaraMQ for Kafka cluster.

  2. Run the ls /brokers/ids command to obtain the IDs of all Kafka brokers.

  3. Run the get /brokers/ids/{your_broker_id} command to view the metadata of the Kafka broker.

    You can query the endpoint from listener_security_protocol_map.

  4. Check whether fully managed Flink can connect to the endpoint.

    If the endpoint contains a domain name, configure the DNS service for fully managed Flink. For more information about how to configure the DNS service for fully managed Flink, see How do I resolve the domain name of the service on which a Flink deployment depends?

How does the Kafka connector parse nested JSON-formatted data?

To parse the following JSON-formatted data with the Kafka connector, declare the data field as an ARRAY<ROW<cola VARCHAR, colb VARCHAR>> column, that is, an array of ROW elements that each contain two VARCHAR child fields. The parsed array can then be processed by a user-defined table-valued function (UDTF). A DDL sketch of the declaration follows the sample data.

{"data":[{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"}]}

How do I connect to a Kafka cluster for which security information is configured?

  1. Add the security configurations related to encryption and authentication to the parameters in the WITH clause of the DDL statement for the Kafka table. For more information about the security configurations, see SECURITY. The following sample code provides an example of the security configurations.

    Important

    You must add the properties. prefix to the security configurations.

    • Configure a Kafka table to use PLAIN as the SASL mechanism and provide the Java Authentication and Authorization Service (JAAS) configuration.

      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
      );
    • Use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.

      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_SSL',
        /* Configure SSL. */
        /* Configure the path of the CA certificate truststore provided by the server. */
        'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
        'properties.ssl.truststore.password' = 'test1234',
        /* Configure the path of the private key file keystore if client authentication is required. */
        'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
        'properties.ssl.keystore.password' = 'test1234',
        /* Configure SASL. */
        /* Configure SCRAM-SHA-256 as the SASL mechanism. */
        'properties.sasl.mechanism' = 'SCRAM-SHA-256',
        /* Configure JAAS. */
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
      );
      Note
      • If properties.sasl.mechanism is set to SCRAM-SHA-256, set properties.sasl.jaas.config to org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule.

      • If properties.sasl.mechanism is set to PLAIN, set properties.sasl.jaas.config to org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule.

  2. Upload all required files, such as a certificate, public key file, or private key file, in Additional Dependencies for the deployment.

    The files that are uploaded are stored in the /flink/usrlib directory. For more information about how to upload a file in Additional Dependencies, see Create a deployment.

    Important

    If the security protocol on the Kafka broker is SASL_SSL but the security protocol that is configured on the client is SASL_PLAINTEXT, an OutOfMemoryError is reported during deployment verification. In this case, change the security protocol on the client so that it matches the broker.

How do I resolve field name conflicts?

  • Problem description

    The key and the value of a Kafka message are both serialized as JSON-formatted strings, and they contain a field with the same name, such as the id field. If the message is directly parsed into a Flink table for processing, a field name conflict occurs.

    • Key

      {
         "id": 1
      }

    • Value

      {
         "id": 100,
         "name": "flink"
      }
  • Solution

    To prevent the preceding issue, configure the key.fields-prefix property. The following sample code provides an example of an SQL statement that is used to create a Flink table.

    CREATE TABLE kafka_table (
      -- Specify the fields in the key and value.
      key_id INT,
      value_id INT,
      name STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json',
      'json.ignore-parse-errors' = 'true',
      -- Specify the field in the key and the data type of the key.
      'key.format' = 'json',
      'key.fields' = 'id',
      'value.format' = 'json',
      'value.fields' = 'id, name',
      -- Configure a prefix for the field in the key.
      'key.fields-prefix' = 'key_'
    );

    In the preceding example, the key.fields-prefix property is set to key_ when the Flink table is created. In this case, the fields in the key are prefixed with key_ during the processing of Kafka data. In this example, the field in the key is named id. Therefore, the name of the field in the Flink table becomes key_id, which is distinguished from value_id.

    When you run the SELECT * FROM kafka_table; command to query data, the following result is returned:

    key_id: 1,
    value_id: 100,
    name: flink

What do I do if a service latency occurs when data is read from a Kafka source table?

  • Problem description

    The value of the currentEmitEventTimeLag metric indicates a latency of more than 50 years when data is read from the Kafka source table.

  • Troubleshooting

    1. Check whether the deployment is a JAR deployment or an SQL deployment.

      If the deployment is a JAR deployment, check whether the Kafka dependency in the Project Object Model (POM) file is the built-in Kafka dependency of fully managed Flink. If the POM file references the open source Apache Kafka dependency instead, no latency-related metrics are reported.

    2. Check whether data is inserted into all partitions of the Kafka source table in real time.

    3. Check whether the timestamp of metadata on a Kafka message is 0 or null.

      The data latency of the Kafka source table is calculated by subtracting the timestamp on a Kafka message from the current time. If the message does not contain the timestamp, the value of the currentEmitEventTimeLag metric indicates a latency of more than 50 years. You can use one of the following methods to identify the issue:

      • For an SQL deployment, you can define a metadata column to obtain the timestamp on a Kafka message. For more information, see Create a Message Queue for Apache Kafka source table.

        CREATE TEMPORARY TABLE sk_flink_src_user_praise_rt (
            `timestamp` BIGINT,
            `kafka_timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',  -- The timestamp in the Kafka message metadata. The column is renamed to avoid a conflict with the `timestamp` field in the message body. 
            ts as to_timestamp (
              from_unixtime (`timestamp`, 'yyyy-MM-dd HH:mm:ss')
            ),
            watermark for ts as ts - interval '5' second
          ) WITH (
            'connector' = 'kafka',
            'topic' = '',
            'properties.bootstrap.servers' = '',
            'properties.group.id' = '',
            'format' = 'json',
            'scan.startup.mode' = 'latest-offset',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true'
          );
      • Write a simple Java program and use KafkaConsumer to read Kafka messages for testing.

How do I resume a deployment of fully managed Flink that fails to run after a DataHub topic is split or scaled in?

If a topic that is being read by fully managed Flink is split or scaled in, the deployment of fully managed Flink fails and cannot resume. If you want to resume the deployment, you must cancel the deployment, and then start the deployment.

Can I delete a DataHub topic that is being consumed?

No, you cannot delete or recreate a DataHub topic that is being consumed.

What do the endPoint and tunnelEndpoint parameters mean? What happens if the two parameters are incorrectly configured?

For more information about the endPoint and tunnelEndpoint parameters, see Endpoints. If these two parameters are incorrectly configured in a virtual private cloud (VPC), the following task errors may occur:

  • If the endPoint parameter is incorrectly configured, the task stops when the progress reaches 91%.

  • If the tunnelEndpoint parameter is incorrectly configured, the task fails to run.

How do full MaxCompute source tables and incremental MaxCompute source tables read data from MaxCompute?

Full MaxCompute source tables and incremental MaxCompute source tables read data from MaxCompute by using MaxCompute Tunnel. Therefore, the read speed is limited by the bandwidth of MaxCompute Tunnel.

If MaxCompute is referenced as a data source, can the full MaxCompute source table or incremental MaxCompute source table read the data that is appended to an existing partition or table after a deployment of fully managed Flink starts?

No, the new data cannot be read. If new data is appended to the partition or the table that is read or is being read by the full MaxCompute source table or incremental MaxCompute source table after a deployment of fully managed Flink starts, the data cannot be read and a failover may be triggered.

Full MaxCompute source tables and incremental MaxCompute source tables use ODPS DOWNLOAD SESSION to read data from a partition or table. When you create a download session, the MaxCompute server creates an index file that contains the data mappings that exist at that time, and subsequent reads are performed based on these data mappings. Therefore, in most cases, the data that is appended to a MaxCompute table or to a partition in the table after the download session is created cannot be read. If new data is written to the source table or partition, the following issues may occur:

  • If new data is written to a table or to a partition in the table while MaxCompute Tunnel is reading it, the following error is returned: ErrorCode=TableModified,ErrorMessage=The specified table has been modified since the download initiated.

  • Data accuracy cannot be guaranteed. If new data is written after MaxCompute Tunnel is disabled, the data is not read. If a deployment is recovered from a failover or is resumed, existing data may be read again, but the new data may not be completely read.

After I suspend a deployment for a full MaxCompute source table or incremental MaxCompute source table, can I change the degree of parallelism for the deployment and resume the deployment that I suspended?

If the useNewApi parameter is set to true for a streaming deployment that uses a MaxCompute source table, you can change the degree of parallelism for the deployment and resume the deployment after you suspend the deployment. By default, the useNewApi parameter is set to true. The MaxCompute source table reads data from multiple matched partitions in sequence. When the MaxCompute source table reads data from a specific partition, data of different ranges in the partition is distributed to each subtask based on the degree of parallelism. If you change the degree of parallelism for the deployment, the parallel data distribution method for the partition that is being read before the deployment is suspended remains unchanged. Instead, data in the next partition is distributed to each subtask based on the new degree of parallelism. Therefore, if you increase the degree of parallelism for the deployment and restart the deployment when data in a large partition is read, data in the partition may be read by only some MaxCompute operators.

You cannot change the degree of parallelism for a batch deployment or a deployment in which the useNewApi parameter is set to false.

Why is the data in the partitions before the start offset also read when you set the start offset of a deployment to 2019-10-11 00:00:00?

The start offset is valid only for message queue data sources, such as DataHub. You cannot apply the start offset in MaxCompute source tables. After you start a deployment of fully managed Flink, fully managed Flink reads data by using the following methods:

  • For a partitioned table, fully managed Flink reads data from all existing partitions.

  • For a non-partitioned table, fully managed Flink reads all existing data.

What do I do if an incremental MaxCompute source table detects a new partition and some data is not written to the partition?

No mechanism is provided to check the data integrity of a partition. Therefore, each time an incremental MaxCompute source table detects a new partition, the source table reads data from the partition and writes the data to fully managed Flink. If the incremental MaxCompute source table reads data from the MaxCompute partitioned table T whose partition key column is ds, we recommend that you write data to the MaxCompute table by using the following method: Execute the Insert overwrite table T partition (ds='20191010') ... statement without creating a partition first. If the write operation is successful, the partition and partition data are available at the same time.
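
The following MaxCompute SQL is a minimal sketch of the recommended write pattern. The source table src_table and the column list are placeholders.

-- The INSERT OVERWRITE statement creates and populates the partition in one step,
-- so the incremental MaxCompute source table never detects a partially written partition.
INSERT OVERWRITE TABLE T PARTITION (ds='20191010')
SELECT col1, col2 FROM src_table;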

Important

We recommend that you do not use the following method to write data: Create a partition, such as ds=20191010, and write data to the partition. If the incremental MaxCompute source table consumes table T and detects the new partition ds=20191010, the source table immediately reads data from the new partition and writes the data to fully managed Flink. If specific data is not written to the partition, the data that is read by the incremental MaxCompute source table is incomplete.

What do I do if the error message "ErrorMessage=Authorization Failed [4019], You have NO privilege" appears during the running of the MaxCompute connector?

  • Problem description

    An error message appears on the Failover page or on the TaskManager.log page when a deployment is running. Error details:

    ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
  • Cause

    The user identity information specified in the MaxCompute DDL statements cannot be used to access MaxCompute.

  • Solution

    Use an Alibaba Cloud account, a RAM user, or a RAM role to authenticate the user identity. For more information, see User authentication.

How do I configure the startPartition parameter for an incremental MaxCompute source table?

Perform the following steps to configure the startPartition parameter:

  1. Use an equal sign (=) to connect the name of each partition key column and the related partition value. Each partition value must be a fixed value.

    For example, if the partition key column is dt and you want to start to read data from the data record whose value of dt is 20220901, the result is dt=20220901.

  2. Sort the results that are obtained in the first step by partition level in ascending order. Separate the results with commas (,). Spaces are not allowed. The result that is obtained in this step is the value of the startPartition parameter.

    Note

    You can specify all partition levels or only the first few partition levels.

    • If the table has only the first-level partition key column named dt and you want to start to read data that meets the condition of dt=20220901, you can enter 'startPartition' = 'dt=20220901'.

    • If the first-level partition key column is dt, the second-level partition key column is hh, and the third-level partition key column is mm and you want to start to read data that meets the condition of dt=20220901, hh=08, mm=10, you can enter 'startPartition' = 'dt=20220901,hh=08,mm=10'.

    • If the first-level partition key column is dt, the second-level partition key column is hh, and the third-level partition key column is mm and you want to start to read data that meets the condition of dt=20220901, hh=08, you can enter 'startPartition' = 'dt=20220901,hh=08'.
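
The following DDL is a minimal sketch of an incremental MaxCompute source table that uses the startPartition parameter. The connector value continuous-odps, the parameter names, and the placeholder values are assumptions for illustration; check the incremental MaxCompute connector documentation for the exact names in your VVR version.

CREATE TEMPORARY TABLE odps_incremental_source (
  content VARCHAR,
  dt VARCHAR,
  hh VARCHAR
) WITH (
  'connector' = 'continuous-odps',          -- incremental MaxCompute source (assumed connector value)
  'endPoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '<yourAccessKeyId>',
  'accessKey' = '<yourAccessKeySecret>',
  'startPartition' = 'dt=20220901,hh=08'    -- read partitions whose alphabetical order is greater than or equal to this value
);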

When the incremental MaxCompute source table loads the partition list, the source table compares all partitions in the partition list with the partition that is specified by the startPartition parameter based on the alphabetical order. The source table loads the partitions whose alphabetical order is greater than or equal to the partition that is specified by the startPartition parameter. For example, an incremental MaxCompute partitioned table contains the first-level partition key column ds and the second-level partition key column type and contains the following partitions:

  • ds=20191201,type=a

  • ds=20191201,type=b

  • ds=20191202,type=a

  • ds=20191202,type=b

  • ds=20191202,type=c

  • ds=20191203,type=a

If the value of the startPartition parameter is ds=20191202, the following partitions are read: ds=20191202,type=a, ds=20191202,type=b, ds=20191202,type=c, and ds=20191203,type=a. If the value of the startPartition parameter is ds=20191202,type=b, the following partitions are read: ds=20191202,type=b, ds=20191202,type=c, and ds=20191203,type=a.

Note

The partition that is specified by the startPartition parameter does not necessarily need to exist. All partitions whose alphabetical order is greater than or equal to the partition that is specified by the startPartition parameter are read.

Why is data reading not started after a deployment that uses an incremental MaxCompute source table is started?

This issue occurs if there are too many partitions whose alphabetical order is greater than or equal to the partition that is specified by the startPartition parameter, or if these partitions contain too many small files. Before the incremental MaxCompute source table starts to read data, the source table must sort the information about the existing partitions that meet the conditions. We recommend that you take note of the following items:

  • Do not read excessive historical data.

    Note

    If you want to process historical data, you can run a batch deployment that uses a MaxCompute source table.

  • Decrease the number of small files in historical data.

How do I configure the partition parameter when data is read from or written to partitions?

Read data from partitions

  • Read data from static partitions

    If a source table or a dimension table needs to read data from static partitions, perform the following steps to configure the partition parameter:

    1. Use an equal sign (=) to connect the name of each partition key column and the related partition value. Each partition value can be a fixed value or a value that contains asterisks (*) as wildcards. Wildcards match any string, including an empty string. Examples:

      • The partition key column is dt, and you want to read data whose value of dt is 20220901. In this case, the result is dt=20220901.

      • The partition key column is dt, and you want to read data whose value of dt starts with 202209. In this case, the result is dt=202209*.

      • The partition key column is dt, and you want to read data whose value of dt starts with 2022 and ends with 01. In this case, the result is dt=2022*01.

      • The partition key column is dt, and you want to read data from all partitions. In this case, the result is dt=*.

    2. Sort the results that are obtained in the first step by partition level in ascending order. Separate the results with commas (,). Spaces are not allowed. The result that is obtained in this step is the value of the partition parameter. You can specify all partition levels or only the first few partition levels. Examples:

      • If the table has only the first-level partition key column named dt and you want to read data from partitions that meet the condition of dt=20220901, you can enter 'partition' = 'dt=20220901'.

      • If the first-level partition key column is dt, the second-level partition key column is hh, and the third-level partition key column is mm and you want to read data from partitions that meet the condition of dt=20220901, hh=08, mm=10, you can enter 'partition' = 'dt=20220901,hh=08,mm=10'.

      • If the first-level partition key column is dt, the second-level partition key column is hh, and the third-level partition key column is mm and you want to read data from partitions that meet the condition of dt=20220901, hh=08, you can enter 'partition' = 'dt=20220901,hh=08' or 'partition' = 'dt=20220901,hh=08,mm=*'.

      • If the first-level partition key column is dt, the second-level partition key column is hh, and the third-level partition key column is mm and you want to read data from partitions that meet the conditions of dt=20220901, mm=10, you can enter 'partition' = 'dt=20220901,hh=*,mm=10'.

    If the preceding steps cannot meet your partition filter requirements, you can also add filter conditions to the WHERE clause of the SQL statement and use the partition pushdown feature of the SQL optimizer to filter partitions. For example, if the first-level partition key column is dt and the second-level partition key column is hh and you want to read data from partitions that meet the conditions of dt>=20220901, dt<=20220903, hh>=09, and hh<=17, you can use the following sample SQL code:

    CREATE TABLE maxcompute_table (
      content VARCHAR,
      dt VARCHAR,
      hh VARCHAR
    ) PARTITIONED BY (dt, hh) WITH ( 
       -- You must use PARTITIONED BY to specify partition key columns. If you do not use PARTITIONED BY to specify partition key columns, the partition pushdown feature of the SQL optimizer cannot be enabled. This affects the partition filter efficiency. 
      'connector' = 'odps',
      ... -- Configure required parameters, such as accessId. The partition parameter is optional. If you do not configure this parameter, the SQL optimizer filters partitions. 
    );
    
    SELECT content, dt, hh FROM maxcompute_table
    WHERE dt >= '20220901' AND dt <= '20220903' AND hh >= '09' AND hh <= '17'; -- Configure the partition filter condition in the WHERE clause.

  • Read data from the partition that is ranked first in alphabetical order

    • If the source table or dimension table needs to read data from the partition that is ranked first in alphabetical order, set the partition parameter to 'partition' = 'max_pt()'.

    • If the source table or dimension table needs to read data from the first two partitions that are ranked in alphabetical order, set the partition parameter to 'partition' = 'max_two_pt()'.

    • If the source table or dimension table needs to read data from the first partition that matches a partition whose name ends with .done, set the partition parameter to 'partition' = 'max_pt_with_done()'.

    In most cases, the partition that is ranked first in alphabetical order is also the latest partition that is generated. In specific cases, if the data in the latest partition is not ready and you want the dimension table to read earlier data, you can set the partition parameter to max_pt_with_done(), as shown in the sketch after the following note.

    When data preparation in a partition is complete, you must create an empty partition named Name of the partition that contains data.done. For example, when data preparation in the dt=20220901 partition is complete, you must create an empty partition named dt=20220901.done. If you set the partition parameter to max_pt_with_done(), the dimension table reads data only from the partitions that match partitions whose name ends with .done. For more information, see What is the difference between max_pt() and max_pt_with_done()?

    Note

    When a deployment is started, the source table obtains data only from the partition that is ranked first in alphabetical order. After the source table reads all data, the source table stops running and does not monitor whether a new partition is generated. If you want the source table to continuously read new partitions, use an incremental source table. Each time a dimension table is updated, the dimension table checks for the latest partition and reads the latest data.
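
    The following DDL is a minimal sketch of a MaxCompute dimension table that reads only the latest ready partition. The connection parameters are omitted, and the parameter names follow the examples in this topic.

    CREATE TEMPORARY TABLE odps_dim (
      k VARCHAR,
      v VARCHAR
    ) WITH (
      'connector' = 'odps',
      ...  -- Configure the endpoint, project, table name, and credentials.
      'partition' = 'max_pt_with_done()',  -- Read the partition that is ranked first in alphabetical order among partitions with a matching .done partition. Use 'max_pt()' to ignore .done markers.
      'cache' = 'ALL'
    );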

Write data to partitions

  • Write data to a static partition

    If the result table needs to write data to static partitions, you can perform the steps that are described in "Read data from static partitions" to configure the partition parameter.

    Important

    The partition parameter that is configured for a result table does not support asterisks (*) as wildcards.

  • Write data to a dynamic partition

    If the result table needs to write each data record to the partition that matches the values of the partition key columns in the record, sort the partition key column names by partition level in ascending order and separate them with commas (,). Spaces are not allowed. The result is the value of the partition parameter. For example, if the first-level partition is dt, the second-level partition is hh, and the third-level partition is mm, you can enter 'partition' = 'dt,hh,mm', as shown in the sketch below.
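
    The following DDL is a minimal sketch of a MaxCompute result table that writes to dynamic partitions. The connection parameters are omitted and the schema is illustrative.

    CREATE TEMPORARY TABLE odps_sink (
      content VARCHAR,
      dt VARCHAR,
      hh VARCHAR,
      mm VARCHAR
    ) WITH (
      'connector' = 'odps',
      ...  -- Configure the endpoint, project, table name, and credentials.
      'partition' = 'dt,hh,mm'  -- Each record is written to the partition that matches its dt, hh, and mm values.
    );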

Why does a deployment that uses a MaxCompute source table keep being started, or why does it take a long time to generate data after the deployment is started?

This issue is caused by one of the following reasons:

  • The MaxCompute source table contains excessive small files.

  • The MaxCompute storage cluster does not reside in the same region as the Flink computing cluster. As a result, the network communication requires a long period of time. We recommend that you deploy a MaxCompute storage cluster that resides in the same region as the Flink computing cluster before you perform operations.

  • The MaxCompute permissions are not correctly configured. The Download permission on the MaxCompute source table is required.

How do I select a data tunnel?

MaxCompute provides the following types of tunnels: Batch Tunnel and Streaming Tunnel. You can select a tunnel type based on your business requirements for consistency and operational efficiency. The following comparison describes the differences between the two types of tunnels.

  • Consistency

    • Batch Tunnel: Compared with Streaming Tunnel, Batch Tunnel helps you write accurate data to MaxCompute tables in most cases and prevents data loss based on the at-least-once semantics. Duplicate data is generated in specific partitions only if an error occurs during checkpointing and data in a deployment is written to multiple partitions at the same time.

    • Streaming Tunnel: The at-least-once semantics is used to prevent data loss. If a deployment becomes abnormal, duplicate data may be generated.

  • Operational efficiency

    • Batch Tunnel: The overall operational efficiency is lower than that of Streaming Tunnel because data must be committed during checkpointing and files must be created on the server.

    • Streaming Tunnel: Data does not need to be committed during checkpointing. If you set the numFlushThreads parameter to a value greater than 1, upstream data can be continuously received during the flush process. Therefore, the overall operational efficiency is higher than that of Batch Tunnel.

Note

If the execution of a checkpoint on a deployment that uses Batch Tunnel is slow or times out and the downstream storage can tolerate duplicate data, you can use Streaming Tunnel for the deployment.

What do I do if duplicate data is written to a MaxCompute result table?

If duplicate data is generated when the MaxCompute connector is used to write data to MaxCompute in a Flink deployment, perform the following operations to troubleshoot the issue:

  • Check the logic of your Flink deployment. When Flink writes data to MaxCompute by using the MaxCompute connector, Flink does not check the uniqueness of a primary key even if a primary key constraint is declared in the MaxCompute result table. In addition, non-transactional tables of MaxCompute do not support primary key constraints. If duplicate data is generated during data processing based on the logic of your Flink deployment, the data that is written to MaxCompute also contains the duplicate data.

  • Check whether multiple Flink deployments are run to write data to the same MaxCompute table at the same time. MaxCompute does not support primary key constraints. If the same result is obtained in multiple Flink deployments, the MaxCompute table to which data is written contains duplicate data.

  • If Batch Tunnel is used, check whether your Flink deployment fails during checkpointing. If your Flink deployment fails during checkpointing, data in the MaxCompute result table may have been submitted to MaxCompute. When your Flink deployment is resumed from the previous checkpoint, data between the two checkpoints may be duplicated.

  • If Streaming Tunnel is used, check whether your Flink deployment performs a failover during checkpointing. If you enable Streaming Tunnel to write data to MaxCompute, data between checkpoints is submitted to MaxCompute. Therefore, if your Flink deployment performs a failover and is resumed from the most recent checkpoint, the data that is generated after the most recent checkpointing is complete and before your Flink deployment performs a failover may be repeatedly written to MaxCompute. For more information, see How do I select a data tunnel? In this case, you can switch to the Batch Tunnel mode to avoid duplicate data.

  • If Batch Tunnel is used, check whether your Flink deployment performs a failover or restarts after the deployment is canceled. For example, a Flink deployment may be canceled and then started due to optimization based on Autopilot. If the Flink engine version is earlier than vvr-6.0.7-flink-1.15, data of the MaxCompute result table is submitted before the MaxCompute connector is disabled. Therefore, if the Flink deployment is canceled and resumed from the previous checkpoint, the data that is generated during the period of time between checkpointing and deployment cancellation may be repeatedly written to MaxCompute. In this case, you can upgrade the Flink engine version to vvr-6.0.7-flink-1.15 or later to resolve the issue.

What do I do if the error message "Invalid partition spec" appears when a deployment that uses a MaxCompute result table runs?

  • Cause: The value of a partition key column in the data that is written to MaxCompute is invalid. Invalid values include an empty string, a null value, and a value that contains equal signs (=), commas (,), or slashes (/).

  • Solution: Check whether invalid data exists.

What do I do if the error message "No more available blockId" appears when a deployment that uses a MaxCompute result table runs?

  • Cause: The number of blocks that are written to the MaxCompute result table exceeds the upper limit. This indicates that the amount of data that is flushed each time is excessively small and frequent flush operations are performed.

  • Solution: We recommend that you adjust the batchSize and flushIntervalMs parameters, typically by increasing them, so that data is flushed less frequently. The following sketch shows where these parameters are configured.
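
    A minimal sketch of this tuning is shown below. The connection parameters are omitted and the values are illustrative; adapt them to your data volume.

    CREATE TEMPORARY TABLE odps_sink (
      id BIGINT,
      content VARCHAR
    ) WITH (
      'connector' = 'odps',
      ...  -- Configure the endpoint, project, table name, and credentials.
      'batchSize' = '4096',        -- Buffer more rows before each flush (illustrative value).
      'flushIntervalMs' = '60000'  -- Increase the flush interval to 60 seconds (illustrative value).
    );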

How do I use the SHUFFLE_HASH hint for a dimension table?

By default, information about the entire dimension table is stored in each subtask. If a dimension table contains a large amount of data, you can use the SHUFFLE_HASH hint to evenly distribute the data of the dimension table to each subtask. This reduces the consumption of the Java virtual machine (JVM) heap memory. In the following example, the data of the dim_1 and dim_3 dimension tables is distributed to each subtask after the SHUFFLE_HASH hint is used, but the data of the dim_2 dimension table is still completely cached in each subtask.

-- Create a source table and three dimension tables. 
CREATE TABLE source_table (k VARCHAR, v VARCHAR) WITH ( ... );
CREATE TABLE dim_1 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_2 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_3 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );

-- Specify the names of the dimension tables whose data needs to be distributed to each subtask in the SHUFFLE_HASH hint. 
SELECT /*+ SHUFFLE_HASH(dim_1), SHUFFLE_HASH(dim_3) */
k, s.v, d1.v, d2.v, d3.v
FROM source_table AS s
INNER JOIN dim_1 FOR SYSTEM_TIME AS OF PROCTIME() AS d1 ON s.k = d1.k
LEFT JOIN dim_2 FOR SYSTEM_TIME AS OF PROCTIME() AS d2 ON s.k = d2.k
LEFT JOIN dim_3 FOR SYSTEM_TIME AS OF PROCTIME() AS d3 ON s.k = d3.k;

How do I configure the cacheReloadTimeBlackList parameter?

Perform the following steps to configure the cacheReloadTimeBlackList parameter:

  1. Use an arrow (->), which is a combination of a hyphen (-) and a closing angle bracket (>), to connect the start time and end time of a period during which data updates are prohibited in the dimension table. The time is in the yyyy-MM-dd HH:mm format.

    For example, if you want to prohibit data updates in the dimension table from 23:00 on November 10, 2022 to 01:00 on November 11, 2022, set this parameter to 2022-11-10 23:00 -> 2022-11-11 01:00.

  2. Separate multiple time periods during which you want to prohibit data updates with commas (,). The result is the value of the cacheReloadTimeBlackList parameter.

    For example, if you want to prohibit data updates in the dimension table from 23:00 on November 10, 2022 to 01:00 on November 11, 2022 and from 23:00 on December 11, 2022 to 01:00 on December 12, 2022, set this parameter to 2022-11-10 23:00 -> 2022-11-11 01:00, 2022-12-11 23:00 -> 2022-12-12 01:00.
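
The following DDL is a minimal sketch of a MaxCompute dimension table that uses this parameter with the time periods from the example above. The connection parameters are omitted.

CREATE TEMPORARY TABLE odps_dim (
  k VARCHAR,
  v VARCHAR
) WITH (
  'connector' = 'odps',
  ...  -- Configure the endpoint, project, table name, and credentials.
  'cache' = 'ALL',
  'cacheReloadTimeBlackList' = '2022-11-10 23:00 -> 2022-11-11 01:00, 2022-12-11 23:00 -> 2022-12-12 01:00'
);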

Why does the data type of the primary key in a MySQL table change from BIGINT UNSIGNED to DECIMAL when I create a catalog in the console of fully managed Flink? Why does the data type of the primary key change to TEXT after I execute the CREATE TABLE AS statement to synchronize data to Hologres?

Fully managed Flink does not support the BIGINT UNSIGNED data type. Therefore, fully managed Flink converts the data type of the primary key in the MySQL table from BIGINT UNSIGNED into DECIMAL based on the limits on the value range. When you execute the CREATE TABLE AS statement to synchronize data from MySQL to Hologres, fully managed Flink automatically converts the data type of the primary key into TEXT because Hologres does not support the BIGINT UNSIGNED data type and does not support a primary key of the DECIMAL data type.

We recommend that you plan the data type conversion of the primary key based on your requirements during development and design. If you still want to use the column whose data type is converted into DECIMAL, you can manually create the table in the Hologres console and configure another field as the primary key, or create the table without a primary key. However, this may lead to data duplication because a different primary key or a missing primary key affects the uniqueness of the data. Therefore, you must handle this issue at the application level, for example, by allowing a specific degree of data duplication or by adding deduplication logic.

When output data is written to an ApsaraDB RDS result table, is a new data record inserted into the table, or is the result table updated based on the primary key value?

If a primary key is defined in the DDL statement, the connector executes the INSERT INTO tablename(field1, field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1, field2=value2, field3=value3, ...; statement to write data. If the primary key value does not exist, a new record is inserted into the table. If the primary key value already exists, the existing record is updated. If no primary key is defined in the DDL statement, the connector executes a plain INSERT INTO statement to insert data.
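
The following DDL is a minimal sketch of an ApsaraDB RDS result table with a primary key, so that writes become upserts. The connector value rds, the parameter names, and the schema are assumptions for illustration; check the ApsaraDB RDS connector documentation for the exact names.

CREATE TEMPORARY TABLE rds_sink (
  id BIGINT,
  score BIGINT,
  PRIMARY KEY (id) NOT ENFORCED  -- With a primary key, writes are upserts; without one, writes are plain inserts.
) WITH (
  'connector' = 'rds',             -- assumed connector value
  'url' = 'jdbc:mysql://<Internal endpoint>/<databaseName>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);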

How do I perform GROUP BY operations by using the unique index of an ApsaraDB RDS result table?

  • If you want to use the unique index of an ApsaraDB RDS result table to perform GROUP BY operations, you must declare the unique index in the GROUP BY clause in your deployment of fully managed Flink.

  • An ApsaraDB RDS table has only one auto-increment primary key, which cannot be declared as a primary key in a deployment of fully managed Flink.

Why is the INT UNSIGNED data type that is supported by MySQL physical tables, such as ApsaraDB RDS for MySQL physical tables or AnalyticDB for MySQL physical tables, declared as another data type in Flink SQL?

To ensure data precision, the Java Database Connectivity (JDBC) driver of MySQL converts the received data into a larger data type based on the original data type. For example, the MySQL JDBC driver converts received data of the INT UNSIGNED type into the Java Long type, which is mapped to the BIGINT type of Flink SQL, and converts received data of the BIGINT UNSIGNED type into the Java BigInteger type, which is mapped to the DECIMAL(20, 0) type of Flink SQL.
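
The following DDL is a minimal sketch of how the unsigned MySQL types are declared in Flink SQL. The connector value and connection parameters are placeholders.

CREATE TEMPORARY TABLE mysql_source (
  int_unsigned_col BIGINT,            -- INT UNSIGNED in MySQL is declared as BIGINT in Flink SQL.
  bigint_unsigned_col DECIMAL(20, 0)  -- BIGINT UNSIGNED in MySQL is declared as DECIMAL(20, 0) in Flink SQL.
) WITH (
  'connector' = 'mysql',  -- placeholder; use the MySQL connector that your deployment uses
  ...  -- Configure the hostname, database, table, and credentials.
);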

What do I do if the error message "Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1" appears?

  • Problem description

    Caused by: java.sql.BatchUpdateException: Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1
    at sun.reflect.GeneratedConstructorAccessor59.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
    at com.mysql.cj.util.Util.getInstance(Util.java:167)
    at com.mysql.cj.util.Util.getInstance(Util.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeBatch(DruidPooledPreparedStatement.java:565)
    at com.alibaba.ververica.connectors.rds.sink.RdsOutputFormat.executeSql(RdsOutputFormat.java:488)
    ... 15 more
  • Cause

    The data contains special characters, such as emojis, that the configured database character encoding cannot handle. As a result, the data cannot be written as expected.

  • Solution

    When you use a JDBC driver to connect to a MySQL database, you must add the character set UTF-8 to the JDBC URL of the MySQL database, such as jdbc:mysql://<Internal endpoint>/<databaseName>?characterEncoding=UTF-8. For more information, see Using Character Sets and Unicode.
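
    The following DDL is a minimal sketch that shows where the character set is added when the open source JDBC connector is used. The table and credential values are placeholders; if you use the RDS connector, append the same characterEncoding=UTF-8 suffix to its URL parameter.

    CREATE TEMPORARY TABLE jdbc_sink (
      id BIGINT,
      test VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<Internal endpoint>/<databaseName>?characterEncoding=UTF-8',  -- Append the character set to the JDBC URL.
      'table-name' = '<yourTableName>',
      'username' = '<yourUserName>',
      'password' = '<yourPassword>'
    );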

What do I do if a deadlock occurs when data is written to an ApsaraDB RDS for MySQL database (RDS/TDDL)?

  • Problem description

    When data is written to an ApsaraDB RDS for MySQL database that uses the Taobao Distributed Data Layer (TDDL) or RDS connector, a deadlock occurs.

    Important

    Realtime Compute for Apache Flink uses relational databases, such as ApsaraDB RDS for MySQL, to store result data. The TDDL and RDS connectors are used. If Realtime Compute for Apache Flink frequently writes data to an ApsaraDB RDS for MySQL table, deadlocks may occur.

  • Example of how a deadlock occurs

    For example, an INSERT operation acquires two locks (A and B) in sequence, where A is a range lock. Two transactions (T1 and T2) are involved. The table schema is (id (auto-increment primary key), nid (unique key)). T1 contains two statements: insert(null,2),(null,1). T2 contains one statement: insert(null,2).

    1. At time t, the first statement in T1 is executed. T1 holds both locks (A and B).

    2. At time t+1, the statement in T2 is executed. T2 requires Lock A to lock the range (-inf,2]. The ranges of the two transactions have an inclusion relationship, so T2 cannot acquire Lock A until T1 releases it.

    3. At time t+2, the second statement in T1 is executed. T1 requires Lock A to lock the range (-inf,1], which is included in (-inf,2]. Therefore, T1 cannot acquire Lock A until T2 releases it.

    When T1 and T2 are waiting for each other to release the lock, a deadlock occurs.

  • Differences in database engine locks between RDS/TDDL and Tablestore

    • RDS/TDDL: The row lock in InnoDB locks an index rather than a record. If the same index key is used when records in different rows are accessed, a lock conflict may occur, and data in the entire range cannot be updated.

    • Tablestore: Only a single row is locked. This does not affect the update of other data.

  • Solution

    In scenarios with high queries per second (QPS) or transactions per second (TPS) or highly parallel write operations, use Tablestore as the result table. We recommend that you do not use TDDL or ApsaraDB RDS as the result table for your Flink deployment.

    If you want to use a relational database such as ApsaraDB RDS for MySQL, take note of the following points:

    • Make sure that your deployment is not affected by read and write operations from other systems.

    • If the amount of data to write is small, use a single-parallelism write. In scenarios with high QPS or TPS or high parallelism, the write performance deteriorates.

    • Do not use a unique primary key (UniqueKey) if possible. Writing data to a table with a unique key may cause deadlocks. If the table must contain unique keys, sort the unique key columns in descending order of differentiation, which significantly reduces the probability of deadlocks. For example, place an MD5 value before the day_time (20171010) field.

    • Shard databases and tables based on your business requirements to avoid writing data to a single table. For more information about the implementation, contact the database administrator.

Why does the schema of the destination table remain unchanged after I change the table schema of the MySQL instance?

The synchronization of table schema changes is not triggered by specific DDL statements. Instead, it is triggered by the schema difference between the data records that are received before and after the schema change. If only a DDL change occurs but no data is added or modified in the source table, the schema change is not applied to the destination table. For more information, see Synchronization policies of table schema changes.

Why does the error message "finish split response timeout" appear in the source?

This error message appears because the source fails to respond to RPC requests of the coordinator due to high CPU utilization of tasks. In this case, you must increase the number of CPU cores of TaskManager on the Resources tab in the console of fully managed Flink.

What is the impact if the table schema changes during full data reading of a MySQL CDC table?

If the table schema changes during full data reading of a table in a deployment, the deployment may report an error or the change to the table schema cannot be synchronized. In this case, you must cancel the deployment, delete the downstream table to which data is synchronized, and then restart the deployment without states.

What do I do if data synchronization fails in a deployment due to unsupported changes to the table schema during the synchronization by using the CREATE TABLE AS or CREATE DATABASE AS statement?

You must resynchronize data of the table. In this case, you can cancel the deployment, delete the downstream table to which data is synchronized, and then restart the deployment without states. We recommend that you avoid such incompatible modifications. Otherwise, a synchronization failure is reported after the deployment is restarted. For more information about the support for changes to the table schema, see Synchronization policies for table schema changes.

Can I retract the updated data from a ClickHouse result table?

If a primary key is specified in the DDL statement that is used to create the ClickHouse result table of fully managed Flink and the ignoreDelete parameter is set to false, you can retract the updated data. However, the data processing performance significantly decreases.

ClickHouse is a column-oriented database management system that is used for online analytical processing (OLAP). UPDATE and DELETE operations are inefficient in ClickHouse. If a primary key is specified in the DDL statement, fully managed Flink uses the ALTER TABLE UPDATE statement to update data and the ALTER TABLE DELETE statement to delete data. As a result, the data processing performance significantly decreases.
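
The following DDL is a minimal sketch of a ClickHouse result table that allows retraction. The connector parameter names and the placeholder values are assumptions for illustration; check the ClickHouse connector documentation for the exact names.

CREATE TEMPORARY TABLE clickhouse_sink (
  id BIGINT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED      -- A primary key is required for retraction.
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:clickhouse://<yourEndpoint>:8123',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'ignoreDelete' = 'false'           -- Apply UPDATE and DELETE operations; this significantly reduces write performance.
);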

When can I view the data that is written to a ClickHouse result table in the ClickHouse console?

  • By default, the exactly-once semantics is disabled for ClickHouse result tables. If the exactly-once semantics is disabled for a ClickHouse result table, the system automatically writes data in the cache to the result table after the number of data entries in the cache reaches the value specified by the batchSize parameter or the waiting time exceeds the time period that is specified by the flushIntervalMs parameter. In this case, you do not need to wait for a successful checkpoint before you can view the data that is written to the result table in the ClickHouse console.

  • For a ClickHouse result table for which the exactly-once semantics is enabled, you can view the data that is written to the result table in the ClickHouse console only after a checkpoint is run.

How do I view print data results in the console of fully managed Flink?

If you want to view print data results in the console of fully managed Flink, you can use one of the following methods:

  • Log on to the console of fully managed Flink to view data.

    1. In the left-side navigation pane, click Deployments.

    2. On the Deployments page, click the name of the desired deployment.

    3. Click the Diagnostics tab.

    4. On the Logs tab, select the deployment that is running from the Job drop-down list.

    5. On the Running Task Managers tab, click the value in the Path, ID column.

    6. Click the Logs tab to view print data results.

  • Go to Flink web UI.

    1. In the left-side navigation pane, click Deployments.

    2. On the Deployments page, click the name of the desired deployment.

    3. On the Status tab, click Flink UI.

    4. In the left-side navigation pane of Apache Flink Dashboard, click Task Managers.

    5. On the Task Managers page, click the value in the Path, ID column.

    6. On the page that appears, click the Logs tab to view the print data results.

What do I do if no data can be found when I join a dimension table with another table?

Check whether the schema type and name in the DDL statement are the same as the schema type and name in the physical table.

What is the difference between max_pt() and max_pt_with_done()?

If the partitions are sorted in alphabetical order, max_pt() returns the partition that ranks first among all partitions, and max_pt_with_done() returns the partition that ranks first among the partitions that have a matching partition with the .done suffix. In this example, the following partitions are used:

  • ds=20190101

  • ds=20190101.done

  • ds=20190102

  • ds=20190102.done

  • ds=20190103

The following example shows the difference between max_pt() and max_pt_with_done():

  • `partition`='max_pt_with_done()' returns the ds=20190102 partition.

  • `partition`='max_pt()' returns the ds=20190103 partition.

What do I do if no data is found in the storage?

  • If data is not flushed, this issue is normal because the writer of Flink flushes data to disks based on the following policies:

    • A bucket reaches a specific size in memory. The default threshold is 64 MB.

    • The total buffer size reaches a specific value. The default threshold is 1 GB.

    • All data in memory is flushed out when checkpointing is triggered.

  • If a streaming write operation is performed, make sure that checkpointing is enabled.

What do I do if duplicate data exists?

  • If you want to perform a copy-on-write (COW) operation, you must set the write.insert.drop.duplicates parameter to true.

    By default, deduplication is not performed on the first file in each bucket during the COW operation. Only incremental data is deduplicated. If you want to enable global deduplication, you must set the write.insert.drop.duplicates parameter to true. If you perform a merge-on-read (MOR) operation, global deduplication is automatically enabled after you define a primary key. You do not need to enable a flag parameter.

    Note

    In Hudi 0.10.0 and later, the write.insert.drop.duplicates parameter is renamed to write.precombined, and its default value is true.

  • If you want to perform deduplication on multiple partitions, you must set the index.global.enabled parameter to true.

    Note

    In Hudi 0.10.0 and later, the default value of the index.global.enabled parameter is true.

  • If you want to update data that was generated a long period of time ago, such as one month, you must increase the value of the index.state.ttl parameter. The unit of this parameter is days.

    The index is the core data structure that is used to determine whether data is duplicated. The index.state.ttl parameter specifies the number of days for which the index is retained. The default value is 1.5. If you set this parameter to a value less than 0, the index is stored permanently.

    Note

    In Hudi 0.10.0 and later, the default value of the index.state.ttl parameter is 0.
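The following DDL sketch brings the three deduplication-related parameters together in the WITH clause of a Hudi result table. Only the parameter names are taken from this topic; the connector options, values, and table definition are illustrative placeholders that depend on your Hudi version and table.

    CREATE TEMPORARY TABLE hudi_sink (
      id BIGINT,
      name VARCHAR,
      ds VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path' = 'oss://<bucket>/<path>',          -- placeholder
      'table.type' = 'COPY_ON_WRITE',
      'write.insert.drop.duplicates' = 'true',   -- global deduplication for COW writes
      'index.global.enabled' = 'true',           -- deduplication across partitions
      'index.state.ttl' = '30'                   -- retain the index for 30 days (see the notes above)
    );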

Why are log files the only files that are generated in MOR mode?

  • Cause: Hudi generates Parquet files only after data is compacted. If data is not compacted, only log files are generated. By default, asynchronous compaction is enabled in MOR mode, and data is compacted at an interval of five commits. A compaction task is triggered only when the compaction interval condition is met.

  • Solution: Decrease the value of the compaction.delta_commits parameter to shorten the compaction interval so that compaction tasks are triggered sooner.
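    A minimal sketch of the corresponding change, assuming the parameter is set in the WITH clause of the Hudi result table; the value 2 is only an example.

      -- In the WITH clause of the Hudi result table:
      'compaction.delta_commits' = '2'   -- compact every 2 commits instead of the default 5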

No data is entered in a partition of the upstream Kafka connector. As a result, the watermark cannot move forward and the window output is delayed. What do I do?

For example, five partitions exist in the upstream Kafka connector, and two new data entries are written to Kafka every minute. However, some partitions do not receive data in real time. Because a partition that receives no data still participates in the watermark calculation, the watermark cannot move forward, the window cannot close at the earliest opportunity, and the result cannot be generated in real time.

To resolve this issue, configure an idle timeout. If a partition does not receive any elements within the timeout period, the partition is marked as temporarily idle and is excluded from the watermark calculation. When data arrives in the partition again, the partition is included in the watermark calculation again. For more information, see Configuration.

Add the following code to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see How do I configure parameters for deployment running?

table.exec.source.idle-timeout: 1s

What do I do if the error message "IllegalArgumentException: timeout value is negative" appears?

  • Problem description

    The error message IllegalArgumentException: timeout value is negative appears.

  • Cause

    If no new messages are consumed for a period of time, the MetaQSource thread enters hibernation. The hibernation period is specified by the pullIntervalMs parameter. However, the default value of the pullIntervalMs parameter is -1, and a negative value cannot be used as a hibernation period. As a result, the error message appears.

  • Solution

    Change the value of the pullIntervalMs parameter to a non-negative value.
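    Assuming the pullIntervalMs parameter is set in the WITH clause of the source table, like the other connector options in this topic, the change might look like the following fragment; the value is only an example.

      -- In the WITH clause of the source table:
      'pullIntervalMs' = '500'   -- a non-negative pull interval in milliseconds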

How does the connector for the ApsaraMQ for RocketMQ source table learn about the change in the number of partitions in a topic when new partitions are created in the topic?

  • Realtime Compute for Apache Flink that uses a Ververica Runtime (VVR) version earlier than 6.0.2 obtains the current number of partitions in a topic of ApsaraMQ for RocketMQ at an interval of 5 to 10 minutes. If the number of partitions is different from the original number of partitions for three consecutive times, a deployment failover is triggered. Therefore, the connector for the ApsaraMQ for RocketMQ source table can learn about the change in the number of partitions 10 to 30 minutes after the change occurs and a deployment failover occurs. After the deployment is restarted, Realtime Compute for Apache Flink reads data from the new partitions.

  • Realtime Compute for Apache Flink that uses VVR 6.0.2 or later obtains the current number of partitions in a topic of ApsaraMQ for RocketMQ at an interval of 5 minutes. When Realtime Compute for Apache Flink detects new partitions, the source operator of a TaskManager directly reads data from the new partitions. The deployment failover is not triggered. Therefore, the connector for the ApsaraMQ for RocketMQ source table can learn about the change in the number of partitions 1 to 5 minutes after the change occurs.

What do I do if the error message "BackPressure Exceed reject Limit" appears?

  • Problem description

    The error message BackPressure Exceed reject Limit appears when data is written to Hologres.

  • Cause

    The write load in Hologres is high.

  • Solution

    Provide the instance information to Hologres technical support engineers to perform an upgrade.

What do I do if the error message "remaining connection slots are reserved for non-replication superuser connections" appears?

  • Problem description

    Caused by: com.alibaba.hologres.client.exception.HoloClientWithDetailsException: failed records 1, first:Record{schema=org.postgresql.model.TableSchema@188365, values=[f06b41455c694d24a18d0552b8b0****, com.chot.tpfymnq.meta, 2022-04-02 19:46:40.0, 28, 1, null], bitSet={0, 1, 2, 3, 4}},first err:[106]FATAL: remaining connection slots are reserved for non-replication superuser connections
        at com.alibaba.hologres.client.impl.Worker.handlePutAction(Worker.java:406) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.run(Worker.java:118) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
        ... 1 more
    Caused by: com.alibaba.hologres.org.postgresql.util.PSQLException: FATAL: remaining connection slots are reserved for non-replication superuser connections
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.readStartupMessages(QueryExecutorImpl.java:2665) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.<init>(QueryExecutorImpl.java:147) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:273) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51) ~[?:?]
        at com.alibaba.hologres.org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:240) ~[?:?]
        at com.alibaba.hologres.org.postgresql.Driver.makeConnection(Driver.java:478) ~[?:?]
        at com.alibaba.hologres.org.postgresql.Driver.connect(Driver.java:277) ~[?:?]
        at java.sql.DriverManager.getConnection(DriverManager.java:674) ~[?:1.8.0_302]
        at java.sql.DriverManager.getConnection(DriverManager.java:217) ~[?:1.8.0_302]
        at com.alibaba.hologres.client.impl.ConnectionHolder.buildConnection(ConnectionHolder.java:122) ~[?:?]
        at com.alibaba.hologres.client.impl.ConnectionHolder.retryExecute(ConnectionHolder.java:195) ~[?:?]
        at com.alibaba.hologres.client.impl.ConnectionHolder.retryExecute(ConnectionHolder.java:184) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.doHandlePutAction(Worker.java:460) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.handlePutAction(Worker.java:389) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.run(Worker.java:118) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
        ... 1 more
  • Cause

    The number of connections exceeds the upper limit.

  • Solution

    • Check the app_name of the connections on each frontend (FE) node and the number of Hologres client connections that are used by flink-connector.

    • Check whether other deployments are connecting to Hologres.

    • Release specific connections. For more information, see Manage connections.
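    As a sketch of the first check, the following query, assuming your Hologres instance exposes the PostgreSQL pg_stat_activity view, counts the current connections per application so that you can see how many connections are held by flink-connector.

      SELECT application_name, count(*) AS connection_count
      FROM pg_stat_activity
      GROUP BY application_name
      ORDER BY connection_count DESC;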

What do I do if the error message "no table is defined in publication" appears?

  • Problem description

    After I drop a table and recreate a table that has the same name as the dropped table, the error message no table is defined in publication appears.

  • Cause

    When the table is dropped, the publication that is associated with the table is not dropped.

  • Solution

    1. Run the select * from pg_publication where pubname not in (select pubname from pg_publication_tables); command in Hologres to query the information about the publication that is not dropped when the table is dropped.

    2. Execute the drop publication xx; statement to drop the publication that is associated with the table.

    3. Restart the deployment.
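    A consolidated sketch of the statements in steps 1 and 2; replace the placeholder in the DROP PUBLICATION statement with the publication name returned by the query.

      -- Step 1: find publications whose tables no longer exist.
      SELECT *
      FROM pg_publication
      WHERE pubname NOT IN (SELECT pubname FROM pg_publication_tables);

      -- Step 2: drop the leftover publication returned by the query above.
      DROP PUBLICATION <publication_name>;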

What do I do if the error message "Caused by: java.lang.IllegalArgumentException: Listener with name jobmaster already registered" appears?

  • Problem description

    The deployment of the JAR package cannot be committed to the session cluster. The error message Caused by: java.lang.IllegalArgumentException: Listener with name jobmaster already registered appears.

    Caused by: java.lang.IllegalArgumentException: Listener with name jobmaster already registered
    2022-05-23T18:39:32.646390412+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.LeaderElectionRunner.lambda$registerListener$5(LeaderElectionRunner.java:148) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.646394309+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.LeaderElectionRunner.inLockScope(LeaderElectionRunner.java:242) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.646397276+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.LeaderElectionRunner.registerListener(LeaderElectionRunner.java:142) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.646399845+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.KubernetesHaLeaderElectionService.lambda$start$0(KubernetesHaLeaderElectionService.java:58) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.64640333+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.KubernetesHaLeaderElectionService.inLockScope(KubernetesHaLeaderElectionService.java:145) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
    2022-05-23T18:39:32.646406583+08:00 stdout F  at com.ververica.platform.flink.ha.kubernetes.KubernetesHaLeaderElectionService.start(KubernetesHaLeaderElectionService.java:55) ~[vvp-flink-ha-kubernetes-flink111-1.1-SNAPSHOT.jar:?]
  • Cause

    The VVR version is earlier than VVR 4.X, such as vvr-2.1.4-flink-1.11. An issue occurs on the Kubernetes high availability (HA) client.

  • Solution

    If the deployment can be restored after the session is restarted, you can ignore this issue. If the deployment cannot be restored after the session is restarted, upgrade your VVR version to VVR 4.X or later.

What do I do if the error message "org.apache.flink.util.SerializedThrowable" appears?

  • Problem description

    When a JAR deployment is running, the error message "org.apache.flink.util.SerializedThrowable" appears in the logs of the JobManager. If the checkpoint tolerance factor is not configured, the checkpointing may fail and the deployment is restarted.

    Caused by: org.apache.flink.util.SerializedThrowable
      at org.apache.flink.fs.osshadoop.StsFetcherCredentialsProvider.getStsCredential(StsFetcherCredentialsProvider.java:79) ~[?:?]
      at org.apache.flink.fs.osshadoop.StsFetcherCredentialsProvider.getCredentials(StsFetcherCredentialsProvider.java:53) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.createDefaultContext(OSSOperation.java:166) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:114) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.getObjectMetadata(OSSObjectOperation.java:458) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:579) ~[?:?]
      at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:569) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.getObjectMetadata(AliyunOSSFileSystemStore.java:277) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:256) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.create(AliyunOSSFileSystem.java:112) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
      at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
      at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:167) ~[?:?]
      at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:43) ~[?:?]
      at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170) ~[flink-dist_2.11-1.13-vvr-4.0.8-SNAPSHOT.jar:1.13-vvr-4.0.8-SNAPSHOT]
      at com.alibaba.flink.statebackend.FlinkFileSystemWrapper.create(FlinkFileSystemWrapper.java:94) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.<init>(GeminiFileOutputViewImpl.java:79) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.common.io.GeminiFileOutputViewImpl.<init>(GeminiFileOutputViewImpl.java:69) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.snapshot.PendingSnapshotOperatorCoordinator.writeMeta(PendingSnapshotOperatorCoordinator.java:396) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.snapshot.PendingSnapshotOperatorCoordinator.acknowledgeAsyncSnapshot(PendingSnapshotOperatorCoordinator.java:366) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.snapshot.SnapshotUploaderImpl.run(SnapshotUploaderImpl.java:222) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.executor.GeminiEventExecutor.lambda$execute$1(GeminiEventExecutor.java:178) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.executor.GeminiEventExecutor.doEventInQueue(GeminiEventExecutor.java:107) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at com.alibaba.gemini.engine.executor.GeminiEventExecutor.run(GeminiEventExecutor.java:88) ~[flink-statebackend-gemini-3.0.6-SNAPSHOT.jar:3.0.6-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.11-1.13-vvr-4.0.8-SNAPSHOT.jar:1.13-vvr-4.0.8-SNAPSHOT]
      at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.11-1.13-vvr-4.0.8-SNAPSHOT.jar:1.13-vvr-4.0.8-SNAPSHOT]
      ... 1 more
  • Cause

    An issue may occur in Object Storage Service (OSS).

  • Solution

    We recommend that you configure the execution.checkpointing.tolerable-failed-checkpoints parameter to prevent a deployment failure when checkpointing fails.
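    For example, the following line in the Other Configuration field tolerates up to five consecutive checkpoint failures before the deployment fails; the value 5 is only an illustrative choice.

      execution.checkpointing.tolerable-failed-checkpoints: 5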

What do I do if the error message "Failed to create the job graph for the job: 4b12318d861041ccb14d6e32bae9cec7 (message = 0:0-0:0, Translating the JobGraph for this deployment failed before. Please delete the JobGraph before requesting a new translation" appears?

  • Problem description

    Failed to create the job graph for the job: 4b12318d861041ccb14d6e32bae9**** (message = 0:0-0:0, Translating the JobGraph for this deployment failed before. Please delete the JobGraph before requesting a new translation.
    Error message: org.apache.flink.table.sqlserver.api.utils.FlinkSQLException: Session '7beccb7bcc161704b285974e0ae93957' does not exist.
        at org.apache.flink.table.sqlserver.session.SessionManager.getSession(SessionManager.java:121)
        at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.configureSession(FlinkSqlServiceImpl.java:312)
        at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.configureSession(FlinkSqlServiceImpl.java:299)
        at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:3187)
        at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  • Cause

    The custom connector is not uploaded to OSS.

  • Solution

    Upload the custom connector to OSS to make the deployment run as expected. For more information, see Manage custom connectors.

What do I do if the error message "Caused by: com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException: the logs is 8785684 bytes which is larger than MAX_BATCH_SIZE_IN_BYTES 8388608" appears?

  • Problem description

    Caused by: com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException: the logs is 8785684 bytes which is larger than MAX_BATCH_SIZE_IN_BYTES 8388608
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.ensureValidLogSize(LogAccumulator.java:249)
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.doAppend(LogAccumulator.java:103)
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.append(LogAccumulator.java:84)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:385)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:308)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:211)
    at com.alibaba.ververica.connectors.sls.sink.SLSOutputFormat.writeRecord(SLSOutputFormat.java:100)
  • Cause

    A single row of log data that is written to Simple Log Service exceeds 8 MB. As a result, no more data can be written to Simple Log Service.

  • Solution

    Change the start offset of the deployment to skip the oversized abnormal data. For more information, see Start a deployment.

What do I do if an OOM error occurs in TaskManagers and the error message "java.lang.OutOfMemoryError: Java heap space" appears for the source table when I restore a failed Flink program?

  • Cause

    In most cases, this issue occurs because the message body of Simple Log Service is excessively large. The Simple Log Service connector is used to request data in batches. The number of log groups is determined by the batchGetSize parameter. The default value of this parameter is 100. Therefore, Flink can receive data of a maximum of 100 log groups each time. When Flink runs as expected, Flink consumes data in a timely manner and does not receive data of 100 log groups. However, if a failover occurs, a large amount of data that is not consumed is accumulated. If the memory that is occupied by a single log group multiplied by 100 is greater than the available memory for JVM, an OOM error occurs in TaskManagers.

  • Solution

    Decrease the value of the batchGetSize parameter.
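    Assuming the batchGetSize parameter is set in the WITH clause of the Simple Log Service source table, like the other connector options in this topic, the change might look like the following fragment; the value 10 is only an example.

      -- In the WITH clause of the Simple Log Service source table:
      'batchGetSize' = '10'   -- fetch at most 10 log groups per request instead of the default 100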

What do I do if the error message "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints" appears?

  • Cause

    The checkpoint timeout period is excessively short. The error message appears if the checkpointing fails several times.

  • Solution

    In the Additional Configuration section on the Advanced tab, change the value of the execution.checkpointing.timeout parameter to a larger value to increase the checkpoint timeout period.
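    For example, the following line raises the checkpoint timeout to 30 minutes; the value is only an illustrative choice (the Flink default is 10 minutes).

      execution.checkpointing.timeout: 30min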

How do I specify the consumer offset for an Apache Paimon source table?

You can configure the scan.mode parameter to specify the consumer offset for an Apache Paimon source table. The following table describes the scan modes supported by Apache Paimon source tables and the related behavior.

| Scan mode | Batch read behavior | Streaming read behavior |
| --- | --- | --- |
| default | The default scan mode. For both batch and streaming reads, the actual behavior is determined by other parameters: if the scan.timestamp-millis parameter is configured, the behavior is the same as in from-timestamp mode; if the scan.snapshot-id parameter is configured, the behavior is the same as in from-snapshot mode; if neither parameter is configured, the behavior is the same as in latest-full mode. | See the Batch read behavior column. |
| latest-full | Produces the most recent snapshot of the table. | Produces the most recent snapshot of the table upon the startup of the deployment, and then continuously produces incremental data. |
| compacted-full | Produces the snapshot of the table after the most recent full compaction. | Produces the snapshot of the table after the most recent full compaction upon the startup of the deployment, and then continuously produces incremental data. |
| latest | Same as latest-full. | Continuously produces incremental data without producing the most recent snapshot of the table upon the startup of the deployment. |
| from-timestamp | Produces the most recent snapshot that was generated no later than the timestamp specified by the scan.timestamp-millis parameter. | Continuously produces incremental data starting from the timestamp specified by the scan.timestamp-millis parameter, without producing a snapshot of the table upon the startup of the deployment. |
| from-snapshot | Produces the snapshot specified by the scan.snapshot-id parameter. | Continuously produces incremental data starting from the snapshot specified by the scan.snapshot-id parameter, without producing a snapshot of the table upon the startup of the deployment. |
| from-snapshot-full | Same as from-snapshot. | Produces the snapshot specified by the scan.snapshot-id parameter upon the startup of the deployment, and then continuously produces incremental data generated after that snapshot. |
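The following sketch shows one way to pass the consumer offset when you query an Apache Paimon table, assuming that SQL hints (dynamic table options) are enabled in your environment. Only the scan.mode and scan.timestamp-millis options are taken from this topic; the catalog, database, and table names and the timestamp value are placeholders.

    -- Read incremental data starting from a specific timestamp in milliseconds,
    -- overriding the table's scan options with a SQL hint.
    SELECT *
    FROM `my_paimon_catalog`.`my_db`.`my_table`
    /*+ OPTIONS('scan.mode' = 'from-timestamp',
                'scan.timestamp-millis' = '1700000000000') */;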

How do I configure automatic partition expiration?

Apache Paimon tables can automatically delete partitions that have existed for longer than the specified partition expiration time. This helps reduce storage costs. Whether a partition has expired is determined by the following two values:

  • Period of time for which a partition exists: the difference between the current system time and the timestamp that is converted from the partition value. The timestamp is obtained from the partition value based on the following rules:

    1. The partition.timestamp-pattern parameter specifies a pattern to convert a partition value into a time string.

      Each partition key column in the value of this parameter is represented by a dollar sign ($) followed by the column name. For example, if a table has four partition key columns named year, month, day, and hour, and the pattern is $year-$month-$day $hour:00:00, the partition year=2023,month=04,day=21,hour=17 is converted into the time string 2023-04-21 17:00:00.

    2. The partition.timestamp-formatter parameter specifies a pattern to convert a time string into a timestamp.

      If this parameter is not configured, the pattern yyyy-MM-dd HH:mm:ss or yyyy-MM-dd is used by default. All patterns that are compatible with DateTimeFormatter of Java can be used.

  • Partition expiration time: The value is specified by the partition.expiration-time parameter.
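The following DDL sketch sets the three parameters for a table that is partitioned by the year, month, day, and hour columns from the preceding example. The seven-day expiration time is only an illustrative value, and the table definition is a placeholder.

    CREATE TABLE partitioned_table (
      id BIGINT,
      `year` VARCHAR,
      `month` VARCHAR,
      `day` VARCHAR,
      `hour` VARCHAR
    ) PARTITIONED BY (`year`, `month`, `day`, `hour`) WITH (
      'partition.timestamp-pattern' = '$year-$month-$day $hour:00:00',
      'partition.timestamp-formatter' = 'yyyy-MM-dd HH:mm:ss',
      'partition.expiration-time' = '7 d'   -- delete partitions that are older than 7 days
    );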

What do I do if the error message "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'oss'." appears?

  • Problem description

Unable to create catalog 'paimon'.
The cause is following: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'oss'. The scheme is directly supported by Flink through the following plugin: flink-oss-fs-hadoop. 
  • Cause

    A canary release is being performed on the platform, and your platform is not the latest version.

  • Solution

    Contact customer service or a sales representative to upgrade the platform.