Scaffold dd0c/alert: ingestion, correlation engine, HMAC validation, tests

- Webhook ingestion: HMAC validation for Datadog/PagerDuty/OpsGenie with 5-min timestamp freshness
- Payload normalizers: canonical alert schema with severity mapping per provider
- Correlation engine: time-window grouping, late-alert attachment (2x window), FakeClock for testing
- InMemoryWindowStore for unit tests
- Tests: 12 HMAC validation cases, 5 normalizer cases, 7 correlation engine cases
- PostgreSQL schema with RLS: tenants, incidents, alerts, webhook_secrets, notification_configs
- Free tier enforcement columns (alert_count_month, reset_at)
- Fly.io config, Dockerfile, Gitea Actions CI
This commit is contained in:
2026-03-01 02:49:14 +00:00
parent 5d67de6486
commit ccc4cd1c32
10 changed files with 838 additions and 0 deletions

View File

@@ -0,0 +1,165 @@
import pino from 'pino';
import type { CanonicalAlert } from '../ingestion/webhook.js';
const logger = pino({ name: 'correlation' });
// --- Interfaces ---
export interface WindowStore {
getWindow(tenantId: string, fingerprint: string): Promise<CorrelationWindow | null>;
upsertWindow(tenantId: string, window: CorrelationWindow): Promise<void>;
closeWindow(tenantId: string, fingerprint: string): Promise<CorrelationWindow | null>;
getExpiredWindows(cutoffMs: number): Promise<CorrelationWindow[]>;
}
export interface Clock {
now(): number;
}
export class RealClock implements Clock {
now() { return Date.now(); }
}
export class FakeClock implements Clock {
private current: number;
constructor(start = Date.now()) { this.current = start; }
now() { return this.current; }
advanceBy(ms: number) { this.current += ms; }
set(ms: number) { this.current = ms; }
}
export interface CorrelationWindow {
fingerprint: string;
service: string;
alerts: CanonicalAlert[];
openedAt: number;
lastAlertAt: number;
incidentId?: string;
shipped: boolean;
}
export interface CorrelationResult {
incidentId: string;
action: 'new_incident' | 'attached_to_existing' | 'window_updated';
alertCount: number;
}
// --- Correlation Engine ---
const DEFAULT_WINDOW_MS = 5 * 60 * 1000; // 5 minutes
const LATE_ATTACH_MULTIPLIER = 2; // Attach late alerts up to 2x window
export class CorrelationEngine {
private windowStore: WindowStore;
private clock: Clock;
private windowMs: number;
constructor(windowStore: WindowStore, clock: Clock = new RealClock(), windowMs = DEFAULT_WINDOW_MS) {
this.windowStore = windowStore;
this.clock = clock;
this.windowMs = windowMs;
}
async process(tenantId: string, alert: CanonicalAlert): Promise<CorrelationResult> {
const now = this.clock.now();
const existing = await this.windowStore.getWindow(tenantId, alert.fingerprint);
// Case 1: No existing window — create new
if (!existing) {
const window: CorrelationWindow = {
fingerprint: alert.fingerprint,
service: alert.service ?? 'unknown',
alerts: [alert],
openedAt: now,
lastAlertAt: now,
shipped: false,
};
await this.windowStore.upsertWindow(tenantId, window);
return { incidentId: '', action: 'window_updated', alertCount: 1 };
}
// Case 2: Window exists and is still open
if (!existing.shipped) {
existing.alerts.push(alert);
existing.lastAlertAt = now;
await this.windowStore.upsertWindow(tenantId, existing);
return {
incidentId: existing.incidentId ?? '',
action: 'window_updated',
alertCount: existing.alerts.length,
};
}
// Case 3: Window shipped (incident created) — late alert
const windowAge = now - existing.openedAt;
if (windowAge <= this.windowMs * LATE_ATTACH_MULTIPLIER) {
// Within 2x window — attach to existing incident
existing.alerts.push(alert);
existing.lastAlertAt = now;
await this.windowStore.upsertWindow(tenantId, existing);
return {
incidentId: existing.incidentId ?? '',
action: 'attached_to_existing',
alertCount: existing.alerts.length,
};
}
// Case 4: Very late alert (>2x window) — new incident
const newWindow: CorrelationWindow = {
fingerprint: alert.fingerprint,
service: alert.service ?? 'unknown',
alerts: [alert],
openedAt: now,
lastAlertAt: now,
shipped: false,
};
await this.windowStore.upsertWindow(tenantId, newWindow);
return { incidentId: '', action: 'new_incident', alertCount: 1 };
}
async flushWindows(tenantId: string): Promise<CorrelationWindow[]> {
const now = this.clock.now();
const cutoff = now - this.windowMs;
const expired = await this.windowStore.getExpiredWindows(cutoff);
const shipped: CorrelationWindow[] = [];
for (const window of expired) {
if (window.shipped) continue;
window.shipped = true;
window.incidentId = `inc_${crypto.randomUUID().slice(0, 8)}`;
await this.windowStore.upsertWindow(tenantId, window);
shipped.push(window);
}
return shipped;
}
}
// --- In-Memory Window Store (for testing) ---
export class InMemoryWindowStore implements WindowStore {
private windows = new Map<string, CorrelationWindow>();
private key(tenantId: string, fingerprint: string) {
return `${tenantId}:${fingerprint}`;
}
async getWindow(tenantId: string, fingerprint: string) {
return this.windows.get(this.key(tenantId, fingerprint)) ?? null;
}
async upsertWindow(tenantId: string, window: CorrelationWindow) {
this.windows.set(this.key(tenantId, window.fingerprint), window);
}
async closeWindow(tenantId: string, fingerprint: string) {
const w = this.windows.get(this.key(tenantId, fingerprint));
if (w) this.windows.delete(this.key(tenantId, fingerprint));
return w ?? null;
}
async getExpiredWindows(cutoffMs: number) {
return Array.from(this.windows.values()).filter(w => w.lastAlertAt <= cutoffMs && !w.shipped);
}
}

View File

@@ -0,0 +1,217 @@
import { z } from 'zod';
import crypto from 'node:crypto';
import pino from 'pino';
const logger = pino({ name: 'ingestion' });
// --- Canonical Alert Schema ---
export const canonicalAlertSchema = z.object({
sourceProvider: z.enum(['datadog', 'pagerduty', 'opsgenie', 'grafana', 'custom']),
sourceId: z.string(),
fingerprint: z.string(),
title: z.string(),
severity: z.enum(['critical', 'high', 'medium', 'low', 'info']),
status: z.enum(['firing', 'resolved']),
service: z.string().optional(),
environment: z.string().optional(),
tags: z.record(z.string()).default({}),
rawPayload: z.any(),
timestamp: z.number(), // Unix ms
});
export type CanonicalAlert = z.infer<typeof canonicalAlertSchema>;
// --- HMAC Validation (BMad Must-Have: Replay Prevention) ---
const MAX_TIMESTAMP_DRIFT_SECONDS = 300; // 5 minutes
export interface HmacValidationResult {
valid: boolean;
error?: string;
}
export function validateDatadogHmac(
body: string,
signature: string | undefined,
timestamp: string | undefined,
secret: string,
): HmacValidationResult {
if (!signature || !timestamp) {
return { valid: false, error: 'Missing signature or timestamp header' };
}
// Timestamp freshness check
const ts = parseInt(timestamp, 10);
const now = Math.floor(Date.now() / 1000);
if (Math.abs(now - ts) > MAX_TIMESTAMP_DRIFT_SECONDS) {
return { valid: false, error: 'stale timestamp' };
}
const expected = crypto
.createHmac('sha256', secret)
.update(timestamp + body)
.digest('hex');
if (!crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected))) {
return { valid: false, error: 'Invalid signature' };
}
return { valid: true };
}
export function validatePagerdutyHmac(
body: string,
signature: string | undefined,
secret: string,
): HmacValidationResult {
if (!signature) {
return { valid: false, error: 'Missing signature header' };
}
// PagerDuty v1 signatures include timestamp in the signature header
const parts = signature.split(',');
const tsPart = parts.find(p => p.startsWith('t='));
const sigPart = parts.find(p => p.startsWith('v1='));
if (!tsPart || !sigPart) {
return { valid: false, error: 'Malformed PagerDuty signature' };
}
const ts = parseInt(tsPart.slice(2), 10);
const now = Math.floor(Date.now() / 1000);
if (Math.abs(now - ts) > MAX_TIMESTAMP_DRIFT_SECONDS) {
return { valid: false, error: 'stale timestamp' };
}
const expected = crypto
.createHmac('sha256', secret)
.update(`${ts}.${body}`)
.digest('hex');
const sig = sigPart.slice(3);
if (!crypto.timingSafeEqual(Buffer.from(sig), Buffer.from(expected))) {
return { valid: false, error: 'Invalid signature' };
}
return { valid: true };
}
export function validateOpsgenieHmac(
body: string,
signature: string | undefined,
secret: string,
): HmacValidationResult {
if (!signature) {
return { valid: false, error: 'Missing signature header' };
}
// OpsGenie: extract timestamp from payload body
let payload: any;
try {
payload = JSON.parse(body);
} catch {
return { valid: false, error: 'Invalid JSON body' };
}
const ts = payload?.timestamp;
if (ts) {
const tsSeconds = typeof ts === 'number' ? ts / 1000 : parseInt(ts, 10);
const now = Math.floor(Date.now() / 1000);
if (Math.abs(now - tsSeconds) > MAX_TIMESTAMP_DRIFT_SECONDS) {
return { valid: false, error: 'stale timestamp' };
}
}
const expected = crypto
.createHmac('sha256', secret)
.update(body)
.digest('hex');
if (!crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected))) {
return { valid: false, error: 'Invalid signature' };
}
return { valid: true };
}
// --- Payload Normalizers ---
export function normalizeDatadog(payload: any): CanonicalAlert {
return {
sourceProvider: 'datadog',
sourceId: payload.id ?? payload.alert_id ?? crypto.randomUUID(),
fingerprint: payload.aggregation_key ?? payload.alert_id ?? '',
title: payload.title ?? payload.msg_title ?? 'Datadog Alert',
severity: mapDatadogPriority(payload.priority),
status: payload.alert_transition === 'Recovered' ? 'resolved' : 'firing',
service: payload.tags?.service,
environment: payload.tags?.env,
tags: payload.tags ?? {},
rawPayload: payload,
timestamp: payload.date_happened ? payload.date_happened * 1000 : Date.now(),
};
}
export function normalizePagerduty(payload: any): CanonicalAlert {
const event = payload.event ?? payload;
const data = event.data ?? event.incident ?? {};
return {
sourceProvider: 'pagerduty',
sourceId: data.id ?? crypto.randomUUID(),
fingerprint: data.incident_key ?? data.id ?? '',
title: data.title ?? data.description ?? 'PagerDuty Incident',
severity: mapPagerdutyUrgency(data.urgency),
status: event.event_type?.includes('resolve') ? 'resolved' : 'firing',
service: data.service?.name,
environment: data.body?.details?.environment,
tags: {},
rawPayload: payload,
timestamp: data.created_at ? new Date(data.created_at).getTime() : Date.now(),
};
}
export function normalizeOpsgenie(payload: any): CanonicalAlert {
return {
sourceProvider: 'opsgenie',
sourceId: payload.alert?.alertId ?? crypto.randomUUID(),
fingerprint: payload.alert?.alias ?? payload.alert?.alertId ?? '',
title: payload.alert?.message ?? 'OpsGenie Alert',
severity: mapOpsgeniePriority(payload.alert?.priority),
status: payload.action === 'Close' ? 'resolved' : 'firing',
service: payload.alert?.tags?.find((t: string) => t.startsWith('service:'))?.slice(8),
tags: {},
rawPayload: payload,
timestamp: payload.alert?.createdAt ? new Date(payload.alert.createdAt).getTime() : Date.now(),
};
}
// --- Severity Mappers ---
function mapDatadogPriority(p: string | undefined): CanonicalAlert['severity'] {
switch (p) {
case 'P1': return 'critical';
case 'P2': return 'high';
case 'P3': return 'medium';
case 'P4': return 'low';
default: return 'medium';
}
}
function mapPagerdutyUrgency(u: string | undefined): CanonicalAlert['severity'] {
switch (u) {
case 'high': return 'critical';
case 'low': return 'low';
default: return 'medium';
}
}
function mapOpsgeniePriority(p: string | undefined): CanonicalAlert['severity'] {
switch (p) {
case 'P1': return 'critical';
case 'P2': return 'high';
case 'P3': return 'medium';
case 'P4': case 'P5': return 'low';
default: return 'medium';
}
}