This topic provides answers to some frequently asked questions about the upstream and downstream storage of Realtime Compute for Apache Flink.

How do I obtain JSON data by using fully managed Flink?

  • For more information about how to obtain common JSON data, see JSON Format.
  • If you want to obtain nested JSON data, define the JSON objects in the ROW format in the DDL statement for the source table, define the keys that you want to obtain in the DDL statement for the result table, and specify how the nested keys are accessed in the DML statement. Sample code:
    • Test data
      {
          "a":"abc",
          "b":1,
          "c":{
              "e":["1","2","3","4"],
              "f":{"m":"567"}
          }
      }
    • DDL syntax that is used to create a source table
      CREATE TEMPORARY TABLE `kafka_table` (
        `a` VARCHAR,
         b int,
        `c` ROW<e ARRAY<VARCHAR>,f ROW<m VARCHAR>>  --c specifies a JSON object, which corresponds to ROW in fully managed Flink. e specifies a JSON list, which corresponds to ARRAY. 
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'xxx',
        'properties.bootstrap.servers' = 'xxx',
        'properties.group.id' = 'xxx',
        'format' = 'json',
        'scan.startup.mode' = 'xxx'
      );
    • DDL syntax that is used to create a result table
      CREATE TEMPORARY TABLE `sink` (
       `a` VARCHAR,
        b INT,
        e VARCHAR,
        `m` varchar
      ) WITH (
        'connector' = 'print',
        'logger' = 'true'
      );
    • DML statement
      INSERT INTO `sink`
        SELECT 
        `a`,
        b,
        c.e[1], -- Array indexes in fully managed Flink start from 1. This example obtains the first element of the array. To obtain the entire array, remove [1]. 
        c.f.m
      FROM `kafka_table`;
    • Test results

Fully managed Flink is connected to Message Queue for Apache Kafka, but cannot read data from or write data to Message Queue for Apache Kafka. What do I do?

  • Cause

    If a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to Message Queue for Apache Kafka, the Kafka client, which is the Kafka connector of fully managed Flink, obtains the endpoints of the Kafka brokers instead of the address of the proxy. In this case, fully managed Flink cannot read data from or write data to Message Queue for Apache Kafka even if the connection to Message Queue for Apache Kafka is established.

    The process in which fully managed Flink connects to Message Queue for Apache Kafka consists of the following steps:
    1. The Kafka client obtains the metadata of the Kafka broker, including the endpoints of all Kafka brokers.
    2. Fully managed Flink uses the endpoints of Kafka brokers that are obtained by the Kafka client to read data from or write data to Message Queue for Apache Kafka.
  • Troubleshooting
    To troubleshoot this issue, you can perform the following steps to check whether a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to Message Queue for Apache Kafka:
    1. Use the ZooKeeper command-line tool zkCli.sh or zookeeper-shell.sh to log on to the ZooKeeper service that is used by your Message Queue for Apache Kafka cluster.
    2. Run a command based on the information about your Message Queue for Apache Kafka cluster to obtain the metadata of your Kafka brokers.
      In most cases, you can run the get /brokers/ids/0 command to obtain the metadata of Kafka brokers. You can obtain the endpoint of Message Queue for Apache Kafka from the endpoints field.
    3. Use commands such as ping or telnet to test the connectivity between fully managed Flink and the endpoint in the endpoints field.

      If the connectivity test fails, a forwarding mechanism, such as a proxy or a port mapping, is used to connect fully managed Flink to Message Queue for Apache Kafka.

  • Solution
    • Set up a direct connection between fully managed Flink and Message Queue for Apache Kafka, instead of using a forwarding mechanism such as a proxy or a port mapping. This way, fully managed Flink can directly connect to Message Queue for Apache Kafka by using the endpoint in the endpoints field of the Kafka broker metadata.
    • Contact O&M staff of Message Queue for Apache Kafka to set the advertised.listeners property of Kafka brokers to the forwarding address. This way, the metadata of the Kafka brokers obtained by the Kafka client contains the forwarding address.
      Note Only Kafka 0.10.2.0 and later allow you to add the forwarding address to the advertised.listeners property of Kafka brokers.

    For more information about this issue, see KIP-103: Separation of Internal and External traffic and Kafka network connection issues.

After the data of a Message Queue for Apache Kafka source table is calculated by using event time-based window functions, no data output is returned. Why?

  • Problem description

    After the data of a Message Queue for Apache Kafka source table is processed by using event time-based window functions, no data output is returned.

  • Cause

    No watermark is generated because a partition of the Message Queue for Apache Kafka source table has no data. As a result, no data is returned after event time-based window functions are used to calculate the data of a Message Queue for Apache Kafka source table.

  • Solution
    1. Make sure that all partitions of the Message Queue for Apache Kafka source table contain data.
    2. Enable the idleness detection feature for source data. In the upper-right corner of the job details page, click Configure. On the right side of the Draft Editor page, click the Advanced tab. In the panel that appears, add the following code to the Additional Configuration section and save the changes for the configuration to take effect.
      table.exec.source.idle-timeout: 5
      For more information about the table.exec.source.idle-timeout parameter, see Configuration.

What is the purpose of the commit offset mechanism in fully managed Flink?

Fully managed Flink commits the read offset to Message Queue for Apache Kafka each time a checkpoint is generated. If checkpointing is disabled or the checkpoint interval is too large, you may fail to query the read offset in the Message Queue for Apache Kafka console.
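If you rely on the committed offsets to monitor consumption progress in the Message Queue for Apache Kafka console, make sure that checkpointing is enabled and that the checkpoint interval is not too large. A minimal configuration sketch, assuming the standard Apache Flink configuration key (add it to the Additional Configuration section and adjust the interval to your requirements):
execution.checkpointing.interval: 60s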

Why does the error message "timeout expired while fetching topic metadata" appear even if a network connection is established between fully managed Flink and Message Queue for Apache Kafka?

Fully managed Flink may be unable to read data from Message Queue for Apache Kafka even if a network connection is established between fully managed Flink and Message Queue for Apache Kafka. To ensure that both services are connected and the data of Message Queue for Apache Kafka can be read, you must use the endpoint that is described in the cluster metadata returned by Kafka brokers during bootstrapping. For more information, see Flink-cannot-connect-to-Kafka. To check the network connection, perform the following operations:
  1. Use zkCli.sh or zookeeper-shell.sh to log on to the ZooKeeper service that is used by your Message Queue for Apache Kafka cluster.
  2. Run the ls /brokers/ids command to obtain the IDs of all Kafka brokers.
  3. Run the get /brokers/ids/{your_broker_id} command to view the metadata information of Kafka brokers.

    You can query the endpoint from listener_security_protocol_map.

  4. Check whether fully managed Flink can connect to the endpoint.

    If the endpoint contains a domain name, configure the DNS service for fully managed Flink.

How do I resume a job of fully managed Flink that fails to run after a DataHub topic is split or scaled in?

If a topic that is being read by fully managed Flink is split or scaled in, the job fails and cannot automatically resume. To resume the job, you must cancel the job and then start it again.

Can I delete a DataHub topic that is being consumed?

No, you cannot delete or recreate DataHub topics that are being consumed.

What do the endPoint and tunnelEndpoint parameters mean? What happens if the two parameters are incorrectly configured?

For more information about the endPoint and tunnelEndpoint parameters, see Endpoints. If these two parameters are incorrectly configured in a virtual private cloud (VPC), the following task errors may occur:
  • If the endPoint parameter is incorrectly configured, the task stops when the progress reaches 91%.
  • If the tunnelEndpoint parameter is incorrectly configured, the task fails to run.

How do full MaxCompute source tables and incremental MaxCompute source tables read data from MaxCompute?

Full MaxCompute source tables and incremental MaxCompute source tables read data from MaxCompute by using MaxCompute Tunnel. Therefore, the read speed is limited by the bandwidth of MaxCompute Tunnel.

If MaxCompute is referenced as a data source, can the full MaxCompute source table or incremental MaxCompute source table read the data that is appended to an existing partition or table after a job of fully managed Flink starts?

No, the new data cannot be read. If new data is appended to the partition or the table that is read or is being read by the full MaxCompute source table or incremental MaxCompute source table after a job of fully managed Flink starts, the data cannot be read and a failover may be triggered.

Full MaxCompute source tables and incremental MaxCompute source tables use ODPS DOWNLOAD SESSION to read data from a partition or table. When you create a download session, the MaxCompute server creates an index file that contains the data mappings obtained when you create the download session. Subsequent data reading is performed based on the data mappings. Therefore, in most cases, the data that is appended to a MaxCompute table or to a partition in the table after you create a download session cannot be read. If new data is written to the MaxCompute source table, two exceptions occur:
  • If new data is written to a table or to a partition in the table while the data is being read by using MaxCompute Tunnel, the following error is returned: ErrorCode=TableModified,ErrorMessage=The specified table has been modified since the download initiated..
  • Data accuracy cannot be guaranteed. New data that is written after the Tunnel download session is closed is not read. If the job is recovered from a failover or is resumed, the data may not be correct. For example, existing data may be read again, but the new data may not be completely read.

After I suspend a job for a full MaxCompute source table or incremental MaxCompute source table, can I change the parallelism of jobs and resume the job that I suspended?

No, you cannot change the parallelism of jobs and resume a job that you suspended. MaxCompute determines which data in which partitions needs to be read for each parallel job and then records the consumption information about each parallel job in state data based on the parallelism. This way, MaxCompute can continue to read data from the most recent read position after a job is suspended and then resumed or after a job is recovered from a failover.

If you suspend a job for the full MaxCompute source table or an incremental MaxCompute source table, change the parallelism of jobs, and then resume the job, some data may be repeatedly read and some data may not be read.

Why is the data in the partitions before the start offset also read when I set the start offset of a job to 2019-10-11 00:00:00?

The start offset is valid only for Message Queue data sources, such as DataHub. You cannot apply the start offset in MaxCompute source tables. After you start a job of fully managed Flink, fully managed Flink reads data by using the following methods:
  • For a partitioned table, fully managed Flink reads data from all existing partitions.
  • For a non-partitioned table, fully managed Flink reads all existing data.

What do I do if an incremental MaxCompute source table detects a new partition and some data is not written to the partition?

No mechanism is provided to check whether the data in a partition is complete. If an incremental MaxCompute source table detects a new partition, the source table immediately reads data from the partition. If the incremental MaxCompute source table reads data from the ds partition of the MaxCompute partitioned table T, we recommend that you write data in the following way: Execute the INSERT OVERWRITE TABLE T PARTITION (ds='20191010') ... statement without creating the partition in advance. If the statement succeeds, the partition and its data become visible at the same time.
Notice We recommend that you do not write data in the following way: Create a partition, such as ds=20191010, and then write data to the partition. If the incremental MaxCompute source table consumes table T and detects the new partition ds=20191010, the source table immediately reads data from the new partition. If the data written to the partition is incomplete, the data fails to be read.

What do I do if the error message "ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'" appears?

  • Error details
    An error message appears on the Failover page or on the TaskManager.log page when a job is running. Error details:
    ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
  • Cause

    The user identity information specified in the MaxCompute DDL statements cannot be used to access MaxCompute.

  • Solution

    Use an Alibaba Cloud account, a RAM user, or a RAM role to authenticate the user identity. For more information, see User authentication. If you have any questions, submit a ticket and set the product name to MaxCompute.

How do I perform window aggregation if watermarks are not supported?

If you need to perform window aggregation on a MySQL CDC source table, you can convert time fields into window values and use GROUP BY to aggregate the window values. For example, if you want to calculate the number of orders and sales per minute in a store, you can use the following code:
SELECT shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm'), COUNT(*), SUM(price)
FROM order_mysql_cdc
GROUP BY shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm');

How do I enable the system to skip the snapshot stage and read only the change data?

You can use the debezium.snapshot.mode parameter in the WITH clause to specify whether to skip the snapshot stage, as shown in the DDL sketch after the following list. Valid values:
  • never: When the job starts, the system reads the change data from the earliest available position and does not read snapshots from the database. Because old incremental data of the MySQL database may be automatically deleted, the incremental data may not contain the full data, and the data that is read may be incomplete.
  • schema_only: If you focus only on the new change data in the database and do not need to ensure data consistency, set this parameter to schema_only. This way, only schema snapshots are generated, and the system reads the change data from the most recent position.
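The following DDL sketch shows how this parameter might be set on a MySQL CDC source table. Except for debezium.snapshot.mode, the connection options follow the common MySQL CDC connector options, and all values in angle brackets are placeholders:
  CREATE TEMPORARY TABLE mysql_cdc_source (
    id BIGINT,
    name VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '<yourHostname>',
    'port' = '3306',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    'database-name' = '<yourDatabase>',
    'table-name' = '<yourTable>',
    -- Skip the snapshot stage and read only the change data from the most recent position.
    'debezium.snapshot.mode' = 'schema_only'
  );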

How does the system read data from a MySQL database on which sharding is performed?

For example, after sharding, a MySQL database contains multiple tables, such as user_00, user_02, and user_99, and the schemas of these tables are the same. In this scenario, you can use the table-name option to specify a regular expression that matches the tables from which data is read. For example, you can set table-name to user_.* to monitor all tables whose names start with the user_ prefix. If the schemas of all tables in the database are the same, you can use the database-name option to achieve the same effect.
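The following minimal DDL sketch uses a regular expression in the table-name option to read all sharded user tables. The schema, the connection options, and the values in angle brackets are placeholders for illustration only:
CREATE TEMPORARY TABLE user_source (
  user_id BIGINT,
  user_name VARCHAR,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabase>',
  -- Matches all tables whose names start with the user_ prefix, such as user_00 and user_99.
  'table-name' = 'user_.*'
);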

What do I do if the data reading efficiency is low and backpressure exists when full data is read from tables?

If the sink processes data at a low speed, backpressure may occur. Check whether the sink has backpressure. If it does, use one of the following methods to resolve the backpressure issue on the sink first:
  • Increase the parallelism.
  • Enable aggregation optimization features, such as miniBatch. A configuration sketch is provided after this list.
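The following minimal sketch shows the miniBatch settings, assuming the standard Flink configuration keys. Add the entries to the Additional Configuration section in the same way as described earlier in this topic, and adjust the latency and batch size to your workload:
  table.exec.mini-batch.enabled: true
  table.exec.mini-batch.allow-latency: 2s
  table.exec.mini-batch.size: 5000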

How do I determine whether full data synchronization is complete in a Change Data Capture (CDC) job?

You can determine whether full data synchronization is complete in the job based on the value of the currentEmitEventTimeLag metric on the Metrics tab of the Deployments page. The currentEmitEventTimeLag metric reflects the difference between the time at which the source sends a data record to the sink and the time at which the data record is generated in the database. This metric is used to measure the delay from the time when the data is generated in the database to the time when the data leaves the source.
Description of the value of the currentEmitEventTimeLag metric:
  • If the value of currentEmitEventTimeLag is 0, the CDC job is still in full data synchronization.
  • If the value of currentEmitEventTimeLag is greater than 0, the CDC job completes full data synchronization and starts to read binary log data.

What do I do if the error message "com.github.shyiko.mysql.binlog.network.ServerException" appears?

  • Error details
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
  • Cause

    A MySQL CDC connector records the position of the binary log file before fully managed Flink starts to read full data. After fully managed Flink reads the full data, it reads incremental data from the recorded binary log position. In most cases, this error occurs because the full data reading is not complete before the MySQL server purges the binary log files at the recorded position.

  • Solution
    Check the rules that are used to delete MySQL binary log files, including the rules that are related to the retention time, the storage space, and the number of files. We recommend that you retain binary log files for more than one day. For more information about binary log files of ApsaraDB RDS for MySQL, see Delete the binary log files of an ApsaraDB RDS for MySQL instance. A sketch of the MySQL statements that you can use to check the current retention settings is provided after the following note.
    Note In Ververica Runtime (VVR) 4.0.8 and later, fully managed Flink can read full data in parallel jobs by using a MySQL CDC connector. This way, full data reading is accelerated and the probability of the preceding error is reduced.
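    The following sketch shows MySQL statements that you can use to check the current binary log retention settings and the binary log files that are still available. The variable name depends on your MySQL version:
    -- Check how long binary log files are retained.
    SHOW VARIABLES LIKE 'expire_logs_days';            -- MySQL 5.7 and earlier
    SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';  -- MySQL 8.0 and later
    -- List the binary log files that are still available on the server.
    SHOW BINARY LOGS;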

What do I do if the error message "The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'" appears?

  • Error details
    When a syntax check is performed on a MySQL CDC source table of Flink V1.13, the following error message appears:
    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
  • Cause
    No primary key is defined in the DDL statement that is used to create the MySQL CDC source table. In Flink V1.13, the incremental snapshot feature is enabled by default and requires a primary key, because data is sharded based on the primary key to support data reading in parallel jobs.
    Note In Flink V1.12 and earlier, MySQL CDC source tables support only data reading in a single job.
  • Solution
    • If you use Flink V1.13 and want to read data from MySQL databases in parallel jobs, define a primary key in the DDL statement.
    • If you use Flink V1.13 but do not need to read data from MySQL databases in parallel jobs, add the scan.incremental.snapshot.enabled parameter to the DDL statement and set this parameter to false. This way, you do not need to define a primary key. A DDL sketch that shows both options is provided after this list.
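    The following DDL sketch shows both options. The schema, the connection options, and the values in angle brackets are placeholders for illustration only:
      -- Option 1: Define a primary key so that data can be read in parallel jobs.
      CREATE TEMPORARY TABLE mysql_cdc_source (
        id BIGINT,
        name VARCHAR,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '<yourHostname>',
        'port' = '3306',
        'username' = '<yourUsername>',
        'password' = '<yourPassword>',
        'database-name' = '<yourDatabase>',
        'table-name' = '<yourTable>'
      );

      -- Option 2: Disable the incremental snapshot feature if parallel reading is not required.
      -- In this case, you do not need to define a primary key.
      CREATE TEMPORARY TABLE mysql_cdc_source_single (
        id BIGINT,
        name VARCHAR
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '<yourHostname>',
        'port' = '3306',
        'username' = '<yourUsername>',
        'password' = '<yourPassword>',
        'database-name' = '<yourDatabase>',
        'table-name' = '<yourTable>',
        'scan.incremental.snapshot.enabled' = 'false'
      );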

When output data is written to an ApsaraDB RDS result table, is a new data record inserted into the table, or is the result table updated based on the primary key value?

If a primary key is defined in the DDL statement, output data is written by using the INSERT INTO tablename(field1, field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1, field2=value2, field3=value3, ...; statement. If the primary key value does not exist, a new record is inserted into the table. If the primary key value already exists, the existing record is replaced. If no primary key is defined in the DDL statement, output data is written by using the INSERT INTO statement.
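The following minimal sketch shows a result table DDL with a primary key. Because the primary key is defined, data is written in upsert mode by using the ON DUPLICATE KEY UPDATE statement described above. The connector name, option names, and values in angle brackets are assumptions for illustration only and must match the documentation of the ApsaraDB RDS connector that you use:
CREATE TEMPORARY TABLE rds_sink (
  id BIGINT,
  cnt BIGINT,
  PRIMARY KEY (id) NOT ENFORCED  -- Triggers upsert writes instead of plain INSERT INTO.
) WITH (
  'connector' = 'rds',
  'url' = 'jdbc:mysql://<internalEndpoint>/<databaseName>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);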

How do I perform GROUP BY operations by using the unique index of an ApsaraDB RDS result table?

  • If you want to use the unique index of an ApsaraDB RDS result table to perform GROUP BY operations, you must declare the unique index in the GROUP BY clause in your job of fully managed Flink, as shown in the sketch after this list.
  • An ApsaraDB RDS table has only one auto-increment primary key, which cannot be declared as a primary key in a job of fully managed Flink.
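The following minimal sketch assumes that the physical ApsaraDB RDS table has an auto-increment primary key id and a unique index on (shop_id, stat_date). The id column is not declared in the Flink DDL, the unique index columns are declared as the primary key, and the same columns are used in the GROUP BY clause. The connector name, option names, and the order_source table are assumptions for illustration only:
CREATE TEMPORARY TABLE rds_sink (
  shop_id VARCHAR,
  stat_date VARCHAR,
  order_cnt BIGINT,
  PRIMARY KEY (shop_id, stat_date) NOT ENFORCED  -- Declares the unique index columns, not the auto-increment id.
) WITH (
  'connector' = 'rds',
  'url' = 'jdbc:mysql://<internalEndpoint>/<databaseName>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);

INSERT INTO rds_sink
SELECT shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd'), COUNT(*)
FROM order_source
GROUP BY shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd');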

Why is the INT UNSIGNED data type that is supported by MySQL physical tables, such as ApsaraDB RDS for MySQL physical tables or AnalyticDB for MySQL physical tables, declared as another data type in Flink SQL?

To ensure data precision, the Java Database Connectivity (JDBC) driver of MySQL converts the received data into a different data type based on the data type of the received data. For example, the MySQL JDBC driver converts the received data of the INT UNSIGNED type into the Java Long type and converts the received data of the BIGINT UNSIGNED type into the Java BigInteger type.
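Therefore, unsigned MySQL columns are usually declared as a wider type in Flink SQL. The following sketch shows the commonly recommended mapping; verify it against the connector documentation for your version, and treat the connection options as placeholders:
CREATE TEMPORARY TABLE mysql_source (
  c_int_unsigned BIGINT,             -- INT UNSIGNED in MySQL is declared as BIGINT.
  c_bigint_unsigned DECIMAL(20, 0)   -- BIGINT UNSIGNED in MySQL is declared as DECIMAL(20, 0).
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabase>',
  'table-name' = '<yourTable>'
);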

What do I do if the error message "Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1" appears?

  • Error details
    Caused by: java.sql.BatchUpdateException: Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1
    at sun.reflect.GeneratedConstructorAccessor59.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
    at com.mysql.cj.util.Util.getInstance(Util.java:167)
    at com.mysql.cj.util.Util.getInstance(Util.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeBatch(DruidPooledPreparedStatement.java:565)
    at com.alibaba.ververica.connectors.rds.sink.RdsOutputFormat.executeSql(RdsOutputFormat.java:488)
    ... 15 more
  • Cause

    The data contains emojis, which cannot be parsed during data encoding.

  • Solution

    When you use a JDBC driver to connect to a MySQL database, you must add the character set UTF-8 to the JDBC URL of the MySQL database, such as jdbc:mysql://<internalEndpoint>/<databaseName>?characterEncoding=UTF-8. For more information, see Using Character Sets and Unicode.

Can I retract the updated data from an ApsaraDB for ClickHouse result table?

If a primary key is defined in the DDL statement that is used to create the ApsaraDB for ClickHouse result table of fully managed Flink, you can retract the updated data. However, ApsaraDB for ClickHouse is a column-oriented database management system that is used for online analytical processing (OLAP), and its performance for UPDATE and DELETE operations is low. If a primary key is defined in the DDL statement, fully managed Flink uses the ALTER TABLE UPDATE or DELETE statement to update or delete data. As a result, the data processing performance significantly decreases.

How do I view print data results in the console of fully managed Flink?

If you want to view print data results in the console of fully managed Flink, you can use one of the following methods:
  • Log on to the console of fully managed Flink to view data:
    1. In the left-side navigation pane, choose Applications > Deployments.
    2. Click the name of the job whose print data results you want to view.
    3. Click the Logs tab.
    4. Click the Running Logs tab. Then, select the instance of the job that is running from the Selected Job drop-down list.
    5. On the Task Managers tab, click the value in the Path, ID column.
    6. Click the Logs tab to view print data results.
  • Go to Flink web UI:
    1. In the left-side navigation pane, choose Applications > Deployments.
    2. Click the name of the job whose print data results you want to view.
    3. On the Overview tab, click Flink UI.
    4. In the left-side navigation pane of Apache Flink Dashboard, click Task Managers.
    5. On the Task Managers page, click the value in the Path, ID column.
    6. On the page that appears, click the Logs tab to view the print data results.

What do I do if no data can be found when I join a dimension table with another table?

Check whether the schema type and name in the DDL statement are the same as the schema type and name in the physical table.

What is the difference between max_pt() and max_pt_with_done()?

When the values of the partition parameter are sorted in alphabetical order, max_pt() returns the largest partition. max_pt_with_done() returns the largest partition among the partitions that have a corresponding partition with the .done suffix. In this example, the following partitions are used:
  • ds=20190101
  • ds=20190101.done
  • ds=20190102
  • ds=20190102.done
  • ds=20190103
The following example shows the difference between max_pt() and max_pt_with_done(). A DDL sketch that shows how to set the partition option is provided after this list.
  • `partition`='max_pt_with_done()' returns the ds=20190102 partition.
  • `partition`='max_pt()' returns the ds=20190103 partition.
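The following minimal DDL sketch shows how the partition option might be set on a MaxCompute source table. Except for partition and endPoint, the connector name and option names are assumptions for illustration only and must match the MaxCompute connector documentation for your version:
CREATE TEMPORARY TABLE odps_source (
  ds VARCHAR,
  cnt BIGINT
) WITH (
  'connector' = 'odps',
  'endPoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '<yourAccessKeyId>',
  'accessKey' = '<yourAccessKeySecret>',
  -- Reads the largest partition that has a corresponding partition with the .done suffix.
  'partition' = 'max_pt_with_done()'
);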