Gemini Batch Processing API Guide: Process Thousands of Requests at 50% Off
A comprehensive guide to Gemini's Batch Processing API. Learn how to process thousands of requests asynchronously, cut costs by 50%, and build production-grade batch pipelines with Python and TypeScript.
As AI applications scale in production, you'll inevitably encounter workloads that don't need real-time responses. Sentiment analysis across thousands of customer reviews, summarizing tens of thousands of documents, generating captions for massive image libraries — these large-scale asynchronous tasks are exactly what Gemini's Batch Processing API was built for.
With the Batch Processing API, you get a 50% cost reduction compared to synchronous API calls, freedom from rate limits, and results delivered within 24 hours. Your application can focus on other tasks while Google's infrastructure handles the heavy lifting.
Core Concepts
Why Batch Processing?
Synchronous API calls require waiting for each response before proceeding. At scale, this creates several problems:
Complex error handling: Failures must be caught and handled in real time
The Batch Processing API eliminates all of these constraints.
Pricing
The biggest advantage of batch processing is cost savings.
| Method | Input Cost | Output Cost | Notes |
|--------|-----------|-------------|-------|
| Synchronous (real-time) | Standard rate | Standard rate | Immediate response |
| Batch Processing | 50% of standard | 50% of standard | Response within 24 hours |
ℹ️ **Cost example**: Running 100,000 text classifications with Gemini 2.5 Flash (500 input tokens, 100 output tokens each) costs approximately $3.75 via the synchronous API but only $1.88 with batch processing.
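The arithmetic behind an estimate like this can be sketched as follows. The per-million-token rates here are placeholder values chosen for illustration, not official pricing, so substitute the current rates for your model. The only figure taken from this guide is the 50% batch discount.

```python
def estimate_cost(
    num_requests: int,
    input_tokens: int,
    output_tokens: int,
    input_rate_per_million: float,
    output_rate_per_million: float,
    batch: bool = False,
) -> float:
    """Estimate the total cost in dollars for a uniform workload."""
    total_input = num_requests * input_tokens
    total_output = num_requests * output_tokens
    cost = (
        total_input / 1_000_000 * input_rate_per_million
        + total_output / 1_000_000 * output_rate_per_million
    )
    # Batch requests are billed at 50% of the standard rate.
    return cost * 0.5 if batch else cost


# Placeholder rates in dollars per million tokens (assumptions, not official pricing).
INPUT_RATE = 0.10
OUTPUT_RATE = 0.40

sync_cost = estimate_cost(100_000, 500, 100, INPUT_RATE, OUTPUT_RATE)
batch_cost = estimate_cost(100_000, 500, 100, INPUT_RATE, OUTPUT_RATE, batch=True)
print(f"sync=${sync_cost:.2f} batch=${batch_cost:.2f}")
```

Keeping the rates as parameters means the same function still works when pricing changes or when you compare models.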
Processing Flow
The Batch Processing API follows three simple steps:
Create a batch job: Bundle your requests into a single job
Wait for processing: Poll the job state until it reaches a terminal state
Retrieve results: Fetch all results once the job completes
Python Implementation
Creating a Batch Job
```python
import google.generativeai as genai
import json
import time

genai.configure(api_key="YOUR_API_KEY")


def create_batch_job(requests: list[dict], model: str = "gemini-2.5-flash") -> str:
    """
    Create a batch job and return the job name.
    Each request includes a custom ID and content.
    """
    batch_requests = []
    for req in requests:
        batch_requests.append(
            genai.types.BatchRequest(
                custom_id=req["id"],
                request=genai.types.GenerateContentRequest(
                    model=f"models/{model}",
                    contents=[
                        genai.types.Content(
                            parts=[genai.types.Part(text=req["prompt"])]
                        )
                    ],
                    config=genai.types.GenerateContentConfig(
                        temperature=0.3,
                        max_output_tokens=1024,
                    ),
                ),
            )
        )

    batch_job = genai.batches.create(
        model=f"models/{model}",
        requests=batch_requests,
        config=genai.types.CreateBatchJobConfig(
            display_name="my-batch-job",
        ),
    )

    print(f"Batch job created: {batch_job.name}")
    print(f"State: {batch_job.state}")
    return batch_job.name
```
Monitoring and Retrieving Results
```python
def wait_for_batch_completion(job_name: str, poll_interval: int = 30) -> dict:
    """
    Poll until the batch job completes and return results.
    """
    while True:
        job = genai.batches.get(name=job_name)
        state = job.state.name

        if state == "JOB_STATE_SUCCEEDED":
            print(f"Job complete: succeeded={job.succeeded_count}, failed={job.failed_count}")
            return collect_results(job)
        elif state == "JOB_STATE_FAILED":
            raise RuntimeError(f"Batch job failed: {job.error}")
        elif state == "JOB_STATE_CANCELLED":
            raise RuntimeError("Batch job was cancelled")
        else:
            total = job.total_count or 0
            succeeded = job.succeeded_count or 0
            progress = (succeeded / total * 100) if total > 0 else 0
            print(f"Processing... {succeeded}/{total} ({progress:.1f}%)")
            time.sleep(poll_interval)


def collect_results(job) -> dict:
    """
    Collect results from a completed batch job.
    """
    results = {}
    for response in job.responses:
        custom_id = response.custom_id
        if response.response:
            text = response.response.candidates[0].content.parts[0].text
            results[custom_id] = {
                "status": "success",
                "text": text,
                "usage": {
                    "input_tokens": response.response.usage_metadata.prompt_token_count,
                    "output_tokens": response.response.usage_metadata.candidates_token_count,
                },
            }
        else:
            results[custom_id] = {
                "status": "error",
                "error": str(response.error),
            }
    return results
```
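Fixed-interval polling works, but a job that may run for hours makes fewer status calls with a capped exponential backoff and an overall deadline. The following is a minimal sketch, not an SDK feature. `poll_until_terminal` and its `get_state` callable are hypothetical names; in practice `get_state` would wrap the job lookup, and the terminal state names are the ones used in this guide.

```python
import time
from typing import Callable

TERMINAL_STATES = {"JOB_STATE_SUCCEEDED", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def poll_until_terminal(
    get_state: Callable[[], str],
    initial_interval: float = 30.0,
    max_interval: float = 600.0,
    timeout: float = 24 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() with capped exponential backoff until a terminal state."""
    interval = initial_interval
    waited = 0.0  # total time spent sleeping, not wall-clock time
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout:
            raise TimeoutError(f"Job still in state {state} after {waited:.0f}s")
        sleep(interval)
        waited += interval
        # Double the wait each round, but never exceed max_interval.
        interval = min(interval * 2, max_interval)
```

Injecting `sleep` as a parameter keeps the helper testable without real delays. The default timeout mirrors the 24-hour turnaround mentioned earlier.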
Practical Example: Batch Sentiment Analysis
```python
def batch_sentiment_analysis(texts: list[dict]) -> dict:
    """
    Run sentiment analysis on a large set of texts using batch processing.
    texts: [{"id": "review_001", "text": "This product is amazing..."}, ...]
    """
    requests = []
    for item in texts:
        prompt = f"""Analyze the sentiment of the following text and respond in JSON format.

Text: {item["text"]}

Output format:
{{"sentiment": "positive|negative|neutral", "confidence": 0.0-1.0, "summary": "one-line summary"}}"""
        requests.append({"id": item["id"], "prompt": prompt})

    # Split into batches of 100
    BATCH_SIZE = 100
    all_results = {}

    for i in range(0, len(requests), BATCH_SIZE):
        chunk = requests[i : i + BATCH_SIZE]
        job_name = create_batch_job(chunk)
        results = wait_for_batch_completion(job_name)
        all_results.update(results)
        print(f"Batch {i // BATCH_SIZE + 1} complete: {len(chunk)} items processed")

    return all_results
```
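Model output is not guaranteed to be clean JSON. It may arrive wrapped in a Markdown code fence, or with fields missing or out of range. Here is a hedged sketch of a parser for the `text` field of each result. `parse_sentiment` is a hypothetical helper, not part of any SDK, and the validation rules simply mirror the output format requested in the prompt.

```python
import json
import re
from typing import Optional

VALID_SENTIMENTS = {"positive", "negative", "neutral"}
FENCE = "`" * 3  # Markdown code-fence marker


def parse_sentiment(text: str) -> Optional[dict]:
    """Parse a sentiment JSON reply; return None if it is unusable."""
    # Strip an optional Markdown code-fence wrapper around the payload.
    cleaned = re.sub(rf"^{FENCE}(?:json)?\s*|\s*{FENCE}$", "", text.strip())
    try:
        data = json.loads(cleaned)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or data.get("sentiment") not in VALID_SENTIMENTS:
        return None
    try:
        confidence = float(data.get("confidence", 0.0))
    except (TypeError, ValueError):
        return None
    # Clamp confidence into the 0.0-1.0 range the prompt asked for.
    data["confidence"] = min(max(confidence, 0.0), 1.0)
    return data
```

Returning `None` instead of raising lets the caller count unparseable replies as failures and queue them for retry alongside API-level errors.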
The Batch Processing API supports not just text but also images, videos, and other multimodal inputs.
```python
import base64
from pathlib import Path


def create_multimodal_batch(image_tasks: list[dict]) -> str:
    """
    Create a multimodal batch job with image inputs.
    image_tasks: [{"id": "img_001", "image_uri": "gs://bucket/img.jpg", "prompt": "Describe this image"}]
    """
    batch_requests = []
    for task in image_tasks:
        parts = []

        # Use Cloud Storage URI
        if task["image_uri"].startswith("gs://"):
            parts.append(
                genai.types.Part(
                    file_data=genai.types.FileData(
                        file_uri=task["image_uri"],
                        mime_type="image/jpeg",
                    )
                )
            )
        # Base64-encode local files
        else:
            image_bytes = Path(task["image_uri"]).read_bytes()
            parts.append(
                genai.types.Part(
                    inline_data=genai.types.Blob(
                        data=base64.b64encode(image_bytes).decode(),
                        mime_type="image/jpeg",
                    )
                )
            )

        parts.append(genai.types.Part(text=task["prompt"]))

        batch_requests.append(
            genai.types.BatchRequest(
                custom_id=task["id"],
                request=genai.types.GenerateContentRequest(
                    model="models/gemini-2.5-flash",
                    contents=[genai.types.Content(parts=parts)],
                ),
            )
        )

    batch_job = genai.batches.create(
        model="models/gemini-2.5-flash",
        requests=batch_requests,
    )
    return batch_job.name
```
⚠️ **Note**: For multimodal batch processing, prefer using Files API URIs or Cloud Storage URIs. Base64 encoding significantly increases request payload size and is not recommended for high-volume workloads.
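The size penalty is easy to quantify. Base64 encodes every 3 bytes as 4 characters, so an inline payload grows by roughly a third. A quick check using only the standard library, with a zero-filled buffer standing in for an image file:

```python
import base64

raw = bytes(3_000_000)  # stand-in for a 3 MB image
encoded = base64.b64encode(raw)

overhead = len(encoded) / len(raw) - 1
print(f"raw={len(raw):,} bytes, base64={len(encoded):,} bytes, overhead={overhead:.0%}")
# raw=3,000,000 bytes, base64=4,000,000 bytes, overhead=33%
```

Across thousands of images that extra third is pure transfer cost, which a URI reference avoids entirely.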
Error Handling and Retry Strategies
Robust error handling is essential for production batch pipelines.
```python
from dataclasses import dataclass, field


@dataclass
class BatchJobTracker:
    """Track batch job execution and manage retries for failed requests"""

    max_retries: int = 3
    failed_requests: list = field(default_factory=list)
    retry_count: dict = field(default_factory=dict)

    def process_results(self, results: dict, original_requests: list[dict]):
        """Process results and record failed requests for retry"""
        for req in original_requests:
            rid = req["id"]
            result = results.get(rid, {})
            # Treat a missing result the same as an explicit error,
            # so requests that never came back are retried too.
            if result.get("status") != "success":
                current_retries = self.retry_count.get(rid, 0)
                if current_retries < self.max_retries:
                    self.retry_count[rid] = current_retries + 1
                    self.failed_requests.append(req)
                    print(f"Retry queued: {rid} (attempt {current_retries + 1}/{self.max_retries})")
                else:
                    print(f"Max retries reached: {rid}")

    def get_retry_requests(self) -> list[dict]:
        """Return requests that need to be retried"""
        requests = self.failed_requests.copy()
        self.failed_requests.clear()
        return requests


def robust_batch_processing(
    requests: list[dict],
    model: str = "gemini-2.5-flash",
) -> dict:
    """
    Batch processing with automatic retries.
    Failed requests are retried up to 3 times.
    """
    tracker = BatchJobTracker(max_retries=3)
    all_results = {}
    pending = requests.copy()
    round_num = 0

    while pending:
        round_num += 1
        print(f"\n--- Round {round_num}: processing {len(pending)} requests ---")

        job_name = create_batch_job(pending, model=model)
        results = wait_for_batch_completion(job_name)

        # Store successful results
        for rid, result in results.items():
            if result["status"] == "success":
                all_results[rid] = result

        # Check for retries
        tracker.process_results(results, pending)
        pending = tracker.get_retry_requests()

        if pending:
            print(f"Retrying: {len(pending)} requests")

    return all_results
```
Cost Optimization Best Practices
1. Choose the Right Model
Match model capability to task complexity for additional savings.
| Task Type | Recommended Model | Rationale |
|-----------|------------------|-----------|
| Text classification / sentiment | Gemini 2.5 Flash | Low cost, fast, sufficient accuracy |
| Summarization / translation | Gemini 2.5 Flash | Best cost-performance ratio |
| Complex reasoning / code gen | Gemini 2.5 Pro | Accuracy-critical tasks |
| Image analysis / captioning | Gemini 2.5 Flash | Multimodal support at low cost |
2. Optimize Your Prompts
With thousands of requests, even small prompt optimizations compound into significant savings.
```python
# ❌ Verbose prompt (wastes tokens)
bad_prompt = """You are a professional sentiment analysis expert.
Please perform a very detailed sentiment analysis of the following text.
First, evaluate the overall tone of the text, then analyze the sentiment
of each sentence, and finally calculate a comprehensive sentiment score.
Please output in JSON format.

Text: {text}"""

# ✅ Concise prompt (same accuracy, fewer tokens)
good_prompt = """Classify sentiment as JSON: {{"sentiment":"positive|negative|neutral","confidence":0.0-1.0}}
Text: {text}"""
```
3. Optimize Batch Size
Choosing the right batch size affects processing efficiency.
```python
def optimize_batch_size(total_requests: int) -> int:
    """
    Return the optimal batch size based on total request count.
    """
    if total_requests <= 100:
        return total_requests  # Small: single batch
    elif total_requests <= 1000:
        return 100  # Medium: 100 per batch
    else:
        return 500  # Large: 500 per batch (parallel jobs)
```
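Once a batch size is chosen, the request list has to be split accordingly. A minimal sketch of a chunking helper follows. `chunked` is a hypothetical name, and the size of 500 is the large-workload value from the function above.

```python
from typing import Iterator


def chunked(items: list, size: int) -> Iterator[list]:
    """Yield consecutive slices of `items`, each at most `size` long."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start : start + size]


requests = [{"id": f"req_{i:04d}", "prompt": "..."} for i in range(1250)]
chunks = list(chunked(requests, 500))
print([len(c) for c in chunks])  # [500, 500, 250]
```

Each chunk can then be submitted as its own job, which is what makes running large workloads in parallel possible.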
Production Patterns
Webhook-Based Async Workflows
Replace polling with webhooks for efficient asynchronous workflows.
```python
from flask import Flask, request, jsonify

# `genai` and `collect_results` come from the earlier sections of this guide;
# `process_completed_results` is your own application-specific handler.

app = Flask(__name__)


@app.route("/webhook/batch-complete", methods=["POST"])
def batch_complete():
    """Webhook endpoint for batch job completion"""
    payload = request.json
    job_name = payload["name"]
    state = payload["state"]

    if state == "JOB_STATE_SUCCEEDED":
        job = genai.batches.get(name=job_name)
        results = collect_results(job)
        process_completed_results(results)

    return jsonify({"status": "received"})
```
Integration with Cloud Scheduler
Combine batch processing with Cloud Scheduler for fully automated pipelines.
```python
from google.cloud import scheduler_v1


def schedule_daily_batch():
    """Create a schedule to run batch processing every night"""
    client = scheduler_v1.CloudSchedulerClient()

    job = scheduler_v1.Job(
        name="projects/my-project/locations/us-central1/jobs/daily-batch",
        http_target=scheduler_v1.HttpTarget(
            uri="https://my-api.run.app/trigger-batch",
            http_method=scheduler_v1.HttpMethod.POST,
        ),
        schedule="0 2 * * *",  # Daily at 2 AM
        time_zone="Asia/Tokyo",
    )

    client.create_job(
        parent="projects/my-project/locations/us-central1",
        job=job,
    )
```
When to Use Batch vs. Synchronous
Batch processing is ideal when: real-time responses aren't needed, you have 50+ requests, and cost optimization matters. Data pipeline jobs, bulk content generation, and large-scale data labeling are all prime candidates.
Synchronous API is ideal when: you need immediate responses to user input. Chatbots, real-time translation, and interactive Q&A fall into this category.
Hybrid approach: Most production systems benefit from using the synchronous API for user-facing real-time features and the Batch Processing API for background workloads. This hybrid architecture maximizes both responsiveness and cost efficiency.
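The decision rule above can be captured in a small routing function. This is a sketch using the criteria stated in this section (no real-time requirement and 50 or more requests). The function name and signature are illustrative, not part of any SDK.

```python
def choose_api(needs_realtime: bool, request_count: int, batch_threshold: int = 50) -> str:
    """Return "sync" or "batch" for a workload."""
    if needs_realtime:
        return "sync"   # user-facing paths need an immediate answer
    if request_count >= batch_threshold:
        return "batch"  # background work large enough to benefit from the discount
    return "sync"       # small background jobs: not worth the batch turnaround


print(choose_api(needs_realtime=True, request_count=10_000))   # sync
print(choose_api(needs_realtime=False, request_count=2_000))   # batch
print(choose_api(needs_realtime=False, request_count=5))       # sync
```

In a hybrid system, a function like this would sit at the point where work is enqueued, so user-facing calls and background jobs take different paths from the start.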
Final Thoughts
The Gemini Batch Processing API is an essential tool for processing large AI workloads efficiently and affordably. With 50% cost savings, freedom from rate limits, and robust error handling, it enables you to scale your AI pipelines far beyond what synchronous processing allows.
Use the patterns in this guide to build a batch processing pipeline tailored to your specific use case, and watch your infrastructure costs drop while throughput scales up.