🧊 Data Lake com Spark

PySpark + Arquitetura Medallion + Particionamento

⬅ Voltar ao Portfolio
Apache Spark PySpark Data Lake Parquet Medallion Python

📋 Descrição do Projeto

Implementação de um Data Lake utilizando Apache Spark (PySpark) seguindo a arquitetura Medallion com três camadas distintas: Bronze (raw), Silver (curated) e Gold (analytics).

O projeto demonstra o fluxo completo de ingestão, transformação progressiva e otimização de dados, com particionamento inteligente e armazenamento em formato Parquet para alta performance em queries analíticas.

🏗️ Arquitetura Medallion


┌─────────────────────────────────────────────────────┐
│                   FONTE DE DADOS                    │
│          CSV / JSON / APIs / Databases              │
└──────────────────────┬──────────────────────────────┘
                       │
                       ▼
        ┌──────────────────────────────┐
        │     🥉 CAMADA BRONZE         │
        │  (Dados Brutos / Raw Data)   │
        │                              │
        │  - Dados como recebidos      │
        │  - Sem transformações        │
        │  - Schema-on-read            │
        │  - Formato: Parquet          │
        │  - Partição: ano/mes/dia     │
        └──────────────┬───────────────┘
                       │ PySpark
                       │ Limpeza,
                       │ Validação
                       ▼
        ┌──────────────────────────────┐
        │     🥈 CAMADA SILVER         │
        │ (Dados Limpos / Curated)     │
        │                              │
        │  - Dados limpos              │
        │  - Schema definido           │
        │  - Deduplicação              │
        │  - Tipo de dados corretos    │
        │  - Formato: Parquet          │
        │  - Partição: ano/mes         │
        └──────────────┬───────────────┘
                       │ PySpark  
                       │ Agregações,
                       │ Joins
                       ▼
        ┌──────────────────────────────┐
        │     🥇 CAMADA GOLD           │
        │  (Dados Analytics-Ready)     │
        │                              │
        │  - Dados agregados           │
        │  - Business metrics          │
        │  - Otimizado para BI         │
        │  - Desnormalizado            │
        │  - Formato: Parquet          │
        │  - Partição: categoria       │
        └──────────────┬───────────────┘
                       │
                       ▼
        ┌──────────────────────────────┐
        │    CONSUMO                   │
        │  BI Tools / ML / Analytics   │
        └──────────────────────────────┘
        

Detalhamento das Camadas

🥉 Bronze - Raw Data

Dados brutos exatamente como recebidos da fonte, sem nenhuma transformação.

  • Preserva dados originais para auditoria e reprocessamento
  • Schema-on-read: estrutura flexível
  • Particionamento temporal para otimizar leitura

🥈 Silver - Curated Data

Dados limpos, validados e estruturados, prontos para uso confiável.

  • Remoção de duplicatas e dados inválidos
  • Padronização de tipos e formatos
  • Enrichment com dados de referência

🥇 Gold - Analytics-Ready

Dados agregados e otimizados para análises de negócio e BI.

  • Métricas calculadas (KPIs, agregações)
  • Desnormalização para performance
  • Otimizado para ferramentas de visualização

🛠️ Stack Utilizada

Processamento

  • Apache Spark 3.x
  • PySpark para transformações
  • Spark SQL para queries

Armazenamento

  • Data Lake local (HDFS ou S3)
  • Formato Parquet (colunar)
  • Compressão Snappy

Otimização

  • Particionamento inteligente
  • Bucketing para joins eficientes
  • Cache para dados frequentes

Qualidade

  • Validação de schema
  • Detecção de anomalias
  • Logging de transformações

💻 Exemplos de Código

Camada Bronze - Ingestão

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, year, month, dayofmonth

# Inicializar Spark
spark = SparkSession.builder \
    .appName("DataLake_Bronze") \
    .getOrCreate()

# Ler dados brutos (CSV)
df_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://input/vendas_*.csv")

# Adicionar metadata
df_bronze = df_raw \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("year", year("data_venda")) \
    .withColumn("month", month("data_venda")) \
    .withColumn("day", dayofmonth("data_venda"))

# Escrever em Bronze (particionado)
df_bronze.write \
    .mode("append") \
    .partitionBy("year", "month", "day") \
    .parquet("s3://datalake/bronze/vendas")

Camada Silver - Limpeza e Curação

from pyspark.sql.functions import col, trim, upper, when

# Ler da Bronze
df_bronze = spark.read.parquet("s3://datalake/bronze/vendas")

# Limpeza e transformação
df_silver = df_bronze \
    .dropDuplicates(["id_venda"]) \
    .filter(col("valor_total") > 0) \
    .filter(col("data_venda").isNotNull()) \
    .withColumn("nome_cliente", trim(upper(col("nome_cliente")))) \
    .withColumn("status", when(col("status").isNull(), "PENDENTE")
                          .otherwise(col("status"))) \
    .withColumn("processing_timestamp", current_timestamp())

# Validação de schema
expected_columns = ["id_venda", "data_venda", "valor_total", "nome_cliente"]
assert all(col in df_silver.columns for col in expected_columns)

# Escrever em Silver
df_silver.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://datalake/silver/vendas")

Camada Gold - Agregações Analytics

from pyspark.sql.functions import sum, avg, count, date_trunc

# Ler da Silver
df_silver = spark.read.parquet("s3://datalake/silver/vendas")

# Agregações para analytics
df_gold = df_silver \
    .withColumn("mes", date_trunc("month", col("data_venda"))) \
    .groupBy("mes", "categoria_produto") \
    .agg(
        sum("valor_total").alias("receita_total"),
        avg("valor_total").alias("ticket_medio"),
        count("id_venda").alias("qtd_vendas"),
        count("nome_cliente").alias("qtd_clientes_unicos")
    ) \
    .withColumn("created_at", current_timestamp())

# Escrever em Gold (otimizado para BI)
df_gold.write \
    .mode("overwrite") \
    .partitionBy("categoria_produto") \
    .format("parquet") \
    .option("compression", "snappy") \
    .save("s3://datalake/gold/vendas_mensais")

📂 Estratégia de Particionamento


datalake/
├── bronze/
│   └── vendas/
│       ├── year=2024/
│       │   ├── month=01/
│       │   │   ├── day=01/
│       │   │   │   └── part-00000.parquet
│       │   │   └── day=02/
│       ├── year=2023/
│       │   └── month=12/
├── silver/
│   └── vendas/
│       ├── year=2024/
│       │   └── month=01/
│       │       └── part-00000.parquet
├── gold/
    └── vendas_mensais/
        ├── categoria=eletronicos/
        │   └── part-00000.parquet
        └── categoria=moveis/
            └── part-00000.parquet
        

Benefícios do Particionamento

🎯 Desafios e Soluções

1. Small Files Problem

Desafio: Muitos arquivos pequenos degradam performance.

Solução: Implementei coalesce() e repartition() estratégico, além de compactação periódica de arquivos antigos.

2. Data Skew

Desafio: Partições desbalanceadas causando processamento lento.

Solução: Usei salting technique e adaptive query execution (AQE) do Spark 3.0 para balancear carga.

3. Schema Evolution

Desafio: Mudanças no schema de origem quebravam o pipeline.

Solução: Habilitei mergeSchema e implementei validação de schema na camada Bronze para detectar mudanças precocemente.

📊 Performance e Resultados

5GB

Dados Processados

95%

Redução em Query Time

3x

Mais Rápido que CSV

Comparação de Performance

Operação
CSV (baseline)
Parquet Particionado
Full Scan
45s
12s ⚡
Filter by Date
38s
2s ⚡⚡⚡
Aggregation
52s
8s ⚡⚡

💡 Principais Aprendizados