This topic describes the scenarios and parameter configurations for partial column updates in Delta tables, along with the two partial column update modes designed for Flink Connector and their related configurations.
Background information
UPSERT operation: A database operation that combines INSERT and UPDATE. Each record (or row) processed by UPSERT must include the primary key columns, which are used to locate the target row.
UPSERT behavior: Depends on whether data with the specified primary key exists in the table.
Insert semantics: When data with the specified primary key does not exist in the table, UPSERT performs an insert operation to add a new record to the table.
Update semantics: When a record with the specified primary key already exists in the table, UPSERT performs an update operation to update the existing data with the new data provided.
UPSERT scenarios: In stream processing with multiple table joins, updates from two different data streams affect different columns in the same table.
Data stream StreamA is responsible for updating column ColumnX.
Data stream StreamB is responsible for updating column ColumnY.
UPSERT form comparison:
Traditional UPSERT: Updates from StreamB might overwrite modifications made by StreamA, leading to data inconsistency.
Partial column update feature: Ensures that there are no conflicts between streams during concurrent updates. Each stream only updates the columns it is responsible for while preserving the update results from all streams in the same row.
Scenarios
Scenario 1: Update different columns in the same row without interference
Assume there is a user information management system that needs to process and update user data in real time. The data is processed by two independent service streams that receive information from different data sources.
Data stream StreamA is responsible for processing users' personal information such as name, age, and gender.
Data stream StreamB is responsible for processing users' contact information such as email and phone number.
In actual business operations, a user's personal information and contact information might change almost simultaneously. We need to ensure that these updates are immediately reflected in the user information management system without overwriting each other.
Operation process
A user updates their name and phone number on different platforms.
StreamA receives the name update, and StreamB receives the phone number update.
Both StreamA and StreamB send updates to the user information management system.
Final result
Without partial column updates: If StreamB's update arrives and is processed after StreamA, it will overwrite the name information just updated by StreamA (if StreamB performs a full row update), causing the name to revert to its old value.
With partial column updates:
When StreamA performs an update, it only operates on the name column without affecting the contact information columns.
When StreamB performs an update, it only operates on the phone number column without affecting the personal information columns.
The final result is that the user's name is updated to the latest information, and the phone number is also updated to the latest information. These updates are performed independently without interference, ensuring the integrity and accuracy of user information.
In practical applications, the partial column update feature is crucial for processing data such as user information. This feature not only ensures real-time data updates but also effectively prevents data inconsistency issues.
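The two outcomes of Scenario 1 can be illustrated with a minimal in-memory Python sketch. It only models the semantics and is not how MaxCompute or the Flink Connector is implemented. The dictionary-based table, the function names full_row_upsert and partial_upsert, and the sample values are hypothetical and chosen for illustration.

```python
# Hypothetical in-memory model of a table keyed by primary key.
# It illustrates the semantics only; it is not the connector's implementation.

def full_row_upsert(table, pk, row):
    """Traditional UPSERT: the incoming row replaces the whole stored row."""
    table[pk] = dict(row)


def partial_upsert(table, pk, row, columns):
    """Partial column update: only the columns owned by the stream are written."""
    stored = table.setdefault(pk, {})
    for col in columns:
        stored[col] = row.get(col)


def run(upsert_a, upsert_b):
    # Initial row: old name and old phone number.
    table = {1: {"name": "old_name", "phone": "111"}}
    # StreamA carries the new name; StreamB carries the new phone number.
    # StreamB's record still holds the stale name it read earlier.
    upsert_a(table, 1, {"name": "new_name", "phone": "111"})
    upsert_b(table, 1, {"name": "old_name", "phone": "222"})
    return table[1]


without_partial = run(full_row_upsert, full_row_upsert)
with_partial = run(
    lambda t, pk, r: partial_upsert(t, pk, r, ["name"]),
    lambda t, pk, r: partial_upsert(t, pk, r, ["phone"]),
)

print(without_partial)  # {'name': 'old_name', 'phone': '222'}
print(with_partial)     # {'name': 'new_name', 'phone': '222'}
```

In the first run the name reverts to its old value because StreamB rewrites the whole row. In the second run each stream touches only its own column, so both updates survive.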
Scenario 2: Update partial fields within a row while keeping others unchanged
Assume there is a user information management system that needs to process and update user data in real time. The data is processed by two independent service streams that receive information from different data sources.
StreamA is responsible for updating users' personal information such as name, age, and gender, and users' contact information such as email and phone number.
StreamB is responsible for updating the same personal information and contact information. Its task is identical to that of StreamA.
Operation process
StreamA only wants to update the user's age, with a command like: INSERT INTO table (pk, age) VALUES (1, 3);
At the same time, StreamB only wants to update the user's gender, with a command like: INSERT INTO table (pk, sex) VALUES (1, 'male');
Final result
Without partial column updates: If a record with primary key 1 receives the above update commands, all fields other than the ones being updated will be set to NULL. This will result in the loss of original valid data.
With dynamic partial column updates:
When an insert operation is triggered, the system identifies that only some fields contain data.
The partial column update mechanism ensures that only these fields with data are updated.
At the same time, fields that do not have data provided in the insert operation will maintain their original values.
By implementing this automatic identification and update strategy, the user information management system can precisely update only the fields that need to be changed without losing any existing valid data. This significantly enhances the flexibility and accuracy of data management, providing solid protection for maintaining data integrity.
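The behavior described in Scenario 2 can be sketched in Python as follows. This is an illustrative model under stated assumptions, not the actual implementation: None stands for NULL, the column list and the initial row are hypothetical, and the helper names expand, overwrite, and dynamic_update are invented for this example. The two updates mirror the INSERT commands shown above.

```python
# Hypothetical model: None stands for SQL NULL. Not the connector's implementation.
COLUMNS = ["pk", "age", "sex", "email"]


def expand(record):
    """Columns not listed in the INSERT are filled with NULL (None)."""
    return {col: record.get(col) for col in COLUMNS}


def overwrite(stored, record):
    """Without partial column updates: the expanded row replaces the stored row."""
    return expand(record)


def dynamic_update(stored, record):
    """Dynamic updates: only non-NULL incoming values replace stored values."""
    merged = dict(stored)
    for col, value in expand(record).items():
        if value is not None:
            merged[col] = value
    return merged


initial = {"pk": 1, "age": 2, "sex": "female", "email": "u@example.com"}
updates = [{"pk": 1, "age": 3}, {"pk": 1, "sex": "male"}]

row_overwrite, row_dynamic = initial, initial
for rec in updates:
    row_overwrite = overwrite(row_overwrite, rec)
    row_dynamic = dynamic_update(row_dynamic, rec)

print(row_overwrite)  # {'pk': 1, 'age': None, 'sex': 'male', 'email': None}
print(row_dynamic)    # {'pk': 1, 'age': 3, 'sex': 'male', 'email': 'u@example.com'}
```

With full-row overwrites, the age written by StreamA and the original email are lost. With dynamic updates, both changes are kept and untouched fields retain their values.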
Flink Connector mode overview
Based on the usage scenarios of partial column updates in Delta tables, two partial column update modes are designed for Flink Connector to meet different data update requirements.
Static mode
In static mode, users need to specify in advance which columns will be updated by the data stream. These specified columns will follow the normal UPSERT logic:
If the primary key exists, update the data.
If the primary key does not exist, insert new data.
At the same time, columns not specified for updates will maintain their existing values. This mode is suitable for columns that are expected to change frequently.
Dynamic mode
In dynamic mode, the system automatically detects which columns in each incoming record contain non-NULL values and updates only those columns. Columns whose value in the incoming record is NULL remain unchanged. Dynamic mode is suitable for situations where it is not possible to determine in advance which columns will change.
By introducing these two update modes, Flink Connector provides users with more flexible and powerful data processing capabilities, allowing them to choose the most appropriate data update strategy based on actual situations, thereby ensuring data accuracy and integrity.
The following table shows the results after updating the same data using different modes.
In this example, the first column a is the primary key. In static mode, the primary key column is selected by default.
Mode | Initial data | Step 1: Data after updating (a, b, c) | Step 2: Data after updating (a, d, null) | Step 3: Final data after updating (a, null, e) |
Regular mode | (null, null, null) | (a, b, c) | (a, d, null) | (a, null, e) |
Dynamic mode | (null, null, null) | (a, b, c) | (a, d, c) | (a, d, e) |
Static mode (second column specified for update) | (null, null, null) | (a, b, null) | (a, d, null) | (a, null, null) |
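The rows of the table above can be reproduced with a small Python model of the three modes. It is a sketch of the semantics as described in this topic, not the connector's code. Rows are modeled as tuples, None stands for NULL, and the function names regular, dynamic, static_second_column, and replay are hypothetical.

```python
# Hypothetical model of the three modes; None stands for NULL.
# Rows are (key, col2, col3) tuples; the first element is the primary key.

def regular(stored, incoming):
    """Regular UPSERT: the incoming row replaces the stored row."""
    return incoming


def dynamic(stored, incoming):
    """Dynamic mode: keep the stored value wherever the incoming value is NULL."""
    if stored is None:
        return incoming
    return tuple(s if i is None else i for s, i in zip(stored, incoming))


def static_second_column(stored, incoming):
    """Static mode with the second column specified: write only the primary
    key and the second column; the third column keeps its stored value."""
    third = stored[2] if stored is not None else None
    return (incoming[0], incoming[1], third)


writes = [("a", "b", "c"), ("a", "d", None), ("a", None, "e")]


def replay(mode):
    row, history = None, []  # None: the row does not exist yet
    for incoming in writes:
        row = mode(row, incoming)
        history.append(row)
    return history


for mode in (regular, dynamic, static_second_column):
    print(mode.__name__, replay(mode))
```

Each printed history lists the row after Step 1, Step 2, and Step 3 for one mode, in the same order as the table columns. Note that in the static model a NULL written to the specified column overwrites the stored value, because that column follows normal UPSERT logic.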
Usage methods
Create a Delta table and enable partial column updates
To enable partial column updates, set the acid.partial.fields.update.enable parameter to true in TBLPROPERTIES when you create the Delta table. For more information, see Parameters for Delta Tables.
Syntax example:
CREATE TABLE IF NOT EXISTS partial_upsert_test
(pk INT NOT NULL,
c1 STRING,
c2 STRING,
c3 STRING,
primary key(pk))
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');
Flink Connector configuration examples
Parameters
To configure partial column update modes, MaxCompute has introduced the following two configuration parameters:
Parameter | Description |
upsert.partial-column.enable | This parameter is used to enable the partial column update feature. If column names are not specified (with the |
upsert.partial-column.name | This parameter is used to specify the columns that need to be updated. If this parameter is set, the system will only update the listed fields, while other fields will maintain their original values. Note: Primary key columns are selected by default. Partition key column names cannot be added to this parameter. |
Dynamic partial column update configuration examples
Create a table with dynamic partial column updates enabled. Example:
CREATE TABLE partialtable (
pk INT,
c1 STRING,
c2 STRING,
c3 STRING,
PRIMARY KEY(pk) NOT ENFORCED
) WITH (
'connector' = 'maxcompute',
'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC network connectivity
'odps.project.name' = 'project_name',
'odps.namespace.schema' = 'true', //Support the three-layer model.
'table.name' = 'project.schema.tablename',
'sink.operation' = 'upsert',
'upsert.write.bucket.num' = '1',
'upsert.partial-column.enable' = 'true',
'odps.access.id' = 'yourAccessId',
'odps.access.key' = 'yourAccessKey'
);
Examples of subsequent operations on the table and their results:
Insert data [1, a, b, c] into the table, where the first column is the primary key. The initial data is [1, a, b, c].
INSERT INTO partialtable VALUES (1, 'a', 'b', 'c');
Update only column c2 to d for the record with primary key 1. The data after the update is [1, a, d, c].
INSERT INTO partialtable(pk, c2) VALUES (1, 'd');
Update only column c3 to e for the record with primary key 1. The data after the update is [1, a, d, e].
INSERT INTO partialtable(pk, c3) VALUES (1, 'e');
Static partial column update configuration examples
Create a table that only updates the c2 column. Subsequent operations on this table will only affect the c2 column, while other columns will remain unchanged. Example:
CREATE TABLE PartialTable2 (
pk INT,
c1 STRING,
c2 STRING,
c3 STRING,
PRIMARY KEY(pk) NOT ENFORCED
) WITH (
'connector' = 'maxcompute',
'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC network connectivity
'odps.project.name' = 'project_name',
'odps.namespace.schema' = 'true', //Support the three-layer model.
'table.name' = 'project.schema.tablename',
'sink.operation' = 'upsert',
'upsert.write.bucket.num' = '1',
'upsert.partial-column.enable' = 'true',
'upsert.partial-column.name' = 'c2', // Specify to only update the c2 column.
'odps.access.id' = 'yourAccessId',
'odps.access.key' = 'yourAccessKey'
);
When configuring the upsert.partial-column.name parameter, you must use the column names corresponding to the table in MaxCompute, not the column names in the Flink internal table. This ensures that Flink can correctly identify and update the corresponding columns in the storage system.
References
For information about how to write data to Delta tables using Flink, see Use Flink to write data to a Delta table. You can follow that procedure and configure the partial column update parameters to meet your business requirements.
For more information about partial column updates, see Update data in specific columns.