📊 API de Previsão do Tempo

Airflow + Python + Data Lake + Automação

⬅ Voltar ao Portfolio
Apache Airflow Python API REST Data Lake Pandas JSON

📋 Descrição do Projeto

Pipeline automatizado que coleta previsões meteorológicas de 7 dias para localidades específicas via API pública, armazena os dados em um Data Lake local, e os organiza para análises futuras.

O processo é completamente orquestrado pelo Apache Airflow, executando automaticamente toda segunda-feira às 8h, garantindo consistência e rastreabilidade dos dados climáticos ao longo do tempo.

🎯 Objetivos

Automação

Eliminar processos manuais de coleta, garantindo dados atualizados semanalmente sem intervenção humana.

Histórico

Construir uma base histórica de previsões para análises de acurácia e tendências climáticas ao longo do tempo.

Escalabilidade

Arquitetura modular que permite fácil expansão para novas localidades, métricas ou fontes de dados.

Aprendizado

Praticar conceitos de ETL, orquestração, consumo de APIs e armazenamento em Data Lake.

🏗️ Arquitetura do Pipeline


┌──────────────────┐
│   API Pública    │
│  (Meteorologia)  │
│  OpenWeather/    │
│  WeatherAPI      │
└────────┬─────────┘
         │ HTTP GET
         ▼
┌──────────────────┐
│ Apache Airflow   │
│  - Scheduler     │
│  - Extração      │
│  - Validação     │
│  - Transformação │
└────────┬─────────┘
         │ Save JSON
         ▼
┌──────────────────┐
│   Data Lake      │
│    (Local)       │
│ /data/weather/   │
│  └── 2024-01-15/ │
│      └── data.json
└────────┬─────────┘
         │ Read
         ▼
┌──────────────────┐
│  Pandas/Python   │
│   Análises       │
│  (Futuro: BI)    │
└──────────────────┘
        

Fluxo Passo a Passo

  1. Agendamento: DAG executa toda segunda-feira às 8h UTC
  2. Extração: Requisição GET para API de previsão do tempo
  3. Validação: Verifica status 200 e estrutura JSON válida
  4. Transformação: Extrai campos relevantes e formata datas
  5. Armazenamento: Salva JSON particionado por data no Data Lake
  6. Logging: Registra status da execução para monitoramento

🛠️ Tecnologias Utilizadas

Orquestração

  • Apache Airflow 2.3.2+
  • DAG com schedule semanal
  • PythonOperator para tasks customizadas

Linguagem

  • Python 3.8+
  • Requests para chamadas HTTP
  • Pandas para manipulação de dados

Armazenamento

  • Data Lake local (file system)
  • Formato JSON
  • Particionamento por data (YYYY-MM-DD)

API

  • OpenWeatherMap API ou similar
  • Endpoint de previsão de 7 dias
  • Formato de resposta em JSON

💻 Exemplo de Código

DAG do Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import json
import os

default_args = {
    'owner': 'data_team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

def extract_weather_data(**context):
    """Extrai previsão de 7 dias da API"""
    API_KEY = os.getenv('WEATHER_API_KEY')
    CITY = 'Sao Paulo'
    BASE_URL = f'https://api.openweathermap.org/data/2.5/forecast'
    
    params = {
        'q': CITY,
        'appid': API_KEY,
        'units': 'metric',
        'cnt': 56  # 7 dias * 8 previsões/dia
    }
    
    response = requests.get(BASE_URL, params=params)
    response.raise_for_status()
    
    return response.json()

def save_to_datalake(**context):
    """Salva dados no Data Lake com particionamento por data"""
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract_weather')
    
    # Criar diretório com data atual
    today = datetime.now().strftime('%Y-%m-%d')
    data_dir = f'/opt/data_lake/weather/{today}'
    os.makedirs(data_dir, exist_ok=True)
    
    # Salvar JSON
    filepath = f'{data_dir}/forecast.json'
    with open(filepath, 'w') as f:
        json.dump(data, f, indent=2)
    
    print(f'Dados salvos em: {filepath}')

with DAG(
    'weather_pipeline',
    default_args=default_args,
    description='Pipeline de dados meteorológicos',
    schedule_interval='0 8 * * 1',  # Segunda-feira 8h
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    
    extract = PythonOperator(
        task_id='extract_weather',
        python_callable=extract_weather_data
    )
    
    save = PythonOperator(
        task_id='save_to_datalake',
        python_callable=save_to_datalake
    )
    
    extract >> save

Exemplo de Resposta da API

{
  "city": {
    "name": "São Paulo",
    "country": "BR"
  },
  "list": [
    {
      "dt": 1704096000,
      "main": {
        "temp": 25.3,
        "feels_like": 26.1,
        "humidity": 72
      },
      "weather": [
        {
          "main": "Clouds",
          "description": "poucas nuvens"
        }
      ],
      "wind": {
        "speed": 3.5
      },
      "dt_txt": "2024-01-01 12:00:00"
    }
    // ... mais 55 registros
  ]
}

🎯 Desafios e Soluções

1. Rate Limiting da API

Desafio: API gratuita tem limite de chamadas/minuto.

Solução: Implementei retry com backoff exponencial e cache local para evitar chamadas redundantes no mesmo dia.

2. Gerenciamento de Credenciais

Desafio: API key não pode estar hardcoded no código.

Solução: Usei Airflow Variables e environment variables para gerenciar credenciais de forma segura.

3. Organização do Data Lake

Desafio: Como organizar dados para fácil recuperação futura?

Solução: Particionamento por data (YYYY-MM-DD) permite queries eficientes e limpeza de dados antigos.

📊 Resultados

~30s

Tempo de Execução

56

Previsões por Execução

100%

Taxa de Sucesso

Possíveis Expansões

💡 Principais Aprendizados