全部產品
Search
文件中心

MaxCompute:Flink近即時部分列更新寫入Delta Table

更新時間:May 08, 2025

本文為您介紹Delta Table支援對部分列進行更新的使用情境和參數配置,以及Flink Connector設計的兩種部分列更新模式的介紹與相關配置。

背景資訊

  • UPSERT操作:結合了插入(INSERT)和更新(UPDATE)特性的資料庫功能,它通過確保每個經過UPSERT處理的記錄(或行)都必須包含主鍵列,實現了高效的資料操作。

  • UPSERT行為:取決於表中是否存在指定主鍵的資料。

    • 插入語義:當表中不存在指定主鍵的資料時,UPSERT將執行插入操作,將新記錄添加到表中。

    • 更新語義:當表中已存在指定主鍵的記錄時,UPSERT將執行更新操作,用提供的新資料更新現有資料。

  • UPSERT情境:流處理的多表串連中,涉及兩個不同資料流的更新操作影響同一表內的不同列。

    • 資料流StreamA負責更新列ColumnX

    • 資料流StreamB負責更新列ColumnY

  • UPSERT形式比較:

    • 傳統UPSERT:StreamB的更新可能會覆蓋StreamA對資料所做的修改,從而導致資料一致性問題。

    • 部分列更新功能:確保了在執行並發更新時,各個流之間不會發生衝突。它們只更新各自負責的列,同時保留同一行中所有流的更新結果。

使用情境

情境一:更新同行不同列,互不干擾

假設存在一個使用者資訊管理系統,該系統需要即時處理和更新使用者資料。這些資料被兩個獨立的服務流處理,它們分別從不同的資料來源接收資訊。

  • 資料流StreamA負責處理使用者的個人資訊,如姓名、年齡和性別。

  • 資料流StreamB負責處理使用者的聯絡資訊,如郵箱和電話號碼。

在實際業務中,使用者的個人資訊和聯絡資訊可能會幾乎同時發生變化。我們需要確保這些更新能夠即時反映在使用者資訊管理系統中,並不會相互覆蓋。

操作流程

  1. 使用者在不同的平台更新了姓名和電話號碼。StreamA接收到了姓名的更新,StreamB接收到了電話號碼的更新。

  2. StreamAStreamB均將更新發送到使用者資訊管理系統。

最終結果

  • 不使用部分列更新:若StreamB的更新在StreamA之後到達並處理,它將覆蓋StreamA剛剛更新的姓名資訊(如果StreamB以全行更新的方式進行操作),從而導致姓名恢複為舊值。

  • 使用部分列更新

    • StreamA進行更新時,只針對姓名列進行操作,而不會觸及聯絡資訊列。

    • StreamB進行更新時,只針對電話號碼列進行操作,而不會觸及個人資訊列。

    最終的結果是使用者的姓名被更新為最新的資訊,電話號碼也被更新為最新的資訊。而且這些更新是獨立進行的,互不干擾,確保了使用者資訊的完整性和準確性。

在實際應用中,部分列更新功能對於處理使用者資訊等資料至關重要。這一功能不僅保證了資料更新的即時性,還有效避免了資料不一致的問題。

情境二:更新行內部分欄位,其他不變

假設存在一個使用者資訊管理系統,該系統需要即時處理和更新使用者資料。這些資料被兩個獨立的服務流處理,它們分別從不同的資料來源接收資訊。

  • StreamA負責更新使用者個人資訊,如姓名、年齡、性別,以及使用者的聯絡資訊,如郵箱和電話號碼。

  • StreamB負責更新使用者個人資訊,如姓名、年齡、性別,以及使用者的聯絡資訊,如郵箱和電話號碼。與StreamA任務一致。

操作流程

  1. StreamA只希望更新使用者的年齡,命令可能是:INSERT INTO table (pk, age) VALUES (1, 3) ;

  2. 同時,StreamB只希望更新使用者的性別,命令可能是:INSERT INTO table (pk, sex) VALUES (1, 'male') ;

最終結果

  • 不使用部分列更新:主鍵為1的記錄若接收到上述更新命令,則除了待更新的欄位外,其他所有欄位將被設定為NULL。這將導致原有有效資料的丟失。

  • 使用動態更新

    • 當插入操作被觸發時,系統會識別到只有部分欄位包含資料。

    • 部分列更新機制會確保只有這些包含資料的欄位被更新。

    • 同時,那些在插入操作中沒有提供資料的欄位將保持其原有的值不變。

通過實現這種自動識別和更新策略,使用者資訊管理系統能夠在不丟失任何已有有效資料的前提下,精確地更新僅需變更的欄位。這顯著提升了資料管理的靈活性和準確性,為維護資料完整性提供了堅實的保障。

Flink Connector模式概述

根據Delta Table部分列更新的使用情境,Flink Connector精心設計了兩種部分列更新模式,以滿足不同的資料更新需求。

靜態模式

在靜態模式下,使用者需要提前指定將被資料流更新的列。這些被指定的列將遵循正常的UPSERT邏輯進行操作:

  • 如果主鍵存在,則更新資料。

  • 如果主鍵不存在,則插入新資料。

同時,未被指定更新的列將保持其現有值不變。這種模式適用於那些預期將會經常更改的列。

動態模式

動態模式賦予系統更高的智能化和自適應能力。在此模式下,系統能夠自動檢測資料流中哪些列包含非NULL值,並僅對這些值存在的列進行更新操作。這意味著資料流中未攜帶值(即值為NULL)的列將保持原樣不變。動態模式尤其適用於無法預先確定哪些列會發生變化的情況,確保了資料流每次更新的準確性和高效性。

通過引入這兩種更新模式,Flink Connector為使用者提供了更加靈活和強大的資料處理能力,允許他們根據實際情況選擇最合適的資料更新策略,從而保障了資料的準確性和完整性。

以下是不同模式每次更新同樣資料後的結果:

說明

本樣本資料的第一列a為主鍵。靜態模式下,主鍵列預設被選中。

模式

初始資料

第一步:更新(a, b, c)後資料

第二步:更新(a, d, null)後資料

第三步:更新(a, null, e)後最終資料

常規模式

(null, null, null)

(a, b, c)

(a, d, null)

(a, null, e)

動態模式

(null, null, null)

(a, b, c)

(a, d, c)

(a, d, e)

靜態模式(指定第二列更新)

(null, null, null)

(a, b, null)

(a, d, null)

(a, null, null)

使用方法

建立Delta Table並開啟部分列更新

具體方式是在tblproperties配置如下參數:acid.partial.fields.update.enable=true。詳情請參見Delta Table表參數

文法樣本如下:

CREATE TABLE IF NOT EXISTS partial_upsert_test
  (pk INT NOT NULL, 
   c1 STRING, 
   c2 STRING, 
   c3 STRING, 
   primary key(pk)) 
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');

Flink Connector配置樣本

配置參數說明

為了配置部分列更新模式,MaxCompute引入了以下兩個配置參數:

參數

參數說明

upsert.partial-column.enable

此參數用於啟動部分列更新功能。若不指定列名(即upsert.partial-column.name參數為空白),則系統將採用動態模式(更新非NULL欄位)進行更新。

upsert.partial-column.name

此參數用於指定需要更新的列名。如果設定了此參數,系統將僅更新列出的欄位,其他欄位保持原值不變。

說明

主鍵列預設已選中,目前不能將分區列名加入到該參數中

動態部分列更新配置樣本

建立一個開啟動態部分列更新的表,樣本如下。

CREATE TABLE partialtable (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC網路連接
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', //支援三層模型
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);

後續對錶的操作及結果樣本如下。

  1. 向表中插入資料 [1,a, b, c],其中第一列為主鍵,初始資料為[1, a, b, c]

    INSERT INTO partialtable VALUES (1, 'a', 'b', 'c'); 
  2. 僅更新主鍵為1的記錄的第二列 c2d,更新後資料為[1, a, d, c]

    INSERT INTO partialtable(pk, c2) VALUES (1, 'd'); 
  3. 僅更新主鍵為1的記錄的第三列 c3e,更新後資料為[1, a, d, e]

    INSERT INTO partialtable(pk, c3) VALUES (1, 'e'); 

靜態部分列更新配置樣本

建立一個只對c2列進行更新的表,接下來,對該表的操作將隻影響 'c2' 列,其他列將保持不變。樣本如下。

CREATE TABLE PartialTable2 (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC網路連接
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', //支援三層模型
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'upsert.partial-column.name' = 'c2', // 指定只更新 'c2' 列
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);
說明

在配置upsert.partial-column.name參數時,應使用MaxCompute中表對應的列名,而不是Flink內部表的列名。這確保Flink能夠正確識別並更新儲存系統中的對應列。

相關文檔

  • 關於通過Flink寫入資料至Delta Table的操作實踐,請參見使用Flink寫入資料到Delta Table。您可以參考實踐過程進行部分列更新的參數配置來實現業務需求。

  • 關於部分列更新的相關介紹,請參見部分列更新