すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:モデルギャラリーのクイックスタート

最終更新日:Nov 22, 2025

モデルギャラリーは Platform for AI (PAI)-DLC と PAI-EAS をカプセル化し、オープンソースの大規模言語モデル (LLM) を効率的にデプロイおよびトレーニングするためのゼロコードソリューションを提供します。このガイドでは、Qwen3-0.6B モデルを使用してプロセスを説明します。他のモデルにも同じ手順が適用されます。

前提条件

Alibaba Cloud アカウントを使用して Platform for AI (PAI) を有効化し、ワークスペースを作成します。これを行うには、PAI コンソールにログインし、左上隅でリージョンを選択してから、ワンクリック権限付与を使用してサービスを有効化します。

課金

このガイドの例では、パブリックリソースを使用して PAI-DLC タスクと PAI-EAS サービスを作成します。これらのリソースは従量課金制で課金されます。課金ルールの詳細については、「DLC の課金」および「EAS の課金」をご参照ください。

モデルのデプロイ

モデルのデプロイ

  1. PAI コンソールにログインします。左側のナビゲーションウィンドウで、[モデルギャラリー] をクリックします。 Qwen3-0.6B を検索し、[デプロイ]をクリックします。

    image

  2. デプロイメントパラメーターを設定します。デプロイメントページにはデフォルトのパラメーターが含まれています。[デプロイ] > [OK] をクリックします。デプロイメントプロセスには約 5 分かかります。ステータスが [実行中] に変わると、デプロイメントは成功です。

    デフォルトでは、このサービスはパブリックリソースを使用し、従量課金制で課金されます。

    image

モデルの呼び出し

  1. 呼び出し情報を表示します。サービス詳細ページで、[呼び出し情報の表示] をクリックして [インターネットエンドポイント][トークン] を取得します。

    後でデプロイメントタスクの詳細を表示するには、[モデルギャラリー] > [ジョブ管理] > [デプロイメントジョブ] に移動し、[サービス名] をクリックします。

    image

  2. モデルサービスをテストします。次のいずれかのメソッドを使用してモデルを呼び出すことができます。

    オンラインデバッグ

    [オンラインデバッグ] ページに切り替えます。[本文] フィールドに、Hello, who are you? などの質問を入力します。次に、[リクエストの送信] をクリックします。LLM の応答が右側に表示されます。

    image

    Cherry Studio クライアントの使用

    Cherry Studio は、大規模モデルと簡単にチャットできる MCP 機能を統合した、人気のある大規模モデルチャットクライアントです。

    PAI にデプロイされた Qwen3 モデルへの接続

    Python SDK の使用

    from openai import OpenAI
    import os
    
    # 環境変数を設定していない場合は、次の行を EAS サービスのトークンに置き換えてください: token = 'YTA1NTEzMzY3ZTY4Z******************'
    token = os.environ.get("Token")
    # サービス URL の末尾にある "/v1" は削除しないでください。
    client = OpenAI(
        api_key=token,
        base_url=f'Your service URL/v1',
    )
    
    if token is None:
        print("Token 環境変数を設定するか、トークンを token 変数に直接割り当ててください。")
        exit()
    
    query = 'Hello, who are you?'
    messages = [{'role': 'user', 'content': query}]
    
    resp = client.chat.completions.create(model='Qwen3-0.6B', messages=messages, max_tokens=512, temperature=0)
    query = messages[0]['content']
    response = resp.choices[0].message.content
    print(f'query: {query}')
    print(f'response: {response}')

重要事項

このガイドのモデルサービスは、パブリックリソースを使用して作成されており、従量課金制で課金されます。追加料金を避けるため、終了したらサービスを停止または削除してください。

image

モデルのファインチューニング

特定のドメインにおけるモデルのパフォーマンスを向上させるために、そのドメインのデータセットでモデルをファインチューニングできます。このセクションでは、次のシナリオを使用して、モデルのファインチューニングの目的と手順を説明します。

ユースケース

物流業界では、自然言語から構造化情報 (受信者、住所、電話番号など) を抽出する必要が頻繁にあります。Qwen3-235B-A22B などの大規模モデルはこのタスクで優れたパフォーマンスを発揮しますが、コストが高く、レイテンシーも高くなります。パフォーマンスとコストのバランスをとるために、まず大規模パラメーターモデルを使用してデータにラベルを付け、そのデータを使用して Qwen3-0.6B などの小規模モデルをファインチューニングできます。このプロセスはモデル蒸留としても知られています。

同じ構造化情報抽出タスクにおいて、元の Qwen3-0.6B モデルの精度は 50% です。ファインチューニング後、その精度は 90% を超えることができます。

受信者住所情報の例

構造化情報の例

Amina Patel - 電話番号 (474) 598-1543 - 1425 S 5th St, Apt 3B, Allentown, Pennsylvania 18104

{
    "state": "Pennsylvania",
    "city": "Allentown",
    "zip_code": "18104",
    "street_address": "1425 S 5th St, Apt 3B",
    "name": "Amina Patel",
    "phone": "(474) 598-1543"
}

データの準備

このタスクのために教師モデル (Qwen3-235B-A22B) から Qwen3-0.6B モデルに知識を蒸留するには、まず教師モデルの API を使用して受信者の住所情報を構造化 JSON データに抽出する必要があります。このデータの生成には時間がかかる場合があります。したがって、このトピックでは、サンプルのトレーニングデータセットtrain.json検証セットeval.json を提供しており、直接ダウンロードして使用できます。

モデル蒸留では、より大きなモデルは教師モデルとして知られています。このガイドで使用されるデータは、大規模モデルによって合成的に生成されたものであり、機密性の高いユーザー情報は一切含まれていません。

本番稼働

このソリューションをビジネスに適用するには、次の方法でデータを準備することをお勧めします。

実際のビジネスシナリオ (推奨)

実際のビジネスデータはビジネスシナリオをよりよく反映し、ファインチューニングされたモデルはビジネスによりよく適応できます。ビジネスデータを取得した後、プログラムで次の形式の JSON ファイルに変換する必要があります。

[
    {
        "instruction": "あなたは、米国の配送情報から構造化 JSON を抽出する専門のアシスタントです。JSON キーは、name、street_address、city、state、zip_code、phone です。名前: Isabella Rivera Cruz | 182 Calle Luis Lloréns Torres, Apt 3B, Mayagüez, Puerto Rico 00680 | 携帯電話: (640) 486-5927",
        "output": "{\"name\": \"Isabella Rivera Cruz\", \"street_address\": \"182 Calle Luis Lloréns Torres, Apt 3B\", \"city\": \"Mayagüez\", \"state\": \"Puerto Rico\", \"zip_code\": \"00680\", \"phone\": \"(640) 486-5927\"}"
    },
    {
        "instruction": "あなたは、米国の配送情報から構造化 JSON を抽出する専門のアシスタントです。JSON キーは、name、street_address、city、state、zip_code、phone です。1245 Broadwater Avenue, Apt 3B, Bozeman, Montana 59715 受信者: Aisha Patel P: (429) 763-9742",
        "output": "{\"name\": \"Aisha Patel\", \"street_address\": \"1245 Broadwater Avenue, Apt 3B\", \"city\": \"Bozeman\", \"state\": \"Montana\", \"zip_code\": \"59715\", \"phone\": \"(429) 763-9742\"}"
    }
]

JSON ファイルには複数のトレーニングサンプルが含まれています。各サンプルには、instruction と output の 2 つのフィールドが含まれています。

  • instruction: 大規模モデルの動作をガイドするプロンプトと入力データが含まれています。

  • output: 期待される標準的な回答で、通常は人間の専門家や qwen3-235b-a22b などのより大規模なモデルによって生成されます。

モデル生成

ビジネスデータが不十分な場合は、データ拡張にモデルを使用することを検討してください。これにより、データの多様性とカバー率を向上させることができます。ユーザーのプライバシー漏洩を避けるため、このソリューションではモデルを使用して仮想アドレスデータのバッチを生成します。以下の生成コードは参考用です。

ビジネスデータ生成をシミュレートするためのコード

次のコードを実行するには、Alibaba Cloud Model Studio API キーを作成する必要があります。このコードは、qwen-plus-latest を使用してビジネスデータを生成し、qwen3-235b-a22b を使用してラベリングします。

# -*- coding: utf-8 -*-
import os
import asyncio
import random
import json
import sys
from typing import List
import platform
from openai import AsyncOpenAI

# 非同期クライアントインスタンスを作成します。
# 注: このスクリプトは DashScope 互換の API エンドポイントを使用します。
# 別の OpenAI 互換サービスを使用している場合は、base_url を変更してください。
client = AsyncOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

# 米国の州と準州のリスト。
us_states = [
    "Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado", "Connecticut", "Delaware",
    "Florida", "Georgia", "Hawaii", "Idaho", "Illinois", "Indiana", "Iowa", "Kansas", "Kentucky",
    "Louisiana", "Maine", "Maryland", "Massachusetts", "Michigan", "Minnesota", "Mississippi",
    "Missouri", "Montana", "Nebraska", "Nevada", "New Hampshire", "New Jersey", "New Mexico",
    "New York", "North Carolina", "North Dakota", "Ohio", "Oklahoma", "Oregon", "Pennsylvania",
    "Rhode Island", "South Carolina", "South Dakota", "Tennessee", "Texas", "Utah", "Vermont",
    "Virginia", "Washington", "West Virginia", "Wisconsin", "Wyoming", "District of Columbia",
    "Puerto Rico", "Guam", "American Samoa", "U.S. Virgin Islands", "Northern Mariana Islands"
]

# 受信者テンプレート。
recipient_templates = [
    "To: {name}", "Recipient: {name}", "Deliver to {name}", "For: {name}",
    "ATTN: {name}", "{name}", "Name: {name}", "Contact: {name}", "Receiver: {name}"
]

# 電話番号テンプレート。
phone_templates = [
    "Tel: {phone}", "Tel. {phone}", "Mobile: {phone}", "Phone: {phone}",
    "Contact number: {phone}", "Phone number {phone}", "TEL: {phone}", "MOBILE: {phone}",
    "Contact: {phone}", "P: {phone}", "{phone}", "Call: {phone}",
]


# もっともらしい米国風の電話番号を生成します。
def generate_us_phone():
    """(XXX) XXX-XXXX 形式のランダムな 10 桁の米国電話番号を生成します。"""
    area_code = random.randint(201, 999)  # 0xx、1xx の市外局番を避けます。
    exchange = random.randint(200, 999)
    line = random.randint(1000, 9999)
    return f"({area_code}) {exchange}-{line}"


# LLM を使用して受信者と住所の情報を生成します。
async def generate_recipient_and_address_by_llm(state: str):
    """LLM を使用して、指定された州の受信者の名前と住所の詳細を生成します。"""
    prompt = f"""米国 {state} の場所の受信者情報を生成してください。以下を含みます:
1. 現実的なフルネームの英語名。多様性を目指してください。
2. その州内の実際の都市名。
3. 番地、通り名、アパート番号などの具体的な住所。現実的である必要があります。
4. その都市または地域に対応する 5 桁の郵便番号。

以下の形式で JSON オブジェクトのみを返してください:
{{"name": "受信者名", "city": "都市名", "street_address": "具体的な住所", "zip_code": "郵便番号"}}

他のテキストは含めず、JSON のみを含めてください。名前は John Doe だけでなく、多様性を持たせてください。
"""

    try:
        response = await client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="qwen-plus-latest",
            temperature=1.5,  # より多様な名前と住所のために温度を上げます。
        )

        result = response.choices[0].message.content.strip()
        # 潜在的なマークダウンコードブロックマーカーをクリーンアップします。
        if result.startswith('```'):
            result = result.split('\n', 1)[1]
        if result.endswith('```'):
            result = result.rsplit('\n', 1)[0]

        # JSON の解析を試みます。
        info = json.loads(result)
        print(info)
        return info
    except Exception as e:
        print(f"受信者と住所の生成に失敗しました: {e}、フォールバックを使用します。")
        # フォールバックメカニズム。
        backup_names = ["Michael Johnson", "Emily Williams", "David Brown", "Jessica Jones", "Christopher Davis",
                        "Sarah Miller"]
        return {
            "name": random.choice(backup_names),
            "city": "Anytown",
            "street_address": f"{random.randint(100, 9999)} Main St",
            "zip_code": f"{random.randint(10000, 99999)}"
        }


# 単一の生データレコードを生成します。
async def generate_record():
    """米国の住所情報の乱雑で結合された 1 つの文字列を生成します。"""
    # 州をランダムに選択します。
    state = random.choice(us_states)

    # LLM を使用して受信者と住所の情報を生成します。
    info = await generate_recipient_and_address_by_llm(state)

    # 受信者名をフォーマットします。
    recipient = random.choice(recipient_templates).format(name=info['name'])

    # 電話番号を生成します。
    phone = generate_us_phone()
    phone_info = random.choice(phone_templates).format(phone=phone)

    # 完全な住所行を組み立てます。
    full_address = f"{info['street_address']}, {info['city']}, {state} {info['zip_code']}"

    # すべてのコンポーネントを結合します。
    components = [recipient, phone_info, full_address]

    # コンポーネントの順序をランダム化します。
    random.shuffle(components)

    # ランダムな区切り文字を選択します。
    separators = [' ', ', ', '; ', ' | ', '\t', ' - ', ' // ', '', '  ']
    separator = random.choice(separators)

    # コンポーネントを結合します。
    combined_data = separator.join(components)
    return combined_data.strip()


# データのバッチを生成します。
async def generate_batch_data(count: int) -> List[str]:
    """指定された数のデータレコードを生成します。"""
    print(f"{count} 件のレコードの生成を開始します...")

    # セマフォを使用して同時実行性を制御します。たとえば、最大 20 の同時リクエスト。
    semaphore = asyncio.Semaphore(20)

    async def generate_single_record(index):
        async with semaphore:
            try:
                record = await generate_record()
                print(f"レコード #{index + 1} を生成しました: {record}")
                return record
            except Exception as e:
                print(f"レコード #{index + 1} の生成に失敗しました: {e}")
                return None

    # データを同時に生成します。
    tasks = [generate_single_record(i) for i in range(count)]

    data = await asyncio.gather(*tasks)

    successful_data = [record for record in data if record is not None]

    return successful_data


# データをファイルに保存します。
def save_data(data: List[str], filename: str = "us_recipient_data.json"):
    """生成されたデータを JSON ファイルに保存します。"""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"データは {filename} に保存されました")


# フェーズ 1: データ生成。
async def produce_data_phase():
    """生の受信者データの生成を処理します。"""
    print("=== フェーズ 1: 生の受信者データの生成を開始 ===")

    # 2,000 レコードを生成します。
    batch_size = 2000
    data = await generate_batch_data(batch_size)

    # データを保存します。
    save_data(data, "us_recipient_data.json")

    print(f"\n生成されたレコードの合計: {len(data)}")
    print("\nサンプルデータ:")
    for i, record in enumerate(data[:3]):  # 最初の 3 つを例として表示します。
        print(f"{i + 1}. 生データ: {record}\n")

    print("=== フェーズ 1 完了 ===\n")
    return True


# 抽出モデルのシステムプロンプトを定義します。
def get_system_prompt_for_extraction():
    """情報抽出タスクのシステムプロンプトを返します。"""
    return """あなたは、非構造化テキストから米国の配送先住所を解析することに特化した、プロの情報抽出アシスタントです。

## タスクの説明
指定された入力テキストに基づいて、以下の 6 つのフィールドを含む JSON オブジェクトを正確に抽出して生成します:
- name: 受信者のフルネーム。
- street_address: 番地、通り名、アパートまたはスイート番号を含む完全な住所。
- city: 都市名。
- state: 完全な州名 (例: "California" であり "CA" ではない)。
- zip_code: 5 桁または 9 桁の郵便番号。
- phone: 完全な連絡先電話番号。

## 抽出ルール
1.  **住所の処理**:
    -   コンポーネントを正確に識別します: 通り、都市、州、郵便番号。
    -   `state` フィールドは、完全な公式名でなければなりません (例: "New York" であり "NY" ではない)。
    -   `street_address` には、都市の前のすべての詳細を含める必要があります (例: "123 Apple Lane, Apt 4B")。
2.  **名前の識別**:
    -   受信者のフルネームを抽出します。
3.  **電話番号の処理**:
    -   元の形式を維持して、完全な電話番号を抽出します。
4.  **郵便番号**:
    -   5 桁または 9 桁 (ZIP+4) のコードを抽出します。

## 出力形式
以下の JSON 形式に厳密に従ってください。説明文やマークダウンは追加しないでください。
{
  "name": "受信者のフルネーム",
  "street_address": "完全な住所",
  "city": "都市名",
  "state": "完全な州名",
  "zip_code": "郵便番号",
  "phone": "連絡先電話番号"
}
"""


# LLM を使用して、生のテキストから構造化データを予測します。
async def predict_structured_data(raw_data: str):
    """LLM を使用して、生の文字列から構造化データを予測します。"""
    system_prompt = get_system_prompt_for_extraction()

    try:
        response = await client.chat.completions.create(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": raw_data}
            ],
            model="qwen3-235b-a22b",  # このタスクには強力なモデルが推奨されます。
            temperature=0.0,  # 抽出の精度を高めるために温度を下げます。
            response_format={"type": "json_object"},
            extra_body={"enable_thinking": False}
        )

        result = response.choices[0].message.content.strip()

        # 潜在的なマークダウンコードブロックマーカーをクリーンアップします。
        if result.startswith('```'):
            lines = result.split('\n')
            for i, line in enumerate(lines):
                if line.strip().startswith('{'):
                    result = '\n'.join(lines[i:])
                    break
        if result.endswith('```'):
            result = result.rsplit('\n```', 1)[0]

        structured_data = json.loads(result)
        return structured_data

    except Exception as e:
        print(f"構造化データの予測に失敗しました: {e}、生データ: {raw_data}")
        # 失敗した場合は空の構造を返します。
        return {
            "name": "",
            "street_address": "",
            "city": "",
            "state": "",
            "zip_code": "",
            "phone": ""
        }


# フェーズ 2: データ変換。
async def convert_data_phase():
    """生データを読み取り、構造化形式を予測し、SFT データとして保存します。"""
    print("=== フェーズ 2: SFT 形式へのデータ変換を開始 ===")

    try:
        print("us_recipient_data.json ファイルを読み込んでいます...")
        with open('us_recipient_data.json', 'r', encoding='utf-8') as f:
            raw_data_list = json.load(f)

        print(f"{len(raw_data_list)} 件のレコードを正常に読み取りました。")
        print("抽出モデルを使用して構造化データの予測を開始します...")

        # シンプルで明確なシステムメッセージは、トレーニングと推論の速度を向上させることができます。
        system_prompt = "あなたは、米国の配送情報から構造化 JSON を抽出する専門のアシスタントです。JSON キーは name、street_address、city、state、zip_code、phone です。"
        output_file = 'us_recipient_sft_data.json'

        # セマフォを使用して同時実行性を制御します。
        semaphore = asyncio.Semaphore(10)

        async def process_single_item(index, raw_data):
            async with (semaphore):
                structured_data = await predict_structured_data(raw_data)
                print(f"レコード #{index + 1} を処理中: {raw_data}")

                conversation = {
                        "instruction": system_prompt + '  ' + raw_data,
                        "output": json.dumps(structured_data, ensure_ascii=False)
                }

                return conversation

        print(f"{output_file} への変換を開始します...")

        tasks = [process_single_item(i, raw_data) for i, raw_data in enumerate(raw_data_list)]
        conversations = await asyncio.gather(*tasks)

        with open(output_file, 'w', encoding='utf-8') as outfile:
            json.dump(conversations, outfile, ensure_ascii=False, indent=4)

        print(f"変換完了!{len(raw_data_list)} 件のレコードを処理しました。")
        print(f"出力ファイル: {output_file}")
        print("=== フェーズ 2 完了 ===")

    except FileNotFoundError:
        print("エラー: us_recipient_data.json が見つかりません。")
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"JSON デコードエラー: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"変換中にエラーが発生しました: {e}")
        sys.exit(1)


# メイン関数。
async def main():
    print("データ処理パイプラインを開始します...")
    print("このプログラムは、2 つのフェーズを順番に実行します:")
    print("1. 生の米国の受信者データを生成します。")
    print("2. 構造化データを予測し、SFT 形式に変換します。")
    print("-" * 50)

    # フェーズ 1: データを生成します。
    success = await produce_data_phase()

    if success:
        # フェーズ 2: データを変換します。
        await convert_data_phase()

        print("\n" + "=" * 50)
        print("すべてのプロセスが正常に完了しました!")
        print("生成されたファイル:")
        print("- us_recipient_data.json: 生の非構造化データリスト。")
        print("- us_recipient_sft_data.json: SFT 形式のトレーニングデータ。")
        print("=" * 50)
    else:
        print("データ生成フェーズが失敗しました。終了します。")


if __name__ == '__main__':
    # 必要に応じて Windows のイベントループポリシーを設定します。
    if platform.system() == 'Windows':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    # メインコルーチンを実行します。
    asyncio.run(main(), debug=False)

モデルをファインチューニングする

  1. 左側のナビゲーションウィンドウで、[モデルギャラリー]をクリックします。[Qwen3-0.6B] モデルを検索し、[トレーニング]をクリックします。

    image

  2. トレーニングタスクのパラメーターを設定します。次の主要なパラメーターを設定し、その他はデフォルト値のままにします。

    • トレーニングモード: デフォルトは SFT (教師ありファインチューニング) で、LoRA ファインチューニングメソッドを使用します。

      LoRA は、モデルのパラメーターのサブセットのみを変更することでトレーニングリソースを節約する、効率的なファインチューニング技術です。
    • トレーニングデータセット: まず、サンプルのトレーニングデータセット train.json をダウンロードします。次に、設定ページで [OSS ファイルまたはディレクトリ] を選択し、image アイコンをクリックしてバケットを選択します。[ファイルのアップロード] をクリックして、データセットを Object Storage Service (OSS) にアップロードします。最後に、ファイルを選択します。

      image

    • 検証データセット: まず、検証データセット eval.json をダウンロードします。次に、[検証データセットの追加] をクリックし、トレーニングデータセットと同じ手順でファイルをアップロードして選択します。

      検証データセットは、トレーニング中に、未知のデータに対するモデルのパフォーマンスを評価するために使用されます。
    • ModelOutput パス: デフォルトでは、ファインチューニングされたモデルは OSS に保存されます。OSS ディレクトリが空の場合は、[フォルダの作成] をクリックしてディレクトリを指定します。

    • リソースタイプ: [パブリックリソース] を選択します。このファインチューニングタスクには約 5 GB の GPU メモリが必要です。コンソールでは、この要件を満たす利用可能なリソース仕様がすでにフィルタリングされています。ecs.gn7i-c16g1.4xlarge などの仕様を選択します。

    • ハイパーパラメーター構成:

      • learning_rate: 0.0005 に設定

      • num_train_epochs: 4 に設定

      • per_device_train_batch_size: 8 に設定

      • seq_length: 512 に設定

      [トレーニング] > [OK] をクリックします。トレーニングタスクのステータスが [作成中] に変わります。ステータスが [実行中] に変わると、モデルのファインチューニングプロセスが開始されます。

  3. トレーニングタスクを表示し、完了するまで待ちます。モデルのファインチューニングプロセスには約 10 分かかります。ファインチューニング中、タスク詳細ページにはログとメトリック曲線が表示されます。トレーニングが完了すると、ファインチューニングされたモデルは指定された OSS ディレクトリに保存されます。

    後でトレーニングタスクの詳細を表示するには、左側のナビゲーションウィンドウで [モデルギャラリー] > [タスク管理] > [トレーニングタスク] をクリックし、タスク名をクリックします。

    image

    (オプション) 損失グラフに基づいてハイパーパラメーターを調整してモデルのパフォーマンスを向上させる

    タスク詳細ページでは、train_loss 曲線 (トレーニングセットの損失を反映) と eval_loss 曲線 (検証セットの損失を反映) を表示できます。

    imageimage

    損失値の傾向を使用して、モデルのトレーニング効果を評価できます。

    • アンダーフィッティング: トレーニングが終了しても、train_loss 曲線と eval_loss 曲線はまだ減少し続けています。

      num_train_epochs (トレーニングエポック数、トレーニングの深さと正の相関あり) を増やすか、lora_rank (低ランク行列のランク。ランクが大きいほどモデルはより複雑なタスクを学習できますが、オーバーフィッティングのリスクが高まります) をわずかに増やしてから、モデルを再トレーニングしてトレーニングデータへの適合度を向上させることができます。

    • オーバーフィッティング: train_loss は減少し続けますが、eval_loss はトレーニングが終了する前に増加し始めます。

      num_train_epochs を減らすか、lora_rank をわずかに減らしてから、モデルを再トレーニングしてオーバーフィッティングを防ぐことができます。

    • 良好な適合: train_loss 曲線と eval_loss 曲線の両方が安定し、平坦になっています。モデルがこの状態に達したら、先に進むことができます。

ファインチューニング済みモデルをデプロイする

トレーニングジョブの詳細ページで、[デプロイ]ボタンをクリックしてデプロイメント設定ページを開きます。[リソースタイプ][パブリックリソース]に設定します。0.6B モデルのデプロイには約 5 GB の GPU メモリが必要です。[リソース仕様] ドロップダウンには、この要件を満たす仕様がすでにフィルタリングされて表示されています。ecs.gn7i-c8g1.2xlargeなどの仕様を選択します。他のパラメーターはデフォルト設定のままにして、 [デプロイ] > [OK]をクリックします。

デプロイメントプロセスには約 5 分かかります。ステータスが [実行中]に変わると、デプロイメントは成功です。

後でトレーニングタスクの詳細を表示するには、左側のナビゲーションウィンドウで [モデルギャラリー] > [タスク管理] > [トレーニングタスク] をクリックし、タスク名をクリックします。

image

トレーニングタスクが成功した後に [デプロイ] ボタンが無効になっている場合は、出力モデルがまだ登録中であることを意味します。約 1 分間お待ちください。

image

モデルを呼び出すための後続の手順は、「モデルの呼び出し」セクションで説明されているものと同じです。

ファインチューニング済みモデルのパフォーマンス検証

ファインチューニングされたモデルを本番環境にデプロイする前に、そのパフォーマンスを体系的に評価して、安定性と精度を確保し、デプロイ後の予期せぬ問題を回避します。

テストデータの準備

モデルのパフォーマンスをテストするために、トレーニングデータと重複しないテストデータセットを準備します。このトピックでは、以下の精度テストコードを実行すると自動的にダウンロードされるテストセットを提供します。

テストデータはトレーニングデータと重複しないようにしてください。これにより、新しいデータに対するモデルの汎化能力をより正確に反映し、サンプルの記憶によるスコアの過大評価を回避できます。

評価指標の設計

評価基準は、実際のビジネス目標と密接に一致させる必要があります。このソリューションの例では、生成された JSON 文字列が有効かどうかを確認するだけでなく、対応するキーと値のペアが正しいかどうかも確認する必要があります。

評価メトリックはプログラムで定義する必要があります。この例の評価メトリックの実装については、以下の精度テストコードの compare_address_info メソッドを参照してください。

ファインチューニング済みモデルのパフォーマンス検証

次のテストコードを実行すると、テストセットに対するモデルの精度が出力されます。

モデルの精度をテストするためのサンプルコード

注: TokenEndpoint をお使いのサービスの値に置き換えてください。

# pip3 install openai
from openai import AsyncOpenAI
import requests
import json
import asyncio
import os

# 'Token' 環境変数が設定されていない場合は、次の行を EAS サービスのトークンに置き換えてください: token = 'YTA1NTEzMzY3ZTY4Z******************'
token = os.environ.get("Token")

# サービス URL の後の "/v1" サフィックスを削除しないでください。
client = OpenAI(
    api_key=token,
    base_url=f'<Your_Service_URL>/v1',
)

if token is None:
    print("'Token' 環境変数を設定するか、トークンを 'token' 変数に直接割り当ててください。")
    exit()

system_prompt = """あなたは、非構造化テキストから米国の配送先住所を解析することに特化した、プロの情報抽出アシスタントです。

## タスクの説明
指定された入力テキストに基づいて、以下の 6 つのフィールドを含む JSON オブジェクトを正確に抽出して生成します:
- name: 受信者のフルネーム。
- street_address: 番地、通り名、アパートまたはスイート番号を含む完全な住所。
- city: 都市名。
- state: 完全な州名 (例: "California" であり "CA" ではない)。
- zip_code: 5 桁または 9 桁の郵便番号。
- phone: 完全な連絡先電話番号。

## 抽出ルール
1.  **住所の処理**:
    -   コンポーネントを正確に識別します: 通り、都市、州、郵便番号。
    -   `state` フィールドは、完全な公式名でなければなりません (例: "New York" であり "NY" ではない)。
    -   `street_address` には、都市の前のすべての詳細を含める必要があります (例: "123 Apple Lane, Apt 4B")。
2.  **名前の識別**:
    -   受信者のフルネームを抽出します。
3.  **電話番号の処理**:
    -   元の形式を維持して、完全な電話番号を抽出します。
4.  **郵便番号**:
    -   5 桁または 9 桁 (ZIP+4) のコードを抽出します。

## 出力形式
以下の JSON 形式に厳密に従ってください。説明文やマークダウンは追加しないでください。
{
  "name": "受信者のフルネーム",
  "street_address": "完全な住所",
  "city": "都市名",
  "state": "完全な州名",
  "zip_code": "郵便番号",
  "phone": "連絡先電話番号"
}
"""


def compare_address_info(actual_address_str, predicted_address_str):
    """住所情報を表す 2 つの JSON 文字列を比較して、それらが同一であるかどうかを確認します。"""
    try:
        # 実際の住所情報を解析します
        if actual_address_str:
            actual_address_json = json.loads(actual_address_str)
        else:
            actual_address_json = {}

        # 予測された住所情報を解析します
        if predicted_address_str:
            predicted_address_json = json.loads(predicted_address_str)
        else:
            predicted_address_json = {}

        # 2 つの JSON オブジェクトが同一であるかどうかを直接比較します
        is_same = actual_address_json == predicted_address_json

        return {
            "is_same": is_same,
            "actual_address_parsed": actual_address_json,
            "predicted_address_parsed": predicted_address_json,
            "comparison_error": None
        }

    except json.JSONDecodeError as e:
        return {
            "is_same": False,
            "actual_address_parsed": None,
            "predicted_address_parsed": None,
            "comparison_error": f"JSON 解析エラー: {str(e)}"
        }
    except Exception as e:
        return {
            "is_same": False,
            "actual_address_parsed": None,
            "predicted_address_parsed": None,
            "comparison_error": f"比較エラー: {str(e)}"
        }


async def predict_single_conversation(conversation_data):
    """単一の会話のラベルを予測します。"""
    try:
        # ユーザーコンテンツを抽出します (アシスタントメッセージを除く)
        messages = conversation_data.get("messages", [])
        user_content = None

        for message in messages:
            if message.get("role") == "user":
                user_content = message.get("content", "")
                break

        if not user_content:
            return {"error": "ユーザーメッセージが見つかりません"}

        response = await client.chat.completions.create(
            model="Qwen3-0.6B",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content}
            ],
            response_format={"type": "json_object"},
            extra_body={
                "enable_thinking": False
            }
        )

        predicted_labels = response.choices[0].message.content.strip()
        return {"prediction": predicted_labels}

    except Exception as e:
        return {"error": f"予測に失敗しました: {str(e)}"}


async def process_batch(batch_data, batch_id):
    """データのバッチを処理します。"""
    print(f"バッチ {batch_id} を処理中、{len(batch_data)} 個のアイテムを含みます...")

    tasks = []
    for i, conversation in enumerate(batch_data):
        task = predict_single_conversation(conversation)
        tasks.append(task)

    results = await asyncio.gather(*tasks, return_exceptions=True)

    batch_results = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            batch_results.append({"error": f"例外: {str(result)}"})
        else:
            batch_results.append(result)

    return batch_results


async def main():
    output_file = "predicted_labels.jsonl"
    batch_size = 20  # バッチごとに処理するアイテム数

    # テストデータを読み込みます
    url = 'https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20251015/yghxco/test.jsonl'
    conversations = []

    try:
        response = requests.get(url)
        response.raise_for_status()  # リクエストが成功したかどうかを確認します
        for line_num, line in enumerate(response.text.splitlines(), 1):
            try:
                data = json.loads(line.strip())
                conversations.append(data)
            except json.JSONDecodeError as e:
                print(f"行 {line_num} で JSON 解析エラー: {e}")
                continue
    except requests.exceptions.RequestException as e:
        print(f"リクエストエラー: {e}")
        return

    print(f"{len(conversations)} 個の会話データアイテムを正常に読み込みました")

    # バッチで処理します
    all_results = []
    total_batches = (len(conversations) + batch_size - 1) // batch_size

    for batch_id in range(total_batches):
        start_idx = batch_id * batch_size
        end_idx = min((batch_id + 1) * batch_size, len(conversations))
        batch_data = conversations[start_idx:end_idx]

        batch_results = await process_batch(batch_data, batch_id + 1)
        all_results.extend(batch_results)

        print(f"バッチ {batch_id + 1}/{total_batches} が完了しました")

        # リクエストをあまりにも速く送信しないように、短い遅延を追加します
        if batch_id < total_batches - 1:
            await asyncio.sleep(1)

    # 結果を保存します
    same_count = 0
    different_count = 0
    error_count = 0

    with open(output_file, 'w', encoding='utf-8') as f:
        for i, (original_data, prediction_result) in enumerate(zip(conversations, all_results)):
            result_entry = {
                "index": i,
                "original_user_content": None,
                "actual_address": None,
                "predicted_address": None,
                "prediction_error": None,
                "address_comparison": None
            }

            # 元のユーザーコンテンツを抽出します
            messages = original_data.get("messages", [])
            for message in messages:
                if message.get("role") == "user":
                    result_entry["original_user_content"] = message.get("content", "")
                    break

            # 実際の住所情報を抽出します (アシスタントメッセージが存在する場合)
            for message in messages:
                if message.get("role") == "assistant":
                    result_entry["actual_address"] = message.get("content", "")
                    break

            # 予測結果を保存します
            if "error" in prediction_result:
                result_entry["prediction_error"] = prediction_result["error"]
                error_count += 1
            else:
                result_entry["predicted_address"] = prediction_result.get("prediction", "")

                # 住所情報を比較します
                comparison_result = compare_address_info(
                    result_entry["actual_address"],
                    result_entry["predicted_address"]
                )
                result_entry["address_comparison"] = comparison_result

                # 比較結果を集計します
                if comparison_result["comparison_error"]:
                    error_count += 1
                elif comparison_result["is_same"]:
                    same_count += 1
                else:
                    different_count += 1

            f.write(json.dumps(result_entry, ensure_ascii=False) + '\n')

    print(f"すべての予測が完了しました!結果は {output_file} に保存されました")

    # 統計
    success_count = sum(1 for result in all_results if "error" not in result)
    prediction_error_count = len(all_results) - success_count
    print(f"サンプル数: {success_count}")
    print(f"正解応答数: {same_count}")
    print(f"不正解応答数: {different_count}")
    print(f"精度: {same_count * 100 / success_count} %")


if __name__ == "__main__":
    asyncio.run(main())

出力:

すべての予測が完了しました!結果は predicted_labels.jsonl に保存されました
サンプル数: 400
正解応答数: 382
不正解応答数: 18
精度: 95.5 %
モデルのファインチューニングにおけるランダムシードと、大規模モデルの出力の確率的な性質により、得られる精度はこのトピックの結果と異なる場合があります。これは正常です。

精度は 95.5% で、元の Qwen3-0.6B モデルの 50% の精度から大幅に向上しています。これは、ファインチューニングされたモデルが物流ドメインで構造化情報を抽出する能力を大幅に向上させたことを示しています。

トレーニング時間を短縮するため、このガイドでは 4 トレーニングエポックのみを使用しており、これにより精度はすでに 90% を超えています。トレーニングエポック数を増やすことで、精度をさらに向上させることができます。

重要なリマインダー

このガイドのモデルサービスは、パブリックリソースを使用して作成されており、従量課金制で課金されます。追加料金を避けるため、終了したらサービスを停止または削除してください。

image

参考資料

  • 評価や圧縮などの Model Gallery の機能の詳細については、「Model Gallery」をご参照ください。

  • Auto Scaling、ストレステスト、モニタリングとアラートなどの EAS 機能の詳細については、「EAS の概要」をご参照ください。