Introducción
Los equipos de DevOps e infraestructura suelen asumir que «soportamos OpenTelemetry» implica un formato estándar unificado para telemetría de GenAI. Sin embargo, cada SDK, framework y proveedor implementa las convenciones de manera distinta. Este artículo te guía para construir un normalizador que unifique esos datos, resolviendo tres problemas críticos:
- Mensajes estructurados: Los frameworks (LangChain, LangGraph, CrewAI) envuelven los mensajes en objetos constructores que varían según la orquestación.
- Evolución de convenciones: Las convenciones de OTel GenAI tienen tres eras no compatibles entre sí, y el 90% del tráfico actual sigue en Era 1.
- Sin SDK: Capturar telemetría desde el kernel (eBPF) introduce nuevos desafíos de parsing y sincronización con los normalizadores basados en SDKs.
A continuación, implementamos un normalizador funcional en Go que resuelve estos casos, validado contra datos reales de Anthropic, OpenAI y proveedores de cloud (AKS, Prometheus, Jaeger).
Qué es y para qué sirve
Un normalizador de telemetría GenAI convierte datos heterogéneos (desde SDKs, frameworks o capturas en el kernel) a un esquema canónico. El objetivo no es solo renombrar atributos, sino:
- Reconstruir conversaciones: Desenrollar objetos de frameworks como LangChain/LangGraph para extraer mensajes crudos.
- Unificar proveedores: Mapear
content_blocksde Anthropic,tool_callsde OpenAI ycache_controlde Bedrock a un schema común. - Correlacionar con infraestructura: Asegurar que los spans de GenAI coincidan temporal y estructuralmente con métricas de Prometheus (AKS) y traces de Jaeger.
update.messages[]. Un normalizador que no maneje este caso generará JSON crudo en lugar de una conversación legible.Prerequisitos
Software y versiones
| Componente | Versión mínima | Notas |
|---|---|---|
| Go | 1.21+ | Para compilar el normalizador |
| OpenTelemetry SDK | v0.40+ | Soporte para convenciones GenAI |
| Prometheus | 2.45+ | Para métricas de infraestructura |
| Jaeger | 1.48+ | Para correlación de traces |
| Kubernetes (AKS) | 1.27+ | Cluster donde se deployea el normalizador |
| Anthropic SDK | v0.7+ | Para probar parsing de BLOCK18 |
| OpenAI SDK | v1.0+ | Para probar parsing de BLOCK19 |
- Acceso a cluster AKS con permisos
cluster-adminpara deployar el normalizador como sidecar en pods de aplicaciones GenAI. - Jaeger debe exponer su endpoint de colector (por defecto
:4317). - Prometheus debe tener configurada la service discovery para pods con telemetría GenAI.
- Claves de proveedores (Anthropic, OpenAI) con permisos para invocar modelos (opcional, solo para testing).
Guía paso a paso
1. Instalar dependencias y preparar el entorno
Crea un directorio para el proyecto y inicializa el módulo de Go:
mkdir otel-genai-normalizer && cd otel-genai-normalizer
go mod init github.com/tuorg/otel-genai-normalizerAgregá las dependencias necesarias:
go get go.opentelemetry.io/otel \
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc \
go.opentelemetry.io/otel/sdk/resource \
go.opentelemetry.io/otel/sdk/trace \
go.opentelemetry.io/otel/traceResultado esperado:go.mod go.sum main.go2. Definir el schema canónico para GenAI
Creamos un archivo genai_schema.go con las estructuras que normalizaremos:
package main
import "go.opentelemetry.io/otel/attribute"
// Schema canónico para GenAI
type GenAIMessage struct {
Role string `json:"role"`
Content string `json:"content"`
Parts []Part `json:"parts,omitempty"`
}
type Part struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
Func string `json:"function_call,omitempty"`
}
type GenAISpan struct {
SpanName string `json:"span_name"`
Model string `json:"model"`
Messages []GenAIMessage `json:"messages"`
ToolsUsed []string `json:"tools_used,omitempty"`
TokensIn int `json:"tokens_in"`
TokensOut int `json:"tokens_out"`
FinishReason string `json:"finish_reason"`
Provider string `json:"provider"`
}
const (
ProviderAnthropic = "anthropic"
ProviderOpenAI = "openai"
)Resultado esperado:El archivo define un schema unificado que soporta:
- Mensajes con
role/content(Era 3) oparts(Anthropic). - Llamados a herramientas en
tool_calls(OpenAI) o dentro decontent(Anthropic). - Conteo de tokens consistente (evitando errores como contar tokens cacheados).
3. Implementar parsers para proveedores
3.1 Parser para Anthropic (content_blocks)
Creamos parser_anthropic.go:
package main
import (
"encoding/json"
"fmt"
)
type AnthropicContentBlock struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
Func struct {
Name string `json:"name"`
Args string `json:"input"` // JSON string
} `json:"function_call,omitempty"`
}
type AnthropicResponse struct {
Content []AnthropicContentBlock `json:"content"`
Model string `json:"model"`
Usage struct {
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
} `json:"usage"`
StopReason string `json:"stop_reason"`
}
func parseAnthropic(response []byte) (*GenAISpan, error) {
var ar AnthropicResponse
if err := json.Unmarshal(response, &ar); err != nil {
return nil, fmt.Errorf("failed to unmarshal Anthropic response: %w", err)
}
// Reconstruir mensajes
var messages []GenAIMessage
for _, block := range ar.Content {
switch block.Type {
case "text":
messages = append(messages, GenAIMessage{
Role: "assistant",
Content: block.Text,
})
case "tool_use":
funcCall := block.Func
messages = append(messages, GenAIMessage{
Role: "assistant",
Parts: []Part{{
Type: "function_call",
Func: funcCall.Name,
}},
})
}
}
return &GenAISpan{
SpanName: "genai.llm",
Model: ar.Model,
Messages: messages,
TokensIn: ar.Usage.InputTokens,
TokensOut: ar.Usage.OutputTokens,
FinishReason: ar.StopReason,
Provider: ProviderAnthropic,
}, nil
}Resultado esperado:Un parser que convierte la respuesta de Anthropic a nuestro schema canónico, manejando:
- Bloques de texto y llamadas a herramientas.
- Tokens de entrada/salida sin contar tokens cacheados (si están presentes en el payload).
3.2 Parser para LangChain/LangGraph (envolturas de framework)
Para frameworks que envuelven respuestas en objetos de orquestación (ej: LangGraph), creamos parser_langchain.go:
package main
import (
"encoding/json"
"fmt"
)
type LangGraphStateUpdate struct {
Update struct {
Messages []struct {
Role string `json:"role"`
Content string `json:"content"`
} `json:"messages"`
} `json:"update"`
}
func parseLangGraph(response []byte) (*GenAISpan, error) {
var lgu LangGraphStateUpdate
if err := json.Unmarshal(response, &lgu); err != nil {
return nil, fmt.Errorf("failed to unmarshal LangGraph update: %w", err)
}
// Reconstruir mensajes desde el estado de update
var messages []GenAIMessage
for _, msg := range lgu.Update.Messages {
messages = append(messages, GenAIMessage{
Role: msg.Role,
Content: msg.Content,
})
}
return &GenAISpan{
SpanName: "genai.llm.langgraph",
Messages: messages,
Provider: "langgraph",
}, nil
}Resultado esperado:Un parser que extrae mensajes desde estructuras anidadas como update.messages[], evitando que el normalizador genere JSON crudo en lugar de una conversación legible.
4. Normalizar spans entrantes (SDKs)
Implementamos el normalizador principal en normalizer.go:
package main
import (
"context"
"encoding/json"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
)
type Normalizer struct {
providerParsers map[string]func([]byte) (*GenAISpan, error)
}
func NewNormalizer() *Normalizer {
return &Normalizer{
providerParsers: map[string]func([]byte) (*GenAISpan, error){
ProviderAnthropic: parseAnthropic,
"langgraph": parseLangGraph,
},
}
}
func (n *Normalizer) Normalize(ctx context.Context, span *trace.Span) (*GenAISpan, error) {
// Obtener atributos del span (ej: SDK, provider)
attrs := span.Resource().Attributes()
provider := getProvider(attrs)
// Obtener payload crudo (ej: desde SDK o eBPF)
rawPayload, ok := span.(interface {
GetRawPayload() []byte
}).GetRawPayload()
if !ok {
return nil, fmt.Errorf("span does not support raw payload extraction")
}
parser, exists := n.providerParsers[provider]
if !exists {
return nil, fmt.Errorf("no parser for provider: %s", provider)
}
return parser(rawPayload)
}
func getProvider(attrs []attribute.KeyValue) string {
for _, attr := range attrs {
if attr.Key == attribute.Key("genai.provider") {
return string(attr.Value.AsString())
}
}
return "unknown"
}Resultado esperado:Un normalizador que:
- Detecta el proveedor desde atributos de OTel (ej:
genai.provider=anthropic). - Usa el parser correspondiente (Anthropic, LangGraph, etc.).
- Devuelve un
GenAISpanunificado para correlación con infraestructura.
5. Integrar con OTel y exportar a Jaeger/Prometheus
Modificamos main.go para integrar el normalizador con el SDK de OTel:
package main
import (
"context"
"log"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)
func setupOTel() (*sdktrace.TracerProvider, error) {
// Exportador a Jaeger (OTLP gRPC)
exporter, err := otlptracegrpc.New(
context.Background(),
otlptracegrpc.WithEndpoint("jaeger-collector:4317"),
otlptracegrpc.WithInsecure(), // En prod, usar TLS
)
if err != nil {
return nil, err
}
// Tracer provider con normalizador
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("genai-normalizer"),
attribute.String("deployment.environment", "production"),
)),
sdktrace.WithSpanProcessor(NewNormalizingSpanProcessor(NewNormalizer())),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
return tp, nil
}
// NormalizingSpanProcessor implementa sdktrace.SpanProcessor
type NormalizingSpanProcessor struct {
normalizer *Normalizer
}
func NewNormalizingSpanProcessor(n *Normalizer) *NormalizingSpanProcessor {
return &NormalizingSpanProcessor{normalizer: n}
}
func (nsp *NormalizingSpanProcessor) OnEnd(span *trace.Span) {
normalized, err := nsp.normalizer.Normalize(context.Background(), span)
if err != nil {
log.Printf("Normalization failed: %v", err)
return
}
// Aquí podés exportar a Prometheus:
// - Registrar métricas de tokens_in/out por modelo
// - Correlacionar con traces de Jaeger via span_name
log.Printf("Normalized span: %+v", normalized)
}Resultado esperado:- Spans de GenAI normalizados y exportados a Jaeger.
- Métricas de tokens registradas en Prometheus para análisis de costos.
Consideraciones y buenas prácticas
1. Manejo de versiones de convenciones (Era 1 vs Era 3)
Problema:El 90% del tráfico usa convenciones de Era 1 (ej: Traceloop, LangSmith), mientras que las implementaciones oficiales de OTel usan Era 3. Un backend que solo soporte Era 3 perderá datos críticos.
Solución:Implementa un dual-parser que soporte ambas eras. Ejemplo para mensajes:
func parseMessagesEra1(attrs []attribute.KeyValue) []GenAIMessage {
// Era 1 usa atributos planos como `genai.messages.0.role`, `genai.messages.0.content`
var messages []GenAIMessage
for i := 0; ; i++ {
roleKey := attribute.Key(fmt.Sprintf("genai.messages.%d.role", i))
contentKey := attribute.Key(fmt.Sprintf("genai.messages.%d.content", i))
role, _ := findAttr(attrs, roleKey)
content, _ := findAttr(attrs, contentKey)
if role == "" { break }
messages = append(messages, GenAIMessage{Role: role, Content: content})
}
return messages
}Alternativa:Usa adapters que mapeen atributos de cada era a tu schema canónico. Ejemplo:
# Configuración de mapping (YAML)
mappings:
era1:
messages: "genai.messages.%d.role"
model: "genai.model"
era3:
messages: "genai.input.messages"
model: "genai.model.name"2. Sincronización entre eBPF y SDKs
Problema:Los normalizadores basados en eBPF (captura desde el kernel) y los basados en SDKs pueden divergir si:
- Los nombres de campos JSON cambian (ej:
contentvsparts). - Los atributos de OTel se renombran en nuevas versiones.
Implementa tests de contrato que validen que ambos paths produzcan el mismo output para el mismo input:
func TestSyncBetweenParsers(t *testing.T) {
// Payload de ejemplo de Anthropic
payload := []byte(`{
"content": [{"type": "text", "text": "Hello"}],
"usage": {"input_tokens": 5, "output_tokens": 10},
"stop_reason": "end_turn"
}`)
// Parser 1: Desde SDK
sdkSpan := crearSpanMock(payload, "anthropic")
sdkNormalized, _ := newNormalizer().Normalize(context.Background(), sdkSpan)
// Parser 2: Desde eBPF
ebpfNormalized, _ := parseAnthropic(payload)
// Validar equivalencia
assert.Equal(t, sdkNormalized.Messages, ebpfNormalized.Messages)
assert.Equal(t, sdkNormalized.TokensIn, ebpfNormalized.TokensIn)
}Herramientas recomendadas:- GoConvey para tests de contrato.
- WireMock para simular respuestas de proveedores en tests de integración.
3. Optimización para AKS y Prometheus
Problema:Correlacionar telemetría GenAI con métricas de infraestructura (ej: latencia de pods, OOM-kills) requiere etiquetas consistentes.
Solución:Agregá atributos de correlación a los spans normalizados:
func (n *Normalizer) Normalize(ctx context.Context, span *trace.Span) (*GenAISpan, error) {
normalized, err := ... // parsing
if err != nil { return nil, err }
// Agregar atributos para Prometheus
normalized.SpanAttributes = []attribute.KeyValue{
attribute.String("k8s.pod.name", getPodName(span)),
attribute.Int("k8s.container.memory.usage", getMemoryUsage(span)),
}
return normalized, nil
}Configuración de Prometheus:En tu prometheus.yml, agregá un recording rule para calcular costos por modelo:
- record: genai:cost_per_model:sum
expr: sum by (model) (
rate(genai_tokens_total{type="out"}[5m]) * on(model) group_left(provider)
genai_model_pricing{provider="anthropic"}
)Resultado:Podés alertar si el costo por modelo supera un umbral (ej: $100/h).
4. Manejo de streaming y respuestas comprimidas (eBPF)
Problema:eBPF captura payloads crudos, pero estos pueden estar:
- Comprimidos (gzip).
- Truncados (respuestas largas).
- En formato streaming (ej: tokens enviados en chunks).
Implementa un parser de streaming que reconstruya el payload completo:
func parseStreamingPayload(chunks [][]byte) ([]byte, error) {
var buf bytes.Buffer
for _, chunk := range chunks {
// Descomprimir si es necesario
if isCompressed(chunk) {
decompressed, err := gzip.NewReader(bytes.NewReader(chunk))
if err != nil { continue }
chunk, _ = io.ReadAll(decompressed)
}
buf.Write(chunk)
}
return buf.Bytes(), nil
}Herramienta recomendada:Usa libbpf para capturar payloads completos en el kernel, evitando truncamientos.
Conclusión
Normalizar telemetría GenAI en OpenTelemetry no es un problema de renombres, sino de reconstrucción semántica. Los equipos deben:
- Soportar múltiples eras de convenciones (Era 1, Era 3) y frameworks (LangChain, LangGraph, CrewAI).
- Sincronizar parsers entre eBPF y SDKs mediante tests de contrato.
- Correlacionar con infraestructura usando etiquetas consistentes en Prometheus y Jaeger.
- Lock-in de proveedores: Usar Datadog o LangSmith obliga a re-instrumentar al cambiar de backend.
- Pérdida de datos: Un backend que ignore Era 1 perderá el 90% del tráfico real.
- Falsos positivos en costos: Contar tokens cacheados como «tokens de salida» infla artificialmente los gastos.
- Implementá el normalizador en tu pipeline de CI/CD (ej: como sidecar en AKS).
- Configurá alertas en Prometheus para detectar divergencias entre parsers (ej:
genai_tokens_total{parser="sdk"} != genai_tokens_total{parser="ebpf"}). - Validá la correlación entre GenAI y infraestructura con Golden Signals:
– Tokens por segundo vs CPU del nodo.
