Pipeline 是 AgentLoop 的資料處理流水線引擎。本文提供 Pipeline 的配置結構、運算元參數、擴充列、REST API 和使用限制等參考資訊。
什麼是 Pipeline
Pipeline 是 AgentLoop 的資料處理流水線引擎,將未經處理資料經過多級處理(欄位選取、資料群組裝、去重、採樣、AI 調用等),自動產出高品質 Dataset。定義資料來源、編排處理運算元並指定輸出目標後,Pipeline 按調度策略自動執行。
資料處理路徑
Pipeline 從 SLS LogStore 讀取原始日誌,經過多級處理後輸出高品質結構化資料集。
資料在 Pipeline 中的處理流程:
資料接入:從 SLS LogStore 中讀取原始日誌資料。
基礎處理:選取關鍵字段、計算衍生的資料行、過濾無效記錄。
資料群組裝:將離散的事件級日誌彙總為完整的樣本執行個體。
資料清洗:通過精確、近似、語義三級去重,消除冗餘資料。
資料採樣:基於語義聚類實現多樣性採樣,確保資料代表性。
AI 處理:調用大語言模型或數字員工,完成智能評估、標註與合成。
特徵計算:產生向量表示、統計文檔指標,豐富資料維度。
資料輸出:將處理完畢的高品質資料寫入 Dataset。
按需組合處理步驟,可以跳過不需要的環節,也可以多次使用同一類運算元,組成適合業務情境的資料管線。
核心能力
特點 | 說明 |
與 SLS LogStore 無縫整合 | 直接對接 SLS LogStore,無需資料匯入,支援查詢條件在源頭過濾資料範圍。 |
內建 LLM 和智能體調用能力 | 原生支援 |
單運算元表達能力強 | 每個運算元內建豐富的參數選項與函數(字串處理、JSON 提取、正則匹配、數學運算等),單步即可完成複雜邏輯。 |
多運算元靈活組合 | 13 個運算元自由編排,從簡單的欄位選取到完整的"去重 - 聚類 - 採樣 - AI 評估"全鏈路,按需增減節點即可。 |
三級去重與全域去重 | 精確、近似、語義三種策略逐級串聯;全域模式可跨批次對比歷史資料,杜絕增量入庫重複。 |
定時調度自動運轉 | 設定起始時間與執行間隔後自動運行,支援從每 15 分鐘高頻採集到每日批量處理。 |
運算元詳解
Pipeline 提供 13 個處理運算元,分為 6 大類。
運算元快速索引表
類別 | 運算元 | 說明 | 行數變化 |
基礎處理 |
| 從未經處理資料選取並重新命名欄位 | 不變(1:1) |
| 基於運算式計算新列或覆蓋已有列 | 不變(1:1) | |
| 按條件運算式過濾行 | 減少或不變 | |
資料群組裝 |
| 將離散事件按分組鍵彙總為行級樣本 | 多行合并為每組一行 |
資料清洗 |
| 完全相同的文本僅保留一條 | 減少或不變 |
| 高度相似的文本視為重複 | 減少或不變 | |
| 含義相同但表述不同的文本視為重複 | 減少或不變 | |
特徵計算 |
| 對文字欄位產生向量 | 不變(1:1) |
| 計算字元數、詞數、行數等指標 | 不變(1:1) | |
資料採樣 |
| 基於向量為資料分配簇 ID | 不變(1:1) |
| 按比例或固定條數抽樣 | 減少 | |
AI 處理 |
| 調用大語言模型進行評估、標註、合成等 | 不變(1:1) |
| 調用數字員工發起智能對話 | 不變(1:1) |
基礎處理
基礎處理運算元完成資料準備:選取欄位、計算衍生的資料行、過濾不合格記錄。通常作為 Pipeline 編排的起點。
project:欄位選取
從未經處理資料中選取並重新命名欄位,僅保留需要的欄位。
核心參數
參數 | 說明 | 樣本 |
| 原始欄位名。鍵為映射後名稱,值為原始列名。 |
|
行數變化:不變(1:1)。無擴充列。
extend:欄位擴充
基於運算式計算新列或覆蓋已有列,支援字串處理、數學運算、條件判斷、JSON 提取等豐富的內建函數。
核心參數
參數 | 說明 | 樣本 |
| 計算運算式。列名已存在則覆蓋,不存在則新增。 |
|
行數變化:不變(1:1)。無擴充列(直接輸出新列)。
where:篩選過濾
按條件運算式過濾行,僅保留滿足條件的記錄。支援比較運算、邏輯組合、模式比對等豐富的過濾條件。
核心參數
參數 | 說明 | 樣本 |
| 過濾條件運算式,結果須為布爾值。 |
|
行數變化:減少或不變。無擴充列。
資料群組裝
資料群組裝運算元將離散的事件級日誌彙總為完整的樣本執行個體。一次使用者會話通常由多條事件記錄組成,通過資料群組裝可合并為一行完整樣本。
make-instance:執行個體構建
將離散的事件級日誌按分組鍵彙總為行級樣本執行個體。例如將一次會話中的多條訊息合并為一行完整的對話樣本。
核心參數
參數 | 說明 | 樣本 |
| 分組鍵,逗號分隔。 |
|
| 彙總函式調用(不支援裸欄位名)。 |
|
行數變化:多行合并為每組一行。無擴充列。
資料清洗
資料清洗運算元消除冗餘記錄。Pipeline 提供三種粒度的去重策略,支援逐級串聯:先用精確去重快速削減資料量,再用近似去重和語義去重逐步收斂。
dedup-exact:精確去重
去除完全相同的文本記錄,每組保留文本最長的一條。計算代價最低,建議作為去重的第一步。
核心參數
參數 | 說明 | 樣本 |
| 去重依據的文字欄位。 |
|
| 是否開啟全域去重(跨批次與 Dataset 增量去重)。 |
|
行數變化:減少或不變。擴充列:__dedup_hash、__dedup_weight、__dedup_rnk。
dedup-fuzzy:近似去重
去除高度相似但不完全相同的文本記錄,每組保留文本最長的一條。通過 threshold 控制相似性判定的嚴格程度。
核心參數
參數 | 說明 | 樣本 |
| 去重依據的文字欄位。 |
|
| 相似性閾值(預設 |
|
| 是否開啟全域去重。 |
|
行數變化:減少或不變。擴充列:__dedup_hash、__dedup_weight、__dedup_rnk。
dedup-semantic:語義去重
去除含義相同但表述不同的文本記錄,通過向量距離衡量語義相似性,每組保留一條代表性記錄。
核心參數
參數 | 說明 | 樣本 |
| 去重依據的文字欄位。 |
|
| 向量距離閾值(0~1,預設 |
|
| Embedding 模型名稱。 |
|
| 是否開啟全域去重。 |
|
行數變化:減少或不變。擴充列:__dedup_emb(向量,可被下遊複用)、__dedup_rid。
特徵計算
特徵計算運算元為資料增加向量表示和統計指標等衍生特徵,可供下遊運算元使用(如語義聚類依賴向量輸入),也可作為 Dataset 資料列保留。
embedding:向量產生
對指定文字欄位產生 Embedding 向量,供下遊聚類、相似性計算等運算元使用。
核心參數
參數 | 說明 | 樣本 |
| 待向量化的文字欄位。 |
|
| Embedding 模型名稱(可選)。 |
|
| 輸出資料行名(預設 |
|
如果上遊已有 dedup-semantic,可直接使用其 __dedup_emb 擴充列,無需重複調用 embedding。
行數變化:不變(1:1)。擴充列:{as}(預設 {field}_embedding)。
doc-stats:文檔統計
計算文字欄位的文檔級統計指標,輸出包含字元數、詞數、行數的統計 JSON。
核心參數
參數 | 說明 | 樣本 |
| 待統計的文字欄位。 |
|
| 輸出資料行名(預設 |
|
輸出格式:JSON 對象,包含 doc_len_char(字元數)、doc_len_words(詞數)、line_counts(行數)三個數值欄位。
行數變化:不變(1:1)。擴充列:{as}(預設 __doc_stats,JSON 類型)。
資料採樣
資料採樣運算元在去重後進一步精選資料:通過語義聚類按主題分組,再從每個聚類中抽取代表性樣本,確保資料集精簡且語義多樣。
semantic-cluster:語義聚類
基於 Embedding 向量為資料分配簇 ID,是實現多樣性採樣的關鍵步驟。通常與 sample 運算元配合使用,每簇取樣保證語義多樣性。
核心參數
參數 | 說明 | 樣本 |
| Embedding 向量列名。 |
|
| 聚類簇數(正整數)。 |
|
行數變化:不變(1:1)。擴充列:__cluster_id(簇編號,從 0 開始)。
sample:隨機採樣
從資料中隨機採樣指定比例或數量的記錄。支援按分組列做分層採樣,常與 semantic-cluster 搭配實現"每簇取 N 條"的多樣性採樣。
核心參數
參數 | 說明 | 樣本 |
| 採樣率 (0, 1],與 |
|
| 採樣條數,與 |
|
| 分組列名(可選),多列逗號分隔。 |
|
行數變化:減少。無擴充列。
AI 處理
AI 處理運算元將大語言模型和智能體能力嵌入資料處理流程,對每行資料進行智能評估、品質標註、內容合成等操作。
llm-call:LLM 調用
調用大語言模型對每行資料進行智能處理,支援 Prompt 模板渲染、模型選擇和輸出格式解析。
核心參數
參數 | 說明 | 樣本 |
| Prompt 模板,支援 |
|
| 參與渲染的輸入列名,逗號分隔。 |
|
| 輸出格式: |
|
| LLM 模型標識(可選)。 |
|
| 輸出資料行名(預設 |
|
行數變化:不變(1:1)。擴充列:{as}(預設 __llm_result)。
agentic-call:智能體調用
調用數字員工對每行資料發起智能對話。與 llm-call 不同,agentic-call 調用使用者構建的數字員工,支援多步推理、知識庫檢索、工具調用等。
核心參數
參數 | 說明 | 樣本 |
| Prompt 模板,支援 |
|
| 參與渲染的輸入列名,逗號分隔。 |
|
| 數字員工名稱。 |
|
| 輸出資料行名(預設 |
|
行數變化:不變(1:1)。擴充列:{as}(預設 __agentic_result)。
編排
合理編排運算元順序,構建從簡單欄位處理到完整 AI 評估的資料鏈路。
編排原則
原則 | 說明 |
Schema 前置 | Pipeline 以 |
組裝優先 | 離散事件數目據先用 |
先減後增 | 先做去重和採樣(減少行數),再做 AI 處理(增加列數)。LLM 調用成本較高,務必在資料量降下來之後再執行。 |
由粗到細 | 去重順序:精確去重 - 近似去重 - 語義去重。計算代價遞增,但前置步驟已大幅削減資料量。 |
擴充列複用 | 上遊運算元產生的擴充列(如 |
運算元原子性 | 每個運算元職責單一——聚類只標註簇 ID,採樣只過濾行,AI 只增列。通過組合實現複雜邏輯。 |
常見編排組合
根據資料處理需求,選擇合適的運算元組合。
情境 | 編排鏈路 | 說明 |
簡單欄位提取 |
| 從未經處理資料中選取欄位並按條件過濾,適合資料量小、無需去重的情境。 |
基礎清洗入庫 |
| 選取欄位、計算衍生的資料行、過濾後做精確去重,適合結構化程度高的資料。 |
三級去重 |
| 逐級去重,由粗到細,計算代價遞增但資料量逐步遞減。 |
多樣性採樣 |
| 語義去重後複用其向量列做聚類,再按簇抽樣,保證資料多樣性。 |
AI 品質評估 |
| 採樣後串聯多次 LLM 調用,分別完成不同維度評估和標註。 |
會話級全鏈路 |
| 從事件彙總到 AI 評估的完整鏈路,適合對話類資料的全流程處理。 |
調度模式
Pipeline 支援單次執行和定時調度兩種模式。
單次執行
適用於一次性資料處理任務,手動觸發後處理完畢即結束。
定時調度
適用於持久性資料擷取和處理。設定起始時間和執行間隔後,Pipeline 自動按計劃運行。
調度參數
參數 | 說明 | 樣本 |
調度模式 | 當前支援定時調度( |
|
起始時間 | 調度開始的時間點(Unix 時間戳記,秒)。 |
|
執行間隔 | 兩次執行之間的時間間隔。 |
|
常見調度間隔參考
間隔 | 適用情境 |
| 高頻資料擷取,需要近即時處理的情境。 |
| 中等頻率,適合日常資料處理。 |
| 低頻處理,適合資料量不大或對即時性要求不高的情境。 |
| 每日批處理,適合日報層級的資料匯總。 |
每次執行時,Pipeline 自動處理上一次執行以來的增量資料。
API 與配置
Pipeline JSON 配置結構
完整的 Pipeline 配置由四個頂層模組組成:source(資料來源)、pipeline(處理管線)、sink(輸出目標)、executePolicy(調度策略)。
頂層欄位
欄位 | 類型 | 必填 | 說明 |
| String | 是 | Pipeline 名稱,專案內唯一。 |
| String | 否 | Pipeline 描述。 |
| Object | 是 | 資料來源配置。 |
| Object | 是 | 處理管線配置。 |
| Object | 是 | 輸出目標配置。 |
| Object | 是 | 調度策略配置。 |
source 配置
定義 Pipeline 從哪裡讀取未經處理資料。
參數 | 類型 | 必填 | 說明 |
| String | 是 | 資料來源類型,目前支援 |
| String | 是 | Project 名稱。 |
| String | 是 | LogStore 名稱。 |
| String | 否 | 查詢過濾條件。 |
pipeline.nodes 配置
每個運算元的通用結構:
參數 | 類型 | 必填 | 說明 |
| String | 是 | 運算元唯一標識。 |
| String | 是 | 運算元類型(見上方運算元詳解)。 |
| Object | 是 | 運算元參數(各類型不同)。 |
sink 配置
定義處理結果輸出到哪裡。
參數 | 類型 | 必填 | 說明 |
| String | 是 | 輸出類型,目前支援 |
| String | 是 | Dataset 所在工作空間。 |
| String | 是 | 目標 Dataset 名稱。 |
executePolicy 配置
定義 Pipeline 的調度執行策略。
參數 | 類型 | 必填 | 說明 |
| String | 是 | 調度模式,目前支援 |
| Integer | 是 | 調度起始時間(Unix 時間戳記,秒)。 |
| String | 是 | 調度間隔,如 |
配置樣本
以下樣本展示一個完整的 Pipeline 配置,從 LogStore 讀取資料,經過欄位選取、精確去重和 LLM 評估後輸出到 Dataset。
{
"name": "quality-eval-pipeline",
"description": "對話品質評估管線",
"source": {
"type": "LogStore",
"LogStore": {
"project": "my-sls-project",
"LogStore": "agent-logs",
"query": "* | WHERE level = 'INFO'"
}
},
"pipeline": {
"nodes": [
{
"id": "select-fields",
"type": "project",
"parameters": {
"question": "user_query",
"answer": "agent_response",
"session": "session_id"
}
},
{
"id": "remove-duplicates",
"type": "dedup-exact",
"parameters": {
"field": "question",
"global": false
}
},
{
"id": "eval-quality",
"type": "llm-call",
"parameters": {
"prompt": "評估以下對話的回答品質(1-5分):\n問題:{{question}}\n回答:{{answer}}",
"fields": "question,answer",
"format": "json",
"model": "qwen-plus",
"as": "quality_score"
}
}
]
},
"sink": {
"type": "dataset",
"dataset": {
"workspace": "my-workspace",
"dataset": "evaluated-conversations"
}
},
"executePolicy": {
"mode": "scheduled",
"scheduled": {
"fromTime": 1700000000,
"interval": "1h"
}
}
}REST API 參考
Pipeline 提供 REST API,通過 SDK 或 HTTP 要求管理 Pipeline 的完整生命週期。
Pipeline 類型:
類型 | 說明 |
| 資料集匯出,將 LogStore 資料經過處理後匯出到 Dataset。 |
| 資料加工,對資料進行轉換和處理。 |
CreatePipeline
建立一個新的 Pipeline。
POST /pipelines
請求參數
欄位 | 類型 | 必填 | 說明 |
| String | 是 | Pipeline 名稱,專案內唯一。 |
| String | 否 | Pipeline 描述。 |
| String | 是 |
|
| Object | 是 | 資料來源配置(含 Type、LogStore.Project、LogStore.LogStore、LogStore.RoleArn)。 |
| Object | 是 | Pipeline 配置參數。 |
| Object | 是 | 輸出目標配置(含 Type、Dataset.Project、Dataset.Dataset)。 |
要求的權限:CreatePipeline(acs:log:*:*:project/{project}/pipeline/{name})+ ram:PassRole。
請求樣本
POST /pipelines HTTP/1.1
{
"PipelineName": "quality-eval",
"PipelineType": "DATASET",
"Description": "對話品質評估管線",
"Source": {
"Type": "LogStore",
"LogStore": {
"Project": "my-sls-project",
"LogStore": "agent-logs"
}
},
"Configuration": { ... },
"Destination": {
"Type": "dataset",
"Dataset": {
"Project": "my-workspace",
"Dataset": "eval-results"
}
}
}GetPipeline
查詢指定 Pipeline 的詳細資料。
GET /pipelines/{name}
響應包含完整的 Pipeline 配置(與建立時結構一致),以及額外的時間戳記欄位:
欄位 | 說明 |
| 建立時間(Unix 時間戳記)。 |
| 最新動向時間(Unix 時間戳記)。 |
要求的權限:GetPipeline(acs:log:*:*:project/{project}/pipeline/{name})。
UpdatePipeline
更新指定 Pipeline 的配置。請求體結構與 CreatePipeline 一致,限制如下:
PipelineType不可修改。Source中的Project和LogStore不可修改(RoleArn 可更新)。
PUT /pipelines/{name}
要求的權限:UpdatePipeline(acs:log:*:*:project/{project}/pipeline/{name})+ ram:PassRole。
DeletePipeline
刪除指定 Pipeline。
DELETE /pipelines/{name}
要求的權限:DeletePipeline(acs:log:*:*:project/{project}/pipeline/{name})。
ListPipelines
查詢專案下的 Pipeline 列表,支援分頁和名稱過濾。
GET /pipelines?size=100&offset=0&name=keyword
查詢參數 | 說明 |
| 每頁條數(可選)。 |
| 分頁位移量(可選)。 |
| 按名稱模糊過濾(可選)。 |
響應包含 total(總數)、count(本頁條數)、pipelines(Pipeline 摘要列表,含 name、description、createTime、updateTime)。
要求的權限:ListPipelines(acs:log:*:*:project/{project}/pipeline/*)。
限制與配額
限制項 | 說明 |
Pipeline 數量上限 | 每個 Project 最多 200 個 Pipeline。 |
Pipeline 類型 | 建立後不可修改。 |
資料來源 | 建立後 Project 和 LogStore 不可修改。 |
使用限制
通用約束
約束項 | 說明 |
運算元順序 | 運算元數組按聲明順序執行,前一運算元的輸出作為後一運算元的輸入。 |
運算元 ID 唯一性 | 同一 Pipeline 內每個運算元的 |
欄位引用 | 運算元引用的欄位必須存在於上遊輸出中,否則運行時報錯。 |
NULL 值與類型要求
運算元 | NULL 值行為 | 欄位類型要求 |
| NULL 行不參與去重且不出現在輸出中。 | 文本類型 |
| NULL 行被過濾。 | 文本類型 |
| NULL 時輸出零值統計。 | 文本類型 |
| - | 向量類型 array(double) |
| 分組鍵為 NULL 的事件不參與分組。 | - |
| 可能返回不完整結果。 | - |
擴充列預覽
以下匯總所有運算元產生的擴充列:
擴充列 | 類型 | 來源運算元 | 說明 |
| bigint | dedup-exact、dedup-fuzzy | 文本指紋值。 |
| integer | dedup-exact、dedup-fuzzy | 文本長度(去重時保留最長文本)。 |
| integer | dedup-exact、dedup-fuzzy | 組內排名(去重後恒為 1)。 |
| array(double) | dedup-semantic | Embedding 向量(可被 semantic-cluster 複用)。 |
| bigint | dedup-semantic | 批內行標識。 |
| bigint | semantic-cluster | 聚類簇編號(從 0 開始)。 |
| varchar / json | llm-call | LLM 調用結果。 |
| varchar | agentic-call | 數字員工回複。 |
| array(double) | embedding | Embedding 向量。 |
| json | doc-stats | 文檔統計 JSON。 |
擴充列可被下遊運算元直接引用。典型複用:dedup-semantic 的 __dedup_emb 可作為 semantic-cluster 的 field 輸入。
全域去重
三種去重運算元均支援全域去重模式。
工作機制
設定 global=true 並指定 workspace 和 dataset 後,去重範圍擴充為當前批次與 Dataset 歷史資料的聯合比對:
當前批次資料先做批內去重。
去重後的結果與 Dataset 中已有資料做增量比對。
僅保留 Dataset 中不存在的"全新"記錄。
配置要求
參數 | 說明 |
| 設為 |
| 目標 Dataset 所在的工作空間。 |
| 目標 Dataset 名稱。 |
| 僅 |
適用情境
增量資料入庫:每批新資料需與歷史資料做去重,確保 Dataset 無重複。
推薦順序:精確去重(全域)- 近似去重(全域)- 語義去重(全域),逐級收斂。