Realtime Compute for Apache Flink: AI_CLASSIFY

Last Updated: Mar 26, 2026

AI_CLASSIFY classifies text using a large language model (LLM). Call it from Flink SQL to add an LLM-powered classification column to your streaming pipeline without writing a user-defined function (UDF).

Prerequisites

Before you begin, ensure that you have:

  • Ververica Runtime (VVR) 11.4 or later

  • A registered model with a VARIANT output type. For details, see Model settings.

Syntax

AI_CLASSIFY(
  MODEL => MODEL <model_name>,
  INPUT => <input_column>,
  LABELS => <labels>
)

Both positional and named argument styles are supported. See Examples.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| MODEL <model_name> | MODEL | The registered model to use. The model's output type must be VARIANT. |
| <input_column> | STRING | The text column to classify. |
| <labels> | ARRAY<STRING> | The classification labels. Must be a constant. |

Return values

AI_CLASSIFY returns one row per input row with the following columns:

| Column | Type | Description |
| --- | --- | --- |
| category | STRING | The label assigned by the model. |
| confidence | DOUBLE | The confidence level for the assigned label. |
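
Because `category` and `confidence` are ordinary columns, they can be used in the rest of the query like any other column. The following is a minimal sketch, assuming the `products` view and `general_model` model defined in the Examples section; the 0.9 threshold is an arbitrary value chosen for illustration, not a recommended setting.

```sql
-- Sketch: keep only rows whose classification confidence meets a threshold.
-- Assumes the `products` view and `general_model` model from the Examples section.
-- The 0.9 threshold is illustrative only.
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(
    MODEL  => MODEL general_model,
    INPUT  => content,
    LABELS => ARRAY['Digital', 'Clothing']))
WHERE confidence >= 0.9;
```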

Examples

This example creates a model and a product view, then classifies each product using both positional and named argument styles.

Test data

| id | content | label |
| --- | --- | --- |
| 1 | Li-Ning Way of Wade 10 Basketball Shoes, Performance Basketball Shoes, Shock Absorption and Rebound, Black/Red | Clothing |
| 2 | Apple iPhone 15 Pro Max 256GB, Space Black, 5G Phone, A17 Pro Chip, Titanium Frame | Digital |

SQL

CREATE TEMPORARY MODEL general_model
INPUT (`input` STRING)
OUTPUT (`content` VARIANT)
WITH (
    'provider' = 'openai-compat',
    'endpoint'  = '<YOUR ENDPOINT>',
    'apiKey'    = '<YOUR KEY>',
    'model'     = 'qwen-plus'
);

CREATE TEMPORARY VIEW products(id, content)
AS VALUES
  (1, 'Li-Ning Way of Wade 10 Basketball Shoes, Performance Basketball Shoes, Shock Absorption and Rebound, Black/Red'),
  (2, 'Apple iPhone 15 Pro Max 256GB, Space Black, 5G Phone, A17 Pro Chip, Titanium Frame');

-- Positional argument style
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(MODEL general_model, content, ARRAY['Digital', 'Clothing']));

-- Named argument style
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(
    MODEL  => MODEL general_model,
    INPUT  => content,
    LABELS => ARRAY['Digital', 'Clothing']));

Replace the following placeholders with your actual values:

| Placeholder | Description |
| --- | --- |
| <YOUR ENDPOINT> | The endpoint of your model service |
| <YOUR KEY> | The API key for your model service |

Output

| id | category | confidence |
| --- | --- | --- |
| 1 | Clothing | 0.95 |
| 2 | Digital | 0.99 |
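
In a streaming pipeline, the classification result is typically written to a downstream table rather than only selected. The following sketch shows one way this might look, reusing `products` and `general_model` from the SQL above. The sink table `classified_products`, its schema, and the choice of the `print` connector are assumptions made for illustration; substitute the connector your job actually writes to.

```sql
-- Hypothetical sink table: the name, schema, and connector are illustrative.
CREATE TEMPORARY TABLE classified_products (
  id         INT,
  category   STRING,
  confidence DOUBLE
) WITH (
  'connector' = 'print'
);

-- Write each product together with its assigned label and confidence.
INSERT INTO classified_products
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(MODEL general_model, content, ARRAY['Digital', 'Clothing']));
```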

Usage notes

  • `LABELS` must be a constant. Dynamic or computed label arrays are not supported.
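
The constant requirement can be illustrated with a contrasting pair of queries. This is a sketch only: the first query reuses `products` and `general_model` from the Examples section, while `products_with_labels` and its `label_candidates` column are hypothetical names introduced to show the unsupported form, which is left commented out.

```sql
-- Supported: LABELS is an ARRAY literal, so it is a constant.
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(
    MODEL  => MODEL general_model,
    INPUT  => content,
    LABELS => ARRAY['Digital', 'Clothing']));

-- Not supported per the note above: LABELS read from a column.
-- `products_with_labels` and `label_candidates` (ARRAY<STRING>) are hypothetical.
-- SELECT id, category, confidence
-- FROM products_with_labels,
-- LATERAL TABLE(
--   AI_CLASSIFY(
--     MODEL  => MODEL general_model,
--     INPUT  => content,
--     LABELS => label_candidates));
```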

Limitations

  • Requires Ververica Runtime (VVR) 11.4 or later.

  • The throughput of AI_CLASSIFY operators is capped by the rate limits of Alibaba Cloud Model Studio. When a model's rate limit is reached, the AI_CLASSIFY operators become the bottleneck and cause backpressure in the Flink job. In some cases, this triggers timeout errors and job restarts.

What's next

  • Model settings: Register and configure a model for use with AI_CLASSIFY.