將離散的事件級日誌按分組鍵彙總為行級樣本執行個體,供下遊 Pipeline 節點消費。
功能說明
AI Agent 運行時產生的離散日誌中,每條只包含互動過程的一個片段,而下遊 Pipeline 節點(dedup、sample、llm-call 等)要求每行是一條完整的樣本執行個體。
make-instance 是一個純 CPU 資料群組裝節點,按指定的分組鍵將多行事件彙總為一行寬表樣本,提供三類內建函數(選值、計算、組合)快速組裝資料,同時支援標準 SQL 彙總運算式滿足高階需求。
適用情境:
AI Agent 作業記錄的行級樣本構建(事件級 → 樣本級)。
OpenTelemetry Trace Span 資料的寬表整型。
多粒度資料彙總(Span / Trace / Session / 使用者粒度)。
作為 Pipeline 首個節點,為下遊 dedup、sample、llm-call 準備輸入。
節點配置
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "first(question)",
"answer": "last(output)",
"model": "any(model)",
"max_latency": "max(latency_ms)",
"total_input": "sum(token_input)",
"total_output": "sum(token_output)",
"tool_count": "count(tool_name)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"tool_info": "json_pack(tool_name, tool_args, tool_success)",
"by": "session_id,trace_id"
}
}參數說明
參數 | 類型 | 必填 | 預設值 | 說明 |
| String | 是 | - | 分組鍵,逗號分隔。 |
其餘參數 | String | 是(至少一個) | - | 列定義:key 為輸出資料行別名,value 為函數調用或 SQL 運算式。 |
| String | 否 |
| 節點輸出資料行,以逗號分隔。 |
每個列定義必須是顯式函數調用(如 model=any(model)),不支援裸欄位名。
內建函數
make-instance 按三種資料群組裝需求,提供三類內建函數:
類別 | 本質 | 解決的問題 | 典型情境 | 可用函數 |
選值 | N 行 → 1 值 | 組內多行,只保留一個代表值 | question、output、model 等語義欄位 |
|
計算 | N 行 → 1 數 | 組內數值,需要統計匯總 | token 用量求和、延遲分析、調用計數 |
|
組合 | N 行 → 1 結構 | 組內多值,需要保留並組裝打包 | 工具調用鏈路、事件序列、結構化詳情 |
|
表中 any、first、last、array、array_distinct、join、json_pack 為內建文法糖(自動延伸為 SQL 彙總運算式),其餘為標準 SQL 彙總函式(原樣透傳)。
函數預覽
選值函數
函數 | 用法 | 說明 | 樣本 | 效果 |
|
| 組內任意非空值 |
|
|
|
| 同上,第 2 參數指定視為空白值的字串,預設 |
|
|
|
| 按 |
| 取時間最早的非空值 |
|
| 按指定列取最早非空值 |
| 按 startTime 取最早的非空值 |
|
| 按 |
| 取時間最晚的非空值 |
|
| 按指定列取最晚非空值 |
| 按 endTime 取最晚的非空值 |
|
| 最大值 |
|
|
|
| 最小值 |
|
|
|
| ord 最大時取 col |
| 取時間最晚行的 model 值 |
|
| ord 最小時取 col |
| 取時間最早行的 model 值 |
計算函數
函數 | 用法 | 說明 | 樣本 | 效果 |
|
| 求和 |
|
|
|
| 平均值 |
|
|
|
| 非 NULL 計數 |
|
|
|
| 條件計數 |
|
|
|
| 組內是否存在滿足條件的行 |
|
|
|
| 組內是否所有行都滿足條件 |
|
|
組合函數
函數 | 用法 | 說明 | 樣本 | 效果 |
|
| 按時序收集 JSON 數組,過濾空值 |
|
|
|
| 收集去重 JSON 數組,過濾空值 |
|
|
|
| 按時序拼接文本,過濾空值 |
| search_logs → analyze_pattern → ... |
|
| 多欄位組裝 JSON 對象 |
|
|
|
| 值頻次分布(MAP 結構) |
|
|
|
| 按 key 彙總為 MAP |
|
|
三類函數覆蓋絕大多數情境。如需更靈活的處理邏輯,可直接使用 SQL 彙總運算式(如 total=sum(cast(a as bigint)) + sum(cast(b as bigint))),滿足彙總函式文法即可。
輸入/輸出
輸入要求
上遊節點輸出的任意列資料。
必須包含
by指定的分組鍵欄位。必須包含列定義中引用的所有源欄位。
輸出資料行
列名 | 類型 | 來源 | 說明 |
| - | 透傳 | 分組鍵 |
列定義中的每個 key | 由函數決定 | 新增 | 彙總結果列 |
make-instance 不透傳非分組鍵的原始列。輸出 schema 完全由 by + 列定義確定。
行數變化
M → N(M ≥ N):多行事件彙總為每組一行,輸出行數等於分組數。
效果預覽
未經處理資料(10 條離散事件記錄)
以下為一個使用者請求 AI Agent "分析最近 7 天的錯誤記錄檔"的完整互動過程。Agent 先後調用兩個工具並返回結論,產生 10 條離散事件:
# | __time__ | session_id | trace_id | event_type | question | output | model | tool_name | tool_args | tool_success | latency_ms | token_input | token_output |
1 | 10:00:01 | sess_a1 | trc_7f01 | user_query | 分析最近7天的錯誤記錄檔並給出最佳化建議 | ||||||||
2 | 10:00:01 | sess_a1 | trc_7f01 | system_prompt | 320 | ||||||||
3 | 10:00:02 | sess_a1 | trc_7f01 | llm_request | qwen-max | 1580 | 120 | ||||||
4 | 10:00:04 | sess_a1 | trc_7f01 | tool_call | qwen-max | search_logs | {"query":"level:ERROR","days":7} | 200 | 65 | ||||
5 | 10:00:06 | sess_a1 | trc_7f01 | tool_result | {"total":42,"top":"NullPointer"} | search_logs | true | 1850 | |||||
6 | 10:00:07 | sess_a1 | trc_7f01 | tool_call | qwen-max | analyze_pattern | {"error_type":"NullPointer"} | 180 | 50 | ||||
7 | 10:00:10 | sess_a1 | trc_7f01 | tool_result | {"root_cause":"缺少null 指標檢查","fix":"添加Optional"} | analyze_pattern | true | 3200 | |||||
8 | 10:00:12 | sess_a1 | trc_7f01 | llm_request | qwen-max | 2800 | 520 | ||||||
9 | 10:00:15 | sess_a1 | trc_7f01 | assistant | 近7天共42條錯誤,NullPointerException佔66.7%,建議添加Optional封裝... | qwen-max | 3200 | 680 | |||||
10 | 10:00:15 | sess_a1 | trc_7f01 | completion | qwen-max | 14000 |
資料特徵:question 僅第 1 行有值,output 分散在第 5/7/9 行,model 出現在 4 行中,tool_name 出現 4 次(含重複),token 分散在各行。
配置樣本
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "first(question)",
"answer": "last(output)",
"model": "any(model)",
"max_latency": "max(latency_ms)",
"total_input": "sum(token_input)",
"total_output": "sum(token_output)",
"avg_latency": "avg(latency_ms)",
"llm_calls": "count(model)",
"tool_count": "count(tool_name)",
"err_tools": "count_if(tool_success = 'false')",
"tools": "array_distinct(tool_name)",
"events": "array(event_type)",
"tool_chain": "join(tool_name, ' → ')",
"tool_detail": "json_pack(tool_name, tool_args, tool_success)",
"by": "session_id,trace_id"
}
}處理後(寬表 · 1 行 x 16 列)
10 行窄表彙總為 1 行寬表。對比上方未經處理資料,每一列由指定函數從 10 行中彙總而來:
列 | 類別 | 函數 | 結果 | 資料來源 |
session_id | 分組鍵 | - | sess_a1 | 分組鍵透傳 |
trace_id | 分組鍵 | - | trc_7f01 | 分組鍵透傳 |
question | 選值 |
| 分析最近7天的錯誤記錄檔並給出最佳化建議 | 僅第 1 行有值,取時間最早的非空值 |
answer | 選值 |
| 近7天共42條錯誤,NullPointerException佔66.7%... | 第 5/7/9 行有值,取時間最晚 → 第 9 行 |
model | 選值 |
| qwen-max | 第 3/4/6/8/9/10 行有值,取任意非空值 |
max_latency | 選值 |
| 14000 | 僅第 5/7/10 行有值,取最大值 |
total_input | 計算 |
| 8280 | 320+1580+200+180+2800+3200 = 8280 |
total_output | 計算 |
| 1435 | 120+65+50+520+680 = 1435 |
avg_latency | 計算 |
| 6350.0 | (1850+3200+14000)/3 = 6350.0 |
llm_calls | 計算 |
| 6 | model 非空行數 = 6 |
tool_count | 計算 |
| 4 | tool_name 非空行數 = 4 |
err_tools | 計算 |
| 0 | 無失敗的工具調用 |
tools | 組合 |
| ["search_logs","analyze_pattern"] | 4 次調用去重 → 2 個工具 |
events | 組合 |
| ["user_query","system_prompt",...] | 按時序收集全部 10 個事件類型 |
tool_chain | 組合 |
| search_logs → search_logs → analyze_pattern → analyze_pattern | 4 次調用按時序拼接 |
tool_detail | 組合 |
| [{"tool_name":"search_logs",...},...] | 每行的工具名+參數+結果組裝為 JSON 對象 |
10 條離散事件彙總為 1 條完整樣本執行個體,覆蓋選值、計算、組合三類函數。缺失欄位自動輸出 NULL。整個過程純 CPU 運算,無 LLM/GPU 依賴。
使用樣本
樣本 1:最簡用法
按 trace 粒度彙總,每列取組內任意非空值。
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "any(question)",
"output": "any(output)",
"model": "any(model)",
"by": "session_id,trace_id"
}
}樣本 2:選值 + 統計 + 打包
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "first(question)",
"answer": "last(output)",
"model": "any(model)",
"max_latency": "max(latency_ms)",
"total_tokens": "sum(token_input)",
"tool_count": "count(tool_name)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"by": "session_id,trace_id"
}
}樣本 3:完整 Pipeline(執行個體構建 → 清洗 → 採樣 → AI 評估)
過濾有效事件類型 → 資料群組裝 → AI 評估品質。
{
"nodes": [
{
"id": "filter_events",
"type": "where",
"parameters": {
"filter": "event_type IN ('user_query','system_prompt','tool_call','tool_result','assistant_content','completion')"
}
},
{
"id": "extract", "type": "extend",
"parameters": {
"session_id": "json_extract_scalar(attributes, '$.gen_ai.session.id')",
"span_kind": "json_extract_scalar(attributes, '$.gen_ai.span.kind')",
"question": "json_extract_scalar(attributes, '$.input.value')",
"answer": "json_extract_scalar(attributes, '$.output.value')",
"model": "json_extract_scalar(attributes, '$.gen_ai.request.model')",
"tool_name": "json_extract_scalar(attributes, '$.gen_ai.tool.name')",
"input_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.input_tokens')"
}
},
{
"id": "filter_events", "type": "where",
"parameters": { "filter": "span_kind IN ('AGENT','LLM','TOOL')" }
},
{
"id": "assemble", "type": "make-instance",
"parameters": {
"question": "first(question)",
"answer": "last(answer)",
"model": "last(model)",
"total_tokens": "sum(input_tokens)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"by": "session_id,traceId"
}
},
{ "id": "filter_empty", "type": "where", "parameters": { "filter": "question IS NOT NULL AND length(question) > 0" } },
{ "id": "exact", "type": "dedup-exact", "parameters": { "field": "question" } },
{ "id": "fuzzy", "type": "dedup-fuzzy", "parameters": { "field": "question", "threshold": "3" } },
{ "id": "take", "type": "sample", "parameters": { "n": 50 } },
{ "id": "eval", "type": "llm-call", "parameters": { "prompt": "@eval/quality.md", "fields": "question,answer", "format": "json", "as": "eval" } },
{ "id": "stats", "type": "doc-stats", "parameters": { "field": "question" } }
]
}樣本 4:OT Trace 實戰 — 兩步轉換 + 全文整合
實戰情境:28 條 OT Span(AGENT/LLM/TOOL/EXTERNAL 等)→ 過濾無關事件 → 組裝為 1 行寬表 → 合并為 full_text 列供下遊 AI 評估。
{
"nodes": [
{
"id": "extract", "type": "extend",
"parameters": {
"session_id": "json_extract_scalar(attributes, '$.gen_ai.session.id')",
"span_kind": "json_extract_scalar(attributes, '$.gen_ai.span.kind')",
"model": "json_extract_scalar(attributes, '$.gen_ai.request.model')",
"input_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.input_tokens')",
"output_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.output_tokens')",
"tool_name": "json_extract_scalar(attributes, '$.gen_ai.tool.name')",
"tool_args": "json_extract_scalar(attributes, '$.gen_ai.tool.call.arguments')",
"input_value": "json_extract_scalar(attributes, '$.input.value')",
"output_value": "json_extract_scalar(attributes, '$.output.value')",
"agent_id": "json_extract_scalar(attributes, '$.agent.id')",
"dur_ms": "cast(duration as bigint) / 1000000"
}
},
{
"id": "filter_events", "type": "where",
"parameters": { "filter": "span_kind IN ('AGENT','LLM','TOOL')" }
},
{
"id": "assemble", "type": "make-instance",
"parameters": {
"question": "min_by(input_value, startTime)",
"answer": "max_by(output_value, endTime)",
"model": "max_by(model, endTime)",
"total_input_tokens": "sum(cast(input_tokens as bigint))",
"total_output_tokens": "sum(cast(output_tokens as bigint))",
"llm_calls": "count_if(span_kind = 'LLM')",
"tool_calls": "count_if(span_kind = 'TOOL')",
"e2e_latency": "max(dur_ms)",
"models": "array_distinct(model)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"by": "session_id,traceId"
}
},
{
"id": "compose", "type": "extend",
"parameters": {
"full_text": "concat('## 使用者輸入', chr(10), substr(question, 1, 500), chr(10), chr(10), '## 最終輸出', chr(10), answer)"
}
},
{ "id": "filter_empty", "type": "where", "parameters": { "filter": "question IS NOT NULL AND length(question) > 0" } },
{ "id": "eval", "type": "llm-call", "parameters": { "prompt": "@eval/quality.md", "fields": "full_text", "format": "json", "as": "eval" } }
]
}第一步 extend + where + make-instance 完成欄位提取、事件過濾和資料群組裝(28 行 → 1 行);第二步 extend 將 question + answer 合并為 full_text 供 llm-call 做整體評估。全過程純 CPU。
使用建議與邊界行為
Pipeline 整合建議
邊界行為
情境 | 行為 |
組內某些事件缺少特定欄位 | 彙總函式自然處理 NULL,不報錯。 |
by 分組索引值為 NULL | 該事件不參與分組。 |
列定義使用裸欄位名 | validate 報錯,須使用顯式函數。 |
輸入資料為空白 | 正常返回空結果集。 |