Introducción
Un escáner de vulnerabilidades basado en modelos de IA falla cuando su contexto se llena, su estado se pierde en un reinicio o ignora dependencias entre repositorios. En equipos de seguridad reales, esto significa horas de trabajo perdido por errores de contexto, falsos positivos recurrentes o la imposibilidad de rastrear un bug desde su origen hasta su impacto en producción.
Este artículo detalla cómo construir tu propio harness de vulnerabilidades: un pipeline automatizado de múltiples etapas que externaliza el estado, cross-valida hallazgos con modelos distintos y escala sin limitarse a un solo repositorio. El enfoque es model-agnóstico —soporta cualquier LLM de frontera— y está diseñado para EKS, con ejemplos en Python para orquestación y Rust para componentes de alto rendimiento.
Qué es y para qué sirve
Un harness de vulnerabilidades es una arquitectura de orquestación que:
- Descubre bugs mediante hunters especializados por clase de ataque (ej: inyección SQL, deserialización insegura).
- Valida cada hallazgo con un adversarial validator que intenta refutarlo con evidencia opuesta.
- Trazas dependencias entre repositorios (ej: un bug en una librería
core-utilsimpacta enapi-gateway). - Persiste el estado en una base de datos para reiniciar escaneos sin perder contexto.
- Herramientas como CodeQL o Semgrep requieren parsing sintáctico por lenguaje.
- Nuestro harness no parsea código: usa el modelo como engine de inferencia sobre logs de ejecución, archivos de configuración y documentación. Esto lo hace agnóstico al lenguaje (funciona igual con Rust, Go o Terraform).
Prerequisitos
| Componente | Versión mínima | Notas |
|---|---|---|
| Kubernetes | 1.27+ | Cluster EKS con IAM Roles for Service Accounts (IRSA) activado. |
| Python | 3.11+ | Necesario para el orchestrator y hunters. Usá BLOCK13 para dependencias. |
| Rust | 1.70+ | Para el *validator adversarial* (alto rendimiento en parsing de logs). |
| PostgreSQL | 14+ | Base de datos para almacenar hallazgos, dependencias y estado. |
| Docker | 24+ | Para empaquetar hunters y validators como containers. |
| Modelos de LLM | Cualquier (ej: Claude 3.5, Llama 4) | Deben exponer una API con BLOCK14 y BLOCK15 (ej: Anthropic Messages API). |
| Permisos | – ClusterAdmin (EKS) | Para instalar operadores y CRDs. |
| – IAM: BLOCK16 | Para que el harness acceda a metadata de repositorios. |
- Repositorios privados en GitHub/GitLab con un token de máquina con
read:packagesyread:repo. - API key de un proveedor de LLM (ej: Anthropic, OpenAI o Mistral) con cuota suficiente para ~100 requests/hora por repositorio.
- Cuenta en AWS con permisos para crear IAM Roles for Service Accounts y Secrets Manager.
Guía paso a paso
1. Diseñá el modelo de datos para el harness
Problema: Necesitás persistir hallazgos, dependencias entre repositorios y el estado de cada hunt sin perder contexto al reiniciar.Solución: Usá un esquema relacional con estas tablas:-- Tablas en PostgreSQL
CREATE TABLE repos (
id SERIAL PRIMARY KEY,
path TEXT NOT NULL, -- ej: "github.com/empresa/core-utils"
last_scanned TIMESTAMPTZ,
enabled BOOLEAN DEFAULT TRUE
);
CREATE TABLE findings (
id SERIAL PRIMARY KEY,
repo_id INTEGER REFERENCES repos(id),
hunter_name TEXT NOT NULL, -- ej: "sql_injection_hunter"
raw_evidence JSONB NOT NULL, -- salida del hunter en formato estandarizado
status TEXT NOT NULL CHECK (status IN ('pending', 'validating', 'confirmed', 'rejected')),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE repo_dependencies (
from_repo INTEGER REFERENCES repos(id),
to_repo INTEGER REFERENCES repos(id),
type TEXT CHECK (type IN ('library', 'api', 'config')),
PRIMARY KEY (from_repo, to_repo, type)
);
CREATE INDEX idx_findings_status ON findings(status);
CREATE INDEX idx_findings_repo ON findings(repo_id);Resultado esperado:reposalmacena todos los repositorios escaneados.findingsregistra cada hallazgo con su estado (pending→validating→confirmed).repo_dependenciestraza relaciones entre repositorios (ej: siapi-gatewayusacore-utils).
2. Implementá el orchestrator en Python
El orchestrator gestiona el ciclo de vida de cada hunt y delega tareas a los hunters (Python) y validators (Rust).
Archivo:vuln_orchestrator/main.pyimport asyncio
import json
from typing import Dict, List
from pathlib import Path
import httpx
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
# Configuración
LLM_API = "https://api.anthropic.com/v1/messages"
LLM_KEY = "claude-3-5-sonnet-20241022"
DB_URI = "postgresql://user:pass@postgres:5432/vuln_db"
engine = create_engine(DB_URI)
client = httpx.AsyncClient(timeout=30.0)
async def run_hunter(
repo_path: str,
hunter_name: str,
context: Dict[str, str]
) -> Dict[str, str]:
"""Ejecuta un hunter específico y devuelve evidencia cruda."""
prompt = f"""
Eres un hunter de vulnerabilidades especializado en {hunter_name}.
Analizá el repositorio en {repo_path} con contexto:
{json.dumps(context, indent=2)}
"""
payload = {
"model": LLM_KEY,
"max_tokens": 4096,
"messages": [{"role": "user", "content": prompt}],
"tools": [{"name": "list_files", "description": "Lista archivos en el repo"}]
}
response = await client.post(LLM_API, headers={"x-api-key": "YOUR_KEY"}, json=payload)
response.raise_for_status()
return response.json()["content"][0]["text"]
async def triage_findings(db_session: Session) -> None:
"""Toma findings 'pending' y los envía a validación adversarial."""
findings = db_session.execute(
select(findings).where(findings.c.status == "pending")
).scalars().all()
for finding in findings:
# Ejemplo: Validación adversarial con un modelo distinto
validator_prompt = f"""
Eres un adversario. Buscá un falso positivo en este hallazgo:
{json.dumps(finding.raw_evidence, indent=2)}
"""
payload = {
"model": "llama-3.2-90b-vision",
"messages": [{"role": "user", "content": validator_prompt}]
}
response = await client.post(LLM_API, json=payload)
is_false_positive = "FALSO POSITIVO" in response.json()["content"][0]["text"]
finding.status = "confirmed" if not is_false_positive else "rejected"
db_session.commit()
async def main():
async with client:
with Session(engine) as db_session:
# 1. Seleccioná repositorios no escaneados recientemente
repos = db_session.execute(select(repos).where(repos.c.last_scanned < "2024-10-01")).scalars().all()
for repo in repos:
# 2. Ejecutá hunters por clase de ataque
hunters = ["sql_injection_hunter", "deserialization_hunter", "hardcoded_secrets_hunter"]
for hunter in hunters:
evidence = await run_hunter(repo.path, hunter, {"last_commit": "abc123"})
db_session.execute(
insert(findings).values(
repo_id=repo.id,
hunter_name=hunter,
raw_evidence=evidence,
status="pending"
)
)
repo.last_scanned = datetime.utcnow()
db_session.commit()
await triage_findings(db_session)
if __name__ == "__main__":
asyncio.run(main())Comandos para deployar en EKS:# 1. Build de contenedores
docker build -t vuln-orchestrator:latest -f Dockerfile.orchestrator .
docker build -t sql-injection-hunter:latest -f hunters/sql_injection/Dockerfile .
# 2. Push a ECR
aws ecr get-login-password | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com
docker tag vuln-orchestrator:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/vuln-orchestrator:latest
docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/vuln-orchestrator:latest
# 3. Deploy en EKS
kubectl apply -f k8s/orchestrator-deployment.yamlResultado esperado:- El pod
vuln-orchestratorescanea repositorios cada 6 horas (configurable). - Cada hunter genera hallazgos en formato estandarizado (JSON con
file,line,evidence). - Los hallazgos pendientes pasan a validación adversarial con un modelo distinto.
3. Implementá el validator adversarial en Rust
Problema: Validar hallazgos manualmente es insostenible a escala. Necesitás un validator que refute cada finding con evidencia opuesta.Solución: Un microservicio en Rust que:- Recibe hallazgos en formato JSON.
- Usa un LLM para generar contra-evidencia.
- Devuelve un score de confianza (
0.0= falso positivo,1.0= confirmado).
vuln_validator/src/main.rsuse serde::{Deserialize, Serialize};
use reqwest::Client;
use tokio::time::{sleep, Duration};
#[derive(Debug, Serialize, Deserialize)]
struct Finding {
id: i32,
raw_evidence: serde_json::Value,
repo_path: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct ValidationResult {
finding_id: i32,
is_false_positive: bool,
confidence: f32,
adversarial_evidence: String,
}
async fn validate_finding(finding: Finding) -> ValidationResult {
let client = Client::new();
let prompt = format!(
"Analizá este hallazgo de vulnerabilidad en {}:\n{:#?}\n\n\
Proporcioná contra-evidencia que demuestre que es un falso positivo.",
finding.repo_path, finding.raw_evidence
);
let response = client
.post("https://api.openai.com/v1/chat/completions")
.header("Authorization", "Bearer YOUR_KEY")
.json(&serde_json::json!({
"model": "gpt-4-1106-preview",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 2048
}))
.send()
.await?
.json::<serde_json::Value>()
.await?;
let adversarial_evidence = response["choices"][0]["message"]["content"]
.as_str()
.unwrap_or("No se pudo generar contra-evidencia")
.to_string();
let is_false_positive = adversarial_evidence.contains("FALSO POSITIVO")
|| adversarial_evidence.contains("no es vulnerable");
ValidationResult {
finding_id: finding.id,
is_false_positive,
confidence: if is_false_positive { 0.0 } else { 1.0 },
adversarial_evidence,
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
loop {
// 1. Consumí mensajes de una cola (ej: RabbitMQ, SQS)
let finding: Finding = /* Deserializá desde la cola */;
let result = validate_finding(finding).await;
println!("Validación para finding {}: {:?}", result.finding_id, result);
sleep(Duration::from_secs(5)).await;
}
}Build y deploy en EKS:# 1. Compilá para Linux x86_64 (EKS usa amd64)
cross build --target x86_64-unknown-linux-musl --release
# 2. Build de contenedor
docker build -t vuln-validator:latest -f Dockerfile.validator .
# 3. Deploy
kubectl apply -f k8s/validator-deployment.yamlResultado esperado:- El validator procesa ~10 hallazgos por minuto (ajustable via
--concurrency). - Devuelve hallazgos confirmados a una cola de triage (ej: SQS → Lambda para notificaciones).
4. Configurá el trazado de dependencias entre repositorios
Problema: Un bug enlib-utils puede impactar en api-gateway, pero el modelo no lo detecta si analiza repositorios en forma aislada.Solución: Un recon agent que:- Escanea archivos de
go.mod,Cargo.toml,package.jsonpara detectar dependencias. - Actualiza la tabla
repo_dependencies.
vuln_recon/main.pyimport os
import subprocess
from pathlib import Path
def detect_dependencies(repo_path: str) -> List[Dict[str, str]]:
"""Detecta dependencias en repositorios multi-lenguaje."""
deps = []
# Go
if (Path(repo_path) / "go.mod").exists():
cmd = "go list -m all"
output = subprocess.check_output(cmd, cwd=repo_path, shell=True).decode()
deps.extend([{"type": "library", "name": line.split()[0]} for line in output.splitlines() if line])
# Rust
if (Path(repo_path) / "Cargo.toml").exists():
cmd = "cargo tree --depth 1"
output = subprocess.check_output(cmd, cwd=repo_path, shell=True).decode()
deps.extend([{"type": "library", "name": line.split()[0]} for line in output.splitlines() if line])
return deps
# Ejemplo de uso:
deps = detect_dependencies("/repos/core-utils")
for dep in deps:
print(f"Dependencia detectada: {dep['name']} (tipo: {dep['type']})")Resultado esperado:- El recon agent actualiza
repo_dependenciescon relaciones como:
{
"from_repo": "api-gateway",
"to_repo": "core-utils",
"type": "library"
}
5. Integración con el pipeline de CI/CD
Para que el harness escale, debe integrarse con los triggers de tus repositorios.
Ejemplo con GitHub Actions (.github/workflows/vuln-scan.yaml):name: Vuln Scan
on:
push:
branches: [main]
schedule:
- cron: "0 6 * * *" # Diario a las 6 AM UTC
jobs:
scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run vulnerability harness
run: |
docker run --rm \
-e REPO_PATH=$(pwd) \
-e LLM_API_KEY=${{ secrets.LLM_API_KEY }} \
vuln-orchestrator:latestResultado esperado:- Cada
pushamaindispara un escaneo incremental. - Hallazgos confirmados se envían a Jira via webhook (ej: Lambda que consume SQS).
Consideraciones y buenas prácticas
1. Límites de contexto y state externalization
- Problema: Los modelos pierden contexto después de ~100K tokens. Solución: externalizá el estado en PostgreSQL y usá el modelo como engine sin estado.
- Error común: Guardar el contexto en el prompt del modelo. Alternativa: Persistí hallazgos crudos y regenerá el contexto en cada iteración.
2. Deduplicación de falsos positivos
- Problema: El mismo bug se reporta múltiples veces en el mismo repositorio. Solución:
– Firma del código vulnerable (ej: hash SHA256 del bloque de código).
– Tipo de vulnerabilidad (ej: CWE-89: SQL Injection).
– Código ejemplo (Python):
from sqlalchemy import func
def deduplicate_findings(db_session):
duplicates = db_session.query(
findings.c.raw_evidence["code_block"].astext,
func.count(findings.c.id)
).group_by(
findings.c.raw_evidence["code_block"].astext
).having(func.count(findings.c.id) > 1).all()
for (code_block, count) in duplicates:
db_session.query(findings).filter(
findings.c.raw_evidence["code_block"].astext == code_block
).update({"status": "rejected"})
3. Escalabilidad con EKS
- Horizontal Pod Autoscaler (HPA):
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: vuln-harness-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: vuln-orchestrator
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- Limitaciones:
claude-3-5 y llama-3.2).– Para repositorios >1GB, usa un sidecar que clone solo el directorio relevante (ej: sparse-checkout en Git).
4. Alternativas si no tenés EKS
- GKE/AKS: Reemplazá los
Deploymentpor equivalentes en GCP/Azure. - Local: Usá Docker Compose para desarrollo:
version: "3.8"
services:
postgres:
image: postgres:14
environment:
POSTGRES_PASSWORD: pass
orchestrator:
build: ./vuln_orchestrator
depends_on: [postgres]
environment:
DB_URI: postgresql://user:pass@postgres:5432/vuln_db
Conclusión
Construir un harness de vulnerabilidades escalable requiere:
- Externalizar el estado (PostgreSQL) para evitar pérdida de contexto.
- Cross-validar hallazgos con modelos distintos (adversarial validation).
- Trazar dependencias entre repositorios para detectar bugs en interfaces críticas.
- Integrar con tu pipeline (GitHub Actions, Jenkins) para escaneos automáticos.
El código provisto es model-agnóstico por diseño: podés reemplazar el LLM sin modificar el orchestrator. Para equipos con >50 repositorios, recomendamos añadir:
- Un deduplicator basado en hashes de código.
- Un recon agent que escanee
go.mod,package.json, etc. - Un webhook para enviar hallazgos a Jira/Slack automáticamente.
