Realtime Compute for Apache Flink: AI_EXTRACT

Last Updated:Dec 09, 2025

This topic describes how to use the AI_EXTRACT function to extract structured information from text with a registered AI model.

Limitations

  • This function requires Ververica Runtime (VVR) 11.4 or later.

  • The throughput of AI_EXTRACT operators is capped by the rate limits of Alibaba Cloud Model Studio. When the rate limit for a model is reached, the AI_EXTRACT operators become the bottleneck and cause backpressure in the Flink job. In some cases, this can trigger timeout errors and job restarts.

Syntax

AI_EXTRACT(
  MODEL => MODEL <MODEL NAME>, 
  INPUT => <INPUT COLUMN NAME>,
  EXTRACT_SCHEMA => <EXTRACT SCHEMA>
)

Input parameters

| Parameter | Data type | Description |
| --- | --- | --- |
| MODEL <MODEL NAME> | MODEL | The name of the registered model. For more information, see Model settings. Note: The output type of the model must be VARIANT. |
| <INPUT COLUMN NAME> | STRING | The text from which the model extracts information. |
| <EXTRACT SCHEMA> | STRING | The schema for extraction, specified in JSON format. Note: This input parameter must be a constant. |

Return value

| Parameter | Data type | Description |
| --- | --- | --- |
| extracted_json | STRING | The extracted structured information. |

Examples

Test data

| id | description |
| --- | --- |
| 1 | John, 18, lives in Singapore and his phone number is ****. |

Test statements

The following SQL uses the Qwen-Plus model and AI_EXTRACT to extract user information.

CREATE TEMPORARY MODEL general_model
INPUT (`input` STRING)
OUTPUT (`content` VARIANT)
WITH (
    'provider' = 'openai-compat',
    'endpoint' = '<YOUR ENDPOINT>',
    'apiKey' = '<YOUR KEY>',
    'model' = 'qwen-plus'
);

CREATE TEMPORARY VIEW infos(id, description)
AS VALUES (1, 'John, 18, lives in Singapore and his phone number is ****.');

-- Call AI_EXTRACT with positional arguments
SELECT id, extracted_json
FROM infos,
LATERAL TABLE(
  AI_EXTRACT(
    MODEL general_model, 
    description, 
    '{"name":"string","phone":"string","address":"string","age":"int"}'));

-- Call AI_EXTRACT with named arguments
SELECT id, extracted_json
FROM infos,
LATERAL TABLE(
  AI_EXTRACT(
    MODEL => MODEL general_model, 
    INPUT => description, 
    EXTRACT_SCHEMA => '{"name":"string","phone":"string","address":"string","age":"int"}'));

Result

| id | extracted_json |
| --- | --- |
| 1 | {"address":"Singapore","age":18,"name":"John","phone":"******"} |
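Because extracted_json is returned as a STRING, downstream logic usually needs to pull individual fields out of it. The following is a minimal sketch of one way to do that with the Flink SQL built-in JSON_VALUE function. It is an illustration, not part of the AI_EXTRACT function itself. To stay self-contained, it reads the sample result row from an inline VALUES clause instead of calling the model; the alias extracted and the output column names are hypothetical.

```sql
-- Sketch: split the JSON string produced by AI_EXTRACT into typed columns.
-- The inline VALUES row stands in for the AI_EXTRACT output shown above.
SELECT
  id,
  JSON_VALUE(extracted_json, '$.name')                  AS user_name,    -- STRING by default
  JSON_VALUE(extracted_json, '$.age' RETURNING INTEGER) AS user_age,     -- cast to INTEGER
  JSON_VALUE(extracted_json, '$.address')               AS user_address,
  JSON_VALUE(extracted_json, '$.phone')                 AS user_phone
FROM (
  VALUES (1, '{"address":"Singapore","age":18,"name":"John","phone":"******"}')
) AS extracted(id, extracted_json);
```

In a real job, the inline VALUES clause would presumably be replaced by the LATERAL TABLE(AI_EXTRACT(...)) query from the test statements. By default, JSON_VALUE returns NULL when a path is missing, which may be useful if the model omits a field.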