Interactions API のバックグラウンド実行が GA に入って、長時間処理を「投げて、後で受け取る」構成が素直に書けるようになりました。私は個人開発で記事生成のパイプラインを定期実行で回していますが、ここで一つ厄介な前提があります。cron で起動するランナーは、処理を投げたあとプロセスごと終了してしまうということです。
import sqlite3, timedef open_ledger(path="reclaim_ledger.db"): db = sqlite3.connect(path, isolation_level=None) # autocommit db.execute("PRAGMA journal_mode=WAL") db.execute(""" CREATE TABLE IF NOT EXISTS jobs ( idem TEXT PRIMARY KEY, -- 冪等キー(論理ジョブの一意性) handle TEXT, -- Interactions API のハンドル名 status TEXT NOT NULL, -- reserving / submitted / done / consumed / failed submitted_at REAL, updated_at REAL NOT NULL ) """) db.execute("CREATE INDEX IF NOT EXISTS idx_status ON jobs(status)") return db
idem を主キーにしておくことが冪等性の土台です。同じ論理ジョブ(たとえば「2026-06-30 のニュース要約」)を二度投げようとしても、予約の時点で主キー衝突として弾けます。
def reclaim(db, on_result): rows = db.execute( "SELECT idem, handle FROM jobs WHERE status='submitted' AND handle IS NOT NULL" ).fetchall() for idem, handle in rows: op = client.interactions.get(name=handle) if not op.done: continue # まだ処理中。次ティックで再訪 if getattr(op, "error", None): db.execute("UPDATE jobs SET status='failed', updated_at=? WHERE idem=?", (time.time(), idem)) continue # 完了。後段へ渡す前に done へ進めて二重処理を防ぐ db.execute("UPDATE jobs SET status='done', updated_at=? WHERE idem=?", (time.time(), idem)) on_result(idem, op.response) # ここで保存・公開などの副作用 db.execute("UPDATE jobs SET status='consumed', updated_at=? WHERE idem=?", (time.time(), idem))
def resume_unfinished(db, on_result): # done まで進んだが consumed されていない=副作用の途中で落ちたジョブ rows = db.execute("SELECT idem, handle FROM jobs WHERE status='done'").fetchall() for idem, handle in rows: op = client.interactions.get(name=handle) on_result(idem, op.response) db.execute("UPDATE jobs SET status='consumed', updated_at=? WHERE idem=?", (time.time(), idem))
ひとつは、reserving のまま長く留まっている行の検出です。送信が本当に失敗したのか、送信は成ったがハンドルを書けなかったのか、台帳側からは区別できません。そこで送信時に冪等キーを metadata へ刻んでおき(前掲の submit 参照)、回収時はそのタグで API 側のジョブを逆引きして突き合わせます。
def recover_orphans(db, max_age=120): stale = db.execute( "SELECT idem FROM jobs WHERE status='reserving' AND updated_at < ?", (time.time() - max_age,), ).fetchall() if not stale: return stale_keys = {row[0] for row in stale} # API 側に実在するジョブを冪等キーで逆引きし、台帳へハンドルを書き戻す for op in client.interactions.list(filter="background=true"): idem = (op.metadata or {}).get("idem") if idem in stale_keys: db.execute( "UPDATE jobs SET handle=?, status='submitted', updated_at=? WHERE idem=?", (op.name, time.time(), idem), ) stale_keys.discard(idem) # ここで残った stale_keys は API 側にも無い=送信自体が失敗。 # reserving のまま残し、次の submit で再送させる