朝 8 時台に、ユーザー向けの生成機能だけ 429 が集中して出る。そんな症状に気づいたのは、アプリ内説明文の多言語一括再生成ジョブを夜間に流し始めてから 3 日目のことでした。ジョブ自体は深夜 2 時開始の設計だったのですが、対象件数が増えて処理が朝まで食い込み、利用者が増え始める時間帯とバルク処理の尻尾が重なっていたのです。個人開発では 1 つの Google Cloud プロジェクトに複数の機能を同居させることが多く、この「機能同士がクォータを奪い合う」構図は遅かれ早かれ踏む問題だと思います。
Gemini API のレート制限(RPM・TPM)はモデルごと・プロジェクトごとに適用されます。つまり同じプロジェクトから呼んでいる限り、ユーザーが画面の前で待っている対話機能も、遅れても誰も困らない夜間バッチも、同じ 1 つの枠を食べます。今回はこの契約を前提に、対話機能を保護する側に倒したアドミッション制御の設計と、導入前後の実測を記録します。
症状の切り分け — その 429 は「誰が」使い切った枠なのか
429 対策というとリトライ戦略の話になりがちですが、リトライは「枠が回復すれば通る」ことを前提にした対処です。枠そのものを別の機能が継続的に消費している場合、リトライは行列を長くするだけで、むしろリトライ増幅で消費を加速させます。リトライ可能かどうかの分類は429 の原因別リトライ設計の記事 に書いたので、今回はその手前、「そもそも枠を誰が食べているか」から始めます。
私自身が最初にやったのは、すべての Gemini 呼び出しに機能タグを通すことでした。呼び出し箇所が散らばっていると集計のしようがないので、まず薄いラッパーに寄せます。
import time
import threading
from collections import defaultdict, deque
from google import genai
client = genai.Client()
class TaggedGeminiClient :
"""全呼び出しに feature タグを強制し、分単位の使用量を記録する薄いラッパー"""
def __init__ (self, client: genai.Client):
self ._client = client
self ._lock = threading.Lock()
# feature -> deque[(epoch_minute, requests, tokens)]
self ._usage = defaultdict( lambda : deque( maxlen = 180 ))
def generate (self, * , feature: str , model: str , contents, config = None ):
resp = self ._client.models.generate_content(
model = model, contents = contents, config = config
)
used = resp.usage_metadata
total = (used.prompt_token_count or 0 ) + (used.candidates_token_count or 0 )
minute = int (time.time() // 60 )
with self ._lock:
bucket = self ._usage[feature]
if bucket and bucket[ - 1 ][ 0 ] == minute:
m, r, t = bucket[ - 1 ]
bucket[ - 1 ] = (m, r + 1 , t + total)
else :
bucket.append((minute, 1 , total))
return resp
def snapshot (self, last_minutes: int = 60 ):
"""直近 N 分の feature 別 RPM / TPM を返す"""
cutoff = int (time.time() // 60 ) - last_minutes
out = {}
with self ._lock:
for feature, buckets in self ._usage.items():
rows = [b for b in buckets if b[ 0 ] >= cutoff]
if rows:
out[feature] = {
"avg_rpm" : sum (r for _, r, _ in rows) / len (rows),
"peak_rpm" : max (r for _, r, _ in rows),
"peak_tpm" : max (t for _, _, t in rows),
}
return out
usage_metadata を集計に使うので、追加コストなしで実トークン量まで取れます。これを 1 週間流した結果が次の表です。朝 7〜9 時のピーク帯で、バルク再生成が RPM の 82% を占めていました。対話機能の 429 は「対話機能が増えたから」ではなく、「バルクの尻尾が朝に届いていたから」だったわけです。
機能 呼び出しパターン ピーク帯 RPM 占有 許容できる遅延 対話生成(ユーザー向け) 朝夕にスパイク 14% 数秒(体感に直結) 多言語説明文の一括再生成 夜間開始・完了まで数時間 82% 数時間(誰も待っていない) 通知文の下書き生成 散発 4% 数分
この表の「許容できる遅延」の列が設計のすべてです。遅延許容度がまったく違う仕事が同じ枠を同じ優先度で食べていることが問題であって、枠の総量が足りないわけではありませんでした。
Before/After — 素通しの呼び出しをアドミッション層に通す
対策前の構成は、各機能がそれぞれ SDK を直接呼ぶ素直な形でした。
# Before: 各機能が好きなタイミングで直接叩く
# 対話ハンドラ
resp = client.models.generate_content( model = "gemini-flash-latest" , contents = user_prompt)
# バルクワーカー(ループで数千件)
for item in items:
resp = client.models.generate_content( model = "gemini-flash-latest" , contents = build_prompt(item))
この形の何が問題かというと、API 側のレートリミッタに到達して初めて「混んでいる」ことが分かる点です。429 を受け取った時点では、もう対話リクエストとバルクリクエストの区別はつきません。先に自分のプロセス側で交通整理をする、つまり呼び出しの手前に admission control(入場制御)を置くのが After の形です。
# After: 全呼び出しが優先度つきの入場ゲートを通る
gate = PriorityAdmissionGate( rpm_limit = 1000 , tpm_limit = 1_000_000 ,
reserved_interactive_ratio = 0.3 )
# 対話ハンドラ
async with gate.acquire( feature = "chat" , priority = "interactive" , est_tokens = 1200 ):
resp = await async_generate(user_prompt)
# バルクワーカー
async with gate.acquire( feature = "bulk_regen" , priority = "bulk" , est_tokens = 2800 ):
resp = await async_generate(build_prompt(item))
ゲートの契約は 2 つだけです。対話用に枠の一定割合(ここでは 30%)を常に予約しておくこと。そしてバルクは残りしか使えないが、対話が暇なときはその予約分を借りてよいこと。逆方向の貸し借り(バルクが対話の予約を食う)は許しません。
優先度つきトークンバケットの実装
中身は古典的なトークンバケットを 2 レーンに分けたものです。RPM と TPM の両方を見る点だけが LLM 特有です。
import asyncio
import time
from contextlib import asynccontextmanager
class PriorityAdmissionGate :
def __init__ (self, rpm_limit: int , tpm_limit: int ,
reserved_interactive_ratio: float = 0.3 ,
safety_margin: float = 0.85 ):
# API 上限そのままではなく手前で止める(他プロセス分の余白)
self ._rpm = rpm_limit * safety_margin
self ._tpm = tpm_limit * safety_margin
self ._reserved = reserved_interactive_ratio
self ._req_tokens = self ._rpm # リクエスト数バケット
self ._tok_tokens = self ._tpm # トークン数バケット
self ._last = time.monotonic()
self ._cond = asyncio.Condition()
def _refill (self):
now = time.monotonic()
elapsed = now - self ._last
self ._last = now
self ._req_tokens = min ( self ._rpm, self ._req_tokens + self ._rpm * elapsed / 60 )
self ._tok_tokens = min ( self ._tpm, self ._tok_tokens + self ._tpm * elapsed / 60 )
def _floor (self, priority: str ) -> tuple[ float , float ]:
"""バルクは予約分を残して止まる。対話は 0 まで使える"""
if priority == "interactive" :
return ( 0.0 , 0.0 )
return ( self ._rpm * self ._reserved, self ._tpm * self ._reserved)
@asynccontextmanager
async def acquire (self, * , feature: str , priority: str , est_tokens: int ):
async with self ._cond:
req_floor, tok_floor = self ._floor(priority)
while True :
self ._refill()
if ( self ._req_tokens - 1 >= req_floor
and self ._tok_tokens - est_tokens >= tok_floor):
self ._req_tokens -= 1
self ._tok_tokens -= est_tokens
break
# 足りなければ回復を待つ(バルクはここで自然に減速する)
await asyncio.wait_for( self ._cond.wait(), timeout = 1.0 )
try :
yield
finally :
async with self ._cond:
self ._cond.notify_all()
ポイントは _floor() です。バルクレーンはバケット残量が予約ライン(30%)を割り込む手前で待たされるため、対話リクエストが来たときには必ず即時に通れる残量があります。一方、夜中のように対話が完全に暇な時間帯は、予約ラインぎりぎりまでバルクが使えるので、総スループットはほとんど犠牲になりません。導入後の夜間帯では、単純な均等分割(対話 30% を固定で切り出す方式)と比べてバルクの実効スループットが 22% 高くなりました。使われない予約を貸し出せるかどうかの差です。
est_tokens は事前見積もりです。厳密にやるなら count_tokens を呼びますが、それ自体が 1 リクエストを消費するので、私はこの用途では「文字数 ÷ 2 + 出力上限」の粗い見積もりで通し、応答後に usage_metadata の実測値で誤差を補正する方式にしています。見積もり誤差は 2 週間の平均で +11% と粗いものの、アドミッション制御の目的(枠の食い潰し防止)には十分でした。
RPM だけ制御して一度落ちた話 — TPM は別の崖
最初の実装は RPM しか見ていませんでした。それで 1 週間は平和だったのですが、バルク対象に長文ドキュメントの要約系タスクを追加した日に、RPM には余裕があるのに 429 が再発しました。原因は TPM です。RPM だけ見る制御は分かりやすい落とし穴なので、注意点と対処をここに記録しておきます。1 件あたりのプロンプトが 9,000 トークン前後ある処理は、リクエスト数としては大したことがなくても、トークン量では枠を一気に食べます。
制御方式 短文タスクのみの週 長文タスク追加後 RPM のみ制御 429 ほぼゼロ 朝帯に 429 再発(TPM 超過) RPM + TPM の二次元制御 429 ほぼゼロ 429 ほぼゼロ(バルクが自然減速)
リクエスト単価(トークン量)が機能ごとに 1 桁違うのが LLM ワークロードの特徴なので、二次元で制御しないと「件数は少ないのに重い」機能が抜け道になります。上のゲート実装が最初から 2 つのバケットを持っているのはこの失敗の反映です。
バルク側の作法 — 減速シグナルを外から与える
アドミッション制御はプロセス内の交通整理なので、API 側から見た混雑(他プロセスや他マシンの消費、上限側の変動)までは分かりません。そこでバルクワーカーには、ゲートとは別にもう 1 段「外形指標による減速」を入れています。対話機能側の 429 率と p95 レイテンシを 1 分粒度で監視し、しきい値を超えたらバルクを一時停止する単純な回路です。
async def bulk_worker (items, gate, health):
for item in items:
# 対話側の健康状態が悪化していたらバルクは黙って待つ
while not health.interactive_healthy():
await asyncio.sleep( 30 )
async with gate.acquire( feature = "bulk_regen" , priority = "bulk" ,
est_tokens = estimate(item)):
await process(item)
class InteractiveHealth :
def __init__ (self, max_429_rate = 0.005 , max_p95_sec = 3.5 ):
self ._max_429 = max_429_rate
self ._max_p95 = max_p95_sec
self .window = deque( maxlen = 300 ) # (ts, status, latency)
def interactive_healthy (self) -> bool :
recent = [w for w in self .window if w[ 0 ] > time.time() - 60 ]
if not recent:
return True
rate_429 = sum ( 1 for _, s, _ in recent if s == 429 ) / len (recent)
lats = sorted (l for _, _, l in recent)
p95 = lats[ int ( len (lats) * 0.95 ) - 1 ] if len (lats) >= 20 else 0
return rate_429 <= self ._max_429 and p95 <= self ._max_p95
バルク処理は「止めても誰も困らない」のが最大の資産です。困らない側を止める判断を自動化しておくと、朝の対話ピークにバルクの尻尾が重なった日でも、被害が出る前にバルクが自主的に道を譲ります。スループットを積極的に最大化したい夜間帯の考え方はバルク処理の適応的並列度制御の記事 と対になるので、攻めと守りをセットで設計することを推奨します。
「別の API キーに分ければいい」が効かない理由
対策を検討していた当初、いちばん最初に思いついたのは「バルク用に別の API キーを発行して分ける」でした。これは効きません。Gemini API のレート制限はキー単位ではなくプロジェクト単位で適用されるため、同じプロジェクトのキーを何本に分けても、食べている枠は同じだからです。キーの分割はセキュリティ上の権限分離としては意味がありますが、クォータの隔離にはなりません。
本当に隔離したければプロジェクトごと分けることになります。実際、暴走時の被害半径を区切る目的でのプロジェクト分割は有効で、費用面の隔離については支出上限の被害半径設計の記事 で書いたとおりです。ただしプロジェクト分割は請求・監視・キー管理の運用コストが倍々で増えるので、私はこの判断基準で使い分けています。
状況 推奨 同一アプリ内の機能同士の奪い合い プロジェクトは分けず、優先度つきアドミッション制御で解決 暴走・侵害時の被害半径を切りたい プロジェクト分割(費用と枠の両方が隔離される) 本番と検証環境の分離 プロジェクト分割(検証の実験が本番の枠を食べる事故を防ぐ)
導入 2 週間の実測
導入前後それぞれ 2 週間の比較です。母数は対話機能の全リクエストで、計測点はアプリケーション側(リトライ前の初回応答)です。
指標 導入前 導入後 対話機能の 429 率(朝ピーク帯) 3.2% 0.03% 対話機能の p95 レイテンシ(同) 4.1 秒 2.3 秒 バルクジョブの完走時間 42 分 58 分(+16 分) 夜間帯のバルク実効スループット — 固定分割比 +22%(予約枠の貸し出し効果)
バルクの完走が 16 分延びていますが、これは対話ピーク帯にバルクが道を譲った時間そのものであり、設計どおりの挙動です。夜間バッチにとっての 16 分と、画面の前のユーザーにとっての 429 は等価ではありません。この非対称性を実装に落とすのがアドミッション制御の本質だと考えています。
なお、この仕組みが不要なケースも書いておきます。機能が実質 1 つしかない、あるいはすべての呼び出しが同じ遅延許容度を持つなら、ゲートはただの複雑性です。その場合は単純な並列度制限とリトライ設計だけで足ります。複雑性は奪い合いが実測で確認できてから導入する、の順番を守るほうが健全です。
まとめ — 最初の一歩は機能タグ 1 個
もしいま 429 の原因が特定できていないなら、最初にやるべきはゲートの実装ではなく、冒頭の TaggedGeminiClient のような機能タグつき計測を 1 週間流すことです。犯人が分かれば、予約比率もしきい値も表の数字を眺めるだけでほぼ決まります。私の場合、計測を入れてから犯人特定までは 3 日、ゲート本体の実装と調整は週末 1 回分でした。同じ構図に心当たりのある方の参考になれば幸いです。