Introducción

Los equipos de DevOps e infraestructura suelen asumir que «soportamos OpenTelemetry» implica un formato estándar unificado para telemetría de GenAI. Sin embargo, cada SDK, framework y proveedor implementa las convenciones de manera distinta. Este artículo te guía para construir un normalizador que unifique esos datos, resolviendo tres problemas críticos:

  • Mensajes estructurados: Los frameworks (LangChain, LangGraph, CrewAI) envuelven los mensajes en objetos constructores que varían según la orquestación.
  • Evolución de convenciones: Las convenciones de OTel GenAI tienen tres eras no compatibles entre sí, y el 90% del tráfico actual sigue en Era 1.
  • Sin SDK: Capturar telemetría desde el kernel (eBPF) introduce nuevos desafíos de parsing y sincronización con los normalizadores basados en SDKs.

A continuación, implementamos un normalizador funcional en Go que resuelve estos casos, validado contra datos reales de Anthropic, OpenAI y proveedores de cloud (AKS, Prometheus, Jaeger).

Qué es y para qué sirve

Un normalizador de telemetría GenAI convierte datos heterogéneos (desde SDKs, frameworks o capturas en el kernel) a un esquema canónico. El objetivo no es solo renombrar atributos, sino:

  1. Reconstruir conversaciones: Desenrollar objetos de frameworks como LangChain/LangGraph para extraer mensajes crudos.
  2. Unificar proveedores: Mapear content_blocks de Anthropic, tool_calls de OpenAI y cache_control de Bedrock a un schema común.
  3. Correlacionar con infraestructura: Asegurar que los spans de GenAI coincidan temporal y estructuralmente con métricas de Prometheus (AKS) y traces de Jaeger.
Ejemplo práctico: Si un equipo usa LangGraph con Anthropic, el span de LLM puede estar envuelto en un update.messages[]. Un normalizador que no maneje este caso generará JSON crudo en lugar de una conversación legible.

Prerequisitos

Software y versiones

ComponenteVersión mínimaNotas
Go1.21+Para compilar el normalizador
OpenTelemetry SDKv0.40+Soporte para convenciones GenAI
Prometheus2.45+Para métricas de infraestructura
Jaeger1.48+Para correlación de traces
Kubernetes (AKS)1.27+Cluster donde se deployea el normalizador
Anthropic SDKv0.7+Para probar parsing de BLOCK18
OpenAI SDKv1.0+Para probar parsing de BLOCK19
### Permisos y accesos
  • Acceso a cluster AKS con permisos cluster-admin para deployar el normalizador como sidecar en pods de aplicaciones GenAI.
  • Jaeger debe exponer su endpoint de colector (por defecto :4317).
  • Prometheus debe tener configurada la service discovery para pods con telemetría GenAI.
  • Claves de proveedores (Anthropic, OpenAI) con permisos para invocar modelos (opcional, solo para testing).

Guía paso a paso

1. Instalar dependencias y preparar el entorno

Crea un directorio para el proyecto y inicializa el módulo de Go:

mkdir otel-genai-normalizer && cd otel-genai-normalizer
go mod init github.com/tuorg/otel-genai-normalizer

Agregá las dependencias necesarias:

go get go.opentelemetry.io/otel \
  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc \
  go.opentelemetry.io/otel/sdk/resource \
  go.opentelemetry.io/otel/sdk/trace \
  go.opentelemetry.io/otel/trace
Resultado esperado:
go.mod  go.sum  main.go

2. Definir el schema canónico para GenAI

Creamos un archivo genai_schema.go con las estructuras que normalizaremos:

package main

import "go.opentelemetry.io/otel/attribute"

// Schema canónico para GenAI
type GenAIMessage struct {
    Role    string `json:"role"`
    Content string `json:"content"`
    Parts   []Part `json:"parts,omitempty"`
}

type Part struct {
    Type string `json:"type"`
    Text string `json:"text,omitempty"`
    Func string `json:"function_call,omitempty"`
}

type GenAISpan struct {
    SpanName   string         `json:"span_name"`
    Model      string         `json:"model"`
    Messages   []GenAIMessage `json:"messages"`
    ToolsUsed  []string       `json:"tools_used,omitempty"`
    TokensIn   int            `json:"tokens_in"`
    TokensOut  int            `json:"tokens_out"`
    FinishReason string        `json:"finish_reason"`
    Provider   string         `json:"provider"`
}

const (
    ProviderAnthropic = "anthropic"
    ProviderOpenAI    = "openai"
)
Resultado esperado:

El archivo define un schema unificado que soporta:

  • Mensajes con role/content (Era 3) o parts (Anthropic).
  • Llamados a herramientas en tool_calls (OpenAI) o dentro de content (Anthropic).
  • Conteo de tokens consistente (evitando errores como contar tokens cacheados).

3. Implementar parsers para proveedores

3.1 Parser para Anthropic (content_blocks)

Creamos parser_anthropic.go:

package main

import (
    "encoding/json"
    "fmt"
)

type AnthropicContentBlock struct {
    Type string `json:"type"`
    Text string `json:"text,omitempty"`
    Func struct {
        Name string `json:"name"`
        Args string `json:"input"` // JSON string
    } `json:"function_call,omitempty"`
}

type AnthropicResponse struct {
    Content []AnthropicContentBlock `json:"content"`
    Model  string                 `json:"model"`
    Usage  struct {
        InputTokens  int `json:"input_tokens"`
        OutputTokens int `json:"output_tokens"`
    } `json:"usage"`
    StopReason string `json:"stop_reason"`
}

func parseAnthropic(response []byte) (*GenAISpan, error) {
    var ar AnthropicResponse
    if err := json.Unmarshal(response, &ar); err != nil {
        return nil, fmt.Errorf("failed to unmarshal Anthropic response: %w", err)
    }

    // Reconstruir mensajes
    var messages []GenAIMessage
    for _, block := range ar.Content {
        switch block.Type {
        case "text":
            messages = append(messages, GenAIMessage{
                Role:  "assistant",
                Content: block.Text,
            })
        case "tool_use":
            funcCall := block.Func
            messages = append(messages, GenAIMessage{
                Role: "assistant",
                Parts: []Part{{
                    Type: "function_call",
                    Func: funcCall.Name,
                }},
            })
        }
    }

    return &GenAISpan{
        SpanName:   "genai.llm",
        Model:      ar.Model,
        Messages:   messages,
        TokensIn:  ar.Usage.InputTokens,
        TokensOut: ar.Usage.OutputTokens,
        FinishReason: ar.StopReason,
        Provider:   ProviderAnthropic,
    }, nil
}
Resultado esperado:

Un parser que convierte la respuesta de Anthropic a nuestro schema canónico, manejando:

  • Bloques de texto y llamadas a herramientas.
  • Tokens de entrada/salida sin contar tokens cacheados (si están presentes en el payload).

3.2 Parser para LangChain/LangGraph (envolturas de framework)

Para frameworks que envuelven respuestas en objetos de orquestación (ej: LangGraph), creamos parser_langchain.go:

package main

import (
    "encoding/json"
    "fmt"
)

type LangGraphStateUpdate struct {
    Update struct {
        Messages []struct {
            Role    string `json:"role"`
            Content string `json:"content"`
        } `json:"messages"`
    } `json:"update"`
}

func parseLangGraph(response []byte) (*GenAISpan, error) {
    var lgu LangGraphStateUpdate
    if err := json.Unmarshal(response, &lgu); err != nil {
        return nil, fmt.Errorf("failed to unmarshal LangGraph update: %w", err)
    }

    // Reconstruir mensajes desde el estado de update
    var messages []GenAIMessage
    for _, msg := range lgu.Update.Messages {
        messages = append(messages, GenAIMessage{
            Role:    msg.Role,
            Content: msg.Content,
        })
    }

    return &GenAISpan{
        SpanName: "genai.llm.langgraph",
        Messages: messages,
        Provider: "langgraph",
    }, nil
}
Resultado esperado:

Un parser que extrae mensajes desde estructuras anidadas como update.messages[], evitando que el normalizador genere JSON crudo en lugar de una conversación legible.

4. Normalizar spans entrantes (SDKs)

Implementamos el normalizador principal en normalizer.go:

package main

import (
    "context"
    "encoding/json"
    "fmt"

    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/sdk/trace"
)

type Normalizer struct {
    providerParsers map[string]func([]byte) (*GenAISpan, error)
}

func NewNormalizer() *Normalizer {
    return &Normalizer{
        providerParsers: map[string]func([]byte) (*GenAISpan, error){
            ProviderAnthropic: parseAnthropic,
            "langgraph":      parseLangGraph,
        },
    }
}

func (n *Normalizer) Normalize(ctx context.Context, span *trace.Span) (*GenAISpan, error) {
    // Obtener atributos del span (ej: SDK, provider)
    attrs := span.Resource().Attributes()
    provider := getProvider(attrs)

    // Obtener payload crudo (ej: desde SDK o eBPF)
    rawPayload, ok := span.(interface {
        GetRawPayload() []byte
    }).GetRawPayload()
    if !ok {
        return nil, fmt.Errorf("span does not support raw payload extraction")
    }

    parser, exists := n.providerParsers[provider]
    if !exists {
        return nil, fmt.Errorf("no parser for provider: %s", provider)
    }

    return parser(rawPayload)
}

func getProvider(attrs []attribute.KeyValue) string {
    for _, attr := range attrs {
        if attr.Key == attribute.Key("genai.provider") {
            return string(attr.Value.AsString())
        }
    }
    return "unknown"
}
Resultado esperado:

Un normalizador que:

  1. Detecta el proveedor desde atributos de OTel (ej: genai.provider=anthropic).
  2. Usa el parser correspondiente (Anthropic, LangGraph, etc.).
  3. Devuelve un GenAISpan unificado para correlación con infraestructura.

5. Integrar con OTel y exportar a Jaeger/Prometheus

Modificamos main.go para integrar el normalizador con el SDK de OTel:

package main

import (
    "context"
    "log"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

func setupOTel() (*sdktrace.TracerProvider, error) {
    // Exportador a Jaeger (OTLP gRPC)
    exporter, err := otlptracegrpc.New(
        context.Background(),
        otlptracegrpc.WithEndpoint("jaeger-collector:4317"),
        otlptracegrpc.WithInsecure(), // En prod, usar TLS
    )
    if err != nil {
        return nil, err
    }

    // Tracer provider con normalizador
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceName("genai-normalizer"),
            attribute.String("deployment.environment", "production"),
        )),
        sdktrace.WithSpanProcessor(NewNormalizingSpanProcessor(NewNormalizer())),
    )

    otel.SetTracerProvider(tp)
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    ))

    return tp, nil
}

// NormalizingSpanProcessor implementa sdktrace.SpanProcessor
type NormalizingSpanProcessor struct {
    normalizer *Normalizer
}

func NewNormalizingSpanProcessor(n *Normalizer) *NormalizingSpanProcessor {
    return &NormalizingSpanProcessor{normalizer: n}
}

func (nsp *NormalizingSpanProcessor) OnEnd(span *trace.Span) {
    normalized, err := nsp.normalizer.Normalize(context.Background(), span)
    if err != nil {
        log.Printf("Normalization failed: %v", err)
        return
    }

    // Aquí podés exportar a Prometheus:
    // - Registrar métricas de tokens_in/out por modelo
    // - Correlacionar con traces de Jaeger via span_name
    log.Printf("Normalized span: %+v", normalized)
}
Resultado esperado:
  • Spans de GenAI normalizados y exportados a Jaeger.
  • Métricas de tokens registradas en Prometheus para análisis de costos.

Consideraciones y buenas prácticas

1. Manejo de versiones de convenciones (Era 1 vs Era 3)

Problema:

El 90% del tráfico usa convenciones de Era 1 (ej: Traceloop, LangSmith), mientras que las implementaciones oficiales de OTel usan Era 3. Un backend que solo soporte Era 3 perderá datos críticos.

Solución:

Implementa un dual-parser que soporte ambas eras. Ejemplo para mensajes:

func parseMessagesEra1(attrs []attribute.KeyValue) []GenAIMessage {
    // Era 1 usa atributos planos como `genai.messages.0.role`, `genai.messages.0.content`
    var messages []GenAIMessage
    for i := 0; ; i++ {
        roleKey := attribute.Key(fmt.Sprintf("genai.messages.%d.role", i))
        contentKey := attribute.Key(fmt.Sprintf("genai.messages.%d.content", i))
        role, _ := findAttr(attrs, roleKey)
        content, _ := findAttr(attrs, contentKey)
        if role == "" { break }
        messages = append(messages, GenAIMessage{Role: role, Content: content})
    }
    return messages
}
Alternativa:

Usa adapters que mapeen atributos de cada era a tu schema canónico. Ejemplo:

# Configuración de mapping (YAML)
mappings:
  era1:
    messages: "genai.messages.%d.role"
    model: "genai.model"
  era3:
    messages: "genai.input.messages"
    model: "genai.model.name"

2. Sincronización entre eBPF y SDKs

Problema:

Los normalizadores basados en eBPF (captura desde el kernel) y los basados en SDKs pueden divergir si:

  • Los nombres de campos JSON cambian (ej: content vs parts).
  • Los atributos de OTel se renombran en nuevas versiones.
Solución:

Implementa tests de contrato que validen que ambos paths produzcan el mismo output para el mismo input:

func TestSyncBetweenParsers(t *testing.T) {
    // Payload de ejemplo de Anthropic
    payload := []byte(`{
        "content": [{"type": "text", "text": "Hello"}],
        "usage": {"input_tokens": 5, "output_tokens": 10},
        "stop_reason": "end_turn"
    }`)

    // Parser 1: Desde SDK
    sdkSpan := crearSpanMock(payload, "anthropic")
    sdkNormalized, _ := newNormalizer().Normalize(context.Background(), sdkSpan)

    // Parser 2: Desde eBPF
    ebpfNormalized, _ := parseAnthropic(payload)

    // Validar equivalencia
    assert.Equal(t, sdkNormalized.Messages, ebpfNormalized.Messages)
    assert.Equal(t, sdkNormalized.TokensIn, ebpfNormalized.TokensIn)
}
Herramientas recomendadas:
  • GoConvey para tests de contrato.
  • WireMock para simular respuestas de proveedores en tests de integración.

3. Optimización para AKS y Prometheus

Problema:

Correlacionar telemetría GenAI con métricas de infraestructura (ej: latencia de pods, OOM-kills) requiere etiquetas consistentes.

Solución:

Agregá atributos de correlación a los spans normalizados:

func (n *Normalizer) Normalize(ctx context.Context, span *trace.Span) (*GenAISpan, error) {
    normalized, err := ... // parsing
    if err != nil { return nil, err }

    // Agregar atributos para Prometheus
    normalized.SpanAttributes = []attribute.KeyValue{
        attribute.String("k8s.pod.name", getPodName(span)),
        attribute.Int("k8s.container.memory.usage", getMemoryUsage(span)),
    }

    return normalized, nil
}
Configuración de Prometheus:

En tu prometheus.yml, agregá un recording rule para calcular costos por modelo:

- record: genai:cost_per_model:sum
  expr: sum by (model) (
    rate(genai_tokens_total{type="out"}[5m]) * on(model) group_left(provider)
    genai_model_pricing{provider="anthropic"}
  )
Resultado:

Podés alertar si el costo por modelo supera un umbral (ej: $100/h).

4. Manejo de streaming y respuestas comprimidas (eBPF)

Problema:

eBPF captura payloads crudos, pero estos pueden estar:

  • Comprimidos (gzip).
  • Truncados (respuestas largas).
  • En formato streaming (ej: tokens enviados en chunks).
Solución:

Implementa un parser de streaming que reconstruya el payload completo:

func parseStreamingPayload(chunks [][]byte) ([]byte, error) {
    var buf bytes.Buffer
    for _, chunk := range chunks {
        // Descomprimir si es necesario
        if isCompressed(chunk) {
            decompressed, err := gzip.NewReader(bytes.NewReader(chunk))
            if err != nil { continue }
            chunk, _ = io.ReadAll(decompressed)
        }
        buf.Write(chunk)
    }
    return buf.Bytes(), nil
}
Herramienta recomendada:

Usa libbpf para capturar payloads completos en el kernel, evitando truncamientos.

Conclusión

Normalizar telemetría GenAI en OpenTelemetry no es un problema de renombres, sino de reconstrucción semántica. Los equipos deben:

  1. Soportar múltiples eras de convenciones (Era 1, Era 3) y frameworks (LangChain, LangGraph, CrewAI).
  2. Sincronizar parsers entre eBPF y SDKs mediante tests de contrato.
  3. Correlacionar con infraestructura usando etiquetas consistentes en Prometheus y Jaeger.
Alternativas a no normalizar:
  • Lock-in de proveedores: Usar Datadog o LangSmith obliga a re-instrumentar al cambiar de backend.
  • Pérdida de datos: Un backend que ignore Era 1 perderá el 90% del tráfico real.
  • Falsos positivos en costos: Contar tokens cacheados como «tokens de salida» infla artificialmente los gastos.
Próximos pasos:
  • Implementá el normalizador en tu pipeline de CI/CD (ej: como sidecar en AKS).
  • Configurá alertas en Prometheus para detectar divergencias entre parsers (ej: genai_tokens_total{parser="sdk"} != genai_tokens_total{parser="ebpf"}).
  • Validá la correlación entre GenAI y infraestructura con Golden Signals:
– Latencia del modelo vs latencia del pod.

– Tokens por segundo vs CPU del nodo.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *