すべてのプロダクト
Search
ドキュメントセンター

Alibaba Cloud Model Studio:レート制限への対処に関するベストプラクティス

最終更新日:Apr 21, 2026

このトピックでは、Model Studio API のレート制限メカニズムについて説明し、スループットを向上させ、サービス可用性を確保するための、さまざまなシナリオに応じたトラフィック制御戦略を提案します。

Model Studio API は、リクエスト数、トークン使用量、および時間経過に伴う増加率を制限します。これはレート制限と呼ばれます。大規模言語モデル (LLM) サービスはレイテンシーが長く、リクエスト数とトークン量の両方を制限する2次元レート制限を使用します。従来のエラー時のリトライ戦略はこれらのサービスには効果がないため、特定のトラフィック制御策を実装する必要があります。

このトピックでは、実装コストの低い順に 3 種類のソリューションを紹介します。

現在 429 エラーのトラブルシューティングを行っている場合は、「エラー診断と推奨戦略」に進み、原因を特定してください。

プラットフォームのレート制限メカニズム

レート制限は、ルートアカウントレベルで各モデルごとに個別に計算されます。レート制限がトリガーされた後、サービスは通常 1 分以内に再開します。各モデルの具体的なレート制限条件と現在の使用量については、「レート制限」および「モデルのモニタリング」をご参照ください。Model Studio API には、以下の 3 種類のレート制限ルールが含まれています。

  • 分単位のクォータ制限 (RPM / TPM):1分あたりの最大リクエスト数 (RPM) と1分あたりの最大トークン使用量 (TPM)。

  • 瞬間的な頻度制限 (RPS / TPS):1秒あたりの最大リクエスト数 (RPS) と1秒あたりの最大トークン使用量 (TPS)。1秒以内に API 呼び出しやトークン消費が集中すると、レート制限がトリガーされる可能性があります。

  • 増加率制限 (トラフィックバースト):リクエスト量やトークン使用量が急増すると、レート制限がトリガーされます。しきい値はサービスの状況に応じて動的に調整されます。リクエスト量を徐々に増やすことで、この制限のトリガーを回避できます。

これらのレート制限メカニズムに基づき、以下のセクションではプラットフォーム設定、クライアント側のトラフィック制御、およびアーキテクチャレベルのフォールバックソリューションについて説明します。

エラー診断と推奨戦略

同じエラーコードが異なるレート制限ディメンションによってトリガーされることがあります。さらに、高い同時実行性下でのサーバー飽和も、レスポンスの遅延やタイムアウトにつながる可能性があります。この問題は、このトピックで後述する適応型輻輳制御戦略を使用して緩和できます。

エラーコード (DashScope / OpenAI)

トリガーディメンション

特徴診断

推奨戦略

Throttling.RateQuota / limit_requests

リクエストレート超過
(RPM 超過)

断続的なエラー。成功率は時間とともに低下します。

トークンバケット:単位時間あたりのリクエストクォータを制御します。

リクエストレート超過
(RPS 超過)

起動時や同時実行数の急増時にエラーが集中します。

同時実行セマフォまたは平滑化レートリミッター:リクエスト間の間隔を広げます。

Throttling.AllocationQuota / insufficient_quota

トークン使用量超過
(TPM 超過)

長文テキスト処理時に断続的なエラーが発生します。

デュアルトークンバケット:RPM と TPM の両方のクォータを同時に制限します。

トークン使用量超過
(TPS 超過)

長文テキストの並行処理中に瞬間的なトークン消費量が高すぎます。

同時実行セマフォまたは平滑化レートリミッター

Throttling.BurstRate / limit_burst_rate

トラフィック増加率超過
(トラフィックバースト)

起動後またはアイドル状態からの復帰後に、突然大量のリクエストが発生します。

スロースタートを実装するために、initial_tokens=0 のような低い初期値を持つトークンバケットを使用します。または、ピークシフトのために平滑化レートリミッターを使用します。

プラットフォーム設定ソリューション

以下のソリューションは、プラットフォーム側の設定やリソース調整を通じて、レート制限の問題を緩和または解消するのに役立ちます。

クォータ制限の引き上げ

デフォルトのクォータが不十分な場合、Model Studio コンソールでモデルの一時的なレート制限クォータを直接引き上げることができます。変更はすぐに有効になります。この機能は現在、中国 (北京) およびシンガポールリージョンでサポートされています。

シナリオ:ビジネスの成長によりデフォルトの RPM/TPM クォータが不十分な場合、または短期的なイベントのために一時的なスループットの増加が必要な場合。詳細については、「レート制限」をご参照ください。

クォータの引き上げは簡単です。クライアント側のトラフィック制御戦略を試す前に、このオプションを評価してください。

Provisioned Throughput Unit (PTU)

PTU サービスは、専用の予約済みコンピューティング能力を提供します。これは、リアルタイムで高いスループット要件を満たし、パブリックリソースプールでのコンピューティング能力の競合を回避するための推奨ソリューションです。

このソリューションは、サービスレベルアグリーメント (SLA) のコミットメントなど、ビジネスに確定的なスループット要件がある場合や、複雑なクライアント側のトラフィック制御開発なしで安定した高いスループットを達成したい場合に適しています。

PTU は予約済みリソースであり、完全に使用されていない場合でも継続的に課金されます。リソースの無駄を避けるために、実際のピークビジネス負荷に基づいて必要な仕様を評価してください。

非同期バッチ処理 (Batch API)

データクレンジングやバッチ分析など、厳密なリアルタイム要件がないタスクについては、Batch API を使用してバッチ処理のために送信できます。これらのタスクはオフピーク時に実行され、結果を非同期で提供し、リアルタイムのオンラインリクエスト頻度やトラフィック制限の対象にはなりません。

このソリューションは、データアノテーション、ログ分析、バッチ要約生成など、数時間から数日の結果返却時間を許容できるオフラインタスクに適しています。Batch API のコストは、通常、リアルタイム API 呼び出しよりも低くなります。

Batch API の結果返却時間は保証されていません。即時の応答を必要とするオンラインサービスには適していません。タスクを送信した後、ポーリングまたはコールバックを通じて結果を取得する必要があります。

クライアント側のトラフィック制御戦略

プラットフォーム設定ソリューションでニーズを満たせない場合は、クライアント側にトラフィック制御メカニズムを導入する必要があります。基本原則は、バーストトラフィックがレート制限をトリガーするのを避けるため、タイムウィンドウ内でリクエストを可能な限り均等に分散させることです。システムが起動したときや長時間のアイドル状態の後には、最大レベルを即座に使用するのではなく、同時実行レベルを徐々に上げる必要があります。

以下の 4 つの戦略は、エンジニアリングの複雑さが増す順にリストされています。各戦略は、前の戦略の機能を含み、それらを強化します。

  • 基本的なリトライは受動的な防御のみを提供します。

  • リクエストレート制限はアクティブなキューイングを追加します。

  • トラフィックシェーピングはさらにトークンレベルの制御とスムーズな送信を導入します。

  • 適応型輻輳制御はリアルタイムのフィードバックに基づいて送信レートを動的に調整します。

ビジネスニーズを満たす最も実装コストの低い戦略を選択してください。

各戦略のスループット性能比較

image

以下は、異なる負荷下での 4 つのクライアント側トラフィック制御戦略の有効スループット性能を比較したものです。

  • 基本的なリトライ戦略:低負荷下では効果的です。高い同時実行性下では輻輳崩壊を起こしやすく、スループットが急激に低下する原因となります。

  • リクエストレート制限戦略:輻輳崩壊に対する強力な保護を提供します。しかし、長文テキストを含む混合ワークロード下では、トークン制御がないため、スループットはのこぎり状の変動を示します。

  • トラフィックシェーピング戦略:高い安定性を持ちます。一部のピークスループットを犠牲にすることで、スムーズな出力を実現します。

  • 適応型輻輳制御戦略:高負荷下で安定した高いスループットポイントに動的に収束できますが、コールドスタート時の探索オーバーヘッドがあります。

基本的なリトライ戦略

この戦略は、個人テスト、ローカルスクリプト、低頻度のバックグラウンドタスクなど、高い同時実行性を伴わないシナリオに適しています。デフォルトでは送信レートを制限しません。429 または 5xx エラーを受け取った場合にのみ、ランダムジッター付きのエクスポネンシャルバックオフリトライをトリガーします。

この戦略には積極的なトラフィック制御がありません。マルチスレッドの同時実行下では、レート制限を容易にトリガーし、多くのリクエストが滞留して失敗する原因となります。

コード例

tenacity ライブラリの使用

import openai
from openai import OpenAI
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    retry_if_exception_type
)

RETRYABLE_ERRORS = (
    openai.RateLimitError,
    openai.InternalServerError,
    openai.APIConnectionError,
)

@retry(
    wait=wait_random_exponential(min=1, max=60),
    stop=stop_after_attempt(6),
    retry=retry_if_exception_type(RETRYABLE_ERRORS)
)
def chat_with_retry(client, model, messages, max_tokens):
    return client.chat.completions.create(
        model=model,
        max_tokens=max_tokens,
        messages=messages
    )

client = OpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key="YOUR_DASHSCOPE_API_KEY"
)

try:
    response = chat_with_retry(
        client=client,
        model="qwen-plus",
        messages=[{"role": "user", "content": "What is exponential backoff retry?"}],
        max_tokens=1024
    )
    print(response.choices[0].message.content)
except Exception as e:
    print(f"Request failed: {e}")

ネイティブ実装 (依存関係なし)

import time
import random
import openai
from openai import OpenAI

RETRYABLE_ERRORS = (
    openai.RateLimitError,
    openai.InternalServerError,
    openai.APIConnectionError,
)

def chat_with_retry(client, model, messages, max_tokens):
    attempt = 0
    max_retries = 5
    base_delay = 1
    max_delay = 60

    while attempt <= max_retries:
        try:
            return client.chat.completions.create(
                model=model,
                max_tokens=max_tokens,
                messages=messages
            )
        except RETRYABLE_ERRORS as e:
            attempt += 1
            if attempt > max_retries:
                raise e
            backoff = min(max_delay, base_delay * (2 ** (attempt - 1)))
            sleep_time = backoff + random.uniform(0, 1)
            print(f"Triggered {type(e).__name__}, retrying after {sleep_time:.2f}s...")
            time.sleep(sleep_time)

client = OpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key="YOUR_DASHSCOPE_API_KEY"
)

try:
    response = chat_with_retry(
        client=client,
        model="qwen-plus",
        messages=[{"role": "user", "content": "What is exponential backoff retry?"}],
        max_tokens=1024
    )
    print(response.choices[0].message.content)
except Exception as e:
    print(f"Request failed: {e}")

上記のコードでは、固定間隔のリトライではなく、エクスポネンシャルバックオフを使用しています。固定間隔のリトライ (例えば、失敗したすべてのリクエストを 3 秒後にリトライするなど) は、すべてのリクエストが同時に再送信される原因となります。これにより、再びレート制限がトリガーされ、持続的な輻輳につながる可能性があります。ランダムジッター付きのエクスポネンシャルバックオフは、リトライを「分散」させます。

  • 待機時間が段階的に倍増:例えば、1s, 2s, 4s... のようにします。これにより、短時間での繰り返しリクエストを回避します。

  • ランダムジッターの追加2s +/- 0.5s のようにランダムな値を導入することで、リトライのトラフィックを分散させます。これにより、多くのリクエストが同時にリトライして二次的なフラッド (サンダーリングハード効果) を引き起こすのを防ぎます。

これにより、システムは「失敗→一斉リトライ→再失敗」という悪循環に陥ることなく、分散的に回復できます。

リクエストレート制限戦略

実際のビジネストラフィックに対しては、受動的なリトライだけに頼るのでは不十分です。頻繁なリトライはレスポンスレイテンシーを大幅に増加させます。リクエストレート制限戦略は、アクティブなトラフィック制御を導入します。リクエストを送信する前に自己チェックと調整を行い、大量の無秩序なリクエストの流入を、プラットフォームの RPM 制限に準拠したスムーズなキューに整理します。レート制限がトリガーされた後、回復には通常時間がかかります。リクエストのリズムを積極的に平滑化することで、小さく制御可能なキューイング遅延が発生します。しかし、これは受動的な「エラー→待機→リトライ」ループで費やされる時間よりもはるかにコストが低いです。要するに、予測不能な大きな遅延を避けるために、小さく予測可能なコストを負担するということです。

この戦略は、チャットボットやその他の軽量なリクエスト・レスポンス型のインタラクションなど、最初のトークンまでの時間 (TTFT) に敏感なオンラインサービスに適しています。

この戦略は、クライアント側で 2 段階の制御を持つアクティブなキューイングを実装します。

  • RPM トークンバケット:1分あたりの総リクエスト数を制限します。バケットの容量は RPM クォータであり、トークンは一定のレートで補充されます。この方法は借用をサポートします。トークンが不十分な場合、リクエストは将来のクォータから借用できますが、厳密に先入れ先出し (FIFO) の順序に従う必要があります。

  • 同時実行セマフォ:同時リクエスト数を制限します。非同期セマフォが処理中のリクエストを制御し、瞬間的な高い同時実行性が RPS 制限をトリガーするのを防ぎ、クライアントの過負荷を回避します。

これら 2 段階の制御は、まず RPM トークンを取得し、次に同時実行セマフォを取得するという厳密な順序で実行する必要があります。同時実行スロットは希少なリソースであり、実行条件を満たしたリクエストにのみ割り当てるべきです。順序が逆 (スロットを先に占有し、次にトークンを待つ) の場合、高負荷下でhead-of-line ブロッキングを容易に引き起こす可能性があります。リクエストがスロットを占有しても利用可能なトークンがなく、実行されずに長時間スロットを保持します。すべてのスロットが占有されますが、実際にはリクエストは送信されません。基本原則は、希少なリソースを保持している間は、潜在的に長時間の待機を実行しないことです。

以下のコードでは、トークンバケットをフルの状態 (initial_tokens=rpm_limit) で初期化しています。これは、軽量なオンラインサービスが起動時に即座にリクエストを処理するのに適しています。フルバケットで開始するとレート制限エラーがトリガーされる場合は、初期トークン数を下げることができます。例えば、initial_tokens=0 に設定すると、「空バケットスタート」となり、システムがより緩やかなペースで動作状態に入ることができます。

この戦略はトークン使用量を追跡しません。長文テキストのタスクでは、TPM クォータを使い果たすことでレート制限がトリガーされる可能性があります。

コード例

コアコンポーネント:トークンバケット

import time

class TokenBucket:
    """
    1分あたりのリクエスト数 (RPM) を制御するためのトークンバケット実装。
    高い同時実行性下で先入れ先出し (FIFO) 順序を保証するための負債メカニズムをサポート。
    """
    def __init__(self, quota_per_minute: float, initial_tokens: float = 0.0):
        self.capacity = quota_per_minute
        self.tokens = initial_tokens
        self.refill_rate = quota_per_minute / 60.0
        self.last_refill = time.monotonic()

    def reserve(self, cost: float = 1.0) -> float:
        """
        トークンを取得する。
        トークンが不十分な場合、待機する秒数を返す (負債をサポート)。
        """
        self._refill()

        # 1. トークンが十分な場合:直接差し引く
        if self.tokens >= cost:
            self.tokens -= cost
            return 0.0

        # 2. トークンが不十分な場合:待機時間を計算し、負債を発生させる
        # FIFO 順序を保証するために、現在のリクエストのために将来のトークンを「予約」する
        deficit = cost - self.tokens
        wait_seconds = deficit / self.refill_rate
        self.tokens -= cost
        return wait_seconds

    def _refill(self):
        """経過時間に基づいてトークンを補充する。"""
        now = time.monotonic()
        elapsed = now - self.last_refill
        if elapsed > 0:
            self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
            self.last_refill = now

クライアントロジック

import asyncio
import openai
from openai import AsyncOpenAI
from tenacity import retry, wait_random_exponential, stop_after_attempt, retry_if_exception_type

class RateLimitedClient:
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
        rpm_limit: float = 600.0,
        max_concurrency: int = 20
    ):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        # コンポーネント 1:RPM トークンバケット (総量を制御)
        self.rpm_bucket = TokenBucket(
            quota_per_minute=rpm_limit,
            initial_tokens=rpm_limit  # フルバケットで開始、軽量なオンラインサービスに適している
        )
        # コンポーネント 2:同時実行セマフォ (瞬間的な同時実行数を制御)
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def _execute_request(self, model, messages, max_tokens):
        """単一のリクエストを実行し、RPM チェックと同時実行数制限を順に通過させる。"""
        # 1. RPM チェック (最初にトークンを取得)
        wait_seconds = self.rpm_bucket.reserve(1.0)
        if wait_seconds > 0:
            await asyncio.sleep(wait_seconds)
        # 2. 同時実行数チェック (次にセマフォを取得)
        async with self.semaphore:
            # 3. API 呼び出しを行う
            return await self.client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens
            )

    @retry(
        wait=wait_random_exponential(min=1, max=60),
        stop=stop_after_attempt(5),
        retry=retry_if_exception_type((
            openai.RateLimitError,
            openai.InternalServerError,
            openai.APIConnectionError
        ))
    )
    async def chat_with_limit(self, model, messages, max_tokens=1024):
        # 設計上の考慮事項:なぜリトライもトークンを再取得する必要があるのか?
        # 回答:安全のため。再取得しないと、リトライによるトラフィックのパルスが
        # 即座に RPM 制限を超える可能性がある。
        return await self._execute_request(model, messages, max_tokens)

トラフィックシェーピング戦略

リアルタイム RAG インジェスチョンや長文ドキュメントの一括分析など、高く安定したスループットを必要とするバッチ処理シナリオでは、リクエストレート制限戦略には重大なTPM の死角があります。これに対処するため、トラフィックシェーピング戦略はデュアルリソース認識 (RPM & TPM) を提供します。また、送信側にシェーピングメカニズムを導入し、バースト的なトラフィックをピークシフトしてスムーズなフローに変換します。

この戦略は、元のリクエストレート制限戦略を以下の機能で強化します。

  • デュアルリソース制御 (RPM & TPM):RPM と TPM の両方のトークンバケットを維持します。すべてのリクエストは、送信される前に両方のディメンションのクォータチェックを通過する必要があります。

  • 入力時の事前控除、出力時の事後精算:モデルの出力長はリクエスト前には不明です。TPM トークンバケットは、送信時に入力トークンのみを事前控除します。リクエスト完了後、実際の出力トークンが精算されます。精算時にクォータが不足していても (マイナストークン)、後続のリクエストはトークン数がプラスになるのを待つため、自然にフローレートが平滑化されます。

  • 継続的なウォームアップ:コールドスタート時、トークン発行レートは時間とともに線形に増加し、初期バーストのリスクを排除します。

  • 平滑化レートリミッター (ペーシング):リクエスト間の最小間隔を強制することで送信レートを平滑化し (ペーシング)、レート制限がトリガーされるリスクを低減します。

代替ソリューションの参考:ビジネスが起動時のわずかなキューイング遅延に敏感でない場合は、標準のトークンバケットロジックを再利用 (initial_tokens=0 に設定) することで、クライアントの複雑さを軽減しつつ安全なスタートを実現できます。また、このトピックの Python トークンバケット実装は設計概念を実証するためのものです。本番環境では、Java の Guava の SmoothRateLimiter など、言語のエコシステムから成熟したレート制限コンポーネントを使用してください。

コード例では、平滑化待機は head-of-line ブロッキングによるリクエストバーストを避けるために、同時実行ロックの内側に配置されています。複数のリクエストが待機終了後に同時に同時実行セマフォを競合し、平滑化されたトラフィックが出口で再び輻輳する可能性があります。これにより同時実行効率はわずかに低下しますが、送信間隔の正確な制御が保証されます。

完全なトラフィックシェーピングパイプラインは次のとおりです:入力トークンの推定 → デュアルアドミッション (RPM & TPM) → 同時実行ロック → トラフィックシェーピング → 送信 → 出力トークンの精算

image

この戦略は、保守的な平滑化メカニズムのために、理論上の最大同時実行数の一部を犠牲にします。極めて低いレイテンシーを必要とするオンラインサービスには適していません。

コード例

高度なトークンバケット

import time

class TokenBucket:
    """継続的なウォームアップメカニズムをサポートする高度なトークンバケット。"""
    def __init__(self, quota_per_minute: float, warmup_seconds: float = 0.0):
        self.capacity = quota_per_minute
        self.tokens = 0.0
        self.target_refill_rate = quota_per_minute / 60.0
        self.warmup_seconds = warmup_seconds
        self.start_time = time.monotonic()
        self.last_update_time = self.start_time
        self.cumulative_generated = 0.0

    def _get_cumulative_tokens(self, t: float) -> float:
        if t <= 0:
            return 0.0
        R = self.target_refill_rate
        T = self.warmup_seconds
        if T <= 0:
            return R * t
        if t <= T:
            return (R / (2 * T)) * (t ** 2)
        else:
            warmup_total = (R * T) / 2.0
            return warmup_total + R * (t - T)

    def _get_time_for_cumulative_tokens(self, target_cumulative: float) -> float:
        if target_cumulative <= 0:
            return 0.0
        R = self.target_refill_rate
        T = self.warmup_seconds
        if T <= 0:
            return target_cumulative / R
        warmup_total = (R * T) / 2.0
        if target_cumulative <= warmup_total:
            return ((2 * T * target_cumulative) / R) ** 0.5
        else:
            return (target_cumulative - warmup_total) / R + T

    def reserve(self, cost: float = 1.0) -> float:
        now = time.monotonic()
        relative_now = now - self.start_time
        current_cumulative = self._get_cumulative_tokens(relative_now)
        new_tokens = current_cumulative - self.cumulative_generated
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.cumulative_generated = current_cumulative
        self.last_update_time = now
        if self.tokens >= cost:
            self.tokens -= cost
            return 0.0
        deficit = cost - self.tokens
        self.tokens -= cost
        target_cumulative = self.cumulative_generated + deficit
        target_time = self._get_time_for_cumulative_tokens(target_cumulative)
        wait_seconds = target_time - relative_now
        return max(0.0, wait_seconds)

    def adjust(self, amount: float):
        self.tokens = min(self.capacity, self.tokens + amount)

平滑化レートリミッター

import time

class SmoothRateLimiter:
    def __init__(self, rate_per_minute: float):
        self._min_interval = 60.0 / rate_per_minute
        self._last_operation = time.monotonic()

    def reserve(self) -> float:
        now = time.monotonic()
        elapsed = now - self._last_operation
        wait_time = max(0.0, self._min_interval - elapsed)
        self._last_operation = now + wait_time
        return wait_time

クライアントロジック

import asyncio

class TrafficShapingClient:
    def __init__(self):
        self._rpm_bucket = TokenBucket(quota_per_minute=600)
        self._tpm_bucket = TokenBucket(quota_per_minute=1_000_000)
        self._smooth_limiter = SmoothRateLimiter(rate_per_minute=600)
        self._concurrency_semaphore = asyncio.Semaphore(20)

    async def _execute_throttled_request(self, model, prompt, max_tokens, input_tokens):
        # [ステップ 1] デュアルアドミッション制御
        # RPM と TPM の両方をチェックし、より長い待機時間を採用
        wait_rpm = self._rpm_bucket.reserve(1.0)
        # TPM チェックは入力トークンのクォータのみを要求
        wait_tpm = self._tpm_bucket.reserve(input_tokens)
        admission_wait = max(wait_rpm, wait_tpm)
        if admission_wait > 0:
            await asyncio.sleep(admission_wait)

        # [ステップ 2] 同時実行ロックの取得
        async with self._concurrency_semaphore:
            # [ステップ 3] トラフィックシェーピング
            # 重要:ロック内で平滑化待機を実行
            # 送信間隔の正確な制御のために、一部の同時実行効率を犠牲にする
            smooth_wait = self._smooth_limiter.reserve()
            if smooth_wait > 0:
                await asyncio.sleep(smooth_wait)

            # [ステップ 4] リクエストの送信
            content, actual_usage = await self._send_chat_request(model, prompt, max_tokens)

            # [ステップ 5] 出力トークンの精算
            output_tokens = actual_usage.completion_tokens
            if output_tokens > 0:
                self._tpm_bucket.adjust(-output_tokens)
            return content

適応型輻輳制御戦略

この戦略は、API ゲートウェイ、複雑なプロキシ、マルチテナントシステムなど、大規模で動的な混合負荷シナリオに適しています。

説明

選択のヒント:この戦略は万能のソリューションではありません

適応型輻輳制御戦略の核心的な価値は、非常に不確実変動の激しいビジネス環境に対応することです。これは万能の選択肢ではありません。

  • パフォーマンスのパラドックス:定量的なバッチ処理など、ビジネス負荷が予測可能で比較的安定している場合、経験に基づいて最適な静的パラメータを直接設定する方が、通常、「試行と収束」を必要とする動的な探索よりも優れたパフォーマンスを発揮します。

  • 探索のオーバーヘッド:境界を見つけるために、動的アルゴリズムには必然的にコールドスタートのランプアップと探索的な変動が伴います。既知のシナリオでは、この「探索コスト」は不要なパフォーマンス損失です。

  • メンテナンスコスト:クローズドループのフィードバックメカニズムを導入すると、システムの複雑さとトラブルシューティングの難易度が大幅に増加します。

ビジネスが非常に大規模で、複雑な負荷を持ち、著しい変動性がない限り、よりシンプルな最初の 3 つの戦略のいずれかを選択してください。

リクエストレート制限とトラフィックシェーピング戦略は、静的クォータに基づく古典的な防御戦略です。これらは、安定して予測可能な負荷を持つシナリオで完全に適用可能です。しかし、複雑なゲートウェイレベルのシナリオでは、ビジネスは両側から動的な変化に直面します。下流の負荷は複雑で変動し、高い同時実行性の短いリクエストと長時間実行される深い推論タスクが混在しています。プラットフォームのレート制限しきい値は、秒単位のレート制限と増加率検出のしきい値がサービス状況に基づいて調整されるため、動的に変動します。静的な戦略では、効率と安定性のバランスを取るのが困難です。

このポリシーは、BBR (Bottleneck Bandwidth and RTT) に着想を得ており、EBP (Elastic Bandwidth Probing) に基づくクローズドループ制御システムを確立します。RPM/TPM クォータを指針となる上限として使用し、このシステムはレイテンシーの変化やレート制限がアクティブかどうかなどのリアルタイムフィードバックに基づいて、スループットを最大化するための最適な送信レートを動的に計算します。

  • Elastic Bandwidth Probing (EBP):このメソッドは、過去の最高の成功ウォーターマークを保存します。現在の同時実行レベルと最高のウォーターマークとの距離に基づいて、バネの張力をシミュレートして探索ゲインを計算します。距離が遠いほど加速は速く、距離が近いほど減速は遅くなります。飽和度の高い区間でも継続的な境界探索を保証するために、小さな線形推力が追加されます。

  • TPT 輻輳認識:大規模モデルの生成時間はその長さに比例します。長文テキストの高いレイテンシーは、必ずしも輻輳を示しているわけではありません。TPT (Time Per Token)、つまりトークンあたりの処理時間をメトリックとして使用し、コンテンツ長からのノイズを除去します。TPT が著しく悪化した場合にのみ、計算飽和による輻輳と判断されます。

  • アンチバーストレートガバナー:EBP によって計算された目標同時実行レベルに関係なく、レートガバナーは同時実行数の増加の加速度を強制的に制限します。これにより、トラフィックがスムーズに増加し、増加率制限をトリガーする可能性のあるステップ状の変化を回避します。

image

ネイティブ BBR と比較して、この戦略には大規模モデル向けに以下の主要な変更が含まれています。

  • ガイド付き探索:既知の RPM/TPM クォータを「指針となる上限」として導入し、盲目的な探索による頻繁な衝突を回避します。

  • 信号ソースの変更 (RTT → TPT):ネイティブ BBR は RTT に依存します。しかし、大規模モデルのシナリオでは、コンテンツ長によるレイテンシーの差はネットワークジッターよりもはるかに大きいです。代わりに TPT を使用して、コンテンツ長からの干渉を排除します。

  • 応答メカニズムの強化 (ProbeRTT → Hold):レイテンシーの変動に直面した場合、積極的にバックオフしてスループットを低下させるのではなく、現在の同時実行レベルを維持することを選択します。

  • ハードレート制限への応答 (Packet Loss → 429 Drain)429 エラーがトリガーされると、即座に積極的な Drain 状態に入り、クールダウン期間後に高速回復を実行します。

この戦略には以下の制限があります。

  • 輻輳信号ノイズ (TPT Noise):現在の TPT は「総レイテンシー / 総トークン数」として大まかに推定されます。総レイテンシーには、ネットワークの往復レイテンシ、キューイング時間、最初のトークンまでの時間が含まれます。ネットワークジッターや長い入力によって水増しされやすく、誤って Hold 状態をトリガーする可能性があります。

  • 大規模リクエストのスターベーション (Starvation Risk):最大のスケジューリング性能を達成するために、この戦略は非厳密な FIFO ウェイクアップメカニズムを使用します。クォータが不足している場合、短いトークンのリクエストが「割り込み」してリソースを先取りし、長いトークンのリクエストが長期間待たされる原因となる可能性があります。

  • コールドスタート問題:この戦略は、統計モデルを構築するためにウォームアップ期間を必要とします。低負荷または短命のタスクでは、ゼロから探索するため、スループットが最初の 3 つの戦略よりも低くなる可能性があります。

コード例

制御エントリポイント

class ElasticCongestionController:
    async def acquire(self):
        """[アドミッションフェーズ] リクエスト開始前のチェック"""
        # 1. SSR スロースタート再起動:アイドル時間が長すぎる場合、制限を積極的に減衰させる
        #    古いウォーターマークによるバーストトラフィックを防ぐため。
        if self.is_idle_too_long():
            self.perform_slow_start_restart()

        # 2. サーキットブレーカーチェック:DRAIN (クールダウン) 状態の場合、強制的に待機。
        if self.state == CongestionState.DRAIN:
            await self.wait_for_cooldown()

        # 3. デュアルバジェットチェック:同時実行スロットとトークンバジェットの両方をチェック。
        await self.wait_for_budget(request_tokens)

    async def release(self, latency, actual_tokens, error):
        """[フィードバックフェーズ] リクエスト終了後の決定"""
        if error:
            # [障害応答] レート制限エラー (429/503) 時:即座にドレイン + 乗法バックオフ
            self.state = CongestionState.DRAIN
            self.concurrency_limit *= self.backoff_factor  # 例:0.7
            return

        # [通常応答] TPT (Time-Per-Token) を計算
        current_tpt = latency / actual_tokens

        # [輻輳認識] TPT の急上昇 (生成が遅くなる):HOLD に入り観察
        # 同時実行レベルを維持し、バックオフも増加もさせない
        if current_tpt > self.metrics.ema_tpt * 2.0:
            self.state = CongestionState.HOLD
        else:
            # [定常状態の探索] ネットワークは健全:EBP 弾性探索を実行
            self.state = CongestionState.PROBING
            self.update_limit_via_ebp()

EBP 探索

def probe_next_limit(self, current_limit, max_known_capacity):
    """
    次の同時実行数制限を計算する
    コア数式:次 = Max(バネ張力, 加法的推力) + ガバナー平滑化
    """
    # 1. 物理的制限の計算 (リトルの法則)
    # 理論的制限 = スループット * レイテンシー * バッファ係数
    dynamic_ceiling = self.metrics.tps * self.metrics.avg_latency * 1.2

    # 2. バネロジック (バネ張力)
    # 過去の最高ウォーターマークから遠いほど張力は大きく (加速)、近いほど小さくなる (減速)
    tension = 1.0 - (current_limit / max_known_capacity)
    spring_target = current_limit * (1.0 + tension * gain)

    # 3. 加法的推力
    # 「ゼノンのパラドックス」を解決:張力が 0 に近づくと、強制的に小さな線形増分を追加
    # システムが局所最適解から脱出し、境界の探索を続けられるようにする。
    linear_target = current_limit + self.min_additive_step

    raw_target = max(spring_target, linear_target)

    # 4. アンチバーストレートガバナー
    # 同時実行数の増加の加速度を制限し、ステップ状の変化を防ぐ。
    final_limit = self.governor.smooth(raw_target)

    return min(final_limit, dynamic_ceiling)

メトリクス追跡

class CongestionMetrics:
    def update_stats(self, latency, token_count):
        """
        [センサー] 統計メトリクスをリアルタイムで更新
        EMA (指数移動平均) を使用して、ロングテールリクエストからのノイズを除去
        """
        alpha = 0.2  # 平滑化係数

        # 1. 単一リクエストサイズの推定 (トークンサイズ)
        self.ema_tokens = (1 - alpha) * self.ema_tokens + alpha * token_count

        # 2. TPT (Time Per Token) の推定
        # 異なる LLM 生成長による誤差を排除するために、レイテンシーの代わりに TPT を使用
        instant_tpt = latency / token_count
        self.ema_tpt = (1 - alpha) * self.ema_tpt + alpha * instant_tpt

    def track_inflight(self, estimated_tokens):
        """
        [死角の補完] 「応答後にのみカウントする」という遅延を修正
        リクエストが開始された瞬間にクォータを事前控除
        """
        self.inflight_tokens += estimated_tokens

アーキテクチャレベルのフォールバックソリューション

プラットフォーム設定とクライアント側のトラフィック制御でも、可用性やピークスループットに関するビジネス要件を満たせない場合、システムアーキテクチャーレベルでフォールバックメカニズムを導入できます。

モデルフォールバック

プライマリモデルがレート制限やサービス例外により応答できない場合、より寛容なクォータを持つ代替モデルに自動的にフォールバックし、メインプロセスが応答を継続できるようにします。

フォールバックパスの設計原則

  • 異なるシリーズのモデルを選択:Model Studio では、レート制限はモデルごとに個別に計算されます。あるモデルがレート制限された場合、別のモデルをフォールバックとして選択できます。例えば、qwen3.6-plus から qwen3.6-flash にフォールバックできます。

  • レート制限エラー時のみフォールバックをトリガー:フォールバックは、すべての例外ではなく、429 レート制限エラーに対してトリガーされるべきです。モデルを切り替えても、ネットワークタイムアウトやパラメータエラーなどの問題は解決しません。

  • フォールバックモデルを事前に検証:フォールバック後に機能的な例外を避けるため、フォールバックモデルが Function Calling や構造化出力など、ビジネスで必要な機能をサポートしていることを確認してください。

コード例

以下の例は、429 エラーコードに基づくモデルフォールバックロジックを示しています。プライマリモデルへのリクエストがレート制限をトリガーすると、自動的にフォールバックモデルに切り替えてリトライします。

import os
import asyncio
from openai import AsyncOpenAI, APIStatusError

# プライマリモデルとフォールバックモデル (異なるシリーズ、独立したクォータ)
PRIMARY_MODEL = "qwen3.6-plus"
FALLBACK_MODEL = "qwen3.6-flash"

client = AsyncOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

async def chat_with_fallback(messages: list) -> str:
    """フォールバック付きリクエスト:プライマリモデルがレート制限された場合、自動的にフォールバックモデルに切り替える。"""
    for model in [PRIMARY_MODEL, FALLBACK_MODEL]:
        try:
            response = await client.chat.completions.create(
                model=model,
                messages=messages
            )
            return response.choices[0].message.content
        except APIStatusError as e:
            if e.status_code == 429 and model == PRIMARY_MODEL:
                print(f"[Rate Limit Triggered] {model}, falling back to {FALLBACK_MODEL}")
                continue
            raise
    raise RuntimeError("All models are unavailable")

async def main():
    result = await chat_with_fallback(
        messages=[{"role": "user", "content": "Hello"}]
    )
    print(result)

if __name__ == "__main__":
    asyncio.run(main())

モデルフォールバックは、クライアント側のトラフィック制御戦略と組み合わせることができます。例えば、リクエストレート制限戦略のリトライメカニズムにフォールバックロジックを統合することができます。リトライが尽きてもレート制限がトリガーされる場合は、フォールバックモデルに切り替えます。

メッセージキュー (MQ) を利用したピークシフト

即時応答を必要としないバックエンドサービスの場合、RabbitMQ や Kafka などのメッセージミドルウェアを導入してピークシフトを行うことができます。バーストトラフィックはまず MQ に書き込まれ、コンシューマー側がレート制限クォータに従って安定したレートでプルして処理します。このアーキテクチャーは、フロントエンドのピークとバックエンドの呼び出しを分離し、根本的にレート制限エラーを防ぐことができます。

シナリオ:チケット処理、コンテンツモデレーション、バッチデータアノテーションなど、ユーザーがタスクを送信した後に非同期通知メッセージを受け入れることができるビジネス。MQ はバッファレイヤーとして機能し、フロントエンドからのトラフィックスパイクを吸収し、コンシューマー側は安定したレートで Model Studio API にリクエストを送信します。

主要なアーキテクチャー設計のポイント:

  • コンシューマーのレート制御:コンシューマー側は、メッセージを無制限にプルするのではなく、RPM/TPM クォータに基づいて安定したレートでメッセージを消費するために、リクエストレート制限またはトラフィックシェーピング戦略を使用すべきです。

  • デッドレター処理:複数回のリトライ後に失敗したメッセージは、デッドレターキューに移動し、アラートをトリガーします。これにより、無限リトライが消費をブロックするのを防ぎます。

  • バックプレッシャーの伝播:MQ のバックログがしきい値を超えた場合、例えばキューイングステータスを返すなどして、上流に圧力を伝播させます。これにより、キューが無限に増大するのを防ぎます。

本番環境での考慮事項

上記のコード例は、Python の asyncio シングルスレッドループに基づいており、コアアルゴリズムを実証することを目的としています。大規模な本番環境に適用する前に、以下の問題を考慮してください。

  • 非テキストモデルへの適応

    上記の戦略はテキストモデルを例としていますが、コアとなる制御原則は、画像生成や音声合成などのマルチモーダルモデルサービスにも適用されます。測定単位が異なるだけで、本質は同じです。つまり、送信レートと処理能力を制限することです。

    • 音声認識などのモデルは、通常、単位時間あたりのリクエスト数 (RPM など) と使用量 (音声の持続時間など) の両方によって制約されます。戦略は基本的にテキストモデルと同じです。

    • 画像や動画のモデルは、通常、タスク送信レートと同時タスク数によって制約されます。リクエストレート制限戦略と同じアプローチを使用できます。つまり、タスク送信レートを制限し、セマフォを使用して同時実行数を制御します。

    レート制限メトリクスがどのように変化しても、クライアント側のスロットリングの原則は同じです。カウンター (RPM トークンバケットなど) や探索メトリック (TPT など) を対応するモダリティのメトリックに置き換えるだけです。モデルの具体的なレート制限ルールとメトリック定義については、「レート制限」をご参照ください。

  • 並行モデルにおけるアトミック性

    実装例:asyncio はシングルスレッドの協調スケジューリングを使用するため、コード例の状態変更操作は本質的にアトミックであり、単一プロセス内で追加の同時実行保護は必要ありません。

    本番環境での推奨事項:マルチスレッドまたはマルチプロセス環境で実装する場合、状態更新の正しさを保証するために、トークンバケットと統計ウィンドウの同時実行安全性を確保してください。そうしないと、競合状態がトラフィック制御の失敗を引き起こします。

  • 分散レート制限

    実装例:コード例のトラフィック制御コンポーネントはすべてインメモリ実装です。

    本番環境での推奨事項:マルチインスタンスの分散デプロイメントでは、各インスタンスが独立してローカルのトラフィック制御を行います。実際の総使用量が制限を超え、グローバルなレート制限がトリガーされる可能性があります。Redis などの集中カウンターを使用して、すべてのノードの使用量を一元的に管理してください。

  • 優先度付きキューとスターベーション防止

    実装例:どのコード例も優先度による差別化を実装していません。特に適応型輻輳制御戦略は、最大のスケジューリング性能を達成するために非厳密な FIFO ウェイクアップメカニズムを使用しています。

    本番環境での推奨事項:ビジネスに高優先度と低優先度のリクエストがある場合、高優先度リクエストの帯域幅を保証するために重み付き優先度付きキューを実装してください。また、継続的な高負荷時に低優先度キューが完全にスケジューリング不能になるのを防ぐために、低優先度キューに最小限のクォータを予約するスターベーション防止メカニズムを導入してください。