このトピックでは、Alibaba Cloud Model Studio で利用可能な Realtime Compute for Apache Flink と大規模言語モデル (LLM) を使用してリアルタイムデータ分析を行う方法について説明します。
背景情報
Alibaba Cloud Model Studio は、AI 開発者とビジネスチームが LLM を開発するためのワンストッププラットフォームです。Alibaba Cloud Model Studio を Realtime Compute for Apache Flink と統合し、Alibaba Cloud Model Studio の AI モデルと Flink のストリーム処理機能を活用してリアルタイムデータパイプラインを構築することで、デシジョンインテリジェンスを大幅に促進できます。以下に、2 つのコアモデルのアプリケーションシナリオを示します。
チャット補完モデル
チャット補完モデルは、チャット生成とテキスト理解に特化した LLM です。感情分析、意図認識、質問応答など、さまざまなユースケースに対応します。
感情分析: ソーシャルメディアのコメントをリアルタイムで分析し、ユーザーの感情的なトーンが肯定的、否定的、または中立的であるかを迅速に判断します。
インテリジェントカスタマーサービス: LLM の対話生成機能に基づいて、自然言語で顧客と対話します。
コンテンツモデレーション: テキスト内の機密コンテンツや禁止コンテンツを自動的に検出し、コンテンツモデレーションの効率を向上させます。
埋め込みモデル
埋め込みモデルは、テキストを高次元のベクトル表現に変換します。セマンティック検索、レコメンデーションシステム、知識グラフの構築などのシナリオに適しています。
セマンティック検索: 製品説明やユーザーのクエリをベクトル化することで、セマンティックな関連性検索を実装します。
レコメンデーションシステム: テキストのベクトル化に基づいて、ユーザーの興味と製品の特徴との間の潜在的な関連性を発見し、レコメンデーションの精度を向上させます。
知識グラフ: 非構造化テキストをベクトルに変換して、知識の抽出と関係のモデリングを容易にします。
前提条件
Flink ワークスペースを作成していること。
Model Studio ワークスペースを作成し、Realtime Compute for Apache Flink とのネットワーク接続を確立していること。Model Studio に VPC 経由でアクセスする方法については、「プライベートネットワーク経由でモデルまたはアプリケーション API にアクセスする」をご参照ください。
制限事項
Ververica Runtime (VVR) 11.1 以降のバージョンのみがサポートされています。
ステップ 1: モデルの登録
詳細な手順については、「モデル DDL」をご参照ください。
チャット/補完モデル
次の SQL コードは、qwen-turbo モデルを登録する方法を示しています。
CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
'provider'='bailian',
'endpoint'='<base_url>/compatible-mode/v1/chat/completions', -- チャット/補完モデルのエンドポイント
'api-key' = '<YOUR KEY>',
'model'='qwen-turbo', -- qwen-turbo モデルを使用
'system-prompt' = 'Classify the text below with one of the following labels: [positive, negative, neutral, mixed]. Output only the label.'
);実際の状況に応じて、endpoint の値の <base_url> を置き換えます。
インターネットアクセス:
<base_url>をhttps://dashscope-intl.aliyuncs.comに置き換えます。VPC アクセス:
<base_url>をhttps://vpc-ap-southeast-1.dashscope.aliyuncs.comに置き換えます。説明endpointを設定して HTTPS プロトコルを使用します。
ステップ 2: SQL ドラフトの作成
SQL ストリームドラフトを作成するには、「SQL ジョブ」をご参照ください。
ステップ 3: AI を活用した分析のためのコードの記述
チャット/補完モデル
ML_PREDICT AI 関数を使用して、ステップ 1 で登録した ai_analyze_sentiment モデルを呼び出し、映画レビューの感情分析を行います。
ML_PREDICT オペレーターのスループットは、Alibaba Cloud Model Studio のレート制限の対象となります。モデルのレート制限に達すると、ML_PREDICT オペレーターがボトルネックとなり、Flink ジョブにバックプレッシャーがかかります。場合によっては、タイムアウトエラーやジョブの再起動がトリガーされることがあります。レート制限を引き上げるには、営業担当者にお問い合わせください。
次のコードをコピーして SQL エディターに貼り付けます。
-- 一時的な結果テーブルを作成します。
CREATE TEMPORARY TABLE print_sink(
id BIGINT,
movie_name VARCHAR,
predict_label VARCHAR,
actual_label VARCHAR
) WITH (
'connector' = 'print', -- Print コネクタを使用します。
'logger' = 'true' -- 結果をコンソールに表示します。
);
-- テストデータを含む一時的なビューを作成します。
-- | id | movie_name | user_comment | actual_label |
-- | 1 | Her Story | My favourite part was when the kid guessed the sounds. It was romantic, heartwarming, and loving. | POSITIVE |
-- | 2 | The Dumpling Queen | Nothing special. | NEGATIVE |
CREATE TEMPORARY VIEW movie_comment(id, movie_name, user_comment, actual_label)
AS VALUES (1, 'Her Story', 'My favourite part was when the kid guessed the sounds. It was romantic, heartwarming, and loving.', 'positive'), (2, 'The Dumpling Queen', 'Nothing special.', 'negative');
INSERT INTO print_sink
SELECT id, movie_name, content as predict_label, actual_label
FROM ML_PREDICT(
TABLE movie_comment,
MODEL ai_analyze_sentiment, -- 登録済みの Qwen-Turbo モデルを使用します。
DESCRIPTOR(user_comment)); ステップ 4: ジョブの開始
SQL ドラフトをデプロイしてジョブを開始するには、「SQL ジョブ」をご参照ください。
ステップ 5: 結果の表示
チャット/補完モデル
に移動し、ジョブが完了したことを確認します。

ターゲットのデプロイメントの名前をクリックします。
[ログ] タブで、[タスクマネージャー] サブタブを選択し、[現在のタスクマネージャー] ドロップダウンリストから TaskManager を選択します。
[ログ] サイドタブをクリックし、
PrintSinkOutputWriterを検索します。モデル分析後の
predict_labelの結果は、actual_labelの実際の結果と一致しています。