This topic describes how to insert, update, overwrite, or delete data in Paimon tables using the Development Console of Realtime Compute for Apache Flink. This topic also describes how to consume data from Paimon tables and specify a consumer offset.
Prerequisites
You have created a Paimon catalog and a Paimon table. For details, see Manage Paimon catalogs.
Version requirement
This feature requires Ververica Runtime (VVR) 8.0.5 or later.
Write data
Filter delete messages: By default, DELETE messages delete the data with the corresponding primary key. To filter out these messages, set ignore-delete to true using a SQL hint (see the sketch after this list).
Adjust sink parallelism: To manually adjust the parallelism of the sink operator, set sink.parallelism using a SQL hint. For example:
INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;
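For example, a minimal sketch that filters out DELETE messages at the sink, assuming a source s that emits changelog data:
INSERT INTO t /*+ OPTIONS('ignore-delete' = 'true') */ SELECT * FROM s;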
Synchronize data and schema (CTAS/CDAS)
For details, see Manage Paimon catalogs.
Insert or update data (INSERT INTO)
Use the INSERT INTO statement to insert or update data in a Paimon table.
A Paimon primary key table can accept all types of messages, including INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE. Data with the same primary key is merged after it is written, based on the data merge mechanism.
A Paimon append-only table (non-primary key table) can accept only INSERT messages.
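For example, a minimal sketch, assuming a primary key table my_table with columns k (the primary key) and v:
-- The first statement inserts two rows; the second writes the same key again,
-- so the rows are merged according to the table's merge engine (deduplicate by default).
INSERT INTO my_table VALUES (1, 'apple'), (2, 'banana');
INSERT INTO my_table VALUES (1, 'apricot');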
Overwrite data (INSERT OVERWRITE)
Overwriting data means clearing and rewriting it. Use the INSERT OVERWRITE statement to overwrite an entire Paimon table or specific partitions. The following examples show the SQL statements.
The INSERT OVERWRITE statement is supported only in batch jobs.
By default, the INSERT OVERWRITE operation does not generate changelog data, so downstream streaming jobs cannot consume the deleted and imported data. To consume this data, see Consume the results of INSERT OVERWRITE.
Overwrite the entire non-partitioned table my_table:
INSERT OVERWRITE my_table SELECT ...;
Overwrite the dt=20240108,hh=06 partition of the my_table table:
INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
Dynamically overwrite partitions in the my_table table. Only the partitions that appear in the result of the SELECT statement are overwritten; other partitions remain unchanged:
INSERT OVERWRITE my_table SELECT ...;
Overwrite the entire partitioned table my_table:
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
Delete data (DELETE)
Use the DELETE statement to delete data from a Paimon primary key table. The DELETE statement can be executed only in Data Exploration.
-- Delete all data where currency = 'UNKNOWN' from the my_table table.
DELETE FROM my_table WHERE currency = 'UNKNOWN';
Consume data
Streaming jobs
For Paimon primary key tables that are consumed by streaming jobs, you must configure the changelog producer.
By default, a Paimon source operator in a streaming job first produces the full data from the Paimon table when the job starts. Then, the operator produces the incremental data from the Paimon table from that point forward.
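For example, a minimal sketch that sets the changelog producer when creating a primary key table (input is one supported value; choose a producer that fits your workload):
CREATE TABLE t (
  k BIGINT,
  v STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'changelog-producer' = 'input'
);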
Consume data from a specified offset
To consume data from a Paimon table from a specified offset, use one of the following methods:
To consume only incremental data (but not full data) when the job starts, set 'scan.mode' = 'latest' using a SQL hint.
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
To consume only incremental data (but not full data) from a specified point in time, set the scan.timestamp-millis parameter using a SQL hint. Specify the number of milliseconds elapsed from the Unix epoch (1970-01-01 00:00:00 UTC) to the desired point in time.
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
To consume the full data written after a specified point in time and then continuously consume incremental data, use one of the following methods.
Note: This method reads the files modified after the specified time. Due to file compaction, these files may also contain data written before that time. Use a WHERE filter to exclude unwanted data (see the sketch after this list).
Do not set any SQL hints. When you start the job, select Specify source's start time and specify the time information.
Set the scan.file-creation-time-millis parameter using a SQL hint. Specify the number of milliseconds elapsed from the Unix epoch to the desired point in time.
SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
To consume only incremental data (but not full data) starting from a specific snapshot, set the scan.snapshot-id parameter using a SQL hint.
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
To consume the full data of a specific snapshot and then continuously consume incremental data, set 'scan.mode' = 'from-snapshot-full' and the scan.snapshot-id parameter using a SQL hint.
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
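For example, a hedged sketch of the WHERE filter mentioned in the note above, assuming the table has an event-time column named ts (a hypothetical column; 1678883047356 ms corresponds to 2023-03-15 12:24:07 UTC):
SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */ WHERE ts >= TIMESTAMP '2023-03-15 12:24:07';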
Save consumption progress with consumer ID
A consumer ID saves consumption progress in the Paimon table's metadata file. Benefits:
Resume consumption from the interruption point even when job topology changes or in stateless mode.
Prevent expiration of unconsumed snapshots.
Set the consumer-id parameter to assign a consumer ID to the Paimon source operator in a streaming job. The value can be any string. When a consumer ID is used for the first time, its starting consumer offset is determined by the rules in Consume data from a specified offset. Afterward, any job that uses the same consumer ID resumes consumption from the saved progress.
For example, the following SQL statement sets a consumer ID named test-id for a Paimon source operator. To reset the consumer offset that corresponds to a consumer ID, set 'consumer.ignore-progress' = 'true' (see the sketches after this paragraph).
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
Unconsumed snapshots for a consumer ID are not deleted when they expire. If you do not clear obsolete consumer IDs, these snapshots and their historical data files continue to occupy storage space. Set the consumer.expiration-time table parameter to automatically clear consumer IDs that have not been used for a specified period. For example, 'consumer.expiration-time' = '3d' clears consumer IDs that have been unused for three days.
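Two hedged sketches based on the parameters above: the first resets the consumer offset of an existing consumer ID, and the second sets the expiration period as a table option (assuming ALTER TABLE ... SET is available for your table):
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id', 'consumer.ignore-progress' = 'true') */;
ALTER TABLE t SET ('consumer.expiration-time' = '3d');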
Consume the results of INSERT OVERWRITE
By default, the INSERT OVERWRITE operation does not generate changelog data. Downstream streaming jobs cannot consume the deleted and imported data. To consume deleted and imported data, configure 'streaming-read-overwrite' = 'true' in the streaming job using a SQL hint.
SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;
Batch jobs
By default, a Paimon source operator in a batch job reads the latest snapshot to output the most recent state data of the Paimon table.
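For example, a plain batch query that returns the table's latest state:
SELECT * FROM t;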
Time travel
Set the scan.timestamp-millis parameter using a SQL hint to query the state of the Paimon table at that point in time. This parameter specifies the number of milliseconds that have elapsed from the Unix epoch (1970-01-01 00:00:00 UTC) to the specified point in time.
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
Set the scan.snapshot-id parameter using a SQL hint to query the state of the Paimon table at the time the snapshot was generated.
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
Query data changes between two snapshots
To query the data changes in a Paimon table between two snapshots, set the incremental-between parameter using a SQL hint. For example, to view all data that has changed between snapshot 20 and snapshot 12, use the following SQL statement.
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
Because batch jobs do not support consuming DELETE messages, these messages are discarded by default. To consume DELETE messages in a batch job, query the audit_log system table instead. For example: SELECT * FROM `t$audit_log` /*+ OPTIONS('incremental-between' = '12,20') */;
Adjust source parallelism
By default, Paimon automatically infers the parallelism of the source operator based on information such as the number of partitions and buckets. Use a SQL hint to set the following parameters to manually adjust the parallelism of the source operator.
| Parameter | Data type | Default value | Remarks |
| --- | --- | --- | --- |
| scan.parallelism | Integer | None | Manually sets the parallelism of the Paimon source operator. |
| scan.infer-parallelism | Boolean | true | Specifies whether to automatically infer the parallelism of the Paimon source operator. |
| scan.infer-parallelism.max | Integer | 1024 | The upper limit for the automatically inferred parallelism of the Paimon source operator. |
The following is an example of a SQL statement that manually sets the parallelism of the Paimon source operator to 10.
SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;
Use a Paimon table as a dimension table
A Paimon table can also be used as a dimension table. For details about the syntax for dimension table JOINs, see Dimension table JOIN statement.
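A minimal sketch, assuming a fact stream s with a processing-time attribute proctime and a Paimon dimension table dim keyed on id (all names hypothetical):
SELECT s.id, d.name
FROM s
JOIN dim FOR SYSTEM_TIME AS OF s.proctime AS d
ON s.id = d.id;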
Write and consume the semi-structured data type VARIANT
In VVR 11.1 and later, Paimon tables support the semi-structured VARIANT data type. You can convert JSON strings of the VARCHAR type to VARIANT using the PARSE_JSON or TRY_PARSE_JSON function. Writing and consuming VARIANT data directly can significantly improve the performance of querying and processing JSON data.
The following is a sample SQL statement:
-- Create a table with a VARIANT column.
CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
  k BIGINT,
  info VARIANT
);

-- Convert JSON strings to VARIANT when writing.
INSERT INTO `my-catalog`.`my_db`.`my_tbl`
SELECT k, PARSE_JSON(jsonStr) FROM T;
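If the JSON strings may be malformed, a hedged alternative is TRY_PARSE_JSON, which by the usual TRY_ semantics returns NULL instead of failing the job on invalid input:
INSERT INTO `my-catalog`.`my_db`.`my_tbl`
SELECT k, TRY_PARSE_JSON(jsonStr) FROM T;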