Realtime Compute for Apache Flink: AI_EXTRACT

Last Updated: Mar 26, 2026

AI_EXTRACT extracts structured information from raw text using a large language model (LLM).

Prerequisites

Before using AI_EXTRACT, register a model service. For details, see Model Settings.

The model's output type must be VARIANT.

Syntax

AI_EXTRACT(
  MODEL => MODEL <MODEL NAME>,
  INPUT => <INPUT COLUMN NAME>,
  EXTRACT_SCHEMA => <EXTRACT SCHEMA>
)

AI_EXTRACT supports both named arguments (shown above) and positional arguments. The Example section shows both calling styles.

Parameters

Parameter           | Data type | Description
MODEL <MODEL NAME>  | MODEL     | The name of the registered model service.
<INPUT COLUMN NAME> | STRING    | The column containing the raw text to extract information from.
<EXTRACT SCHEMA>    | STRING    | A JSON string that defines the fields to extract and their data types. Must be a constant.

EXTRACT_SCHEMA format

EXTRACT_SCHEMA takes a JSON object where each key is the field name to extract and each value is the target data type:

{"<field_name>": "<data_type>", ...}

Example schema:

{"name": "string", "age": "int", "address": "string", "phone": "string"}

Output

Column         | Data type | Description
extracted_json | STRING    | The extracted fields as a JSON string.

Example

This example extracts personal information (name, age, address, and phone number) from a text column using a Qwen model.

Step 1: Register the model

CREATE TEMPORARY MODEL general_model
INPUT (`input` STRING)
OUTPUT (`content` VARIANT)
WITH (
    'provider' = 'openai-compat',
    'endpoint' = '<YOUR ENDPOINT>',
    'apiKey'   = '<YOUR KEY>',
    'model'    = 'qwen-plus'
);

Replace the following placeholders:

Placeholder     | Description
<YOUR ENDPOINT> | The endpoint URL of your model service
<YOUR KEY>      | The API key for your model service

Step 2: Call AI_EXTRACT

CREATE TEMPORARY VIEW infos(id, description)
AS VALUES (1, 'Xiao Ming is 18 years old and lives in Hangzhou. His phone number is ******.');

-- Named argument style
SELECT id, extracted_json
FROM infos,
LATERAL TABLE(
  AI_EXTRACT(
    MODEL => MODEL general_model,
    INPUT => description,
    EXTRACT_SCHEMA => '{"name":"string","phone":"string","address":"string","age":"int"}'));

-- Positional argument style
SELECT id, extracted_json
FROM infos,
LATERAL TABLE(
  AI_EXTRACT(
    MODEL general_model,
    description,
    '{"name":"string","phone":"string","address":"string","age":"int"}'));

Output:

id | extracted_json
1  | {"address":"Hangzhou","age":18,"name":"Xiao Ming","phone":"******"}
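
Because extracted_json is a STRING, downstream queries usually need to split it into typed columns. The following is a minimal sketch of one way to do that with Flink's built-in JSON_VALUE function. It is not part of the original example: it assumes the infos view and general_model defined in the steps above, and the backquoted column aliases are illustrative names only.

```sql
-- Sketch: project typed columns out of extracted_json.
-- Assumes the infos view and general_model created in Step 1 and Step 2.
-- The aliases (`name`, `age`, `address`, `phone`) are illustrative.
SELECT
  id,
  JSON_VALUE(extracted_json, '$.name')                  AS `name`,
  JSON_VALUE(extracted_json, '$.age' RETURNING INTEGER) AS `age`,
  JSON_VALUE(extracted_json, '$.address')               AS `address`,
  JSON_VALUE(extracted_json, '$.phone')                 AS `phone`
FROM infos,
LATERAL TABLE(
  AI_EXTRACT(
    MODEL => MODEL general_model,
    INPUT => description,
    EXTRACT_SCHEMA => '{"name":"string","phone":"string","address":"string","age":"int"}'));
```

By default, JSON_VALUE returns NULL when a path is missing or a value cannot be converted to the requested type, so a field that the model fails to extract should surface as NULL rather than fail the query.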

Limitations

  • Supported only on the real-time computing engine Ververica Runtime (VVR), version 11.4 or later.

  • Throughput is limited by the model service platform. When traffic reaches the platform's access limit, backpressure occurs on the Flink operators used in AI_EXTRACT statements. Severe rate limiting can trigger timeouts and cause the Flink job to restart.