E11 — Detecção de Anomalias e Análise Inteligente de Logs com IA
E11 — Detecção de Anomalias e Análise Inteligente de Logs com IA
Extensão C · AIOps e IA Aplicada a DevOps · Artigo E11 de E12 Prof. Ricardo Matos — Dominando DevOps & Cloud em 1 Ano
O Problema do Volume
Uma aplicação moderna em produção gera dezenas de milhares de linhas de log por minuto. Um cluster Kubernetes com dez serviços pode gerar gigabytes de logs por hora. Ler logs manualmente durante um incidente é como procurar uma agulha em um palheiro que cresce enquanto você procura.
A análise tradicional de logs funciona assim: o engenheiro sabe o que procurar, escreve uma query no Kibana ou CloudWatch Insights, filtra por nível de erro, e percorre os resultados. Isso pressupõe que o engenheiro já tem uma hipótese — o que é frequentemente falso nos primeiros minutos de um incidente.
A análise inteligente de logs com ML inverte a lógica: em vez de procurar por padrões conhecidos, o sistema identifica o que é incomum em relação ao comportamento histórico. Novos padrões de erro que nunca apareceram. Mensagens que passaram a aparecer com frequência anormalmente alta. Correlações entre eventos em serviços diferentes que nunca co-ocorreram antes.
Detecção de Anomalias com Prometheus e Python
Para times que já têm Prometheus como sistema de métricas, é possível construir detecção de anomalias sem ferramentas adicionais usando a API HTTP do Prometheus e análise em Python. O exemplo a seguir implementa um detector simples baseado em Z-score:
#!/usr/bin/env python3
# detector_anomalias.py
# Detecta anomalias em métricas do Prometheus usando Z-score
import requests
import numpy as np
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Optional
import json
import os
PROMETHEUS_URL = os.getenv('PROMETHEUS_URL', 'http://localhost:9090')
SLACK_WEBHOOK = os.getenv('SLACK_WEBHOOK_URL')
ZSCORE_THRESHOLD = 3.0 # Desvios padrão para considerar anomalia
@dataclass
class Anomalia:
metrica: str
labels: dict
valor_atual: float
media_historica: float
desvio_padrao: float
zscore: float
timestamp: datetime
def buscar_historico(query: str, horas: int = 24) -> list[tuple[float, float]]:
"""Busca o histórico de uma métrica no Prometheus."""
fim = datetime.utcnow()
inicio = fim - timedelta(hours=horas)
response = requests.get(
f'{PROMETHEUS_URL}/api/v1/query_range',
params={
'query': query,
'start': inicio.timestamp(),
'end': fim.timestamp(),
'step': '5m'
},
timeout=30
)
response.raise_for_status()
data = response.json()
if data['status'] != 'success' or not data['data']['result']:
return []
# Retorna apenas os valores (timestamp, valor)
return [(float(ts), float(val))
for ts, val in data['data']['result'][0]['values']]
def detectar_anomalia(
query: str,
nome_metrica: str,
janela_historico_horas: int = 168 # 7 dias
) -> Optional[Anomalia]:
"""
Detecta se o valor atual de uma métrica é anômalo
comparado com o histórico recente.
"""
# Buscar histórico completo
historico = buscar_historico(query, janela_historico_horas)
if len(historico) < 10:
return None # Histórico insuficiente
valores = np.array([v for _, v in historico])
valor_atual = valores[-1]
# Excluir o valor atual do cálculo de baseline
baseline = valores[:-1]
media = np.mean(baseline)
desvio = np.std(baseline)
if desvio == 0:
return None # Métrica constante — sem anomalia possível
zscore = abs((valor_atual - media) / desvio)
if zscore >= ZSCORE_THRESHOLD:
return Anomalia(
metrica=nome_metrica,
labels={},
valor_atual=valor_atual,
media_historica=media,
desvio_padrao=desvio,
zscore=zscore,
timestamp=datetime.utcnow()
)
return None
def notificar_slack(anomalia: Anomalia):
"""Envia notificação de anomalia para o Slack."""
if not SLACK_WEBHOOK:
print(f"ANOMALIA: {anomalia}")
return
direcao = "↑" if anomalia.valor_atual > anomalia.media_historica else "↓"
variacao_pct = abs(
(anomalia.valor_atual - anomalia.media_historica)
/ anomalia.media_historica * 100
)
payload = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"⚠️ Anomalia Detectada: {anomalia.metrica}"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f"*Valor atual:*\n{anomalia.valor_atual:.2f} {direcao}"
},
{
"type": "mrkdwn",
"text": f"*Média histórica:*\n{anomalia.media_historica:.2f}"
},
{
"type": "mrkdwn",
"text": f"*Desvio (Z-score):*\n{anomalia.zscore:.1f}σ"
},
{
"type": "mrkdwn",
"text": f"*Variação:*\n{variacao_pct:.0f}% acima/abaixo do normal"
}
]
}
]
}
requests.post(SLACK_WEBHOOK, json=payload, timeout=10)
def executar_verificacoes():
"""Executa todas as verificações de anomalia configuradas."""
verificacoes = [
(
'sum(rate(http_requests_total{status=~"5..",namespace="producao"}[5m])) / '
'sum(rate(http_requests_total{namespace="producao"}[5m])) * 100',
'Taxa de Erro HTTP (%)'
),
(
'histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket'
'{namespace="producao"}[5m])) by (le))',
'Latência p99 (segundos)'
),
(
'sum(rate(loja_pedidos_criados_total[5m])) * 60',
'Pedidos por Minuto'
),
(
'sum(rate(loja_checkouts_concluidos_total[30m])) / '
'sum(rate(loja_checkouts_iniciados_total[30m])) * 100',
'Taxa de Conversão (%)'
),
]
anomalias_detectadas = []
for query, nome in verificacoes:
anomalia = detectar_anomalia(query, nome)
if anomalia:
anomalias_detectadas.append(anomalia)
notificar_slack(anomalia)
print(f"Verificação concluída: {len(anomalias_detectadas)} anomalia(s) detectada(s)")
return anomalias_detectadas
if __name__ == '__main__':
executar_verificacoes()
Análise de Logs com LLM Durante Incidentes
O uso mais imediato de LLMs em operações é a análise de logs durante incidentes. O processo é simples: coletar os logs relevantes do período do incidente, enviá-los ao modelo com contexto sobre o sistema, e perguntar o que está errado.
O desafio é o volume. Logs de 30 minutos de um sistema em produção podem ter milhares de linhas — mais do que cabe no contexto de um LLM. A solução é filtrar e sumarizar antes de enviar:
#!/usr/bin/env python3
# analise_incidente.py
# Analisa logs de um incidente usando LLM
import anthropic
import subprocess
import sys
from datetime import datetime, timedelta
def coletar_logs_kubernetes(
namespace: str,
inicio: datetime,
fim: datetime,
max_linhas: int = 500
) -> dict[str, str]:
"""Coleta logs dos pods em um namespace no período do incidente."""
logs = {}
# Listar pods no namespace
resultado = subprocess.run(
['kubectl', 'get', 'pods', '-n', namespace, '-o', 'name'],
capture_output=True, text=True
)
pods = resultado.stdout.strip().split('\n')
for pod in pods:
pod_nome = pod.replace('pod/', '')
resultado = subprocess.run(
[
'kubectl', 'logs', pod_nome,
'-n', namespace,
'--since-time', inicio.isoformat() + 'Z',
'--tail', str(max_linhas)
],
capture_output=True, text=True
)
if resultado.stdout:
# Filtrar apenas linhas de erro e warning para reduzir volume
linhas_relevantes = [
linha for linha in resultado.stdout.split('\n')
if any(nivel in linha.upper()
for nivel in ['ERROR', 'WARN', 'FATAL', 'EXCEPTION', 'PANIC'])
]
if linhas_relevantes:
logs[pod_nome] = '\n'.join(linhas_relevantes[:100])
return logs
def analisar_incidente_com_llm(
logs: dict[str, str],
descricao_incidente: str,
arquitetura_sistema: str
) -> str:
"""Envia logs para análise pelo LLM e retorna o diagnóstico."""
cliente = anthropic.Anthropic()
# Formatar os logs para o contexto
logs_formatados = ""
for servico, conteudo in logs.items():
if conteudo.strip():
logs_formatados += f"\n=== {servico} ===\n{conteudo}\n"
prompt = f"""Você é um engenheiro sênior de SRE analisando um incidente de produção.
DESCRIÇÃO DO INCIDENTE:
{descricao_incidente}
ARQUITETURA DO SISTEMA:
{arquitetura_sistema}
LOGS COLETADOS (apenas erros e warnings do período do incidente):
{logs_formatados[:8000]} # Limitar para não exceder o contexto
Analise os logs e forneça:
1. **Causa raiz provável**: O que causou o incidente, com evidências dos logs.
2. **Linha do tempo**: Reconstrua a sequência de eventos com base nos timestamps.
3. **Serviços afetados**: Quais serviços foram impactados e em que ordem.
4. **Ações imediatas recomendadas**: O que fazer agora para resolver ou mitigar.
5. **Prevenção**: O que pode ser feito para evitar que isso se repita.
Seja específico e referencie as mensagens de log relevantes quando possível.
Responda em português."""
mensagem = cliente.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2000,
messages=[{"role": "user", "content": prompt}]
)
return mensagem.content[0].text
def main():
# Exemplo de uso durante um incidente real
NAMESPACE = "producao"
INICIO_INCIDENTE = datetime.utcnow() - timedelta(minutes=30)
FIM_INCIDENTE = datetime.utcnow()
DESCRICAO = """
Às 14:23 os alertas começaram a disparar: queda de 60% nos pedidos por minuto
e aumento da taxa de erro HTTP para 15%. O time foi acionado às 14:25.
O deploy mais recente foi às 14:20 (versão v2.3.1 do order-service).
"""
ARQUITETURA = """
Sistema de e-commerce com microsserviços:
- api-gateway: recebe requisições e roteia para os serviços
- catalog-service: gerencia produtos e estoque (PostgreSQL + Redis cache)
- order-service: cria e gerencia pedidos (PostgreSQL, publica no SQS)
- notification-service: consome SQS e envia emails/SMS
Todos rodando no Kubernetes (EKS) com HPA configurado.
"""
print("Coletando logs do período do incidente...")
logs = coletar_logs_kubernetes(NAMESPACE, INICIO_INCIDENTE, FIM_INCIDENTE)
print(f"Logs coletados de {len(logs)} serviço(s)")
if not logs:
print("Nenhum log de erro encontrado no período")
sys.exit(0)
print("\nAnalisando com LLM...")
diagnostico = analisar_incidente_com_llm(logs, DESCRICAO, ARQUITETURA)
print("\n" + "="*60)
print("DIAGNÓSTICO DO INCIDENTE")
print("="*60)
print(diagnostico)
# Salvar o diagnóstico para o postmortem
with open(f'diagnostico-{datetime.now().strftime("%Y%m%d-%H%M")}.md', 'w') as f:
f.write(f"# Diagnóstico do Incidente — {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n")
f.write(f"## Descrição\n{DESCRICAO}\n\n")
f.write(f"## Análise\n{diagnostico}\n")
if __name__ == '__main__':
main()
Classificação Automática de Alertas
Outro caso de uso prático: usar um LLM para classificar e priorizar alertas que chegam em volume, reduzindo o ruído antes de acordar alguém:
# classificador_alertas.py
# Recebe alertas do Alertmanager via webhook e classifica com LLM
from flask import Flask, request, jsonify
import anthropic
import json
app = Flask(__name__)
cliente = anthropic.Anthropic()
RUNBOOKS_BASE_URL = "https://wiki.empresa.com/runbooks"
def classificar_alerta(alerta: dict) -> dict:
"""Classifica um alerta e sugere ação usando LLM."""
nome = alerta.get('labels', {}).get('alertname', 'Desconhecido')
severidade = alerta.get('labels', {}).get('severity', 'unknown')
descricao = alerta.get('annotations', {}).get('description', '')
sumario = alerta.get('annotations', {}).get('summary', '')
prompt = f"""Você é um sistema de triagem de alertas de infraestrutura.
ALERTA RECEBIDO:
- Nome: {nome}
- Severidade declarada: {severidade}
- Resumo: {sumario}
- Descrição: {descricao}
Com base nas informações acima, forneça uma análise em JSON com exatamente estes campos:
{{
"urgencia": "imediata|alta|media|baixa",
"acordar_plantao": true/false,
"provavel_causa": "descrição em uma frase",
"primeira_acao": "o que fazer primeiro",
"pode_ser_falso_positivo": true/false,
"razao_falso_positivo": "se aplicável, por quê poderia ser falso positivo"
}}
Responda APENAS com o JSON, sem texto adicional."""
resposta = cliente.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=400,
messages=[{"role": "user", "content": prompt}]
)
try:
classificacao = json.loads(resposta.content[0].text)
except json.JSONDecodeError:
classificacao = {
"urgencia": "alta",
"acordar_plantao": True,
"provavel_causa": "Erro na classificação automática",
"primeira_acao": "Investigar manualmente",
"pode_ser_falso_positivo": False,
"razao_falso_positivo": ""
}
return classificacao
@app.route('/webhook/alertmanager', methods=['POST'])
def receber_alertas():
"""Endpoint que recebe alertas do Alertmanager."""
payload = request.json
alertas = payload.get('alerts', [])
resultados = []
for alerta in alertas:
if alerta.get('status') != 'firing':
continue
classificacao = classificar_alerta(alerta)
nome = alerta['labels'].get('alertname', 'N/A')
# Logar para auditoria
print(f"Alerta: {nome} | Urgência: {classificacao['urgencia']} | "
f"Acordar plantão: {classificacao['acordar_plantao']}")
resultados.append({
'alerta': nome,
'classificacao': classificacao
})
# Notificar apenas se necessário acordar o plantão
if classificacao['acordar_plantao']:
enviar_para_pagerduty(alerta, classificacao)
else:
# Apenas registrar no canal de alertas sem notificação urgente
enviar_para_slack_informativo(alerta, classificacao)
return jsonify({'processados': len(resultados), 'resultados': resultados})
def enviar_para_pagerduty(alerta, classificacao):
# Implementação da integração com PagerDuty
pass
def enviar_para_slack_informativo(alerta, classificacao):
# Implementação da notificação Slack sem urgência
pass
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
Referências para Aprofundamento
— Prometheus HTTP API: https://prometheus.io/docs/prometheus/latest/querying/api/ — Anthropic API — Messages: https://docs.anthropic.com/en/api/messages — Elastic ML — Anomaly Detection: https://www.elastic.co/guide/en/machine-learning/current/ml-ad-overview.html — AWS CloudWatch Anomaly Detection: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Anomaly_Detection.html