深夜2時に走らせているバッチ処理のログを翌朝に眺めていたとき、50件の入力のうち1件だけ結果が欠けていることに気づきました。例外はどこにも投げられておらず、リトライも静かに尽きていて、そのジョブは結果が出ないまま、どの保存先にも残らず消えていました。3日経つまで気づけなかったのは、失敗が「エラー」ではなく「無」として処理されていたからです。
個人開発で複数のブログと壁紙アプリの自動処理を回していると、こうした「静かな取りこぼし」が一番こわい不具合だと感じます。落ちたことが画面に出る障害はまだ直せます。落ちたことすら残らない失敗は、気づくきっかけ自体がありません。今回は、無人で回る Gemini API パイプラインで失敗ジョブを一件も捨てないための、デッドレター(dead-letter)保存と再投入の作り方を、私自身が運用に入れている形でまとめます。
リトライを足しても「無言の消失」は止まらない
失敗対策というとまずリトライを思い浮かべますが、リトライは取りこぼしの半分しか解きません。問題は、リトライが尽きたあとのジョブの行き先です。多くのコードは max_attempts を超えると return None や continue で先へ進みます。ループは正常に最後まで回り、終了コードは 0 になり、失敗は記録されません。
もう一つの落とし穴は、そもそも再試行しても無駄な失敗です。入力 JSON の構造が壊れている、安全フィルタに恒久的に弾かれている、INVALID_ARGUMENT が返る——これらは何度投げても同じ結果になります。ここでリトライを回すと、API 費用と時間を溶かしながら、最後はやはり静かに消えます。
つまり必要なのは、リトライの追加ではなく「失敗の終着点」を用意することです。再試行で回復し得るものは時間を置いて試し、回復しないものは捨てずに退避させ、あとから人の判断で扱えるようにする。この退避先がデッドレターです。
デッドレターに何を保存するか
デッドレターは特別なミドルウェアを入れなくても、SQLite のテーブル1枚で十分に始められます。私はまず手元の SQLite で運用を確かめてから、必要に応じて Firestore などへ移すようにしています。大切なのは置き場所より「あとで再投入できるだけの情報を全部持つこと」です。
保存すべきは、ジョブを一意に識別する ID、入力そのもの(再投入に必須)、入力の指紋(重複検知用)、使ったモデル、失敗の分類、最後のエラーメッセージ、試行回数、初回と直近の時刻、そして状態です。
import sqlite3
import json
import time
import hashlib
DLQ_PATH = "dead_letter.db"
def init_dlq (path: str = DLQ_PATH ) -> None :
"""デッドレター用の最小テーブルを用意します。"""
con = sqlite3.connect(path)
con.execute(
"""
CREATE TABLE IF NOT EXISTS dead_letter (
job_id TEXT PRIMARY KEY,
payload_hash TEXT NOT NULL,
payload_json TEXT NOT NULL,
model TEXT,
error_class TEXT NOT NULL,
error_message TEXT,
attempts INTEGER NOT NULL,
first_seen REAL NOT NULL,
last_seen REAL NOT NULL,
status TEXT NOT NULL DEFAULT 'parked'
)
"""
)
con.commit()
con.close()
def payload_fingerprint (payload: dict ) -> str :
"""同じ入力を一意に識別するための指紋を作ります。"""
canonical = json.dumps(payload, sort_keys = True , ensure_ascii = False )
return hashlib.sha256(canonical.encode( "utf-8" )).hexdigest()[: 16 ]
def park (job_id: str , payload: dict , model: str ,
error_class: str , error_message: str , attempts: int ,
path: str = DLQ_PATH ) -> None :
"""恒久失敗ジョブをデッドレターへ退避します(既存なら更新)。"""
now = time.time()
con = sqlite3.connect(path)
con.execute(
"""
INSERT INTO dead_letter
(job_id, payload_hash, payload_json, model,
error_class, error_message, attempts, first_seen, last_seen, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'parked')
ON CONFLICT(job_id) DO UPDATE SET
last_seen = excluded.last_seen,
attempts = excluded.attempts,
error_class = excluded.error_class,
error_message = excluded.error_message
""" ,
(job_id, payload_fingerprint(payload),
json.dumps(payload, ensure_ascii = False ), model,
error_class, error_message, attempts, now, now),
)
con.commit()
con.close()
job_id を主キーにして ON CONFLICT ... DO UPDATE を使うのは、同じジョブが再び落ちたときに行を増やさず last_seen と試行回数だけ更新するためです。これをしないと、同じ恒久失敗がデッドレターを何百行も埋め、本当に見るべき内訳が埋もれます。
「再試行可能」と「恒久」を機械的に切り分ける
退避するかどうかの判断を、毎回その場の勘で書くと必ずぶれます。失敗をステータスコードとメッセージから分類する関数を1つ用意して、判断を1か所に集約します。
RETRYABLE_STATUS = { 429 , 500 , 502 , 503 , 504 }
PERMANENT_STATUS = { 400 , 403 , 404 , 422 }
def classify_failure (status_code: int , message: str ) -> str :
"""失敗を 'retryable' / 'permanent' / 'unknown' に分類します。"""
text = (message or "" ).upper()
# 入力内容そのものが原因の失敗は、何度投げても同じ結果になります。
if status_code in PERMANENT_STATUS :
return "permanent"
if "INVALID_ARGUMENT" in text or "FAILED_PRECONDITION" in text:
return "permanent"
if "SAFETY" in text or "BLOCKED" in text:
return "permanent"
# 一時的な負荷・ネットワーク起因は、時間を置けば回復し得ます。
if status_code in RETRYABLE_STATUS :
return "retryable"
if "DEADLINE_EXCEEDED" in text or "UNAVAILABLE" in text:
return "retryable"
return "unknown"
判断の軸は単純です。入力の中身が原因なら恒久、外部要因なら再試行可能、どちらとも言い切れないものは unknown として控えめに少しだけ試す。下の表は、私が実運用で恒久側に倒している代表例です。
失敗の種類 分類 扱い
429(レート制限) retryable バックオフして再試行
503 / UNAVAILABLE retryable 時間を置いて再試行
400 / INVALID_ARGUMENT permanent 即デッドレターへ退避
安全フィルタによるブロック permanent 退避して内容を見直す
原因不明のタイムアウト unknown 少数回だけ試し、尽きたら退避
ここで unknown を恒久扱いにしないのが肝心です。一時障害を恒久と誤判定すると、回復すれば通ったはずのジョブまでデッドレターに溜まり、再投入の手間が増えます。逆に恒久を再試行に倒すと費用が溶けます。迷う失敗は「少しだけ試して、尽きたら退避」が安全側です。
パイプライン本体に退避経路を組み込む
分類と退避先がそろえば、本体はリトライと退避を素直につなぐだけです。恒久失敗は即座に退避し、unknown は控えめなバックオフで試し、上限に達したら「リトライ尽き」として退避します。
import random
class ApiError ( Exception ):
def __init__ (self, status_code: int , message: str ):
super (). __init__ (message)
self .status_code = status_code
self .message = message
def run_with_dead_letter (job_id: str , payload: dict , model: str ,
call_model, max_attempts: int = 4 ):
"""モデル呼び出しをリトライし、恒久失敗・リトライ尽きは退避します。
call_model(payload) -> dict を呼ぶ前提です。失敗時は ApiError を送出します。
"""
attempt = 0
while True :
attempt += 1
try :
return call_model(payload)
except ApiError as exc:
kind = classify_failure(exc.status_code, exc.message)
if kind == "permanent" :
park(job_id, payload, model,
error_class = "permanent: %d " % exc.status_code,
error_message = exc.message, attempts = attempt)
return None
if attempt >= max_attempts:
park(job_id, payload, model,
error_class = "exhausted: %d " % exc.status_code,
error_message = exc.message, attempts = attempt)
return None
# retryable / unknown は控えめなバックオフで様子を見ます。
backoff = min ( 2 ** attempt, 30 ) + random.uniform( 0 , 1 )
time.sleep(backoff)
error_class に permanent:400 と exhausted:503 のように接頭辞を付けておくと、あとで内訳を見たときに「内容の問題」と「外部要因の継続」を一目で区別できます。デッドレターの価値は退避そのものより、退避したものを分類して読めることにあります。
直したあとに、安全に再投入する
退避はゴールではありません。原因を直したあと、溜まったジョブを回収して初めて完結します。ここで雑に全件を投げ直すと、まだ直っていない恒久失敗(いわゆる毒ジョブ)を再びぶつけて費用を溶かすので、再投入は必ず件数上限つき・ドライラン優先で行います。
def replay (call_model, limit: int = 20 , dry_run: bool = True ,
path: str = DLQ_PATH ) -> dict :
"""parked のジョブを再投入します。原因修正後に手動で叩く前提です。"""
con = sqlite3.connect(path)
con.row_factory = sqlite3.Row
rows = con.execute(
"SELECT * FROM dead_letter WHERE status = 'parked' "
"ORDER BY last_seen ASC LIMIT ?" ,
(limit,),
).fetchall()
recovered, still_failing = 0 , 0
for row in rows:
payload = json.loads(row[ "payload_json" ])
if dry_run:
print ( "[dry-run] would replay %s ( %s )" % (row[ "job_id" ], row[ "error_class" ]))
continue
try :
call_model(payload)
con.execute(
"UPDATE dead_letter SET status = 'recovered', last_seen = ? "
"WHERE job_id = ?" ,
(time.time(), row[ "job_id" ]),
)
recovered += 1
except ApiError:
still_failing += 1
con.commit()
con.close()
return { "scanned" : len (rows), "recovered" : recovered,
"still_failing" : still_failing, "dry_run" : dry_run}
dry_run=True を既定にしているのは、まず「何が再投入されるか」を目で確かめてから本番を叩くためです。私はこの確認を省いて毒ジョブをまとめて投げ直し、無駄な呼び出しを増やした失敗があるので、ドライランは既定オンにしておくのが安全だと考えています。再投入で成功したものは recovered に状態を移し、parked の一覧から外して、次に見るべき残りだけが残るようにします。
取りこぼしを「毎日見える」状態にする
デッドレターを作っても、誰も見なければ消失と大差ありません。退避が機能している確証を得るために、1日1回だけ内訳をログへ出すようにしています。出力は短く、error_class ごとの件数だけで十分です。
def dlq_summary (path: str = DLQ_PATH ) -> None :
"""デッドレターの内訳を1日1回ログに出します。"""
con = sqlite3.connect(path)
rows = con.execute(
"SELECT error_class, COUNT(*) AS n FROM dead_letter "
"WHERE status = 'parked' GROUP BY error_class ORDER BY n DESC"
).fetchall()
con.close()
if not rows:
print ( "dead-letter: clean (0 parked)" )
return
total = sum (n for _, n in rows)
print ( "dead-letter: %d parked" % total)
for error_class, n in rows:
print ( " %4d %s " % (n, error_class))
毎朝この1行を見る習慣ができると、「permanent:400 が今日だけ急増した」のような変化に気づけます。私はこの数字を、その日の入力生成側にバグが混ざったかどうかの早期センサーとして使っています。件数そのものより、前日との差分が効きます。
運用に入れて見えた、しきい値の置き方
最後に、コードには表れない判断をいくつか残します。max_attempts は4前後で十分でした。手元の運用では8まで増やしても回復率は2〜3%ほどしか伸びず、恒久失敗を遅く退避させるだけでした。unknown の再試行は2回までに抑えると、毒ジョブが費用を溶かす範囲を狭められます。こうした落とし穴を本番運用で踏む前に、しきい値は小さめから始めることを私は推奨します。
アラートは「parked の総数」ではなく「前日比の増加」に対して鳴らすほうが実用的でした。総数で鳴らす設計は、未回収が少し溜まっただけで通知が常時点灯し、やがて無視されるので回避してください。差分で鳴らせば、いつもと違う日だけ手が動きます。これは Gemini API に限らず、無人で回す処理すべてに通じる感覚だと感じています。
同じ題材をイベント駆動の全体像から見たい方は、Gemini API でイベント駆動型の非同期パイプラインを設計する記事 が、リトライの線引きを深めたい方は429 を全部リトライしてはいけないというリトライ設計の記事 が近い話題です。
まず手元の SQLite に dead_letter テーブルを1枚作り、今動いているパイプラインの return None を park(...) に1か所だけ置き換えてみてください。次に落ちた1件が、消えずに残るようになります。