MaxCompute: Near real-time partial column updates to Delta tables with Flink

Last Updated: May 23, 2025

This topic describes the scenarios and parameter configurations for partial column updates in Delta tables, along with the two partial column update modes designed for Flink Connector and their related configurations.

Background information

  • UPSERT operation: A database operation that combines INSERT and UPDATE. Each record (row) processed by UPSERT must include the primary key columns, which are used to determine whether the record is inserted or updated.

  • UPSERT behavior: Depends on whether data with the specified primary key exists in the table.

    • Insert semantics: When data with the specified primary key does not exist in the table, UPSERT performs an insert operation to add a new record to the table.

    • Update semantics: When a record with the specified primary key already exists in the table, UPSERT performs an update operation to update the existing data with the new data provided.

  • UPSERT scenarios: In stream processing with multiple table joins, updates from two different data streams affect different columns in the same table.

    • Data stream StreamA is responsible for updating column ColumnX.

    • Data stream StreamB is responsible for updating column ColumnY.

  • UPSERT form comparison:

    • Traditional UPSERT: Updates from StreamB might overwrite modifications made by StreamA, leading to data inconsistency.

    • Partial column update feature: Ensures that there are no conflicts between streams during concurrent updates. Each stream only updates the columns it is responsible for while preserving the update results from all streams in the same row.

Scenarios

Scenario 1: Update different columns in the same row without interference

Assume there is a user information management system that needs to process and update user data in real time. The data is processed by two independent service streams that receive information from different data sources.

  • Data stream StreamA is responsible for processing users' personal information such as name, age, and gender.

  • Data stream StreamB is responsible for processing users' contact information such as email and phone number.

In actual business operations, a user's personal information and contact information might change almost simultaneously. We need to ensure that these updates are immediately reflected in the user information management system without overwriting each other.

Operation process

  1. A user updates their name and phone number on different platforms. StreamA receives the name update, and StreamB receives the phone number update.

  2. Both StreamA and StreamB send updates to the user information management system.

Final result

  • Without partial column updates: If StreamB's update arrives and is processed after StreamA, it will overwrite the name information just updated by StreamA (if StreamB performs a full row update), causing the name to revert to its old value.

  • With partial column updates:

    • When StreamA performs an update, it only operates on the name column without affecting the contact information columns.

    • When StreamB performs an update, it only operates on the phone number column without affecting the personal information columns.

    The final result is that the user's name is updated to the latest information, and the phone number is also updated to the latest information. These updates are performed independently without interference, ensuring the integrity and accuracy of user information.

In practical applications, the partial column update feature is crucial for processing data such as user information. This feature not only ensures real-time data updates but also effectively prevents data inconsistency issues.
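The two outcomes in Scenario 1 can be sketched with a small in-memory model. This is an illustration only, not the MaxCompute or Flink Connector implementation: the dictionary standing in for the table, the helper names full_row_upsert and column_scoped_upsert, and the sample values are all hypothetical.

```python
# Illustrative model only: a dict keyed by primary key stands in for the Delta table.
PERSONAL = ("name", "age", "gender")   # columns owned by StreamA
CONTACT = ("email", "phone")           # columns owned by StreamB
ALL_COLUMNS = PERSONAL + CONTACT


def full_row_upsert(table, pk, row):
    """Traditional UPSERT: the incoming row replaces every column of the stored row."""
    table[pk] = {col: row.get(col) for col in ALL_COLUMNS}


def column_scoped_upsert(table, pk, row, owned_columns):
    """Partial column update: only the columns owned by the stream are written."""
    stored = table.setdefault(pk, {col: None for col in ALL_COLUMNS})
    for col in owned_columns:
        stored[col] = row.get(col)


def run(scoped):
    table = {1: {"name": "Old Name", "age": 30, "gender": "F",
                 "email": "u@example.com", "phone": "111"}}
    # StreamA carries the new name; StreamB still carries the stale name it read earlier.
    stream_a = {"name": "New Name", "age": 30, "gender": "F",
                "email": "u@example.com", "phone": "111"}
    stream_b = {"name": "Old Name", "age": 30, "gender": "F",
                "email": "u@example.com", "phone": "222"}
    if scoped:
        column_scoped_upsert(table, 1, stream_a, PERSONAL)
        column_scoped_upsert(table, 1, stream_b, CONTACT)
    else:
        full_row_upsert(table, 1, stream_a)
        full_row_upsert(table, 1, stream_b)
    return table[1]


without_partial = run(scoped=False)
with_partial = run(scoped=True)
print(without_partial["name"], without_partial["phone"])  # Old Name 222
print(with_partial["name"], with_partial["phone"])        # New Name 222
```

In the full-row case the name reverts because StreamB rewrites a column it does not own. In the column-scoped case both changes survive.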

Scenario 2: Update partial fields within a row while keeping others unchanged

Assume there is a user information management system that needs to process and update user data in real time. The data is processed by two independent service streams that receive information from different data sources.

  • StreamA is responsible for updating users' personal information such as name, age, gender, and users' contact information such as email and phone number.

  • StreamB is responsible for updating users' personal information such as name, age, gender, and users' contact information such as email and phone number. The task is identical to StreamA.

Operation process

  1. StreamA only wants to update the user's age, with a statement like: INSERT INTO table (pk, age) VALUES (1, 3);

  2. At the same time, StreamB only wants to update the user's gender, with a statement like: INSERT INTO table (pk, sex) VALUES (1, 'male');

Final result

  • Without partial column updates: If a record with primary key 1 receives the above update commands, all fields other than the ones being updated will be set to NULL. This will result in the loss of original valid data.

  • With partial column updates (dynamic mode):

    • When an insert operation is triggered, the system identifies that only some fields contain data.

    • The partial column update mechanism ensures that only these fields with data are updated.

    • At the same time, fields that do not have data provided in the insert operation will maintain their original values.

By implementing this automatic identification and update strategy, the user information management system can precisely update only the fields that need to be changed without losing any existing valid data. This significantly enhances the flexibility and accuracy of data management, providing solid protection for maintaining data integrity.
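The behavior described for Scenario 2 can be approximated as follows. This is a minimal sketch under stated assumptions, not the connector's actual code: columns omitted from an INSERT are modeled as None, and the column set (pk, name, age, sex), the helper names, and the initial row are hypothetical.

```python
COLUMNS = ("pk", "name", "age", "sex")


def as_full_row(**values):
    """Model an INSERT that names only some columns: the rest arrive as NULL (None)."""
    return {col: values.get(col) for col in COLUMNS}


def regular_upsert(table, row):
    """Without partial column updates: the incoming row replaces the stored row."""
    table[row["pk"]] = dict(row)


def dynamic_upsert(table, row):
    """With partial column updates: only non-NULL incoming fields are written."""
    stored = table.setdefault(row["pk"], {col: None for col in COLUMNS})
    for col, value in row.items():
        if value is not None:
            stored[col] = value


def replay(upsert):
    # Hypothetical existing record for primary key 1.
    table = {1: as_full_row(pk=1, name="Alice", age=2, sex="female")}
    upsert(table, as_full_row(pk=1, age=3))       # INSERT INTO table (pk, age) VALUES (1, 3);
    upsert(table, as_full_row(pk=1, sex="male"))  # INSERT INTO table (pk, sex) VALUES (1, 'male');
    return table[1]


regular_result = replay(regular_upsert)
dynamic_result = replay(dynamic_upsert)
print(regular_result)  # {'pk': 1, 'name': None, 'age': None, 'sex': 'male'}
print(dynamic_result)  # {'pk': 1, 'name': 'Alice', 'age': 3, 'sex': 'male'}
```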

Flink Connector mode overview

Based on the usage scenarios of partial column updates in Delta tables, two partial column update modes are designed for Flink Connector to meet different data update requirements.

Static mode

In static mode, users need to specify in advance which columns will be updated by the data stream. These specified columns will follow the normal UPSERT logic:

  • If the primary key exists, update the data.

  • If the primary key does not exist, insert new data.

At the same time, columns not specified for updates will maintain their existing values. This mode is suitable for columns that are expected to change frequently.

Dynamic mode

In dynamic mode, the system automatically detects which columns in each incoming record contain non-NULL values and updates only those columns. Columns that carry no value (NULL) in the data stream remain unchanged. Dynamic mode is particularly suitable when you cannot determine in advance which columns will change.

With these two update modes, you can choose the update strategy that best fits each data stream.

The following table shows the results after updating the same data using different modes.

Note

In this example, the first column a is the primary key. In static mode, the primary key column is selected by default.

| Mode | Initial data | Step 1: Data after updating (a, b, c) | Step 2: Data after updating (a, d, null) | Step 3: Final data after updating (a, null, e) |
| --- | --- | --- | --- | --- |
| Regular mode | (null, null, null) | (a, b, c) | (a, d, null) | (a, null, e) |
| Dynamic mode | (null, null, null) | (a, b, c) | (a, d, c) | (a, d, e) |
| Static mode (second column specified for update) | (null, null, null) | (a, b, null) | (a, d, null) | (a, null, null) |
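The three rows of this comparison can be reproduced with a short simulation. It is a sketch of the semantics as the table describes them, not the connector's implementation. The function names, the tuple representation of a row, and the use of column indexes to mark the statically specified column are assumptions made for illustration.

```python
def apply_update(stored, incoming, mode, static_columns=()):
    """Return the row stored after one write; index 0 is the primary key."""
    if mode == "regular":
        # Regular UPSERT: the whole row is replaced, NULLs included.
        return tuple(incoming)
    result = list(stored)
    result[0] = incoming[0]  # the primary key column is always written
    for i in range(1, len(incoming)):
        if mode == "dynamic" and incoming[i] is not None:
            result[i] = incoming[i]   # only non-NULL values are written
        elif mode == "static" and i in static_columns:
            result[i] = incoming[i]   # specified columns are written, even when NULL
    return tuple(result)


def replay(mode, static_columns=()):
    """Apply the three updates from the comparison and record each intermediate row."""
    row = (None, None, None)
    history = []
    for incoming in [("a", "b", "c"), ("a", "d", None), ("a", None, "e")]:
        row = apply_update(row, incoming, mode, static_columns)
        history.append(row)
    return history


print(replay("regular"))
print(replay("dynamic"))
print(replay("static", static_columns=(1,)))  # second column specified for update
```

Note the difference in how NULL is treated: dynamic mode skips a NULL, whereas static mode writes it to the specified column, which is why the second column ends as null in the static row.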

Usage methods

Create a Delta table and enable partial column updates

To enable partial column updates, set the acid.partial.fields.update.enable=true parameter in tblproperties when you create the Delta table. For more information, see Parameters for Delta Tables.

Syntax example:

CREATE TABLE IF NOT EXISTS partial_upsert_test
  (pk INT NOT NULL, 
   c1 STRING, 
   c2 STRING, 
   c3 STRING, 
   primary key(pk)) 
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');

Flink Connector configuration examples

Parameters

To configure partial column update modes, MaxCompute has introduced the following two configuration parameters:

| Parameter | Description |
| --- | --- |
| upsert.partial-column.enable | This parameter is used to enable the partial column update feature. If column names are not specified (with the upsert.partial-column.name parameter left empty), the system will use dynamic mode (updating non-NULL fields) for updates. |
| upsert.partial-column.name | This parameter is used to specify the columns that need to be updated. If this parameter is set, the system will only update the listed fields, while other fields will maintain their original values. |

Note

Primary key columns are selected by default. Partition key column names cannot be added to this parameter.
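How the two parameters combine to select a mode can be summarized in a small helper. This is a hypothetical sketch, not connector code. It assumes that partial column updates are off when upsert.partial-column.enable is absent, and that multiple column names would be separated by commas. Neither assumption is confirmed by this topic.

```python
def resolve_update_mode(options):
    """Return (mode, columns) for a dict of connector options.

    Assumptions (not confirmed by the documentation): the feature is disabled
    when 'upsert.partial-column.enable' is missing, and multiple column names
    are comma-separated.
    """
    enabled = options.get("upsert.partial-column.enable", "false").strip().lower() == "true"
    if not enabled:
        return ("regular", [])
    names = options.get("upsert.partial-column.name", "")
    columns = [name.strip() for name in names.split(",") if name.strip()]
    if not columns:
        # No column names given: dynamic mode, only non-NULL fields are updated.
        return ("dynamic", [])
    # Column names given: static mode, only the listed columns are updated.
    return ("static", columns)


print(resolve_update_mode({"upsert.partial-column.enable": "true"}))
print(resolve_update_mode({"upsert.partial-column.enable": "true",
                           "upsert.partial-column.name": "c2"}))
```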

Dynamic partial column update configuration examples

Create a table with dynamic partial column updates enabled. Example:

CREATE TABLE partialtable (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', -- Endpoint for VPC network connectivity.
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', -- Support the three-layer model.
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);

Examples of subsequent operations on the table and their results:

  1. Insert the row [1, a, b, c] into the table, where the first column is the primary key. The initial data is [1, a, b, c].

    INSERT INTO partialtable VALUES (1, 'a', 'b', 'c');

  2. Update only column c2 to d for the record with primary key 1. The data after the update is [1, a, d, c].

    INSERT INTO partialtable(pk, c2) VALUES (1, 'd');

  3. Update only column c3 to e for the record with primary key 1. The data after the update is [1, a, d, e].

    INSERT INTO partialtable(pk, c3) VALUES (1, 'e');

Static partial column update configuration examples

Create a table that only updates the c2 column. Subsequent operations on this table will only affect the c2 column, while other columns will remain unchanged. Example:

CREATE TABLE PartialTable2 (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', -- Endpoint for VPC network connectivity.
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', -- Support the three-layer model.
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true',
  'upsert.partial-column.name' = 'c2', -- Update only the c2 column.
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);

Note

When configuring the upsert.partial-column.name parameter, you must use the column names corresponding to the table in MaxCompute, not the column names in the Flink internal table. This ensures that Flink can correctly identify and update the corresponding columns in the storage system.
