すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:AI を活用したリアルタイム分析の開始

最終更新日:Nov 06, 2025

このトピックでは、Alibaba Cloud Model Studio で利用可能な Realtime Compute for Apache Flink と大規模言語モデル (LLM) を使用してリアルタイムデータ分析を行う方法について説明します。

背景情報

Alibaba Cloud Model Studio は、AI 開発者とビジネスチームが LLM を開発するためのワンストッププラットフォームです。Alibaba Cloud Model Studio を Realtime Compute for Apache Flink と統合し、Alibaba Cloud Model Studio の AI モデルと Flink のストリーム処理機能を活用してリアルタイムデータパイプラインを構築することで、デシジョンインテリジェンスを大幅に促進できます。以下に、2 つのコアモデルのアプリケーションシナリオを示します。

  • チャット補完モデル

    チャット補完モデルは、チャット生成とテキスト理解に特化した LLM です。感情分析、意図認識、質問応答など、さまざまなユースケースに対応します。

    • 感情分析: ソーシャルメディアのコメントをリアルタイムで分析し、ユーザーの感情的なトーンが肯定的、否定的、または中立的であるかを迅速に判断します。

    • インテリジェントカスタマーサービス: LLM の対話生成機能に基づいて、自然言語で顧客と対話します。

    • コンテンツモデレーション: テキスト内の機密コンテンツや禁止コンテンツを自動的に検出し、コンテンツモデレーションの効率を向上させます。

  • 埋め込みモデル

    埋め込みモデルは、テキストを高次元のベクトル表現に変換します。セマンティック検索、レコメンデーションシステム、知識グラフの構築などのシナリオに適しています。

    • セマンティック検索: 製品説明やユーザーのクエリをベクトル化することで、セマンティックな関連性検索を実装します。

    • レコメンデーションシステム: テキストのベクトル化に基づいて、ユーザーの興味と製品の特徴との間の潜在的な関連性を発見し、レコメンデーションの精度を向上させます。

    • 知識グラフ: 非構造化テキストをベクトルに変換して、知識の抽出と関係のモデリングを容易にします。

前提条件

制限事項

Ververica Runtime (VVR) 11.1 以降のバージョンのみがサポートされています。

ステップ 1: モデルの登録

詳細な手順については、「モデル DDL」をご参照ください。

チャット/補完モデル

次の SQL コードは、qwen-turbo モデルを登録する方法を示しています。

CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
    'provider'='bailian',
    'endpoint'='<base_url>/compatible-mode/v1/chat/completions',    -- チャット/補完モデルのエンドポイント
    'api-key' = '<YOUR KEY>',
    'model'='qwen-turbo',                                                               -- qwen-turbo モデルを使用
    'system-prompt' = 'Classify the text below with one of the following labels: [positive, negative, neutral, mixed]. Output only the label.'
);

実際の状況に応じて、endpoint の値の <base_url> を置き換えます。

  • インターネットアクセス: <base_url>https://dashscope-intl.aliyuncs.com に置き換えます。

  • VPC アクセス: <base_url>https://vpc-ap-southeast-1.dashscope.aliyuncs.com に置き換えます。

    説明

    endpoint を設定して HTTPS プロトコルを使用します。

ステップ 2: SQL ドラフトの作成

SQL ストリームドラフトを作成するには、「SQL ジョブ」をご参照ください。

ステップ 3: AI を活用した分析のためのコードの記述

チャット/補完モデル

ML_PREDICT AI 関数を使用して、ステップ 1 で登録した ai_analyze_sentiment モデルを呼び出し、映画レビューの感情分析を行います。

重要

ML_PREDICT オペレーターのスループットは、Alibaba Cloud Model Studio のレート制限の対象となります。モデルのレート制限に達すると、ML_PREDICT オペレーターがボトルネックとなり、Flink ジョブにバックプレッシャーがかかります。場合によっては、タイムアウトエラーやジョブの再起動がトリガーされることがあります。レート制限を引き上げるには、営業担当者にお問い合わせください。

次のコードをコピーして SQL エディターに貼り付けます。

-- 一時的な結果テーブルを作成します。
CREATE TEMPORARY TABLE print_sink(
  id BIGINT,
  movie_name VARCHAR, 
  predict_label VARCHAR, 
  actual_label VARCHAR
) WITH (
  'connector' = 'print',   -- Print コネクタを使用します。
  'logger' = 'true'        -- 結果をコンソールに表示します。
);

-- テストデータを含む一時的なビューを作成します。
-- | id | movie_name | user_comment   | actual_label |
-- | 1  | Her Story | My favourite part was when the kid guessed the sounds. It was romantic, heartwarming, and loving. | POSITIVE |
-- | 2  | The Dumpling Queen | Nothing special. | NEGATIVE |
CREATE TEMPORARY VIEW movie_comment(id, movie_name,  user_comment, actual_label)
AS VALUES (1, 'Her Story', 'My favourite part was when the kid guessed the sounds. It was romantic, heartwarming, and loving.', 'positive'), (2, 'The Dumpling Queen', 'Nothing special.', 'negative');

INSERT INTO print_sink
SELECT id, movie_name, content as predict_label, actual_label 
FROM ML_PREDICT(
  TABLE movie_comment, 
  MODEL ai_analyze_sentiment,  -- 登録済みの Qwen-Turbo モデルを使用します。
  DESCRIPTOR(user_comment));   

ステップ 4: ジョブの開始

SQL ドラフトをデプロイしてジョブを開始するには、「SQL ジョブ」をご参照ください。

ステップ 5: 結果の表示

チャット/補完モデル

  1. [O&M] > [Deployments] に移動し、ジョブが完了したことを確認します。

    image

  2. ターゲットのデプロイメントの名前をクリックします。

  3. [ログ] タブで、[タスクマネージャー] サブタブを選択し、[現在のタスクマネージャー] ドロップダウンリストから TaskManager を選択します。

  4. [ログ] サイドタブをクリックし、PrintSinkOutputWriter を検索します。

    モデル分析後の predict_label の結果は、actual_label の実際の結果と一致しています。

    image

リファレンス