🚗 Pipeline Concessionária

Airflow + dbt + PostgreSQL + Power BI

⬅ Voltar ao Portfolio
Apache Airflow dbt PostgreSQL Power BI Python ETL

📋 Descrição do Projeto

Pipeline completo de ETL para análise de dados de uma concessionária de veículos. O projeto extrai dados operacionais de vendas, clientes, produtos e categorias de um banco de produção PostgreSQL, realiza transformações com dbt seguindo arquitetura de camadas, e disponibiliza os dados em um Data Warehouse para visualização em Power BI.

Toda a orquestração é feita via Apache Airflow, permitindo agendamento automático, monitoramento e retry de falhas.

🏗️ Arquitetura do Pipeline


┌─────────────────┐
│  PostgreSQL     │
│  (Produção)     │
│  - vendas       │
│  - clientes     │
│  - produtos     │
└────────┬────────┘
         │ Extract
         ▼
┌─────────────────┐
│  Apache Airflow │
│  DAG Pipeline   │
│  - Extração     │
│  - Validação    │
│  - Trigger dbt  │
└────────┬────────┘
         │ Load
         ▼
┌─────────────────┐
│  PostgreSQL     │
│  (Data Warehouse)│
│  - staging      │
└────────┬────────┘
         │ Transform
         ▼
┌─────────────────┐
│      dbt        │
│  - staging      │
│  - dimensions   │
│  - analytics    │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   Power BI      │
│  Dashboards     │
│  - Vendas       │
│  - Clientes     │
│  - Produtos     │
└─────────────────┘
        

Fluxo de Dados

  1. Extração: Airflow conecta ao banco de produção e extrai dados via SQL
  2. Carga: Dados brutos são carregados na camada staging do DW
  3. Transformação: dbt aplica transformações em 3 camadas (staging → dimensions → analytics)
  4. Visualização: Power BI consome as tabelas finais para dashboards

🛠️ Stack Utilizada

Orquestração

  • Apache Airflow 2.5+ para orquestração
  • DAGs com agendamento diário
  • Retry automático em caso de falhas

Transformação

  • dbt para transformações SQL
  • Arquitetura de camadas (staging, dimensions, analytics)
  • Testes de qualidade de dados

Armazenamento

  • PostgreSQL como banco de origem
  • PostgreSQL como Data Warehouse
  • Schemas separados por camada

Visualização

  • Power BI para dashboards
  • Conexão direta com PostgreSQL
  • Refresh automático de dados

💻 Exemplo de Código

DAG do Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'concessionaria_etl',
    default_args=default_args,
    description='Pipeline ETL Concessionária',
    schedule_interval='0 2 * * *',  # Diariamente às 2h
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    
    extract_vendas = PostgresOperator(
        task_id='extract_vendas',
        postgres_conn_id='prod_db',
        sql='sql/extract_vendas.sql'
    )
    
    run_dbt = BashOperator(
        task_id='run_dbt',
        bash_command='cd /opt/dbt && dbt run --models analytics'
    )
    
    extract_vendas >> run_dbt

Modelo dbt (Exemplo)

-- models/analytics/fact_vendas.sql
{{ config(
    materialized='table',
    schema='analytics'
) }}

WITH vendas_enriched AS (
    SELECT 
        v.venda_id,
        v.data_venda,
        v.valor_total,
        c.nome_cliente,
        c.cidade,
        p.modelo,
        p.categoria
    FROM {{ ref('stg_vendas') }} v
    LEFT JOIN {{ ref('dim_clientes') }} c ON v.cliente_id = c.cliente_id
    LEFT JOIN {{ ref('dim_produtos') }} p ON v.produto_id = p.produto_id
)

SELECT * FROM vendas_enriched

🎯 Desafios e Soluções

1. Performance na Extração de Dados

Desafio: Extração completa levava mais de 2 horas, impactando o banco de produção.

Solução: Implementei extração incremental baseada em timestamps, reduzindo o tempo para ~15 minutos e diminuindo a carga no banco de produção em 85%.

2. Qualidade de Dados

Desafio: Dados inconsistentes quebravam o pipeline (valores nulos, duplicatas).

Solução: Adicionei testes dbt para validar unicidade, não-nulidade e integridade referencial. Pipeline agora falha rápido com alertas claros.

3. Monitoramento e Alertas

Desafio: Falhas silenciosas não eram detectadas rapidamente.

Solução: Configurei callbacks do Airflow para enviar alertas por email em caso de falhas, com logs detalhados para troubleshooting.

📊 Resultados e Métricas

~15min

Tempo de Execução Diária

500k+

Registros Processados/Dia

99.5%

Taxa de Sucesso

Impacto no Negócio

💡 Principais Aprendizados