📋 Descrição do Projeto
Pipeline completo de ETL para análise de dados de uma concessionária de veículos. O projeto extrai dados operacionais de vendas, clientes, produtos e categorias de um banco de produção PostgreSQL, realiza transformações com dbt seguindo arquitetura de camadas, e disponibiliza os dados em um Data Warehouse para visualização em Power BI.
Toda a orquestração é feita via Apache Airflow, permitindo agendamento automático, monitoramento e retry de falhas.
🏗️ Arquitetura do Pipeline
┌─────────────────┐
│ PostgreSQL │
│ (Produção) │
│ - vendas │
│ - clientes │
│ - produtos │
└────────┬────────┘
│ Extract
▼
┌─────────────────┐
│ Apache Airflow │
│ DAG Pipeline │
│ - Extração │
│ - Validação │
│ - Trigger dbt │
└────────┬────────┘
│ Load
▼
┌─────────────────┐
│ PostgreSQL │
│ (Data Warehouse)│
│ - staging │
└────────┬────────┘
│ Transform
▼
┌─────────────────┐
│ dbt │
│ - staging │
│ - dimensions │
│ - analytics │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Power BI │
│ Dashboards │
│ - Vendas │
│ - Clientes │
│ - Produtos │
└─────────────────┘
Fluxo de Dados
- Extração: Airflow conecta ao banco de produção e extrai dados via SQL
- Carga: Dados brutos são carregados na camada staging do DW
- Transformação: dbt aplica transformações em 3 camadas (staging → dimensions → analytics)
- Visualização: Power BI consome as tabelas finais para dashboards
🛠️ Stack Utilizada
Orquestração
- Apache Airflow 2.5+ para orquestração
- DAGs com agendamento diário
- Retry automático em caso de falhas
Transformação
- dbt para transformações SQL
- Arquitetura de camadas (staging, dimensions, analytics)
- Testes de qualidade de dados
Armazenamento
- PostgreSQL como banco de origem
- PostgreSQL como Data Warehouse
- Schemas separados por camada
Visualização
- Power BI para dashboards
- Conexão direta com PostgreSQL
- Refresh automático de dados
💻 Exemplo de Código
DAG do Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'concessionaria_etl',
default_args=default_args,
description='Pipeline ETL Concessionária',
schedule_interval='0 2 * * *', # Diariamente às 2h
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
extract_vendas = PostgresOperator(
task_id='extract_vendas',
postgres_conn_id='prod_db',
sql='sql/extract_vendas.sql'
)
run_dbt = BashOperator(
task_id='run_dbt',
bash_command='cd /opt/dbt && dbt run --models analytics'
)
extract_vendas >> run_dbt
Modelo dbt (Exemplo)
-- models/analytics/fact_vendas.sql
{{ config(
materialized='table',
schema='analytics'
) }}
WITH vendas_enriched AS (
SELECT
v.venda_id,
v.data_venda,
v.valor_total,
c.nome_cliente,
c.cidade,
p.modelo,
p.categoria
FROM {{ ref('stg_vendas') }} v
LEFT JOIN {{ ref('dim_clientes') }} c ON v.cliente_id = c.cliente_id
LEFT JOIN {{ ref('dim_produtos') }} p ON v.produto_id = p.produto_id
)
SELECT * FROM vendas_enriched
🎯 Desafios e Soluções
1. Performance na Extração de Dados
Desafio: Extração completa levava mais de 2 horas, impactando o banco de produção.
Solução: Implementei extração incremental baseada em timestamps, reduzindo o tempo para ~15 minutos e diminuindo a carga no banco de produção em 85%.
2. Qualidade de Dados
Desafio: Dados inconsistentes quebravam o pipeline (valores nulos, duplicatas).
Solução: Adicionei testes dbt para validar unicidade, não-nulidade e integridade referencial. Pipeline agora falha rápido com alertas claros.
3. Monitoramento e Alertas
Desafio: Falhas silenciosas não eram detectadas rapidamente.
Solução: Configurei callbacks do Airflow para enviar alertas por email em caso de falhas, com logs detalhados para troubleshooting.
📊 Resultados e Métricas
Tempo de Execução Diária
Registros Processados/Dia
Taxa de Sucesso
Impacto no Negócio
- Dashboards atualizados diariamente, permitindo tomada de decisão baseada em dados atuais
- Redução de 85% no tempo de geração de relatórios gerenciais
- Visibilidade em tempo real de vendas, estoque e performance de produtos
- Base sólida para expansão futura (ML, previsões, etc.)
💡 Principais Aprendizados
- ✓ Importância de testes de qualidade de dados desde o início do projeto
- ✓ Benefícios da extração incremental para reduzir carga no banco de origem
- ✓ Valor do monitoramento proativo e alertas em pipelines de produção
- ✓ Como estruturar transformações em camadas lógicas (staging → dimensions → analytics)