From 829e408e1e99c5a558cbaca6ee02bcbfe7670fbb Mon Sep 17 00:00:00 2001 From: Max Mayfield Date: Sun, 1 Mar 2026 03:13:06 +0000 Subject: [PATCH] Add notification dispatchers (P3 Slack/Email/Webhook, P5 Slack), full YAML parser for P6 - P3 alert: NotificationDispatcher with Slack Block Kit, Resend email, generic webhook; severity-gated dispatch - P5 cost: CostSlackNotifier with anomaly Block Kit (score, deviation, snooze/expected buttons) - P6 run: Full YAML runbook parser with serde_yaml, variable substitution ({{var}}), failure actions, 7 tests - P6 parser: validates non-empty steps, default timeout (300s), default abort on failure --- .../src/notifications/dispatcher.ts | 210 ++++++++++++++++++ .../src/notifications/slack.ts | 89 ++++++++ .../06-runbook-automation/agent/src/parser.rs | 199 +++++++++++++++-- 3 files changed, 484 insertions(+), 14 deletions(-) create mode 100644 products/03-alert-intelligence/src/notifications/dispatcher.ts create mode 100644 products/05-aws-cost-anomaly/src/notifications/slack.ts diff --git a/products/03-alert-intelligence/src/notifications/dispatcher.ts b/products/03-alert-intelligence/src/notifications/dispatcher.ts new file mode 100644 index 0000000..d58fc45 --- /dev/null +++ b/products/03-alert-intelligence/src/notifications/dispatcher.ts @@ -0,0 +1,210 @@ +import pino from 'pino'; +import type { CorrelationWindow } from '../correlation/engine.js'; + +const logger = pino({ name: 'notifications' }); + +export interface NotificationChannel { + send(incident: IncidentNotification): Promise; +} + +export interface IncidentNotification { + incidentId: string; + title: string; + severity: string; + service: string; + alertCount: number; + firstAlertAt: Date; + fingerprint: string; + dashboardUrl: string; +} + +// --- Slack Block Kit --- + +export class SlackNotifier implements NotificationChannel { + private webhookUrl: string; + + constructor(webhookUrl: string) { + this.webhookUrl = webhookUrl; + } + + async send(incident: 
IncidentNotification): Promise { + const severityEmoji: Record = { + critical: '🔴', high: '🟠', medium: '🟡', low: '🔵', info: 'ℹ️', + }; + const emoji = severityEmoji[incident.severity] ?? '⚪'; + + const blocks = [ + { + type: 'header', + text: { type: 'plain_text', text: `${emoji} ${incident.title}`, emoji: true }, + }, + { + type: 'section', + fields: [ + { type: 'mrkdwn', text: `*Severity:*\n${incident.severity.toUpperCase()}` }, + { type: 'mrkdwn', text: `*Service:*\n${incident.service}` }, + { type: 'mrkdwn', text: `*Alerts:*\n${incident.alertCount}` }, + { type: 'mrkdwn', text: `*First seen:*\n` }, + ], + }, + { + type: 'actions', + elements: [ + { + type: 'button', + text: { type: 'plain_text', text: '👀 View Incident' }, + url: incident.dashboardUrl, + action_id: `view_incident:${incident.incidentId}`, + }, + { + type: 'button', + text: { type: 'plain_text', text: '✅ Acknowledge' }, + style: 'primary', + action_id: `ack_incident:${incident.incidentId}`, + }, + { + type: 'button', + text: { type: 'plain_text', text: '🔇 Suppress' }, + action_id: `suppress_incident:${incident.incidentId}`, + }, + ], + }, + ]; + + try { + const res = await fetch(this.webhookUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ blocks }), + }); + + if (!res.ok) { + logger.warn({ status: res.status, incidentId: incident.incidentId }, 'Slack notification failed'); + return false; + } + return true; + } catch (err) { + logger.error({ error: (err as Error).message, incidentId: incident.incidentId }, 'Slack send error'); + return false; + } + } +} + +// --- Email (via Resend) --- + +export class EmailNotifier implements NotificationChannel { + private apiKey: string; + private from: string; + private to: string; + + constructor(apiKey: string, from: string, to: string) { + this.apiKey = apiKey; + this.from = from; + this.to = to; + } + + async send(incident: IncidentNotification): Promise { + try { + const res = await 
fetch('https://api.resend.com/emails', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${this.apiKey}`, + }, + body: JSON.stringify({ + from: this.from, + to: this.to, + subject: `[dd0c/alert] ${incident.severity.toUpperCase()}: ${incident.title}`, + html: ` +

${incident.title}

+

Severity: ${incident.severity}

+

Service: ${incident.service}

+

Alerts grouped: ${incident.alertCount}

+

View in Dashboard

+ `, + }), + }); + + if (!res.ok) { + logger.warn({ status: res.status }, 'Email notification failed'); + return false; + } + return true; + } catch (err) { + logger.error({ error: (err as Error).message }, 'Email send error'); + return false; + } + } +} + +// --- Webhook (generic) --- + +export class WebhookNotifier implements NotificationChannel { + private url: string; + + constructor(url: string) { + this.url = url; + } + + async send(incident: IncidentNotification): Promise { + try { + const res = await fetch(this.url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(incident), + }); + return res.ok; + } catch (err) { + logger.error({ error: (err as Error).message }, 'Webhook send error'); + return false; + } + } +} + +// --- Dispatcher --- + +const SEVERITY_ORDER: Record = { critical: 5, high: 4, medium: 3, low: 2, info: 1 }; + +export class NotificationDispatcher { + private channels: Array<{ channel: NotificationChannel; minSeverity: string }> = []; + + addChannel(channel: NotificationChannel, minSeverity = 'medium') { + this.channels.push({ channel, minSeverity }); + } + + async dispatch(incident: IncidentNotification): Promise { + const incidentLevel = SEVERITY_ORDER[incident.severity] ?? 0; + let sent = 0; + + for (const { channel, minSeverity } of this.channels) { + const minLevel = SEVERITY_ORDER[minSeverity] ?? 0; + if (incidentLevel >= minLevel) { + const ok = await channel.send(incident); + if (ok) sent++; + } + } + + logger.info({ incidentId: incident.incidentId, sent, total: this.channels.length }, 'Notifications dispatched'); + return sent; + } + + /** + * Convert a shipped correlation window into a notification. + */ + static fromWindow(window: CorrelationWindow, baseUrl: string): IncidentNotification { + const topSeverity = window.alerts.reduce((max, a) => { + return (SEVERITY_ORDER[a.severity] ?? 0) > (SEVERITY_ORDER[max] ?? 0) ? 
a.severity : max; + }, 'info'); + + return { + incidentId: window.incidentId ?? 'unknown', + title: window.alerts[0]?.title ?? 'Alert Incident', + severity: topSeverity, + service: window.service, + alertCount: window.alerts.length, + firstAlertAt: new Date(window.openedAt), + fingerprint: window.fingerprint, + dashboardUrl: `${baseUrl}/incidents/${window.incidentId}`, + }; + } +} diff --git a/products/05-aws-cost-anomaly/src/notifications/slack.ts b/products/05-aws-cost-anomaly/src/notifications/slack.ts new file mode 100644 index 0000000..0e71f82 --- /dev/null +++ b/products/05-aws-cost-anomaly/src/notifications/slack.ts @@ -0,0 +1,89 @@ +import pino from 'pino'; + +const logger = pino({ name: 'cost-notifications' }); + +export interface CostAnomalyNotification { + anomalyId: string; + accountId: string; + resourceType: string; + region: string; + hourlyCost: number; + score: number; + baselineMean: number; + baselineStddev: number; + dashboardUrl: string; +} + +// --- Slack Block Kit for Cost Anomalies --- + +export class CostSlackNotifier { + private webhookUrl: string; + + constructor(webhookUrl: string) { + this.webhookUrl = webhookUrl; + } + + async send(anomaly: CostAnomalyNotification): Promise { + const scoreEmoji = anomaly.score >= 75 ? '🔴' : anomaly.score >= 50 ? '🟠' : '🟡'; + const deviation = anomaly.baselineStddev > 0 + ? 
((anomaly.hourlyCost - anomaly.baselineMean) / anomaly.baselineStddev).toFixed(1) + : '∞'; + + const blocks = [ + { + type: 'header', + text: { type: 'plain_text', text: `${scoreEmoji} Cost Anomaly Detected`, emoji: true }, + }, + { + type: 'section', + fields: [ + { type: 'mrkdwn', text: `*Resource:*\n\`${anomaly.resourceType}\`` }, + { type: 'mrkdwn', text: `*Account:*\n${anomaly.accountId}` }, + { type: 'mrkdwn', text: `*Region:*\n${anomaly.region}` }, + { type: 'mrkdwn', text: `*Score:*\n${anomaly.score}/100` }, + { type: 'mrkdwn', text: `*Hourly Cost:*\n$${anomaly.hourlyCost.toFixed(4)}` }, + { type: 'mrkdwn', text: `*Baseline:*\n$${anomaly.baselineMean.toFixed(4)} ± ${anomaly.baselineStddev.toFixed(4)}` }, + { type: 'mrkdwn', text: `*Deviation:*\n${deviation}σ` }, + ], + }, + { + type: 'actions', + elements: [ + { + type: 'button', + text: { type: 'plain_text', text: '📊 View Details' }, + url: anomaly.dashboardUrl, + action_id: `view_anomaly:${anomaly.anomalyId}`, + }, + { + type: 'button', + text: { type: 'plain_text', text: '✅ Expected' }, + action_id: `mark_expected:${anomaly.anomalyId}`, + }, + { + type: 'button', + text: { type: 'plain_text', text: '😴 Snooze 24h' }, + action_id: `snooze_anomaly:${anomaly.anomalyId}`, + }, + ], + }, + ]; + + try { + const res = await fetch(this.webhookUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ blocks }), + }); + + if (!res.ok) { + logger.warn({ status: res.status, anomalyId: anomaly.anomalyId }, 'Slack cost notification failed'); + return false; + } + return true; + } catch (err) { + logger.error({ error: (err as Error).message }, 'Slack cost send error'); + return false; + } + } +} diff --git a/products/06-runbook-automation/agent/src/parser.rs b/products/06-runbook-automation/agent/src/parser.rs index 3b3cfaa..307aad2 100644 --- a/products/06-runbook-automation/agent/src/parser.rs +++ b/products/06-runbook-automation/agent/src/parser.rs @@ -1,4 +1,5 @@ use 
serde::{Deserialize, Serialize}; +use anyhow::{Context, Result}; /// Parsed runbook step. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -6,47 +7,217 @@ pub struct RunbookStep { pub index: usize, pub description: String, pub command: String, + #[serde(default = "default_timeout")] pub timeout_seconds: u64, + #[serde(default)] pub on_failure: FailureAction, - pub condition: Option, // Optional: only run if previous step output matches + pub condition: Option, } +fn default_timeout() -> u64 { 300 } + #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "action", rename_all = "snake_case")] pub enum FailureAction { Abort, Continue, Retry { max_attempts: u32 }, } +impl Default for FailureAction { + fn default() -> Self { FailureAction::Abort } +} + /// Parsed runbook. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Runbook { pub name: String, + #[serde(default)] pub description: String, + #[serde(default = "default_version")] pub version: String, pub steps: Vec, + #[serde(default)] + pub variables: std::collections::HashMap, +} + +fn default_version() -> String { "0.1.0".into() } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VariableSpec { + pub description: String, + pub default: Option, + #[serde(default)] + pub required: bool, } /// Parse a YAML runbook into structured steps. 
-pub fn parse_yaml(content: &str) -> anyhow::Result { - // TODO: Full YAML parsing with serde_yaml - // For now, return a placeholder - Ok(Runbook { - name: "placeholder".into(), - description: "TODO: implement YAML parser".into(), - version: "0.1.0".into(), - steps: vec![], - }) +/// +/// Example YAML: +/// ```yaml +/// name: restart-service +/// description: Restart a stuck ECS service +/// variables: +/// cluster: +/// description: ECS cluster name +/// required: true +/// service: +/// description: ECS service name +/// required: true +/// steps: +/// - description: Check current task count +/// command: "aws ecs describe-services --cluster {{cluster}} --services {{service}}" +/// timeout_seconds: 30 +/// - description: Force new deployment +/// command: "aws ecs update-service --cluster {{cluster}} --service {{service}} --force-new-deployment" +/// timeout_seconds: 60 +/// on_failure: +/// action: abort +/// - description: Wait for stable +/// command: "aws ecs wait services-stable --cluster {{cluster}} --services {{service}}" +/// timeout_seconds: 300 +/// on_failure: +/// action: retry +/// max_attempts: 3 +/// ``` +pub fn parse_yaml(content: &str) -> Result { + let mut runbook: Runbook = serde_yaml::from_str(content) + .context("Failed to parse runbook YAML")?; + + // Assign step indices + for (i, step) in runbook.steps.iter_mut().enumerate() { + step.index = i; + } + + // Validate: at least one step + if runbook.steps.is_empty() { + anyhow::bail!("Runbook must have at least one step"); + } + + Ok(runbook) +} + +/// Substitute variables in a command string. +/// Variables use `{{name}}` syntax. 
+pub fn substitute_variables( + command: &str, + variables: &std::collections::HashMap, +) -> Result { + let mut result = command.to_string(); + for (key, value) in variables { + result = result.replace(&format!("{{{{{}}}}}", key), value); + } + + // Check for unresolved variables + if result.contains("{{") { + let unresolved: Vec<&str> = result + .match_indices("{{") + .filter_map(|(start, _)| { + result[start..].find("}}").map(|end| &result[start..start + end + 2]) + }) + .collect(); + anyhow::bail!("Unresolved variables: {:?}", unresolved); + } + + Ok(result) } #[cfg(test)] mod tests { use super::*; + use std::collections::HashMap; + + const SAMPLE_YAML: &str = r#" +name: restart-service +description: Restart a stuck ECS service +variables: + cluster: + description: ECS cluster name + required: true + service: + description: ECS service name + required: true +steps: + - description: Check current task count + command: "aws ecs describe-services --cluster {{cluster}} --services {{service}}" + timeout_seconds: 30 + - description: Force new deployment + command: "aws ecs update-service --cluster {{cluster}} --service {{service}} --force-new-deployment" + timeout_seconds: 60 + on_failure: + action: abort + - description: Wait for stable + command: "aws ecs wait services-stable --cluster {{cluster}} --services {{service}}" + timeout_seconds: 300 + on_failure: + action: retry + max_attempts: 3 +"#; #[test] - fn test_parse_empty_returns_placeholder() { - let result = parse_yaml("").unwrap(); - assert_eq!(result.name, "placeholder"); - assert!(result.steps.is_empty()); + fn test_parse_valid_yaml() { + let runbook = parse_yaml(SAMPLE_YAML).unwrap(); + assert_eq!(runbook.name, "restart-service"); + assert_eq!(runbook.steps.len(), 3); + assert_eq!(runbook.steps[0].index, 0); + assert_eq!(runbook.steps[2].timeout_seconds, 300); + assert_eq!(runbook.variables.len(), 2); + assert!(runbook.variables.get("cluster").unwrap().required); + } + + #[test] + fn 
test_parse_empty_steps_fails() { + let yaml = "name: empty\nsteps: []"; + assert!(parse_yaml(yaml).is_err()); + } + + #[test] + fn test_parse_invalid_yaml_fails() { + assert!(parse_yaml("not: [valid: yaml: {{").is_err()); + } + + #[test] + fn test_substitute_variables() { + let mut vars = HashMap::new(); + vars.insert("cluster".into(), "prod".into()); + vars.insert("service".into(), "api".into()); + + let result = substitute_variables( + "aws ecs describe-services --cluster {{cluster}} --services {{service}}", + &vars, + ).unwrap(); + + assert_eq!(result, "aws ecs describe-services --cluster prod --services api"); + } + + #[test] + fn test_unresolved_variable_fails() { + let vars = HashMap::new(); + let result = substitute_variables("echo {{missing}}", &vars); + assert!(result.is_err()); + } + + #[test] + fn test_default_failure_action_is_abort() { + let yaml = r#" +name: simple +steps: + - description: test + command: echo hello +"#; + let runbook = parse_yaml(yaml).unwrap(); + assert!(matches!(runbook.steps[0].on_failure, FailureAction::Abort)); + } + + #[test] + fn test_default_timeout() { + let yaml = r#" +name: simple +steps: + - description: test + command: echo hello +"#; + let runbook = parse_yaml(yaml).unwrap(); + assert_eq!(runbook.steps[0].timeout_seconds, 300); } }