Проверка подписи вебхука
Каждый запрос содержит заголовок X-MailInfra-Signature. Без проверки подписи endpoint можно подделать любым внешним клиентом.
Формат заголовка
X-MailInfra-Signature: t=1715461200,v1=3d4f2e1a...
t— Unix timestamp (секунды UTC) в момент отправки;v1— HMAC-SHA256 hex-дайджест строкиf"{t}." + body.
Секрет (whsec_…) выдаётся при создании вебхука в дашборде и показывается один раз. Он же возвращается заново только при ротации. Храните его как пароль в переменных окружения.
Алгоритм
- Получить сырое тело запроса (байты до парсинга JSON).
- Извлечь
tиv1из заголовка. - Проверить, что
|now − t| ≤ 300 секунд— защита от replay-атак. - Собрать строку для подписи:
f"{t}.".encode() + body_bytes. - Вычислить HMAC-SHA256 с ключом-секретом (включая префикс
whsec_). - Сравнить с
v1через timing-safe функцию.
Body parsers и middleware могут изменить байты тела (порядок ключей, пробелы). Подпись считается по тем же байтам, что прислал MailInfra. Всегда вычитывайте request.body до парсинга и подавайте именно его в верификацию.
Реализации
- Python
- TypeScript / Node.js
- Go
import hashlib
import hmac
import time
def verify_mailinfra_signature(
body: bytes,
secret: str,
signature_header: str,
max_age_seconds: int = 300,
) -> bool:
"""Проверяет X-MailInfra-Signature.
Args:
body: Сырое тело запроса до парсинга JSON.
secret: Секрет вебхука (whsec_...).
signature_header: Значение X-MailInfra-Signature.
max_age_seconds: Защита от replay-атак.
Returns:
True если подпись корректна и событие свежее.
"""
try:
parts = dict(p.split("=", 1) for p in signature_header.split(","))
timestamp = int(parts["t"])
provided_sig = parts["v1"]
except (KeyError, ValueError):
return False
if abs(int(time.time()) - timestamp) > max_age_seconds:
return False
signing_string = f"{timestamp}.".encode("utf-8") + body
expected_sig = hmac.new(
secret.encode("utf-8"),
signing_string,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected_sig, provided_sig)
С FastAPI:
from fastapi import FastAPI, Header, HTTPException, Request
app = FastAPI()
WEBHOOK_SECRET = "whsec_..."
@app.post("/hooks/mailinfra")
async def handle_webhook(
request: Request,
x_mailinfra_signature: str = Header(...),
) -> dict:
body = await request.body()
if not verify_mailinfra_signature(body, WEBHOOK_SECRET, x_mailinfra_signature):
raise HTTPException(status_code=400, detail="Invalid signature")
payload = await request.json()
if payload["type"] == "email.delivered":
...
return {"ok": True}
import * as crypto from "crypto";
function verifyMailInfraSignature(
body: Buffer | string,
secret: string,
signatureHeader: string,
maxAgeSeconds = 300,
): boolean {
const rawBody = Buffer.isBuffer(body) ? body : Buffer.from(body);
const parts = Object.fromEntries(
signatureHeader.split(",").map((p) => p.split("=", 2) as [string, string]),
);
const timestamp = parseInt(parts.t, 10);
const providedSig = parts.v1;
if (!timestamp || !providedSig) return false;
if (Math.abs(Date.now() / 1000 - timestamp) > maxAgeSeconds) return false;
const signingString = Buffer.concat([
Buffer.from(`${timestamp}.`, "utf-8"),
rawBody,
]);
const expected = crypto
.createHmac("sha256", secret)
.update(signingString)
.digest("hex");
return crypto.timingSafeEqual(
Buffer.from(expected, "hex"),
Buffer.from(providedSig, "hex"),
);
}
С Express обязательно используйте express.raw, не express.json:
import express from "express";
const app = express();
const SECRET = process.env.MAILINFRA_WEBHOOK_SECRET!;
app.post(
"/hooks/mailinfra",
express.raw({ type: "application/json" }),
(req, res) => {
const sig = req.headers["x-mailinfra-signature"] as string;
if (!verifyMailInfraSignature(req.body, SECRET, sig)) {
return res.status(400).json({ error: "Invalid signature" });
}
const payload = JSON.parse(req.body.toString());
setImmediate(() => handle(payload));
res.json({ ok: true });
},
);
package webhooks
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
"time"
)
func VerifySignature(body []byte, secret, signatureHeader string, maxAgeSec int64) bool {
parts := make(map[string]string)
for _, p := range strings.Split(signatureHeader, ",") {
kv := strings.SplitN(p, "=", 2)
if len(kv) == 2 {
parts[kv[0]] = kv[1]
}
}
ts, err := strconv.ParseInt(parts["t"], 10, 64)
if err != nil || parts["v1"] == "" {
return false
}
if int64(math.Abs(float64(time.Now().Unix()-ts))) > maxAgeSec {
return false
}
signing := append([]byte(fmt.Sprintf("%d.", ts)), body...)
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(signing)
expected := hex.EncodeToString(mac.Sum(nil))
return hmac.Equal([]byte(expected), []byte(parts["v1"]))
}
func WebhookHandler(secret string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
sig := r.Header.Get("X-MailInfra-Signature")
if !VerifySignature(body, secret, sig, 300) {
http.Error(w, "invalid signature", http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
}
}
Дедупликация
При retry MailInfra шлёт то же событие с тем же X-MailInfra-Event-Id. Сохраняйте обработанные ID и пропускайте дубли — это самый простой способ сделать обработчик идемпотентным.
Ротация секрета
При утечке создайте новый секрет в дашборде: Проект → Вебхуки → Ротация. Старый секрет немедленно перестаёт быть валидным — обновите переменную окружения до следующего события.