将离散的事件级日志按分组键聚合为行级样本实例,供下游 Pipeline 节点消费。
功能说明
AI Agent 运行时产生的离散日志中,每条只包含交互过程的一个片段,而下游 Pipeline 节点(dedup、sample、llm-call 等)要求每行是一条完整的样本实例。
make-instance 是一个纯 CPU 数据组装节点,按指定的分组键将多行事件聚合为一行宽表样本,提供三类内置函数(选值、计算、组合)快速组装数据,同时支持标准 SQL 聚合表达式满足高阶需求。
适用场景:
AI Agent 运行日志的行级样本构建(事件级 → 样本级)。
OpenTelemetry Trace Span 数据的宽表整型。
多粒度数据聚合(Span / Trace / Session / 用户粒度)。
作为 Pipeline 首个节点,为下游 dedup、sample、llm-call 准备输入。
节点配置
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "first(question)",
"answer": "last(output)",
"model": "any(model)",
"max_latency": "max(latency_ms)",
"total_input": "sum(token_input)",
"total_output": "sum(token_output)",
"tool_count": "count(tool_name)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"tool_info": "json_pack(tool_name, tool_args, tool_success)",
"by": "session_id,trace_id"
}
}参数说明
参数 | 类型 | 必填 | 默认值 | 说明 |
| String | 是 | - | 分组键,逗号分隔。 |
其余参数 | String | 是(至少一个) | - | 列定义:key 为输出列别名,value 为函数调用或 SQL 表达式。 |
| String | 否 |
| 节点输出列,以逗号分隔。 |
每个列定义必须是显式函数调用(如 model=any(model)),不支持裸字段名。
内置函数
make-instance 按三种数据组装需求,提供三类内置函数:
类别 | 本质 | 解决的问题 | 典型场景 | 可用函数 |
选值 | N 行 → 1 值 | 组内多行,只保留一个代表值 | question、output、model 等语义字段 |
|
计算 | N 行 → 1 数 | 组内数值,需要统计汇总 | token 用量求和、延迟分析、调用计数 |
|
组合 | N 行 → 1 结构 | 组内多值,需要保留并组装打包 | 工具调用链路、事件序列、结构化详情 |
|
表中 any、first、last、array、array_distinct、join、json_pack 为内置语法糖(自动展开为 SQL 聚合表达式),其余为标准 SQL 聚合函数(原样透传)。
函数速览
选值函数
函数 | 用法 | 说明 | 示例 | 效果 |
|
| 组内任意非空值 |
|
|
|
| 同上,第 2 参数指定视为空值的字符串,默认 |
|
|
|
| 按 |
| 取时间最早的非空值 |
|
| 按指定列取最早非空值 |
| 按 startTime 取最早的非空值 |
|
| 按 |
| 取时间最晚的非空值 |
|
| 按指定列取最晚非空值 |
| 按 endTime 取最晚的非空值 |
|
| 最大值 |
|
|
|
| 最小值 |
|
|
|
| ord 最大时取 col |
| 取时间最晚行的 model 值 |
|
| ord 最小时取 col |
| 取时间最早行的 model 值 |
计算函数
函数 | 用法 | 说明 | 示例 | 效果 |
|
| 求和 |
|
|
|
| 平均值 |
|
|
|
| 非 NULL 计数 |
|
|
|
| 条件计数 |
|
|
|
| 组内是否存在满足条件的行 |
|
|
|
| 组内是否所有行都满足条件 |
|
|
组合函数
函数 | 用法 | 说明 | 示例 | 效果 |
|
| 按时序收集 JSON 数组,过滤空值 |
|
|
|
| 收集去重 JSON 数组,过滤空值 |
|
|
|
| 按时序拼接文本,过滤空值 |
| search_logs → analyze_pattern → ... |
|
| 多字段组装 JSON 对象 |
|
|
|
| 值频次分布(MAP 结构) |
|
|
|
| 按 key 聚合为 MAP |
|
|
三类函数覆盖绝大多数场景。如需更灵活的处理逻辑,可直接使用 SQL 聚合表达式(如 total=sum(cast(a as bigint)) + sum(cast(b as bigint))),满足聚合函数语法即可。
输入/输出
输入要求
上游节点输出的任意列数据。
必须包含
by指定的分组键字段。必须包含列定义中引用的所有源字段。
输出列
列名 | 类型 | 来源 | 说明 |
| - | 透传 | 分组键 |
列定义中的每个 key | 由函数决定 | 新增 | 聚合结果列 |
make-instance 不透传非分组键的原始列。输出 schema 完全由 by + 列定义确定。
行数变化
M → N(M ≥ N):多行事件聚合为每组一行,输出行数等于分组数。
效果预览
原始数据(10 条离散事件日志)
以下为一个用户请求 AI Agent "分析最近 7 天的错误日志"的完整交互过程。Agent 先后调用两个工具并返回结论,产生 10 条离散事件:
# | __time__ | session_id | trace_id | event_type | question | output | model | tool_name | tool_args | tool_success | latency_ms | token_input | token_output |
1 | 10:00:01 | sess_a1 | trc_7f01 | user_query | 分析最近7天的错误日志并给出优化建议 | ||||||||
2 | 10:00:01 | sess_a1 | trc_7f01 | system_prompt | 320 | ||||||||
3 | 10:00:02 | sess_a1 | trc_7f01 | llm_request | qwen-max | 1580 | 120 | ||||||
4 | 10:00:04 | sess_a1 | trc_7f01 | tool_call | qwen-max | search_logs | {"query":"level:ERROR","days":7} | 200 | 65 | ||||
5 | 10:00:06 | sess_a1 | trc_7f01 | tool_result | {"total":42,"top":"NullPointer"} | search_logs | true | 1850 | |||||
6 | 10:00:07 | sess_a1 | trc_7f01 | tool_call | qwen-max | analyze_pattern | {"error_type":"NullPointer"} | 180 | 50 | ||||
7 | 10:00:10 | sess_a1 | trc_7f01 | tool_result | {"root_cause":"缺少空指针检查","fix":"添加Optional"} | analyze_pattern | true | 3200 | |||||
8 | 10:00:12 | sess_a1 | trc_7f01 | llm_request | qwen-max | 2800 | 520 | ||||||
9 | 10:00:15 | sess_a1 | trc_7f01 | assistant | 近7天共42条错误,NullPointerException占66.7%,建议添加Optional包装... | qwen-max | 3200 | 680 | |||||
10 | 10:00:15 | sess_a1 | trc_7f01 | completion | qwen-max | 14000 |
数据特征:question 仅第 1 行有值,output 分散在第 5/7/9 行,model 出现在 4 行中,tool_name 出现 4 次(含重复),token 分散在各行。
配置示例
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "first(question)",
"answer": "last(output)",
"model": "any(model)",
"max_latency": "max(latency_ms)",
"total_input": "sum(token_input)",
"total_output": "sum(token_output)",
"avg_latency": "avg(latency_ms)",
"llm_calls": "count(model)",
"tool_count": "count(tool_name)",
"err_tools": "count_if(tool_success = 'false')",
"tools": "array_distinct(tool_name)",
"events": "array(event_type)",
"tool_chain": "join(tool_name, ' → ')",
"tool_detail": "json_pack(tool_name, tool_args, tool_success)",
"by": "session_id,trace_id"
}
}处理后(宽表 · 1 行 x 16 列)
10 行窄表聚合为 1 行宽表。对比上方原始数据,每一列由指定函数从 10 行中聚合而来:
列 | 类别 | 函数 | 结果 | 数据来源 |
session_id | 分组键 | - | sess_a1 | 分组键透传 |
trace_id | 分组键 | - | trc_7f01 | 分组键透传 |
question | 选值 |
| 分析最近7天的错误日志并给出优化建议 | 仅第 1 行有值,取时间最早的非空值 |
answer | 选值 |
| 近7天共42条错误,NullPointerException占66.7%... | 第 5/7/9 行有值,取时间最晚 → 第 9 行 |
model | 选值 |
| qwen-max | 第 3/4/6/8/9/10 行有值,取任意非空值 |
max_latency | 选值 |
| 14000 | 仅第 5/7/10 行有值,取最大值 |
total_input | 计算 |
| 8280 | 320+1580+200+180+2800+3200 = 8280 |
total_output | 计算 |
| 1435 | 120+65+50+520+680 = 1435 |
avg_latency | 计算 |
| 6350.0 | (1850+3200+14000)/3 = 6350.0 |
llm_calls | 计算 |
| 6 | model 非空行数 = 6 |
tool_count | 计算 |
| 4 | tool_name 非空行数 = 4 |
err_tools | 计算 |
| 0 | 无失败的工具调用 |
tools | 组合 |
| ["search_logs","analyze_pattern"] | 4 次调用去重 → 2 个工具 |
events | 组合 |
| ["user_query","system_prompt",...] | 按时序收集全部 10 个事件类型 |
tool_chain | 组合 |
| search_logs → search_logs → analyze_pattern → analyze_pattern | 4 次调用按时序拼接 |
tool_detail | 组合 |
| [{"tool_name":"search_logs",...},...] | 每行的工具名+参数+结果组装为 JSON 对象 |
10 条离散事件聚合为 1 条完整样本实例,覆盖选值、计算、组合三类函数。缺失字段自动输出 NULL。整个过程纯 CPU 运算,无 LLM/GPU 依赖。
使用示例
示例 1:最简用法
按 trace 粒度聚合,每列取组内任意非空值。
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "any(question)",
"output": "any(output)",
"model": "any(model)",
"by": "session_id,trace_id"
}
}示例 2:选值 + 统计 + 打包
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "first(question)",
"answer": "last(output)",
"model": "any(model)",
"max_latency": "max(latency_ms)",
"total_tokens": "sum(token_input)",
"tool_count": "count(tool_name)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"by": "session_id,trace_id"
}
}示例 3:完整 Pipeline(实例构建 → 清洗 → 采样 → AI 评估)
过滤有效事件类型 → 数据组装 → AI 评估质量。
{
"nodes": [
{
"id": "filter_events",
"type": "where",
"parameters": {
"filter": "event_type IN ('user_query','system_prompt','tool_call','tool_result','assistant_content','completion')"
}
},
{
"id": "extract", "type": "extend",
"parameters": {
"session_id": "json_extract_scalar(attributes, '$.gen_ai.session.id')",
"span_kind": "json_extract_scalar(attributes, '$.gen_ai.span.kind')",
"question": "json_extract_scalar(attributes, '$.input.value')",
"answer": "json_extract_scalar(attributes, '$.output.value')",
"model": "json_extract_scalar(attributes, '$.gen_ai.request.model')",
"tool_name": "json_extract_scalar(attributes, '$.gen_ai.tool.name')",
"input_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.input_tokens')"
}
},
{
"id": "filter_events", "type": "where",
"parameters": { "filter": "span_kind IN ('AGENT','LLM','TOOL')" }
},
{
"id": "assemble", "type": "make-instance",
"parameters": {
"question": "first(question)",
"answer": "last(answer)",
"model": "last(model)",
"total_tokens": "sum(input_tokens)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"by": "session_id,traceId"
}
},
{ "id": "filter_empty", "type": "where", "parameters": { "filter": "question IS NOT NULL AND length(question) > 0" } },
{ "id": "exact", "type": "dedup-exact", "parameters": { "field": "question" } },
{ "id": "fuzzy", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3" } },
{ "id": "take", "type": "sample", "parameters": { "n": 50 } },
{ "id": "eval", "type": "llm-call", "parameters": { "prompt": "@eval/quality.md", "fields": "question,answer", "format": "json", "as": "eval" } },
{ "id": "stats", "type": "doc-stats", "parameters": { "field": "question" } }
]
}示例 4:OT Trace 实战 — 两步转换 + 全文整合
实战场景:28 条 OT Span(AGENT/LLM/TOOL/EXTERNAL 等)→ 过滤无关事件 → 组装为 1 行宽表 → 合并为 full_text 列供下游 AI 评估。
{
"nodes": [
{
"id": "extract", "type": "extend",
"parameters": {
"session_id": "json_extract_scalar(attributes, '$.gen_ai.session.id')",
"span_kind": "json_extract_scalar(attributes, '$.gen_ai.span.kind')",
"model": "json_extract_scalar(attributes, '$.gen_ai.request.model')",
"input_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.input_tokens')",
"output_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.output_tokens')",
"tool_name": "json_extract_scalar(attributes, '$.gen_ai.tool.name')",
"tool_args": "json_extract_scalar(attributes, '$.gen_ai.tool.call.arguments')",
"input_value": "json_extract_scalar(attributes, '$.input.value')",
"output_value": "json_extract_scalar(attributes, '$.output.value')",
"agent_id": "json_extract_scalar(attributes, '$.agent.id')",
"dur_ms": "cast(duration as bigint) / 1000000"
}
},
{
"id": "filter_events", "type": "where",
"parameters": { "filter": "span_kind IN ('AGENT','LLM','TOOL')" }
},
{
"id": "assemble", "type": "make-instance",
"parameters": {
"question": "min_by(input_value, startTime)",
"answer": "max_by(output_value, endTime)",
"model": "max_by(model, endTime)",
"total_input_tokens": "sum(cast(input_tokens as bigint))",
"total_output_tokens": "sum(cast(output_tokens as bigint))",
"llm_calls": "count_if(span_kind = 'LLM')",
"tool_calls": "count_if(span_kind = 'TOOL')",
"e2e_latency": "max(dur_ms)",
"models": "array_distinct(model)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"by": "session_id,traceId"
}
},
{
"id": "compose", "type": "extend",
"parameters": {
"full_text": "concat('## 用户输入', chr(10), substr(question, 1, 500), chr(10), chr(10), '## 最终输出', chr(10), answer)"
}
},
{ "id": "filter_empty", "type": "where", "parameters": { "filter": "question IS NOT NULL AND length(question) > 0" } },
{ "id": "eval", "type": "llm-call", "parameters": { "prompt": "@eval/quality.md", "fields": "full_text", "format": "json", "as": "eval" } }
]
}第一步 extend + where + make-instance 完成字段提取、事件过滤和数据组装(28 行 → 1 行);第二步 extend 将 question + answer 合并为 full_text 供 llm-call 做整体评估。全过程纯 CPU。
使用建议与边界行为
Pipeline 集成建议
边界行为
场景 | 行为 |
组内某些事件缺少特定字段 | 聚合函数自然处理 NULL,不报错。 |
by 分组键值为 NULL | 该事件不参与分组。 |
列定义使用裸字段名 | validate 报错,须使用显式函数。 |
输入数据为空 | 正常返回空结果集。 |