Cloud Monitor: Customize traces with loongsuite-util-genai and the OpenTelemetry SDK

Last Updated: Apr 02, 2026

After you integrate ARMS Application Monitoring, the agent automatically instruments common AI frameworks, collecting trace data without any code changes. To capture specific business methods in your traces, add custom instrumentation using the loongsuite-util-genai package and the OpenTelemetry SDK. This topic describes how to use loongsuite-util-genai and the OpenTelemetry Python SDK to create custom spans and add custom attributes.

For a list of AI components and frameworks supported by the ARMS agent, see the following topics:

Prerequisites

Install dependencies

pip install loongsuite-util-genai

The installation provides the opentelemetry.util.genai package and extended interfaces such as ExtendedTelemetryHandler. For more information, see the loongsuite-util-genai documentation.
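
To confirm that the package is installed and the extended interfaces are importable, you can run a quick sanity check (the import path matches step 1 below):

# Quick sanity check: import and obtain the singleton handler.
from opentelemetry.util.genai.extended_handler import get_extended_telemetry_handler

handler = get_extended_telemetry_handler()
print(type(handler).__name__)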

Use loongsuite-util-genai and the OpenTelemetry SDK

With loongsuite-util-genai and the OpenTelemetry SDK, you can:

  • Create spans with GenAI semantics (such as Entry, Agent, Tool, and ReAct Step).

  • Create custom spans by using the OpenTelemetry SDK.

  • Add custom attributes to a span.

  • Get the current trace context and print the traceId.

Key concepts

  • span: A single operation within a request, such as a large language model (LLM) call or a tool execution.

  • SpanContext: The context of a request trace, which contains information such as traceId and spanId.

  • attribute: An additional field on a span used to record key information, such as a model name or token usage.

  • Handler: The ExtendedTelemetryHandler provided by loongsuite-util-genai, used to create Spans that conform to GenAI semantic conventions.

The following table lists all span types supported by loongsuite-util-genai. This topic focuses on how to use the Entry, Agent, Tool, and ReAct Step spans. For details about the other types, such as Embedding, Retriever, Reranker, and Memory, see the complete loongsuite-util-genai documentation.

| Span type | Operation name | Description |
| --- | --- | --- |
| Entry | enter | The application entry point, containing the session ID, user ID, and full application interaction details. |
| Agent | invoke_agent {name} | An Agent invocation, aggregating token usage. |
| Tool | execute_tool {name} | A tool or function execution. |
| Step | react | A marker for a single ReAct iteration. |
| LLM | chat {model} | An LLM chat, typically captured automatically by the agent. |
| Embedding | embeddings {model} | A vector embedding operation. |
| Retriever | retrieval {data_source} | A retrieval operation (for RAG). |
| Reranker | rerank {model} | A rerank operation. |
| Memory | memory {operation} | A memory read/write operation. |

The following sections provide step-by-step instructions and code snippets for instrumenting each type of span. A complete, runnable code example is available in the Appendix at the end of this document.

Important

You must obtain the handler instance by using get_extended_telemetry_handler() instead of directly instantiating TelemetryHandler. The ARMS agent is compatible only with get_extended_telemetry_handler(). Directly instantiating TelemetryHandler may cause environment variable compatibility issues.

Important

When adding custom instrumentation, you must follow the semantic conventions defined in LLM Trace Field Definitions. AI Application Observability features, such as token statistics and session analysis, rely on these conventions for data rendering. If span attributes do not follow these conventions, the related data may not display correctly in the console.

1. Get the handler and tracer

Use get_extended_telemetry_handler() to obtain the singleton Handler from loongsuite-util-genai, and get_tracer(__name__) to obtain the Tracer from the OpenTelemetry SDK. They are used to create GenAI semantic Spans and custom business Spans, respectively.

from opentelemetry.util.genai.extended_handler import get_extended_telemetry_handler
from opentelemetry.util.genai.extended_types import (
    ExecuteToolInvocation,
    InvokeAgentInvocation,
)
from opentelemetry.util.genai._extended_common import EntryInvocation, ReactStepInvocation
from opentelemetry.util.genai.types import Error, InputMessage, OutputMessage, Text
from opentelemetry.trace import get_tracer

handler = get_extended_telemetry_handler()
tracer = get_tracer(__name__)
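
To get the current trace context and print the traceId (the last capability in the list above), use the standard OpenTelemetry trace API. A minimal sketch:

from opentelemetry import trace

ctx = trace.get_current_span().get_span_context()
if ctx.is_valid:
    # trace_id is an integer; render it as the 32-character hex form shown in the console.
    print(f"traceId={format(ctx.trace_id, '032x')}")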

The handler supports two usage patterns:

  • Context manager (with handler.entry(inv), etc.): This is the recommended method. It automatically manages the span lifecycle (see the sketch after this list).

  • start/stop/fail API (handler.start_entry(inv) / handler.stop_entry(inv) / handler.fail_entry(inv, error)): Suitable for scenarios such as asynchronous, callback, or streaming where you cannot use with statements.
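
The following is a minimal sketch of the context-manager pattern, using the imports from step 1. The span is ended automatically when the with block exits; treating an escaping exception as a failure is assumed behavior here.

entry_inv = EntryInvocation(
    session_id="demo-session",
    user_id="demo-user",
    input_messages=[InputMessage(role="user", parts=[Text(content="Hello")])],
)
with handler.entry(entry_inv):
    # Business logic runs inside the block; the span is ended automatically
    # on exit (assumed: an exception escaping the block marks the span as failed).
    entry_inv.output_messages = [
        OutputMessage(role="assistant", parts=[Text(content="Hi")], finish_reason="stop"),
    ]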

2. Create an Entry span

At the request entry point, create an Entry Span. Include the session_id and user_id, and record the user input by using input_messages. After the streaming response is complete, concatenate the output, set it to output_messages, and then call stop_entry to end the Span. This allows you to directly see the full input and final output of the request in the console.

entry_inv = EntryInvocation(
    session_id=req.session_id or str(uuid.uuid4()),
    user_id=req.user_id or "anonymous",
    input_messages=[
        InputMessage(role="user", parts=[Text(content=req.topic)]),
    ],
)

def event_generator():
    handler.start_entry(entry_inv)
    output_chunks: list[str] = []

    try:
        for chunk in run_agent_stream(topic=req.topic):
            output_chunks.append(chunk)
            yield f"data: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as exc:
        handler.fail_entry(entry_inv, Error(message=str(exc), type=type(exc)))
        yield f"data: {json.dumps({'error': str(exc)}, ensure_ascii=False)}\n\n"
        return
    entry_inv.output_messages = [
        OutputMessage(
            role="assistant",
            parts=[Text(content="".join(output_chunks))],
            finish_reason="stop",
        ),
    ]
    handler.stop_entry(entry_inv)
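
In the full example, the generator is wrapped in a server-sent events response, so the Entry span covers the entire streamed reply (see app.py in the Appendix):

from fastapi.responses import StreamingResponse

@app.post("/api/v1/generate/stream")
async def generate_stream(req: GenerateRequest) -> StreamingResponse:
    # ... build entry_inv and event_generator as shown above ...
    return StreamingResponse(event_generator(), media_type="text/event-stream")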

3. Create an Agent span

Use start_invoke_agent to create an Agent Span that records the agent name, model, and description. The Agent Span is the root GenAI Span of the entire trace, and all subsequent ReAct Step, LLM call, and Tool call spans are its child spans.

invocation = InvokeAgentInvocation(
    provider="dashscope",
    agent_name="TechContentAgent",
    agent_description="Technical content generation assistant",
    request_model="qwen-plus",
)
total_input_tokens = 0
total_output_tokens = 0

handler.start_invoke_agent(invocation)
try:
    # ... Core agent logic (ReAct loop) ...

    invocation.input_tokens = total_input_tokens
    invocation.output_tokens = total_output_tokens
    handler.stop_invoke_agent(invocation)
except Exception:
    handler.fail_invoke_agent(invocation, Error(message="agent failed", type=RuntimeError))
    raise

After the agent finishes executing, the code writes the accumulated total_input_tokens and total_output_tokens to the Agent span, aggregating token metrics at the agent level.
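
The per-call usage comes from each LLM response inside the loop. In the full example, the totals are accumulated as follows (see agent.py in the Appendix):

# After each chat completion inside the ReAct loop:
if response.usage:
    total_input_tokens += response.usage.prompt_tokens
    total_output_tokens += response.usage.completion_tokens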

4. Create a ReAct Step span

Create a Step Span for each ReAct inference iteration and pass the current round number. When an iteration ends, set finish_reason to continue if another iteration is needed, or to stop if the model produced the final answer. In the example, the LLM call in each iteration is automatically instrumented by the ARMS agent, so you do not need to create it manually.

step_inv = ReactStepInvocation(round=iteration + 1)
handler.start_react_step(step_inv)

try:
    response = client.chat.completions.create(
        model="qwen-plus",
        messages=messages,
        tools=TOOL_DEFINITIONS,
    )
    # ... Process the response ...

    step_inv.finish_reason = "stop"  # or "continue"
    handler.stop_react_step(step_inv)
except Exception:
    handler.fail_react_step(step_inv, Error(message="step failed", type=RuntimeError))
    raise

5. Create a Tool span

When the model returns a tool call, create a Tool Span for each tool_call, recording the tool name, call ID, input parameters, and result.

tool_inv = ExecuteToolInvocation(
    tool_name=tool_call.function.name,
    tool_call_id=tool_call.id,
    tool_call_arguments=tool_call.function.arguments,
    tool_type="function",
)
handler.start_execute_tool(tool_inv)
try:
    result = dispatch_tool(tool_name, tool_call.function.arguments)
    tool_inv.tool_call_result = result
except Exception as exc:
    handler.fail_execute_tool(tool_inv, error=Error(message=str(exc), type=type(exc)))
    raise
else:
    handler.stop_execute_tool(tool_inv)
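
After the Tool span is closed, append the result to the conversation so the model can use it in the next iteration (taken from the full example in the Appendix):

messages.append({
    "role": "tool",
    "tool_call_id": tool_call.id,
    "content": result,
})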

6. Create custom spans with the OpenTelemetry SDK

In addition to the GenAI semantic Spans provided by loongsuite-util-genai, you can use the tracer.start_as_current_span() method from the OpenTelemetry SDK to create custom business Spans and use them together with GenAI Spans.

The following examples show two typical use cases for custom spans:

duplicate_tool_detection

This check runs before each ReAct iteration. It uses a Counter to track how many times each tool has been called and writes the detection results to the gen_ai.loop_detection.* attributes. If a loop is detected, a system prompt is appended to the message list to steer the model away from repeating the call.

def _check_duplicate_tools(
    tool_usage_counter: Counter,
    messages: list[dict[str, Any]],
) -> None:
    duplicates = [name for name, count in tool_usage_counter.items() if count > 1]
    has_duplicates = len(duplicates) > 0

    with tracer.start_as_current_span("duplicate_tool_detection") as span:
        span.set_attributes({
            "gen_ai.loop_detection.detected": has_duplicates,
            "gen_ai.loop_detection.duplicate_tools": str(duplicates) if has_duplicates else "[ ]",
            "gen_ai.loop_detection.total_calls": sum(tool_usage_counter.values()),
            "gen_ai.loop_detection.unique_tools": len(tool_usage_counter),
        })

    if has_duplicates:
        details = ", ".join(f"{n}({tool_usage_counter[n]} calls)" for n in duplicates)
        messages.append({
            "role": "system",
            "content": f"[System Hint] Duplicate tool calls detected: {details}. Please avoid repeating the call.",
        })
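
In the full example, this check runs at the top of every ReAct iteration, and the counter is incremented each time a tool call is dispatched (see agent.py in the Appendix):

tool_usage_counter: Counter = Counter()
for iteration in range(MAX_ITERATIONS):
    _check_duplicate_tools(tool_usage_counter, messages)
    # ... LLM call and tool dispatch; each dispatched call increments the counter:
    # tool_usage_counter[tool_name] += 1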

response_loop_detection

This process is executed after each LLM response. It compares the text similarity between the current response and the previous one and writes metrics such as is_loop and overlap_ratio to the Span attributes. If a loop is detected (the text is identical or the overlap ratio exceeds 80%), the finish_reason is set to loop_detected and the Agent is terminated early.

def _check_response_loop(
    current_content: str | None,
    previous_content: str | None,
) -> bool:
    cur = (current_content or "").strip()
    prev = (previous_content or "").strip()

    with tracer.start_as_current_span("response_loop_detection") as span:
        if not prev or not cur:
            span.set_attributes({
                "gen_ai.loop_detection.is_loop": False,
                "gen_ai.loop_detection.reason": "no_text_content",
            })
            return False

        is_identical = cur == prev
        longer = max(len(cur), len(prev))
        # Length of the shared prefix of the two responses
        common_prefix_len = 0
        for a, b in zip(cur, prev):
            if a == b:
                common_prefix_len += 1
            else:
                break
        overlap_ratio = common_prefix_len / longer if longer > 0 else 0.0
        is_loop = is_identical or overlap_ratio > 0.8

        span.set_attributes({
            "gen_ai.loop_detection.is_loop": is_loop,
            "gen_ai.loop_detection.is_identical": is_identical,
            "gen_ai.loop_detection.overlap_ratio": round(overlap_ratio, 2),
            "gen_ai.loop_detection.current_length": len(cur),
            "gen_ai.loop_detection.previous_length": len(prev),
        })
        return is_loop
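
In the full example, the check runs after each model response. When a loop is detected, the Step span is closed with finish_reason set to loop_detected and the agent loop exits early (see agent.py in the Appendix):

if _check_response_loop(current_content, previous_content):
    step_inv.finish_reason = "loop_detected"
    handler.stop_react_step(step_inv)
    break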

Note

Because custom spans do not follow the GenAI semantic conventions, you must switch to All Views in the trace view of the console to see them.

View monitoring details

  1. Log in to the Cloud Monitor 2.0 console, select the target workspace, and then choose All Features > AI Application Observability in the left-side navigation pane.

  2. On the AI Applications page, you can see your integrated applications. Click an Application Name to view detailed monitoring data.

Instrumentation results

1. Entry span details

The Entry span displays key attributes like gen_ai.session.id and gen_ai.user.id. When set at the function entry point, these attributes are automatically propagated to LLM, Tool, and other spans, allowing for analysis based on session and user information. The Entry span also contains gen_ai.input.messages (user input) and gen_ai.output.messages (final output), making it easy to view the entire interaction content for the request in the console.


2. Agent span details

The Agent span shows the agent's defined name and description. It also displays the aggregated token usage statistics at the agent level, as calculated in the example code.


3. Tool span details

The Tool span shows the tool's name, its input parameters, and the result of the tool call.

4. LLM span details

In the example code, LLM spans are not manually instrumented: because the calls go through the OpenAI SDK, the ARMS agent captures them automatically. You can clearly observe the complete context information and token consumption for each LLM call.


5. Custom span details

The example code creates two custom business spans with the OpenTelemetry SDK to demonstrate how to combine custom instrumentation with GenAI semantic spans. Since these custom spans are not part of the GenAI semantics, you must switch to All Views to see them.

  • duplicate_tool_detection: This span is created before each ReAct iteration to detect if the agent is stuck in a loop of repeated tool calls. The span attributes record whether duplicates were detected, the list of duplicate tools, the total number of calls, and the number of unique tools. This helps you quickly diagnose tool call loop issues in ARMS.


  • response_loop_detection: This span is created after each LLM response to detect if the model is repeatedly returning highly similar content. The span attributes record whether a loop was identified, if the text is identical, the overlap ratio, and the lengths of the current and previous responses. This helps troubleshoot abnormal scenarios where the model is stuck in a repetitive output loop.


References

Custom instrumentation for other languages

Appendix

Complete code example

app.py

import json
import uuid

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from opentelemetry.util.genai.extended_handler import get_extended_telemetry_handler
from opentelemetry.util.genai._extended_common import EntryInvocation
from opentelemetry.util.genai.types import Error, InputMessage, OutputMessage, Text

from agent import run_marketing_agent_stream

app = FastAPI(title="Cloud Product Technical Content Generation Assistant")


class GenerateRequest(BaseModel):
    content_type: str = "blog"
    product: str = "CMS"
    target_audience: str = "Operations Engineer"
    topic: str = ""
    session_id: str = ""
    user_id: str = ""


@app.post("/api/v1/generate/stream")
async def generate_stream(req: GenerateRequest) -> StreamingResponse:
    handler = get_extended_telemetry_handler()

    user_prompt = (
        f"Content type: {req.content_type}, Product: {req.product}, "
        f"Target audience: {req.target_audience}, Topic: {req.topic}"
    )

    entry_inv = EntryInvocation(
        session_id=req.session_id or str(uuid.uuid4()),
        user_id=req.user_id or "anonymous",
        input_messages=[
            InputMessage(role="user", parts=[Text(content=user_prompt)]),
        ],
    )

    def event_generator():
        handler.start_entry(entry_inv)
        output_chunks: list[str] = []
        try:
            for chunk in run_marketing_agent_stream(
                content_type=req.content_type,
                product=req.product,
                target_audience=req.target_audience,
                topic=req.topic,
            ):
                output_chunks.append(chunk)
                yield f"data: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as exc:
            handler.fail_entry(
                entry_inv,
                Error(message=str(exc), type=type(exc)),
            )
            yield f"data: {json.dumps({'error': str(exc)}, ensure_ascii=False)}\n\n"
            return
        entry_inv.output_messages = [
            OutputMessage(
                role="assistant",
                parts=[Text(content="".join(output_chunks))],
                finish_reason="stop",
            ),
        ]
        handler.stop_entry(entry_inv)

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.get("/health")
async def health():
    return {"status": "ok"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

agent.py

import os
from collections import Counter
from collections.abc import Generator
from typing import Any

from openai import OpenAI
from opentelemetry.trace import get_tracer
from opentelemetry.util.genai.extended_handler import get_extended_telemetry_handler
from opentelemetry.util.genai.extended_types import (
    ExecuteToolInvocation,
    InvokeAgentInvocation,
)
from opentelemetry.util.genai._extended_common import ReactStepInvocation
from opentelemetry.util.genai.types import Error

from tools import TOOL_DEFINITIONS, dispatch_tool

tracer = get_tracer(__name__)

MODEL_NAME = os.environ.get("MODEL_NAME", "qwen-plus")
BASE_URL = os.environ.get(
    "OPENAI_BASE_URL",
    "https://dashscope.aliyuncs.com/compatible-mode/v1",
)
API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")

MAX_ITERATIONS = 10

SYSTEM_PROMPT = """\
You are a technical content generation assistant for Alibaba Cloud Monitor 2.0 (CMS 2.0).
You generate high-value technical content for operations engineers and architects, using professional language they are familiar with.

Key principles: Adjust the perspective and language style based on the target audience.
- Operations Engineers: Focus on hands-on steps, troubleshooting efficiency, and tool integration. Use common terms from daily operations.
- Architects: Focus on architectural design, standardization, and scalability. Use in-depth, professional language.

You must strictly follow these steps, calling the corresponding tool for each step:

Step 1: Use the `search_product_knowledge` tool to search for CMS product information (features or comparison).
Step 2: Use the `get_audience_profile` tool to get the profile and pain points of the target audience.
Step 3: Use the `get_industry_cases` tool to find relevant industry case studies.
Step 4: If the content is a blog post, use the `generate_seo_keywords` tool to get SEO keywords.
Step 5: Generate the content based on the collected information.
Step 6: Use the `check_content_compliance` tool to check for compliance.

Content requirements: Focus on product advantages and audience pain points, cite case study data, write in Chinese, and keep it under 800 characters."""


def _build_client() -> OpenAI:
    return OpenAI(base_url=BASE_URL, api_key=API_KEY)


def _build_user_message(
    content_type: str,
    product: str,
    target_audience: str,
    topic: str,
) -> str:
    type_labels = {
        "blog": "a hands-on technical blog for front-line technical staff",
        "email": "a technical recommendation email for a specific target audience",
        "case_study": "a practical customer case study for reference",
        "comparison": "a product comparison analysis to assist with technology selection",
    }
    label = type_labels.get(content_type, content_type)
    return (
        f"Please generate {label} for the {product} product.\n"
        f"Target audience: {target_audience}\n"
        f"Topic/Direction: {topic}\n\n"
        f"Please write from the perspective and in the language familiar to the target audience in their daily work. "
        f"Strictly follow the steps to call tools and collect information before generating the content."
    )


def _check_duplicate_tools(
    tool_usage_counter: Counter,
    messages: list[dict[str, Any]],
) -> list[str]:
    duplicates = [name for name, count in tool_usage_counter.items() if count > 1]
    total_calls = sum(tool_usage_counter.values())
    has_duplicates = len(duplicates) > 0

    duplicate_details = (
        ", ".join(f"{name}({tool_usage_counter[name]} calls)" for name in duplicates)
        if has_duplicates
        else "none"
    )

    with tracer.start_as_current_span("duplicate_tool_detection") as detect_span:
        detect_span.set_attributes({
            "gen_ai.loop_detection.detected": has_duplicates,
            "gen_ai.loop_detection.duplicate_tools": str(duplicates) if has_duplicates else "[]",
            "gen_ai.loop_detection.details": duplicate_details,
            "gen_ai.loop_detection.total_calls": total_calls,
            "gen_ai.loop_detection.unique_tools": len(tool_usage_counter),
        })

    if not has_duplicates:
        return []

    hint_message = (
        f"[System Hint] The following tools were called multiple times: {duplicate_details}. "
        f"Avoid calling the same tools again. Use the information you already have to proceed to the next steps."
    )
    messages.append({"role": "system", "content": hint_message})

    return duplicates


def _check_response_loop(
    current_content: str | None,
    previous_content: str | None,
) -> bool:
    """Compare consecutive LLM text responses to detect stuck loops."""
    cur = (current_content or "").strip()
    prev = (previous_content or "").strip()

    with tracer.start_as_current_span("response_loop_detection") as span:
        if not prev or not cur:
            span.set_attributes({
                "gen_ai.loop_detection.is_loop": False,
                "gen_ai.loop_detection.reason": "no_text_content",
            })
            return False

        is_identical = cur == prev

        common_prefix_len = 0
        for a, b in zip(cur, prev):
            if a == b:
                common_prefix_len += 1
            else:
                break
        longer = max(len(cur), len(prev))
        overlap_ratio = common_prefix_len / longer if longer > 0 else 0.0
        is_loop = is_identical or overlap_ratio > 0.8

        span.set_attributes({
            "gen_ai.loop_detection.is_loop": is_loop,
            "gen_ai.loop_detection.is_identical": is_identical,
            "gen_ai.loop_detection.overlap_ratio": round(overlap_ratio, 2),
            "gen_ai.loop_detection.current_length": len(cur),
            "gen_ai.loop_detection.previous_length": len(prev),
        })
        return is_loop


def run_marketing_agent_stream(
    content_type: str,
    product: str,
    target_audience: str,
    topic: str,
) -> Generator[str, None, None]:
    client = _build_client()
    handler = get_extended_telemetry_handler()

    user_message = _build_user_message(content_type, product, target_audience, topic)

    invocation = InvokeAgentInvocation(
        provider="dashscope",
        agent_name="TechContentAgent",
        agent_description="A cloud product content generation assistant for different technical roles",
        request_model=MODEL_NAME,
    )

    total_input_tokens = 0
    total_output_tokens = 0
    tool_usage_counter: Counter = Counter()
    previous_content: str | None = None

    handler.start_invoke_agent(invocation)
    try:
        messages: list[dict[str, Any]] = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_message},
        ]

        for iteration in range(MAX_ITERATIONS):
            _check_duplicate_tools(tool_usage_counter, messages)

            step_inv = ReactStepInvocation(round=iteration + 1)
            handler.start_react_step(step_inv)
            try:
                response = client.chat.completions.create(
                    model=MODEL_NAME,
                    messages=messages,
                    tools=TOOL_DEFINITIONS,
                    temperature=0.7,
                )

                choice = response.choices[0]
                message = choice.message

                if response.usage:
                    total_input_tokens += response.usage.prompt_tokens
                    total_output_tokens += response.usage.completion_tokens

                current_content = message.content
                if _check_response_loop(current_content, previous_content):
                    step_inv.finish_reason = "loop_detected"
                    handler.stop_react_step(step_inv)
                    if current_content:
                        yield current_content
                    break
                if (current_content or "").strip():
                    previous_content = current_content

                if message.tool_calls:
                    messages.append(message.model_dump())

                    for tool_call in message.tool_calls:
                        tool_name = tool_call.function.name
                        tool_args = tool_call.function.arguments
                        tool_usage_counter[tool_name] += 1

                        tool_inv = ExecuteToolInvocation(
                            tool_name=tool_name,
                            tool_call_id=tool_call.id,
                            tool_call_arguments=tool_args,
                            tool_type="function",
                        )

                        handler.start_execute_tool(tool_inv)
                        try:
                            result = dispatch_tool(tool_name, tool_args)
                            tool_inv.tool_call_result = result
                        except Exception as exc:
                            handler.fail_execute_tool(
                                tool_inv,
                                error=Error(message=str(exc), type=type(exc)),
                            )
                            raise
                        else:
                            handler.stop_execute_tool(tool_inv)

                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": result,
                        })

                    step_inv.finish_reason = "continue"
                    handler.stop_react_step(step_inv)
                    continue

                if choice.finish_reason == "stop" or message.content:
                    if message.content:
                        yield message.content

                    step_inv.finish_reason = "stop"
                    handler.stop_react_step(step_inv)
                    break
            except Exception:
                handler.fail_react_step(
                    step_inv, Error(message="step failed", type=RuntimeError)
                )
                raise

        invocation.input_tokens = total_input_tokens
        invocation.output_tokens = total_output_tokens
        handler.stop_invoke_agent(invocation)
    except Exception:
        handler.fail_invoke_agent(
            invocation, Error(message="agent failed", type=RuntimeError)
        )
        raise

tools.py

import json
from typing import Any

PRODUCT_KNOWLEDGE: dict[str, dict[str, str]] = {
    "CMS": {
        "features": (
            "Cloud Monitor 2.0 (CMS 2.0) is a one-stop observability platform from Alibaba Cloud, "
            "integrating the capabilities of SLS, CMS, and ARMS:\n"
            "1. Full-stack unified monitoring: Unified view of metrics, traces, logs, and events.\n"
            "2. UModel unified modeling: Automatic resource correlation and observability graph construction.\n"
            "3. AI-powered analytics: Anomaly detection, alert noise reduction, and a conversational Ops Copilot.\n"
            "4. Open and compatible: Supports Prometheus, Grafana, and OpenTelemetry ecosystems.\n"
            "5. AI Application Observability: LLM trace tracking, token statistics, and model performance analysis."
        ),
        "comparison": (
            "Cloud Monitor 2.0 vs. traditional monitoring solutions:\n"
            "1. Data integration: Traditional solutions require switching between 3-5 consoles; CMS 2.0 provides a one-stop integrated experience.\n"
            "2. AI capabilities: Traditional static threshold alerts have a false positive rate of over 30%; CMS 2.0 reduces noise by 80% with AI.\n"
            "3. Observability graph: CMS 2.0 automatically builds a dependency graph through UModel.\n"
            "4. AI Application Observability: Not supported by traditional solutions; CMS 2.0 provides native support for the full LLM/Agent trace."
        ),
    },
}
AUDIENCE_PROFILES: dict[str, dict[str, str]] = {
    "Operations Engineer": {
        "role": "Operations Engineer / SRE",
        "pain_points": (
            "1. Long troubleshooting times: Locating issues in microservices architectures takes an average of 30-60 minutes.\n"
            "2. Alert storms: A surge in alerts during promotional events makes it difficult to prioritize.\n"
            "3. Tool fragmentation: Need to switch between 5-6 different monitoring tools.\n"
            "4. AI operations blind spots: Lack of transparency in large language model traces."
        ),
        "interests": "End-to-end tracing, root cause analysis, alert noise reduction, Prometheus/Grafana integration",
        "decision_factors": "Technical maturity, community activity, learning curve, integration difficulty",
    },
    "Architect": {
        "role": "Architect / Technical Expert",
        "pain_points": (
            "1. Observability challenges in hybrid architectures with microservices and AI Agents.\n"
            "2. Lack of objective comparison for selecting between open-source self-built and commercial solutions.\n"
            "3. Fragmented monitoring solutions and data formats across different teams.\n"
            "4. Uncertainty if the current solution can support 10x business growth."
        ),
        "interests": "Architectural design, OpenTelemetry standardization, unified data models, scalability",
        "decision_factors": "Architectural advancement, standardization level, scalability, openness, community ecosystem",
    },
}

INDUSTRY_CASES: dict[str, list[dict[str, str]]] = {
    "Finance": [
        {
            "company": "A leading joint-stock bank",
            "scenario": (
                "Observability upgrade for a core trading system: Covering over 200 microservices "
                "with end-to-end tracing for 50 million daily transactions."
            ),
            "results": (
                "Reduced fault MTTR from 45 minutes to 8 minutes, an 82% improvement; "
                "Increased alert accuracy from 60% to 95%; "
                "Improved operational efficiency by 3x and passed Level 3 classified protection compliance checks on the first attempt."
            ),
        },
    ],
    "Internet": [
        {
            "company": "A social media platform",
            "scenario": (
                "Full-stack observability for an application with tens of millions of DAUs: Covering app-side experience monitoring -> "
                "CDN -> API Gateway -> 2000+ microservices -> Databases/Caches."
            ),
            "results": (
                "Reduced user-side crash rate from 0.5% to 0.08%; "
                "Optimized API P99 latency by 40%; "
                "Saved over 100,000 RMB per month in monitoring costs (compared to a self-built solution)."
            ),
        },
    ],
}

COMPLIANCE_RULES: dict[str, dict[str, Any]] = {
    "product_names": {
        "incorrect": {
            "Aliyun": "Alibaba Cloud",
            "CMS2.0": "CMS 2.0",
            "Cloud Monitor2.0": "Cloud Monitor 2.0",
        },
    },
    "claim_rules": [
        "Data citations must include the source",
        "Avoid absolute terms (e.g., 'the best', 'the only', 'the first')",
        "Use objective data when comparing with competitors",
    ],
}

SEO_KEYWORDS_DB: dict[str, dict[str, Any]] = {
    "Observability": {
        "primary": "Observability",
        "long_tail": ["cloud-native observability solution", "microservices observability platform selection"],
        "search_volume": "High",
    },
    "AI Observability": {
        "primary": "AI Application Observability",
        "long_tail": ["LLM trace tracking", "AI Agent observability"],
        "search_volume": "Medium (rapidly growing)",
    },
}


def search_product_knowledge(product: str, aspect: str) -> str:
    product_key = "CMS"
    product_data = PRODUCT_KNOWLEDGE.get(product_key)
    if not product_data:
        available = ", ".join(PRODUCT_KNOWLEDGE.keys())
        return f"Knowledge base for product '{product}' not found. Available products: {available}"

    aspect_lower = aspect.lower()
    aspect_data = product_data.get(aspect_lower)
    if not aspect_data:
        available = ", ".join(product_data.keys())
        return f"Information about '{aspect}' for '{product}' not found. Available aspects: {available}"

    return f"[{product} - {aspect}]\n{aspect_data}"


def get_audience_profile(audience_type: str) -> str:
    profile = AUDIENCE_PROFILES.get(audience_type)
    if not profile:
        available = ", ".join(AUDIENCE_PROFILES.keys())
        return f"Audience type '{audience_type}' not found. Available types: {available}"

    return (
        f"Audience Profile — {profile['role']}\n\n"
        f"Core Pain Points:\n{profile['pain_points']}\n\n"
        f"Areas of Interest: {profile['interests']}\n\n"
        f"Decision Factors: {profile['decision_factors']}"
    )


def get_industry_cases(industry: str) -> str:
    cases = INDUSTRY_CASES.get(industry)
    if not cases:
        available = ", ".join(INDUSTRY_CASES.keys())
        return f"Case studies for industry '{industry}' not found. Available industries: {available}"

    parts: list[str] = [f"[{industry} Industry Case Studies]\n"]
    for i, case in enumerate(cases, 1):
        parts.append(
            f"Case {i}: {case['company']}\n"
            f"  Scenario: {case['scenario']}\n"
            f"  Results: {case['results']}"
        )
    return "\n\n".join(parts)


def check_content_compliance(content_type: str, key_claims: str) -> str:
    issues: list[str] = []

    for wrong, correct in COMPLIANCE_RULES["product_names"]["incorrect"].items():
        if wrong in key_claims and correct not in key_claims:
            issues.append(f"Product name '{wrong}' should be corrected to '{correct}'")

    for word in ("best", "only", "first", "strongest"):
        if word in key_claims:
            issues.append(f"Contains absolute term '{word}'. Suggest replacing with an objective description.")

    rules_text = "\n".join(
        f"  {i+1}. {rule}"
        for i, rule in enumerate(COMPLIANCE_RULES["claim_rules"])
    )

    result = "Compliance Check Results:\n\n"
    if issues:
        result += "Issues Found:\n" + "\n".join(f"  - {i}" for i in issues) + "\n\n"
    else:
        result += "No apparent compliance issues found.\n\n"
    result += f"Compliance Rules:\n{rules_text}"
    return result


def generate_seo_keywords(topic: str) -> str:
    topic_lower = topic.lower()
    matched: list[dict[str, Any]] = []

    for key, data in SEO_KEYWORDS_DB.items():
        if key.lower() in topic_lower or topic_lower in key.lower() or any(
            w in topic_lower for w in key.lower().split() if len(w) > 1
        ):
            matched.append({"keyword": key, **data})

    if not matched:
        all_keywords = list(SEO_KEYWORDS_DB.keys())
        return (
            f"No directly matching keyword data found for '{topic}'.\n"
            f"Suggested keyword directions: {', '.join(all_keywords)}\n"
            f"General SEO advice: Include core keywords in the title, "
            f"use long-tail keywords in H2/H3 headings, and aim for content length of 2000+ words."
        )

    parts: list[str] = [f"SEO Keyword Analysis — '{topic}':\n"]
    for item in matched:
        long_tail = "\n".join(f"    - {kw}" for kw in item["long_tail"])
        parts.append(
            f"Primary Keyword: {item['primary']}\n"
            f"  Search Volume: {item['search_volume']}\n"
            f"  Long-tail Keywords:\n{long_tail}"
        )
    return "\n\n".join(parts)


TOOL_REGISTRY: dict[str, Any] = {
    "search_product_knowledge": search_product_knowledge,
    "get_audience_profile": get_audience_profile,
    "get_industry_cases": get_industry_cases,
    "check_content_compliance": check_content_compliance,
    "generate_seo_keywords": generate_seo_keywords,
}


def dispatch_tool(name: str, arguments: str) -> str:
    func = TOOL_REGISTRY.get(name)
    if not func:
        return f"Unknown tool: {name}"
    try:
        kwargs = json.loads(arguments)
    except json.JSONDecodeError:
        return f"Failed to parse tool arguments: {arguments}"
    return func(**kwargs)


TOOL_DEFINITIONS: list[dict[str, Any]] = [
    {
        "type": "function",
        "function": {
            "name": "search_product_knowledge",
            "description": "Searches the CMS product knowledge base for information on features or competitor comparisons.",
            "parameters": {
                "type": "object",
                "properties": {
                    "product": {
                        "type": "string",
                        "description": "The product name",
                        "enum": ["CMS"],
                    },
                    "aspect": {
                        "type": "string",
                        "description": "The aspect to query",
                        "enum": ["features", "comparison"],
                    },
                },
                "required": ["product", "aspect"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_audience_profile",
            "description": "Gets the profile of the target audience, including pain points, areas of interest, and decision factors.",
            "parameters": {
                "type": "object",
                "properties": {
                    "audience_type": {
                        "type": "string",
                        "description": "The type of target audience",
                        "enum": ["Operations Engineer", "Architect"],
                    },
                },
                "required": ["audience_type"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_industry_cases",
            "description": "Gets industry customer success stories, including scenarios and performance data.",
            "parameters": {
                "type": "object",
                "properties": {
                    "industry": {
                        "type": "string",
                        "description": "The target industry",
                        "enum": ["Finance", "Internet"],
                    },
                },
                "required": ["industry"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "check_content_compliance",
            "description": "Checks content for compliance, including product name standards and promotional language.",
            "parameters": {
                "type": "object",
                "properties": {
                    "content_type": {
                        "type": "string",
                        "description": "The type of content",
                        "enum": ["blog", "case_study", "comparison"],
                    },
                    "key_claims": {
                        "type": "string",
                        "description": "Key promotional points and data citations",
                    },
                },
                "required": ["content_type", "key_claims"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "generate_seo_keywords",
            "description": "Generates SEO keywords based on a topic. Call this when generating a blog post.",
            "parameters": {
                "type": "object",
                "properties": {
                    "topic": {
                        "type": "string",
                        "description": "The main topic or core keywords of the article",
                    },
                },
                "required": ["topic"],
            },
        },
    },
]

requirements.txt

openai
fastapi
uvicorn[standard]
loongsuite-util-genai