全部產品
Search
文件中心

Realtime Compute for Apache Flink:AI_EXTRACT

更新時間:Dec 04, 2025

本文為您介紹如何使用AI_EXTRACT通過AI大模型進行結構化資訊提取。

使用限制

  • 僅Realtime Compute引擎VVR 11.4及以上版本支援。

  • AI_EXTRACT語句相關的Flink運算元的輸送量受到模型平台限流的限制。當觸及平台允許的訪問資料傳輸量上限時,Flink作業會表現出以該運算元為瓶頸的反壓現象。在限流情況嚴重時,可能會觸發相關運算元的逾時報錯及作業重啟。

文法

AI_EXTRACT(
  MODEL => MODEL <MODEL NAME>, 
  INPUT => <INPUT COLUMN NAME>,
  EXTRACT_SCHEMA => <EXTRACT SCHEMA>
)

入參

參數

資料類型

說明

MODEL <MODEL NAME>

MODEL

註冊的模型服務名字。詳情請參見模型設定註冊模型服務。

注意:目前需要該模型的輸出類型為 VARIANT 類型。

<INPUT COLUMN NAME>

STRING

模型待提取資訊的原始文本。

<EXTRACT SCHEMA>

STRING

用JSON文本的形式描述提取的schema資訊。

注意:目前該入參需要是一個常量。

輸出

參數

資料類型

說明

extracted_json

STRING

提取出的結構化資訊。

樣本

測試資料

id

description

1

小明今年18歲住在杭州,他的號碼是******。

測試語句

樣本SQL建立通義千問Qwen-Plus模型,使用AI_EXTRACT提取使用者資訊。

CREATE TEMPORARY MODEL general_model
INPUT (`input` STRING)
OUTPUT (`content` VARIANT)
WITH (
    'provider' = 'openai-compat',
    'endpoint'='<YOUR ENDPOINT>',
    'apiKey' = '<YOUR KEY>',
    'model' = 'qwen-plus'
);

CREATE TEMPORARY VIEW infos(id, description)
AS VALUES (1, '小明今年18歲住在杭州,他的號碼是******。');

-- Use positional argument to call AI_EXTRACT
SELECT id, extracted_json
FROM infos,
LATERAL TABLE(
  AI_EXTRACT(
    MODEL general_model, 
    description, 
    '{"name":"string","phone":"string","address":"string","age":"int"}'));  
-- Use named argument to call AI_EXTRACT
SELECT id, extracted_json
FROM infos,
LATERAL TABLE(
  AI_EXTRACT(
    MODEL => MODEL general_model, 
    INPUT => description, 
    EXTRACT_SCHEMA => '{"name":"string","phone":"string","address":"string","age":"int"}'));

輸出結果

id

extracted_json

1

{"address":"杭州","age":18,"name":"小明","phone":"******"}