Uso de Operadores PythonOperator y BashOperator en ETL

Lectura
20 min~4 min lectura
Objetivo de la lección

Concepto clave En Apache Airflow, los operadores son los componentes fundamentales que ejecutan tareas especificas dentro de un DAG.

Puntos de control
  • Concepto clave
  • Como funciona en la practica
  • Paso 1: Definir el DAG
  • Paso 2: Agregar tareas con operadores

Concepto clave

En Apache Airflow, los operadores son los componentes fundamentales que ejecutan tareas especificas dentro de un DAG. Piensalos como las herramientas en una caja de un ingeniero de datos: cada una tiene una funcion unica. El PythonOperator te permite ejecutar codigo Python arbitrario, ideal para transformaciones de datos complejas o integraciones con APIs. El BashOperator ejecuta comandos de shell, perfecto para mover archivos, ejecutar scripts externos o interactuar con sistemas de archivos.

Una analogia del mundo real seria un restaurante automatizado. El BashOperator seria como el robot que trae ingredientes crudos de la nevera (ejecutando comandos como cp o wget). El PythonOperator seria el chef que cocina esos ingredientes (transformando datos con pandas o logica de negocio). Juntos, orquestan la preparacion de un plato completo (tu pipeline ETL).

Como funciona en la practica

Veamos un ejemplo paso a paso de un DAG simple que usa ambos operadores. Supongamos que necesitas descargar un archivo CSV, procesarlo y guardar los resultados.

Paso 1: Definir el DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def procesar_datos():
    import pandas as pd
    df = pd.read_csv('/tmp/datos.csv')
    df['nueva_columna'] = df['valor'] * 2
    df.to_csv('/tmp/datos_procesados.csv', index=False)

default_args = {
    'owner': 'data_engineer',
    'start_date': datetime(2023, 1, 1)
}

dag = DAG(
    'etl_ejemplo',
    default_args=default_args,
    schedule_interval='@daily'
)

Paso 2: Agregar tareas con operadores

descargar = BashOperator(
    task_id='descargar_archivo',
    bash_command='wget -O /tmp/datos.csv https://ejemplo.com/datos.csv',
    dag=dag
)

procesar = PythonOperator(
    task_id='procesar_datos',
    python_callable=procesar_datos,
    dag=dag
)

guardar = BashOperator(
    task_id='guardar_resultados',
    bash_command='aws s3 cp /tmp/datos_procesados.csv s3://bucket/datos/',
    dag=dag
)

Paso 3: Establecer dependencias

descargar >> procesar >> guardar

Este DAG ejecutara secuencialmente: descarga un archivo, lo procesa en Python y sube el resultado a S3.

Caso de estudio

Imagina que trabajas para un e-commerce que necesita actualizar diariamente las recomendaciones de productos. Tu pipeline ETL debe:

  1. Descargar logs de ventas del dia anterior desde un servidor SFTP.
  2. Procesar los logs para calcular productos mas vendidos.
  3. Actualizar una base de datos de recomendaciones.

Implementacion con operadores:

TareaOperadorImplementacion
Descargar logsBashOperatorsftp usuario@servidor:/logs/ventas_$(date -d yesterday +%Y%m%d).csv /tmp/
Procesar datosPythonOperatorFuncion Python que lee el CSV, agrupa por producto y calcula metricas
Actualizar DBPythonOperatorFuncion que usa SQLAlchemy para insertar en PostgreSQL
En produccion, siempre usa variables de Airflow para credenciales y rutas, nunca las codifiques directamente.

Errores comunes

  • No manejar errores en PythonOperator: Si tu funcion Python lanza una excepcion no capturada, la tarea fallara. Siempre incluye try-except y logging.
  • BashOperator con comandos complejos sin pruebas: Un comando bash con errores de sintaxis fallara silenciosamente. Prueba primero en una terminal.
  • Olvidar dependencias de Python: Si tu funcion PythonOperator usa pandas, asegurate de que este instalado en el entorno de Airflow.
  • No limpiar archivos temporales: BashOperator puede crear archivos en /tmp que acumulan espacio. Agrega tareas de limpieza o usa directorios con timestamp.
  • Mal manejo de fechas en BashOperator: Usar $(date) directamente puede causar inconsistencias. Mejor usa las macros de Airflow como {{ ds }}.

Checklist de dominio

  1. Puedo crear un DAG que combine al menos un PythonOperator y un BashOperator con dependencias.
  2. Se como pasar parametros entre tareas usando XComs cuando es necesario.
  3. Entiendo cuando usar cada operador: Bash para comandos de sistema, Python para logica compleja.
  4. Puedo manejar errores en ambos operadores (excepciones en Python, exit codes en Bash).
  5. Se configurar el entorno para que PythonOperator tenga acceso a las librerias necesarias.
  6. Puedo usar variables de Airflow en lugar de valores hardcodeados en los operadores.
  7. Entiendo el impacto en rendimiento: PythonOperator en el scheduler vs BashOperator en workers.

Construye un pipeline ETL para datos meteorologicos

Crea un DAG que automatice la descarga y procesamiento de datos meteorologicos diarios.

  1. Define un DAG llamado etl_clima que se ejecute diariamente.
  2. Agrega una tarea con BashOperator que descargue un archivo CSV desde https://datos-ejemplo.com/clima_$(date +%Y-%m-%d).csv a /tmp/clima.csv.
  3. Agrega una tarea con PythonOperator que:
    • Lea el archivo CSV descargado
    • Calcule la temperatura promedio del dia
    • Guarde el resultado en un archivo JSON en /tmp/clima_promedio.json
  4. Agrega una tarea final con BashOperator que mueva el archivo JSON a un directorio de backup: /backup/clima/.
  5. Establece las dependencias para que las tareas se ejecuten en orden.
  6. Prueba tu DAG ejecutandolo manualmente desde la UI de Airflow.
Pistas
  • Usa pandas en la funcion Python para leer el CSV y calcular promedios
  • Asegurate de que el directorio /backup/clima/ exista antes de ejecutar
  • Considera usar try-except en tu funcion Python para manejar archivos faltantes

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.

Laboratorio de práctica

Antes de marcar esta lección como completa, escribí una evidencia breve para Apache Airflow: Construcción de Pipelines ETL Automatizados con DAGs: un ejemplo, una decisión, una captura, una mini demo o una nota que puedas reutilizar en portfolio.

Reflexión rápida

¿Qué cambiarías en tu forma de trabajar después de aplicar uso de operadores pythonoperator y bashoperator en etl?

De lección a portfolio

Convertí esta lección en evidencia para Data Analyst.

Sumá un mini caso con datos, una conclusión de negocio y una captura del resultado. Eso pesa más que decir que viste la herramienta.

Paso 1

Publicá una consulta, dashboard o notebook con una conclusión clara.

Paso 2

Agregá contexto: problema, dato usado, decisión recomendada y limitación.

Paso 3

Guardá el enlace en tu CV, LinkedIn o portfolio antes de postular.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión

Uso de Operadores PythonOperator y BashOperator... | CursaloFalar no WhatsApp