Gemini APIを「動かし続ける」難しさ
Gemini APIを使ったプロトタイプを作るのは簡単です。でも、それを「毎日決まった時刻に、複数ステップを経て、エラーが起きても自動回復しながら、コストを管理しながら動かし続ける」ことは、まったく別の話です。
私が最初に直面した壁は、長い処理の途中でAPIタイムアウトが発生したときでした。5つのステップを順番に実行するスクリプトを書いて、3ステップ目でエラーになったとき、最初からやり直すのか、3ステップ目からやり直すのか——単純なPythonスクリプトでは、この判断を自前で実装しなければなりません。
Google Cloud Workflows は、この問題をエレガントに解決してくれます。ステップベースのオーケストレーションサービスで、各ステップの状態管理・リトライ・条件分岐をYAMLで宣言的に定義できます。Gemini APIとの組み合わせで、堅牢な本番AIパイプラインを構築できます。
ここで扱うのは私が実際に本番運用しているパイプラインをベースに、Cloud Workflows × Gemini APIの実装を完全解説します。
Cloud Workflowsの概念を整理しつつ、なぜGemini APIに適しているか
Cloud WorkflowsはGCPのサーバーレスワークフローサービスです。HTTPベースのAPIを呼び出す処理を、ステップとして定義できます。特徴をまとめると次のとおりです。
- ステート管理が不要: 各ステップの実行結果は自動保存される。途中で失敗しても前のステップの結果を保持
- 組み込みリトライ:
retry ブロックで指数バックオフ付きの自動リトライを定義できる
- タイムアウト制御: ステップ単位・ワークフロー全体でタイムアウトを設定できる
- コスト: 実行ステップ数×料金(月5,000ステップ無料、以降$0.01/1,000ステップ)
Gemini APIとの相性が良い理由は、Gemini APIがHTTPエンドポイントを提供しており、Cloud WorkflowsのHTTPコールで直接呼び出せるからです。アプリケーションサーバーを介さず、Cloud Workflows → Gemini API の直接連携が可能です。
事前準備:GCPプロジェクトの設定
必要なAPIを有効化し、サービスアカウントを準備します。
# 必要なAPIを有効化
gcloud services enable workflows.googleapis.com
gcloud services enable cloudscheduler.googleapis.com
gcloud services enable aiplatform.googleapis.com
# ワークフロー実行用サービスアカウントを作成
gcloud iam service-accounts create gemini-workflow-runner \
--display-name="Gemini Workflow Runner"
SA_EMAIL="gemini-workflow-runner@YOUR_PROJECT_ID.iam.gserviceaccount.com"
# 必要な権限を付与
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/aiplatform.user"
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/workflows.invoker"
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/storage.objectViewer"
Gemini APIへの認証は、Vertex AI経由で行います。generativelanguage.googleapis.com の直接エンドポイントでもAPIキーで認証できますが、本番環境ではサービスアカウントを使うVertex AI経由が推奨です。
最初のCloud Workflow:Gemini APIをYAMLから呼ぶ
まずシンプルな例から始めましょう。テキストをGemini 2.5 Proに送り、要約を返すワークフローです。
# workflow-gemini-summarize.yaml
main:
params: [args]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location: "us-central1"
- model: "gemini-2.5-pro"
- input_text: ${args.text}
- call_gemini_api:
try:
call: http.post
args:
url: ${"https://" + location + "-aiplatform.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/publishers/google/models/" + model + ":generateContent"}
auth:
type: OAuth2
headers:
Content-Type: "application/json"
body:
contents:
- role: "user"
parts:
- text: ${"以下のテキストを200文字以内で要約してください:\n\n" + input_text}
generationConfig:
maxOutputTokens: 500
temperature: 0.3
result: gemini_response
retry:
predicate: ${http_retry_predicate}
max_retries: 3
backoff:
initial_delay: 2
max_delay: 60
multiplier: 2
except:
as: e
steps:
- handle_api_error:
raise: ${e}
- extract_text:
assign:
- summary: ${gemini_response.body.candidates[0].content.parts[0].text}
- return_result:
return:
summary: ${summary}
input_tokens: ${gemini_response.body.usageMetadata.promptTokenCount}
output_tokens: ${gemini_response.body.usageMetadata.candidatesTokenCount}
# リトライ条件(429: レート制限、503: サービス一時停止)
http_retry_predicate:
params: [e]
steps:
- check_status:
switch:
- condition: ${e.code == 429 or e.code == 503}
return: true
return: false
このワークフローをデプロイして実行してみます。
# ワークフローをデプロイ
gcloud workflows deploy gemini-summarize \
--source=workflow-gemini-summarize.yaml \
--location=us-central1 \
--service-account="${SA_EMAIL}"
# 実行(テスト)
gcloud workflows run gemini-summarize \
--location=us-central1 \
--data='{"text": "Google Cloud Workflowsは、HTTPベースのサービスを呼び出す複数のステップを定義できるサーバーレスオーケストレーターです。各ステップの実行状態は自動保存されます。"}'
期待する出力:
{
"summary": "Google Cloud Workflowsは、HTTPサービスを呼び出す複数ステップを定義できるサーバーレスオーケストレーターで、各ステップの実行状態を自動保存します。",
"input_tokens": 87,
"output_tokens": 52
}
多段階パイプラインの設計:コンテンツ生成ワークフロー
実際の本番ユースケースとして、「記事テーマを受け取り → 見出し生成 → 各見出しの本文生成 → 全体の品質チェック → Cloud Storageに保存」という4ステップパイプラインを実装します。
# workflow-content-pipeline.yaml
main:
params: [args]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location: "us-central1"
- model: "gemini-2.5-pro"
- topic: ${args.topic}
- target_length: ${default(map.get(args, "target_length"), 2000)}
- bucket: ${default(map.get(args, "output_bucket"), "gemini-workflow-outputs")}
- run_id: ${sys.now()}
# Step 1: 見出し構成を生成
- generate_outline:
call: googleapis.aiplatform.v1.projects.locations.publishers.models.generateContent
args:
model: ${"projects/" + project_id + "/locations/" + location + "/publishers/google/models/" + model}
body:
contents:
- role: "user"
parts:
- text: |
${topic}について、${target_length}文字程度の記事を書くための
見出し構成(H2を4〜6個)を作成してください。
JSON配列形式で返してください:
["見出し1", "見出し2", ...]
generationConfig:
responseMimeType: "application/json"
maxOutputTokens: 300
result: outline_response
- parse_outline:
assign:
- outline_json: ${outline_response.candidates[0].content.parts[0].text}
- headings: ${json.decode(outline_json)}
# Step 2: 各見出しの本文を並列生成
- generate_sections:
parallel:
for:
value: heading
in: ${headings}
steps:
- generate_section_content:
call: http.post
args:
url: ${"https://" + location + "-aiplatform.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/publishers/google/models/" + model + ":generateContent"}
auth:
type: OAuth2
headers:
Content-Type: "application/json"
body:
contents:
- role: "user"
parts:
- text: |
${topic}に関する記事の「${heading}」セクションを書いてください。
400〜600文字程度で、丁寧語(です/ます調)で書いてください。
generationConfig:
maxOutputTokens: 1000
temperature: 0.7
result: section_response
# Step 3: 品質チェック(ハルシネーション検出)
- quality_check:
call: http.post
args:
url: ${"https://" + location + "-aiplatform.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/publishers/google/models/" + model + ":generateContent"}
auth:
type: OAuth2
headers:
Content-Type: "application/json"
body:
contents:
- role: "user"
parts:
- text: |
以下のコンテンツに明らかな事実誤りや矛盾がないか確認してください。
問題があれば "ISSUES: [問題点]"、問題なければ "OK" と返してください。
トピック: ${topic}
generationConfig:
maxOutputTokens: 200
temperature: 0
result: quality_response
- check_quality_result:
assign:
- quality_text: ${quality_response.body.candidates[0].content.parts[0].text}
- validate_quality:
switch:
- condition: ${text.substring(quality_text, 0, 6) == "ISSUES"}
steps:
- quality_failed:
raise:
code: 400
message: ${"品質チェック失敗: " + quality_text}
# Step 4: Cloud Storageに保存
- save_to_storage:
call: http.post
args:
url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + bucket + "/o?uploadType=media&name=articles/" + run_id + ".json"}
auth:
type: OAuth2
headers:
Content-Type: "application/json"
body:
topic: ${topic}
headings: ${headings}
quality_status: "OK"
generated_at: ${run_id}
result: storage_response
- return_success:
return:
status: "SUCCESS"
storage_path: ${"gs://" + bucket + "/articles/" + run_id + ".json"}
headings_count: ${list.length(headings)}
この多段階ワークフローで特に注目すべき点は、parallel ブロックです。各見出しのコンテンツ生成を並列実行することで、6つの見出しがある場合でも、逐次実行の1/6の時間で完了します。
よくある落とし穴と対処法
実際に運用していて詰まったポイントを正直に共有します。
落とし穴① : Vertex AI APIのレスポンス構造が想定外
Workflowsから googleapis.aiplatform.v1 のネイティブコネクタを使う場合と、http.post でREST APIを直接呼ぶ場合でレスポンス構造が異なります。
# ❌ ネイティブコネクタの場合
# result.candidates[0].content.parts[0].text
# ✅ http.post の場合
# result.body.candidates[0].content.parts[0].text
http.post 使用時は必ず .body を経由してアクセスしてください。私はこれで2時間ハマりました。
落とし穴② : 並列実行の結果収集
parallel ブロック内の結果をまとめて使いたい場合、変数のスコープに注意が必要です。並列ステップ内で代入した変数は、並列ブロック外では参照できません。結果を収集するには list.append を使います。
# 並列実行結果の正しい収集方法
- init_results:
assign:
- results: []
- parallel_generate:
parallel:
shared: [results] # ← shared を明示
for:
value: item
in: ${items}
steps:
- process:
call: http.post
# ...
result: item_result
- collect:
assign:
- results: ${list.concat(results, [item_result.body])}
落とし穴③ : 大きなJSONレスポンスのメモリ制限
Cloud Workflowsの変数サイズ制限は512KBです。Gemini APIが大きなコンテンツを返すと、ワークフロー変数に格納できずエラーになります。
対策として、大きなレスポンスはすぐにCloud Storageに書き出し、後続ステップではGCS URLのみを受け渡す設計にしてください。
# 大きなコンテンツは即座にGCSに保存
- save_large_content:
call: http.post
args:
url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + bucket + "/o?uploadType=media&name=temp/" + item_id + ".txt"}
auth:
type: OAuth2
body: ${large_content}
result: gcs_result
# 後続ステップではURLのみ保持
- store_url_only:
assign:
- content_url: ${"gs://" + bucket + "/temp/" + item_id + ".txt"}
落とし穴④ : タイムアウト設定のデフォルト値
Cloud WorkflowsのHTTPコールのデフォルトタイムアウトは30秒です。Gemini 2.5 Proへの複雑なリクエストは30秒を超えることがあります。必ずタイムアウトを明示してください。
- call_gemini_with_timeout:
call: http.post
args:
url: ${gemini_endpoint}
timeout: 300 # ← 5分に設定(デフォルト30秒では不足することがある)
auth:
type: OAuth2
body:
# ...
Cloud Schedulerで定期実行する
ワークフローを毎日自動実行するCloud Schedulerの設定です。
# Cloud Schedulerジョブを作成(毎日午前6時JST)
gcloud scheduler jobs create http gemini-content-daily \
--location=asia-northeast1 \
--schedule="0 21 * * *" \
--uri="https://workflowexecutions.googleapis.com/v1/projects/YOUR_PROJECT_ID/locations/us-central1/workflows/content-pipeline/executions" \
--message-body='{"argument": "{\"topic\": \"Gemini APIの最新活用事例\", \"target_length\": 2000}"}' \
--oauth-service-account-email="${SA_EMAIL}" \
--oauth-token-scope="https://www.googleapis.com/auth/cloud-platform"
Cloud Schedulerは cron式 でスケジュールを定義します。UTC時刻で指定するため、JST (UTC+9) で動かしたい場合は9時間引いた時刻を設定してください。
Pythonクライアントからワークフローを実行する
APIサーバーやバッチジョブから動的にワークフローを起動したい場合、PythonのCloud Workflows SDKを使います。
import json
import time
from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1
def run_gemini_workflow(
project_id: str,
location: str,
workflow_name: str,
topic: str,
target_length: int = 2000,
poll_interval: int = 10,
max_wait_seconds: int = 600
) -> dict:
"""
Cloud Workflowsを起動し、完了まで待機して結果を返す。
Args:
project_id: GCPプロジェクトID
location: ワークフローのリージョン(例: us-central1)
workflow_name: ワークフロー名
topic: コンテンツ生成のトピック
target_length: 目標文字数
poll_interval: ポーリング間隔(秒)
max_wait_seconds: 最大待機時間(秒)
Returns:
ワークフローの実行結果
Raises:
TimeoutError: max_wait_seconds を超えても完了しない場合
RuntimeError: ワークフローが失敗した場合
"""
# Executionsクライアントを初期化
exec_client = executions_v1.ExecutionsClient()
# ワークフローのリソース名を構築
parent = exec_client.workflow_path(project_id, location, workflow_name)
# 実行を開始
execution = exec_client.create_execution(
request=executions_v1.CreateExecutionRequest(
parent=parent,
execution=executions_v1.Execution(
argument=json.dumps({
"topic": topic,
"target_length": target_length,
"output_bucket": f"{project_id}-gemini-outputs"
})
)
)
)
execution_name = execution.name
print(f"ワークフロー実行開始: {execution_name}")
# 完了までポーリング
elapsed = 0
while elapsed < max_wait_seconds:
execution = exec_client.get_execution(
request=executions_v1.GetExecutionRequest(name=execution_name)
)
state = execution.state
if state == executions_v1.Execution.State.SUCCEEDED:
result = json.loads(execution.result)
print(f"✅ 完了: {result}")
return result
elif state == executions_v1.Execution.State.FAILED:
error = execution.error
raise RuntimeError(
f"ワークフロー失敗: {error.message} (コード: {error.http_status_code})"
)
elif state == executions_v1.Execution.State.CANCELLED:
raise RuntimeError("ワークフローがキャンセルされました")
# まだ実行中
print(f" 実行中... ({elapsed}秒経過)")
time.sleep(poll_interval)
elapsed += poll_interval
raise TimeoutError(
f"ワークフローが{max_wait_seconds}秒以内に完了しませんでした。"
f"実行ID: {execution_name}"
)
# 使用例
if __name__ == "__main__":
try:
result = run_gemini_workflow(
project_id="your-project-id",
location="us-central1",
workflow_name="content-pipeline",
topic="Gemini 2.5 Proの実践的な使い方",
target_length=2000
)
print(f"保存先: {result['storage_path']}")
except RuntimeError as e:
print(f"エラー: {e}")
except TimeoutError as e:
print(f"タイムアウト: {e}")
このクライアントコードで特に重要なのが、ポーリング方式の採用です。Cloud Workflowsの実行は非同期で行われるため、完了を待つにはポーリングが必要です。Webhookでの通知も設定できますが、シンプルなユースケースではポーリングの方が実装コストが低い傾向があります。
コスト制御:Cloud BudgetsとBilling Alertを設定する
Gemini APIの課金はトークン数に比例します。想定外のコスト発生を防ぐため、Cloud Budgetsでアラートを設定します。
from google.cloud import billing_budgets_v1
from google.protobuf import field_mask_pb2
def create_gemini_budget_alert(
billing_account_id: str,
project_id: str,
monthly_budget_usd: float,
alert_thresholds: list[float] = [0.5, 0.8, 1.0],
notification_email: str = None
) -> str:
"""
Gemini API利用のコスト予算アラートを設定する。
Args:
billing_account_id: 請求アカウントID(例: 012345-ABCDEF-789012)
project_id: 監視対象のプロジェクトID
monthly_budget_usd: 月間予算(USD)
alert_thresholds: アラートを発火する閾値(0.0〜1.0のリスト)
notification_email: 通知先メールアドレス
Returns:
作成された予算のリソース名
"""
client = billing_budgets_v1.BudgetServiceClient()
# 予算フィルター(Gemini APIの費用のみ対象)
budget_filter = billing_budgets_v1.Filter(
projects=[f"projects/{project_id}"],
services=["services/F3B6-3D8E-295F"], # AI Platform / Vertex AI のサービスID
credit_types_treatment=billing_budgets_v1.Filter.CreditTypesTreatment.EXCLUDE_ALL_CREDITS
)
# アラート閾値の設定
threshold_rules = [
billing_budgets_v1.ThresholdRule(
threshold_percent=threshold,
spend_basis=billing_budgets_v1.ThresholdRule.Basis.CURRENT_SPEND
)
for threshold in alert_thresholds
]
# 予算の作成
budget = billing_budgets_v1.Budget(
display_name=f"Gemini API Budget - {project_id}",
budget_filter=budget_filter,
amount=billing_budgets_v1.BudgetAmount(
specified_amount={
"currency_code": "USD",
"units": int(monthly_budget_usd),
"nanos": int((monthly_budget_usd % 1) * 1e9)
}
),
threshold_rules=threshold_rules
)
parent = f"billingAccounts/{billing_account_id}"
created_budget = client.create_budget(parent=parent, budget=budget)
print(f"予算アラート作成完了: {created_budget.name}")
print(f"月間上限: ${monthly_budget_usd} USD")
print(f"アラート閾値: {[f'{int(t*100)}%' for t in alert_thresholds]}")
return created_budget.name
# 設定例:月$50のアラートを50%・80%・100%で発火
budget_name = create_gemini_budget_alert(
billing_account_id="012345-ABCDEF-789012",
project_id="my-gemini-project",
monthly_budget_usd=50.0,
alert_thresholds=[0.5, 0.8, 1.0],
notification_email="admin@example.com"
)
予算アラートに加えて、ワークフローレベルでもトークン使用量を追跡することをおすすめします。各Gemini APIレスポンスの usageMetadata にはトークン数が含まれているため、これをCloud Monitoringのカスタムメトリクスとして送信すると、コストの予測精度が上がります。
本番運用のための追加設定
Cloud Monitoringでワークフロー監視ダッシュボードを作成する
# ワークフロー失敗率のアラートポリシーを作成
gcloud alpha monitoring policies create \
--policy-from-file=- << 'EOF'
{
"displayName": "Cloud Workflow Failure Alert",
"conditions": [
{
"displayName": "Workflow execution failure rate",
"conditionThreshold": {
"filter": "resource.type=\"workflows.googleapis.com/Workflow\" AND metric.type=\"workflows.googleapis.com/finished_execution_count\" AND metric.labels.status=\"FAILED\"",
"comparison": "COMPARISON_GT",
"thresholdValue": 0,
"duration": "60s",
"aggregations": [
{
"alignmentPeriod": "300s",
"perSeriesAligner": "ALIGN_SUM"
}
]
}
}
],
"alertStrategy": {
"notificationRateLimit": {
"period": "3600s"
}
}
}
EOF
実行履歴とコストの可視化
Cloud Workflowsの実行ログはCloud Loggingに自動保存されます。BigQueryに転送してコスト分析を行うことができます。
# ワークフロー実行ログをBigQueryでクエリする例
from google.cloud import bigquery
def analyze_workflow_costs(project_id: str, days: int = 30) -> None:
"""直近N日間のワークフロー実行コストを分析する"""
client = bigquery.Client()
query = f"""
SELECT
DATE(timestamp) as date,
COUNT(*) as total_executions,
COUNTIF(JSON_VALUE(jsonPayload.state) = 'SUCCEEDED') as succeeded,
COUNTIF(JSON_VALUE(jsonPayload.state) = 'FAILED') as failed,
-- Gemini APIトークン使用量(各実行のログから集計)
SUM(CAST(JSON_VALUE(jsonPayload.metadata.totalTokens) AS INT64)) as total_tokens
FROM `{project_id}.global._Default._AllLogs`
WHERE
resource.type = 'workflows.googleapis.com/Workflow'
AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)
GROUP BY date
ORDER BY date DESC
"""
results = client.query(query)
print(f"直近{days}日間のワークフロー実行サマリー:")
print(f"{'日付':<12} {'実行数':>6} {'成功':>6} {'失敗':>6} {'総トークン':>12}")
print("-" * 50)
for row in results:
total_tokens = row.total_tokens or 0
# Gemini 2.5 Pro の料金(概算): $7/1M input tokens
estimated_cost = (total_tokens / 1_000_000) * 7
print(
f"{str(row.date):<12} "
f"{row.total_executions:>6} "
f"{row.succeeded:>6} "
f"{row.failed:>6} "
f"{total_tokens:>12,} "
f"(推定 ${estimated_cost:.4f})"
)
個人開発者の視点から(実体験メモ)
全体を振り返って:今日から始める第一歩
Cloud Workflows × Gemini APIの組み合わせで、最も手間がかかっていた「本番での安定稼働」問題が大幅に軽減されます。
まずは、この記事の最初のワークフロー(workflow-gemini-summarize.yaml)をそのままデプロイして、自分のプロジェクトで動かしてみてください。Cloud Workflowsの無料枠(月5,000ステップ)はテストとして十分すぎるほどです。
次のステップとして、Gemini APIとの本番連携については Gemini API Kubernetes マイクロサービス本番デプロイ や Gemini Cloud Run サーバーレス API 本番化 も参考になると思います。また、コスト管理の観点では Gemini API コスト最適化完全ガイド も合わせてお読みください。
Cloud Workflowsを使い始めてから、私のGemini APIパイプラインの安定性は劇的に改善しました。タイムアウトや途中失敗に振り回されていた時間を、もっと価値のある開発に使えるようになっています。ぜひ試してみてください。