All Products
Search
Document Center

MaxCompute:Near real-time partial column updates to Delta tables with Flink

Last Updated:Mar 26, 2026

When multiple Flink streams update different columns in the same row, a standard UPSERT overwrites the entire row—so the later stream silently erases the earlier stream's changes. Partial column updates let each stream write only the columns it owns, so concurrent updates from independent streams are preserved in the same row without conflict.

When to use this feature

  • Independent streams, same row: Two streams update different columns in the same record at the same time. Without partial column updates, the later write overwrites columns set by the earlier write, causing data loss.

  • Sparse writes: A stream provides values for only a subset of columns. Without partial column updates, unspecified columns are set to NULL, destroying existing data.

How it works

UPSERT combines INSERT and UPDATE into a single operation. Every record processed by UPSERT must include the primary key columns.

  • If no record with the given primary key exists, UPSERT inserts a new record.

  • If a record with the given primary key already exists, UPSERT updates it with the new values.

In stream processing with multiple table joins, different streams may update different columns in the same row. Standard UPSERT replaces the entire row, so one stream's updates can overwrite another stream's changes. Partial column updates solve this by restricting each stream's write to only the columns it owns.

Partial column update modes

MaxCompute provides two partial column update modes for Flink Connector.

Mode How it works When to use
Dynamic mode Automatically detects non-NULL columns in each record and updates only those columns. NULL columns remain unchanged. Column set is unknown in advance or varies per record
Static mode Updates only the columns you explicitly specify. All other columns retain their existing values. Column set is fixed and known in advance

The following table shows how the same sequence of writes produces different results across modes. Column a is the primary key.

Mode Initial data After writing (a, b, c) After writing (a, d, NULL) After writing (a, NULL, e)
Regular mode (null, null, null) (a, b, c) (a, d, null) (a, null, e)
Dynamic mode (null, null, null) (a, b, c) (a, d, c) (a, d, e)
Static mode (second column specified) (null, null, null) (a, b, null) (a, d, null) (a, null, null)

Prerequisites

Before you begin, ensure that you have:

  • A MaxCompute project with a Delta table that has partial column updates enabled

  • Flink Connector configured to connect to MaxCompute

Enable partial column updates on a Delta table

Set acid.partial.fields.update.enable=true in TBLPROPERTIES when creating the Delta table. For the full list of Delta table parameters, see Parameters for Delta tables.

CREATE TABLE IF NOT EXISTS partial_upsert_test
  (pk INT NOT NULL,
   c1 STRING,
   c2 STRING,
   c3 STRING,
   PRIMARY KEY(pk))
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');

Configure Flink Connector for partial column updates

Parameters

Parameter Description
upsert.partial-column.enable Enables partial column updates. When upsert.partial-column.name is not set, dynamic mode is used.
upsert.partial-column.name Specifies the columns to update (static mode). Primary key columns are always included automatically.
upsert.partial-column.name must use the column names from the MaxCompute table, not the column names in the Flink internal table.
Partition key column names cannot be added to upsert.partial-column.name.

Dynamic mode example

The following Flink table definition enables dynamic partial column updates. The system automatically detects which columns contain non-NULL values and updates only those columns.

CREATE TABLE partialtable (
  pk INT,
  c1 STRING,
  c2 STRING,
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', -- VPC endpoint
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true',                    -- Enable the three-layer model
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true',             -- Enable dynamic mode
  'odps.access.id' = '<your-access-key-id>',
  'odps.access.key' = '<your-access-key-secret>'
);

The following example shows how dynamic mode preserves existing values across three sequential writes.

Step 1: Insert the initial record. Data after this step: [1, a, b, c].

INSERT INTO partialtable VALUES (1, 'a', 'b', 'c');

Step 2: Update only c2. Because c1 and c3 are not specified, they retain their values. Data after this step: [1, a, d, c].

INSERT INTO partialtable(pk, c2) VALUES (1, 'd');

Step 3: Update only c3. Because c1 and c2 are not specified, they retain their values. Data after this step: [1, a, d, e].

INSERT INTO partialtable(pk, c3) VALUES (1, 'e');

Static mode example

The following Flink table definition restricts updates to the c2 column only. All writes to this table affect c2 regardless of what other values are provided.

CREATE TABLE PartialTable2 (
  pk INT,
  c1 STRING,
  c2 STRING,
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', -- VPC endpoint
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true',                    -- Enable the three-layer model
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true',
  'upsert.partial-column.name' = 'c2',                 -- Only update c2; primary key is included by default
  'odps.access.id' = '<your-access-key-id>',
  'odps.access.key' = '<your-access-key-secret>'
);

What's next