
Cloud Monitor: Dataset

Last Updated: Mar 28, 2026

Dataset from AgentLoop is a new type of data storage designed for AI applications. It builds on traditional log storage and adds full CRUD support, a flexible schema, vector search, and multi-dimensional analysis, turning AI runtime data from read-only logs into manageable assets. Dataset manages the entire data lifecycle of an AI application, including training data management, evaluation dataset construction, bad case tracking, and model regression testing.

Background

Comparison with SLS Logstore

| Feature | SLS Logstore | Dataset |
| --- | --- | --- |
| Data model | Append-only; data is immutable once written. | Full CRUD support. |
| Schema | Flexible custom index configuration. | Custom schema. Supported types: text, long, double, and json. |
| Search capabilities | Full-text search + SQL analysis. | Full-text search + semantic search + SQL analysis + hybrid queries. |
| Vector capabilities | Not supported. | Built-in embedding vector index for semantic similarity search. |
| Data modification | Not supported. | Supports updates and deletions by ID. |
| Use cases | Log collection, monitoring and alerting, and audit and compliance. | AI data management, evaluation benchmarks, training data, and bad case tracking. |

Key capabilities

| Capability | Description |
| --- | --- |
| Custom schema | Custom fields support the text, long, double, and json types. The json type supports indexing of nested sub-fields. |
| Vector search | Enable vector indexing for a text field by configuring an embedding model. This enables semantic similarity search. |
| Full CRUD | Execute INSERT, UPDATE, and DELETE operations using standard SQL. The data is mutable. |
| Multidimensional search | Combine full-text search, semantic search, and SQL analysis to perform powerful hybrid queries. |
| Version traceability | Each record is automatically assigned a unique ID to support traceability, data export, and regression testing. |

Procedure

Prerequisites

Install the SDK

pip install alibabacloud-cms20240330-inner==6.0.8

Configure credentials

Set the following environment variables, or use a .env file:

export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"
export ALIBABA_CLOUD_ENDPOINT="cms.cn-shanghai.aliyuncs.com"
export ALIBABA_CLOUD_CMS_WORKSPACE="your-workspace"

Initialize the client

import os

from alibabacloud_cms20240330.client import Client
from alibabacloud_tea_openapi.models import Config

# Read credentials from the environment variables configured above.
config = Config(
    access_key_id=os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"],
    access_key_secret=os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"],
    endpoint=os.environ.get("ALIBABA_CLOUD_ENDPOINT", "cms.cn-shanghai.aliyuncs.com"),
)
client = Client(config)
workspace = os.environ["ALIBABA_CLOUD_CMS_WORKSPACE"]

Create a dataset

Schema field types

A dataset supports a user-defined schema with the following field types:

| Type | Description | Optional capabilities | Example fields |
| --- | --- | --- | --- |
| text | Supports full-text and semantic search. | chn: enables Chinese word segmentation. embedding: enables the vector index. | question, output |
| long | Supports numeric range queries. | None | input_tokens, latency_ms |
| double | Supports numeric range queries. | None | score |
| json | Supports indexing of nested fields. | json_keys: defines indexes for sub-fields. | metadata |

  • Vector index: Setting the embedding property on a text field enables the vector index and semantic search.

  • Built-in id primary key: A dataset automatically generates a unique id primary key for each record. All UPDATE and DELETE operations must use this primary key.

SDK example

from alibabacloud_cms20240330.models import (
    CreateDatasetRequest,
    IndexJsonKey,
    IndexKey,
)

schema = {
    "input": IndexKey(
        type="text",
        chn=True,                          # Enable Chinese word segmentation
        embedding="text-embedding-v4",     # Enable vector index
    ),
    "output": IndexKey(
        type="text",
        chn=True,
        embedding="text-embedding-v4",
    ),
    "model": IndexKey(type="text"),
    "score": IndexKey(type="double"),
    "metadata": IndexKey(
        type="json",
        json_keys={
            "input_tokens": IndexJsonKey(type="long"),
            "output_tokens": IndexJsonKey(type="long"),
        },
    ),
}

request = CreateDatasetRequest(
    dataset_name="my_dataset",
    description="AI Q&A dataset",
    schema=schema,
)
client.create_dataset(workspace, request)

Write data

You write data to a dataset by executing SQL statements (INSERT, UPDATE, and DELETE) through the ExecuteQuery interface.

Insert data (INSERT)

Use the ExecuteQuery interface to run standard SQL INSERT statements.

Single-row insert:

from alibabacloud_cms20240330.models import ExecuteQueryRequest

sql = """
INSERT INTO my_dataset (input, output, model, score)
VALUES (
    'How do I view error logs from the last hour?',
    'Use the query: level:ERROR',
    'qwen-plus',
    0.95
)
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)
print(f"affected_rows: {response.body.meta.affected_rows}")

Batch insert:

sql = """
INSERT INTO my_dataset (input, output, model, score)
VALUES
    ('Count the API calls for each interface today', 'SELECT api, count(*) ...', 'gpt-4o', 0.88),
    ('Find requests that have timed out', 'latency > 5000 | SELECT ...', 'claude-3.5-sonnet', 0.92)
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
client.execute_query(workspace, "my_dataset", request)

Update data (UPDATE)

Updates require the id primary key. Batch updates are not currently supported:

sql = """
UPDATE my_dataset
SET score = 0.98, output = 'Optimized answer...'
WHERE id = 'your-doc-id'
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
client.execute_query(workspace, "my_dataset", request)

Delete data (DELETE)

Deletes also require the id primary key:

sql = "DELETE FROM my_dataset WHERE id = 'your-doc-id'"
request = ExecuteQueryRequest(query=sql, type="SQL")
client.execute_query(workspace, "my_dataset", request)

Tip: To update or delete a specific record, first run a query to retrieve its id.
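
For example, a minimal sketch of the lookup-then-update flow. It assumes at least one record matches the filter and that each returned row carries its id field, as the Response structure section shows:

# Look up the id of a record, then update it by primary key.
lookup = ExecuteQueryRequest(
    query="SELECT id FROM my_dataset WHERE model = 'qwen-plus' LIMIT 1",
    type="SQL",
)
rows = client.execute_query(workspace, "my_dataset", lookup).body.data
if rows:
    doc_id = rows[0]["id"]
    update_sql = f"UPDATE my_dataset SET score = 0.97 WHERE id = '{doc_id}'"
    client.execute_query(workspace, "my_dataset", ExecuteQueryRequest(query=update_sql, type="SQL"))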

Data query

To perform a data query, use the ExecuteQuery interface with type="SQL". You can combine the following four query modes:

| Query mode | Syntax | Description |
| --- | --- | --- |
| Full-text search | field:keyword | Keyword matching that supports AND, OR, and numeric comparisons. |
| Semantic search | similarity() / semantic_distance() | Vector-based semantic search, available in two forms. |
| SQL analysis | SELECT ... FROM dataset_name ... | Standard SQL queries and statistical analysis. |
| Hybrid query | <search_condition> \| <SQL_statement> | Combines the other query modes. |

Full-text search

Full-text search uses a keyword matching syntax consistent with the SLS query syntax:

| Syntax | Description | Example |
| --- | --- | --- |
| field:keyword | Single-field matching | input:错误 |
| field1:v1 AND field2:v2 | Combined conditions | model:qwen-plus AND input:日志 |
| field:v1 OR field:v2 | OR condition | model:qwen-plus OR model:gpt-4o |
| field > value | Numeric comparison | score > 0.9 |
| field:v1 AND field > n | Mixed conditions | model:qwen-plus AND score > 0.8 |

You can use the keyword matching syntax by itself or combine it with SQL using the | pipe character:

from alibabacloud_cms20240330.models import ExecuteQueryRequest

# Full-text search
request = ExecuteQueryRequest(query="input:错误", type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

# Full-text search + SQL
query = "input:错误 | SELECT input, score FROM my_dataset ORDER BY score DESC LIMIT 5"
request = ExecuteQueryRequest(query=query, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

Semantic search

Semantic search retrieves results based on vector similarity. The corresponding field must have an embedding vector index configured in the schema.

Two forms are available:

Form 1: Search syntax similarity()

Use this in the search condition to the left of the | pipe. You can combine it with the full-text search syntax by using AND or OR.

# Semantic search
request = ExecuteQueryRequest(query="similarity(input, '日志分析') < 0.3", type="SQL")

# Semantic search + full-text search
request = ExecuteQueryRequest(
    query="similarity(input, '日志分析') < 0.3 AND model:qwen-plus",
    type="SQL",
)

# Semantic search + SQL
request = ExecuteQueryRequest(
    query="similarity(input, '日志分析') < 0.3 | SELECT input, score FROM my_dataset ORDER BY score DESC",
    type="SQL",
)

Form 2: SQL function semantic_distance()

Use this in the SQL statement to the right of the | pipe. It can be used in SELECT, WHERE, and ORDER BY clauses.

sql = """
SELECT input, semantic_distance(input, '日志查询统计') AS similarity
FROM my_dataset
WHERE semantic_distance(input, '日志查询统计') < 0.3
ORDER BY semantic_distance(input, '日志查询统计') ASC
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

Comparison of the two forms

| Dimension | similarity() | semantic_distance() |
| --- | --- | --- |
| Position | Left of the pipe (search condition) | Right of the pipe (SQL statement) |
| Purpose | Fast semantic filtering | Precise analysis and sorting |
| Returns distance value | No | Yes (usable as a SELECT column) |
| Combinable with full-text search | Yes (using AND / OR) | Yes (in the SQL WHERE clause) |
| Typical use cases | Fast recall of semantically related data | Reranking, similarity sorting, distance analysis |

Threshold recommendations: The threshold value ranges from 0 to 1, where a smaller value indicates higher semantic similarity.

| Threshold range | Match level | Scenario |
| --- | --- | --- |
| 0.1–0.2 | Strict match | Precise deduplication, near-duplicate search |
| 0.2–0.3 | Standard match | Related content retrieval |
| 0.3–0.5 | Loose match | Exploratory search, topic clustering |

SQL analysis

Dataset supports standard SQL queries and statistical analysis, including GROUP BY, HAVING, ORDER BY, LIMIT, common table expressions (CTEs), subqueries, and window functions. The SQL engine accepts PrestoSQL syntax and converts queries to PostgreSQL for server-side execution.

sql = """
SELECT model, count(*) AS total, avg(score) AS avg_score
FROM my_dataset
GROUP BY model
ORDER BY total DESC
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

Use the LIMIT offset, count syntax for pagination:

page_size = 10
page = 2
skip = (page - 1) * page_size
# Page 2 (skips 10 records and returns 10 records)
sql = f"SELECT * FROM my_dataset ORDER BY score DESC LIMIT {skip}, {page_size}"
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

Supported clauses and functions

Supported SQL clauses

| Clause | Supported | Description |
| --- | --- | --- |
| SELECT | Yes | Column selection, expressions, aliases, *, subqueries |
| FROM | Yes | Single-table queries, subqueries (with aliases) |
| WHERE | Yes | Comparison operators, IN / NOT IN, BETWEEN, LIKE, IS NULL, EXISTS, IS DISTINCT FROM |
| GROUP BY | Yes | Basic grouping, GROUPING SETS, ROLLUP, CUBE |
| HAVING | Yes | Filtering on aggregate conditions |
| ORDER BY | Yes | ASC / DESC, NULLS FIRST / NULLS LAST |
| LIMIT | Yes | LIMIT count, LIMIT offset, count |
| WITH (CTE) | Yes | Common table expressions |
| JOIN | No | JOINs are not supported. |
| UNION / INTERSECT / EXCEPT | No | Set operations are not supported. |

Aggregate functions

| Function | Description | Example |
| --- | --- | --- |
| count(*) | Returns the total number of rows. | SELECT count(*) FROM ds |
| count(column) | Counts rows where the specified column is not null. | SELECT count(score) FROM ds |
| count(DISTINCT column) | Returns the number of distinct values in a column. | SELECT count(DISTINCT model) FROM ds |
| sum(column) | Calculates the sum. | SELECT sum(score) FROM ds |
| avg(column) | Calculates the average. | SELECT avg(score) FROM ds |
| min(column) / max(column) | Returns the minimum/maximum value. | SELECT min(score), max(score) FROM ds |
| count_if(condition) | Counts rows that match the specified condition. | SELECT count_if(score > 0.9) FROM ds |
| approx_percentile(column, p) | Returns the approximate percentile. | SELECT approx_percentile(score, 0.5) FROM ds |
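
As a quick sketch, count_if and approx_percentile can be combined in a single pass over the data:

# Conditional counting plus an approximate median in one query.
sql = """
SELECT count_if(score > 0.9) AS high_quality,
       approx_percentile(score, 0.5) AS median_score
FROM my_dataset
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)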

JSON functions

| Function | Description | Example |
| --- | --- | --- |
| json_extract_scalar(col, '$.path') | Extracts a scalar value from a JSON field. | json_extract_scalar(metadata, '$.input_tokens') |
| json_size(col, '$.path') | Returns the size of a JSON array or object. | json_size(metadata, '$.tags') |

Note: The json_extract_scalar function only supports scalar value extraction. json_size does not support wildcard paths such as $.a[*]. The json_extract function is not supported.
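
Putting this together, the following sketch averages a numeric sub-field of the json metadata column. The CAST assumes json_extract_scalar returns the value as a string, which is the standard PrestoSQL behavior:

# Aggregate over a nested JSON field, casting the extracted string to a number.
sql = """
SELECT model,
       avg(CAST(json_extract_scalar(metadata, '$.input_tokens') AS bigint)) AS avg_input_tokens
FROM my_dataset
GROUP BY model
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)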

Date and time functions

| Function | Description | Example |
| --- | --- | --- |
| now() | Returns the current time. | WHERE __time__ > to_unixtime(now() - interval '1' day) |
| from_unixtime(epoch) | Converts a Unix timestamp to a time value. | SELECT from_unixtime(__time__) |
| to_unixtime(timestamp) | Converts a time value to a Unix timestamp. | WHERE __time__ >= to_unixtime(...) |
| date(timestamp) | Extracts the date. | SELECT date(from_unixtime(__time__)) |
| date_add(unit, n, timestamp) | Adds a specified time interval to a timestamp. | date_add('day', 7, ts) |
| date_diff(unit, ts1, ts2) | Returns the difference between two timestamps. | date_diff('day', ts1, ts2) |
| INTERVAL | Time interval literal. | now() - interval '7' day |
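
The examples above reference a built-in __time__ field; assuming it holds each record's ingestion time as a Unix timestamp, a sketch that counts records written in the last 24 hours:

# Count records ingested in the last day via the __time__ field.
sql = """
SELECT count(*) AS recent_rows
FROM my_dataset
WHERE __time__ > to_unixtime(now() - interval '1' day)
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)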

Type casting and expressions

| Function / Expression | Description | Example |
| --- | --- | --- |
| CAST(expr AS type) | Type casting. | CAST(score AS varchar) |
| try_cast(expr AS type) | Safe type casting (returns NULL on failure). | try_cast(val AS bigint) |
| typeof(expr) | Returns the type name. | typeof(score) |
| CASE WHEN ... THEN ... ELSE ... END | Conditional expression. | CASE WHEN score > 0.9 THEN 'high' ELSE 'low' END |
| BETWEEN ... AND ... | Range check. | score BETWEEN 0.8 AND 1.0 |
| ARRAY[...] | Array constructor. | ARRAY[1, 2, 3] |

Window functions

| Function | Description | Example |
| --- | --- | --- |
| row_number() OVER (...) | Assigns a row number. | row_number() OVER (PARTITION BY model ORDER BY score DESC) |
| rank() OVER (...) | Assigns a rank. | rank() OVER (PARTITION BY model ORDER BY score DESC) |
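
As an illustration, a sketch that combines a window function with an aliased subquery (both listed as supported above) to keep only the top-scoring record per model:

# Top-scoring record per model using row_number() over a subquery.
sql = """
SELECT model, input, score
FROM (
    SELECT model, input, score,
           row_number() OVER (PARTITION BY model ORDER BY score DESC) AS rn
    FROM my_dataset
) ranked
WHERE rn = 1
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)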

Other common functions

| Function | Description |
| --- | --- |
| round(value, n) | Rounds a value to n decimal places. |
| floor(value) | Rounds down to the nearest integer. |
| length(str) | Returns the string length. |
| lower(str) | Converts a string to lowercase. |
| coalesce(a, b, ...) | Returns the first non-null argument. |
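
A short sketch that combines several of these scalar functions in one projection:

# Scalar functions in a grouped projection.
sql = """
SELECT lower(model) AS model,
       round(avg(score), 3) AS avg_score,
       coalesce(max(score), 0) AS best_score
FROM my_dataset
GROUP BY model
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)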

Hybrid query

You can combine the four query modes using the | pipe character. Place search conditions to the left of the | and the SQL statement to the right.

# Full-text + SQL + semantic_distance
query = """
model:qwen-plus
| SELECT input, output, score
  FROM my_dataset
  WHERE semantic_distance(input, '数据分析') < 0.4
  ORDER BY score DESC
  LIMIT 10
"""
request = ExecuteQueryRequest(query=query, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

For a complete list of supported SQL functions, clauses, and limitations, see the Dataset product documentation.

Dataset management

| Action | API | Description |
| --- | --- | --- |
| Create | client.create_dataset(workspace, CreateDatasetRequest) | Creates a dataset and defines its schema. |
| Get | client.get_dataset(workspace, dataset_name) | Retrieves details for a dataset, including its schema. |
| List | client.list_datasets(workspace, ListDatasetsRequest) | Lists all datasets, with support for pagination and name filtering. |
| Update | client.update_dataset(workspace, dataset_name, UpdateDatasetRequest) | Updates the description of a dataset. |
| Delete | client.delete_dataset(workspace, dataset_name) | Deletes a dataset. This operation is irreversible. |

List datasets

from alibabacloud_cms20240330.models import ListDatasetsRequest

request = ListDatasetsRequest(max_results=100)
response = client.list_datasets(workspace, request)
for ds in response.body.datasets:
    print(f"{ds.dataset_name}: {ds.description}")

# Filter by name
request = ListDatasetsRequest(max_results=100, dataset_name="my_dataset")
response = client.list_datasets(workspace, request)
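
ListDatasetsRequest also accepts a next_token parameter for pagination. Assuming the response body exposes a matching next_token attribute (an assumption, not confirmed here), iterating over all datasets could look like this sketch:

# Hypothetical pagination loop over list_datasets using next_token.
request = ListDatasetsRequest(max_results=100)
while True:
    response = client.list_datasets(workspace, request)
    for ds in response.body.datasets:
        print(ds.dataset_name)
    token = getattr(response.body, "next_token", None)  # assumed attribute name
    if not token:
        break
    request = ListDatasetsRequest(max_results=100, next_token=token)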

Get dataset

response = client.get_dataset(workspace, "my_dataset")
print(response.body.to_map())  # The response contains details such as the schema and creation time.

Update dataset

from alibabacloud_cms20240330.models import UpdateDatasetRequest

request = UpdateDatasetRequest(description="Updated description")
client.update_dataset(workspace, "my_dataset", request)

Delete dataset

# Caution: Deleting a dataset is irreversible.
client.delete_dataset(workspace, "my_dataset")

Typical scenarios

Scenario 1: Bad case management

Filter low-score samples from production data and update them after human review:

# 1. Filter low-score samples
sql = "SELECT id, input, output, score FROM my_dataset WHERE score < 0.3 ORDER BY score ASC LIMIT 50"
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

# 2. Update labels after human review. This assumes the schema includes
#    human_label and fix_suggestion text fields.
sql = """
UPDATE my_dataset
SET human_label = 'hallucination', fix_suggestion = 'Needs fact-checking'
WHERE id = 'bad-case-id'
"""
client.execute_query(workspace, "my_dataset", ExecuteQueryRequest(query=sql, type="SQL"))

Scenario 2: Regression testing

Build a benchmark dataset from an existing dataset to compare the performance of different model versions:

# Get the benchmark dataset
sql = "SELECT id, input, expected_output FROM my_dataset WHERE is_baseline = 'true'"
request = ExecuteQueryRequest(query=sql, type="SQL")
baseline = client.execute_query(workspace, "my_dataset", request)

# Run the new model version and compare results.
# run_new_model() and evaluate() are placeholders for your own inference
# and scoring logic.
for sample in baseline.body.data:
    new_output = run_new_model(sample["input"])
    score = evaluate(new_output, sample["expected_output"])
    # Record evaluation results...

Scenario 3: Training data export

Filter and export high-quality samples for SFT fine-tuning or post-training with RL:

import json

# Query high-quality samples
sql = """
SELECT input, output FROM my_dataset
WHERE human_label = 'correct' AND score >= 0.9
LIMIT 10000
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

# Export to JSONL format
with open("sft_data.jsonl", "w") as f:
    for item in response.body.data:
        f.write(json.dumps({
            "input": item["input"],
            "output": item["output"],
        }, ensure_ascii=False) + "\n")

Limitations

| Constraint | Value | Description |
| --- | --- | --- |
| Default number of rows returned | 1,000 | If a LIMIT clause is not specified, a query returns 1,000 rows by default. |
| Maximum number of rows per query | 100,000 | The value of the LIMIT clause cannot exceed this number. |
| Maximum data size per query | 100 MB | The data size returned by a single query cannot exceed this value. |
| JOIN | Not supported | JOIN operations are not currently supported. |
| Set operations | Not supported | UNION, INTERSECT, and EXCEPT set operations are not currently supported. |
| Cross-dataset queries | Not supported | Queries that span multiple datasets are not currently supported. |
| UPDATE / DELETE operations | By id primary key only | Conditional batch updates or deletions are not currently supported. |
| Unsupported functions | Not supported | approx_distinct, max_by, normalize, transform, sequence, URL functions, and others. |
| Parameter placeholders | Not supported | The ? placeholder in parameterized queries is not supported. |
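
Because the ? placeholder is unavailable, values must be interpolated into the SQL text. A minimal sketch, assuming the engine follows standard SQL single-quote doubling (as both PrestoSQL and PostgreSQL do):

def sql_quote(value: str) -> str:
    # Escape embedded single quotes by doubling them (standard SQL).
    return "'" + value.replace("'", "''") + "'"

user_input = "what's the current error rate?"
sql = f"SELECT id, input FROM my_dataset WHERE input = {sql_quote(user_input)}"
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)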

Best practices

Schema design

  • For fields that are frequently used in semantic search, enable the embedding vector index.

  • Use the json type for columns that need to expand dynamically, such as metadata and custom multi-dimensional tags.

Data management

  • Perform a query to obtain the id before you update or delete.

  • Periodically clean low-quality data to maintain a high signal-to-noise ratio.

  • Use a pipeline to automate data cleaning and ingestion.

Query optimization

  • Combine semantic search with conditional filtering to narrow the search scope.

  • Use pagination for large datasets, as shown in the sketch after this list.

  • Prioritize SQL analysis for complex statistical queries.
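
A sketch of that pagination pattern, assuming a stable sort key so that pages do not overlap:

# Page through a large result set with LIMIT offset, count.
page_size = 1000
offset = 0
while True:
    sql = f"SELECT id, input FROM my_dataset ORDER BY id LIMIT {offset}, {page_size}"
    request = ExecuteQueryRequest(query=sql, type="SQL")
    rows = client.execute_query(workspace, "my_dataset", request).body.data
    # ... process rows ...
    if len(rows) < page_size:
        break
    offset += page_size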

FAQ

Dataset vs. Logstore

A Logstore provides append-only storage for logs, which is ideal for data that is never modified after it is written. In contrast, a Dataset supports full CRUD operations, making it suitable for AI use cases that require data revision, labeling, and versioning.

Retrieve data record ID

When you insert a data record, the system automatically generates an id. You can retrieve it by running a query:

sql = "SELECT id, input FROM my_dataset LIMIT 10"
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

ID requirement for UPDATE and DELETE

To ensure data consistency and operational security, UPDATE and DELETE operations must use the id primary key. Conditional batch updates and deletes are not supported.

Semantic search threshold

Both similarity() and semantic_distance() are based on a vector distance in the range 0 to 1, where a smaller value indicates a closer semantic match. We recommend the following thresholds:

  • 0.1–0.2: Strict matching.

  • 0.2–0.3: Standard matching.

  • 0.3–0.5: Loose matching.

Automate data ingestion

AgentLoop provides a data processing pipeline to automatically clean, deduplicate, and evaluate data from a LogStore before writing it to a Dataset. For more information, see the pipeline user guide.

API

Dataset

| Action | Method | Parameters |
| --- | --- | --- |
| Create | client.create_dataset(workspace, request) | CreateDatasetRequest(dataset_name, description, schema) |
| Get | client.get_dataset(workspace, dataset_name) | None |
| List | client.list_datasets(workspace, request) | ListDatasetsRequest(max_results, next_token, dataset_name) |
| Update | client.update_dataset(workspace, dataset_name, request) | UpdateDatasetRequest(description) |
| Delete | client.delete_dataset(workspace, dataset_name) | None |

Data

All data operations use the ExecuteQuery interface:

from alibabacloud_cms20240330.models import ExecuteQueryRequest

request = ExecuteQueryRequest(query="...", type="SQL")
response = client.execute_query(workspace, dataset_name, request)

| Parameter | Type | Description |
| --- | --- | --- |
| query | string | The query to execute. Supports four modes: full-text search, semantic search, SQL analysis, and hybrid queries. Semantic search uses the similarity() search clause or the semantic_distance() SQL function. |
| type | string | The query type. The value must be "SQL". |

Response structure

{
  "data": [{"id": "...", "input": "...", "score": 0.95, ...}],
  "meta": {
    "count": 10,
    "affectedRows": 0,
    "elapsedMillisecond": 42,
    "progress": "Complete"
  },
  "requestId": "..."
}

| Field | Description |
| --- | --- |
| data | An array of query results from a SELECT operation. |
| meta.count | The number of records returned. |
| meta.affectedRows | The number of rows affected by an INSERT, UPDATE, or DELETE operation. |
| meta.elapsedMillisecond | The query execution time in milliseconds. |
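
The write examples above access response.body.meta.affected_rows, which suggests the Python SDK exposes these camelCase fields as snake_case attributes. Assuming the same convention holds for the remaining fields, a sketch:

# Inspect query metadata on the response object.
request = ExecuteQueryRequest(query="SELECT count(*) AS n FROM my_dataset", type="SQL")
response = client.execute_query(workspace, "my_dataset", request)
meta = response.body.meta
print(f"rows: {meta.count}, affected: {meta.affected_rows}, took: {meta.elapsed_millisecond} ms")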

SDK sample code

For complete, runnable sample code, see the dataset/samples/v2/ directory:

| File | Description |
| --- | --- |
| quickstart.py | End-to-end demonstration (create → write → query → update → delete). |
| manage_dataset.py | Dataset resource management (create, get, list, update, and delete). |
| write_data.py | Write operations (single insert, batch insert, update, and delete). |
| query_data.py | Query operations (full-text search, semantic search, SQL analysis, hybrid query, and pagination). |