From Document Processing to LLM Resilience: Patterns That Scale
The Document Extraction Pipeline taught me a lot about building resilient async systems. FastAPI + Celery + Redis handled OCR and LLM extraction at scale, with retries, job queues, and error handling. But when I started building real-time LLM APIs for the DeepAgent project, I realized document processing resilience wasn't enough.
The difference? In document processing, you're batch-oriented and can retry. In LLM APIs, you're real-time and can't afford to fail.
This post shows how to evolve your async patterns from "it eventually works" to "it never breaks user experience."
Case Study Foundation: Document Extraction Pipeline
Let's recap the resilience patterns already working in production:
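```python
# From Document Extraction Pipeline
# fetch_document, run_mineru_ocr, call_llm_provider and LLMProviderError
# come from the pipeline codebase.
import time

from celery import Celery

celery_app = Celery("extraction", broker="redis://localhost:6379/0")


@celery_app.task(bind=True)  # bind=True is what supplies `self` for self.retry
def extract_document_task(self, document_id: str, schema: str):
    """Celery task with built-in retry."""
    try:
        document = fetch_document(document_id)
        ocr_result = run_mineru_ocr(document)

        # LLM extraction with provider retry
        for attempt in range(3):
            try:
                return call_llm_provider(ocr_result, schema)
            except LLMProviderError:
                if attempt == 2:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s
    except Exception as exc:
        # Celery re-queues the task after a 60s countdown
        raise self.retry(exc=exc, countdown=60)
```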
What this handles:
- ✅ Transient OCR failures
- ✅ LLM provider timeouts (3 attempts)
- ✅ Exponential backoff between retries
- ✅ Job persistence via Redis
What's missing for pure LLM APIs:
- ❌ Circuit breakers (keeps hitting failing provider)
- ❌ Multi-provider fallback (single point of failure)
- ❌ Real-time rate limiting (429 errors)
- ❌ Latency-aware routing
Extending to LLM-Specific Resilience
The Problem: 99.5% Uptime Isn't Enough
Cloud infrastructure promises 99.9% uptime. LLM providers run at 99-99.5%. That's 5-10x more downtime, and when your entire product depends on LLM calls, even 0.5% downtime (about 3.6 hours a month) kills user trust.
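To make those percentages concrete, here is the downtime each availability level allows over a month. The 30-day month is an assumption; the ratio between levels doesn't depend on it.

```python
HOURS_PER_MONTH = 30 * 24  # assumes a 30-day month


def monthly_downtime_hours(availability_pct: float) -> float:
    """Hours of allowed downtime per 30-day month at a given availability %."""
    return HOURS_PER_MONTH * (100.0 - availability_pct) / 100.0


for pct in (99.9, 99.5, 99.0):
    hours = monthly_downtime_hours(pct)
    print(f"{pct}% uptime -> {hours:.1f} h/month ({hours * 60:.0f} min)")
```

That works out to roughly 43 minutes a month at 99.9%, versus 3.6 to 7.2 hours at 99.5% to 99%.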
Real scenario from DeepAgent:
- 2 PM: OpenAI API throws 503 errors
- 2:01 PM: Users see 30-second timeouts
- 2:05 PM: Queue backs up, Redis memory spikes
- 2:10 PM: Cascade failure, entire system down
Circuit breakers and multi-provider fallbacks prevent this.
The Resilience Stack
Four layers:
- Exponential backoff + jitter - Handle transient failures
- Circuit breaker - Stop hitting failing providers
- Dual rate limiting - Client-side + provider limits
- Multi-provider fallback - Route around outages
Implementing Exponential Backoff
Celery has built-in retry, but for real-time LLM calls, you need more control:
```python
import logging
import random
import time
from functools import wraps
from typing import Callable, Tuple, Type, TypeVar

import openai
from openai import OpenAI

T = TypeVar("T")
logger = logging.getLogger(__name__)


def exponential_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
):
    """Decorator for exponential backoff with full jitter."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries:
                        raise
                    # Full jitter: sleep a random time in [0, capped exponential delay]
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    sleep_for = random.uniform(0, delay)
                    # Log for monitoring
                    logger.warning("Attempt %d failed: %s. Retrying in %.2fs...",
                                   attempt + 1, e, sleep_for)
                    time.sleep(sleep_for)
            raise RuntimeError("Unreachable")
        return wrapper
    return decorator


# Usage for LLM calls (openai>=1.0). Disable the SDK's own retries so they
# don't stack with ours; the API key is read from OPENAI_API_KEY.
openai_client = OpenAI(max_retries=0)


@exponential_backoff(
    max_retries=3,
    base_delay=1.0,
    # Retry transient errors only: 429s, network errors/timeouts, 5xx
    exceptions=(openai.RateLimitError, openai.APIConnectionError, openai.InternalServerError),
)
def call_openai_with_retry(prompt: str) -> str:
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content
```
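One caveat: `time.sleep` blocks the thread, so inside an `async def` FastAPI handler it would stall the event loop for every in-flight request. A minimal async variant of the same decorator, assuming the wrapped function is a coroutine function, swaps in `asyncio.sleep`:

```python
import asyncio
import random
from functools import wraps
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def async_exponential_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
):
    """Async decorator: exponential backoff with full jitter, non-blocking sleep."""
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == max_retries:
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    # Yields to the event loop instead of blocking it
                    await asyncio.sleep(random.uniform(0, delay))
            raise RuntimeError("Unreachable")
        return wrapper
    return decorator
```

With the async client (`openai.AsyncOpenAI`), you would decorate an `async def` that awaits `chat.completions.create(...)`.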
Handling different error codes:
```python
from openai import (
    APIConnectionError,
    AuthenticationError,
    BadRequestError,
    InternalServerError,
    RateLimitError,
)


def classify_error(error: Exception) -> str:
    """Classify errors for appropriate handling (openai>=1.0 exception classes)."""
    if isinstance(error, RateLimitError):  # 429
        return "rate_limited"  # Longer backoff
    elif isinstance(error, (InternalServerError, APIConnectionError)):  # 5xx, network, timeouts
        return "transient"  # Standard retry
    elif isinstance(error, AuthenticationError):  # 401
        return "fatal"  # Don't retry
    elif isinstance(error, BadRequestError):  # 400
        return "client_error"  # Don't retry
    else:
        return "unknown"  # Conservative retry
```
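Classification only pays off if the retry loop acts on it. Below is a minimal sketch of a policy-driven retry using the same labels as `classify_error`. To stay self-contained it uses hypothetical stand-in exception classes rather than the OpenAI SDK's, and the `retry_after` attribute (e.g. parsed from a provider's `Retry-After` header) and the 10s rate-limit floor are assumptions. Fatal and client errors fail immediately, 429s honor the provider's hint when present, and everything else gets full-jitter backoff.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


# Hypothetical stand-ins for provider SDK exceptions
class ProviderError(Exception):
    def __init__(self, msg: str, retry_after: Optional[float] = None):
        super().__init__(msg)
        self.retry_after = retry_after  # seconds, e.g. from a Retry-After header


class RateLimited(ProviderError): ...
class Unavailable(ProviderError): ...
class AuthFailed(ProviderError): ...
class BadRequest(ProviderError): ...


def classify(error: Exception) -> str:
    if isinstance(error, RateLimited):
        return "rate_limited"
    if isinstance(error, Unavailable):
        return "transient"
    if isinstance(error, AuthFailed):
        return "fatal"
    if isinstance(error, BadRequest):
        return "client_error"
    return "unknown"


def retry_with_policy(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    rate_limit_delay: float = 10.0,  # assumed higher floor for 429s
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as e:
            kind = classify(e)
            if kind in ("fatal", "client_error") or attempt == max_retries:
                raise
            if kind == "rate_limited":
                # Prefer the provider's own hint; else back off from a higher floor
                hint = getattr(e, "retry_after", None)
                delay = hint if hint is not None else min(rate_limit_delay * 2 ** attempt, max_delay)
            else:
                # transient / unknown: standard full-jitter backoff
                delay = random.uniform(0, min(base_delay * 2 ** attempt, max_delay))
            sleep(delay)
    raise RuntimeError("Unreachable")
```

Injecting `sleep` keeps the policy testable without real waits.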
Circuit Breaker Pattern for LLMs
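When an LLM provider is down, stop trying. The circuit breaker has three states:
- Closed: normal operation. Calls go through and consecutive failures are counted.
- Open: the provider is failing. Calls are rejected immediately until the recovery timeout elapses.
- Half-open: testing recovery. Trial calls go through; enough successes close the circuit, and any failure reopens it.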
Implementation:
```python
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is OPEN."""


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject fast
    HALF_OPEN = "half_open"  # Testing recovery


@dataclass
class CircuitBreaker:
    failure_threshold: int = 5      # consecutive failures before opening
    recovery_timeout: float = 60.0  # seconds to stay OPEN before probing
    half_open_max_calls: int = 3    # successes needed in HALF_OPEN to close

    def __post_init__(self):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.half_open_calls = 0
        self._lock = threading.Lock()  # state may be touched from worker threads

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        with self._lock:
            if self.state == CircuitState.OPEN:
                if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 0
                else:
                    raise CircuitBreakerOpenError("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_calls += 1
                if self.half_open_calls >= self.half_open_max_calls:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            else:
                self.failure_count = 0

    def _on_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            if (self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN


# One breaker per provider (a plain registry, usable from any FastAPI route)
circuit_breakers = {
    "openai": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
    "anthropic": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
    "ollama": CircuitBreaker(failure_threshold=3, recovery_timeout=30),
}


def call_with_circuit_breaker(provider: str, func, *args, **kwargs):
    breaker = circuit_breakers.get(provider)
    if not breaker:
        return func(*args, **kwargs)
    return breaker.call(func, *args, **kwargs)
```
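One design note: `failure_threshold` counts consecutive failures, and any success resets the count. A provider that fails every other request never reaches five in a row, so the breaker stays closed while half your calls error. A common alternative (not what the breaker above does) is to trip on the failure rate over a sliding window of recent calls. A minimal sketch, where the window size, minimum sample count, and 50% threshold are assumptions:

```python
from collections import deque


class FailureRateWindow:
    """Tracks the outcome of the last `size` calls and trips on failure rate."""

    def __init__(self, size: int = 20, min_calls: int = 10, max_failure_rate: float = 0.5):
        self.outcomes = deque(maxlen=size)  # True = failure, False = success
        self.min_calls = min_calls          # don't judge on too few samples
        self.max_failure_rate = max_failure_rate

    def record(self, failed: bool) -> None:
        self.outcomes.append(failed)

    def failure_rate(self) -> float:
        return sum(self.outcomes) / len(self.outcomes) if self.outcomes else 0.0

    def should_trip(self) -> bool:
        return (len(self.outcomes) >= self.min_calls
                and self.failure_rate() >= self.max_failure_rate)
```

In a breaker you would call `record(...)` from the success and failure paths and open the circuit when `should_trip()` returns true.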
Multi-Provider Fallback Strategy
When OpenAI fails, try Claude. When Claude fails, use local Ollama.
Implementation:
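```python
import asyncio
import time
from dataclasses import dataclass
from typing import Callable, Dict, List


class AllProvidersFailedError(Exception):
    """Raised when every provider is skipped or fails."""


@dataclass
class ProviderConfig:
    name: str
    client: Callable[[str], str]  # sync callable: prompt -> completion text
    priority: int                 # Lower = higher priority
    cost_per_1k: float
    max_latency: float            # Max acceptable latency (seconds)


class MultiProviderRouter:
    def __init__(self, providers: List[ProviderConfig], alpha: float = 0.2,
                 stale_after: float = 300.0):
        self.providers = sorted(providers, key=lambda p: p.priority)
        self.alpha = alpha              # EWMA smoothing factor for latency
        self.stale_after = stale_after  # seconds before latency stats are dropped
        self.avg_latency: Dict[str, float] = {}
        self.latency_updated_at: Dict[str, float] = {}

    def _too_slow(self, provider: ProviderConfig) -> bool:
        avg = self.avg_latency.get(provider.name)
        if avg is None:
            return False
        # Forget stale stats so a skipped provider eventually gets re-sampled
        if time.monotonic() - self.latency_updated_at[provider.name] > self.stale_after:
            del self.avg_latency[provider.name]
            return False
        return avg > provider.max_latency

    def _record_latency(self, name: str, latency: float) -> None:
        prev = self.avg_latency.get(name)
        self.avg_latency[name] = (latency if prev is None
                                  else self.alpha * latency + (1 - self.alpha) * prev)
        self.latency_updated_at[name] = time.monotonic()

    async def generate_with_fallback(self, prompt: str) -> dict:
        """Try providers in priority order with circuit breaker awareness."""
        for provider in self.providers:
            if self._too_slow(provider):
                continue
            start = time.monotonic()
            try:
                # Run the sync client in a worker thread so the breaker sees the real
                # outcome. An OPEN breaker raises CircuitBreakerOpenError (and moves to
                # HALF_OPEN once its recovery timeout passes), which we treat as a skip.
                result = await asyncio.to_thread(
                    call_with_circuit_breaker, provider.name, provider.client, prompt
                )
            except Exception as e:
                print(f"Provider {provider.name} failed: {e}")
                continue
            latency = time.monotonic() - start
            self._record_latency(provider.name, latency)
            return {"result": result, "provider": provider.name, "latency": latency}
        raise AllProvidersFailedError("All LLM providers unavailable")


# Configuration: openai_client, anthropic_client and ollama_client are assumed
# to be sync wrapper functions that take a prompt and return text.
providers = [
    ProviderConfig("openai", openai_client, priority=1, cost_per_1k=0.03, max_latency=5.0),
    ProviderConfig("anthropic", anthropic_client, priority=2, cost_per_1k=0.03, max_latency=5.0),
    ProviderConfig("ollama", ollama_client, priority=3, cost_per_1k=0, max_latency=10.0),
]
router = MultiProviderRouter(providers)
```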
Cost implications:
- Fallback to Claude: Same cost tier (~$0.03/1K tokens)
- Fallback to Ollama: $0 but slower (local GPU)
- Always monitor: Set alerts if >10% traffic hitting fallback
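For that alert you need the fraction of recent requests not served by the primary provider. A minimal in-process sketch, assuming a 10% threshold and a window of the last 1,000 requests (in production you would more likely derive this from Prometheus counters):

```python
from collections import deque


class FallbackRateMonitor:
    """Fraction of the last `window` requests served by a non-primary provider."""

    def __init__(self, primary: str, window: int = 1000, alert_threshold: float = 0.10):
        self.primary = primary
        self.served_by = deque(maxlen=window)
        self.alert_threshold = alert_threshold

    def record(self, provider: str) -> None:
        self.served_by.append(provider)

    def fallback_rate(self) -> float:
        if not self.served_by:
            return 0.0
        fallbacks = sum(1 for p in self.served_by if p != self.primary)
        return fallbacks / len(self.served_by)

    def should_alert(self) -> bool:
        return self.fallback_rate() > self.alert_threshold
```

You would feed it the `provider` field of each successful `generate_with_fallback` result.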
Token Bucket Rate Limiting
Prevent 429 errors by enforcing client-side limits.
Redis-based implementation (leveraging existing Redis from Document Pipeline):
```python
import math
import time
from dataclasses import dataclass

from fastapi import Depends, FastAPI, HTTPException, Request
from pydantic import BaseModel
from redis.asyncio import Redis

app = FastAPI()
WINDOW_SECONDS = 60


@dataclass
class RateLimitConfig:
    requests_per_minute: int = 60  # steady-state refill rate
    burst_size: int = 10           # bucket capacity: allow burst of 10


# Atomic refill-and-take; tokens accrue continuously at limit/window per second
TOKEN_BUCKET_LUA = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local burst = tonumber(ARGV[4])

-- Get current tokens and last update (a new key starts with a full bucket)
local data = redis.call('HMGET', key, 'tokens', 'last_update')
local tokens = tonumber(data[1]) or burst
local last_update = tonumber(data[2]) or now

-- Add tokens based on time passed (clamped against clock skew between servers)
local time_passed = math.max(0, now - last_update)
tokens = math.min(burst, tokens + time_passed * (limit / window))

local allowed = 0
if tokens >= 1 then
  tokens = tokens - 1
  allowed = 1
end

-- Persist on both paths so accrued fractional tokens are never discarded
redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
redis.call('EXPIRE', key, window)
return allowed
"""


class TokenBucketRateLimiter:
    def __init__(self, redis_client: Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    async def is_allowed(self, key: str) -> bool:
        """Check if request is allowed under rate limit."""
        allowed = await self.redis.eval(
            TOKEN_BUCKET_LUA,
            1,  # num keys
            f"rate_limit:{key}",
            time.time(),  # app-server clock; keep servers NTP-synced
            WINDOW_SECONDS,
            self.config.requests_per_minute,
            self.config.burst_size,
        )
        return bool(allowed)


class GenerateRequest(BaseModel):
    prompt: str


# Reuse the Document Pipeline's Redis
_limiter = TokenBucketRateLimiter(Redis.from_url("redis://localhost:6379/0"), RateLimitConfig())


def get_limiter() -> TokenBucketRateLimiter:
    return _limiter


# Per-user rate limiting in FastAPI
@app.post("/generate")
async def generate(
    request: Request,
    body: GenerateRequest,
    limiter: TokenBucketRateLimiter = Depends(get_limiter),
):
    user_id = get_user_id_from_token(request)  # auth helper defined elsewhere
    if not await limiter.is_allowed(f"user:{user_id}"):
        # One token refills every WINDOW_SECONDS / requests_per_minute seconds
        retry_after = math.ceil(WINDOW_SECONDS / limiter.config.requests_per_minute)
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded.",
            headers={"Retry-After": str(retry_after)},
        )
    # Process request...
```
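The refill math is easier to see, and to unit-test, without Redis. Here is a single-process sketch of the same token-bucket algorithm with an injectable clock. It is not a substitute for the Redis version when you run several API workers, since each process would keep its own bucket.

```python
import time
from typing import Callable


class InMemoryTokenBucket:
    """Single-process token bucket: refills `rate` tokens/second up to `burst`."""

    def __init__(self, rate: float, burst: int, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.burst = burst
        self.clock = clock
        self.tokens = float(burst)  # start full
        self.last = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill for the elapsed time, capped at capacity
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

With `requests_per_minute=60` the rate is 1 token per second, so a client can fire 10 requests at once, is then refused, and regains one request per second.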
Production Checklist
Before shipping resilient LLM APIs:
- Exponential backoff on all LLM calls
- Circuit breaker per provider (5 errors → 60s cooldown)
- Multi-provider fallback (minimum 2 providers)
- Client-side rate limiting (token bucket)
- Latency-aware routing (track p50/p99 per provider)
- Dead letter queue for failed requests
- Monitoring: circuit breaker state, fallback rate, latency
- Alerts: >10% fallback traffic, circuit breaker open >5min
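The checklist calls for p50/p99 per provider rather than a single latency number. A minimal sketch of a rolling per-provider tracker, assuming the last 500 samples are representative and using the nearest-rank percentile method (Prometheus histograms are the usual production choice):

```python
import math
from collections import defaultdict, deque
from typing import Deque, Dict


class LatencyTracker:
    """Rolling latency samples per provider with nearest-rank percentiles."""

    def __init__(self, max_samples: int = 500):
        self.samples: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=max_samples))

    def record(self, provider: str, seconds: float) -> None:
        self.samples[provider].append(seconds)

    def percentile(self, provider: str, pct: float) -> float:
        data = sorted(self.samples.get(provider, ()))
        if not data:
            raise ValueError(f"no samples for {provider}")
        # Nearest rank: smallest value with at least pct% of samples at or below it
        rank = max(1, math.ceil(pct / 100 * len(data)))
        return data[rank - 1]
```

A router could then skip a provider whose `percentile(name, 99)` exceeds its latency budget.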
Lessons from Document Extraction Pipeline Scaling
What translated directly:
- Celery + Redis for job queuing
- Retry with exponential backoff
- Async/await patterns
- Error classification
What needed adaptation:
- Circuit breakers (not needed for batch processing)
- Multi-provider (document pipeline used single LLM provider)
- Real-time rate limiting (batch has natural backpressure)
- Streaming responses (documents are processed whole)
Key insight: Start with the Document Extraction Pipeline's patterns, add circuit breakers at month 3, and add multi-provider fallback at month 6, when you have SLA commitments.
Monitoring Dashboard
Track these metrics in production:
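```python
from prometheus_client import Counter, Gauge, Histogram

# Prometheus metrics
# Encode breaker state numerically, e.g. 0 = closed, 1 = half-open, 2 = open
CIRCUIT_BREAKER_STATE = Gauge('circuit_breaker_state', 'Circuit state', ['provider'])
FALLBACK_RATE = Counter('fallback_total', 'Fallback events', ['from_provider', 'to_provider'])
# Default buckets stop at 10s; extend them so a p99 above 10s is measurable
LATENCY_BY_PROVIDER = Histogram(
    'llm_latency_seconds', 'Latency by provider', ['provider'],
    buckets=(0.5, 1, 2, 5, 10, 20, 30, 60),
)
RATE_LIMIT_HITS = Counter('rate_limit_hits_total', 'Rate limit hits', ['user_type'])
```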
Alert thresholds:
- Circuit breaker open > 5 minutes
- Fallback rate > 10% of traffic
- P99 latency > 10 seconds
- Error rate > 1%
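These thresholds would normally live in Prometheus alerting rules. To make the logic explicit, here is a minimal sketch that evaluates one metrics snapshot against them; the `Snapshot` fields and alert names are hypothetical, not Prometheus series.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Snapshot:
    breaker_open_seconds: float  # how long the longest-open breaker has been OPEN
    fallback_rate: float         # fraction of requests served by a fallback provider
    p99_latency_seconds: float
    error_rate: float            # fraction of requests that failed outright


def firing_alerts(s: Snapshot) -> List[str]:
    """Return the alerts that fire for this snapshot (strict > thresholds)."""
    alerts = []
    if s.breaker_open_seconds > 5 * 60:
        alerts.append("circuit_breaker_open_too_long")
    if s.fallback_rate > 0.10:
        alerts.append("high_fallback_rate")
    if s.p99_latency_seconds > 10:
        alerts.append("high_p99_latency")
    if s.error_rate > 0.01:
        alerts.append("high_error_rate")
    return alerts
```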
Next Steps
This resilience layer sits on top of the architecture from the previous post. In the next post, I'll cover memory management—how to handle context windows in multi-turn conversations without blowing up token costs.
Code examples: GitHub repository (DeepAgent implementation)
Related: