Перейти к основному содержимому

Проверка подписи вебхука

Каждый запрос содержит заголовок X-MailInfra-Signature. Без проверки подписи endpoint можно подделать любым внешним клиентом.

Формат заголовка

X-MailInfra-Signature: t=1715461200,v1=3d4f2e1a...
  • t — Unix timestamp (секунды UTC) в момент отправки;
  • v1 — HMAC-SHA256 hex-дайджест строки f"{t}." + body.

Секрет (whsec_…) выдаётся при создании вебхука в дашборде и показывается один раз. Он же возвращается заново только при ротации. Храните его как пароль в переменных окружения.

Алгоритм

  1. Получить сырое тело запроса (байты до парсинга JSON).
  2. Извлечь t и v1 из заголовка.
  3. Проверить, что |now − t| ≤ 300 секунд — защита от replay-атак.
  4. Собрать строку для подписи: f"{t}.".encode() + body_bytes.
  5. Вычислить HMAC-SHA256 с ключом-секретом (включая префикс whsec_).
  6. Сравнить с v1 через timing-safe функцию.
Сырое тело, не JSON-объект

Body parsers и middleware могут изменить байты тела (порядок ключей, пробелы). Подпись считается по тем же байтам, что прислал MailInfra. Всегда вычитывайте request.body до парсинга и подавайте именно его в верификацию.

Реализации

import hashlib
import hmac
import time


def verify_mailinfra_signature(
body: bytes,
secret: str,
signature_header: str,
max_age_seconds: int = 300,
) -> bool:
"""Проверяет X-MailInfra-Signature.

Args:
body: Сырое тело запроса до парсинга JSON.
secret: Секрет вебхука (whsec_...).
signature_header: Значение X-MailInfra-Signature.
max_age_seconds: Защита от replay-атак.

Returns:
True если подпись корректна и событие свежее.
"""
try:
parts = dict(p.split("=", 1) for p in signature_header.split(","))
timestamp = int(parts["t"])
provided_sig = parts["v1"]
except (KeyError, ValueError):
return False

if abs(int(time.time()) - timestamp) > max_age_seconds:
return False

signing_string = f"{timestamp}.".encode("utf-8") + body
expected_sig = hmac.new(
secret.encode("utf-8"),
signing_string,
hashlib.sha256,
).hexdigest()

return hmac.compare_digest(expected_sig, provided_sig)

С FastAPI:

from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()
WEBHOOK_SECRET = "whsec_..."


@app.post("/hooks/mailinfra")
async def handle_webhook(
request: Request,
x_mailinfra_signature: str = Header(...),
) -> dict:
body = await request.body()

if not verify_mailinfra_signature(body, WEBHOOK_SECRET, x_mailinfra_signature):
raise HTTPException(status_code=400, detail="Invalid signature")

payload = await request.json()
if payload["type"] == "email.delivered":
...

return {"ok": True}

Дедупликация

При retry MailInfra шлёт то же событие с тем же X-MailInfra-Event-Id. Сохраняйте обработанные ID и пропускайте дубли — это самый простой способ сделать обработчик идемпотентным.

Ротация секрета

При утечке создайте новый секрет в дашборде: Проект → Вебхуки → Ротация. Старый секрет немедленно перестаёт быть валидным — обновите переменную окружения до следующего события.