全部产品
Search
文档中心

云监控:make-instance(实例构建)

更新时间:Apr 30, 2026

将离散的事件级日志按分组键聚合为行级样本实例,供下游 Pipeline 节点消费。

功能说明

AI Agent 运行时产生的离散日志中,每条只包含交互过程的一个片段,而下游 Pipeline 节点(dedup、sample、llm-call 等)要求每行是一条完整的样本实例。

make-instance 是一个纯 CPU 数据组装节点,按指定的分组键将多行事件聚合为一行宽表样本,提供三类内置函数(选值、计算、组合)快速组装数据,同时支持标准 SQL 聚合表达式满足高阶需求。

适用场景

  • AI Agent 运行日志的行级样本构建(事件级 → 样本级)。

  • OpenTelemetry Trace Span 数据的宽表整型。

  • 多粒度数据聚合(Span / Trace / Session / 用户粒度)。

  • 作为 Pipeline 首个节点,为下游 dedup、sample、llm-call 准备输入。

节点配置

{
  "id": "assemble",
  "type": "make-instance",
  "parameters": {
    "question": "first(question)",
    "answer": "last(output)",
    "model": "any(model)",
    "max_latency": "max(latency_ms)",
    "total_input": "sum(token_input)",
    "total_output": "sum(token_output)",
    "tool_count": "count(tool_name)",
    "tools": "array_distinct(tool_name)",
    "tool_chain": "join(tool_name, ' → ')",
    "tool_info": "json_pack(tool_name, tool_args, tool_success)",
    "by": "session_id,trace_id"
  }
}

参数说明

参数

类型

必填

默认值

说明

by

String

-

分组键,逗号分隔。

其余参数

String

(至少一个)

-

列定义:key 为输出列别名,value 为函数调用或 SQL 表达式。

output

String

*

节点输出列,以逗号分隔。

说明

每个列定义必须是显式函数调用(如 model=any(model)),不支持裸字段名。

内置函数

make-instance 按三种数据组装需求,提供三类内置函数:

类别

本质

解决的问题

典型场景

可用函数

选值

N 行 → 1 值

组内多行,只保留一个代表值

question、output、model 等语义字段

any first last max min max_by min_by

计算

N 行 → 1 数

组内数值,需要统计汇总

token 用量求和、延迟分析、调用计数

sum avg count count_if bool_or bool_and

组合

N 行 → 1 结构

组内多值,需要保留并组装打包

工具调用链路、事件序列、结构化详情

array array_distinct join json_pack histogram map_agg

说明

表中 anyfirstlastarrayarray_distinctjoinjson_pack 为内置语法糖(自动展开为 SQL 聚合表达式),其余为标准 SQL 聚合函数(原样透传)。

函数速览

选值函数

函数

用法

说明

示例

效果

any

any(col)

组内任意非空值

model=any(model)

qwen-max

any

any(col, '空值')

同上,第 2 参数指定视为空值的字符串,默认 ''

status=any(status, 'N/A')

success

first

first(col)

__time__ 取最早非空值

question=first(question)

取时间最早的非空值

first

first(col, 排序列)

按指定列取最早非空值

q=first(question, startTime)

按 startTime 取最早的非空值

last

last(col)

__time__ 取最晚非空值

answer=last(output)

取时间最晚的非空值

last

last(col, 排序列)

按指定列取最晚非空值

a=last(output, endTime)

按 endTime 取最晚的非空值

max

max(col)

最大值

max_lat=max(latency_ms)

14000

min

min(col)

最小值

min_lat=min(latency_ms)

1850

max_by

max_by(col, ord)

ord 最大时取 col

m=max_by(model, __time__)

取时间最晚行的 model 值

min_by

min_by(col, ord)

ord 最小时取 col

m=min_by(model, __time__)

取时间最早行的 model 值

计算函数

函数

用法

说明

示例

效果

sum

sum(col)

求和

total=sum(token_input)

8280

avg

avg(col)

平均值

avg_lat=avg(latency_ms)

6350.0

count

count(col)

非 NULL 计数

n=count(tool_name)

4

count_if

count_if(条件)

条件计数

errs=count_if(success='false')

0

bool_or

bool_or(条件)

组内是否存在满足条件的行

has_err=bool_or(success='false')

false

bool_and

bool_and(条件)

组内是否所有行都满足条件

all_ok=bool_and(success='true')

true

组合函数

函数

用法

说明

示例

效果

array

array(col)

按时序收集 JSON 数组,过滤空值

evts=array(event_type)

["user_query","tool_call",...]

array_distinct

array_distinct(col)

收集去重 JSON 数组,过滤空值

tools=array_distinct(tool_name)

["search_logs","analyze_pattern"]

join

join(col, '分隔符')

按时序拼接文本,过滤空值

chain=join(tool_name,' → ')

search_logs → analyze_pattern → ...

json_pack

json_pack(c1, c2, ...)

多字段组装 JSON 对象

info=json_pack(name, args)

[{"name":"search_logs","args":...},...]

histogram

histogram(col)

值频次分布(MAP 结构)

dist=histogram(event_type)

{"tool_call":2,"tool_result":2,...}

map_agg

map_agg(key, val)

按 key 聚合为 MAP

tok=map_agg(model, tokens)

{"qwen-max":8280}

三类函数覆盖绝大多数场景。如需更灵活的处理逻辑,可直接使用 SQL 聚合表达式(如 total=sum(cast(a as bigint)) + sum(cast(b as bigint))),满足聚合函数语法即可。

输入/输出

输入要求

  • 上游节点输出的任意列数据。

  • 必须包含 by 指定的分组键字段。

  • 必须包含列定义中引用的所有源字段。

输出列

列名

类型

来源

说明

by 指定的列

-

透传

分组键

列定义中的每个 key

由函数决定

新增

聚合结果列

说明

make-instance 不透传非分组键的原始列。输出 schema 完全由 by + 列定义确定。

行数变化

M → N(M ≥ N):多行事件聚合为每组一行,输出行数等于分组数。

效果预览

原始数据(10 条离散事件日志)

以下为一个用户请求 AI Agent "分析最近 7 天的错误日志"的完整交互过程。Agent 先后调用两个工具并返回结论,产生 10 条离散事件:

#

__time__

session_id

trace_id

event_type

question

output

model

tool_name

tool_args

tool_success

latency_ms

token_input

token_output

1

10:00:01

sess_a1

trc_7f01

user_query

分析最近7天的错误日志并给出优化建议

2

10:00:01

sess_a1

trc_7f01

system_prompt

320

3

10:00:02

sess_a1

trc_7f01

llm_request

qwen-max

1580

120

4

10:00:04

sess_a1

trc_7f01

tool_call

qwen-max

search_logs

{"query":"level:ERROR","days":7}

200

65

5

10:00:06

sess_a1

trc_7f01

tool_result

{"total":42,"top":"NullPointer"}

search_logs

true

1850

6

10:00:07

sess_a1

trc_7f01

tool_call

qwen-max

analyze_pattern

{"error_type":"NullPointer"}

180

50

7

10:00:10

sess_a1

trc_7f01

tool_result

{"root_cause":"缺少空指针检查","fix":"添加Optional"}

analyze_pattern

true

3200

8

10:00:12

sess_a1

trc_7f01

llm_request

qwen-max

2800

520

9

10:00:15

sess_a1

trc_7f01

assistant

近7天共42条错误,NullPointerException占66.7%,建议添加Optional包装...

qwen-max

3200

680

10

10:00:15

sess_a1

trc_7f01

completion

qwen-max

14000

说明

数据特征:question 仅第 1 行有值,output 分散在第 5/7/9 行,model 出现在 4 行中,tool_name 出现 4 次(含重复),token 分散在各行。

配置示例

{
  "id": "assemble",
  "type": "make-instance",
  "parameters": {
    "question": "first(question)",
    "answer": "last(output)",
    "model": "any(model)",
    "max_latency": "max(latency_ms)",
    "total_input": "sum(token_input)",
    "total_output": "sum(token_output)",
    "avg_latency": "avg(latency_ms)",
    "llm_calls": "count(model)",
    "tool_count": "count(tool_name)",
    "err_tools": "count_if(tool_success = 'false')",
    "tools": "array_distinct(tool_name)",
    "events": "array(event_type)",
    "tool_chain": "join(tool_name, ' → ')",
    "tool_detail": "json_pack(tool_name, tool_args, tool_success)",
    "by": "session_id,trace_id"
  }
}

处理后(宽表 · 1 行 x 16 列)

10 行窄表聚合为 1 行宽表。对比上方原始数据,每一列由指定函数从 10 行中聚合而来:

类别

函数

结果

数据来源

session_id

分组键

-

sess_a1

分组键透传

trace_id

分组键

-

trc_7f01

分组键透传

question

选值

first(question)

分析最近7天的错误日志并给出优化建议

仅第 1 行有值,取时间最早的非空值

answer

选值

last(output)

近7天共42条错误,NullPointerException占66.7%...

第 5/7/9 行有值,取时间最晚 → 第 9 行

model

选值

any(model)

qwen-max

第 3/4/6/8/9/10 行有值,取任意非空值

max_latency

选值

max(latency_ms)

14000

仅第 5/7/10 行有值,取最大值

total_input

计算

sum(token_input)

8280

320+1580+200+180+2800+3200 = 8280

total_output

计算

sum(token_output)

1435

120+65+50+520+680 = 1435

avg_latency

计算

avg(latency_ms)

6350.0

(1850+3200+14000)/3 = 6350.0

llm_calls

计算

count(model)

6

model 非空行数 = 6

tool_count

计算

count(tool_name)

4

tool_name 非空行数 = 4

err_tools

计算

count_if(tool_success='false')

0

无失败的工具调用

tools

组合

array_distinct(tool_name)

["search_logs","analyze_pattern"]

4 次调用去重 → 2 个工具

events

组合

array(event_type)

["user_query","system_prompt",...]

按时序收集全部 10 个事件类型

tool_chain

组合

join(tool_name, ' → ')

search_logs → search_logs → analyze_pattern → analyze_pattern

4 次调用按时序拼接

tool_detail

组合

json_pack(...)

[{"tool_name":"search_logs",...},...]

每行的工具名+参数+结果组装为 JSON 对象

10 条离散事件聚合为 1 条完整样本实例,覆盖选值、计算、组合三类函数。缺失字段自动输出 NULL。整个过程纯 CPU 运算,无 LLM/GPU 依赖。

使用示例

示例 1:最简用法

按 trace 粒度聚合,每列取组内任意非空值。

{
  "id": "assemble",
  "type": "make-instance",
  "parameters": {
    "question": "any(question)",
    "output": "any(output)",
    "model": "any(model)",
    "by": "session_id,trace_id"
  }
}

示例 2:选值 + 统计 + 打包

{
  "id": "assemble",
  "type": "make-instance",
  "parameters": {
    "question": "first(question)",
    "answer": "last(output)",
    "model": "any(model)",
    "max_latency": "max(latency_ms)",
    "total_tokens": "sum(token_input)",
    "tool_count": "count(tool_name)",
    "tools": "array_distinct(tool_name)",
    "tool_chain": "join(tool_name, ' → ')",
    "by": "session_id,trace_id"
  }
}

示例 3:完整 Pipeline(实例构建 → 清洗 → 采样 → AI 评估)

过滤有效事件类型 → 数据组装 → AI 评估质量。

{
  "nodes": [
    {
      "id": "filter_events",
      "type": "where",
      "parameters": {
        "filter": "event_type IN ('user_query','system_prompt','tool_call','tool_result','assistant_content','completion')"
      }
    },
    {
      "id": "extract", "type": "extend",
      "parameters": {
        "session_id": "json_extract_scalar(attributes, '$.gen_ai.session.id')",
        "span_kind": "json_extract_scalar(attributes, '$.gen_ai.span.kind')",
        "question": "json_extract_scalar(attributes, '$.input.value')",
        "answer": "json_extract_scalar(attributes, '$.output.value')",
        "model": "json_extract_scalar(attributes, '$.gen_ai.request.model')",
        "tool_name": "json_extract_scalar(attributes, '$.gen_ai.tool.name')",
        "input_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.input_tokens')"
      }
    },
    {
      "id": "filter_events", "type": "where",
      "parameters": { "filter": "span_kind IN ('AGENT','LLM','TOOL')" }
    },
    {
      "id": "assemble", "type": "make-instance",
      "parameters": {
        "question": "first(question)",
        "answer": "last(answer)",
        "model": "last(model)",
        "total_tokens": "sum(input_tokens)",
        "tools": "array_distinct(tool_name)",
        "tool_chain": "join(tool_name, ' → ')",
        "by": "session_id,traceId"
      }
    },
    { "id": "filter_empty", "type": "where", "parameters": { "filter": "question IS NOT NULL AND length(question) > 0" } },
    { "id": "exact", "type": "dedup-exact", "parameters": { "field": "question" } },
    { "id": "fuzzy", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3" } },
    { "id": "take", "type": "sample", "parameters": { "n": 50 } },
    { "id": "eval", "type": "llm-call", "parameters": { "prompt": "@eval/quality.md", "fields": "question,answer", "format": "json", "as": "eval" } },
    { "id": "stats", "type": "doc-stats", "parameters": { "field": "question" } }
  ]
}

示例 4:OT Trace 实战 — 两步转换 + 全文整合

实战场景:28 条 OT Span(AGENT/LLM/TOOL/EXTERNAL 等)→ 过滤无关事件 → 组装为 1 行宽表 → 合并为 full_text 列供下游 AI 评估。

{
  "nodes": [
    {
      "id": "extract", "type": "extend",
      "parameters": {
        "session_id": "json_extract_scalar(attributes, '$.gen_ai.session.id')",
        "span_kind": "json_extract_scalar(attributes, '$.gen_ai.span.kind')",
        "model": "json_extract_scalar(attributes, '$.gen_ai.request.model')",
        "input_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.input_tokens')",
        "output_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.output_tokens')",
        "tool_name": "json_extract_scalar(attributes, '$.gen_ai.tool.name')",
        "tool_args": "json_extract_scalar(attributes, '$.gen_ai.tool.call.arguments')",
        "input_value": "json_extract_scalar(attributes, '$.input.value')",
        "output_value": "json_extract_scalar(attributes, '$.output.value')",
        "agent_id": "json_extract_scalar(attributes, '$.agent.id')",
        "dur_ms": "cast(duration as bigint) / 1000000"
      }
    },
    {
      "id": "filter_events", "type": "where",
      "parameters": { "filter": "span_kind IN ('AGENT','LLM','TOOL')" }
    },
    {
      "id": "assemble", "type": "make-instance",
      "parameters": {
        "question": "min_by(input_value, startTime)",
        "answer": "max_by(output_value, endTime)",
        "model": "max_by(model, endTime)",
        "total_input_tokens": "sum(cast(input_tokens as bigint))",
        "total_output_tokens": "sum(cast(output_tokens as bigint))",
        "llm_calls": "count_if(span_kind = 'LLM')",
        "tool_calls": "count_if(span_kind = 'TOOL')",
        "e2e_latency": "max(dur_ms)",
        "models": "array_distinct(model)",
        "tools": "array_distinct(tool_name)",
        "tool_chain": "join(tool_name, ' → ')",
        "by": "session_id,traceId"
      }
    },
    {
      "id": "compose", "type": "extend",
      "parameters": {
        "full_text": "concat('## 用户输入', chr(10), substr(question, 1, 500), chr(10), chr(10), '## 最终输出', chr(10), answer)"
      }
    },
    { "id": "filter_empty", "type": "where", "parameters": { "filter": "question IS NOT NULL AND length(question) > 0" } },
    { "id": "eval", "type": "llm-call", "parameters": { "prompt": "@eval/quality.md", "fields": "full_text", "format": "json", "as": "eval" } }
  ]
}

第一步 extend + where + make-instance 完成字段提取、事件过滤和数据组装(28 行 → 1 行);第二步 extend 将 question + answer 合并为 full_textllm-call 做整体评估。全过程纯 CPU。

使用建议与边界行为

边界行为

场景

行为

组内某些事件缺少特定字段

聚合函数自然处理 NULL,不报错。

by 分组键值为 NULL

该事件不参与分组。

列定义使用裸字段名

validate 报错,须使用显式函数。

输入数据为空

正常返回空结果集。