Gemini API の Webhooks で Batch 処理のポーリングを畳んだあと、受信側のコードを読み返していて手が止まりました。この URL に POST してくるのは Google だけだ、と私はどこかで思い込んでいたのです。実際には、Webhook の受信エンドポイントはインターネットに公開された URL であり、その気になれば誰でも、どんな JSON でも投げ込めます。
個人開発の Webhook 受信部は、最初は「動けばいい」で書かれがちです。私自身、Dolice Labs のメンバーシップ決済で Stripe の Webhook を運用してきた経験がなければ、素通しのまま本番に置いていたと思います。決済の世界では「Webhook を信じすぎた実装」が事故の定番として知られていて、その規律は Gemini の自動処理パイプラインにもそのまま持ち込めます。ここでは、受信エンドポイントを三層で守る設計を、動くコードと一緒に整理します。
素通しの受け口で、実際に何が起きるのか
防御を考える前に、故障モードを具体的にしておきます。受信部が「届いた JSON をそのまま信じて処理する」実装だった場合、次の3つが起きえます。
偽イベント 。攻撃者、あるいは単に誤設定された別システムが、「ジョブ完了」を装ったペイロードを POST してきます。受信部がそれを信じて後段を起動すると、まだ存在しない結果を取りに行って空の成果物を保存したり、未完成のデータを公開処理に流したりします。悪意がなくても、ステージング環境の通知が本番の受け口に飛んでくる、という事故は現実に起きます。
リプレイ 。過去に受け取った正規のイベントが、もう一度届きます。意図的な再送攻撃だけでなく、配信側のリトライでも同じことが起きます。一度処理を終えたジョブの後段がもう一度走り、処理コストが二重にかかったり、公開済みのデータを上書きしたりします。
二重処理 。受信部の応答が遅れると、配信側はタイムアウトと判断して同じイベントを再送します。受信部が重い処理を同期でやっているほど応答は遅れ、再送が増え、さらに詰まる、という悪循環に入ります。
重要なのは、後半の2つは攻撃がなくても正規の再送だけで発生するという点です。ここが本番運用での落とし穴で、悪意への防御と、再送・順序入れ替わりへの対処は、実装上ほとんど同じ場所に収まります。順序の問題については完了したはずの Gemini ジョブが『実行中』に戻った — Webhook の順序入れ替わりを止める単調適用の設計 で扱ったので、ここでは受け口そのものの防御に集中します。
原則 — ペイロードは「通知」であって「事実」ではない
三層の防御に入る前に、すべての土台になる原則をひとつ決めます。Webhook のペイロードで状態を更新しない 、ということです。
届いたイベントは「何かが変わったかもしれない」という合図として扱い、実際の状態は必ず Gemini API に照会し直します。ペイロードに state: succeeded と書いてあっても、それを理由に後段を動かさない。照会した結果が完了を示したときだけ動かす。この一手で、偽イベントの実害は構造的にほぼ消えます。攻撃者が偽造できるのは「照会のきっかけ」だけになり、事実は API 側にしか存在しないからです。
この考え方は、Webhooks に移しても、デプロイの瞬間にイベントは落ちる — Gemini 長時間オペレーションを照合で二重化した運用メモ で書いた照合の設計と地続きです。イベントの取りこぼしに備えて API 照会で照合する仕組みを持っているなら、その照会ロジックをそのまま「ペイロードを信じない」ためにも使えます。防御のためだけの新しい部品は、ほとんど要りません。
第一層 — 到達の検証: 推測不能なパスとシークレットヘッダ
最初の層は「そもそも正規の送信元以外のリクエストを、処理の手前で落とす」ことです。実装は地味ですが、費用対効果が最も高い部分です。
ひとつめは、受信 URL のパス自体に十分に長い乱数トークンを入れることです。/hooks/gemini のような推測可能なパスではなく、/hooks/gemini/f3a9c2e8b1d4... のように 128 ビット相当の乱数を含めます。URL を知らない相手は、そもそも有効なエンドポイントに到達できません。
ふたつめは、通知設定側で付与できる検証手段を必ず使うことです。カスタムヘッダにシークレットを載せられるなら載せ、受信側で照合します。署名方式が提供されているならそちらを優先します。照合には hmac.compare_digest を使い、タイミング攻撃に対して定数時間で比較します。
なお、送信元 IP のアローリストを主防御にするのは推奨しません。供給元の IP レンジは予告なく変わることがあり、メンテナンスを怠ると正規イベントを落とす側に倒れます。入れるとしても補助の層に留めることを推奨します。
第二層 — リプレイと重複を UNIQUE 制約で止める
到達の検証を抜けた正規のイベントにも、再送とリプレイがあります。ここは受信記録テーブルの一意制約で機械的に畳みます。
CREATE TABLE processed_events (
dedupe_key TEXT PRIMARY KEY , -- イベントID(無ければ合成キー)
received_at INTEGER NOT NULL -- UNIX 秒
);
イベントに一意な ID が含まれているならそれを dedupe_key にします。含まれていない、あるいは信用しきれない場合は、オペレーション名 + 状態 の合成キーで代用します。「同じオペレーションの同じ状態遷移は一度しか処理しない」という意味論になり、リプレイにも配信リトライにも同じ一枚で効きます。
挿入は INSERT OR IGNORE で行い、影響行数が 0 なら既知のイベントとして黙って 200 を返します。ここでエラーを返さないのが運用上のコツです。重複に 4xx や 5xx を返すと、配信側によっては再送がさらに増えます。
第三層 — ハンドラは受領だけ、処理はキューで
受信ハンドラの仕事は「検証して、記録して、すぐ 200 を返す」ところまでに絞ります。結果の取得や後段の起動のような重い処理を同期でやると、応答遅延がタイムアウト再送を呼び、二重処理の温床になります。
三層をまとめた受信部の実装がこちらです。FastAPI と SQLite だけで動きます。
import hmac
import os
import sqlite3
import time
from fastapi import FastAPI, Header, HTTPException, Request
app = FastAPI()
HOOK_TOKEN = os.environ[ "HOOK_PATH_TOKEN" ] # URL パスに埋める乱数
HOOK_SECRET = os.environ[ "HOOK_SHARED_SECRET" ] # 通知設定側で付与するヘッダ値
DB_PATH = os.environ.get( "HOOK_DB" , "webhook.db" )
def db () -> sqlite3.Connection:
conn = sqlite3.connect( DB_PATH )
conn.execute(
"CREATE TABLE IF NOT EXISTS processed_events ("
" dedupe_key TEXT PRIMARY KEY,"
" received_at INTEGER NOT NULL)"
)
conn.execute(
"CREATE TABLE IF NOT EXISTS work_queue ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" operation_name TEXT NOT NULL,"
" enqueued_at INTEGER NOT NULL,"
" done INTEGER NOT NULL DEFAULT 0)"
)
return conn
@app.post ( "/hooks/gemini/ {token} " )
async def receive (token: str , request: Request,
x_hook_secret: str = Header( default = "" )):
# 第一層: 到達の検証(パス乱数 + シークレットヘッダを定数時間比較)
if not hmac.compare_digest(token, HOOK_TOKEN ):
raise HTTPException( status_code = 404 )
if not hmac.compare_digest(x_hook_secret, HOOK_SECRET ):
raise HTTPException( status_code = 401 )
payload = await request.json()
# ペイロードの形は通知の種類・設定で変わるため、実環境の実物に合わせます
operation = payload.get( "operation" , {}).get( "name" ) or payload.get( "name" , "" )
if not operation:
# 形が想定外でも 200 で受領し、再送ループを作らない
return { "status" : "ignored" }
event_id = payload.get( "event_id" ) or f " { operation } : { payload.get( 'state' , '' ) } "
conn = db()
with conn:
cur = conn.execute(
"INSERT OR IGNORE INTO processed_events (dedupe_key, received_at)"
" VALUES (?, ?)" ,
(event_id, int (time.time())),
)
if cur.rowcount == 0 :
return { "status" : "duplicate" } # 第二層: リプレイ・再送はここで畳む
conn.execute(
"INSERT INTO work_queue (operation_name, enqueued_at) VALUES (?, ?)" ,
(operation, int (time.time())),
)
# 第三層: 重い処理はここでやらない。受領だけ返す
return { "status" : "accepted" }
期待する動きはこうです。正しいトークンとヘッダを持つ新規イベントには {"status": "accepted"}、二度目の同じイベントには {"status": "duplicate"}、トークン違いには 404 が返ります。404 にしているのは、認証失敗の 401 を返すとエンドポイントの存在自体を教えてしまうためで、パス乱数の層では「存在しない」ことにしておきます。
そしてキューを処理するワーカー側で、冒頭の原則を実装します。
import sqlite3
from google import genai
client = genai.Client() # GEMINI_API_KEY は環境変数から
def fetch_authoritative_state (operation_name: str ):
"""ペイロードではなく API を事実として扱う: 状態を取得し直す"""
return client.operations.get( name = operation_name)
def run_worker (db_path: str = "webhook.db" ) -> None :
conn = sqlite3.connect(db_path)
rows = conn.execute(
"SELECT id, operation_name FROM work_queue"
" WHERE done = 0 ORDER BY id LIMIT 20"
).fetchall()
for row_id, operation_name in rows:
op = fetch_authoritative_state(operation_name)
if not getattr (op, "done" , False ):
# 偽の「完了」通知はここで実害を失う: 事実がまだなら何もしない
continue
handle_completed(op) # 後段処理(結果の取得・保存など)
with conn:
conn.execute( "UPDATE work_queue SET done = 1 WHERE id = ?" , (row_id,))
def handle_completed (operation) -> None :
# 結果の取り出しと保存。ここも冪等に書いておくと再実行に強くなります
...
偽イベントが第一層を突破し、第二層の重複判定もすり抜けたとしても、ワーカーは API に状態を照会し直すので、実在しないオペレーションや未完了のジョブに対しては何も起きません。攻撃者に残されているのは「無駄な照会を1回発生させる」ことだけで、それはレート制限とキューの上限で抑えられます。
決済 Webhook から持ち込んだ規律
この構成は、私が Stripe の Webhook 運用で身につけた規律の写しです。Dolice Labs のメンバーシップ決済では、checkout.session.completed のペイロードを信じて権限を付与するのではなく、必ずセッションを取得し直して支払い状態とメタデータを確認してから Cookie を発行する作りにしています。お金が絡む処理では「届いた JSON を信じない」のが当たり前の作法なのに、生成パイプラインになると素通しで書いてしまう。この非対称に気づいたのが、受信部を書き直したきっかけでした。
生成パイプラインの Webhook は、決済ほど直接的にお金を扱いませんが、偽の完了通知が公開処理を誤爆させれば、壊れたコンテンツの公開という形で信用を損ないます。個人開発では復旧に割ける時間も限られるので、事故の芽は受け口で摘んでおくほうが安上がりだと考えています。
どこまでやるか — 対策ごとのコストと私の判断
三層すべてを一度に入れる必要はありません。私が実際に入れた順番と判断を、コストと一緒に並べておきます。
対策 主に止まる事故 実装コスト 判断
パスの乱数トークン 偽イベントの大半(到達自体を防ぐ) 数分 必須。最初にやる
シークレットヘッダ照合 URL が漏れた場合の偽イベント 30分程度 必須
受信記録の UNIQUE 制約 リプレイ・配信リトライの二重処理 1時間程度 必須
API への照会し直し 偽ペイロード全般・順序入れ替わり 半日程度 必須。これが本丸
送信元 IP アローリスト 到達段階の偽イベント(補助) 継続メンテが必要 任意。主防御にしない
専用キュー基盤への移行 高負荷時の取りこぼし 数日 規模が出てから
SQLite の受信記録とキューで足りるのか、という点は運用してみての実感ですが、個人開発の Webhook 流量(多くても1日数百イベント)では全く問題になりません。Batch 処理を Webhook 駆動に組み替えた経緯は深夜のポーリングをやめる — Gemini Batch API を Webhook 駆動に作り替えた設計記録 に書いたとおりで、今回の防御層はその上に足す形になっています。
まとめ — まず受信パスに乱数を足す
全部をやろうとすると腰が重くなるので、最初の一歩だけ決めておきます。いま動いている Webhook 受信 URL のパスに乱数トークンを足し、通知設定側の URL を差し替えてください。数分で終わり、それだけで無差別な偽イベントの到達をほぼ断てます。そのあと時間が取れたときに、受信記録テーブルと「照会し直すワーカー」へ進むのが、私のおすすめの順番です。
同じように無人のパイプラインを運用している方の点検の参考になれば幸いです。