本文以公開的保險資料集為例,基於雲原生資料庫PolarDB,在保險理賠情境中執行圖查詢,以識別異常理賠記錄和欺詐團夥。例如,查詢與欺詐保單涉及的相同理賠人的其他保單,或識別欺詐保單投保人的社交關係,以便進行欺詐預警。PolarDB在關係型資料庫的基礎上,提供圖分析能力,為企業的統一資料管理和分析提供強有力的支援。
關於圖資料庫引擎
圖分析是資料科學中的一個重要領域,專註於通過圖結構表示資料,並執行各種計算與分析任務。圖結構由節點(或稱為頂點)和邊組成,節點通常代表實體,而邊則表示實體之間的關係。圖計算廣泛應用於社交網路分析、推薦系統、知識圖譜、路徑最佳化等多個領域。
PolarDB PostgreSQL版高度相容Apache AGE的圖引擎,支援對知識圖譜的儲存和查詢檢索,能夠在同一個資料庫叢集上同時使用標準的ANSI SQL和圖查詢語言openCypher進行查詢。
完全相容PolarDB PostgreSQL版
AGE是PolarDB PostgreSQL版的一個擴充,可以在現有的PolarDB資料庫中使用,且無需重新構建資料庫。AGE繼承了PolarDB所有強大功能,包括事務、並發控制、以及多種索引和最佳化技術。
統一的圖形和關係型查詢
AGE允許同時處理關係型資料和圖形資料,支援在同一個查詢中混合使用SQL和圖查詢語言,使得處理複雜的資料模型更加容易和高效。
支援Cypher查詢語言
AGE支援使用Cypher查詢語言。Cypher查詢語言專為圖資料庫設計,文法簡單且靈活,提供了一種直觀的方式來進行圖資料的查詢和操作。
高效能
結合PolarDB的最佳化技術和專為圖資料設計的索引,AGE能夠高效地處理大規模圖形資料和複雜的圖形查詢。
綜上, 藉助於AGE強大的能力,PolarDB可以簡單、高效地處理各類圖查詢。
業務情境
情境描述
保險理賠欺詐通常基於保險供應商所擁有的患者、疾病及索賠等資料,分析與被保險人相關的理賠申請、疾病等實體之間的關聯關係,以識別異常理賠記錄並揭示潛在的欺詐團夥。
資料和模型
資料來源於保險領域公開資料集。資料包括保險行業的基本元素,資料模型可以抽象為下圖所示:
點:投保人(policyholder)、保單(incharge)、理賠(claim)、病人(patient)、疾病(disease)。
邊:疾病-病人(has_disease)、投保人-理賠(policyholder_of_claim)、保單-理賠(inchagre_of_claim)、病人-理賠(insured_of_claim)、相似理賠(similar_claim)、投保人關聯(policyholder_connection)。
屬性:姓名(name),是否高危(high_risk)、風險分數(risk_score)、疾病名稱(disease_name)、相似性(similarity_score)、關聯等級(level)、理賠時間(claim_date)、保額(charge)等。
最佳實務
資料庫準備
僅PolarDB PostgreSQL版 14且核心小版本2.0.14.12.24.0及以上版本支援圖引擎外掛程式,詳細說明請參見快速入門。
安裝外掛程式。
CREATE EXTENSION age;將外掛程式加入需要使用此外掛程式的資料庫或使用者的搜尋路徑和預先載入庫中。
說明使用Data Management(Data Management)用戶端設定
search_path時,可能會存在相容性問題,您可使用PolarDB-Tools執行相關語句。ALTER DATABASE <dbname> SET search_path = public,ag_catalog; ALTER USER <username> SET search_path = public,ag_catalog; ALTER DATABASE <dbname> SET session_preload_libraries TO 'age'; ALTER USER <username> SET session_preload_libraries TO 'age';
資料入庫
建立圖。使用位於
ag_catalog命名空間中的create_graph函數建立圖。SELECT create_graph('graph');插入節點和邊。由於下載的資料為CSV檔案,未包含所需的ID資訊,因此需要對資料進行轉換後再進行入庫操作。本文附錄中提供了將資料轉換為PolarDB中的vertex和edge的Python指令碼,轉換結果如下所示:
投保人(policyholder)
SELECT create_vlabel('graph','policyholder'); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3068',fname:'ADAM',lname:'OCHSENBEIN',risk_score:'88',high_risk:'1'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3069',fname:'MALINDA',lname:'MEHSERLE',risk_score:'42',high_risk:'0'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3070',fname:'SANDRA',lname:'KUHTA',risk_score:'20',high_risk:'0'}) $$ ) as (n agtype); ...理賠(claim)
- Create vlabel SELECT create_vlabel('graph','claim'); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3571',charge:'6517.53',claim_date:'2013-08-11 00:00:00',duration:'13',insured_id:'28523',diagnosis:'no exception',person_incharge_id:'PI23070',type:'services',policyholder_id:'PH9507'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3572',charge:'49273.65',claim_date:'2017-02-10 00:00:00',duration:'3',insured_id:'1220',diagnosis:'no exception',person_incharge_id:'PI21197',type:'services',policyholder_id:'PH406'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3573',charge:'52005.98',claim_date:'2014-06-29 00:00:00',duration:'27',insured_id:'23735',diagnosis:'no exception',person_incharge_id:'PI22361',type:'services',policyholder_id:'PH7911'}) $$ ) as (n agtype); ...投保人(policyholder)和理賠(claim)的關聯關係
SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1528' AND b.policyholder_id = 'PH2963' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1529' AND b.policyholder_id = 'PH1353' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1530' AND b.policyholder_id = 'PH1071' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1531' AND b.policyholder_id = 'PH8102' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1532' AND b.policyholder_id = 'PH4768' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); ...
將轉換後的結果儲存為sql檔案,配合用戶端工具,如psql等可完成資料匯入。
使用樣本
簡單查詢
資料統計
統計各種類型節點數量。
SELECT count(*) FROM cypher('graph', $$ MATCH (v) RETURN v $$) as (v agtype);返回結果如下:
count -------- 120567理賠(claim)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:claim) RETURN v $$) as (v agtype);返回結果如下:
count -------- 100001投保人(policyholder)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:policyholder) RETURN v $$) as (v agtype);返回結果如下:
count ------- 10006保單(incharge)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:incharge) RETURN v $$) as (v agtype);返回結果如下:
count ------- 10001疾病(disease)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:disease) RETURN v $$) as (v agtype);返回結果如下:
count ------- 393病人(patient)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:patient) RETURN v $$) as (v agtype);返回結果如下:
count ------- 166
過濾查詢、排序查詢
查詢理賠單C4377的投保、理賠、被保情況。
SELECT 'policyholder_id' as type, policyholder_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(policyholder:policyholder)
RETURN policyholder.policyholder_id
$$) AS (policyholder_id agtype)
UNION
SELECT 'incharge_id', incharge_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(v:incharge)
RETURN v.incharge_id
$$) AS (incharge_id agtype)
UNION
SELECT 'patient_id', patient_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(v:patient)
RETURN v.patient_id
$$) AS (patient_id agtype);返回結果如下:
type | policyholder_id
-----------------+-----------------
patient_id | "11279"
policyholder_id | "PH3759"
incharge_id | "PI26607"通用情境
K階鄰居
已知保單C4377為欺詐保單,查詢和理賠單C4377有相同理賠病人的理賠單,說明該理賠人有涉嫌騙保的嫌疑。
SELECT 'claim_id', claim_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(p:patient)<-[]-(c:claim) RETURN c.claim_id $$) AS (claim_id agtype);返回結果如下:
?column? | claim_id ----------+---------- claim_id | "C28963" claim_id | "C3679" claim_id | "C96545" claim_id | "C26586" claim_id | "C26754" claim_id | "C87278" claim_id | "C87603" claim_id | "C69395" claim_id | "C67594" claim_id | "C96155" claim_id | "C10160"查詢已知欺詐保單C4377的投保人的社交關係,可以對這些人的理賠情況提前預警。
SELECT 'policyholder_id', policyholder_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(a:policyholder)-[r*1..3]->(p:policyholder) RETURN p.policyholder_id $$) AS (policyholder_id agtype);返回結果如下:
?column? | policyholder_id -----------------+----------------- policyholder_id | "PH52532" policyholder_id | "PH11283" policyholder_id | "PH11328" policyholder_id | "PH1" policyholder_id | "PH5" policyholder_id | "PH512" policyholder_id | "PH1569" policyholder_id | "PH4722" policyholder_id | "PH4731"
路徑檢索
查詢投保人PH3759和投保人PH4722的路徑,分析投保人之間的關聯關係。
SELECT *
FROM cypher('graph', $$
MATCH path = (:policyholder {policyholder_id: 'PH3759'})-[r*1..3]->(:policyholder {policyholder_id: 'PH4722'})
RETURN path
$$) AS (v agtype);返回結果如下:
-------
[{"id": 844424930136988, "label": "policyholder", "properties": {"fname": "KURTIS", "lname": "ALKEMA", "high_risk": "1", "risk_score": "78", "policyholder_id": "PH3759"}}::vertex, {"id": 2251799813685487, "label": "RELTYPE", "end_id": 844424930133473, "start_id": 844424930136988, "properties": {"level": "65"}}::edge, {"id": 844424930133473, "label": "policyholder", "properties": {"fname": "TERRA", "lname": "SWARB", "high_risk": "0", "risk_score": "25", "policyholder_id": "PH512"}}::vertex, {"id": 2251799813685546, "label": "RELTYPE", "end_id": 844424930138502, "start_id": 844424930133473, "properties": {"level": "62"}}::edge, {"id": 844424930138502, "label": "policyholder", "properties": {"fname": "VETA", "lname": "SEDLACK", "high_risk": "0", "risk_score": "31", "policyholder_id": "PH1569"}}::vertex, {"id": 2251799813685594, "label": "RELTYPE", "end_id": 844424930136281, "start_id": 844424930138502, "properties": {"level": "92"}}::edge, {"id": 844424930136281, "label": "policyholder", "properties": {"fname": "DEANNA", "lname": "BALSER", "high_risk": "0", "risk_score": "36", "policyholder_id": "PH4722"}}::vertex]::path共同鄰居
查詢保單C4377和保單C67594的共同鄰居,從而找到兩個保單的共同投保人。
SELECT 'policyholder_id', policyholder_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(p:policyholder)<-[]-(:claim {claim_id: 'C67594'})
RETURN p.policyholder_id
$$) AS (policyholder_id agtype);返回結果如下:
?column? | policyholder_id
-----------------+-----------------
policyholder_id | "PH3759"協同推薦
已知保單C4377為欺詐保單,尋找和保單C4377有共同投保人的保單,從而找到欺詐疑似涉詐保單。
SELECT 'claim_id', claim_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(p:policyholder)<-[]-(c:claim) RETURN c.claim_id $$) AS (claim_id agtype);返回結果如下:
?column? | claim_id ----------+---------- claim_id | "C28963" claim_id | "C96545" claim_id | "C3679" claim_id | "C87603" claim_id | "C26754" claim_id | "C26586" claim_id | "C87278" claim_id | "C69395" claim_id | "C67594" claim_id | "C96155" claim_id | "C10160"與已知涉詐保單C4377相似性最大的保單,返回前20個:
WITH t AS ( SELECT claim_id, replace(trim(both '"' from to_jsonb(properties)::text), '\"', '"') AS similarity FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[e]->(c:claim) RETURN properties(e), c.claim_id $$) AS (properties agtype, claim_id agtype) ) SELECT claim_id, replace((similarity::jsonb->'similarity_score')::text, '"','')::integer AS s FROM t ORDER BY s DESC LIMIT 20;返回結果如下:
claim_id | s ----------+---- "C67594" | 13 "C69395" | 13 "C10160" | 13 "C87603" | 13 "C28963" | 13 "C3679" | 13 "C26754" | 13 "C96155" | 13 "C26586" | 13 "C87278" | 13 "C96545" | 13 "C20113" | 8 "C70759" | 8 "C28785" | 8 "C12793" | 8 "C59736" | 8 "C38059" | 8 "C34068" | 8 "C71827" | 8 "C15760" | 8
總結
利用PolarDB PostgreSQL版的圖分析能力進行圖資料分析。PolarDB結合AGE擴充,提供圖資料計算分析的功能,包括使用Cypher查詢語言,高效處理查詢圖資料,為企業的統一資料管理和分析,提供強有力的支撐。
試用體驗
您可以訪問PolarDB免費試用頁面,選擇試用“雲原生資料庫PolarDB PostgreSQL版”,體驗GanosBase的圖計算能力。
附錄
資料轉換指令碼。
import csv
import os
def convert_vertex_csv(file_path, graph):
file_name = os.path.splitext(os.path.basename(file_path))[0].lower()
# create vlabel
print("------------------------------------------------")
print("-- Create vlabel")
print("SELECT create_vlabel('{}','{}');".format(graph, file_name))
with open(file_path, 'r') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
header = next(reader)
for row in reader:
p = ""
for h in header:
if p != "":
p += ","
else:
p += "{"
p += "{}:'{}'".format(h.lower(), row[header.index(h)].strip())
if p != "":
p += "}"
print("SELECT * FROM cypher('{}', $$ CREATE (:{} {}) $$ ) as (n agtype);".format(graph, file_name, p))
def convert_edge_csv(file_path, graph, from_type, to_type):
file_name = os.path.splitext(os.path.basename(file_path))[0].lower()
with open(file_path, 'r') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
header = next(reader)
for row in reader:
p = ""
for h in header:
if (h.endswith("ID")):
continue;
if p != "":
p += ","
else:
p += "{"
p += "{}:'{}'".format(h.lower(), row[header.index(h)].strip())
if p != "":
p += "}"
print("SELECT * FROM cypher('{0}', $$ MATCH (a:{1}), (b:{2}) WHERE a.{1}_id = '{3}' AND "
"b.{2}_id = '{4}' CREATE (a)-[e:RELTYPE {5} ]->(b) RETURN e$$) as (e agtype);".format(graph, from_type, to_type, row[0].strip(), row[1].strip(), p))
def generate_graph_csv(directory, graph):
print("------------------------------------------------")
print("-- Create graph")
print("SELECT create_graph('{}');".format(graph))
print("------------------------------------------------")
print("-- Create vertex")
convert_vertex_csv(directory + "/POLICYHOLDER.csv", graph)
convert_vertex_csv(directory + "/INCHARGE.csv", graph)
convert_vertex_csv(directory + "/PATIENT.csv", graph)
convert_vertex_csv(directory + "/CLAIM.csv", graph)
convert_vertex_csv(directory + "/DISEASE.csv", graph)
print("------------------------------------------------")
print("-- Create edge")
convert_edge_csv(directory + "/POLICYHOLDER_CONNECTION.csv", graph, 'policyholder','policyholder')
convert_edge_csv(directory + "/INCHARGE_OF_CLAIM.csv", graph,'claim', 'incharge')
convert_edge_csv(directory + "/CLAIM_SIMILARITY.csv", graph, 'claim','claim')
convert_edge_csv(directory + "/POLICYHOLDER_OF_CLAIM.csv", graph,'claim', 'policyholder')
convert_edge_csv(directory + "/INSURED_OF_CLAIM.csv", graph, 'claim','patient')
convert_edge_csv(directory + "/HAS_DISEASE.csv", graph, 'patient','disease')
generate_graph_csv("analyzing-insurance-claims-using-ibm-db2-graph-master/data", "graph")