
Realtime Compute for Apache Flink: AI_MASK

Last Updated: Mar 26, 2026

Detects and masks named entities in text using a large language model (LLM). Pass a text column and a list of entity types — the function returns the masked text and a list of detected entities.

Limitations

  • Requires Ververica Runtime (VVR) 11.4 or later.

  • Throughput is limited by the rate limiting policy of the model service platform. When traffic reaches the platform's access limit, backpressure occurs in the Flink job and AI_MASK becomes the bottleneck. In severe cases, operators may time out and cause the Flink job to restart.

Syntax

AI_MASK(
  MODEL => MODEL <model_name>,
  INPUT => <input_column>,
  MASK_ENTITIES => <mask_entities>
)

Both named arguments (shown above) and positional arguments are supported. See Examples.

Parameters

| Parameter | Data type | Description |
| --- | --- | --- |
| MODEL <model_name> | MODEL | Name of the registered model service. For more information, see Model settings. The model must return output of type VARIANT. |
| <input_column> | STRING | The text column to analyze. |
| <mask_entities> | ARRAY\<STRING\> | Entity types to detect and mask. This parameter must be a constant. |

Outputs

AI_MASK returns one row per input row with the following columns:

| Column | Data type | Description |
| --- | --- | --- |
| masked_text | STRING | The input text with detected entities replaced by bracketed placeholders, for example [NAME]. |
| detected_entities | ARRAY\<STRING\> | The entities detected in the input text. Each element is a JSON string with two fields: entity (the original text fragment) and type (the entity type label), for example {"entity":"Timmo","type":"name"}. |
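
Because each element of detected_entities is a JSON string, downstream queries usually need to unpack it before filtering or aggregating by entity type. The following is a minimal sketch, assuming that Flink's built-in JSON_VALUE function and CROSS JOIN UNNEST are available in your VVR version. The view name masked_results and the aliases entity_json, entity_text, and entity_type are hypothetical, and the literal row only mirrors the documented output layout instead of calling AI_MASK.

```sql
-- Hypothetical stand-in for AI_MASK output, using the documented column layout.
CREATE TEMPORARY VIEW masked_results(id, masked_text, detected_entities)
AS VALUES (
  1,
  '[NAME] really loves studying.',
  ARRAY['{"entity":"Timmo","type":"name"}']
);

-- Expand the array to one row per detected entity, then read each JSON field.
SELECT
  m.id,
  JSON_VALUE(e.entity_json, '$.entity') AS entity_text,
  JSON_VALUE(e.entity_json, '$.type') AS entity_type
FROM masked_results AS m
CROSS JOIN UNNEST(m.detected_entities) AS e (entity_json);
```

For the sample row, this should yield a single row that pairs id 1 with the entity Timmo and the type name. Note that a cross join against an empty array produces no rows, so inputs with no detected entities drop out of this result.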

Examples

The following example creates a Qwen-Plus model, loads sample data, and masks person names using both positional and named argument syntax.

Test data

| id | content |
| --- | --- |
| 1 | Timmo really loves studying. He reads study materials whenever he has free time. |

SQL statement

CREATE TEMPORARY MODEL general_model
INPUT (`input` STRING)
OUTPUT (`content` VARIANT)
WITH (
    'provider' = 'openai-compat',
    'endpoint' = '<YOUR ENDPOINT>',
    'apiKey' = '<YOUR KEY>',
    'model' = 'qwen-plus'
);

CREATE TEMPORARY VIEW infos(id, content)
AS VALUES (1, 'Timmo really loves studying. He reads study materials whenever he has free time.');

-- Positional argument syntax
SELECT id, masked_text, detected_entities
FROM infos,
LATERAL TABLE(
  AI_MASK(
    MODEL general_model,
    content,
    ARRAY['name']
    ));

-- Named argument syntax
SELECT id, masked_text, detected_entities
FROM infos,
LATERAL TABLE(
  AI_MASK(
    MODEL => MODEL general_model,
    INPUT => content,
    MASK_ENTITIES => ARRAY['name']
    ));

Replace the following placeholders with actual values:

| Placeholder | Description |
| --- | --- |
| <YOUR ENDPOINT> | The endpoint URL of your model service |
| <YOUR KEY> | The API key for your model service |

Output

| id | masked_text | detected_entities |
| --- | --- | --- |
| 1 | [NAME] really loves studying. He reads study materials whenever he has free time. | [{"entity":"Timmo","type":"name"}] |