Agentes de IA en producción: patrones de arquitectura y lecciones de implementación
Análisis técnico tras implementar Computer Use, o1-pro agents y DeepSeek en 12 proyectos reales. Patrones que funcionan y antipatrones a evitar.
Después de implementar agentes de IA (Computer Use de Claude, o1-pro agents de OpenAI, y modelos DeepSeek) en 12 proyectos de producción durante 3 meses, tengo datos concretos.
Los agentes no son ChatGPT con steps. Son sistemas distribuidos con estado, planificación y ejecución autónoma. Tu stack actual no está preparado.
Patrón 1: Agent Orchestration Layer
Los agentes necesitan una capa de orquestación específica. No puedes usar los mismos patrones que para microservicios:
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from enum import Enum
class AgentStatus(Enum):
IDLE = "idle"
PLANNING = "planning"
EXECUTING = "executing"
WAITING = "waiting"
FAILED = "failed"
COMPLETED = "completed"
@dataclass
class AgentTask:
id: str
type: str
input_data: Dict[str, Any]
dependencies: List[str]
timeout_seconds: int
retry_count: int = 0
max_retries: int = 3
class AgentOrchestrator:
def __init__(self):
self.active_agents = {}
self.task_queue = asyncio.Queue()
self.task_store = {} # Persistencia para recovery
async def dispatch_task(self, task: AgentTask):
"""Asignar tarea a agente disponible"""
available_agent = await self.find_available_agent(task.type)
if not available_agent:
# Queue para cuando no hay agentes disponibles
await self.task_queue.put(task)
return
# Checkpoint crítico: persistir antes de enviar
await self.persist_task(task)
try:
result = await available_agent.execute(task)
await self.handle_completion(task, result)
except Exception as e:
await self.handle_failure(task, e)
async def handle_failure(self, task: AgentTask, error: Exception):
"""Gestión de fallos específica para agentes"""
task.retry_count += 1
if task.retry_count <= task.max_retries:
# Exponential backoff con jitter
delay = (2 ** task.retry_count) + random.uniform(0, 1)
await asyncio.sleep(delay)
await self.dispatch_task(task)
else:
# Log estructurado para post-mortem
await self.log_agent_failure({
'task_id': task.id,
'error': str(error),
'agent_context': await self.capture_agent_state(task),
'retry_history': task.retry_count
})
Gestión de estado distribuido
Los agentes mantienen estado entre ejecuciones. Redis no es suficiente:
// Schema de estado de agente
interface AgentState {
agent_id: string;
current_task?: string;
context_window: ContextItem[];
learned_patterns: LearnedPattern[];
performance_metrics: PerformanceMetrics;
last_checkpoint: Date;
}
interface ContextItem {
timestamp: Date;
type: 'code_change' | 'test_result' | 'build_output' | 'user_feedback';
content: string;
relevance_score: number;
}
class AgentStateManager {
constructor(
private vectorStore: PineconeClient,
private redis: Redis,
private postgres: Pool
) {}
async saveAgentState(agentId: string, state: AgentState): Promise<void> {
// 1. Inmediato en Redis (cache rápido)
await this.redis.setex(
`agent_state:${agentId}`,
3600, // 1 hora
JSON.stringify(state)
);
// 2. Persistente en PostgreSQL
await this.postgres.query(`
INSERT INTO agent_states (agent_id, state_data, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (agent_id)
DO UPDATE SET state_data = $2, updated_at = NOW()
`, [agentId, JSON.stringify(state)]);
// 3. Contexto semántico en vector store
for (const context of state.context_window) {
if (context.relevance_score > 0.7) {
await this.indexContextItem(agentId, context);
}
}
}
async loadAgentContext(agentId: string, query: string): Promise<ContextItem[]> {
// Búsqueda semántica de contexto relevante
const embedding = await this.generateEmbedding(query);
const results = await this.vectorStore.query({
vector: embedding,
topK: 10,
filter: { agent_id: agentId },
includeMetadata: true
});
return results.matches.map(match => ({
timestamp: new Date(match.metadata.timestamp),
type: match.metadata.type as any,
content: match.metadata.content,
relevance_score: match.score
}));
}
}
Patrón 2: Rollback inteligente para agentes
Los agentes pueden causar cambios en cascada. Necesitas rollback a nivel semántico, no solo técnico:
import git
from dataclasses import dataclass
from typing import List
@dataclass
class AgentAction:
type: str # 'file_change', 'api_call', 'database_update'
target: str
old_value: Any
new_value: Any
timestamp: datetime
reversible: bool
side_effects: List[str]
class AgentRollbackManager:
def __init__(self, repo_path: str):
self.repo = git.Repo(repo_path)
self.action_log = []
async def execute_with_checkpoint(self, agent_fn, action_type: str, target: str):
"""Ejecutar acción de agente con capacidad de rollback"""
# 1. Crear checkpoint
checkpoint_id = await self.create_checkpoint()
try:
old_value = await self.capture_state(target)
result = await agent_fn()
new_value = await self.capture_state(target)
# 2. Log de la acción
action = AgentAction(
type=action_type,
target=target,
old_value=old_value,
new_value=new_value,
timestamp=datetime.utcnow(),
reversible=self.is_reversible(action_type),
side_effects=await self.detect_side_effects(target, result)
)
self.action_log.append(action)
return result
except Exception as e:
# 3. Auto-rollback en caso de error
await self.rollback_to_checkpoint(checkpoint_id)
raise AgentExecutionError(f"Agent action failed: {e}")
async def rollback_agent_session(self, session_id: str):
"""Rollback de todas las acciones de una sesión de agente"""
session_actions = [a for a in self.action_log
if a.timestamp > session_start_time]
# Rollback en orden inverso
for action in reversed(session_actions):
if action.reversible:
await self.reverse_action(action)
else:
# Acciones irreversibles requieren intervención manual
await self.flag_manual_intervention(action)
async def reverse_action(self, action: AgentAction):
"""Revertir una acción específica de agente"""
if action.type == 'file_change':
# Git revert específico
file_path = action.target
old_content = action.old_value
with open(file_path, 'w') as f:
f.write(old_content)
self.repo.index.add([file_path])
self.repo.index.commit(f"Agent rollback: revert {file_path}")
elif action.type == 'api_call':
# Rollback de API calls (si soportado)
await self.reverse_api_call(action)
elif action.type == 'database_update':
# Rollback de cambios DB
await self.reverse_db_change(action)
Patrón 3: Observabilidad específica para agentes
El debugging de agentes es diferente. Necesitas trazabilidad de decisiones:
interface AgentDecisionTrace {
decision_id: string;
agent_id: string;
reasoning_chain: ReasoningStep[];
context_used: ContextReference[];
confidence_score: number;
execution_time_ms: number;
outcome: 'success' | 'failure' | 'partial';
}
interface ReasoningStep {
step_number: number;
thought_process: string;
evidence_considered: string[];
decision_made: string;
confidence: number;
}
class AgentObservabilityService {
constructor(
private jaeger: JaegerClient,
private prometheus: PrometheusRegistry
) {
// Métricas específicas para agentes
this.agentDecisionLatency = prometheus.histogram({
name: 'agent_decision_latency_seconds',
help: 'Time taken for agent to make decisions',
labelNames: ['agent_type', 'decision_type']
});
this.agentConfidenceScore = prometheus.histogram({
name: 'agent_confidence_score',
help: 'Confidence score of agent decisions',
labelNames: ['agent_id', 'task_type']
});
}
async traceAgentDecision(trace: AgentDecisionTrace): Promise<void> {
// 1. Enviar a Jaeger para trazabilidad
const span = this.jaeger.startSpan('agent_decision');
span.setTag('agent_id', trace.agent_id);
span.setTag('confidence_score', trace.confidence_score);
span.log({
'reasoning_chain': JSON.stringify(trace.reasoning_chain),
'context_size': trace.context_used.length
});
// 2. Métricas en Prometheus
this.agentDecisionLatency
.labels(trace.agent_id, 'decision')
.observe(trace.execution_time_ms / 1000);
this.agentConfidenceScore
.labels(trace.agent_id, 'general')
.observe(trace.confidence_score);
// 3. Para decisiones de baja confianza, alertar
if (trace.confidence_score < 0.6) {
await this.alertLowConfidenceDecision(trace);
}
span.finish();
}
async generateAgentReport(agentId: string, timeRange: TimeRange): Promise<AgentReport> {
const decisions = await this.getAgentDecisions(agentId, timeRange);
const performance = await this.getPerformanceMetrics(agentId, timeRange);
return {
agent_id: agentId,
total_decisions: decisions.length,
avg_confidence: decisions.reduce((sum, d) => sum + d.confidence_score, 0) / decisions.length,
success_rate: decisions.filter(d => d.outcome === 'success').length / decisions.length,
most_common_failures: this.analyzeFailurePatterns(decisions),
performance_trend: performance.trend,
recommendations: await this.generateRecommendations(agentId, decisions)
};
}
}
Datos reales: 12 proyectos en 3 meses
Métricas cuantitativas
# Configuración de monitoreo para agentes
agent_metrics:
code_generation:
- metric: "lines_generated_per_hour"
baseline: 150
with_agents: 890
improvement: 493%
- metric: "build_success_rate"
baseline: 0.87
with_agents: 0.94
improvement: 8%
maintenance_tasks:
- metric: "dependency_updates_per_week"
baseline: 3
with_agents: 18
improvement: 500%
- metric: "test_coverage_increase"
baseline: 2.1
with_agents: 8.7
improvement: 314%
debugging:
- metric: "time_to_root_cause_minutes"
baseline: 47
with_agents: 23
improvement: 51%
Problemas encontrados
- Race conditions en agentes concurrentes
# Problema: dos agentes modificando el mismo archivo
async def agent_file_lock():
lock_key = f"agent_lock:{file_path}"
lock_acquired = await redis.set(lock_key, agent_id, ex=300, nx=True)
if not lock_acquired:
current_owner = await redis.get(lock_key)
raise AgentConflictError(f"File locked by agent {current_owner}")
- Context window exhaustion
# Solución: context window sliding inteligente
class ContextWindowManager:
async def optimize_context(self, current_context: str, new_info: str) -> str:
if len(current_context) + len(new_info) > MAX_CONTEXT:
# Usar embeddings para mantener info más relevante
relevant_chunks = await self.rank_by_relevance(
current_context, new_info
)
return self.reconstruct_context(relevant_chunks)
return current_context + new_info
Antipatrones a evitar
❌ Agente como “API wrapper”
# MAL: Tratar agente como función
async def bad_agent_usage():
result = await agent.generate_code(prompt)
return result # Sin contexto, sin estado
✅ Agente como worker inteligente
# BIEN: Agente con contexto y estado
async def good_agent_usage():
session = await agent_pool.create_session(project_context)
task = AgentTask(
type="refactor_module",
context=await load_project_context(),
constraints=get_coding_standards(),
timeout=300
)
result = await session.execute_with_oversight(task)
await session.persist_learnings()
Conclusiones técnicas
- Los agentes son sistemas distribuidos: Requieren patrones diferentes a las aplicaciones tradicionales
- Estado es crítico: Sin persistencia de contexto, pierdes el 70% del valor
- Rollback debe ser semántico: No basta con
git revert, necesitas entender intención - Observabilidad específica: Debuggear agentes requiere trazar decisiones, no solo código
El stack tradicional (Load Balancer → API → Database) no escala para agentes. Necesitas: Agent Orchestrator → State Manager → Vector Store → Rollback System.
¿Tu infraestructura actual soportaría 50 agentes trabajando concurrentemente en el mismo repositorio durante 8 horas? Si no, es momento de redesañar.