本文為您介紹Delta Table支援對部分列進行更新的使用情境和參數配置,以及Flink Connector設計的兩種部分列更新模式的介紹與相關配置。
背景資訊
UPSERT操作:結合了插入(INSERT)和更新(UPDATE)特性的資料庫功能,它通過確保每個經過UPSERT處理的記錄(或行)都必須包含主鍵列,實現了高效的資料操作。
UPSERT行為:取決於表中是否存在指定主鍵的資料。
插入語義:當表中不存在指定主鍵的資料時,UPSERT將執行插入操作,將新記錄添加到表中。
更新語義:當表中已存在指定主鍵的記錄時,UPSERT將執行更新操作,用提供的新資料更新現有資料。
UPSERT情境:流處理的多表串連中,涉及兩個不同資料流的更新操作影響同一表內的不同列。
資料流
StreamA負責更新列ColumnX。資料流
StreamB負責更新列ColumnY。
UPSERT形式比較:
傳統UPSERT:
StreamB的更新可能會覆蓋StreamA對資料所做的修改,從而導致資料一致性問題。部分列更新功能:確保了在執行並發更新時,各個流之間不會發生衝突。它們只更新各自負責的列,同時保留同一行中所有流的更新結果。
使用情境
情境一:更新同行不同列,互不干擾
假設存在一個使用者資訊管理系統,該系統需要即時處理和更新使用者資料。這些資料被兩個獨立的服務流處理,它們分別從不同的資料來源接收資訊。
資料流
StreamA負責處理使用者的個人資訊,如姓名、年齡和性別。資料流
StreamB負責處理使用者的聯絡資訊,如郵箱和電話號碼。
在實際業務中,使用者的個人資訊和聯絡資訊可能會幾乎同時發生變化。我們需要確保這些更新能夠即時反映在使用者資訊管理系統中,並不會相互覆蓋。
操作流程
使用者在不同的平台更新了姓名和電話號碼。
StreamA接收到了姓名的更新,StreamB接收到了電話號碼的更新。StreamA和StreamB均將更新發送到使用者資訊管理系統。
最終結果
不使用部分列更新:若
StreamB的更新在StreamA之後到達並處理,它將覆蓋StreamA剛剛更新的姓名資訊(如果StreamB以全行更新的方式進行操作),從而導致姓名恢複為舊值。使用部分列更新:
StreamA進行更新時,只針對姓名列進行操作,而不會觸及聯絡資訊列。StreamB進行更新時,只針對電話號碼列進行操作,而不會觸及個人資訊列。
最終的結果是使用者的姓名被更新為最新的資訊,電話號碼也被更新為最新的資訊。而且這些更新是獨立進行的,互不干擾,確保了使用者資訊的完整性和準確性。
在實際應用中,部分列更新功能對於處理使用者資訊等資料至關重要。這一功能不僅保證了資料更新的即時性,還有效避免了資料不一致的問題。
情境二:更新行內部分欄位,其他不變
假設存在一個使用者資訊管理系統,該系統需要即時處理和更新使用者資料。這些資料被兩個獨立的服務流處理,它們分別從不同的資料來源接收資訊。
StreamA負責更新使用者個人資訊,如姓名、年齡、性別,以及使用者的聯絡資訊,如郵箱和電話號碼。StreamB負責更新使用者個人資訊,如姓名、年齡、性別,以及使用者的聯絡資訊,如郵箱和電話號碼。與StreamA任務一致。
操作流程
StreamA只希望更新使用者的年齡,命令可能是:INSERT INTO table (pk, age) VALUES (1, 3) ;。同時,
StreamB只希望更新使用者的性別,命令可能是:INSERT INTO table (pk, sex) VALUES (1, 'male') ;。
最終結果
不使用部分列更新:主鍵為1的記錄若接收到上述更新命令,則除了待更新的欄位外,其他所有欄位將被設定為NULL。這將導致原有有效資料的丟失。
使用動態更新:
當插入操作被觸發時,系統會識別到只有部分欄位包含資料。
部分列更新機制會確保只有這些包含資料的欄位被更新。
同時,那些在插入操作中沒有提供資料的欄位將保持其原有的值不變。
通過實現這種自動識別和更新策略,使用者資訊管理系統能夠在不丟失任何已有有效資料的前提下,精確地更新僅需變更的欄位。這顯著提升了資料管理的靈活性和準確性,為維護資料完整性提供了堅實的保障。
Flink Connector模式概述
根據Delta Table部分列更新的使用情境,Flink Connector精心設計了兩種部分列更新模式,以滿足不同的資料更新需求。
靜態模式
在靜態模式下,使用者需要提前指定將被資料流更新的列。這些被指定的列將遵循正常的UPSERT邏輯進行操作:
如果主鍵存在,則更新資料。
如果主鍵不存在,則插入新資料。
同時,未被指定更新的列將保持其現有值不變。這種模式適用於那些預期將會經常更改的列。
動態模式
動態模式賦予系統更高的智能化和自適應能力。在此模式下,系統能夠自動檢測資料流中哪些列包含非NULL值,並僅對這些值存在的列進行更新操作。這意味著資料流中未攜帶值(即值為NULL)的列將保持原樣不變。動態模式尤其適用於無法預先確定哪些列會發生變化的情況,確保了資料流每次更新的準確性和高效性。
通過引入這兩種更新模式,Flink Connector為使用者提供了更加靈活和強大的資料處理能力,允許他們根據實際情況選擇最合適的資料更新策略,從而保障了資料的準確性和完整性。
以下是不同模式每次更新同樣資料後的結果:
本樣本資料的第一列a為主鍵。靜態模式下,主鍵列預設被選中。
模式 | 初始資料 | 第一步:更新(a, b, c)後資料 | 第二步:更新(a, d, null)後資料 | 第三步:更新(a, null, e)後最終資料 |
常規模式 | (null, null, null) | (a, b, c) | (a, d, null) | (a, null, e) |
動態模式 | (null, null, null) | (a, b, c) | (a, d, c) | (a, d, e) |
靜態模式(指定第二列更新) | (null, null, null) | (a, b, null) | (a, d, null) | (a, null, null) |
使用方法
建立Delta Table並開啟部分列更新
具體方式是在tblproperties配置如下參數:acid.partial.fields.update.enable=true。詳情請參見Delta Table表參數。
文法樣本如下:
CREATE TABLE IF NOT EXISTS partial_upsert_test
(pk INT NOT NULL,
c1 STRING,
c2 STRING,
c3 STRING,
primary key(pk))
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');Flink Connector配置樣本
配置參數說明
為了配置部分列更新模式,MaxCompute引入了以下兩個配置參數:
參數 | 參數說明 |
upsert.partial-column.enable | 此參數用於啟動部分列更新功能。若不指定列名(即 |
upsert.partial-column.name | 此參數用於指定需要更新的列名。如果設定了此參數,系統將僅更新列出的欄位,其他欄位保持原值不變。 說明 主鍵列預設已選中,目前不能將分區列名加入到該參數中。 |
動態部分列更新配置樣本
建立一個開啟動態部分列更新的表,樣本如下。
CREATE TABLE partialtable (
pk INT,
c1 STRING,
c2 STRING,
c3 STRING,
PRIMARY KEY(pk) NOT ENFORCED
) WITH (
'connector' = 'maxcompute',
'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC網路連接
'odps.project.name' = 'project_name',
'odps.namespace.schema' = 'true', //支援三層模型
'table.name' = 'project.schema.tablename',
'sink.operation' = 'upsert',
'upsert.write.bucket.num' = '1',
'upsert.partial-column.enable' = 'true',
'odps.access.id' = 'yourAccessId',
'odps.access.key' = 'yourAccessKey'
);後續對錶的操作及結果樣本如下。
向表中插入資料 [1,a, b, c],其中第一列為主鍵,初始資料為
[1, a, b, c]。INSERT INTO partialtable VALUES (1, 'a', 'b', 'c');僅更新主鍵為
1的記錄的第二列c2為d,更新後資料為[1, a, d, c]。INSERT INTO partialtable(pk, c2) VALUES (1, 'd');僅更新主鍵為
1的記錄的第三列c3為e,更新後資料為[1, a, d, e]。INSERT INTO partialtable(pk, c3) VALUES (1, 'e');
靜態部分列更新配置樣本
建立一個只對c2列進行更新的表,接下來,對該表的操作將隻影響 'c2' 列,其他列將保持不變。樣本如下。
CREATE TABLE PartialTable2 (
pk INT,
c1 STRING,
c2 STRING,
c3 STRING,
PRIMARY KEY(pk) NOT ENFORCED
) WITH (
'connector' = 'maxcompute',
'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC網路連接
'odps.project.name' = 'project_name',
'odps.namespace.schema' = 'true', //支援三層模型
'table.name' = 'project.schema.tablename',
'sink.operation' = 'upsert',
'upsert.write.bucket.num' = '1',
'upsert.partial-column.enable' = 'true',
'upsert.partial-column.name' = 'c2', // 指定只更新 'c2' 列
'odps.access.id' = 'yourAccessId',
'odps.access.key' = 'yourAccessKey'
);在配置upsert.partial-column.name參數時,應使用MaxCompute中表對應的列名,而不是Flink內部表的列名。這確保Flink能夠正確識別並更新儲存系統中的對應列。
相關文檔
關於通過Flink寫入資料至Delta Table的操作實踐,請參見使用Flink寫入資料到Delta Table。您可以參考實踐過程進行部分列更新的參數配置來實現業務需求。
關於部分列更新的相關介紹,請參見部分列更新。