個人開発でチャット機能を載せたとき、最初に届いた不具合報告は「昨日の続きから話せない」でした。ローカルでは何の問題もなく動いていたのに、Cloud Run に乗せた途端、ユーザーがアプリを開き直すたびに AI が自己紹介からやり直してしまう。原因を追うと、ChatSession オブジェクトをプロセスのメモリに握ったままにしていただけのことでした。
Gemini API の chats.create(history=...) は、履歴さえ渡せば過去の文脈を踏まえた応答を返してくれます。公式サンプルの history=[] から始めるのは正しいのですが、その履歴を「どこに置くか」を決めないままスケールさせると、本番運用の初日に静かに壊れます。素朴な実装の落とし穴は、いずれも例外が出ないまま進行するのが厄介です。ここでは、私自身が個人開発のプロダクトで実際に運用しているチャット状態管理のうち、Redis を受け皿にする前提で「素朴な実装が破綻する箇所」と「そこをどう塞いだか」を、コードを添えて整理します。
なお、本稿のモデル指定は 2026 年 6 月時点で既定となった gemini-3.5-flash と、推論を厚くしたいときの gemini-3.5-pro を前提にしています。既定モデルが上がると出力の癖も変わるため、履歴フォーマットを後述のように SDK から剥がしておくと、移行のたびに保存層を書き換えずに済みます。
本番で監視している3つの壊れ方
インメモリ保持が本番で破綻する理由は、突き詰めると「コンテナの寿命とリクエストの寿命が一致しない」ことに尽きます。私が実運用で監視しているのは、次の3つの兆候です。
ひとつ目は、コンテナの短命さ です。Cloud Run のインスタンスはトラフィックが途切れれば数分で停止します。再起動後にグローバル変数の中身は当然空になり、直前まで続いていた会話が消えます。ログ上は例外も出ないため、ユーザー報告で初めて気づくのが厄介なところです。
ふたつ目は、水平スケール時の振り分け です。同じユーザーの 1 通目と 2 通目が別インスタンスに届けば、それぞれが独立したメモリを見ているので履歴が繋がりません。Sticky セッションで同じインスタンスに固定する逃げ道もありますが、スケールの自由度を捨てることになり、結局はステートを外に出すのが筋だと考えています。
みっつ目は、クライアントの再接続 です。モバイルアプリを落として数時間後に再開したとき、サーバー側に状態が残っている保証はありません。「セッションはいつでも消えうる」という前提で組むほうが、結果的に堅牢になります。
Redis はこの受け皿としてよく噛み合います。ミリ秒オーダーの読み書き、TTL による自動失効、必要なら Pub/Sub でのリアルタイム通知まで揃っているため、チャット状態の中継地点として扱いやすいのです。
まず「とりあえず動く」版と、その綻び
最初に、最小限の実装を見てから、何が足りないかを順に潰していきます。
# requirements: google-genai, redis
import json
import os
import redis
from google import genai
r = redis.Redis.from_url(os.environ[ "REDIS_URL" ], decode_responses = True )
client = genai.Client( api_key = os.environ[ "GEMINI_API_KEY" ])
def chat_once (session_id: str , user_message: str ) -> str :
raw = r.get( f "chat: { session_id } " )
history = json.loads(raw) if raw else []
chat = client.chats.create( model = "gemini-3.5-flash" , history = history)
response = chat.send_message(user_message)
new_history = [
{ "role" : msg.role, "parts" : [{ "text" : p.text} for p in msg.parts]}
for msg in chat.get_history()
]
r.set( f "chat: { session_id } " , json.dumps(new_history))
return response.text
これは動きます。ただ数日運用すると、次の綻びが順番に顔を出しました。
第一に、履歴が無限に伸びます 。1 往復ごとに数百〜数千トークンが積み上がり、私の手元では雑談寄りのセッションで 1 週間ほどで 10 万トークンを超えました。応答レイテンシが目に見えて悪化し、入力トークン課金も線形に膨らみます。
第二に、TTL がありません 。書き込みっぱなしなので、使われなくなったセッションが残り続けます。Redis のメモリ使用量が右肩上がりになり、maxmemory と eviction ポリシー次第では新規セッションの保存に失敗し始めます。
第三に、同時書き込みで履歴が壊れます 。同じユーザーが二つのタブから送ると、片方の保存がもう片方を丸ごと上書きします。UI 側のオプティミスティック更新と重なると、画面表示と実体がずれて再現の難しいバグになります。
第四に、Gemini SDK の内部構造に依存しています 。chat.get_history() の戻りをそのまま直列化しているので、SDK のバージョンアップで構造が変わると古いデータが読めなくなります。
ここからは、それぞれの落とし穴への対処を順に見ていきます。
トークン予算を「測って」決める
無限に伸びる履歴には、「直近 N 往復はそのまま残し、それより古い分は要約に置き換える」スライディングウィンドウ方式を推奨しています。直近の文脈を保ちながら、古い背景は圧縮して持てるからです。
from google import genai
MAX_RECENT_TURNS = 10 # 直近 10 往復(= 20 メッセージ)はそのまま残す
SUMMARY_TRIGGER = 14 # 往復数がこれを超えたら要約を走らせる
def trim_history (history: list , client: genai.Client) -> list :
"""直近 N 往復を残し、それ以前を要約で置き換える"""
turns = len (history) // 2
if turns <= SUMMARY_TRIGGER :
return history
cut = (turns - MAX_RECENT_TURNS ) * 2
older, recent = history[:cut], history[cut:]
older_text = " \n " .join(
f " { m[ 'role' ] } : { m[ 'parts' ][ 0 ][ 'text' ] } " for m in older
)
# 要約は難しいタスクではないので軽量モデルで十分
summary = client.models.generate_content(
model = "gemini-3.5-flash" ,
contents = (
"次の会話を 300 字以内で要約してください。"
"固有名詞・数値・未完了のタスク状態は必ず残してください。 \n\n "
f " { older_text } "
),
).text
return [
{ "role" : "user" , "parts" : [{ "text" : f "[これまでの会話の要約] \n{ summary } " }]},
{ "role" : "model" , "parts" : [{ "text" : "承知しました。続きを進めます。" }]},
* recent,
]
しきい値を勘で決めず、いったん本番ログから「1 往復あたりの平均トークン」を測ってから MAX_RECENT_TURNS を逆算したのが、地味ですが効きました。私の用途では 1 往復が平均 600 トークン前後だったので、直近 10 往復=約 1.2 万トークンを上限の目安に置き、要約後の 1 リクエストを 1.5 万トークン以内に収める設計にしています。この上限を入れてから、長い会話での応答レイテンシは体感で約 2 倍改善し、入力課金も読めるようになりました。
要約に gemini-3.5-flash をあてているのは意図的です。メインの応答に推論寄りのモデルを使いつつ、要約という軽い処理は速くて安いモデルに振り分けることで、圧縮コストをほとんど無視できる水準まで下げられます。
同時送信と TTL を Redis 側で固める
TTL は書き込みのたびに更新するのが素直で、「最終アクセスから一定時間で自動失効」という直感的な挙動になります。
SESSION_TTL = 60 * 60 * 24 * 7 # 7 日間、最終アクセスから起算
r.set( f "chat: { session_id } " , json.dumps(new_history), ex = SESSION_TTL )
同時書き込みは、セッション単位の排他ロックで順序を保証します。チャットでは WATCH/MULTI の楽観ロックより、SET NX の専用ロックキーのほうが扱いやすい場面が多い印象です。
import uuid
import time
def with_session_lock (session_id: str , fn, timeout_ms: int = 5000 ):
"""セッション単位の排他ロックを取ってから fn() を実行する"""
lock_key = f "chat:lock: { session_id } "
token = str (uuid.uuid4())
deadline = time.time() + timeout_ms / 1000
while time.time() < deadline:
if r.set(lock_key, token, nx = True , px = 10000 ): # ロックは 10 秒で自動失効
try :
return fn()
finally :
# 自分のトークンと一致するときだけ削除(他人のロックを奪わない)
release = (
"if redis.call('get', KEYS[1]) == ARGV[1] then "
"return redis.call('del', KEYS[1]) else return 0 end"
)
r.eval(release, 1 , lock_key, token)
time.sleep( 0.05 )
raise RuntimeError ( "Failed to acquire chat session lock" )
解放を Lua スクリプトでトークン照合してから行っているのは、処理が長引いてロックの TTL が先に切れ、別プロセスが同じキーを取得した後に、自分が古いロックを誤って消してしまう事故を防ぐためです。send_message は数秒かかることがあるので、この「自分のロックだけを消す」一手は本番で効いてきます。
保存フォーマットを SDK から剥がす
SDK のオブジェクト構造に直接依存すると、マイナーアップデートのたびに保存層が壊れます。自分たちで定義した中間フォーマット(DTO)を一枚挟むのが、結局いちばん壊れにくいというのが私の結論です。
import time
from dataclasses import dataclass, asdict
from typing import Literal
@dataclass
class StoredMessage :
role: Literal[ "user" , "model" ]
text: str
ts: float # 作成時刻(デバッグ・分析用)
def to_stored (history) -> list[ dict ]:
out = []
for msg in history:
text = "" .join(p.text for p in msg.parts if hasattr (p, "text" ))
out.append(asdict(StoredMessage( role = msg.role, text = text, ts = time.time())))
return out
def from_stored (stored: list[ dict ]) -> list[ dict ]:
"""Gemini API が期待するフォーマットへ戻す"""
return [
{ "role" : m[ "role" ], "parts" : [{ "text" : m[ "text" ]}]}
for m in stored
]
この層があると、画像入力やツール呼び出しのような Parts が増えても StoredMessage を拡張するだけで吸収できます。実際、保存層をこの形にしておいたおかげで、SDK を google-generativeai から google-genai に移したときも、書き換えはアダプタ関数の中だけで済みました。保存済みデータを一度もマイグレーションせずに移行できたのは、この一枚を挟んでいたからです。
Redis が落ちても会話を止めない
ここが、本番に出してから一番考えさせられた箇所です。セッション履歴は再現不可能なデータですが、だからといって Redis 障害でサービス全体を止めるのは過剰です。私は「履歴の読み書きに失敗したら、履歴なしの新しい会話として受け付ける」縮退を入れています。
def load_history (session_id: str ) -> list :
try :
raw = r.get( f "chat: { session_id } " )
return from_stored(json.loads(raw)) if raw else []
except redis.RedisError:
# 履歴が引けないだけで会話自体は成立させる(縮退運転)
return []
def save_history (session_id: str , history) -> None :
try :
r.set( f "chat: { session_id } " , json.dumps(to_stored(history)), ex = SESSION_TTL )
except redis.RedisError:
# 保存失敗はメトリクスに数え、応答自体は返す
pass # ここで監視カウンタをインクリメントしておく
縮退中はユーザーから見ると「文脈が一度途切れる」ことになりますが、エラー画面を返すより体験の毀損は小さく済みます。重要なのは、pass で握りつぶすのではなく、ここを必ずメトリクスに記録しておくことです。保存失敗率が普段ゼロのところで跳ねたら、Redis 側の異常を即座に検知できます。
組み上げた本番スケルトン
ここまでの対策を一本にまとめると、次の構造に落ち着きます。読み込み・トリム・送信・保存を、セッションロックの内側で順序立てて実行します。
def chat_persistent (session_id: str , user_message: str ) -> str :
def _run ():
history = load_history(session_id)
history = trim_history(history, client) # 保存前に圧縮しておく
chat = client.chats.create( model = "gemini-3.5-pro" , history = history)
response = chat.send_message(user_message)
save_history(session_id, chat.get_history())
return response.text
return with_session_lock(session_id, _run)
レイテンシが気になる規模になったら、trim_history の要約呼び出しだけを非同期のジョブキューへ逃がす手もあります。ユーザー応答には圧縮前の履歴を使い、裏で要約が終わったら書き戻す設計にすれば、体感速度を犠牲にせず履歴を畳めます。私はまずは同期実装で運用を始め、要約の所要時間がログ上で気になり始めてから非同期化する、という順番を勧めます。先回りして複雑にしないほうが、運用しながら正しい閾値が見えてきます。
次の一歩
導入の順序としては、次の段取りを推奨します。
既存のインメモリ実装に、セッションキーと load_history / save_history を足す(ここまでで「再デプロイで会話が消える」報告の大半が止まります)
トリムと TTL を入れてトークンとメモリを実測ベースで抑える
セッションロックと中間フォーマット、Redis 障害時の縮退を順に積み上げる
この順で一段ずつ重ねていけば、数か月運用しても崩れないチャット層になっていきます。会話 AI をプロダクトに本気で組み込むなら、こうした引き出しを一つずつ増やしておくと、障害が起きたときの判断が確実に速くなります。