背景と前提
AIアプリケーションが本番環境でスケールすると、リアルタイム処理では対応しきれないユースケースが必ず発生します。数千件の顧客レビューの感情分析、数万件のドキュメントの要約生成、大量の画像に対するキャプション付与——こうした大規模な非同期処理には、Gemini APIの Batch Processing(バッチ処理) が最適解です。
Batch Processing APIを使えば、通常の同期APIと比較して 50%のコスト削減 が実現でき、レート制限を気にすることなく大量のリクエストを一括送信できます。処理結果は24時間以内に返却され、その間アプリケーションは他のタスクに集中できます。
このガイドでは、Batch Processing APIの仕組みから、本番環境で活用するための実装パターン、エラーハンドリング、コスト最適化戦略までを包括的に解説します。
Batch Processing APIの基本概念
なぜバッチ処理が必要なのか
同期APIでは、1リクエストごとにレスポンスを待つ必要があります。大量処理の場合、以下の問題が発生します。
- レート制限: モデルやティアによって1分あたりのリクエスト数(RPM)が制限される
- コスト増大: 同期APIのフル料金が適用される
- タイムアウト: 長時間実行のリクエストがタイムアウトする可能性
- エラー処理の複雑さ: 個別リクエストの失敗をリアルタイムで処理する必要がある
Batch Processing APIは、これらの問題をすべて解決します。
料金体系
Batch Processing APIの最大のメリットはコストです。
| 処理方式 | 入力コスト | 出力コスト | 備考 |
|---------|----------|----------|------|
| 同期API(リアルタイム) | 標準料金 | 標準料金 | 即座にレスポンス |
| Batch Processing | 標準の 50% | 標準の 50% | 24時間以内にレスポンス |
ℹ️**コスト試算例**: Gemini 2.5 Flash で10万件のテキスト分類(各500トークン入力・100トークン出力)を実行する場合、同期APIでは約$3.75のところ、バッチ処理なら約$1.88で処理できます。
処理フロー
Batch Processing APIの処理フローは以下の3ステップです。
- バッチジョブの作成: リクエスト群をまとめてジョブを作成
- 非同期処理: Googleのインフラが自動的にリクエストを処理(最大24時間)
- 結果の取得: ジョブ完了後に結果を一括取得
Python でのバッチ処理実装
基本的なバッチジョブの作成
import google.generativeai as genai
import json
import time
genai.configure(api_key="YOUR_API_KEY")
def create_batch_job(requests: list[dict], model: str = "gemini-2.5-flash") -> str:
"""
バッチジョブを作成し、ジョブIDを返す。
各リクエストはカスタムIDとコンテンツを含む。
"""
batch_requests = []
for req in requests:
batch_requests.append(
genai.types.BatchRequest(
custom_id=req["id"],
request=genai.types.GenerateContentRequest(
model=f"models/{model}",
contents=[
genai.types.Content(
parts=[genai.types.Part(text=req["prompt"])]
)
],
config=genai.types.GenerateContentConfig(
temperature=0.3,
max_output_tokens=1024,
),
),
)
)
batch_job = genai.batches.create(
model=f"models/{model}",
requests=batch_requests,
config=genai.types.CreateBatchJobConfig(
display_name="my-batch-job",
),
)
print(f"バッチジョブ作成: {batch_job.name}")
print(f"状態: {batch_job.state}")
return batch_job.name
ジョブの監視と結果取得
def wait_for_batch_completion(job_name: str, poll_interval: int = 30) -> dict:
"""
バッチジョブの完了を待ち、結果を取得する。
"""
while True:
job = genai.batches.get(name=job_name)
state = job.state.name
if state == "JOB_STATE_SUCCEEDED":
print(f"ジョブ完了: 成功={job.succeeded_count}, 失敗={job.failed_count}")
return collect_results(job)
elif state == "JOB_STATE_FAILED":
raise RuntimeError(f"バッチジョブ失敗: {job.error}")
elif state == "JOB_STATE_CANCELLED":
raise RuntimeError("バッチジョブがキャンセルされました")
else:
total = job.total_count or 0
succeeded = job.succeeded_count or 0
progress = (succeeded / total * 100) if total > 0 else 0
print(f"処理中... {succeeded}/{total} ({progress:.1f}%)")
time.sleep(poll_interval)
def collect_results(job) -> dict:
"""
完了したバッチジョブから結果を収集する。
"""
results = {}
for response in job.responses:
custom_id = response.custom_id
if response.response:
text = response.response.candidates[0].content.parts[0].text
results[custom_id] = {
"status": "success",
"text": text,
"usage": {
"input_tokens": response.response.usage_metadata.prompt_token_count,
"output_tokens": response.response.usage_metadata.candidates_token_count,
},
}
else:
results[custom_id] = {
"status": "error",
"error": str(response.error),
}
return results
実践例:大量テキストの感情分析
def batch_sentiment_analysis(texts: list[dict]) -> dict:
"""
大量のテキストに対して感情分析をバッチ実行する。
texts: [{"id": "review_001", "text": "この製品は素晴らしい..."}, ...]
"""
requests = []
for item in texts:
prompt = f"""以下のテキストの感情を分析し、JSON形式で回答してください。
テキスト: {item["text"]}
出力形式:
{{"sentiment": "positive|negative|neutral", "confidence": 0.0-1.0, "summary": "一行要約"}}"""
requests.append({"id": item["id"], "prompt": prompt})
# 1バッチあたり最大100件に分割
BATCH_SIZE = 100
all_results = {}
for i in range(0, len(requests), BATCH_SIZE):
chunk = requests[i : i + BATCH_SIZE]
job_name = create_batch_job(chunk)
results = wait_for_batch_completion(job_name)
all_results.update(results)
print(f"バッチ {i // BATCH_SIZE + 1} 完了: {len(chunk)}件処理")
return all_results
TypeScript でのバッチ処理実装
import { GoogleGenAI } from "@google/genai";
const ai = new GoogleGenAI({ apiKey: process.env.GEMINI_API_KEY! });
interface BatchRequest {
id: string;
prompt: string;
}
interface BatchResult {
status: "success" | "error";
text?: string;
error?: string;
usage?: { inputTokens: number; outputTokens: number };
}
async function createAndRunBatch(
requests: BatchRequest[],
model = "gemini-2.5-flash"
): Promise<Map<string, BatchResult>> {
// バッチジョブの作成
const batchRequests = requests.map((req) => ({
customId: req.id,
request: {
model: `models/${model}`,
contents: [{ parts: [{ text: req.prompt }] }],
generationConfig: {
temperature: 0.3,
maxOutputTokens: 1024,
},
},
}));
const batchJob = await ai.batches.create({
model: `models/${model}`,
requests: batchRequests,
displayName: "ts-batch-job",
});
console.log(`バッチジョブ作成: ${batchJob.name}`);
// ポーリングで完了を待つ
const results = new Map<string, BatchResult>();
let job = batchJob;
while (true) {
job = await ai.batches.get({ name: job.name! });
if (job.state === "JOB_STATE_SUCCEEDED") {
for (const resp of job.responses ?? []) {
if (resp.response) {
results.set(resp.customId!, {
status: "success",
text: resp.response.candidates?.[0]?.content?.parts?.[0]?.text,
usage: {
inputTokens: resp.response.usageMetadata?.promptTokenCount ?? 0,
outputTokens:
resp.response.usageMetadata?.candidatesTokenCount ?? 0,
},
});
} else {
results.set(resp.customId!, {
status: "error",
error: String(resp.error),
});
}
}
break;
}
if (job.state === "JOB_STATE_FAILED" || job.state === "JOB_STATE_CANCELLED") {
throw new Error(`Batch job ${job.state}: ${job.error}`);
}
console.log(`処理中... ${job.succeededCount ?? 0}/${job.totalCount ?? 0}`);
await new Promise((r) => setTimeout(r, 30_000));
}
return results;
}
マルチモーダルバッチ処理
Batch Processing APIはテキストだけでなく、画像や動画などのマルチモーダル入力にも対応しています。
import base64
from pathlib import Path
def create_multimodal_batch(image_tasks: list[dict]) -> str:
"""
画像を含むマルチモーダルバッチジョブを作成する。
image_tasks: [{"id": "img_001", "image_uri": "gs://bucket/img.jpg", "prompt": "画像を説明して"}]
"""
batch_requests = []
for task in image_tasks:
parts = []
# Cloud Storage のURIを使用
if task["image_uri"].startswith("gs://"):
parts.append(
genai.types.Part(
file_data=genai.types.FileData(
file_uri=task["image_uri"],
mime_type="image/jpeg",
)
)
)
# ローカルファイルをBase64エンコード
else:
image_bytes = Path(task["image_uri"]).read_bytes()
parts.append(
genai.types.Part(
inline_data=genai.types.Blob(
data=base64.b64encode(image_bytes).decode(),
mime_type="image/jpeg",
)
)
)
parts.append(genai.types.Part(text=task["prompt"]))
batch_requests.append(
genai.types.BatchRequest(
custom_id=task["id"],
request=genai.types.GenerateContentRequest(
model="models/gemini-2.5-flash",
contents=[genai.types.Content(parts=parts)],
),
)
)
batch_job = genai.batches.create(
model="models/gemini-2.5-flash",
requests=batch_requests,
)
return batch_job.name
⚠️**注意**: マルチモーダルバッチ処理では、Files APIでアップロードしたファイルのURIまたはCloud StorageのURIを使用してください。Base64エンコードはリクエストサイズが増大するため、大量処理には向きません。
エラーハンドリングとリトライ戦略
本番環境では、個別リクエストの失敗に対する堅牢なエラーハンドリングが不可欠です。
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class BatchJobTracker:
"""バッチジョブの実行を追跡し、失敗したリクエストのリトライを管理する"""
max_retries: int = 3
failed_requests: list = field(default_factory=list)
retry_count: dict = field(default_factory=dict)
def process_results(self, results: dict, original_requests: list[dict]):
"""結果を処理し、失敗したリクエストを記録する"""
for req in original_requests:
rid = req["id"]
result = results.get(rid, {})
if result.get("status") == "error":
current_retries = self.retry_count.get(rid, 0)
if current_retries < self.max_retries:
self.retry_count[rid] = current_retries + 1
self.failed_requests.append(req)
print(f"リトライ対象: {rid} (試行 {current_retries + 1}/{self.max_retries})")
else:
print(f"リトライ上限到達: {rid}")
def get_retry_requests(self) -> list[dict]:
"""リトライ対象のリクエストを返す"""
requests = self.failed_requests.copy()
self.failed_requests.clear()
return requests
def robust_batch_processing(requests: list[dict], model: str = "gemini-2.5-flash") -> dict:
"""
リトライ機能付きのバッチ処理。
失敗したリクエストは最大3回までリトライする。
"""
tracker = BatchJobTracker(max_retries=3)
all_results = {}
pending = requests.copy()
round_num = 0
while pending:
round_num += 1
print(f"\n--- ラウンド {round_num}: {len(pending)}件処理 ---")
job_name = create_batch_job(pending, model=model)
results = wait_for_batch_completion(job_name)
# 成功した結果を保存
for rid, result in results.items():
if result["status"] == "success":
all_results[rid] = result
# 失敗したリクエストのリトライ判定
tracker.process_results(results, pending)
pending = tracker.get_retry_requests()
if pending:
print(f"リトライ対象: {len(pending)}件")
return all_results
コスト最適化のベストプラクティス
1. モデルの適切な選択
タスクの複雑さに応じてモデルを使い分けることで、さらなるコスト削減が可能です。
| タスク種別 | 推奨モデル | 理由 |
|----------|----------|------|
| テキスト分類・感情分析 | Gemini 2.5 Flash | 低コスト・高速・十分な精度 |
| 文書要約・翻訳 | Gemini 2.5 Flash | コストパフォーマンスが最良 |
| 複雑な推論・コード生成 | Gemini 2.5 Pro | 精度が重要なタスク |
| 画像分析・キャプション | Gemini 2.5 Flash | マルチモーダル対応・低コスト |
2. プロンプトの最適化
バッチ処理ではリクエスト数が多いため、プロンプトの最適化がコストに直結します。
# ❌ 冗長なプロンプト(不要なトークン消費)
bad_prompt = """
あなたはプロの感情分析の専門家です。
以下のテキストについて、非常に詳細な感情分析を行ってください。
まず、テキスト全体のトーンを評価し、次に各文の感情を分析し、
最後に総合的な感情スコアを算出してください。
出力は必ずJSON形式にしてください。
テキスト: {text}
"""
# ✅ 簡潔なプロンプト(同等の精度でトークン節約)
good_prompt = """テキストの感情をJSON出力: {{"sentiment":"positive|negative|neutral","confidence":0.0-1.0}}
テキスト: {text}"""
3. バッチサイズの最適化
大規模処理では、適切なバッチサイズの選択が処理効率に影響します。
def optimize_batch_size(total_requests: int) -> int:
"""
リクエスト数に基づいて最適なバッチサイズを返す。
"""
if total_requests <= 100:
return total_requests # 小規模:1バッチで処理
elif total_requests <= 1000:
return 100 # 中規模:100件ずつ
else:
return 500 # 大規模:500件ずつ(並列ジョブで処理)
本番環境での運用パターン
Webhook通知による非同期ワークフロー
ポーリングの代わりにWebhookを使用すれば、効率的な非同期ワークフローを構築できます。
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/webhook/batch-complete", methods=["POST"])
def batch_complete():
"""バッチジョブ完了時のWebhookエンドポイント"""
payload = request.json
job_name = payload["name"]
state = payload["state"]
if state == "JOB_STATE_SUCCEEDED":
# 結果を取得して後続処理を実行
job = genai.batches.get(name=job_name)
results = collect_results(job)
process_completed_results(results)
return jsonify({"status": "received"})
Cloud Schedulerとの連携
定期的なバッチ処理をCloud Schedulerと組み合わせることで、完全自動化が可能です。
from google.cloud import scheduler_v1
def schedule_daily_batch():
"""毎日深夜にバッチ処理を実行するスケジュールを作成"""
client = scheduler_v1.CloudSchedulerClient()
job = scheduler_v1.Job(
name="projects/my-project/locations/us-central1/jobs/daily-batch",
http_target=scheduler_v1.HttpTarget(
uri="https://my-api.run.app/trigger-batch",
http_method=scheduler_v1.HttpMethod.POST,
),
schedule="0 2 * * *", # 毎日午前2時
time_zone="Asia/Tokyo",
)
client.create_job(parent="projects/my-project/locations/us-central1", job=job)
同期APIとバッチAPIの使い分け判断フロー
最後に、同期APIとBatch Processing APIをどのように使い分けるべきかの判断基準をまとめます。
バッチ処理が最適なケース:
リアルタイム性が不要で、処理件数が50件以上あり、コスト最適化を重視する場面です。データパイプラインの定期実行、コンテンツの一括生成、大量データのラベリングなどが該当します。
同期APIが最適なケース:
ユーザーの入力に即座に応答する必要がある場面です。チャットボット、リアルタイム翻訳、インタラクティブな質問応答などが該当します。
ハイブリッドアプローチ: 多くの本番システムでは、ユーザー向けのリアルタイム機能には同期APIを、バックエンド処理にはBatch Processing APIを使い分けるハイブリッド構成が最適です。
個人開発者の視点から(実体験メモ)
ここまでの要点
Gemini Batch Processing APIは、大量のAIリクエストを効率的かつ低コストで処理するための必須ツールです。50%のコスト削減、レート制限の回避、そして堅牢なエラーハンドリングにより、本番環境でのAIワークロードを大幅にスケールアップできます。
このガイドで紹介したパターンを活用し、自社のユースケースに最適なバッチ処理パイプラインを構築してください。