📋 Descrição do Projeto
Pipeline automatizado que coleta previsões meteorológicas de 7 dias para localidades específicas via API pública, armazena os dados em um Data Lake local, e os organiza para análises futuras.
O processo é completamente orquestrado pelo Apache Airflow, executando automaticamente toda segunda-feira às 8h, garantindo consistência e rastreabilidade dos dados climáticos ao longo do tempo.
🎯 Objetivos
Automação
Eliminar processos manuais de coleta, garantindo dados atualizados semanalmente sem intervenção humana.
Histórico
Construir uma base histórica de previsões para análises de acurácia e tendências climáticas ao longo do tempo.
Escalabilidade
Arquitetura modular que permite fácil expansão para novas localidades, métricas ou fontes de dados.
Aprendizado
Praticar conceitos de ETL, orquestração, consumo de APIs e armazenamento em Data Lake.
🏗️ Arquitetura do Pipeline
┌──────────────────┐
│ API Pública │
│ (Meteorologia) │
│ OpenWeather/ │
│ WeatherAPI │
└────────┬─────────┘
│ HTTP GET
▼
┌──────────────────┐
│ Apache Airflow │
│ - Scheduler │
│ - Extração │
│ - Validação │
│ - Transformação │
└────────┬─────────┘
│ Save JSON
▼
┌──────────────────┐
│ Data Lake │
│ (Local) │
│ /data/weather/ │
│ └── 2024-01-15/ │
│ └── data.json
└────────┬─────────┘
│ Read
▼
┌──────────────────┐
│ Pandas/Python │
│ Análises │
│ (Futuro: BI) │
└──────────────────┘
Fluxo Passo a Passo
- Agendamento: DAG executa toda segunda-feira às 8h UTC
- Extração: Requisição GET para API de previsão do tempo
- Validação: Verifica status 200 e estrutura JSON válida
- Transformação: Extrai campos relevantes e formata datas
- Armazenamento: Salva JSON particionado por data no Data Lake
- Logging: Registra status da execução para monitoramento
🛠️ Tecnologias Utilizadas
Orquestração
- Apache Airflow 2.3.2+
- DAG com schedule semanal
- PythonOperator para tasks customizadas
Linguagem
- Python 3.8+
- Requests para chamadas HTTP
- Pandas para manipulação de dados
Armazenamento
- Data Lake local (file system)
- Formato JSON
- Particionamento por data (YYYY-MM-DD)
API
- OpenWeatherMap API ou similar
- Endpoint de previsão de 7 dias
- Formato de resposta em JSON
💻 Exemplo de Código
DAG do Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import json
import os
default_args = {
'owner': 'data_team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def extract_weather_data(**context):
"""Extrai previsão de 7 dias da API"""
API_KEY = os.getenv('WEATHER_API_KEY')
CITY = 'Sao Paulo'
BASE_URL = f'https://api.openweathermap.org/data/2.5/forecast'
params = {
'q': CITY,
'appid': API_KEY,
'units': 'metric',
'cnt': 56 # 7 dias * 8 previsões/dia
}
response = requests.get(BASE_URL, params=params)
response.raise_for_status()
return response.json()
def save_to_datalake(**context):
"""Salva dados no Data Lake com particionamento por data"""
ti = context['ti']
data = ti.xcom_pull(task_ids='extract_weather')
# Criar diretório com data atual
today = datetime.now().strftime('%Y-%m-%d')
data_dir = f'/opt/data_lake/weather/{today}'
os.makedirs(data_dir, exist_ok=True)
# Salvar JSON
filepath = f'{data_dir}/forecast.json'
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
print(f'Dados salvos em: {filepath}')
with DAG(
'weather_pipeline',
default_args=default_args,
description='Pipeline de dados meteorológicos',
schedule_interval='0 8 * * 1', # Segunda-feira 8h
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
extract = PythonOperator(
task_id='extract_weather',
python_callable=extract_weather_data
)
save = PythonOperator(
task_id='save_to_datalake',
python_callable=save_to_datalake
)
extract >> save
Exemplo de Resposta da API
{
"city": {
"name": "São Paulo",
"country": "BR"
},
"list": [
{
"dt": 1704096000,
"main": {
"temp": 25.3,
"feels_like": 26.1,
"humidity": 72
},
"weather": [
{
"main": "Clouds",
"description": "poucas nuvens"
}
],
"wind": {
"speed": 3.5
},
"dt_txt": "2024-01-01 12:00:00"
}
// ... mais 55 registros
]
}
🎯 Desafios e Soluções
1. Rate Limiting da API
Desafio: API gratuita tem limite de chamadas/minuto.
Solução: Implementei retry com backoff exponencial e cache local para evitar chamadas redundantes no mesmo dia.
2. Gerenciamento de Credenciais
Desafio: API key não pode estar hardcoded no código.
Solução: Usei Airflow Variables e environment variables para gerenciar credenciais de forma segura.
3. Organização do Data Lake
Desafio: Como organizar dados para fácil recuperação futura?
Solução: Particionamento por data (YYYY-MM-DD) permite queries eficientes e limpeza de dados antigos.
📊 Resultados
Tempo de Execução
Previsões por Execução
Taxa de Sucesso
Possíveis Expansões
- Adicionar múltiplas cidades para comparação regional
- Integrar com PostgreSQL para queries SQL estruturadas
- Criar dashboard de visualização com histórico de temperatura
- Implementar alertas para condições climáticas extremas
- Análise de acurácia: comparar previsões com dados reais
💡 Principais Aprendizados
- ✓ Como consumir APIs REST de forma confiável com tratamento de erros
- ✓ Importância do particionamento de dados para organização e performance
- ✓ Uso de Airflow XComs para passar dados entre tasks
- ✓ Boas práticas de gerenciamento de credenciais em pipelines