Acesse o painel da sua conta

Não tem uma conta? Registrar

Entrar em contato

Visite também nosso site craftxp.com.br

  • img
  • img
  • img
  • img
  • img
  • img

Entre em contato

Apache Airflow: Orquestração de Pipelines de Dados com Python

Apache Airflow: Orquestração de Pipelines de Dados com Python

O que é Apache Airflow?

O Apache Airflow é uma plataforma open-source para orquestração, agendamento e monitoramento de workflows computacionais. Criado pela Airbnb em 2014 e doado à Apache Foundation, o Airflow tornou-se o padrão da indústria para construção e gerenciamento de pipelines de dados.

Diferente de ferramentas ETL tradicionais que operam em lote, o Airflow representa workflows como DAGs (Directed Acyclic Graphs), onde cada nó é uma tarefa e as arestas definem as dependências. Isso permite visualizar, depurar e escalar pipelines complexas com facilidade.

Conceitos Fundamentais

DAG (Directed Acyclic Graph)

Uma DAG é um conjunto de tarefas organizadas com dependências e relações. O Airflow executa as tarefas na ordem definida, respeitando as dependências — e como o nome indica, não permite ciclos, garantindo que o workflow sempre termine.

Operators

Operators determinam o que cada tarefa faz. O Airflow ofere dezenas de operators prontos:

  • PythonOperator: executa funções Python
  • BashOperator: executa comandos shell
  • PostgresOperator: executa queries SQL no PostgreSQL
  • S3FileTransformOperator: transforma arquivos no S3
  • EmailOperator: envia e-mails
  • DockerOperator: executa tarefas dentro de containers Docker
  • KubernetesPodOperator: executa tarefas como Pods no Kubernetes

Tasks e Task Instances

Uma task é a definição de um trabalho (ex.: "rodar script de limpeza"). Uma task instance representa uma execução específica dessa task em um momento no tempo, com seu próprio status, logs e duração.

Instalação e Setup

A forma mais simples de começar é via Docker Compose. O Airflow oficial fornece um arquivo docker-compose.yaml completo:

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
docker compose up -d

Após o setup, acesse a interface web em http://localhost:8080 (usuário: airflow, senha: airflow).

Sua Primeira DAG

Crie o arquivo dags/meu_primeiro_dag.py:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['alerta@exemplo.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

def extrair_dados():
    """Simula extração de dados de uma API"""
    print("Extraindo dados da fonte...")
    return {"status": "sucesso", "registros": 1500}

def transformar_dados(ti):
    """Transforma os dados extraídos"""
    dados = ti.xcom_pull(task_ids='extrair')
    print(f"Transformando {dados['registros']} registros...")
    ti.xcom_push(key='total_transformado', value=dados['registros'] * 2)

with DAG(
    'meu_primeiro_etl',
    default_args=default_args,
    description='Pipeline ETL simples de exemplo',
    schedule_interval='0 8 * * *',  # Todo dia às 8h
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['exemplo', 'etl'],
) as dag:

    extracao = PythonOperator(
        task_id='extrair',
        python_callable=extrair_dados
    )

    transformacao = PythonOperator(
        task_id='transformar',
        python_callable=transformar_dados
    )

    carregar = BashOperator(
        task_id='carregar',
        bash_command='echo "Carregando dados no banco..." && sleep 2'
    )

    notificar = EmailOperator(
        task_id='notificar',
        to='equipe@exemplo.com',
        subject='Pipeline ETL concluída',
        html_content='

Pipeline executada com sucesso!

' ) extracao >> transformacao >> carregar >> notificar

Este exemplo demonstra:

  • Comunicação entre tasks via XComs
  • Configuração de retentativas e alertas
  • Agendamento com cron expression
  • Encadeamento de tarefas com o operador >>
  • Diferentes tipos de operators em uma mesma DAG

XComs: Compartilhando Dados entre Tasks

O Airflow permite que tasks troquem dados através do mecanismo de XComs (cross-communications). Uma task pode enviar dados com ti.xcom_push() e outra pode recebê-los com ti.xcom_pull():

def task_a(ti):
    ti.xcom_push(key='valor', value=42)

def task_b(ti):
    valor = ti.xcom_pull(task_ids='task_a', key='valor')
    print(f"Recebido: {valor}")  # 42

Importante: XComs são armazenados no banco de dados do Airflow, então evite transferir grandes volumes de dados — prefira salvar em um data lake e passar apenas o caminho do arquivo.

Melhores Práticas

  1. Tasks atômicas e idempotentes: cada task deve fazer apenas uma coisa e poder ser reexecutada sem efeitos colaterais
  2. Use conexões e variáveis: nunca hardcode credenciais ou URLs — configure via Admin > Connections
  3. Teste suas DAGs: utilize airflow dags test antes de ativar uma DAG em produção
  4. Backfill consciente: configure catchup=False para evitar que execuções passadas disparem automaticamente
  5. Timeout em tasks: sempre defina execution_timeout para evitar tasks que nunca terminam
  6. Logs centralizados: configure o Airflow para enviar logs para um sistema externo (S3, GCS, Elasticsearch)
  7. Pool de recursos: use Pools para limitar o número de execuções simultâneas de tasks que consomem muitos recursos

Monitoramento e Observabilidade

A interface web do Airflow oferece:

  • Grid View: visão geral de todas as execuções com código de cores por status
  • Graph View: visualização da DAG com dependências entre tasks
  • Gantt Chart: timeline de execução de cada task para identificar gargalos
  • Code View: visualização do código fonte da DAG
  • Logs detalhados: logs completos de cada tentativa de execução

Para alertas avançados, integre o Airflow com Slack, PagerDuty ou um webhook personalizado.

Airflow no Ecossistema de Dados

O Airflow se posiciona como o orquestrador central em arquiteturas modernas de dados, coordenando as seguintes ferramentas:

  • Extraction: Airbyte, Fivetran, Stitch
  • Storage: S3, GCS, Azure Blob, HDFS
  • Processing: Spark, dbt, EMR, Databricks
  • Warehousing: Snowflake, BigQuery, Redshift
  • BI: Metabase, Tableau, Power BI, Superset

Conclusão

O Apache Airflow é a espinha dorsal de pipelines de dados modernas, oferecendo flexibilidade, escalabilidade e uma comunidade ativa. Se você trabalha com engenharia de dados, ciência de dados ou DevOps, dominar o Airflow é um diferencial competitivo enorme.

Comece com DAGs simples, explore os operators disponíveis e gradualmente construa pipelines mais complexas. Com o tempo, você terá um ecossistema de dados totalmente automatizado e observável.

Craft XP
Craft XP