Introducción

Un escáner de vulnerabilidades basado en modelos de IA falla cuando su contexto se llena, su estado se pierde en un reinicio o ignora dependencias entre repositorios. En equipos de seguridad reales, esto significa horas de trabajo perdido por errores de contexto, falsos positivos recurrentes o la imposibilidad de rastrear un bug desde su origen hasta su impacto en producción.

Este artículo detalla cómo construir tu propio harness de vulnerabilidades: un pipeline automatizado de múltiples etapas que externaliza el estado, cross-valida hallazgos con modelos distintos y escala sin limitarse a un solo repositorio. El enfoque es model-agnóstico —soporta cualquier LLM de frontera— y está diseñado para EKS, con ejemplos en Python para orquestación y Rust para componentes de alto rendimiento.

Qué es y para qué sirve

Un harness de vulnerabilidades es una arquitectura de orquestación que:

  1. Descubre bugs mediante hunters especializados por clase de ataque (ej: inyección SQL, deserialización insegura).
  2. Valida cada hallazgo con un adversarial validator que intenta refutarlo con evidencia opuesta.
  3. Trazas dependencias entre repositorios (ej: un bug en una librería core-utils impacta en api-gateway).
  4. Persiste el estado en una base de datos para reiniciar escaneos sin perder contexto.
Diferencia clave vs. herramientas existentes:
  • Herramientas como CodeQL o Semgrep requieren parsing sintáctico por lenguaje.
  • Nuestro harness no parsea código: usa el modelo como engine de inferencia sobre logs de ejecución, archivos de configuración y documentación. Esto lo hace agnóstico al lenguaje (funciona igual con Rust, Go o Terraform).

Prerequisitos

ComponenteVersión mínimaNotas
Kubernetes1.27+Cluster EKS con IAM Roles for Service Accounts (IRSA) activado.
Python3.11+Necesario para el orchestrator y hunters. Usá BLOCK13 para dependencias.
Rust1.70+Para el *validator adversarial* (alto rendimiento en parsing de logs).
PostgreSQL14+Base de datos para almacenar hallazgos, dependencias y estado.
Docker24+Para empaquetar hunters y validators como containers.
Modelos de LLMCualquier (ej: Claude 3.5, Llama 4)Deben exponer una API con BLOCK14 y BLOCK15 (ej: Anthropic Messages API).
Permisos– ClusterAdmin (EKS)Para instalar operadores y CRDs.
– IAM: BLOCK16Para que el harness acceda a metadata de repositorios.
Accesos requeridos:
  1. Repositorios privados en GitHub/GitLab con un token de máquina con read:packages y read:repo.
  2. API key de un proveedor de LLM (ej: Anthropic, OpenAI o Mistral) con cuota suficiente para ~100 requests/hora por repositorio.
  3. Cuenta en AWS con permisos para crear IAM Roles for Service Accounts y Secrets Manager.

Guía paso a paso

1. Diseñá el modelo de datos para el harness

Problema: Necesitás persistir hallazgos, dependencias entre repositorios y el estado de cada hunt sin perder contexto al reiniciar.Solución: Usá un esquema relacional con estas tablas:
-- Tablas en PostgreSQL
CREATE TABLE repos (
    id SERIAL PRIMARY KEY,
    path TEXT NOT NULL,          -- ej: "github.com/empresa/core-utils"
    last_scanned TIMESTAMPTZ,
    enabled BOOLEAN DEFAULT TRUE
);

CREATE TABLE findings (
    id SERIAL PRIMARY KEY,
    repo_id INTEGER REFERENCES repos(id),
    hunter_name TEXT NOT NULL,    -- ej: "sql_injection_hunter"
    raw_evidence JSONB NOT NULL,   -- salida del hunter en formato estandarizado
    status TEXT NOT NULL CHECK (status IN ('pending', 'validating', 'confirmed', 'rejected')),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE repo_dependencies (
    from_repo INTEGER REFERENCES repos(id),
    to_repo INTEGER REFERENCES repos(id),
    type TEXT CHECK (type IN ('library', 'api', 'config')),
    PRIMARY KEY (from_repo, to_repo, type)
);

CREATE INDEX idx_findings_status ON findings(status);
CREATE INDEX idx_findings_repo ON findings(repo_id);
Resultado esperado:
  • repos almacena todos los repositorios escaneados.
  • findings registra cada hallazgo con su estado (pendingvalidatingconfirmed).
  • repo_dependencies traza relaciones entre repositorios (ej: si api-gateway usa core-utils).

2. Implementá el orchestrator en Python

El orchestrator gestiona el ciclo de vida de cada hunt y delega tareas a los hunters (Python) y validators (Rust).

Archivo: vuln_orchestrator/main.py
import asyncio
import json
from typing import Dict, List
from pathlib import Path
import httpx
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session

# Configuración
LLM_API = "https://api.anthropic.com/v1/messages"
LLM_KEY = "claude-3-5-sonnet-20241022"
DB_URI = "postgresql://user:pass@postgres:5432/vuln_db"

engine = create_engine(DB_URI)
client = httpx.AsyncClient(timeout=30.0)

async def run_hunter(
    repo_path: str,
    hunter_name: str,
    context: Dict[str, str]
) -> Dict[str, str]:
    """Ejecuta un hunter específico y devuelve evidencia cruda."""
    prompt = f"""
    Eres un hunter de vulnerabilidades especializado en {hunter_name}.
    Analizá el repositorio en {repo_path} con contexto:
    {json.dumps(context, indent=2)}
    """
    payload = {
        "model": LLM_KEY,
        "max_tokens": 4096,
        "messages": [{"role": "user", "content": prompt}],
        "tools": [{"name": "list_files", "description": "Lista archivos en el repo"}]
    }
    response = await client.post(LLM_API, headers={"x-api-key": "YOUR_KEY"}, json=payload)
    response.raise_for_status()
    return response.json()["content"][0]["text"]

async def triage_findings(db_session: Session) -> None:
    """Toma findings 'pending' y los envía a validación adversarial."""
    findings = db_session.execute(
        select(findings).where(findings.c.status == "pending")
    ).scalars().all()
    for finding in findings:
        # Ejemplo: Validación adversarial con un modelo distinto
        validator_prompt = f"""
        Eres un adversario. Buscá un falso positivo en este hallazgo:
        {json.dumps(finding.raw_evidence, indent=2)}
        """
        payload = {
            "model": "llama-3.2-90b-vision",
            "messages": [{"role": "user", "content": validator_prompt}]
        }
        response = await client.post(LLM_API, json=payload)
        is_false_positive = "FALSO POSITIVO" in response.json()["content"][0]["text"]
        finding.status = "confirmed" if not is_false_positive else "rejected"
        db_session.commit()

async def main():
    async with client:
        with Session(engine) as db_session:
            # 1. Seleccioná repositorios no escaneados recientemente
            repos = db_session.execute(select(repos).where(repos.c.last_scanned < "2024-10-01")).scalars().all()
            for repo in repos:
                # 2. Ejecutá hunters por clase de ataque
                hunters = ["sql_injection_hunter", "deserialization_hunter", "hardcoded_secrets_hunter"]
                for hunter in hunters:
                    evidence = await run_hunter(repo.path, hunter, {"last_commit": "abc123"})
                    db_session.execute(
                        insert(findings).values(
                            repo_id=repo.id,
                            hunter_name=hunter,
                            raw_evidence=evidence,
                            status="pending"
                        )
                    )
                repo.last_scanned = datetime.utcnow()
                db_session.commit()
                await triage_findings(db_session)

if __name__ == "__main__":
    asyncio.run(main())
Comandos para deployar en EKS:
# 1. Build de contenedores
docker build -t vuln-orchestrator:latest -f Dockerfile.orchestrator .
docker build -t sql-injection-hunter:latest -f hunters/sql_injection/Dockerfile .

# 2. Push a ECR
aws ecr get-login-password | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com
docker tag vuln-orchestrator:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/vuln-orchestrator:latest
docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/vuln-orchestrator:latest

# 3. Deploy en EKS
kubectl apply -f k8s/orchestrator-deployment.yaml
Resultado esperado:
  • El pod vuln-orchestrator escanea repositorios cada 6 horas (configurable).
  • Cada hunter genera hallazgos en formato estandarizado (JSON con file, line, evidence).
  • Los hallazgos pendientes pasan a validación adversarial con un modelo distinto.

3. Implementá el validator adversarial en Rust

Problema: Validar hallazgos manualmente es insostenible a escala. Necesitás un validator que refute cada finding con evidencia opuesta.Solución: Un microservicio en Rust que:
  1. Recibe hallazgos en formato JSON.
  2. Usa un LLM para generar contra-evidencia.
  3. Devuelve un score de confianza (0.0 = falso positivo, 1.0 = confirmado).
Archivo: vuln_validator/src/main.rs
use serde::{Deserialize, Serialize};
use reqwest::Client;
use tokio::time::{sleep, Duration};

#[derive(Debug, Serialize, Deserialize)]
struct Finding {
    id: i32,
    raw_evidence: serde_json::Value,
    repo_path: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct ValidationResult {
    finding_id: i32,
    is_false_positive: bool,
    confidence: f32,
    adversarial_evidence: String,
}

async fn validate_finding(finding: Finding) -> ValidationResult {
    let client = Client::new();
    let prompt = format!(
        "Analizá este hallazgo de vulnerabilidad en {}:\n{:#?}\n\n\
        Proporcioná contra-evidencia que demuestre que es un falso positivo.",
        finding.repo_path, finding.raw_evidence
    );

    let response = client
        .post("https://api.openai.com/v1/chat/completions")
        .header("Authorization", "Bearer YOUR_KEY")
        .json(&serde_json::json!({
            "model": "gpt-4-1106-preview",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 2048
        }))
        .send()
        .await?
        .json::<serde_json::Value>()
        .await?;

    let adversarial_evidence = response["choices"][0]["message"]["content"]
        .as_str()
        .unwrap_or("No se pudo generar contra-evidencia")
        .to_string();

    let is_false_positive = adversarial_evidence.contains("FALSO POSITIVO")
        || adversarial_evidence.contains("no es vulnerable");

    ValidationResult {
        finding_id: finding.id,
        is_false_positive,
        confidence: if is_false_positive { 0.0 } else { 1.0 },
        adversarial_evidence,
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // 1. Consumí mensajes de una cola (ej: RabbitMQ, SQS)
        let finding: Finding = /* Deserializá desde la cola */;
        let result = validate_finding(finding).await;
        println!("Validación para finding {}: {:?}", result.finding_id, result);
        sleep(Duration::from_secs(5)).await;
    }
}
Build y deploy en EKS:
# 1. Compilá para Linux x86_64 (EKS usa amd64)
cross build --target x86_64-unknown-linux-musl --release

# 2. Build de contenedor
docker build -t vuln-validator:latest -f Dockerfile.validator .

# 3. Deploy
kubectl apply -f k8s/validator-deployment.yaml
Resultado esperado:
  • El validator procesa ~10 hallazgos por minuto (ajustable via --concurrency).
  • Devuelve hallazgos confirmados a una cola de triage (ej: SQS → Lambda para notificaciones).

4. Configurá el trazado de dependencias entre repositorios

Problema: Un bug en lib-utils puede impactar en api-gateway, pero el modelo no lo detecta si analiza repositorios en forma aislada.Solución: Un recon agent que:
  1. Escanea archivos de go.mod, Cargo.toml, package.json para detectar dependencias.
  2. Actualiza la tabla repo_dependencies.
Archivo: vuln_recon/main.py
import os
import subprocess
from pathlib import Path

def detect_dependencies(repo_path: str) -> List[Dict[str, str]]:
    """Detecta dependencias en repositorios multi-lenguaje."""
    deps = []
    # Go
    if (Path(repo_path) / "go.mod").exists():
        cmd = "go list -m all"
        output = subprocess.check_output(cmd, cwd=repo_path, shell=True).decode()
        deps.extend([{"type": "library", "name": line.split()[0]} for line in output.splitlines() if line])
    # Rust
    if (Path(repo_path) / "Cargo.toml").exists():
        cmd = "cargo tree --depth 1"
        output = subprocess.check_output(cmd, cwd=repo_path, shell=True).decode()
        deps.extend([{"type": "library", "name": line.split()[0]} for line in output.splitlines() if line])
    return deps

# Ejemplo de uso:
deps = detect_dependencies("/repos/core-utils")
for dep in deps:
    print(f"Dependencia detectada: {dep['name']} (tipo: {dep['type']})")
Resultado esperado:
  • El recon agent actualiza repo_dependencies con relaciones como:
  {
    "from_repo": "api-gateway",
    "to_repo": "core-utils",
    "type": "library"
  }
  

5. Integración con el pipeline de CI/CD

Para que el harness escale, debe integrarse con los triggers de tus repositorios.

Ejemplo con GitHub Actions (.github/workflows/vuln-scan.yaml):
name: Vuln Scan
on:
  push:
    branches: [main]
  schedule:
    - cron: "0 6 * * *"  # Diario a las 6 AM UTC

jobs:
  scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run vulnerability harness
        run: |
          docker run --rm \
            -e REPO_PATH=$(pwd) \
            -e LLM_API_KEY=${{ secrets.LLM_API_KEY }} \
            vuln-orchestrator:latest
Resultado esperado:
  • Cada push a main dispara un escaneo incremental.
  • Hallazgos confirmados se envían a Jira via webhook (ej: Lambda que consume SQS).

Consideraciones y buenas prácticas

1. Límites de contexto y state externalization

  • Problema: Los modelos pierden contexto después de ~100K tokens. Solución: externalizá el estado en PostgreSQL y usá el modelo como engine sin estado.
  • Error común: Guardar el contexto en el prompt del modelo. Alternativa: Persistí hallazgos crudos y regenerá el contexto en cada iteración.

2. Deduplicación de falsos positivos

  • Problema: El mismo bug se reporta múltiples veces en el mismo repositorio. Solución:
– Usá un deduplicator que agrupe hallazgos por:

– Firma del código vulnerable (ej: hash SHA256 del bloque de código).

– Tipo de vulnerabilidad (ej: CWE-89: SQL Injection).

Código ejemplo (Python):

    from sqlalchemy import func
    def deduplicate_findings(db_session):
        duplicates = db_session.query(
            findings.c.raw_evidence["code_block"].astext,
            func.count(findings.c.id)
        ).group_by(
            findings.c.raw_evidence["code_block"].astext
        ).having(func.count(findings.c.id) > 1).all()
        for (code_block, count) in duplicates:
            db_session.query(findings).filter(
                findings.c.raw_evidence["code_block"].astext == code_block
            ).update({"status": "rejected"})
    

3. Escalabilidad con EKS

  • Horizontal Pod Autoscaler (HPA):
  apiVersion: autoscaling/v2
  kind: HorizontalPodAutoscaler
  metadata:
    name: vuln-harness-hpa
  spec:
    scaleTargetRef:
      apiVersion: apps/v1
      kind: Deployment
      name: vuln-orchestrator
    minReplicas: 2
    maxReplicas: 10
    metrics:
      - type: Resource
        resource:
          name: cpu
          target:
            type: Utilization
            averageUtilization: 70
  
  • Limitaciones:
– El modelo de LLM es el cuello de botella. Usá un pool de modelos (ej: alterná entre claude-3-5 y llama-3.2).

– Para repositorios >1GB, usa un sidecar que clone solo el directorio relevante (ej: sparse-checkout en Git).

4. Alternativas si no tenés EKS

  • GKE/AKS: Reemplazá los Deployment por equivalentes en GCP/Azure.
  • Local: Usá Docker Compose para desarrollo:
  version: "3.8"
  services:
    postgres:
      image: postgres:14
      environment:
        POSTGRES_PASSWORD: pass
    orchestrator:
      build: ./vuln_orchestrator
      depends_on: [postgres]
      environment:
        DB_URI: postgresql://user:pass@postgres:5432/vuln_db
  

Conclusión

Construir un harness de vulnerabilidades escalable requiere:

  1. Externalizar el estado (PostgreSQL) para evitar pérdida de contexto.
  2. Cross-validar hallazgos con modelos distintos (adversarial validation).
  3. Trazar dependencias entre repositorios para detectar bugs en interfaces críticas.
  4. Integrar con tu pipeline (GitHub Actions, Jenkins) para escaneos automáticos.

El código provisto es model-agnóstico por diseño: podés reemplazar el LLM sin modificar el orchestrator. Para equipos con >50 repositorios, recomendamos añadir:

  • Un deduplicator basado en hashes de código.
  • Un recon agent que escanee go.mod, package.json, etc.
  • Un webhook para enviar hallazgos a Jira/Slack automáticamente.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *