When multiple Flink streams update different columns in the same row, a standard UPSERT overwrites the entire row—so the later stream silently erases the earlier stream's changes. Partial column updates let each stream write only the columns it owns, so concurrent updates from independent streams are preserved in the same row without conflict.
When to use this feature
- Independent streams, same row: Two streams update different columns in the same record at the same time. Without partial column updates, the later write overwrites columns set by the earlier write, causing data loss.
- Sparse writes: A stream provides values for only a subset of columns. Without partial column updates, unspecified columns are set to NULL, destroying existing data.
How it works
UPSERT combines INSERT and UPDATE into a single operation. Every record processed by UPSERT must include the primary key columns.
- If no record with the given primary key exists, UPSERT inserts a new record.
- If a record with the given primary key already exists, UPSERT updates it with the new values.
In stream processing with multiple table joins, different streams may update different columns in the same row. Standard UPSERT replaces the entire row, so one stream's updates can overwrite another stream's changes. Partial column updates solve this by restricting each stream's write to only the columns it owns.
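As a minimal sketch of this scenario, the following Flink SQL writes two column sets into the same sink row. It assumes a sink table named partialtable with dynamic partial column updates enabled, as in the dynamic mode example in this topic. The views stream_a and stream_b are hypothetical stand-ins for real streaming sources.

```sql
-- Hypothetical stand-ins for two independent streams; replace with real source tables.
CREATE TEMPORARY VIEW stream_a AS
SELECT * FROM (VALUES (1, 'a')) AS t (pk, c1);

CREATE TEMPORARY VIEW stream_b AS
SELECT * FROM (VALUES (1, 'd')) AS t (pk, c2);

-- Stream A owns c1. The omitted columns c2 and c3 are passed as NULL.
INSERT INTO partialtable (pk, c1)
SELECT pk, c1 FROM stream_a;

-- Stream B owns c2. The omitted columns c1 and c3 are passed as NULL.
INSERT INTO partialtable (pk, c2)
SELECT pk, c2 FROM stream_b;
```

With partial column updates enabled, the row with pk = 1 would be expected to keep c1 from the first statement and c2 from the second, regardless of which statement runs last. With a standard UPSERT, only the last writer's column would remain populated.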
Partial column update modes
MaxCompute provides two partial column update modes for Flink Connector.
| Mode | How it works | When to use |
|---|---|---|
| Dynamic mode | Automatically detects non-NULL columns in each record and updates only those columns. NULL columns remain unchanged. | Column set is unknown in advance or varies per record |
| Static mode | Updates only the columns you explicitly specify. All other columns retain their existing values. | Column set is fixed and known in advance |
The following table shows how the same sequence of writes produces different results across modes. Each tuple lists the column values in order. The first column, whose value is a, is the primary key.
| Mode | Initial data | After writing (a, b, c) | After writing (a, d, NULL) | After writing (a, NULL, e) |
|---|---|---|---|---|
| Regular mode | (null, null, null) | (a, b, c) | (a, d, null) | (a, null, e) |
| Dynamic mode | (null, null, null) | (a, b, c) | (a, d, c) | (a, d, e) |
| Static mode (second column specified) | (null, null, null) | (a, b, null) | (a, d, null) | (a, null, null) |
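For contrast, the Regular mode row can be sketched in Flink SQL. The sketch assumes a hypothetical sink table named regulartable with columns (pk, c1, c2), defined with 'sink.operation' = 'upsert' but without upsert.partial-column.enable, and an initially empty target table. The results in the comments are taken from the table above, not from a test run.

```sql
-- regulartable is a hypothetical upsert sink WITHOUT partial column updates.
-- The primary key value a from the table above is represented here by pk = 1.
INSERT INTO regulartable VALUES (1, 'b', 'c');       -- row: (1, b, c)
INSERT INTO regulartable (pk, c1) VALUES (1, 'd');   -- row: (1, d, NULL), c2 is overwritten with NULL
INSERT INTO regulartable (pk, c2) VALUES (1, 'e');   -- row: (1, NULL, e), c1 is overwritten with NULL
```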
Prerequisites
Before you begin, ensure that you have:
- A MaxCompute project with a Delta table that has partial column updates enabled
- Flink Connector configured to connect to MaxCompute
Enable partial column updates on a Delta table
Set acid.partial.fields.update.enable=true in TBLPROPERTIES when creating the Delta table. For the full list of Delta table parameters, see Parameters for Delta tables.
```sql
CREATE TABLE IF NOT EXISTS partial_upsert_test (
    pk INT NOT NULL,
    c1 STRING,
    c2 STRING,
    c3 STRING,
    PRIMARY KEY(pk)
)
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');
```
Configure Flink Connector for partial column updates
Parameters
| Parameter | Description |
|---|---|
| upsert.partial-column.enable | Enables partial column updates. When upsert.partial-column.name is not set, dynamic mode is used. |
| upsert.partial-column.name | Specifies the columns to update (static mode). Primary key columns are always included automatically. |
upsert.partial-column.name must use the column names from the MaxCompute table, not the column names in the Flink internal table.
Partition key column names cannot be added to upsert.partial-column.name.
Dynamic mode example
The following Flink table definition enables dynamic partial column updates. The system automatically detects which columns contain non-NULL values and updates only those columns.
```sql
CREATE TABLE partialtable (
    pk INT,
    c1 STRING,
    c2 STRING,
    c3 STRING,
    PRIMARY KEY(pk) NOT ENFORCED
) WITH (
    'connector' = 'maxcompute',
    'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', -- VPC endpoint
    'odps.project.name' = 'project_name',
    'odps.namespace.schema' = 'true', -- Enable the three-layer model
    'table.name' = 'project.schema.tablename',
    'sink.operation' = 'upsert',
    'upsert.write.bucket.num' = '1',
    'upsert.partial-column.enable' = 'true', -- Enable dynamic mode
    'odps.access.id' = '<your-access-key-id>',
    'odps.access.key' = '<your-access-key-secret>'
);
```
The following example shows how dynamic mode preserves existing values across three sequential writes.
Step 1: Insert the initial record. Data after this step: [1, a, b, c].

```sql
INSERT INTO partialtable VALUES (1, 'a', 'b', 'c');
```

Step 2: Update only c2. Because c1 and c3 are not specified, Flink passes NULL for them, and dynamic mode leaves NULL columns unchanged, so they retain their values. Data after this step: [1, a, d, c].

```sql
INSERT INTO partialtable(pk, c2) VALUES (1, 'd');
```

Step 3: Update only c3. Because c1 and c2 are not specified, they are passed as NULL and retain their values. Data after this step: [1, a, d, e].

```sql
INSERT INTO partialtable(pk, c3) VALUES (1, 'e');
```
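To check the result on the MaxCompute side, a query along these lines could be run. It assumes that the Flink table above is mapped to the partial_upsert_test Delta table created earlier. The example itself uses the placeholder project.schema.tablename, so the table name here is an assumption.

```sql
-- Run in MaxCompute, not in Flink. Assumes the sink maps to partial_upsert_test.
SELECT pk, c1, c2, c3
FROM partial_upsert_test
WHERE pk = 1;
-- Per the three steps above, the expected row is: 1, a, d, e
```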
Static mode example
The following Flink table definition restricts updates to the c2 column only. Every write through this table updates only c2 (plus the primary key), regardless of the values supplied for other columns.
```sql
CREATE TABLE PartialTable2 (
    pk INT,
    c1 STRING,
    c2 STRING,
    c3 STRING,
    PRIMARY KEY(pk) NOT ENFORCED
) WITH (
    'connector' = 'maxcompute',
    'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', -- VPC endpoint
    'odps.project.name' = 'project_name',
    'odps.namespace.schema' = 'true', -- Enable the three-layer model
    'table.name' = 'project.schema.tablename',
    'sink.operation' = 'upsert',
    'upsert.write.bucket.num' = '1',
    'upsert.partial-column.enable' = 'true',
    'upsert.partial-column.name' = 'c2', -- Only update c2; primary key is included by default
    'odps.access.id' = '<your-access-key-id>',
    'odps.access.key' = '<your-access-key-secret>'
);
```
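The static mode behavior can be sketched with the same three writes used in the dynamic mode example. The expected results in the comments are inferred from the Static mode row of the mode comparison table, assuming the target table starts empty. They are not output from a test run.

```sql
-- Only c2 (plus the primary key) is written; the values for c1 and c3 are ignored.
INSERT INTO PartialTable2 VALUES (1, 'a', 'b', 'c');   -- expected row: [1, NULL, b, NULL]

-- c2 is updated as specified.
INSERT INTO PartialTable2 (pk, c2) VALUES (1, 'd');    -- expected row: [1, NULL, d, NULL]

-- c2 is omitted, so NULL is passed for it; static mode writes that NULL to c2.
INSERT INTO PartialTable2 (pk, c3) VALUES (1, 'e');    -- expected row: [1, NULL, NULL, NULL]
```

Unlike dynamic mode, static mode does not skip NULL values in the specified columns, so a stream that writes through this table should always supply a value for c2.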
What's next
- To learn more about writing data to Delta tables with Flink, see Use Flink to write data to a Delta table.
- For a full reference on the partial column update feature in Delta tables, see Update data in specific columns.