朝、バッチジョブのステータスが SUCCEEDED になっていました。安心して結果を書き戻し、その日は別の作業に移りました。
数日後、分類結果を眺めていて気づきます。ある一群のレビューだけ、カテゴリが空のまま Firestore に入っていました。件数にして数十件。ジョブ全体は「成功」でも、その中の一部の行は静かに落ちていたのです。
個人開発で複数のアプリと4つのサイトを回していると、夜間バッチは「動かしてから寝る」道具になります。App Store Connect と Google Play Console に溜まったレビューを分類する私自身の運用でも、翌朝に全件が揃っている前提で次の処理を積み上げていました。だからこそ、この「一部だけ落ちる」取りこぼしは、後工程まで汚染してしまいます。
この記事は、Batch API の完了を「全件成功」と読んでしまう油断をやめ、行単位で結果を台帳に記録し、落ちた行だけを拾い直す 設計をまとめたものです。夜間処理の初回実装ではなく、その後に必ず訪れる「失敗した数十件をどう回収するか」に焦点を当てています。
「完了」と「全件成功」は違う
Batch API のジョブ状態と、ジョブに含まれる個々のリクエストの成否は、別の層の話です。ジョブは無事に終わっても、出力JSONL の各行には成功したレスポンスと失敗したエラーが混在します。
出力の1行は、おおよそ次のどちらかの形をしています。SDK のバージョンによってキー名は前後します。この場合は、実際の出力を一度 head で覗いてから実装に落とすことを推奨します。
{ "key" : "review-000512" , "response" : { "candidates" : [ ... ]}}
{ "key" : "review-000513" , "error" : { "code" : 400 , "message" : "..." }}
つまり、ジョブが SUCCEEDED でも、error を持つ行は普通に混ざります。私が取りこぼしたのは、safety フィルタに引っかかった攻撃的なレビュー本文と、絵文字だけで構成された本文でスキーマ抽出に失敗した行でした。
私自身の実測では、8,000件規模のバッチで error を持つ行はおおむね全体の1〜2%、件数にして数十件から百数十件でした。通常APIの約50%というコストで回せる代わりに、この数%を静かに取りこぼすと、後工程がじわじわ狂います。
ここで大切な考え方を1つ。ジョブの成否と、行の成否を、別々に記録する こと。この2層を混ぜて「ジョブが成功したから全部入れる」とすると、必ず穴が空きます。
行単位の台帳を持つ
そこで、投入するリクエスト1件ごとに状態を持つ台帳を用意します。キーは custom_id(ここでは key)です。状態は次の4つに絞りました。
状態 意味 次にすること
pending まだ結果を受け取っていない 次のバッチに含める
succeeded 正常な構造化出力が得られた 何もしない(確定)
retryable 一時的な失敗(429 / 503 等) 回数上限まで再投入する
permanent 入力起因・safety 等で再試行しても直らない 再投入せず、別扱いで人間が見る
SQLite にしたのは、個人開発の夜間バッチで「1ファイルで持ち運べて、途中で落ちても残る」ことが何より効くからです。台帳のスキーマと初期化はこれだけです。
import sqlite3
import time
def open_ledger (path: str = "batch_ledger.db" ) -> sqlite3.Connection:
conn = sqlite3.connect(path)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS rows (
key TEXT PRIMARY KEY,
payload TEXT NOT NULL, -- 入力リクエスト(JSON文字列)
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
result TEXT, -- 成功時の抽出結果(JSON文字列)
updated_at REAL NOT NULL DEFAULT 0
)
"""
)
conn.commit()
return conn
def enroll (conn: sqlite3.Connection, key: str , payload: str ) -> None :
"""初回投入時に pending として登録する。再投入では触らない。"""
conn.execute(
"INSERT OR IGNORE INTO rows(key, payload, updated_at) VALUES (?, ?, ?)" ,
(key, payload, time.time()),
)
conn.commit()
INSERT OR IGNORE にしているのが要点です。2晩目に同じ key で再投入しても、succeeded 行を pending に巻き戻さない。台帳は「一度確定した成功を守る」ためにあります。
失敗を「一時」と「恒久」に切り分ける
リトライ設計でいちばん事故が起きるのは、恒久失敗を一時失敗と誤認して延々と投げ続けてしまうことです。safety ブロックされたレビューは、100回投げても100回ブロックされます。ここでコストと時間が静かに溶けます。
そこで、出力行の error を見て状態を決める分類器を1つ挟みます。判断材料は HTTP ステータス相当のコードと、メッセージの型です。
# 一時的とみなすコード(再投入で直る見込みがある)
TRANSIENT_CODES = { 429 , 500 , 502 , 503 , 504 }
# 恒久的とみなすコード(入力・権限・仕様起因で、再投入しても直らない)
PERMANENT_CODES = { 400 , 403 , 404 , 422 }
def classify_error (error: dict ) -> str :
code = error.get( "code" )
message = (error.get( "message" ) or "" ).lower()
if code in TRANSIENT_CODES :
return "retryable"
if code in PERMANENT_CODES :
return "permanent"
# safety ブロックはコードで表れないことがあるためメッセージでも判定
if "safety" in message or "blocked" in message or "recitation" in message:
return "permanent"
# 判断がつかないものは、いったん retryable にして回数上限で頭打ちにする
return "retryable"
判断がつかないエラーを retryable に倒しているのは、意図があります。未知の失敗を恒久扱いで捨てると、本当は一過性だった行を失います。逆に一時扱いにしても、後述の試行回数の上限 が最終的な歯止めになるので、暴走はしません。迷ったら拾える側に倒す、という運用判断です。
出力を台帳に照合して書き戻す
ジョブが終わったら、出力JSONL を1行ずつ読み、key で台帳を更新します。成功なら succeeded、失敗なら分類結果で retryable か permanent に落とします。
import json
MAX_ATTEMPTS = 4 # 一時失敗をここまで再投入したら permanent に落とす
def reconcile (conn: sqlite3.Connection, output_path: str ) -> dict :
counts = { "succeeded" : 0 , "retryable" : 0 , "permanent" : 0 }
with open (output_path, "r" , encoding = "utf-8" ) as f:
for line in f:
line = line.strip()
if not line:
continue
record = json.loads(line)
key = record[ "key" ]
if "response" in record and "error" not in record:
extracted = extract_structured(record[ "response" ])
if extracted is None :
# レスポンスは来たがスキーマ抽出に失敗 → 恒久扱い
_mark(conn, key, "permanent" , "schema_extract_failed" )
counts[ "permanent" ] += 1
else :
_mark(conn, key, "succeeded" , None ,
result = json.dumps(extracted, ensure_ascii = False ))
counts[ "succeeded" ] += 1
continue
# error 行の処理
error = record.get( "error" , { "message" : "unknown" })
verdict = classify_error(error)
row = conn.execute(
"SELECT attempts FROM rows WHERE key = ?" , (key,)
).fetchone()
attempts = (row[ 0 ] if row else 0 )
if verdict == "retryable" and attempts + 1 >= MAX_ATTEMPTS :
# 上限に達したら、これ以上投げない
verdict = "permanent"
_mark(conn, key, verdict, json.dumps(error, ensure_ascii = False ))
counts[verdict] += 1
return counts
def _mark (conn, key, status, last_error, result = None ):
conn.execute(
"""
UPDATE rows
SET status = ?,
last_error = ?,
result = COALESCE(?, result),
attempts = attempts + CASE WHEN ? = 'retryable' THEN 1 ELSE 0 END,
updated_at = ?
WHERE key = ?
""" ,
(status, last_error, result, status, time.time(), key),
)
conn.commit()
extract_structured は、あなたのスキーマに合わせて response からカテゴリなどを取り出す関数に差し替えてください。ここでの肝は、「レスポンスは返ってきたがスキーマに合わない」を成功に数えない ことです。私が最初に取りこぼした絵文字だけの本文は、まさにこの型でした。200 が返っているのに中身が空、という失敗は、error 行より見つけにくいのです。
失敗した行だけを再投入する
翌晩は、台帳から pending と retryable の行だけを引き、新しい JSONL を作って投げます。succeeded と permanent は触りません。これが選択的リトライの本体です。
def build_retry_batch (conn: sqlite3.Connection, out_path: str ) -> int :
rows = conn.execute(
"SELECT key, payload FROM rows WHERE status IN ('pending', 'retryable')"
).fetchall()
with open (out_path, "w" , encoding = "utf-8" ) as f:
for key, payload in rows:
f.write(json.dumps(
{ "key" : key, "request" : json.loads(payload)},
ensure_ascii = False ,
) + " \n " )
return len (rows)
再投入でも key は初回と同じものを使います。台帳は key 主キーなので、同じ行が二重に増えることはありません。冪等性は「同じ key を使い続ける」という一点で担保されます。この単純さが、夜間に無人で回すうえで効いてきます。
投入コードは初回と共通です。生成した JSONL をアップロードし、バッチジョブを作るだけです。
from google import genai
client = genai.Client( api_key = "YOUR_GEMINI_API_KEY" )
def submit_batch (src_path: str , model: str = "gemini-flash-latest" ) -> str :
uploaded = client.files.upload(
file = src_path,
config = { "mime_type" : "application/jsonl" },
)
job = client.batches.create( model = model, src = uploaded.name)
return job.name # このジョブ名を台帳とは別のジョブ表に控えておく
モデルに gemini-flash-latest を指定していますが、2026年7月に GA となった Gemini 3.5 Flash がこの別名の実体になっています。別名は実体が入れ替わるため、分類結果の傾向が変わっていないかは、後述のゴールデン数件で毎回確認しておくと安心です。
完了はポーリングではなく Webhook で受ける
2026年6月に Batch API へイベント駆動の Webhooks が入り、完了を受け取るのにポーリングが要らなくなりました。無人運用ではこれがよく効きます。翌朝に while True で batches.get を叩き続ける代わりに、完了通知を受けて照合を始められます。
受信側は、通知に含まれるジョブ名を検証し、出力をダウンロードして reconcile を呼ぶだけです。
from flask import Flask, request, abort
app = Flask( __name__ )
@app.post ( "/gemini/batch-complete" )
def on_batch_complete ():
# 署名検証は必須(詳細は割愛。共有秘密での HMAC を推奨)
if not verify_signature(request):
abort( 401 )
job_name = request.json.get( "name" )
job = client.batches.get( name = job_name)
if job.state != "JOB_STATE_SUCCEEDED" :
# 失敗ジョブは別扱い。台帳の行はすべて pending のまま残る
return ( "noted" , 200 )
output_path = download_output(job) # dest から手元へ
conn = open_ledger()
counts = reconcile(conn, output_path)
app.logger.info( "reconciled: %s " , counts)
return ( "ok" , 200 )
Webhook は少なくとも1回は届く前提で作ります。同じ完了通知が二度来ても、reconcile は key で冪等に上書きするだけなので、二重処理にはなりません。台帳を状態機械にしておくと、こうした「重複配信」への耐性が自然と手に入ります。
運用して見えた落とし穴
無人で数晩回すと、初回実装では見えなかった穴がいくつも出てきました。本番運用に入る前に、次の4点を潰しておくと安定します。
恒久失敗を放置すると通知が鳴りやまない。 permanent 行は再投入しない代わりに、溜まり続けます。私は permanent が一定数を超えたら日次サマリを自分に飛ばすようにして、safety ブロックの傾向(特定の言語・特定の表現)を人間の目で確認する運びにしました。捨てるのでも無限に投げるのでもなく、「人が見る箱」に入れて回避するのが落としどころです。
コスト計上が試行をまたいでズレる。 同じ行を3晩投げれば、その行には3回分の課金が乗ります。台帳の attempts を合計すれば実際の課金回数に近い数字が出るので、私はこれを日次の見積もりに使っています。「成功件数 × 単価」で見積もると、リトライ分だけ必ず下振れします。
MAX_ATTEMPTS を大きくしすぎない。 一時失敗が本当に一時的なら、2〜3回で拾えます。4回で拾えないものは、たいてい恒久失敗が紛れ込んでいます。上限を10などにすると、恒久失敗をだらだら投げ続けるだけなので、4前後に抑えることを推奨します。個人的には、この値が無駄な課金と通知を回避する落としどころでした。
台帳とジョブ表を分ける。 行の状態(rows)と、投入したジョブの履歴(どの JSONL をいつ投げ、どのジョブ名になったか)は別テーブルにします。片方を消しても、もう片方から追跡できる冗長性が、無人運用では効いてきます。
台帳が守ってくれるもの
行単位の台帳は、派手な仕組みではありません。SQLite が1つと、4つの状態、それだけです。けれど「完了=全件成功」という思い込みを、静かに、しかし確実に防いでくれます。
夜間バッチの価値は、動かしている間に眠れることにあります。その眠りが「翌朝、取りこぼしの尻拭いをする」ものにならないために、落ちた数十件を拾う仕組みを最初から持っておく。それだけで、無人運用の安心感はずいぶん変わります。
まず今日の一歩としては、いま回っているバッチの出力JSONL を head で覗き、error を持つ行が何件あるかを数えてみてください。おそらく、思っているより多いはずです。その数字が、台帳を持つ理由になります。
夜間バッチの取りこぼしに一度でも肝を冷やした方に、この台帳の考え方が届けば嬉しく思います。最後までお付き合いいただき、ありがとうございました。