本模板基于仿真数据(Mock Data)模拟 AI Agent 日志,提供 4 个由浅入深的 Pipeline 演示场景,用于快速上手 Pipeline 的核心能力。
仿真数据模板简介
本模板使用仿真数据模拟 4 种 AI Agent 场景(安全审计、问数助手、日志分析、智能客服),用于 Pipeline 功能演示和算子验证。
与模板总览中的其他模板基于 OT-AI Trace(嵌套 JSON attributes)不同,本模板使用扁平 SLS 事件模型:22 个顶层 text 字段、无 JSON 嵌套,project 映射零成本,适合新用户快速上手。
适用人群
新用户:通过 Demo 1~2 快速理解 Pipeline 基本能力。
算法工程师:通过 Demo 3 评估 Agent 输出质量。
数据平台团队:通过 Demo 4 了解全流程编排模式。
前提条件
在使用本模板前,请确认已完成以下准备工作:
已开通日志服务 SLS。如果尚未开通,请参见Pipeline 概述了解服务开通方式。
已创建目标 Project 和 Logstore,用于存储仿真数据。本模板使用的 Project 为
ali-pub-cn-hangzhou-staging-sls-admin,Logstore 为ai_test。已开通 Pipeline 功能。Pipeline 功能需要在 SLS 控制台中单独开通,详情请参见Pipeline 概述。
(可选,仅 Demo 4 API 配置需要)已创建 Dataset Workspace。在 Demo 4 的 API 配置中,
sink.dataset.workspace参数需要填写您的 Workspace 名称。您可以在 SLS 控制台的数据集管理页面查看或创建 Workspace。
数据源说明
目标资源
资源 | 值 |
SLS Project |
|
SLS Logstore |
|
与 OT-AI Trace 格式的差异
本模板的仿真数据与其他模板使用的 OT-AI Trace 数据在格式上存在以下差异:
维度 | OT-AI Trace | Mock 仿真数据(本模板) |
数据格式 | OpenTelemetry Span(嵌套 JSON attributes) | 扁平 SLS 事件(22 个顶层 text 字段) |
字段引用 |
| 直接引用: |
聚合键 |
|
|
事件区分 |
|
|
字段可用性矩阵
22 个字段在不同 event_type 下的取值情况如下表所示。
字段 | user_query | system_prompt | tool_call | tool_result | assistant_content | completion |
question | 有值(用户提问) | - | - | - | - | - |
output | - | - | - | 有值(工具结果) | 有值(Agent 回答) | - |
model | - | 有值 | 有值 | - | 有值 | 有值 |
tool_name | - | - | 有值 | 有值 | - | - |
latency_ms | - | - | - | 有值(工具耗时) | - | 有值(总耗时) |
agent_name | 有值 | 有值 | 有值 | 有值 | 有值 | 有值 |
所有字段类型均为 text。空值为空字符串 "" 而非 NULL。数值运算需使用 CAST,空值过滤需使用 NULLIF。
示例数据概览
场景 | agent_name | 典型 question | 工具 |
安全审计 | security_audit_agent | "过去24小时内SSH登录日志里有没有异常高频失败的IP?" | sls_execute_sql, get_threat_intel, create_alert |
问数助手 | text2sql_agent | "上季度的总GMV是多少?和去年同期比涨了多少?" | execute_sql, create_chart |
日志分析 | logexplorer_sql_agent | "payment-service的pod一直在CrashLoopBackOff,能帮我看看为啥起不来吗?" | sls_execute_sql, search_logs |
智能客服 | customer_service_agent | "系统显示'已签收'但本人没收到快递!" | query_order, search_knowledge_base, create_ticket |
Demo 场景
# | 场景 | 复杂度 | 算子链 | 数据粒度 | 使用 make-instance |
1 | 用户提问三级去重 | 低 | project > where > dedup-exact > dedup-fuzzy > dedup-semantic | 事件级 | 否 |
2 | 问题聚类 + 场景标注 | 中 | project > where > dedup-exact > semantic-cluster > sample > llm-call | 事件级 | 否 |
3 | Agent 回答质量评估 | 中 | project > where > make-instance > dedup-exact > sample > llm-call > doc-stats | Trace 级 | 是 |
4 | 端到端全流程 | 高 | project > where > make-instance > extend > dedup-exact > dedup-fuzzy > dedup-semantic > semantic-cluster > sample > llm-call x2 > doc-stats | Trace 级 | 是 |
Demo 1:用户提问三级去重
利用仿真数据中带 dedup_tag 标记的重复数据(exact / fuzzy / semantic),演示三级去重链的逐级过滤效果。无 LLM 和 GPU 成本。
定制建议
定制点 | 操作 |
去重严格度 |
|
语义阈值 |
|
跨批次全局去重 | 给 dedup-fuzzy/semantic 添加 |
仅验证某级去重 | 删除不需要的 dedup 节点即可 |
Demo 2:问题聚类 + 场景标注
对去重后的用户提问进行语义聚类,按簇采样后用 LLM 自动标注意图、复杂度等维度,发现 Agent 使用的场景分布。包含 1 轮 LLM 调用。
SPL 语法
* | project question=question,
trace_id=trace_id,
agent=agent_name,
user_id=user_id,
event_type=event_type
| where event_type = 'user_query' AND length(question) > 0
| dedup-exact -field=question
| dedup-semantic -field=question -threshold='0.1'
| semantic-cluster -field=__dedup_emb -n=5
| sample -n=3 by __cluster_id
| llm-call -prompt='@anno/scene-label.md' -fields=question -format=json as anno标注输出示例
{
"意图类型": "问题诊断",
"任务复杂度": "中等",
"业务场景": "安全审计",
"补充标签": ["日志分析", "异常检测", "IP维度"]
}定制建议
定制点 | 操作 |
聚类数量 |
|
每簇采样量 |
|
标注维度 | 修改 |
跳过去重 | 删除 dedup 节点,直接对全量提问聚类标注 |
Demo 3:Agent 回答质量评估
将离散事件日志按 trace_id 聚合为"一问一答"实例,通过 LLM 多维度评估 Agent 回答质量。包含 1 轮 LLM 调用。
text 字段的聚合处理
仿真数据所有字段类型为 text,空值为 "" 而非 NULL。在使用 make-instance 聚合时需要注意以下问题:
问题 | 原因 | 正确做法 |
|
|
|
| text 类型无法直接求和 |
|
| text 按字典序比较,非数值 |
|
对 text 类型的数值字段,统一使用 cast(NULLIF(col, '') as bigint) 包装——先 NULLIF 过滤空串,再 CAST 转数值。
SPL 语法
* | project question=question,
output=output,
model=model,
tool_name=tool_name,
token_input=token_input,
token_output=token_output,
latency_ms=latency_ms,
event_type=event_type,
trace_id=trace_id,
session_id=session_id,
agent_name=agent_name
| where event_type IN ('user_query','tool_call','tool_result','assistant_content','completion')
| make-instance
question=first(question),
answer=last(output),
model=any(model),
agent=any(agent_name),
tool_chain=join(tool_name, ' → '),
tool_count=count_if(event_type = 'tool_call'),
total_tokens=sum(cast(NULLIF(token_input, '') as bigint)),
latency=max(cast(NULLIF(latency_ms, '') as bigint))
by session_id,trace_id
| where question IS NOT NULL AND answer IS NOT NULL
| dedup-exact -field=question
| sample -n=20
| llm-call -prompt='@eval/agent-quality.md' -fields=question,answer,tool_chain -format=json as eval
| doc-stats -field=answer评估输出示例
{
"需求理解": {"score": 5, "reason": "准确理解了用户要求创建基于IP网段的告警规则"},
"回答质量": {"score": 4, "reason": "告警规则配置完整,但未说明触发频率限制"},
"逻辑连贯": {"score": 5, "reason": "从查询到验证到创建,步骤清晰"},
"格式规范": {"score": 5, "reason": "告警配置以结构化格式呈现"},
"安全合规": {"score": 5, "reason": "未泄露敏感信息"}
}定制建议
定制点 | 操作 |
聚合粒度 |
|
增加统计列 | 在 make-instance 中增加 |
评估维度 | 修改 |
采样量 |
|
Demo 4:端到端全流程
数据治理全流程流水线:字段提取、事件聚合、指标派生、三级去重、聚类采样、质量评估 + 场景标注、文档统计。覆盖全部算子,包含 2 轮 LLM 调用。
SPL 语法
* | project question=question,
output=output,
model=model,
tool_name=tool_name,
tool_args=tool_args,
tool_success=tool_success,
token_input=token_input,
token_output=token_output,
latency_ms=latency_ms,
status=status,
event_type=event_type,
trace_id=trace_id,
session_id=session_id,
conversation_id=conversation_id,
agent_name=agent_name,
user_id=user_id,
region_id=region_id
| where event_type IN ('user_query','system_prompt','tool_call','tool_result','assistant_content','completion')
| make-instance
question=first(question),
answer=last(output),
model=any(model),
agent=any(agent_name),
user_id=any(user_id),
region=any(region_id),
tool_chain=join(tool_name, ' → '),
tools=array_distinct(tool_name),
tool_count=count_if(event_type = 'tool_call'),
has_error=bool_or(status = 'error'),
total_input_tokens=sum(cast(NULLIF(token_input, '') as bigint)),
total_output_tokens=sum(cast(NULLIF(token_output, '') as bigint)),
latency=max(cast(NULLIF(latency_ms, '') as bigint))
by session_id,trace_id,conversation_id
| where question IS NOT NULL AND answer IS NOT NULL
| extend token_total=total_input_tokens + total_output_tokens,
answer_preview=substr(answer, 1, 500)
| dedup-exact -field=question
| dedup-fuzzy -field=question -threshold='3'
| dedup-semantic -field=question -threshold='0.1'
| semantic-cluster -field=__dedup_emb -n=5
| sample -n=3 by __cluster_id
| llm-call -prompt='@eval/agent-quality.md' -fields=question,answer,tool_chain -format=json as eval
| llm-call -prompt='@anno/scene-label.md' -fields=question -format=json as anno
| doc-stats -field=answerAPI 配置(JSON)
{
"name": "mock_data_demo_full",
"description": "仿真数据端到端全流程 Demo:聚合、去重、聚类、评估、标注,覆盖全部算子能力",
"source": {
"type": "logstore",
"logstore": {
"project": "ali-pub-cn-hangzhou-staging-sls-admin",
"logstore": "ai_test",
"query": "*"
}
},
"pipeline": {
"nodes": [
{"id": "extract", "type": "project", "parameters": {"question": "question", "output": "output", "model": "model", "tool_name": "tool_name", "tool_args": "tool_args", "tool_success": "tool_success", "token_input": "token_input", "token_output": "token_output", "latency_ms": "latency_ms", "status": "status", "event_type": "event_type", "trace_id": "trace_id", "session_id": "session_id", "conversation_id": "conversation_id", "agent_name": "agent_name", "user_id": "user_id", "region_id": "region_id"}},
{"id": "filter_events", "type": "where", "parameters": {"filter": "event_type IN ('user_query','system_prompt','tool_call','tool_result','assistant_content','completion')"}},
{"id": "assemble", "type": "make-instance", "parameters": {"question": "first(question)", "answer": "last(output)", "model": "any(model)", "agent": "any(agent_name)", "user_id": "any(user_id)", "region": "any(region_id)", "tool_chain": "join(tool_name, ' → ')", "tools": "array_distinct(tool_name)", "tool_count": "count_if(event_type = 'tool_call')", "has_error": "bool_or(status = 'error')", "total_input_tokens": "sum(cast(NULLIF(token_input, '') as bigint))", "total_output_tokens": "sum(cast(NULLIF(token_output, '') as bigint))", "latency": "max(cast(NULLIF(latency_ms, '') as bigint))", "by": "session_id,trace_id,conversation_id"}},
{"id": "filter_valid", "type": "where", "parameters": {"filter": "question IS NOT NULL AND answer IS NOT NULL"}},
{"id": "derive_metrics", "type": "extend", "parameters": {"token_total": "total_input_tokens + total_output_tokens", "answer_preview": "substr(answer, 1, 500)"}},
{"id": "exact_dedup", "type": "dedup-exact", "parameters": {"field": "question"}},
{"id": "fuzzy_dedup", "type": "dedup-fuzzy", "parameters": {"field": "question", "threshold": "3"}},
{"id": "semantic_dedup", "type": "dedup-semantic", "parameters": {"field": "question", "threshold": "0.1"}},
{"id": "cluster", "type": "semantic-cluster", "parameters": {"field": "__dedup_emb", "n": 5}},
{"id": "sample_per_cluster", "type": "sample", "parameters": {"n": 3, "by": "__cluster_id"}},
{"id": "evaluate", "type": "llm-call", "parameters": {"prompt": "@eval/agent-quality.md", "fields": "question,answer,tool_chain", "format": "json", "as": "eval"}},
{"id": "annotate", "type": "llm-call", "parameters": {"prompt": "@anno/scene-label.md", "fields": "question", "format": "json", "as": "anno"}},
{"id": "text_stats", "type": "doc-stats", "parameters": {"field": "answer"}}
]
},
"sink": {
"type": "dataset",
"dataset": {"workspace": "<your-workspace-name>", "dataset": "mock_demo_full"}
},
"executePolicy": {
"mode": "run_once",
"run_once": {"fromTime": 1772150000, "toTime": 1772240000}
}
}上述 JSON 配置中的 <your-workspace-name> 需要替换为您实际的 Workspace 名称。您可以在 SLS 控制台的数据集管理页面查看已有的 Workspace,或创建新的 Workspace。
全流程数据量变化
步骤 | 算子 | 数据量 | 列数 | 说明 |
1 | project | 140 | 17 | 字段选取 |
2 | where | 约 120 | 17 | 过滤 system_prompt 等次要事件 |
3 | make-instance | 约 12 | 15 | 按 trace_id 聚合(事件级转为实例级) |
4 | where | 约 12 | 15 | 过滤 question/answer 为空的实例 |
5 | extend | 约 12 | +2 | 派生 token_total、answer_preview |
6~8 | dedup x3 | 约 8 | +5 | 精确/近似/语义三级去重 |
9~10 | cluster + sample | 约 8 | +1 | 聚 5 簇,每簇 3 条 |
11~13 | llm-call x2 + doc-stats | 约 8 | +3 | 质量评分 + 场景标注 + 文本统计 |
常见问题
在使用 Demo 3 和 Demo 4 时,可能遇到以下常见问题:
问题 | 可能原因 | 排查方向 |
LLM 调用超时或失败 | llm-call 算子调用大模型时,因网络波动或模型服务负载较高导致请求超时。 | Pipeline 内置重试机制,默认会自动重试失败的 LLM 请求。如果多次重试仍然失败,请检查模型服务的可用性和网络连通性,或适当减小 sample 的采样量以降低并发调用数。 |
make-instance 聚合结果为空 | 输入数据中缺少必要的事件类型,或 trace_id 字段为空导致无法按 Trace 聚合。 | 检查输入数据是否包含 user_query 和 assistant_content 类型的事件,确认 trace_id 字段非空。可先单独运行 project + where 算子,验证过滤后的数据是否符合预期。 |
去重后数据量过少 | dedup-semantic 的 threshold 设置过于严格(值过小),导致语义相似的提问被过度去重。 | 适当调大 dedup-semantic 的 threshold 参数(建议范围 0.05~0.15)。也可以暂时移除 dedup-semantic 节点,仅保留 dedup-exact 和 dedup-fuzzy,观察数据量变化。 |
Logstore 不存在报错 | API 配置或 SPL 中指定的 Project 或 Logstore 名称拼写有误,或资源尚未创建。 | 检查 source.logstore.project 和 source.logstore.logstore 参数是否与 SLS 控制台中实际创建的资源名称一致,注意区分大小写。 |
算子覆盖矩阵
算子 | Demo 1 | Demo 2 | Demo 3 | Demo 4 | 算子文档 |
使用 | 使用 | 使用 | 使用 | ||
使用 | 使用 | 使用 | 使用 | ||
- | - | 使用 | 使用 | ||
- | - | - | 使用 | ||
使用 | 使用 | 使用 | 使用 | ||
使用 | - | - | 使用 | ||
使用 | 使用 | - | 使用 | ||
- | 使用 | - | 使用 | ||
- | 使用 | 使用 | 使用 | ||
- | 使用 | 使用 | 使用 x2 | ||
- | - | 使用 | 使用 |
embedding 未在 Demo 中显式使用,因为 dedup-semantic 和 semantic-cluster 内部自动完成了 embedding 计算。
Pipeline 编排原则
原则 | 说明 |
project 前置 | 首算子 |
事件过滤先行 |
|
text 字段 CAST | 仿真数据所有字段为 text,数值运算必须 |
空串不等于 NULL | 仿真数据空值为 |
先减后增 | 先去重/采样(行数递减),再 LLM 处理(列数递增)。 |
扩展列复用 |
|
仿真数据与生产环境的差异
场景 | 说明 |
仿真数据量较小(约 140 条) | 聚类和采样效果有限,建议 |
生产数据字段类型 | 若生产环境字段为 bigint/double,无需 CAST 包装 |
LLM 调用成本 | Demo 3 约 12 次,Demo 4 约 16 次(2 轮 x 8 条),成本极低 |
| 仅在 |
make-instance 空值处理 |
|