Pipeline 通過節點群組合實現資料處理流程。本文介紹所有可用節點的功能及典型編排方式。
節點列表
基礎處理
節點 | 功能 |
欄位選取:從未經處理資料中選取並重新命名欄位,聲明 Pipeline 輸入 Schema。 | |
欄位擴充:基於運算式計算新列或覆蓋已有列,支援所有內建函數。 | |
條件過濾:按運算式篩選行,僅保留合格資料。 |
資料群組裝
節點 | 功能 |
執行個體構建:按分組鍵將離散事件彙總為行級樣本寬表(純 CPU 計算)。 |
資料清洗
節點 | 功能 |
精確去重:完全相同的文本記錄僅保留一條。 | |
近似去重:字面差異極小的文本記錄視為重複,僅保留一條。 | |
語義去重:表述不同但含義相同的文本記錄視為重複,僅保留一條。執行時自動產生向量擴充列 |
向量與聚類
節點 | 功能 |
向量產生:對文字欄位產生 Embedding 向量,結果寫入擴充列(如 | |
語義聚類:基於 Embedding 向量為資料分配簇 ID,結果寫入擴充列 |
統計與採樣
節點 | 功能 |
文檔統計:計算字元數、詞數、行數等文本指標。 | |
隨機採樣:按比例或固定條數抽樣,支援按分組鍵分層採樣。 |
AI 增強
節點 | 功能 |
LLM 調用:模板渲染 + 模型推理 + 輸出解析,適用於評估、標註、合成等情境。 | |
智能體調用:調用數字員工發起智能對話,適用於 SOP 分析、知識問答、資料洞察等情境。 |
Pipeline 編排指南
資料流全景
Pipeline 的資料流遵循"由粗到細、先減後增"的原則,各類節點按以下順序處理資料:
基礎處理:通過 project / extend / where 完成欄位選取、擴充和過濾。
資料群組裝:通過 make-instance 將離散事件彙總為行級樣本(多行合并為一行)。此步驟之後,每行代表一條完整樣本。
資料清洗:通過 dedup-exact / dedup-fuzzy / dedup-semantic 逐級去重,資料量遞減。
資料採樣:通過 semantic-cluster 聚類後,由 sample 按簇分組抽樣,進一步降低資料量。
AI 處理:通過llm-call / agentic-call 執行評估、標註、合成等 AI 任務,以及通過 doc-stats 計算統計指標。行數不變,僅增加列。
編排原則
原則 | 說明 |
Schema 前置 | Pipeline 以 |
組裝優先 | 離散事件數目據先用 |
先減後增 | 先做去重和採樣(減少行數),再做 AI 處理(增加列數)。LLM 調用成本高,應在資料量降低後執行。 |
由粗到細 | 去重順序:精確 - 近似 - 語義。計算代價遞增,但前置步驟已大幅削減資料量。 |
擴充列複用 | 上遊節點產生的擴充列(如 |
節點原子性 | 每個節點職責單一。聚類只標註簇 ID,採樣只過濾行,AI 只增列。通過 Pipeline 組合實現複雜邏輯。 |
使用樣本
欄位選取 + 擴充 + 過濾
基礎節點群組合:從未經處理資料中選取欄位,基於字串處理、JSON 解析、Regex擴充欄位,再按條件過濾。
{
"nodes": [
{
"id": "n1", "type": "project",
"parameters": { "question": "a", "input": "b", "output": "c" }
},
{
"id": "n2", "type": "extend",
"parameters": { "question": "regexp_extract(question, '使用者提問:(.*)', 1)" }
},
{
"id": "n3", "type": "where",
"parameters": { "filter": "length(question) > 10" }
}
]
}三級去重(精確 - 近似 - 語義)
逐級去重,計算代價遞增,資料量遞減。精確去重在前可大幅降低後續節點的計算開銷。
{
"nodes": [
{ "id": "n1", "type": "project", "parameters": { "question": "a", "input": "b", "output": "c" } },
{ "id": "n2", "type": "dedup-exact", "parameters": { "field": "question" } },
{ "id": "n3", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3" } },
{ "id": "n4", "type": "dedup-semantic", "parameters": { "field": "question", "threshold": "0.1" } }
]
}參數說明:
threshold:去重相似性閾值。dedup-fuzzy的 threshold 為編輯距離(整數,建議值 1~5,值越大容忍的字面差異越多);dedup-semantic的 threshold 為語義距離(0~1 之間的浮點數,值越小表示越嚴格,建議值 0.05~0.2)。
多樣性採樣(去重 + 聚類 + 分組採樣)
語義去重後利用產生的向量列做聚類,每簇取樣保證語義多樣性。
{
"nodes": [
{ "id": "n1", "type": "project", "parameters": { "question": "a", "input": "b", "output": "c" } },
{ "id": "n2", "type": "dedup-exact", "parameters": { "field": "question" } },
{ "id": "n3", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3" } },
{ "id": "n4", "type": "dedup-semantic", "parameters": { "field": "question", "threshold": "0.1" } },
{ "id": "n5", "type": "semantic-cluster", "parameters": { "field": "__dedup_emb", "n": 100 } },
{ "id": "n6", "type": "sample", "parameters": { "by": "__cluster_id", "n": 1 } }
]
}完整 AI Pipeline(去重 + 採樣 + 評估 + 標註)
將去重、聚類採樣與 LLM 評估和標註串聯,構建端到端資料處理流程。
{
"nodes": [
{ "id": "n1", "type": "project", "parameters": { "question": "a", "input": "b", "output": "c" } },
{ "id": "n2", "type": "dedup-exact", "parameters": { "field": "question" } },
{ "id": "n3", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3", "global": true, "workspace": "my-ws", "dataset": "my-ds" } },
{ "id": "n4", "type": "dedup-semantic", "parameters": { "field": "question", "threshold": "0.1", "global": true, "workspace": "my-ws", "dataset": "my-ds" } },
{ "id": "n5", "type": "semantic-cluster", "parameters": { "field": "__dedup_emb", "n": 100 } },
{ "id": "n6", "type": "sample", "parameters": { "by": "__cluster_id", "n": 3 } },
{ "id": "n7", "type": "llm-call", "parameters": { "prompt": "@eval/prompt.md", "fields": "question,input,output", "format": "json", "as": "eval" } },
{ "id": "n8", "type": "llm-call", "parameters": { "prompt": "@anno/prompt.md", "fields": "output", "format": "json", "model": "qwen-plus", "as": "anno" } }
]
}參數說明:
global:設為true時,節點在全域範圍內執行去重(跨多次 Pipeline 運行累積判斷),而非僅在當前批次內去重。適用於增量資料情境。workspace:全域去重使用的工作空間名稱。在 CMS 控制台的工作空間管理頁面擷取。dataset:全域去重使用的資料集名稱。在對應工作空間的資料集列表中擷取。threshold:去重相似性閾值。取值說明參見上方"三級去重"樣本。
常見問題
節點配置的欄位名與 Logstore 實際欄位不匹配
錯誤現象:Pipeline 執行後輸出資料為空白或欄位值全部為 null。
可能原因:project 節點中配置的欄位名與 Logstore 中的實際欄位名不一致(區分大小寫)。
排查步驟:
在Log Service控制台查看目標 Logstore 的索引配置,確認欄位名的大小寫和拼字。
檢查
project節點的parameters中,鍵名(目標欄位名)和值(源欄位名)是否與 Logstore 實際欄位完全符合。修正欄位名後重新運行 Pipeline。
threshold 取值不當導致去重效果異常
錯誤現象:去重後資料量幾乎不變(閾值過嚴格)或大量資料被誤刪(閾值過寬鬆)。
可能原因:dedup-fuzzy 和 dedup-semantic 使用了不合適的 threshold 值。
排查步驟:
確認
dedup-fuzzy的 threshold 為編輯距離(整數),建議從 3 開始調整。值越大,容忍的字面差異越多。確認
dedup-semantic的 threshold 為語義距離(0~1 之間的浮點數),建議從 0.1 開始調整。值越小越嚴格。先使用小批量資料測試去重效果,確認 threshold 合理後再處理完整資料集。
節點順序錯誤導致執行失敗
錯誤現象:Pipeline 執行報錯,提示欄位不存在或類型不符。
可能原因:節點編排順序不符合資料流依賴關係。例如,在 project 之前使用了引用欄位的節點,或在 embedding 之前使用了 semantic-cluster。
排查步驟:
檢查報錯資訊中提示的缺失欄位,確認該欄位由哪個上遊節點產生。
參照本文"資料流全景"章節的處理順序,調整節點順序使其滿足依賴關係:基礎處理 > 資料群組裝 > 資料清洗 > 資料採樣 > AI 處理。
特別注意擴充列依賴:
semantic-cluster依賴__dedup_emb(由dedup-semantic或embedding產生),sample按__cluster_id分組時需先執行semantic-cluster。