AgentLoop Pipeline 是資料處理流水線引擎,用於將 AI Agent 運行時日誌自動加工為高品質 Dataset。Pipeline 提供 13 個處理節點,覆蓋欄位選取、三級去重、語義聚類採樣、AI 評估/標註/合成等全鏈路資料處理能力。
Pipeline 是什麼
Pipeline 是 AgentLoop 的資料處理流水線引擎。它從 Logstore 中讀取未經處理資料,經過編排的多級處理節點(欄位選取、去重清洗、聚類採樣、AI 評估/標註/合成等),自動產出高品質 Dataset。
能力 | 說明 |
靈活編排 | 13 個處理節點自由組合,構建自訂資料處理管線 |
三級去重 | 精確、近似、語義逐級去重,從字面到語義全方位消除冗餘 |
多樣性採樣 | 語義聚類 + 分組採樣,在降量的同時保證資料多樣性 |
AI 增值 | 內建 LLM 調用和智能體調用,一站式完成評估、標註、合成 |
全域去重 | 跨批次增量去重,確保 Dataset 中無重複資料 |
定時調度 | 周期性自動執行,持續產出高品質資料 |
為什麼需要 Pipeline
AI Agent 在運行過程中會產生大量原始日誌——使用者提問、模型回答、工具調用、Token 消耗等。這些資料散落在不同欄位、存在大量重複、缺少品質標註。如果直接用於訓練或評測,會帶來雜訊、偏差和浪費。
Pipeline 解決的正是這個問題:將原始運行時日誌自動加工為可直接使用的高品質資料集。通過編排處理節點描述資料處理流程,Pipeline 自動完成清洗、標註和去重。
開始之前
使用 Pipeline 前,請確認以下前置條件已滿足:
已開通 AgentLoop服務。
已開通Log Service SLS,並擁有包含 AI Agent 作業記錄的 Logstore。
如需快速上手實操,請參見快速開始。本文側重於協助您理解 Pipeline 的核心概念和編排思維。
核心概念
Pipeline 由四大組件構成:資料來源(Source)、處理節點(Pipeline Nodes)、輸出目標(Sink)和調度策略(ExecutePolicy)。
資料來源(Source)
Pipeline 的輸入端,定義系統從哪裡讀取未經處理資料。目前支援從 Logstore 讀取,可以指定資料來源並配置過濾條件,精準圈定需要處理的資料範圍。
處理節點(Pipeline Nodes)
Pipeline 的核心處理邏輯,由一組按順序排列的節點群組成。每個節點完成一項原子任務(如欄位選取(project)、去重、採樣、AI 調用等),資料依次流過各節點逐步加工。
Pipeline 提供 13 個處理節點,按功能分為 6 大類:
類別 | 節點 | 一句話說明 |
基礎處理 |
| 從未經處理資料中選取並重新命名欄位,聲明 Pipeline 輸入 Schema |
| 基於運算式計算新列或覆蓋已有列 | |
| 按條件運算式過濾行 | |
資料群組裝 |
| 將離散事件按分組鍵彙總為行級樣本寬表(多行合一行) |
資料清洗 |
| 完全相同的文本記錄僅保留一條 |
| 高度相似(字面差異極小)的文本記錄視為重複 | |
| 表述不同但含義相同的文本記錄視為重複 | |
特徵計算 |
| 對文字欄位產生 Embedding 向量 |
| 計算字元數、詞數、行數等統計指標 | |
資料採樣 |
| 基於 Embedding 向量為資料分配簇 ID |
| 按比例或固定條數抽樣,支援分組 | |
AI 處理 |
| 模板渲染 + 模型推理 + 輸出解析,支援評估、標註、合成等 |
| 調用數字員工發起智能對話,支援分析、問答、洞察等 |
輸出目標(Sink)
Pipeline 的輸出端,定義處理結果寫到哪裡。處理結果自動寫入目標 Dataset,只需指定目標工作區(Workspace)和資料集(Dataset)名稱。
調度策略(ExecutePolicy)
控制 Pipeline 何時運行。支援定時周期執行——例如每 15 分鐘自動運行一次,系統自動按時間視窗拉取新資料並處理,實現持續資料產出。
資料處理的六個階段
Pipeline 的資料處理按邏輯分為六個階段,每個階段可根據需要選用。
第一階段:欄位選取與預先處理
為什麼需要?
原始日誌欄位繁多、命名各異,直接使用會導致下遊節點無法統一處理。這一階段的核心任務是「統一語言」——為整條 Pipeline 聲明統一的 Schema,同時過濾掉無效資料,減少後續無用計算。
什麼時候用?
幾乎所有 Pipeline 都需要。project(欄位選取)選取欄位幾乎是必選的第一步;extend(正則提取)在需要從嵌套欄位中提取資訊時使用;where(空值過濾)在資料中存在空值或無效記錄時使用。
第二階段:資料群組裝
為什麼需要?
AI Agent 運行時產生的資料往往是離散事件(如 OT Trace Span),每條只包含互動片段。要做有意義的去重、採樣和評估,需要先將這些片段「拼回」一條完整的互動記錄。
什麼時候用?
當資料來源是事件級資料(每次完整互動被拆成多條記錄)時,需要用 make-instance(執行個體構建)將它們彙總為行級樣本。如果資料已經是一行一條完整記錄,可以跳過這一階段。
make-instance 內建了三類彙總函式(選值、計算、組合),覆蓋了絕大多數資料群組裝需求。整個過程純 CPU 運算,零 AI 成本。
第三階段:三級去重
為什麼需要?
真實資料中存在大量重複——從完全相同的重複提交,到一字之差的錯別字變體,再到表述不同但含義相同的語義重複。這些冗餘會浪費儲存、扭曲分布、浪費 AI 調用成本。三級去重逐層消除不同深度的冗餘。
什麼時候用?
幾乎所有需要清洗的情境。三個層級可按需選用:
層級 | 節點 | 消除什麼類型的重複 | 計算代價 |
L1 精確去重 |
| 完全相同的記錄 | 極低 |
L2 近似去重 |
| 一字之差的變體(錯別字、標點、空格) | 低 |
L3 語義去重 |
| 不同表述但含義相同的記錄 | 中等 |
設計理念:計算代價 L1 < L2 < L3,排列在前的節點已大幅削減資料量,使高代價節點的輸入規模可控。這就是「由粗到細」的編排哲學。
全域去重:dedup-fuzzy 和 dedup-semantic 還支援全域模式,跨批次與目標 Dataset 中的已有資料做增量去重,確保 Dataset 中絕無重複。
第四階段:多樣性採樣
為什麼需要?
去重之後資料量仍可能很大。直接全部送入 AI 處理既不經濟也沒必要。但簡單隨機採樣可能導致某些語義類別被遺漏。「語義聚類 + 分組採樣」的組合確保每個語義類別都有代表,在降量的同時保證多樣性。
什麼時候用?
當需要構建評測資料集、訓練樣本集,或者需要控制 AI 調用成本時。聚類採樣可以用少量資料覆蓋全部語義空間。
semantic-cluster(語義聚類)可複用上遊 dedup-semantic(語義去重)產生的向量列,無需重複計算 Embedding。最終產出條數不超過簇數乘以每簇條數,可按預算逆推參數。
第五階段:AI 處理
為什麼需要?
資料清洗和採樣解決了「量」的問題,但資料的「品質標籤」和「增值資訊」仍然缺失。AI 處理階段通過 LLM 或智能體為每條資料增加評估、標註、合成等增值資訊,讓資料真正可用。
什麼時候用?
當需要自動化品質評分、結構化分類標註、資料增強合成、或通過智能體進行深度分析時。
Pipeline 提供兩個 AI 處理節點:
llm-call(LLM 調用):一次純 LLM 推理。通過 Prompt 模板和欄位預留位置,靈活實現評估、標註、合成、翻譯、過濾等任意任務。支援 JSON 結構化輸出,確保結果可解析。agentic-call(智能體調用):調用數字員工,內部封裝了完整的 SOP 流程、知識庫檢索、工具調用等能力,適合需要多步推理的複雜分析情境。
第六階段:特徵計算
為什麼需要?
有時需要為 Dataset 補充額外的特徵資訊——文本的向量表示(用於後續檢索或聚類)、或字元數/詞數/行數等統計指標(用於資料分析和品質監控)。
什麼時候用?
embedding(向量產生)適用於無上遊向量列時獨立產生向量;doc-stats(文本統計)常放在 Pipeline 末尾,為 Dataset 補充文本統計資訊。
典型情境
以下 6 個情境覆蓋了最常見的資料處理需求。每個情境展示節點群組合方式,協助快速理解「什麼情境用什麼節點群組合」。
情境一:資料去重清洗
目標:快速去除重複資料,產出乾淨 Dataset。零 AI 調用成本,秒級處理萬級資料。
適用情境:日誌中存在大量重複提問,只需要一份乾淨去重的基礎資料集。
節點鏈路:LogStore → project(欄位選取) → where(空值過濾) → dedup-exact(精確去重) → dedup-fuzzy(近似去重) → Dataset
先選取關鍵字段、過濾空值,再用精確 + 近似兩級去重消除重複。純 CPU 運算,無 AI 成本,適合作為所有情境的起步配置。
情境二:多樣性採樣
目標:三級去重 + 聚類採樣,構建覆蓋各語義類別的代表性子集。
適用情境:需要構建評測資料集或訓練樣本,要求資料多樣性而非數量。
節點鏈路:LogStore → project → dedup-exact → dedup-fuzzy → dedup-semantic → semantic-cluster → sample → Dataset
完整的三級去重消除各層次冗餘,語義聚類確保每個類別都有代表性樣本入選。語義去重產生的向量列可直接複用於聚類,無需重複計算。
情境三:AI 品質評估
目標:去重採樣後自動化品質評分,替代人工評審。
適用情境:需要快速瞭解 Agent 回答品質的整體水平,或在大規模資料中篩選出高/低品質樣本。
節點鏈路:LogStore → project → dedup-exact → sample(降量) → llm-call(多維評估) → Dataset
先去重採樣將資料量降到可控範圍,再通過 LLM 多維評估對每條資料打分。評估結果以 JSON 結構化輸出,可直接用於彙總分析和品質監控。
情境四:資料增強合成
目標:從種子資料出發,通過 LLM 合成多樣化樣本,擴充訓練資料。
適用情境:訓練資料不足,需要通過改寫、雜訊注入、追問產生、對抗樣本等方式擴充資料量和多樣性。
節點鏈路:LogStore → project → sample(種子採樣) → llm-call(合成) → Dataset
採樣一批種子資料,通過 LLM 合成多種變體。超長 Prompt 建議註冊為命名模板,便於版本管理和複用。
情境五:端到端全流程
目標:清洗 → 採樣 → 評估 → 標註 → 合成 → 統計,一站式產出高品質 Dataset。
適用情境:需要一次性完成從原始日誌到帶多維標註的高品質資料集的全鏈路處理。這是 Pipeline 最完整的參考配置。
節點鏈路:LogStore → project + extend → where → dedup-exact → dedup-fuzzy(全域) → dedup-semantic(全域) → semantic-cluster → sample → llm-call x 3(評估 + 標註 + 合成) → doc-stats → Dataset
10,000 條未經處理資料經三級去重(含全域去重)+ 聚類採樣後約剩 300 條,再做 3 輪 LLM 調用(評估 + 標註 + 合成),最後補充文本統計,最終產出帶多維標註的高品質 Dataset。
情境六:OT-Trace 資料處理
目標:從 OT Span 離散事件組裝為完整樣本,再清洗評估。
適用情境:AI Agent 接入了 OpenTelemetry 採集規範,運行時資料以離散 Span 形式儲存,需要先組裝為完整互動記錄再進行後續處理。
節點鏈路:OT Span → extend(JSON 提取) → where(事件過濾) → make-instance(Trace 彙總) → where(空值過濾) → dedup-exact → sample → llm-call(評估) → Dataset
先用 extend 從嵌套 JSON 中提取扁平欄位,where 過濾無關事件類型,make-instance 按 Trace 彙總為樣本寬表,再進入常規的去重、採樣、評估流程。資料群組裝階段是這個情境的關鍵差異點。
編排思維
掌握以下思維模式,可以為任意情境設計高效的 Pipeline。
先問自己三個問題
編排前先明確以下問題:
資料長什麼樣?是已經一行一條完整記錄,還是離散事件需要組裝?欄位嵌套在 JSON 裡嗎?——這決定了是否需要
make-instance和extend。最終要什嗎?是乾淨的去重資料集,還是帶評分的評測集,還是用於訓練的增強資料?——這決定了需要哪些 AI 處理節點。
預算和時效要求是什嗎?LLM 調用有成本,處理有時間——這決定了採樣策略和模型選擇。
四條編排哲學
哲學 | 核心思想 | 為什麼這樣做 |
Schema 前置 | 以 | 後續所有節點使用同一套欄位名,避免命名混亂 |
先減後增 | 先去重/採樣(減少行數),再 AI 處理(增加列數) | LLM 調用按 Token 計費。10,000 條資料全部做 AI 評估 vs 300 條做 AI 評估,成本差 30 倍以上 |
由粗到細 | 去重順序:精確 → 近似 → 語義 | 計算代價遞增,但前置步驟已大幅削減資料量,使高代價節點的輸入規模可控 |
節點原子,組合永珍 | 每個節點只做一件事,複雜邏輯通過組合實現 | 聚類只標註簇 ID,採樣只過濾行,AI 只增列。靈活插拔,創造出適合自己情境的獨特組合 |
靈活編排
上面的六個典型情境是常見模式,但不是唯一選擇。Pipeline 的節點可以自由組合:
跳過某些去重層級——如果資料幾乎沒有近似重複,直接用精確去重 + 語義去重
多輪 AI 處理——先評估打分,再按分數過濾,然後對優質資料做標註和合成
採樣前置——如果未經處理資料量巨大但只需要少量樣本,可以在去重之前就先採樣降量
純 AI 增值——資料已經很乾淨,只需要加
llm-call做評估或標註
根據業務情境特點自由組合節點,構建適合實際需求的 Pipeline。
預置模板導航
Pipeline 提供 9 個預置模板,覆蓋從簡單去重到端到端全流程的各類情境。支援直接使用或基於模板定製。
按目標快速選擇
目標 | 推薦模板 | 複雜度 |
快速清理重複資料 | Agent 資料去重清洗 | 低 |
三級去重 + 聚類採樣 | 多樣性採樣 | 中 |
自動化品質評分 | 對話品質評估 | 中 |
結構化多維標註 | 自動標註分類 | 中 |
種子資料擴充 | 資料增強合成 | 中 |
端到端全流程 | 端到端全流程 | 高 |
快速體驗全部能力 | 模擬資料 Demo | 低~高 |
分析 LLM 調用品質 | OT-LLM 品質分析 | 高 |
Trace 級資料治理 | OT-Trace 資料治理 | 高 |
按角色選擇
角色 | 推薦模板 |
資料工程師 | 去重清洗、多樣性採樣 |
演算法工程師 | 多樣性採樣、品質評估、資料增強 |
評測工程師 | 品質評估、自動標註 |
資料平台團隊 | 全流程、模擬 Demo |
新使用者上手 | 模擬資料 Demo |
OT 資料使用者 | OT-LLM 品質分析、OT-Trace 資料治理 |
使用方式
直接使用:選擇模板,修改資料來源和輸出目標配置,調整欄位對應為實際日誌欄位,提交建立。
基於模板定製:選擇最接近的模板,參考其文檔中的「定製建議」,增減節點、調整參數、替換 Prompt。
自由組合:不同模板的節點可自由混搭——如去重清洗模板的清洗鏈 + 品質評估模板的評估 Prompt。
最佳實務
Pipeline 設計
從簡單開始:先用 2~3 個節點驗證資料流轉,確認欄位對應正確後再逐步添加更多節點。
關注資料量變化:在每個節點後觀察行數變化,確保去重和採樣的效果符合預期。
Prompt 獨立管理:將 LLM 的 Prompt 註冊為命名模板(
@path引用),與 Pipeline 配置解耦,便於獨立迭代。善用預置模板:從最接近的模板出發定製,比從零開始編排效率高得多。
成本控制
先減後增是鐵律:LLM 調用按 Token 計費,務必在去重 + 採樣後再執行。以 10,000 條未經處理資料為例,經三級去重 + 採樣後約剩 300 條,LLM 調用成本降低 97%。
模型分級使用:評估等高精度任務用
qwen-max,標註等相對簡單的任務用qwen-plus或qwen-turbo,靈活控製成本。採樣規模可算:最終資料量約等於聚類簇數乘以每簇採樣條數,可按預算逆推參數。
調度間隔合理:定時調度的間隔應與資料量和預算匹配,避免過於頻繁導致成本失控。
全域去重
首批資料無效果:全域去重是與目標 Dataset 已有資料比對,首批執行時 Dataset 為空白,效果從第二批開始顯現。
僅在必要時啟用:全域去重需要額外的向量計算和 Dataset 查詢,如果 Pipeline 不是增量運行(只跑一次),可以省略。
目標必須一致:全域去重的目標必須與輸出 Dataset 一致,否則去重無意義。
Schema 一致性
統一命名:通過
project在 Pipeline 入口統一欄位名,下遊所有節點使用相同的列名引用資料。注意系統擴充列:系統自動產生的擴充列以
__開頭(如__dedup_emb、__cluster_id、__doc_stats),避免自訂欄名與之衝突。關注列的來源:當 Pipeline 較長時,注意確認下遊節點引用的列是否已在上遊產生。
常見問題
入門常見問題
Q1:Pipeline 和 Dataset 是什麼關係?
Pipeline 是資料加工引擎,Dataset 是資料存放區。Pipeline 從 Logstore 讀取未經處理資料,經過處理後寫入 Dataset。一個 Pipeline 對應一個輸出 Dataset。
Q2:應該從哪個模板開始?
如果是新使用者,建議從模擬資料Demo模板開始——它基於 Mock 資料,無需準備真實日誌即可體驗 Pipeline 的全部能力。確認理解後,再選擇與業務情境最匹配的模板。
Q3:Pipeline 建立後多久開始執行?
配置調度策略後,Pipeline 會按設定的間隔自動執行。首次執行從指定的起始時間開始,之後按周期運行。
Q4:一個 Pipeline 可以輸出到多個 Dataset 嗎?
目前一個 Pipeline 對應一個輸出 Dataset。如果需要將資料寫入多個 Dataset,可以建立多個 Pipeline,共用相同的資料來源但配置不同的處理邏輯和輸出目標。
編排與設計
Q5:節點的執行順序是怎樣的?
嚴格按節點列表的順序依次執行,前一個節點的輸出是後一個節點的輸入。不支援分支或並行。
Q6:Prompt 模板怎麼引用欄位?
在 Prompt 中使用 {{列名}} 預留位置,並在配置中聲明所有參與渲染的列名。例如:Prompt 寫「請評估:{{question}} 的回答:{{output}}」,對應聲明 question,output。系統會自動校正兩者的一致性。
Q7:上遊產生的擴充列可以在下遊使用嗎?
可以。上遊節點產生的擴充列(如語義去重的向量列、聚類的簇 ID 列)對下遊完全可見,可直接引用。這也是「擴充列複用」原則的核心——避免重複計算。
Q8:make-instance 的分組鍵怎麼選?
分組鍵應選擇能唯一標識一次完整互動的欄位組合。例如 OT-Trace 情境下,session_id + traceId 可以將同一次請求的所有 Span 彙總為一行樣本。
成本與效能
Q9:LLM 調用成本怎麼控制?
三個手段:(1)先去重 + 採樣,大幅減少調用行數;(2)選擇性價比合適的模型;(3)控制採樣數量限制最終資料量。以 300 條 x 3 輪 LLM 調用為例,約 20~50 元(視模型定價)。
Q10:去重閾值怎麼選?
近似去重推薦閾值 3(越小越嚴格),語義去重推薦閾值 0.1(越小越嚴格)。可先用預設值運行,觀察去重效果後再微調。
Q11:Pipeline 處理大規模資料需要多久?
純 CPU 節點(欄位選取、去重、採樣等)處理萬級資料通常在秒級完成。LLM 調用是主要耗時環節,300 條資料 x 單輪 LLM 調用約 10~30 分鐘,具體取決於模型和 Prompt 複雜度。
故障排查
Q12:Pipeline 執行失敗時如何查看錯誤資訊?
在 Pipeline 列表中單擊失敗的執行記錄,查看執行日誌和錯誤詳情。常見錯誤類型包括:
資料來源問題:Logstore 無資料、欄位名錯誤、查詢語法錯誤。
配置問題:節點參數錯誤、欄位對應不一致、命名模板未註冊。
許可權問題:無 Logstore 讀取許可權、無 Dataset 寫入許可權。
資源問題:處理逾時、記憶體不足。
Q13:Pipeline 執行失敗了怎麼排查?
檢查三個常見原因:(1)欄位對應中引用的原始欄位名是否與 Logstore 實際欄位一致;(2)節點中聲明的列是否在上遊已產生;(3)命名模板是否已正確註冊。詳細錯誤資訊可在執行日誌中查看。
Q14:全域去重配置後沒有效果?
首批執行時目標 Dataset 為空白,全域去重無資料可比對,效果從第二批開始顯現。另外請確認全域去重的目標與輸出 Dataset 一致。
Q15:LLM 返回結果不是合法 JSON 怎麼辦?
配置 JSON 輸出格式後,系統會自動重試並修複非法 JSON。如果仍然失敗,輸出中的錯誤資訊欄位會記錄具體原因,可用條件過濾出這些異常行進行排查。
Q16:資料量沒有按預期減少怎麼排查?
可能原因:(1)去重欄位選擇不當——確保選擇最能代表資料唯一性的文字欄位(如使用者提問而非系統輸出);(2)閾值過於嚴格——適當放寬;(3)資料本身重複率低——這是正常情況,說明資料品質較好。