This topic describes how to use Realtime Compute for Apache Flink and large language models (LLMs) available in Alibaba Cloud Model Studio for real-time data analytics.
Background information
Alibaba Cloud Model Studio is a one-stop platform for AI developers and business teams to develop applications with LLMs. You can integrate Alibaba Cloud Model Studio with Realtime Compute for Apache Flink, combining Alibaba Cloud Model Studio's AI models with Flink's stream processing capabilities to build real-time data pipelines that enable AI-driven decision making. The following sections describe application scenarios for two core model types:
Chat completion models
Chat completion models are LLMs focused on chat generation and text understanding. They address a wide variety of use cases, including sentiment analysis, intent recognition, and question answering.
Sentiment analysis: Analyzes social media comments in real time to quickly determine if a user's emotional tone is positive, negative, or neutral.
Intelligent customer service: Interacts with customers in natural language based on LLMs' dialogue generation capabilities.
Content moderation: Automatically detects sensitive or prohibited content in text to improve content moderation efficiency.
Embedding models
Embedding models convert text into high-dimensional vector representations. They are suitable for scenarios such as semantic search, recommendation systems, and knowledge graph construction.
Semantic search: Implements semantic relevance search by vectorizing product descriptions or user queries.
Recommendation systems: Uses text vectorization to discover latent associations between user interests and product features, improving recommendation accuracy.
Knowledge graph: Converts unstructured texts into vectors to facilitate knowledge extraction and relationship modeling.
Prerequisites
You have created a Flink workspace.
You have created a Model Studio workspace and established its network connection with Realtime Compute for Apache Flink. For information about accessing Model Studio via VPC, see Access model or application APIs over a private network.
Limits
Only Ververica Runtime (VVR) 11.1 or later is supported.
Step 1: Register a model
For detailed instructions, see Model DDLs.
Chat completion models
The following SQL code shows how to register the qwen-turbo model:
CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
'provider'='bailian',
'endpoint'='<base_url>/compatible-mode/v1/chat/completions', -- chat/completions model endpoint
'api-key' = '<YOUR KEY>',
'model'='qwen-turbo', -- Use the qwen-turbo model
'system-prompt' = 'Classify the text below with one of the following labels: [positive, negative, neutral, mixed]. Output only the label.'
);
Replace <base_url> in the endpoint value with the base URL that matches your network access method:
Internet access: Replace <base_url> with https://dashscope-intl.aliyuncs.com.
VPC access: Replace <base_url> with https://vpc-ap-southeast-1.dashscope.aliyuncs.com.
Note: Configure endpoint to use the HTTPS protocol.
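Embedding models
Embedding models are registered with the same DDL pattern. The following is an illustrative sketch, not a verified configuration: the model name text-embedding-v3, the embeddings endpoint path, and the ARRAY<FLOAT> output type are assumptions here, so check them against Model DDLs before use.

```sql
-- Illustrative sketch: register an embedding model.
-- The model name, endpoint path, and output type below are assumptions;
-- verify them against Model DDLs.
CREATE MODEL ai_text_embedding
INPUT (`input` STRING)
OUTPUT (`embedding` ARRAY<FLOAT>)
WITH (
'provider'='bailian',
'endpoint'='<base_url>/compatible-mode/v1/embeddings', -- embeddings endpoint (assumed path)
'api-key' = '<YOUR KEY>',
'model'='text-embedding-v3' -- assumed embedding model name
);
```

A model registered this way can then be called with ML_PREDICT in the same manner as a chat completion model, with each input row producing a vector instead of a text label.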
Step 2: Create an SQL draft
For instructions on creating an SQL streaming draft, see SQL jobs.
Step 3: Write code for AI-driven analysis
Chat completion models
Use the ML_PREDICT AI function to call the ai_analyze_sentiment model registered in Step 1 to perform sentiment analysis on movie reviews.
The throughput of ML_PREDICT operators is subject to the rate limits of Alibaba Cloud Model Studio. When a model's rate limit is reached, the Flink job is backpressured with the ML_PREDICT operator as the bottleneck; in some cases, this triggers timeout errors and job restarts. To increase the rate limits, contact your sales representative.
Copy and paste the following code into the SQL editor.
-- Create a temporary sink table.
CREATE TEMPORARY TABLE print_sink(
id BIGINT,
movie_name VARCHAR,
predict_label VARCHAR,
actual_label VARCHAR
) WITH (
'connector' = 'print', -- Use the Print connector.
'logger' = 'true' -- Display the results in the console.
);
-- Create a temporary view that contains test data.
-- | id | movie_name | user_comment | actual_label |
-- | 1 | Her Story | My favourite part was when the kid guessed the sounds. It was romantic, heartwarming, and loving. | positive |
-- | 2 | The Dumpling Queen | Nothing special. | negative |
CREATE TEMPORARY VIEW movie_comment(id, movie_name, user_comment, actual_label)
AS VALUES (1, 'Her Story', 'My favourite part was when the kid guessed the sounds. It was romantic, heartwarming, and loving.', 'positive'), (2, 'The Dumpling Queen', 'Nothing special.', 'negative');
INSERT INTO print_sink
SELECT id, movie_name, content as predict_label, actual_label
FROM ML_PREDICT(
TABLE movie_comment,
MODEL ai_analyze_sentiment, -- Use the registered Qwen-Turbo model.
DESCRIPTOR(user_comment));
Step 4: Start the job
For instructions on deploying the draft and starting the job, see SQL jobs.
Step 5: View the results
Chat completion models
Navigate to , and verify your job has finished.

Click the target deployment's name.
On the Logs tab, choose the Task Managers sub-tab and select a TaskManager from the Current TaskManager drop-down list.
Click the Logs side tab, and search for PrintSinkOutputWriter.
The predicted labels in predict_label are consistent with the actual labels in actual_label.
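With 'logger' = 'true', the Print connector writes each result row to the TaskManager log through PrintSinkOutputWriter. For the test data above, the logged rows would look similar to the following; this is an illustrative sketch of the Print connector's row format only, and the exact log prefix varies by version:

```
+I[1, Her Story, positive, positive]
+I[2, The Dumpling Queen, negative, negative]
```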