Introducción

Un pipeline de IA en producción no falla con un traceback tradicional: devuelve respuestas seguras para el usuario pero incorrectas en su contenido. Un RAG que «confía» en que su contexto es correcto puede recomendar inversiones basadas en datos inventados, mientras tus dashboards muestran métricas de salud en verde. La causa suele estar tres pasos atrás: en la recuperación de embeddings, en la fragmentación de documentos, o en un cambio mínimo en el prompt que desvía el contexto.

Estos errores probabilísticos no se resuelven con más logging ni con breakpoints en capas de wrapper. Requerirán trazas asincrónicas que capturen el payload completo del LLM, validación estricta de schemas con Pydantic, y evaluadores automáticos en tu pipeline de CI/CD. Este artículo te muestra cómo instrumentar todo eso en Python, conectarlo a AWS CloudWatch con OpenTelemetry, y evitar que un bug contextual llegue a manos de tus usuarios.

Qué es y para qué sirve

Los errores en sistemas de IA generativa ya no son fallos de lógica en el código, sino fallos en el entorno contextual que le das al modelo. Tres escenarios típicos:

  1. Bug de contexto: tu vector DB devuelve chunks irrelevantes o mal fragmentados, y el LLM alucina porque no tiene datos reales para responder.
  2. Bug de razonamiento: el LLM recibe chunks relevantes, pero los ignora, malinterpreta su formato, o sufre format drift (el schema esperado cambia entre versiones).
  3. Bug de inferencia: el modelo sigue el prompt, pero su temperatura o few-shot examples lo llevan a un estado de confabulación.

Para detectarlos en producción, necesitas:

  • Trazas asincrónicas que capten el payload completo del LLM (prompt, contexto, temperatura, modelo usado) sin bloquear el event loop.
  • Validación de schemas con Pydantic para asegurar que la salida del LLM cumpla con un contrato claro.
  • Evaluadores automáticos en CI/CD que usen un modelo pequeño (Claude 3 Haiku, GPT-4o-mini) para juzgar la calidad del output contra fuentes reales.

La combinación de estos tres elementos te permite observar el pipeline como un subsistema I/O externo, predecible y auditables, en lugar de tratarlo como una caja negra mágica.

Prerequisitos

Asegúrate de tener instaladas estas versiones exactas y permisos:

ComponenteVersión mínimaInstalación típica (Linux/macOS)
Python3.9+BLOCK5
OpenTelemetry SDK1.21.0BLOCK6
FastAPI0.109.0BLOCK7
Pydantic2.6.0BLOCK8
AWS CLI2.15.0BLOCK9
boto31.34.0BLOCK10
Docker24.0+BLOCK11
Permisos en AWS:
  • IAM role con CloudWatchFullAccess y AWSCloudMapFullAccess.
  • Si usas EKS: eks:DescribeCluster, logs:CreateLogGroup, logs:PutLogEvents.
Accesos necesarios:
  • Cuenta de AWS con SDK configurado (aws configure).
  • Token de Hugging Face o API key de tu proveedor de LLM (OpenAI, Anthropic, etc.).
  • Un vector DB accesible (FAISS en memoria, Pinecone, o embedding local con SentenceTransformers).

Guía paso a paso

1. Instrumentar un endpoint FastAPI con tracing asincrónico

Vamos a crear un endpoint que procese una consulta, recupere chunks de contexto, llame al LLM, y emita trazas a stdout en formato JSON estructurado (compatible con OpenTelemetry y CloudWatch).

Crea el archivo app.py:

# app.py
import asyncio
import json
import uuid
from contextlib import asynccontextmanager
from typing import Annotated, Any

from fastapi import FastAPI, Header
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from pydantic import BaseModel, Field

# 1. Configurar tracing asincrónico
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(ConsoleSpanExporter())  # Emite a stdout para CloudWatch
)
tracer = trace.get_tracer(__name__)

# 2. Definir schemas estrictos
class LLMRequest(BaseModel):
    query: str = Field(..., min_length=3, max_length=500)
    user_id: str = Field(..., pattern=r"^[a-zA-Z0-9-]{3,36}$")
    session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))

class LLMResponse(BaseModel):
    answer: str = Field(..., min_length=10, max_length=5000)
    sources: list[str] = Field(..., min_items=1)
    model_used: str = Field(default="anthropic.claude-3-haiku")

# 3. Simular recuperación de contexto (en producción: FAISS, Pinecone, etc.)
async def retrieve_context(query: str) -> list[str]:
    await asyncio.sleep(0.1)  # Simular latencia de red
    return [
        f"Documento relevante sobre {query}: ...",
        f"Datos financieros actualizados para {query}: ..."
    ]

# 4. Wrapper asincrónico para llamar al LLM
async def call_llm(context: list[str], query: str) -> str:
    prompt = f"""
    CONTEXTO:
    {chr(10).join(context)}

    PREGUNTA: {query}

    RESPONDE SOLO CON EL CONTEXTO PROPORCIONADO. NO INVENTES DATOS.
    """
    # En producción: usar Anthropic API, SageMaker, etc.
    # Ejemplo con Anthropic (reemplazar con tu cliente real)
    return "Respuesta basada en el contexto proporcionado."

# 5. Endpoint principal con tracing
@asynccontextmanager
async def lifespan(app: FastAPI):
    FastAPIInstrumentor.instrument_app(app)
    RequestsInstrumentor().instrument()
    yield

app = FastAPI(lifespan=lifespan)

@app.post("/query")
async def query_llm(
    request: LLMRequest,
    x_request_id: Annotated[str, Header()] = ""
) -> LLMResponse:
    with tracer.start_as_current_span("llm_pipeline") as span:
        span.set_attribute("user.id", request.user_id)
        span.set_attribute("llm.session_id", request.session_id)
        span.set_attribute("llm.query_length", len(request.query))

        # Paso 1: Recuperar contexto (con tracing automático)
        context = await retrieve_context(request.query)
        span.set_attribute("llm.context_chunks", len(context))

        # Paso 2: Llamar al LLM (con tracing automático)
        answer = await call_llm(context, request.query)

        # Paso 3: Validar schema estricto
        response = LLMResponse(
            answer=answer,
            sources=context,
            model_used="anthropic.claude-3-haiku"
        )
        span.set_attribute("llm.output_length", len(response.answer))

        # Emisión explícita de traza (para CloudWatch)
        trace.get_current_span().add_event(
            "llm_response_validated",
            attributes={
                "llm.output_schema_valid": "true",
                "llm.output_length": len(response.answer)
            }
        )
        return response
Resultado esperado:
  • Al ejecutar uvicorn app:app --host 0.0.0.0 --port 8000, cada request emite JSON estructurado a stdout:
{
  "name": "llm_pipeline",
  "context": { /* ... */ },
  "events": [
    { "name": "llm_response_validated", "attributes": { "llm.output_schema_valid": "true" } }
  ]
}
  • CloudWatch (o Datadog) indexará estos eventos por llm.output_schema_valid y llm.query_length.

2. Conectar el tracing a AWS CloudWatch con OpenTelemetry Collector

Instala el Collector en tu cluster (EKS, ECS, o EC2):

# Descargar y configurar OpenTelemetry Collector
curl -L -o otel-collector.tar.gz https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.88.0/otelcol-contrib_0.88.0_linux_amd64.tar.gz
tar -xzf otel-collector.tar.gz
cd otelcol-contrib_0.88.0_linux_amd64

# Configurar otel-config.yaml
cat > otel-config.yaml <<EOF
receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:

exporters:
  logging:
    loglevel: debug
  awscloudwatchlogs:
    log_group_name: "/aws/ai-pipeline/llm-traces"
    log_stream_name: "llm-traces-{uuid()}"
    region: "us-east-1"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [awscloudwatchlogs, logging]
EOF

# Ejecutar Collector (en producción: como DaemonSet en Kubernetes)
./otelcol-contrib --config=otel-config.yaml
Verificación:
  • En AWS CloudWatch → Log groups → /aws/ai-pipeline/llm-traces.
  • Busca logs con llm.output_schema_valid para confirmar que el schema se validó correctamente.

3. Validar schemas estrictos con Pydantic y evaluadores automáticos

Agrega un evaluador en tu pipeline de CI/CD que use un modelo pequeño para juzgar la calidad del output. Ejemplo en GitHub Actions:

# .github/workflows/evaluate-llm.yaml
name: Evaluar respuestas de LLM

on:
  pull_request:
    paths: ["app.py", "tests/**"]

jobs:
  evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Instalar dependencias
        run: |
          pip install pydantic==2.6.0 boto3==1.34.0 requests==2.31.0

      - name: Ejecutar evaluador
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          python - << 'PY'
          import boto3, json, os
          from pydantic import BaseModel, ValidationError

          # Cargar respuesta del modelo (simulada)
          response = {
              "answer": "The company will grow 20% next year because of market trends.",
              "sources": ["Documento obsoleto sobre tendencias 2022"],
              "model_used": "anthropic.claude-3-haiku"
          }

          # Validar schema
          class LLMResponse(BaseModel):
              answer: str
              sources: list[str]
              model_used: str

          try:
              validated = LLMResponse(**response)
              print("✅ Schema validado correctamente")
          except ValidationError as e:
              print(f"❌ Schema inválido: {e}")
              exit(1)

          # Evaluar alucinaciones con un modelo pequeño (ej: Claude 3 Haiku)
          judge_prompt = f"""
          CONTEXTO: {chr(10).join(validated.sources)}
          RESPUESTA: {validated.answer}

          PREGUNTA: ¿La respuesta menciona datos del contexto proporcionado?
          RESPONDE 1 (sí) o 0 (no).
          """

          # En producción: usar Anthropic API o SageMaker
          print("⚠️ Evaluador simulado. Reemplazar con llamada real a API.")
          PY
Resultado esperado:
  • Si el schema falla, el workflow aborta el PR.
  • Si la respuesta no menciona el contexto, el evaluador devuelve 0 y puedes configurar una alerta en CloudWatch.

4. Monitorear alucinaciones en tiempo real con CloudWatch Metrics

Crea un dashboard en CloudWatch que muestre:

  • llm.output_schema_valid (métrica booleana).
  • llm.context_chunks vs llm.output_length (correlación entre contexto y respuesta).
  • llm.hallucination_rate (calculado en tu evaluador).
Consulta en CloudWatch Insights:
stats count(*) as total_requests by bin(5m)
| filter @message like /llm.output_schema_valid/
| stats
  sum(case when @message like /"llm.output_schema_valid": "true"/ then 1 else 0 end) as valid_schemas,
  sum(case when @message like /"llm.output_schema_valid": "false"/ then 1 else 0 end) as invalid_schemas
  by bin(5m)

Consideraciones y buenas prácticas

  1. No confíes en system prompts agresivos: Frases como "¡NO INVENTES DATOS!" no previenen alucinaciones. En su lugar, usa:
Few-shot examples en el prompt para guiar el formato.

Temperatura baja (0.1–0.3) para reducir variabilidad.

Validation layers con Pydantic antes de devolver la respuesta al usuario.

  1. Alertas por drifting de contexto: Si llm.context_chunks cae bruscamente en CloudWatch, revisa:
– Fragmentación de documentos en tu vector DB.

– Cambios en el embedding model (ej: actualización de sentence-transformers/all-MiniLM-L6-v2).

  1. Alternativas a OpenTelemetry:
– Para pipelines simples: stdout + CloudWatch Logs Insights (como en este ejemplo).

– Para entornos serverless (Lambda): Amazon X-Ray con el SDK de OpenTelemetry.

– Para equipos que ya usan Datadog/New Relic: usa el exporter oficial de cada vendor.

  1. Limitaciones conocidas:
Latencia: El tracing asincrónico añade ~50–200ms por request. Mide con otelcol-contrib antes de producción.

Costo: CloudWatch Logs es económico para volúmenes bajos-medios, pero escala con log_stream_name dinámico (evita crear streams por request).

Modelos pequeños: Los evaluadores como claude-3-haiku son útiles para CI/CD, pero no reemplazan tests de carga con datasets reales.

Conclusión

Los pipelines de IA no fallan con excepciones claras, sino con respuestas que parecen correctas pero son falsas. Para detectar estos errores antes de que lleguen a producción, necesitas:

  1. Trazas asincrónicas que capturen el payload completo del LLM y sus dependencias (contexto, modelo, parámetros).
  2. Validación estricta de schemas con Pydantic para asegurar que la salida cumpla con un contrato definido.
  3. Evaluadores automáticos en CI/CD que usen modelos pequeños para juzgar la calidad del output contra fuentes reales.

La combinación de estos tres elementos transforma tu pipeline de IA de una caja negra mágica a un subsistema observable, predecible y auditables, listo para integrarse con tus herramientas actuales de infraestructura (AWS CloudWatch, OpenTelemetry, FastAPI). El próximo bug contextual ya no te tomará días en encontrar: lo verás en tus logs en minutos.

Fuentes

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *