リデューサーは純粋関数ですが、その前後の「現在状態を読む→判定する→書き戻す」は、二経路が同時に走ると競合します。Webhook ハンドラと照合ポーラーがほぼ同時に同じオペレーションを処理すると、両方が同じ古い状態を読み、両方が「前進する」と判定し、片方の書き込みがもう片方を踏みます。ここは version を使った楽観ロックで閉じます。
import Database from "better-sqlite3";const db = new Database("ops.db");db.exec(` CREATE TABLE IF NOT EXISTS operations ( name TEXT PRIMARY KEY, state TEXT NOT NULL, update_time TEXT NOT NULL, version INTEGER NOT NULL );`);// 競合時はリトライする。リデューサーは純粋なので何度走らせても安全。function ingest(ev: OpEvent, maxRetry = 4): ApplyResult { for (let attempt = 0; attempt < maxRetry; attempt++) { const cur = db .prepare("SELECT name, state, update_time AS updateTime, version FROM operations WHERE name = ?") .get(ev.name) as OpState | undefined; if (!cur) { // 初見のオペレーション。条件付き INSERT で競合を検出する const ins = db .prepare( "INSERT OR IGNORE INTO operations(name, state, update_time, version) VALUES(?, ?, ?, 1)" ) .run(ev.name, ev.state, ev.updateTime); if (ins.changes === 1) { return { applied: true, next: { name: ev.name, state: ev.state, updateTime: ev.updateTime, version: 1 } }; } continue; // 同時に誰かが INSERT した。読み直してリトライ } const res = applyEvent(cur, ev); if (!res.applied) return res; // 後退イベント。捨てて終わり(成功扱い) // version 一致を条件に更新。負ければ誰かが先に進めた → リトライ const upd = db .prepare( "UPDATE operations SET state = ?, update_time = ?, version = ? WHERE name = ? AND version = ?" ) .run(res.next.state, res.next.updateTime, res.next.version, ev.name, cur.version); if (upd.changes === 1) return res; // 競合した。ループ先頭へ戻って読み直す } return { applied: false, reason: "max-retry-exceeded" };}
UPDATE ... WHERE version = ? が肝です。読んだときの version を条件に書くので、自分が読んでから誰かが状態を進めていれば、書き込みは 0 行になって失敗します。失敗したら読み直してもう一度判定する。リデューサーが純粋で副作用を持たないからこそ、この再試行を何度繰り返しても害がありません。
Webhook ハンドラと照合ポーラーを同じリデューサーに通す
ここまでの部品が効くのは、状態を変えうる経路を一本残らずingest に集約したときだけです。Webhook ハンドラから直接 state = ... と書く近道を一つでも残すと、そこから巻き戻りが復活します。
// Webhook 受信ハンドラ(署名検証は別記事の方式を踏襲)app.post("/gemini/webhook", verifySignature, (req, res) => { const ev = normalizeEvent(req.body); // name 正規化・updateTime 抽出 const result = ingest(ev); if (result.applied) enqueueSideEffect(result.next); // 公開・課金などは冪等な受け口へ res.sendStatus(200); // 後退イベントを捨てた場合も 200。再配送を誘発しない});// 照合ポーラー(低頻度)。API の現在状態を「イベント」に変換して同じ口へ流すasync function reconcileOnce(name: string) { const op = await genaiClient.operations.get({ name }); const ev: OpEvent = { name: normalizeName(op.name), state: op.done ? terminalStateOf(op) : "RUNNING", updateTime: op.metadata?.updateTime ?? new Date(0).toISOString(), }; ingest(ev); // 単調適用を通るので、新鮮な Webhook を古い照合で塗り潰さない}