Pipeline 通过节点组合实现数据处理流程。本文介绍所有可用节点的功能及典型编排方式。
节点列表
基础处理
节点 | 功能 |
字段选取:从原始数据中选取并重命名字段,声明 Pipeline 输入 Schema。 | |
字段扩展:基于表达式计算新列或覆盖已有列,支持所有内建函数。 | |
条件过滤:按表达式筛选行,仅保留符合条件的数据。 |
数据组装
节点 | 功能 |
实例构建:按分组键将离散事件聚合为行级样本宽表(纯 CPU 计算)。 |
数据清洗
节点 | 功能 |
精确去重:完全相同的文本记录仅保留一条。 | |
近似去重:字面差异极小的文本记录视为重复,仅保留一条。 | |
语义去重:表述不同但含义相同的文本记录视为重复,仅保留一条。执行时自动生成向量扩展列 |
向量与聚类
节点 | 功能 |
向量生成:对文本字段生成 Embedding 向量,结果写入扩展列(如 | |
语义聚类:基于 Embedding 向量为数据分配簇 ID,结果写入扩展列 |
统计与采样
节点 | 功能 |
文档统计:计算字符数、词数、行数等文本指标。 | |
随机采样:按比例或固定条数抽样,支持按分组键分层采样。 |
AI 增强
节点 | 功能 |
LLM 调用:模板渲染 + 模型推理 + 输出解析,适用于评估、标注、合成等场景。 | |
智能体调用:调用数字员工发起智能对话,适用于 SOP 分析、知识问答、数据洞察等场景。 |
Pipeline 编排指南
数据流全景
Pipeline 的数据流遵循"由粗到细、先减后增"的原则,各类节点按以下顺序处理数据:
基础处理:通过 project / extend / where 完成字段选取、扩展和过滤。
数据组装:通过 make-instance 将离散事件聚合为行级样本(多行合并为一行)。此步骤之后,每行代表一条完整样本。
数据清洗:通过 dedup-exact / dedup-fuzzy / dedup-semantic 逐级去重,数据量递减。
数据采样:通过 semantic-cluster 聚类后,由 sample 按簇分组抽样,进一步降低数据量。
AI 处理:通过llm-call / agentic-call 执行评估、标注、合成等 AI 任务,以及通过 doc-stats 计算统计指标。行数不变,仅增加列。
编排原则
原则 | 说明 |
Schema 前置 | Pipeline 以 |
组装优先 | 离散事件数据先用 |
先减后增 | 先做去重和采样(减少行数),再做 AI 处理(增加列数)。LLM 调用成本高,应在数据量降低后执行。 |
由粗到细 | 去重顺序:精确 - 近似 - 语义。计算代价递增,但前置步骤已大幅削减数据量。 |
扩展列复用 | 上游节点产生的扩展列(如 |
节点原子性 | 每个节点职责单一。聚类只标注簇 ID,采样只过滤行,AI 只增列。通过 Pipeline 组合实现复杂逻辑。 |
使用示例
字段选取 + 扩展 + 过滤
基础节点组合:从原始数据中选取字段,基于字符串处理、JSON 解析、正则表达式扩展字段,再按条件过滤。
{
"nodes": [
{
"id": "n1", "type": "project",
"parameters": { "question": "a", "input": "b", "output": "c" }
},
{
"id": "n2", "type": "extend",
"parameters": { "question": "regexp_extract(question, '用户提问:(.*)', 1)" }
},
{
"id": "n3", "type": "where",
"parameters": { "filter": "length(question) > 10" }
}
]
}三级去重(精确 - 近似 - 语义)
逐级去重,计算代价递增,数据量递减。精确去重在前可大幅降低后续节点的计算开销。
{
"nodes": [
{ "id": "n1", "type": "project", "parameters": { "question": "a", "input": "b", "output": "c" } },
{ "id": "n2", "type": "dedup-exact", "parameters": { "field": "question" } },
{ "id": "n3", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3" } },
{ "id": "n4", "type": "dedup-semantic", "parameters": { "field": "question", "threshold": "0.1" } }
]
}参数说明:
threshold:去重相似度阈值。dedup-fuzzy的 threshold 为编辑距离(整数,建议值 1~5,值越大容忍的字面差异越多);dedup-semantic的 threshold 为语义距离(0~1 之间的浮点数,值越小表示越严格,建议值 0.05~0.2)。
多样性采样(去重 + 聚类 + 分组采样)
语义去重后利用生成的向量列做聚类,每簇取样保证语义多样性。
{
"nodes": [
{ "id": "n1", "type": "project", "parameters": { "question": "a", "input": "b", "output": "c" } },
{ "id": "n2", "type": "dedup-exact", "parameters": { "field": "question" } },
{ "id": "n3", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3" } },
{ "id": "n4", "type": "dedup-semantic", "parameters": { "field": "question", "threshold": "0.1" } },
{ "id": "n5", "type": "semantic-cluster", "parameters": { "field": "__dedup_emb", "n": 100 } },
{ "id": "n6", "type": "sample", "parameters": { "by": "__cluster_id", "n": 1 } }
]
}完整 AI Pipeline(去重 + 采样 + 评估 + 标注)
将去重、聚类采样与 LLM 评估和标注串联,构建端到端数据处理流程。
{
"nodes": [
{ "id": "n1", "type": "project", "parameters": { "question": "a", "input": "b", "output": "c" } },
{ "id": "n2", "type": "dedup-exact", "parameters": { "field": "question" } },
{ "id": "n3", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3", "global": true, "workspace": "my-ws", "dataset": "my-ds" } },
{ "id": "n4", "type": "dedup-semantic", "parameters": { "field": "question", "threshold": "0.1", "global": true, "workspace": "my-ws", "dataset": "my-ds" } },
{ "id": "n5", "type": "semantic-cluster", "parameters": { "field": "__dedup_emb", "n": 100 } },
{ "id": "n6", "type": "sample", "parameters": { "by": "__cluster_id", "n": 3 } },
{ "id": "n7", "type": "llm-call", "parameters": { "prompt": "@eval/prompt.md", "fields": "question,input,output", "format": "json", "as": "eval" } },
{ "id": "n8", "type": "llm-call", "parameters": { "prompt": "@anno/prompt.md", "fields": "output", "format": "json", "model": "qwen-plus", "as": "anno" } }
]
}参数说明:
global:设为true时,节点在全局范围内执行去重(跨多次 Pipeline 运行累积判断),而非仅在当前批次内去重。适用于增量数据场景。workspace:全局去重使用的工作空间名称。在 CMS 控制台的工作空间管理页面获取。dataset:全局去重使用的数据集名称。在对应工作空间的数据集列表中获取。threshold:去重相似度阈值。取值说明参见上方"三级去重"示例。
常见问题
节点配置的字段名与 Logstore 实际字段不匹配
错误现象:Pipeline 执行后输出数据为空或字段值全部为 null。
可能原因:project 节点中配置的字段名与 Logstore 中的实际字段名不一致(区分大小写)。
排查步骤:
在日志服务控制台查看目标 Logstore 的索引配置,确认字段名的大小写和拼写。
检查
project节点的parameters中,键名(目标字段名)和值(源字段名)是否与 Logstore 实际字段完全匹配。修正字段名后重新运行 Pipeline。
threshold 取值不当导致去重效果异常
错误现象:去重后数据量几乎不变(阈值过严格)或大量数据被误删(阈值过宽松)。
可能原因:dedup-fuzzy 和 dedup-semantic 使用了不合适的 threshold 值。
排查步骤:
确认
dedup-fuzzy的 threshold 为编辑距离(整数),建议从 3 开始调整。值越大,容忍的字面差异越多。确认
dedup-semantic的 threshold 为语义距离(0~1 之间的浮点数),建议从 0.1 开始调整。值越小越严格。先使用小批量数据测试去重效果,确认 threshold 合理后再处理完整数据集。
节点顺序错误导致执行失败
错误现象:Pipeline 执行报错,提示字段不存在或类型不匹配。
可能原因:节点编排顺序不符合数据流依赖关系。例如,在 project 之前使用了引用字段的节点,或在 embedding 之前使用了 semantic-cluster。
排查步骤:
检查报错信息中提示的缺失字段,确认该字段由哪个上游节点产生。
参照本文"数据流全景"章节的处理顺序,调整节点顺序使其满足依赖关系:基础处理 > 数据组装 > 数据清洗 > 数据采样 > AI 处理。
特别注意扩展列依赖:
semantic-cluster依赖__dedup_emb(由dedup-semantic或embedding生成),sample按__cluster_id分组时需先执行semantic-cluster。