ETL MongoDB → PostgreSQL


Contexto

En la organización donde trabajo, el sistema operativo corre sobre MongoDB: es donde los técnicos y laboratoristas cargan sus ensayos día a día. Funciona bien para escritura frecuente y esquemas que cambian, pero se vuelve incómodo cuando alguien pregunta cosas como “¿cuántos ensayos de resistencia hicimos el último semestre por tipo de probeta?”. Las queries de análisis sobre MongoDB son verbosas, frágiles y lentas.

La solución no era migrar todo a PostgreSQL — el sistema operativo seguiría usando MongoDB por razones válidas. La solución era construir una capa analítica separada: PostgreSQL como destino de solo lectura, sincronizado con MongoDB mediante un ETL automatizado.

El problema

Tres requisitos concretos:

  1. Latencia aceptable: los analistas querían datos del día actual para ciertos reportes, aunque no necesariamente en tiempo real.
  2. Confiabilidad: si el ETL fallaba un día, tenía que ser evidente y recuperable, no silencioso.
  3. Bajo costo de mantenimiento: no había presupuesto para una plataforma completa tipo Fivetran o Airbyte. Tenía que construirlo con herramientas open source y correr en la infraestructura existente.

Arquitectura

flowchart LR
    subgraph FUENTE["Fuente operativa"]
        MDB[(MongoDB)]
    end

    subgraph PROC["ETL Python · Docker"]
        direction TB
        SCH["APScheduler\norquestación"]
        VAL["Pydantic\nvalidación de tipos"]
        SCH --> VAL
    end

    subgraph DESTINO["Capa analítica"]
        PG[(PostgreSQL)]
    end

    FUENTE -->|"PyMongo\nlectura"| PROC
    PROC -->|"SQLAlchemy\nescritura"| DESTINO

Decisiones de arquitectura

Por qué APScheduler y no Airflow

Consideré Airflow, Dagster y Prefect. Los descarté porque:

  • Overhead operacional: Airflow por sí mismo es una base de datos, un scheduler y un webserver. Para correr ~12 DAGs por día, no justifica el costo de mantenerlo.
  • Curva de aprendizaje del equipo: quien hereda el proyecto después tiene que entender Airflow antes de modificar el ETL.

APScheduler corre dentro de un proceso Python simple. Un cron de reemplazo pero con lógica de reintentos y logs estructurados. Para la escala del problema, suficiente.

Por qué Pydantic en la capa de validación

Cada documento de MongoDB entra a un modelo Pydantic antes de insertarse en PostgreSQL. Eso hace dos cosas:

  1. Valida tipos: MongoDB acepta cualquier cosa. Si alguien guarda un string donde debería ir un número, el ETL lo captura y lo registra en vez de propagar el error.
  2. Documenta el schema: el modelo Pydantic es la especificación viva de qué esperas recibir. Reemplaza documentación externa que se desactualiza.

Por qué PostgreSQL y no BigQuery o Snowflake

Para el volumen (decenas de millones de filas, no billones), PostgreSQL es más que suficiente. Agrega PostGIS cuando el caso lo requiere. Sin costos de nube por consulta. Sin dependencia de proveedor.

Stack

  • Python 3.11 como lenguaje base
  • APScheduler para orquestación
  • Pydantic para validación de modelos
  • PyMongo para lectura de MongoDB
  • SQLAlchemy para escritura en PostgreSQL
  • Docker para despliegue reproducible

Aprendizajes

Si lo rehiciera hoy, cambiaría dos cosas:

  1. Agregar CDC (Change Data Capture) desde el inicio en vez de batch. Con MongoDB, habilitar change streams y consumirlos reduce latencia y uso de recursos.
  2. Tests de contrato entre el modelo Pydantic y PostgreSQL. Hoy si alguien cambia el schema de PostgreSQL pero no el modelo Pydantic, el error aparece en tiempo de ejecución. Con tests automatizados sería más robusto.

El aprendizaje más importante no es técnico: una capa analítica bien aislada cambia la conversación dentro del equipo. Cuando alguien pregunta algo sobre los datos, la respuesta deja de ser “dame dos días” y pasa a ser “corro esta query”.