AI_CLASSIFY classifies text using a large language model (LLM). Call it from Flink SQL to add an LLM-powered classification column to your streaming pipeline—no UDF required.
Prerequisites
Before you begin, ensure that you have:
- Ververica Runtime (VVR) 11.4 or later
- A registered model with a VARIANT output type. For details, see Model settings.
Syntax
AI_CLASSIFY(
  MODEL => MODEL <model_name>,
  INPUT => <input_column>,
  LABELS => <labels>
)
Both positional and named argument styles are supported. See Examples.
Parameters
| Parameter | Type | Description |
|---|---|---|
| `MODEL <model_name>` | MODEL | The registered model to use. The model's output type must be VARIANT. |
| `<input_column>` | STRING | The text column to classify. |
| `<labels>` | ARRAY<STRING> | The classification labels. Must be a constant. |
Return values
AI_CLASSIFY returns one row per input row with the following columns:
| Column | Type | Description |
|---|---|---|
| `category` | STRING | The label assigned by the model. |
| `confidence` | DOUBLE | The confidence level for the assigned label. |
Examples
This example creates a model and a product view, then classifies each product using both positional and named argument styles.
Test data
| id | content | label |
|---|---|---|
| 1 | Li-Ning Way of Wade 10 Basketball Shoes, Performance Basketball Shoes, Shock Absorption and Rebound, Black/Red | Clothing |
| 2 | Apple iPhone 15 Pro Max 256GB, Space Black, 5G Phone, A17 Pro Chip, Titanium Frame | Digital |
SQL
CREATE TEMPORARY MODEL general_model
INPUT (`input` STRING)
OUTPUT (`content` VARIANT)
WITH (
  'provider' = 'openai-compat',
  'endpoint' = '<YOUR ENDPOINT>',
  'apiKey' = '<YOUR KEY>',
  'model' = 'qwen-plus'
);

CREATE TEMPORARY VIEW products(id, content)
AS VALUES
  (1, 'Li-Ning Way of Wade 10 Basketball Shoes, Performance Basketball Shoes, Shock Absorption and Rebound, Black/Red'),
  (2, 'Apple iPhone 15 Pro Max 256GB, Space Black, 5G Phone, A17 Pro Chip, Titanium Frame');

-- Positional argument style
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(MODEL general_model, content, ARRAY['Digital', 'Clothing']));

-- Named argument style
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(
    MODEL => MODEL general_model,
    INPUT => content,
    LABELS => ARRAY['Digital', 'Clothing']));
Replace the following placeholders with your actual values:
| Placeholder | Description |
|---|---|
| `<YOUR ENDPOINT>` | The endpoint of your model service |
| `<YOUR KEY>` | The API key for your model service |
Output
| id | category | confidence |
|---|---|---|
| 1 | Clothing | 0.95 |
| 2 | Digital | 0.99 |
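Because `confidence` is returned as an ordinary DOUBLE column, it can be used in a filter like any other column. The following is a minimal sketch that reuses the `general_model` model and `products` view defined above. The 0.9 threshold is an arbitrary value chosen for illustration, not a recommended setting, and the rows that pass depend on what the model returns.

```sql
-- Keep only classifications at or above an assumed confidence threshold (0.9).
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(
    MODEL => MODEL general_model,
    INPUT => content,
    LABELS => ARRAY['Digital', 'Clothing']))
WHERE confidence >= 0.9;
```

Rows below the threshold are dropped from the result, so choose the value according to how your pipeline should treat uncertain classifications.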
Usage notes
- `LABELS` must be a constant. Dynamic or computed label arrays are not supported.
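As a sketch of what the constant requirement means in practice, the first query below passes a literal array and follows the pattern used in Examples. The second query is commented out because it reads the labels from a column. The `products_with_labels` table and its `candidate_labels` column are hypothetical names introduced only for this illustration.

```sql
-- Supported: the label array is a literal constant.
SELECT id, category, confidence
FROM products,
LATERAL TABLE(
  AI_CLASSIFY(MODEL general_model, content, ARRAY['Digital', 'Clothing']));

-- Not supported: the labels come from a column, so they are not constant.
-- (products_with_labels and candidate_labels are hypothetical.)
-- SELECT id, category, confidence
-- FROM products_with_labels,
-- LATERAL TABLE(
--   AI_CLASSIFY(MODEL general_model, content, candidate_labels));
```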
Limitations
- Requires Ververica Runtime (VVR) 11.4 or later.
- The throughput of `AI_CLASSIFY` operators is subject to the rate limits of Alibaba Cloud Model Studio. When the rate limits for a model are reached, the Flink job is backpressured, with the `AI_CLASSIFY` operators as the bottleneck. In some cases, this can trigger timeout errors and job restarts.
What's next
- Model settings: Register and configure a model for use with `AI_CLASSIFY`.