Translates a text column in a Flink SQL job using a large language model (LLM).
Limitations
- Requires Ververica Runtime (VVR) 11.4 or later.
- Throughput is subject to rate limiting by the model platform. When the rate limit is exceeded, the Flink job experiences backpressure and this operator becomes a bottleneck. Severe rate limiting triggers timeout errors and causes the job to restart.
Syntax
    AI_TRANSLATE(
      MODEL => MODEL <MODEL NAME>,
      INPUT => <INPUT COLUMN NAME>,
      SOURCE_LANG => <SOURCE LANGUAGE>,
      TARGET_LANG => <TARGET LANGUAGE>
    )
Parameters
| Parameter | Data type | Description |
| --- | --- | --- |
| MODEL <MODEL NAME> | MODEL | The name of the registered model service. For more information, see Model settings. The output type of this model must be VARIANT. |
| <INPUT COLUMN NAME> | STRING | The source text to translate. |
| <SOURCE LANGUAGE> | STRING | The language code of the source text (for example, zh for Chinese). Pass auto to let the model detect the language automatically. Must be a constant. |
| <TARGET LANGUAGE> | STRING | The language code of the target language (for example, zh for Chinese). Must be a constant. |
The supported languages are determined by the model you register.
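As a sketch of passing an explicit source language instead of auto, the query below translates Chinese text to English. It assumes that a model named general_model has already been registered with a VARIANT output, and it uses a hypothetical view named reviews; both names are illustrative and not part of the function itself. The two language arguments are written as string literals because they must be constants.

```sql
-- Hypothetical input view; the name and contents are illustrative only.
CREATE TEMPORARY VIEW reviews(id, content)
AS VALUES (1, '我非常喜欢 Flink。');

-- SOURCE_LANG and TARGET_LANG are string literals, since both must be constants.
SELECT id, translated_text
FROM reviews,
LATERAL TABLE(
  AI_TRANSLATE(
    MODEL => MODEL general_model,  -- assumed to be registered beforehand
    INPUT => content,
    SOURCE_LANG => 'zh',
    TARGET_LANG => 'en'
  ));
```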
Supported languages
| Language | Code |
| --- | --- |
| Arabic | ar |
| Chinese | zh |
| English | en |
| French | fr |
| German | de |
| Japanese | ja |
| Korean | ko |
| Portuguese | pt |
| Russian | ru |
| Spanish | es |
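Because the target language must be a constant, one way to produce several translations of the same column is to issue one AI_TRANSLATE call per target language and combine the results. The sketch below assumes the infos view and the general_model model from the Examples section, and assumes that the registered model supports both fr and de; the target_lang column is a hypothetical label added for readability.

```sql
-- One call per target language; each branch uses a constant language code.
SELECT id, 'fr' AS target_lang, translated_text
FROM infos,
LATERAL TABLE(
  AI_TRANSLATE(MODEL general_model, content, 'auto', 'fr'))
UNION ALL
SELECT id, 'de' AS target_lang, translated_text
FROM infos,
LATERAL TABLE(
  AI_TRANSLATE(MODEL general_model, content, 'auto', 'de'));
```

Each branch sends its own request for every input row, so this pattern multiplies the load on the model platform and makes rate limiting more likely.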
Return values
| Field | Data type | Description |
| --- | --- | --- |
| translated_text | STRING | The translated text. |
| detected_language | STRING | The detected source language. |
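The returned fields behave like ordinary columns of the lateral table, so they can be projected and filtered in the same query. The following sketch keeps only rows whose source text was not detected as English. It assumes the infos view and the general_model model from the Examples section, and assumes that detected_language is reported as a language code such as en, as in the sample output.

```sql
-- Translate to English and keep only rows that were not already English.
SELECT id, content, translated_text, detected_language
FROM infos,
LATERAL TABLE(
  AI_TRANSLATE(MODEL general_model, content, 'auto', 'en'))
WHERE detected_language <> 'en';
```

The filter depends on the model's output, so it is evaluated after the translation call and does not reduce the number of requests sent to the model.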
Examples
The following examples create a Qwen-Plus model and use AI_TRANSLATE to translate English text to Chinese. Both positional and named argument syntax are shown.
Test data
| id | content |
| --- | --- |
| 1 | I like Flink very much. It's so fast! |
SQL statements
    CREATE TEMPORARY MODEL general_model
    INPUT (`input` STRING)
    OUTPUT (`content` VARIANT)
    WITH (
      'provider' = 'openai-compat',
      'endpoint' = '<YOUR ENDPOINT>',
      'apiKey' = '<YOUR KEY>',
      'model' = 'qwen-plus'
    );

    CREATE TEMPORARY VIEW infos(id, content)
    AS VALUES (1, 'I like Flink very much. It''s so fast!');

    -- Positional argument syntax
    SELECT id, translated_text, detected_language
    FROM infos,
    LATERAL TABLE(
      AI_TRANSLATE(
        MODEL general_model,
        content,
        'auto',
        'zh'
      ));

    -- Named argument syntax
    SELECT id, translated_text, detected_language
    FROM infos,
    LATERAL TABLE(
      AI_TRANSLATE(
        MODEL => MODEL general_model,
        INPUT => content,
        SOURCE_LANG => 'auto',
        TARGET_LANG => 'zh'
      ));
Replace the following placeholders with your actual values:
| Placeholder | Description |
| --- | --- |
| <YOUR ENDPOINT> | The endpoint URL of your model service |
| <YOUR KEY> | The API key for your model service |
Output
| id | translated_text | detected_language |
| --- | --- | --- |
| 1 | I like Flink very much. It's very fast! | en |