端到端全流程模板覆蓋 AI Agent 運行時資料治理的全部環節(清洗、去重、採樣、評估、標註、合成、統計),展示全部運算元的協同編排,可直接用於生產環境的周期性調度任務。
模板概述
端到端全流程模板覆蓋 AI Agent 運行時資料治理的全部環節,從原始日誌中產出高品質 Dataset。
本模板將原始 OpenTelemetry 日誌經過 7 個階段處理,最終產出經過去重、採樣、評估、標註和合成的高品質資料集。整條 Pipeline 串聯了全部 13 個運算元節點,涵蓋project、 extend、where、make-instance、dedup-exact、dedup-fuzzy、dedup-semantic、embedding、doc-stats、semantic-cluster、sample、llm-call 和 agentic-call 等運算元類型。關於所有可用運算元,請參見節點總覽。
適用人群
資料平台團隊:構建完整的資料治理流水線。
AI 基礎設施工程師:實現周期性資料處理和任務調度。
產品/技術負責人:評估 AI Agent 業務和能力全景。
功能特點
端到端資料治理,從原始日誌到高品質 Dataset。
三級去重(精確、近似、語義)+ 全域去重,確保資料無冗餘。
LLM 三輪處理(評估 + 標註 + 合成),一次性完成所有 AI 增值。
支援直接部署為生產環境的周期性調度任務。
Pipeline 流程
Pipeline 分為 7 個階段,依次執行:
階段 | 運算元 | 功能 | 說明 |
1 | 欄位提取與過濾 | 從原始 OpenTelemetry 日誌中提取 session_id、question、output 等關鍵字段,過濾指定 span 類型。 | |
2 | 會話彙總 | 按 session_id 和 traceId 彙總多條 Span,組裝為一個完整的 Agent 會話執行個體。 | |
3 | 三級去重 | 依次執行精確去重(SimHash 指紋)、近似去重(海明距離)和語義去重(向量距離),支援跨批次全域去重。資料量大幅下降。 | |
4 | 多樣性採樣 | 語義聚類(複用 | |
5 | llm-call x 3 | AI 多輪處理 | 三次 LLM 調用:多維度品質評估、結構化分類標註、多類型資料合成。 |
6 | 文檔統計 | 計算 question 欄位的字元數、詞數、行數等基礎統計指標。 | |
7 | 輸出 | 寫入 Dataset | 將處理結果寫入目標 Dataset,完成端到端資料治理。 |
Pipeline 遵循先減後增原則:先通過去重和採樣大幅減少資料量(行數遞減),再通過 LLM 調用豐富資料維度(列數遞增)。LLM 調用成本較高,務必在資料量降下來之後執行。
完整配置
Pipeline 支援 JSON API 配置格式。關於 Pipeline 的基本概念和建立方式,請參見Pipeline 概述。
以下為完整的 JSON API 配置。將 your-project、your-agent-logstore、your-workspace、your-dataset 等預留位置替換為實際值。
{
"name": "full_pipeline",
"description": "端到端全流程:清洗→採樣→評估→標註→合成,一站式 Agent 資料治理",
"source": {
"type": "logstore",
"logstore": {
"project": "your-project",
"logstore": "your-agent-logstore",
"query": "serviceName:your-agent-service and *"
}
},
"pipeline": {
"nodes": [
{
"id": "select_fields",
"type": "project",
"parameters": {
"input": "attributes.input.value",
"output": "attributes.output.value",
"model": "attributes.gen_ai.model_name",
"trace_id": "traceId",
"span_id": "spanId"
}
},
{
"id": "extract",
"type": "extend",
"parameters": {
"session_id": "json_extract_scalar(attributes, '$.gen_ai.session.id')",
"span_kind": "json_extract_scalar(attributes, '$.gen_ai.span.kind')",
"question": "json_extract_scalar(attributes, '$.input.value')",
"output": "json_extract_scalar(attributes, '$.output.value')",
"model": "json_extract_scalar(attributes, '$.gen_ai.request.model')",
"tool_name": "json_extract_scalar(attributes, '$.gen_ai.tool.name')",
"input_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.input_tokens')",
"output_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.output_tokens')"
}
},
{
"id": "filter_events",
"type": "where",
"parameters": {
"filter": "span_kind IN ('AGENT','LLM','TOOL')"
}
},
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "first(question)",
"output": "last(output)",
"model": "any(model)",
"total_tokens": "sum(input_tokens)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"by": "session_id,traceId"
}
},
{
"id": "filter_empty",
"type": "where",
"parameters": {
"filter": "question IS NOT NULL AND length(question) > 0"
}
},
{
"id": "exact_dedup",
"type": "dedup-exact",
"parameters": {
"field": "question"
}
},
{
"id": "fuzzy_dedup",
"type": "dedup-fuzzy",
"parameters": {
"field": "question",
"threshold": "3",
"global": true,
"workspace": "your-workspace",
"dataset": "your-dataset"
}
},
{
"id": "semantic_dedup",
"type": "dedup-semantic",
"parameters": {
"field": "question",
"threshold": "0.1",
"global": true,
"workspace": "your-workspace",
"dataset": "your-dataset"
}
},
{
"id": "cluster",
"type": "semantic-cluster",
"parameters": {
"field": "__dedup_emb",
"n": 100
}
},
{
"id": "sample_per_cluster",
"type": "sample",
"parameters": {
"n": 3,
"by": "__cluster_id"
}
},
{
"id": "evaluate",
"type": "llm-call",
"parameters": {
"prompt": "@eval/agent-quality.md",
"fields": "question,input,output",
"format": "json",
"as": "eval"
}
},
{
"id": "annotate",
"type": "llm-call",
"parameters": {
"prompt": "@anno/agent-label.md",
"fields": "question,output",
"format": "json",
"as": "anno"
}
},
{
"id": "synthesize",
"type": "llm-call",
"parameters": {
"prompt": "@synthetic/data-augment.md",
"fields": "question,input,output",
"format": "json",
"as": "synthetic"
}
},
{
"id": "stats",
"type": "doc-stats",
"parameters": {
"field": "question"
}
}
]
},
"sink": {
"type": "dataset",
"dataset": {
"workspace": "your-workspace",
"dataset": "agent_full_dataset"
}
},
"executePolicy": {
"mode": "scheduled",
"scheduled": {
"fromTime": 1735689600,
"interval": "15m"
}
}
}參數說明
各節點的參數配置說明如下。
project (欄位選取)
參數 | 說明 | 樣本值 |
| Agent 輸入內容欄位路徑。 |
|
| Agent 輸出內容欄位路徑。 |
|
| 模型名稱欄位路徑。 |
|
| Trace ID 欄位,用於關聯追蹤鏈路。 |
|
| Span ID 欄位,用於關聯追蹤鏈路。 |
|
關於 project 運算元的完整參數說明,請參見project。
extend(欄位提取)
使用 json_extract_scalar 函數從 OpenTelemetry 的 attributes JSON 中提取關鍵字段。
提取欄位 | 來源路徑 | 說明 |
|
| 會話 ID,用於彙總同一會話的多條 Span。 |
|
| Span 類型(AGENT、LLM、TOOL),用於後續過濾。 |
|
| 使用者提問內容。 |
|
| Agent 回答內容。 |
|
| 請求使用的模型名稱。 |
|
| 工具調用名稱。 |
|
| Token 用量統計。 |
關於 extend 運算元的完整參數說明,請參見extend。
where(條件過濾)
包含兩個 where 節點:
filter_events:過濾
span_kind IN ('AGENT','LLM','TOOL'),僅保留 Agent、LLM 和 Tool 類型的 Span。filter_empty:過濾
question IS NOT NULL AND length(question) > 0,排除空問題。
關於 where 運算元的完整參數說明,請參見where。
make-instance(會話彙總)
按 session_id 和 traceId 將同一會話的多條 Span 彙總為一條記錄。
輸出欄位 | 彙總函式 | 說明 |
|
| 取會話中第一條使用者提問。 |
|
| 取會話中最後一條 Agent 回答。 |
|
| 任取一條模型名稱。 |
|
| 累加所有 Span 的 Token 用量。 |
|
| 去重後的工具列表。 |
|
| 工具調用鏈路,按執行順序拼接。 |
關於 make-instance 運算元的完整參數說明,請參見make-instance。
三級去重
按由粗到細的順序依次執行三級去重,計算代價遞增但前置步驟已大幅削減資料量。
運算元 | 去重方式 | 關鍵參數 | 說明 |
精確去重 |
| 基於 SimHash 指紋匹配,去除完全相同的記錄。 | |
近似去重 |
| 基于海明距離匹配,去除高度相似的記錄。啟用全域模式,支援跨批次去重。 | |
語義去重 |
| 基於向量距離匹配,去除語義相近的記錄。啟用全域模式,支援跨批次去重。 |
全域去重(global=true)需要指定 workspace 和 dataset 參數。首次運行時無歷史資料對比,全域去重效果從第二次調度開始顯現。
語義聚類與採樣
semantic-cluster:將資料按語義相似性聚為 100 個簇。
field=__dedup_emb直接複用 dedup-semantic 階段產生的向量,無需重新計算 Embedding。sample:從每個簇中採樣 3 條記錄(
n=3 by __cluster_id),確保最終資料集的多樣性和代表性。
最終資料量 = 簇數 x 每簇採樣量。預設為 100 x 3 = 300 條。
關於語義聚類和採樣運算元的完整參數說明,請參見semantic-cluster和sample。
LLM 三輪處理
採樣後資料經過三次 llm-call 運算元處理,每次調用通過 Prompt 模板指定不同的處理任務。
調用 | 節點 ID | Prompt 模板 | 輸出別名 | 功能 |
第 1 次 |
|
|
| 多維度品質評估(需求理解、回答品質、格式規範等),輸出 JSON 格式的評分和理由。 |
第 2 次 |
|
|
| 結構化分類標註(意圖、複雜度、情境等),輸出 JSON 格式的多維分類和標籤。 |
第 3 次 |
|
|
| 多類型資料合成(改寫、雜訊、追問、對抗),輸出 JSON 格式的合成資料。 |
doc-stats(文檔統計)
對 question 欄位計算字元數、詞數、行數等基礎統計指標,結果寫入 __doc_stats 列。
關於 doc-stats 運算元的完整參數說明,請參見doc-stats。
運行結果
以 10,000 條原始日誌為例,各階段資料量變化如下:
步驟 | 運算元 | 資料量 | 列數變化 | 說明 |
1 | extend + where | 10,000 | 8 列 | 欄位提取 + 過濾指定 Span 類型。 |
2 | make-instance + where | 3,000 | 6 列 | 會話彙總 + 空值過濾。 |
3 | dedup-exact | 2,000 | +3 擴充列 | 精確去重。 |
4 | dedup-fuzzy(global) | 1,200 | 同上 | 近似去重 + 全域去重。 |
5 | dedup-semantic(global) | 800 | +2 擴充列 | 語義去重 + 全域去重。 |
6 | semantic-cluster | 800 | +1 擴充列 | 聚 100 簇。 |
7 | sample | 300 | 同上 | 每簇 3 條。 |
8 | llm-call(eval) | 300 | +1 列 eval | 品質評分。 |
9 | llm-call(anno) | 300 | +1 列 anno | 分類標註。 |
10 | llm-call(synth) | 300 | +1 列 synthetic | 資料合成。 |
11 | doc-stats | 300 | +1 列 __doc_stats | 文本統計。 |
10,000 條原始日誌經全流程處理後,最終產出 300 條多維度標註的 Dataset。
輸出 Dataset 的列結構
列名 | 來源運算元 | 說明 |
| project + extend + make-instance | 原始業務欄位和會話彙總欄位。 |
| dedup-exact / dedup-fuzzy | 去重特徵列。 |
| dedup-semantic | 語義去重向量和記錄標識。 |
| semantic-cluster | 語義聚類簇 ID。 |
| llm-call #1 | JSON 格式的多維度品質評分。 |
| llm-call #2 | JSON 格式的結構化分類標註。 |
| llm-call #3 | JSON 格式的合成資料。 |
| doc-stats | JSON 格式的文本統計指標。 |
定製建議
根據實際業務情境調整以下參數:
定製點 | 操作 |
欄位選取 | 修改 |
去重閾值 | 調整 |
全域去重目標 | 修改 |
採樣規模 | 調整 |
LLM 處理 | 修改 Prompt 模板調整評估維度、標註維度和合成類型。可為 |
精簡 Pipeline | 不需要某個功能時,直接刪除對應節點即可。例如:不需要資料合成,刪除 |
調度策略 | 修改 |
實踐原則
原則 | 說明 |
Schema 前置 |
|
先減後增 | 先去重 + 採樣(行數遞減),再 AI 處理(列數遞增)。LLM 調用成本高,務必在資料量降下來之後執行。 |
由粗到細 | 去重順序:精確 -> 近似 -> 語義,計算代價遞增但前置步驟已大幅削減資料量。 |
擴充列複用 |
|
全域去重 |
|
運算元原子性 | 每個運算元職責單一,通過管道組合實現複雜邏輯。 |
注意事項
使用本模板前,注意以下成本和效能相關事項:
LLM 調用成本:300 條 x 3 輪 = 900 次 LLM 調用。根據模型定價評估成本。
Pipeline 執行時間:LLM 調用為主要耗時,300 條資料約 10-30 分鐘。
全域去重首次運行:首批資料無歷史資料對比,全域去重效果從第二次調度開始顯現。
擴充列體積:
__dedup_emb(向量)體積較大,如不需要可在輸出 Dataset 中排除。調度間隔:
interval=15m表示每 15 分鐘執行一次,按資料量和成本預算調整。