長時間オペレーションをポーリングから Webhook へ移し、さらに取りこぼし対策として照合ポーラーを足した直後に、別の不具合が顔を出します。同じバッチの完了が二度処理され、生成済みの記事が二度公開されたり、完了通知が二通飛んだりするのです。
これは照合を足したことの副作用ではなく、二重路を持った時点で構造的に避けられない現象です。Webhooks に移してもデプロイの瞬間にイベントは落ちる、という照合の話が「欠落を拾い直す」側だとすれば、本記事は「拾い直した結果、同じものを二度掴んでしまう」側の手当てです。受け口を冪等にして、何度届いても副作用は一度だけ、という状態を作ります。
なぜ二度処理が起きるのか — 二重路は「事故」ではなく前提
Gemini の Webhook 配信は at-least-once です。受信側が 2xx を返す前にコネクションが切れれば、Google 側は配信成功を確認できず、同じイベントを再送します。受信は成功していたのに 2xx の返却だけ失敗した、というケースでも再送は起きます。つまり Webhook 単体でも、同じ完了が二度届く前提で組む必要があります。
そこへ照合ポーラーが加わると、重複の経路はさらに増えます。
| 重複が生まれる経路 | 何が起きているか |
| Webhook の再送 | 2xx 返却前に切断 → Google が同一イベントを再配信 |
| Webhook と照合ポーラーの衝突 | Webhook 受信処理中に、低頻度ポーラーが同じ完了を独立に検知 |
| デプロイ中の二度受け | 旧インスタンスが処理開始 → 切替で応答できず → 新インスタンスへ再送 |
| 手動リトライ | 運用者がポーラーを手で再実行し、確定前の完了を再投入 |
ここで大事なのは、これらを「異常系」として individually 潰そうとしないことです。どの経路も塞ぎきれません。代わりに、受け口を「何回呼ばれても結果が変わらない」形にしておけば、経路の数を気にせず済みます。冪等性とは、入口の数を減らす努力ではなく、出口を一本にする設計です。
全体像 — 受け口を「判定」と「副作用」に割る
二度処理を防ぐ鍵は、完了イベントを受け取ってから副作用(公開・課金・通知)を起こすまでの間に、一つの関門を置くことです。関門の役割は「この完了は、まだ誰も副作用を起こしていないか」を原子的に判定することです。
設計を三相に分けます。
- 正規化 — 届いたイベントから安定した冪等キーを取り出す
- クレーム(claim) — 「自分がこの完了を処理する」と先に主張し、競合を弾く
- 確定(commit) — 副作用を起こし切ってから、処理済みを記録する
Webhook レシーバーも照合ポーラーも、最後はこの同じ受け口を呼ぶだけにします。入口は複数でも、副作用を起こす関数は一つです。
冪等キーをどこから取るか — operation name を正規化する
冪等キーは「同じ完了なら必ず同じ値になり、違う完了なら必ず違う値になる」ものを選びます。Gemini の長時間オペレーションでは、operation の name(operations/... あるいは batch job のリソース名)が最も安定した識別子です。タイムスタンプやランダム ID を混ぜてはいけません。再送で値が変わってしまえば、冪等キーの意味がなくなります。
import re
def idempotency_key(event: dict) -> str:
"""完了イベントから安定した冪等キーを取り出す。
Webhook ペイロードと、照合ポーラーが list で得る operation の
両方から、同じ完了なら必ず同じ文字列を返せる形にする。
"""
# Webhook と REST list でフィールド名がぶれるため候補を順に当たる
name = (
event.get("name")
or event.get("operation")
or event.get("operationName")
or event.get("resourceName")
)
if not name:
raise ValueError(f"operation name が見つかりません: {list(event.keys())}")
# 末尾スラッシュ・前後空白・大文字小文字のぶれを吸収する
name = name.strip().rstrip("/")
# projects/.../locations/... のプレフィックス差を畳んで末尾の安定部分を使う
m = re.search(r"(operations|batches|batchJobs)/[A-Za-z0-9_\-]+$", name)
canonical = m.group(0) if m else name
return f"gemini:{canonical}"
# 同じ完了が Webhook 由来でも poller 由来でも同じキーになることを確認
webhook_event = {"name": "projects/p/locations/us/operations/abc-123/"}
poller_event = {"operation": "operations/abc-123"}
assert idempotency_key(webhook_event) == idempotency_key(poller_event)
print(idempotency_key(webhook_event)) # -> gemini:operations/abc-123
正規化を assert で固定しておくのが肝心です。Webhook と REST の list ではフィールド名やプレフィックスが微妙に違うことがあり、ここがずれると「同じ完了なのに別キー」になって二度処理が復活します。私自身、個人開発で複数プロジェクトのバッチをまたいで受けていたとき、プレフィックスの差で一度この罠にはまりました。正規化の単体テストは、後から救ってくれる安全網です。
claim→実行→確定の三相を SQLite で実装する
関門は、冪等キーに対する UNIQUE 制約で作ります。SQLite なら追加の依存なしに、原子的なクレームが書けます。状態は三つだけ持ちます。
| 状態 | 意味 | 次に来た同一イベントの扱い |
| CLAIMED | 誰かが処理中。副作用は未完 | 有効期限内なら無視、超過なら奪取 |
| DONE | 副作用が完了済み | 即座に無視(実質1回が成立) |
| (行なし) | まだ誰も触れていない | クレームを試みる |
import sqlite3
import time
import contextlib
DB = "idempotency.db"
CLAIM_TTL_SEC = 600 # 副作用が確実に終わる想定時間より長く取る
def init_db():
con = sqlite3.connect(DB)
con.execute("""
CREATE TABLE IF NOT EXISTS sink (
key TEXT PRIMARY KEY, -- 冪等キー(UNIQUE)
state TEXT NOT NULL, -- CLAIMED / DONE
claimed_at REAL,
done_at REAL
)
""")
con.commit()
con.close()
@contextlib.contextmanager
def db():
con = sqlite3.connect(DB, isolation_level=None) # 明示的トランザクション
try:
yield con
finally:
con.close()
def try_claim(key: str) -> bool:
"""このプロセスが副作用を起こす権利を取れたら True。
UNIQUE 制約により、同時に来た2つの呼び出しのうち
INSERT に成功するのは必ず1つだけになる。
"""
now = time.time()
with db() as con:
con.execute("BEGIN IMMEDIATE") # 書き込みロックを先に取る
row = con.execute(
"SELECT state, claimed_at FROM sink WHERE key = ?", (key,)
).fetchone()
if row is None:
con.execute(
"INSERT INTO sink(key, state, claimed_at) VALUES (?, 'CLAIMED', ?)",
(key, now),
)
con.execute("COMMIT")
return True
state, claimed_at = row
if state == "DONE":
con.execute("COMMIT")
return False # 既に確定済み → 二度処理しない
# CLAIMED のまま放置されている(処理中にプロセスが落ちた等)
if now - (claimed_at or 0) > CLAIM_TTL_SEC:
con.execute(
"UPDATE sink SET claimed_at = ? WHERE key = ?", (now, key)
)
con.execute("COMMIT")
return True # 期限切れクレームを奪取して再試行
con.execute("COMMIT")
return False # 他プロセスが処理中。今回は何もしない
def mark_done(key: str):
with db() as con:
con.execute(
"UPDATE sink SET state = 'DONE', done_at = ? WHERE key = ?",
(time.time(), key),
)
BEGIN IMMEDIATE で書き込みロックを先取りするのが重要です。デフォルトの遅延トランザクションだと、二つの呼び出しが同時に「行なし」を読んでから両方 INSERT に進み、片方が UNIQUE 違反で例外になります。例外を握り潰す作りにしていると、握り潰したほうが「クレーム失敗」なのか「既に DONE」なのか区別できず、結局二度処理が漏れます。ロックを先に取れば、判定そのものが直列化されます。
副作用を一度だけ走らせる受け口
三相を一つの関数にまとめます。Webhook レシーバーも照合ポーラーも、これを呼ぶだけです。
def process_completion(event: dict, do_side_effect) -> str:
"""完了イベントを受け取り、副作用を実質1回だけ起こす。
do_side_effect(key) は公開・課金・通知などの実処理。
冪等キー単位で高々1回しか呼ばれないことを保証する。
"""
key = idempotency_key(event)
if not try_claim(key):
return "skipped" # DONE 済み or 他プロセスが処理中
try:
do_side_effect(key) # ← ここが二度走ってはいけない処理
except Exception:
# 副作用が落ちたら DONE にしない。
# クレームは TTL 後に奪取され、別の機会に再試行される。
raise
mark_done(key)
return "processed"
# 使用例: Webhook も poller も同じ受け口を呼ぶ
def publish_article(key: str):
print(f"[publish] {key} の記事を公開しました")
# Webhook 由来
print(process_completion({"name": "operations/abc-123"}, publish_article))
# -> [publish] gemini:operations/abc-123 の記事を公開しました
# processed
# 照合ポーラーが同じ完了を後から拾った
print(process_completion({"operation": "operations/abc-123"}, publish_article))
# -> skipped (公開は二度走らない)
副作用が例外で落ちたとき、mark_done を呼ばずに抜けるのが設計の要です。DONE にしてしまうと、未完のまま「処理済み」と記録され、二度と再試行されません。逆に CLAIMED のまま残せば、TTL 経過後に照合ポーラーが奪取して再試行できます。失敗を DONE にしない、これだけは外さないでください。
Webhook と照合ポーラーが同時に来たときの競合
最も気になるのは、Webhook 受信と照合ポーラーが文字通り同時刻に同じ完了を掴むケースです。三相設計では、両者とも try_claim を呼びます。BEGIN IMMEDIATE のロックにより、SQLite レベルでどちらか一方だけが INSERT に成功し、もう一方は CLAIMED 行を見て False を返します。結果、副作用は一度だけ走ります。
複数プロセス・複数ホストで受ける場合は、SQLite ファイルが共有ストレージ上にあるか、KV やリレーショナル DB に置き換える必要があります。要件は一つ、「冪等キーに対する原子的な INSERT-if-not-exists が使えること」です。Cloudflare KV のように比較交換が弱いストアでは、条件付き書き込みやリレーショナル DB の UNIQUE 制約に寄せるのが安全です。
クレームの有効期限をどう決めたか
TTL は「副作用が最悪ケースで完了する時間」より十分長く取ります。短すぎると、まだ正常に処理中のクレームを別プロセスが奪い、本当に二度処理が起きます。長すぎると、プロセスが落ちた完了の再試行が遅れます。
個人開発の無人運用では、副作用の p99 実行時間を計測し、その3〜5倍を初期値にしました。公開処理が長くて数秒、外部 API のリトライを含めても1〜2分で終わる構成だったため、TTL は 600 秒に置いています。照合ポーラーの周期(数分〜十数分)より TTL を長くしておくと、ポーラーが正常処理中のクレームを誤って奪う事故を避けられます。
公式ドキュメントに書かれていない実運用の細部
- 冪等キーは副作用の単位で取る。バッチ全体で1キーにすると、バッチ内の一部だけ再処理したいときに困ります。「公開する記事1本」が副作用の単位なら、キーも記事単位に細分化します。
- DONE 行を消さない。古い DONE を掃除したくなりますが、消した直後に遅れて届いた再送があると二度処理が復活します。掃除するなら「完了から十分な再送猶予(数日)を超えた行のみ」に限ります。
skipped をログに残す。二度届いていること自体は異常ではありませんが、skipped が急増したら配信側か照合周期に変化があった兆候です。件数を見える化しておくと、静かな劣化に早く気づけます。
- 副作用の中で外部状態を読まない判定にする。「もう公開済みか」を公開先に問い合わせて分岐すると、その問い合わせ自体が競合します。判定は冪等台帳に一本化し、副作用は「起こすだけ」に保ちます。
どこから始めるか
まず、いま完了イベントを受けて副作用を起こしている箇所に process_completion を一枚かぶせ、冪等キーの正規化テストを assert で固定してください。Webhook と照合ポーラーの両方を同じ受け口に通せば、二度処理は実質1回に収束します。照合で欠落を拾い、冪等台帳で重複を弾く——この二つが揃って初めて、無人パイプラインの完了処理は「ちょうど1回」に近づきます。署名検証やポーリングからの移行そのものはBatch API を Webhook 駆動に作り替えた設計記録に整理してあるので、受信の入口を固める段ではそちらも併せてご覧ください。