全部產品
Search
文件中心

Cloud Monitor:Pipeline 參考

更新時間:May 01, 2026

Pipeline 是 AgentLoop 的資料處理流水線引擎。本文提供 Pipeline 的配置結構、運算元參數、擴充列、REST API 和使用限制等參考資訊。

什麼是 Pipeline

Pipeline 是 AgentLoop 的資料處理流水線引擎,將未經處理資料經過多級處理(欄位選取、資料群組裝、去重、採樣、AI 調用等),自動產出高品質 Dataset。定義資料來源、編排處理運算元並指定輸出目標後,Pipeline 按調度策略自動執行。

資料處理路徑

Pipeline 從 SLS LogStore 讀取原始日誌,經過多級處理後輸出高品質結構化資料集。

資料在 Pipeline 中的處理流程:

  1. 資料接入:從 SLS LogStore 中讀取原始日誌資料。

  2. 基礎處理:選取關鍵字段、計算衍生的資料行、過濾無效記錄。

  3. 資料群組裝:將離散的事件級日誌彙總為完整的樣本執行個體。

  4. 資料清洗:通過精確、近似、語義三級去重,消除冗餘資料。

  5. 資料採樣:基於語義聚類實現多樣性採樣,確保資料代表性。

  6. AI 處理:調用大語言模型或數字員工,完成智能評估、標註與合成。

  7. 特徵計算:產生向量表示、統計文檔指標,豐富資料維度。

  8. 資料輸出:將處理完畢的高品質資料寫入 Dataset。

按需組合處理步驟,可以跳過不需要的環節,也可以多次使用同一類運算元,組成適合業務情境的資料管線。

核心能力

特點

說明

與 SLS LogStore 無縫整合

直接對接 SLS LogStore,無需資料匯入,支援查詢條件在源頭過濾資料範圍。

內建 LLM 和智能體調用能力

原生支援 llm-callagentic-call,在流水線中直接完成品質評估、智能標註、內容合成等 AI 任務。

單運算元表達能力強

每個運算元內建豐富的參數選項與函數(字串處理、JSON 提取、正則匹配、數學運算等),單步即可完成複雜邏輯。

多運算元靈活組合

13 個運算元自由編排,從簡單的欄位選取到完整的"去重 - 聚類 - 採樣 - AI 評估"全鏈路,按需增減節點即可。

三級去重與全域去重

精確、近似、語義三種策略逐級串聯;全域模式可跨批次對比歷史資料,杜絕增量入庫重複。

定時調度自動運轉

設定起始時間與執行間隔後自動運行,支援從每 15 分鐘高頻採集到每日批量處理。

運算元詳解

Pipeline 提供 13 個處理運算元,分為 6 大類。

運算元快速索引表

類別

運算元

說明

行數變化

基礎處理

project

從未經處理資料選取並重新命名欄位

不變(1:1)

extend

基於運算式計算新列或覆蓋已有列

不變(1:1)

where

按條件運算式過濾行

減少或不變

資料群組裝

make-instance

將離散事件按分組鍵彙總為行級樣本

多行合并為每組一行

資料清洗

dedup-exact

完全相同的文本僅保留一條

減少或不變

dedup-fuzzy

高度相似的文本視為重複

減少或不變

dedup-semantic

含義相同但表述不同的文本視為重複

減少或不變

特徵計算

embedding

對文字欄位產生向量

不變(1:1)

doc-stats

計算字元數、詞數、行數等指標

不變(1:1)

資料採樣

semantic-cluster

基於向量為資料分配簇 ID

不變(1:1)

sample

按比例或固定條數抽樣

減少

AI 處理

llm-call

調用大語言模型進行評估、標註、合成等

不變(1:1)

agentic-call

調用數字員工發起智能對話

不變(1:1)

基礎處理

基礎處理運算元完成資料準備:選取欄位、計算衍生的資料行、過濾不合格記錄。通常作為 Pipeline 編排的起點。

project:欄位選取

從未經處理資料中選取並重新命名欄位,僅保留需要的欄位。

核心參數

參數

說明

樣本

{新欄位名}

原始欄位名。鍵為映射後名稱,值為原始列名。

"question": "user_query"

行數變化:不變(1:1)。無擴充列。

extend:欄位擴充

基於運算式計算新列或覆蓋已有列,支援字串處理、數學運算、條件判斷、JSON 提取等豐富的內建函數。

核心參數

參數

說明

樣本

{欄位名}

計算運算式。列名已存在則覆蓋,不存在則新增。

"q_len": "length(question)"

行數變化:不變(1:1)。無擴充列(直接輸出新列)。

where:篩選過濾

按條件運算式過濾行,僅保留滿足條件的記錄。支援比較運算、邏輯組合、模式比對等豐富的過濾條件。

核心參數

參數

說明

樣本

filter

過濾條件運算式,結果須為布爾值。

"length(question) > 10 AND output IS NOT NULL"

行數變化:減少或不變。無擴充列。

資料群組裝

資料群組裝運算元將離散的事件級日誌彙總為完整的樣本執行個體。一次使用者會話通常由多條事件記錄組成,通過資料群組裝可合并為一行完整樣本。

make-instance:執行個體構建

將離散的事件級日誌按分組鍵彙總為行級樣本執行個體。例如將一次會話中的多條訊息合并為一行完整的對話樣本。

核心參數

參數

說明

樣本

by

分組鍵,逗號分隔。

"session_id,trace_id"

{輸出資料行}

彙總函式調用(不支援裸欄位名)。

"question": "first(question)"

行數變化:多行合并為每組一行。無擴充列。

資料清洗

資料清洗運算元消除冗餘記錄。Pipeline 提供三種粒度的去重策略,支援逐級串聯:先用精確去重快速削減資料量,再用近似去重和語義去重逐步收斂。

dedup-exact:精確去重

去除完全相同的文本記錄,每組保留文本最長的一條。計算代價最低,建議作為去重的第一步。

核心參數

參數

說明

樣本

field

去重依據的文字欄位。

"question"

global

是否開啟全域去重(跨批次與 Dataset 增量去重)。

true

行數變化:減少或不變。擴充列:__dedup_hash__dedup_weight__dedup_rnk

dedup-fuzzy:近似去重

去除高度相似但不完全相同的文本記錄,每組保留文本最長的一條。通過 threshold 控制相似性判定的嚴格程度。

核心參數

參數

說明

樣本

field

去重依據的文字欄位。

"question"

threshold

相似性閾值(預設 "3"),越小越嚴格。

"3"

global

是否開啟全域去重。

true

行數變化:減少或不變。擴充列:__dedup_hash__dedup_weight__dedup_rnk

dedup-semantic:語義去重

去除含義相同但表述不同的文本記錄,通過向量距離衡量語義相似性,每組保留一條代表性記錄。

核心參數

參數

說明

樣本

field

去重依據的文字欄位。

"question"

threshold

向量距離閾值(0~1,預設 "0.1"),越小越嚴格。

"0.1"

model

Embedding 模型名稱。

"sls-multilang-600m-i32k-o1024"

global

是否開啟全域去重。

true

行數變化:減少或不變。擴充列:__dedup_emb(向量,可被下遊複用)、__dedup_rid

特徵計算

特徵計算運算元為資料增加向量表示和統計指標等衍生特徵,可供下遊運算元使用(如語義聚類依賴向量輸入),也可作為 Dataset 資料列保留。

embedding:向量產生

對指定文字欄位產生 Embedding 向量,供下遊聚類、相似性計算等運算元使用。

核心參數

參數

說明

樣本

field

待向量化的文字欄位。

"question"

model

Embedding 模型名稱(可選)。

"sls-multilang-600m-i32k-o1024"

as

輸出資料行名(預設 {field}_embedding)。

"q_emb"

說明

如果上遊已有 dedup-semantic,可直接使用其 __dedup_emb 擴充列,無需重複調用 embedding

行數變化:不變(1:1)。擴充列:{as}(預設 {field}_embedding)。

doc-stats:文檔統計

計算文字欄位的文檔級統計指標,輸出包含字元數、詞數、行數的統計 JSON。

核心參數

參數

說明

樣本

field

待統計的文字欄位。

"question"

as

輸出資料行名(預設 "__doc_stats")。

"stats"

輸出格式:JSON 對象,包含 doc_len_char(字元數)、doc_len_words(詞數)、line_counts(行數)三個數值欄位。

行數變化:不變(1:1)。擴充列:{as}(預設 __doc_stats,JSON 類型)。

資料採樣

資料採樣運算元在去重後進一步精選資料:通過語義聚類按主題分組,再從每個聚類中抽取代表性樣本,確保資料集精簡且語義多樣。

semantic-cluster:語義聚類

基於 Embedding 向量為資料分配簇 ID,是實現多樣性採樣的關鍵步驟。通常與 sample 運算元配合使用,每簇取樣保證語義多樣性。

核心參數

參數

說明

樣本

field

Embedding 向量列名。

"__dedup_emb"

n

聚類簇數(正整數)。

100

行數變化:不變(1:1)。擴充列:__cluster_id(簇編號,從 0 開始)。

sample:隨機採樣

從資料中隨機採樣指定比例或數量的記錄。支援按分組列做分層採樣,常與 semantic-cluster 搭配實現"每簇取 N 條"的多樣性採樣。

核心參數

參數

說明

樣本

ratio

採樣率 (0, 1],與 n 互斥。

0.1

n

採樣條數,與 ratio 互斥。

1

by

分組列名(可選),多列逗號分隔。

"__cluster_id"

行數變化:減少。無擴充列。

AI 處理

AI 處理運算元將大語言模型和智能體能力嵌入資料處理流程,對每行資料進行智能評估、品質標註、內容合成等操作。

llm-call:LLM 調用

調用大語言模型對每行資料進行智能處理,支援 Prompt 模板渲染、模型選擇和輸出格式解析。

核心參數

參數

說明

樣本

prompt

Prompt 模板,支援 {{列名}} 預留位置或 @<path> 引用。

"@eval/prompt.md"

fields

參與渲染的輸入列名,逗號分隔。

"question,output"

format

輸出格式:raw(預設)或 json

"json"

model

LLM 模型標識(可選)。

"qwen-plus"

as

輸出資料行名(預設 "__llm_result")。

"eval"

行數變化:不變(1:1)。擴充列:{as}(預設 __llm_result)。

agentic-call:智能體調用

調用數字員工對每行資料發起智能對話。與 llm-call 不同,agentic-call 調用使用者構建的數字員工,支援多步推理、知識庫檢索、工具調用等。

核心參數

參數

說明

樣本

prompt

Prompt 模板,支援 {{列名}} 預留位置。

"請分析 {{host_name}} 的異常"

fields

參與渲染的輸入列名,逗號分隔。

"host_name,metric_name"

employee

數字員工名稱。

"skill_bench_analysis"

as

輸出資料行名(預設 "__agentic_result")。

"analysis"

行數變化:不變(1:1)。擴充列:{as}(預設 __agentic_result)。

編排

合理編排運算元順序,構建從簡單欄位處理到完整 AI 評估的資料鏈路。

編排原則

原則

說明

Schema 前置

Pipeline 以 project 選取欄位開頭,聲明統一欄位名,確保下遊所有運算元使用一致的欄位命名。

組裝優先

離散事件數目據先用 make-instance 彙總為樣本級,再進入後續處理。

先減後增

先做去重和採樣(減少行數),再做 AI 處理(增加列數)。LLM 調用成本較高,務必在資料量降下來之後再執行。

由粗到細

去重順序:精確去重 - 近似去重 - 語義去重。計算代價遞增,但前置步驟已大幅削減資料量。

擴充列複用

上遊運算元產生的擴充列(如 __dedup_emb__cluster_id)可被下遊直接引用,無需重複計算。

運算元原子性

每個運算元職責單一——聚類只標註簇 ID,採樣只過濾行,AI 只增列。通過組合實現複雜邏輯。

常見編排組合

根據資料處理需求,選擇合適的運算元組合。

情境

編排鏈路

說明

簡單欄位提取

project - where

從未經處理資料中選取欄位並按條件過濾,適合資料量小、無需去重的情境。

基礎清洗入庫

project - extend - where - dedup-exact

選取欄位、計算衍生的資料行、過濾後做精確去重,適合結構化程度高的資料。

三級去重

project - dedup-exact - dedup-fuzzy - dedup-semantic

逐級去重,由粗到細,計算代價遞增但資料量逐步遞減。

多樣性採樣

dedup-semantic - semantic-cluster - sample

語義去重後複用其向量列做聚類,再按簇抽樣,保證資料多樣性。

AI 品質評估

sample - llm-call(評估)- llm-call(標註)

採樣後串聯多次 LLM 調用,分別完成不同維度評估和標註。

會話級全鏈路

project - make-instance - dedup-exact - dedup-fuzzy - dedup-semantic - semantic-cluster - sample - llm-call

從事件彙總到 AI 評估的完整鏈路,適合對話類資料的全流程處理。

調度模式

Pipeline 支援單次執行和定時調度兩種模式。

單次執行

適用於一次性資料處理任務,手動觸發後處理完畢即結束。

定時調度

適用於持久性資料擷取和處理。設定起始時間和執行間隔後,Pipeline 自動按計劃運行。

調度參數

參數

說明

樣本

調度模式

當前支援定時調度(scheduled)。

"scheduled"

起始時間

調度開始的時間點(Unix 時間戳記,秒)。

1700000000

執行間隔

兩次執行之間的時間間隔。

"15m""1h""1d"

常見調度間隔參考

間隔

適用情境

15m

高頻資料擷取,需要近即時處理的情境。

1h

中等頻率,適合日常資料處理。

6h

低頻處理,適合資料量不大或對即時性要求不高的情境。

1d

每日批處理,適合日報層級的資料匯總。

每次執行時,Pipeline 自動處理上一次執行以來的增量資料。

API 與配置

Pipeline JSON 配置結構

完整的 Pipeline 配置由四個頂層模組組成:source(資料來源)、pipeline(處理管線)、sink(輸出目標)、executePolicy(調度策略)。

頂層欄位

欄位

類型

必填

說明

name

String

Pipeline 名稱,專案內唯一。

description

String

Pipeline 描述。

source

Object

資料來源配置。

pipeline

Object

處理管線配置。

sink

Object

輸出目標配置。

executePolicy

Object

調度策略配置。

source 配置

定義 Pipeline 從哪裡讀取未經處理資料。

參數

類型

必填

說明

type

String

資料來源類型,目前支援 "LogStore"

LogStore.project

String

Project 名稱。

LogStore.LogStore

String

LogStore 名稱。

LogStore.query

String

查詢過濾條件。

pipeline.nodes 配置

每個運算元的通用結構:

參數

類型

必填

說明

id

String

運算元唯一標識。

type

String

運算元類型(見上方運算元詳解)。

parameters

Object

運算元參數(各類型不同)。

sink 配置

定義處理結果輸出到哪裡。

參數

類型

必填

說明

type

String

輸出類型,目前支援 "dataset"

dataset.workspace

String

Dataset 所在工作空間。

dataset.dataset

String

目標 Dataset 名稱。

executePolicy 配置

定義 Pipeline 的調度執行策略。

參數

類型

必填

說明

mode

String

調度模式,目前支援 "scheduled"

scheduled.fromTime

Integer

調度起始時間(Unix 時間戳記,秒)。

scheduled.interval

String

調度間隔,如 "15m""1h""1d"

配置樣本

以下樣本展示一個完整的 Pipeline 配置,從 LogStore 讀取資料,經過欄位選取、精確去重和 LLM 評估後輸出到 Dataset。

{
  "name": "quality-eval-pipeline",
  "description": "對話品質評估管線",
  "source": {
    "type": "LogStore",
    "LogStore": {
      "project": "my-sls-project",
      "LogStore": "agent-logs",
      "query": "* | WHERE level = 'INFO'"
    }
  },
  "pipeline": {
    "nodes": [
      {
        "id": "select-fields",
        "type": "project",
        "parameters": {
          "question": "user_query",
          "answer": "agent_response",
          "session": "session_id"
        }
      },
      {
        "id": "remove-duplicates",
        "type": "dedup-exact",
        "parameters": {
          "field": "question",
          "global": false
        }
      },
      {
        "id": "eval-quality",
        "type": "llm-call",
        "parameters": {
          "prompt": "評估以下對話的回答品質(1-5分):\n問題:{{question}}\n回答:{{answer}}",
          "fields": "question,answer",
          "format": "json",
          "model": "qwen-plus",
          "as": "quality_score"
        }
      }
    ]
  },
  "sink": {
    "type": "dataset",
    "dataset": {
      "workspace": "my-workspace",
      "dataset": "evaluated-conversations"
    }
  },
  "executePolicy": {
    "mode": "scheduled",
    "scheduled": {
      "fromTime": 1700000000,
      "interval": "1h"
    }
  }
}

REST API 參考

Pipeline 提供 REST API,通過 SDK 或 HTTP 要求管理 Pipeline 的完整生命週期。

Pipeline 類型:

類型

說明

DATASET

資料集匯出,將 LogStore 資料經過處理後匯出到 Dataset。

ETL

資料加工,對資料進行轉換和處理。

CreatePipeline

建立一個新的 Pipeline。

POST /pipelines

請求參數

欄位

類型

必填

說明

PipelineName

String

Pipeline 名稱,專案內唯一。

Description

String

Pipeline 描述。

PipelineType

String

DATASET(資料集匯出)或 ETL(資料加工)。

Source

Object

資料來源配置(含 Type、LogStore.Project、LogStore.LogStore、LogStore.RoleArn)。

Configuration

Object

Pipeline 配置參數。

Destination

Object

輸出目標配置(含 Type、Dataset.Project、Dataset.Dataset)。

要求的權限CreatePipelineacs:log:*:*:project/{project}/pipeline/{name})+ ram:PassRole

請求樣本

POST /pipelines HTTP/1.1

{
  "PipelineName": "quality-eval",
  "PipelineType": "DATASET",
  "Description": "對話品質評估管線",
  "Source": {
    "Type": "LogStore",
    "LogStore": {
      "Project": "my-sls-project",
      "LogStore": "agent-logs"
    }
  },
  "Configuration": { ... },
  "Destination": {
    "Type": "dataset",
    "Dataset": {
      "Project": "my-workspace",
      "Dataset": "eval-results"
    }
  }
}

GetPipeline

查詢指定 Pipeline 的詳細資料。

GET /pipelines/{name}

響應包含完整的 Pipeline 配置(與建立時結構一致),以及額外的時間戳記欄位:

欄位

說明

CreateTime

建立時間(Unix 時間戳記)。

UpdateTime

最新動向時間(Unix 時間戳記)。

要求的權限GetPipelineacs:log:*:*:project/{project}/pipeline/{name})。

UpdatePipeline

更新指定 Pipeline 的配置。請求體結構與 CreatePipeline 一致,限制如下:

  • PipelineType 不可修改。

  • Source 中的 ProjectLogStore 不可修改(RoleArn 可更新)。

PUT /pipelines/{name}

要求的權限UpdatePipelineacs:log:*:*:project/{project}/pipeline/{name})+ ram:PassRole

DeletePipeline

刪除指定 Pipeline。

DELETE /pipelines/{name}

要求的權限DeletePipelineacs:log:*:*:project/{project}/pipeline/{name})。

ListPipelines

查詢專案下的 Pipeline 列表,支援分頁和名稱過濾。

GET /pipelines?size=100&offset=0&name=keyword

查詢參數

說明

size

每頁條數(可選)。

offset

分頁位移量(可選)。

name

按名稱模糊過濾(可選)。

響應包含 total(總數)、count(本頁條數)、pipelines(Pipeline 摘要列表,含 name、description、createTime、updateTime)。

要求的權限ListPipelinesacs:log:*:*:project/{project}/pipeline/*)。

限制與配額

限制項

說明

Pipeline 數量上限

每個 Project 最多 200 個 Pipeline。

Pipeline 類型

建立後不可修改。

資料來源

建立後 Project 和 LogStore 不可修改。

使用限制

通用約束

約束項

說明

運算元順序

運算元數組按聲明順序執行,前一運算元的輸出作為後一運算元的輸入。

運算元 ID 唯一性

同一 Pipeline 內每個運算元的 id 必須唯一。

欄位引用

運算元引用的欄位必須存在於上遊輸出中,否則運行時報錯。

NULL 值與類型要求

運算元

NULL 值行為

欄位類型要求

dedup-exact / dedup-fuzzy / dedup-semantic

NULL 行不參與去重且不出現在輸出中。

文本類型

embedding

NULL 行被過濾。

文本類型

doc-stats

NULL 時輸出零值統計。

文本類型

semantic-cluster

-

向量類型 array(double)

make-instance

分組鍵為 NULL 的事件不參與分組。

-

llm-call / agentic-call

可能返回不完整結果。

-

擴充列預覽

以下匯總所有運算元產生的擴充列:

擴充列

類型

來源運算元

說明

__dedup_hash

bigint

dedup-exact、dedup-fuzzy

文本指紋值。

__dedup_weight

integer

dedup-exact、dedup-fuzzy

文本長度(去重時保留最長文本)。

__dedup_rnk

integer

dedup-exact、dedup-fuzzy

組內排名(去重後恒為 1)。

__dedup_emb

array(double)

dedup-semantic

Embedding 向量(可被 semantic-cluster 複用)。

__dedup_rid

bigint

dedup-semantic

批內行標識。

__cluster_id

bigint

semantic-cluster

聚類簇編號(從 0 開始)。

{as} / __llm_result

varchar / json

llm-call

LLM 調用結果。

{as} / __agentic_result

varchar

agentic-call

數字員工回複。

{as} / {field}_embedding

array(double)

embedding

Embedding 向量。

{as} / __doc_stats

json

doc-stats

文檔統計 JSON。

說明

擴充列可被下遊運算元直接引用。典型複用:dedup-semantic__dedup_emb 可作為 semantic-clusterfield 輸入。

全域去重

三種去重運算元均支援全域去重模式。

工作機制

設定 global=true 並指定 workspacedataset 後,去重範圍擴充為當前批次與 Dataset 歷史資料的聯合比對:

  1. 當前批次資料先做批內去重。

  2. 去重後的結果與 Dataset 中已有資料做增量比對。

  3. 僅保留 Dataset 中不存在的"全新"記錄。

配置要求

參數

說明

global

設為 true

workspace

目標 Dataset 所在的工作空間。

dataset

目標 Dataset 名稱。

column_name

dedup-semantic 可選,指定 Dataset 中用於語義比對的列名。

適用情境

  • 增量資料入庫:每批新資料需與歷史資料做去重,確保 Dataset 無重複。

  • 推薦順序:精確去重(全域)- 近似去重(全域)- 語義去重(全域),逐級收斂。