Add notification dispatchers (P3 Slack/Email/Webhook, P5 Slack), full YAML parser for P6
- P3 alert: NotificationDispatcher with Slack Block Kit, Resend email, generic webhook; severity-gated dispatch
- P5 cost: CostSlackNotifier with anomaly Block Kit (score, deviation, snooze/expected buttons)
- P6 run: Full YAML runbook parser with serde_yaml, variable substitution ({{var}}), failure actions, 7 tests
- P6 parser: validates non-empty steps, default timeout (300s), default abort on failure
This commit is contained in:
210
products/03-alert-intelligence/src/notifications/dispatcher.ts
Normal file
210
products/03-alert-intelligence/src/notifications/dispatcher.ts
Normal file
@@ -0,0 +1,210 @@
|
||||
import pino from 'pino';
|
||||
import type { CorrelationWindow } from '../correlation/engine.js';
|
||||
|
||||
const logger = pino({ name: 'notifications' });
|
||||
|
||||
/**
 * Contract implemented by every delivery target the dispatcher can fan out to.
 *
 * The implementations in this file (Slack, email, generic webhook) resolve to
 * `true` when the remote endpoint accepted the request and to `false` on a
 * non-2xx response or a transport error.
 */
export interface NotificationChannel {
  /**
   * Deliver a single incident notification.
   *
   * @param incident - The incident to announce.
   * @returns Whether the notification was delivered.
   */
  send(incident: IncidentNotification): Promise<boolean>;
}
|
||||
|
||||
/**
 * Channel-agnostic description of an incident, handed to every
 * {@link NotificationChannel}.
 */
export interface IncidentNotification {
  /** Identifier of the incident; also embedded in Slack action ids. */
  incidentId: string;

  /** Headline shown as the Slack header and the email subject/heading. */
  title: string;

  /** Severity label; the dispatcher ranks `critical`, `high`, `medium`, `low` and `info`. */
  severity: string;

  /** Name of the affected service. */
  service: string;

  /** Number of alerts grouped into this incident. */
  alertCount: number;

  /** Timestamp rendered as "First seen" in Slack. */
  firstAlertAt: Date;

  /** Fingerprint of the alert group (carried through to generic webhooks). */
  fingerprint: string;

  /** Link target for the "view" button / dashboard link. */
  dashboardUrl: string;
}
|
||||
|
||||
// --- Slack Block Kit ---
|
||||
|
||||
/** Slack rejects `header` blocks whose text is longer than this. */
const SLACK_HEADER_MAX_CHARS = 150;

/**
 * Escape the three characters Slack treats as control characters in mrkdwn
 * (`&`, `<`, `>`), so user-supplied text cannot inject mentions or links.
 */
function escapeSlackMrkdwn(text: string): string {
  return text.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;');
}

/**
 * Shorten `text` to at most `maxLength` UTF-16 units, ending in an ellipsis
 * when something was cut. Iterates by code point so surrogate pairs
 * (e.g. emoji) are never split.
 */
function truncateSlackText(text: string, maxLength: number): string {
  if (text.length <= maxLength) return text;

  let kept = '';
  for (const ch of text) {
    if (kept.length + ch.length > maxLength - 1) break;
    kept += ch;
  }
  return `${kept}…`;
}

/**
 * Sends incident notifications to a Slack incoming webhook as a Block Kit
 * message (header, summary fields, and view/acknowledge/suppress buttons).
 */
export class SlackNotifier implements NotificationChannel {
  private webhookUrl: string;

  /**
   * @param webhookUrl - Slack incoming-webhook URL the message is POSTed to.
   */
  constructor(webhookUrl: string) {
    this.webhookUrl = webhookUrl;
  }

  /**
   * Post the incident to Slack.
   *
   * Never rejects: payload-building problems, transport errors and non-2xx
   * responses are logged and reported as `false`.
   *
   * @param incident - The incident to announce.
   * @returns `true` when Slack answered with a 2xx status, otherwise `false`.
   */
  async send(incident: IncidentNotification): Promise<boolean> {
    try {
      // Built inside the try so a malformed incident cannot make send() reject.
      const payload = this.buildPayload(incident);

      const res = await fetch(this.webhookUrl, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload),
      });

      if (!res.ok) {
        logger.warn({ status: res.status, incidentId: incident.incidentId }, 'Slack notification failed');
        return false;
      }
      return true;
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      logger.error({ error: message, incidentId: incident.incidentId }, 'Slack send error');
      return false;
    }
  }

  /** Assemble the webhook body: a plain-text fallback plus the Block Kit blocks. */
  private buildPayload(incident: IncidentNotification): { text: string; blocks: unknown[] } {
    const severityEmoji: Record<string, string> = {
      critical: '🔴', high: '🟠', medium: '🟡', low: '🔵', info: 'ℹ️',
    };
    const emoji = severityEmoji[incident.severity] ?? '⚪';
    const severityLabel = incident.severity.toUpperCase();

    // An invalid Date would make toISOString() throw; degrade to "unknown"
    // rather than dropping the whole notification.
    const firstSeenMs = incident.firstAlertAt.getTime();
    const firstSeen = Number.isNaN(firstSeenMs)
      ? 'unknown'
      : `<!date^${Math.floor(firstSeenMs / 1000)}^{date_short_pretty} {time}|${incident.firstAlertAt.toISOString()}>`;

    const blocks = [
      {
        type: 'header',
        text: {
          type: 'plain_text',
          text: truncateSlackText(`${emoji} ${incident.title}`, SLACK_HEADER_MAX_CHARS),
          emoji: true,
        },
      },
      {
        type: 'section',
        fields: [
          { type: 'mrkdwn', text: `*Severity:*\n${escapeSlackMrkdwn(severityLabel)}` },
          { type: 'mrkdwn', text: `*Service:*\n${escapeSlackMrkdwn(incident.service)}` },
          { type: 'mrkdwn', text: `*Alerts:*\n${incident.alertCount}` },
          { type: 'mrkdwn', text: `*First seen:*\n${firstSeen}` },
        ],
      },
      {
        type: 'actions',
        elements: [
          {
            type: 'button',
            text: { type: 'plain_text', text: '👀 View Incident' },
            url: incident.dashboardUrl,
            action_id: `view_incident:${incident.incidentId}`,
          },
          {
            type: 'button',
            text: { type: 'plain_text', text: '✅ Acknowledge' },
            style: 'primary',
            action_id: `ack_incident:${incident.incidentId}`,
          },
          {
            type: 'button',
            text: { type: 'plain_text', text: '🔇 Suppress' },
            action_id: `suppress_incident:${incident.incidentId}`,
          },
        ],
      },
    ];

    return {
      // Fallback shown in push notifications and clients that cannot render blocks.
      text: escapeSlackMrkdwn(`${emoji} ${severityLabel}: ${incident.title}`),
      blocks,
    };
  }
}
|
||||
|
||||
// --- Email (via Resend) ---
|
||||
|
||||
/**
 * Escape text for safe interpolation into HTML element content and
 * double-quoted attribute values.
 */
function escapeHtml(text: string): string {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');
}

/**
 * Sends incident notifications as HTML email through the Resend HTTP API.
 */
export class EmailNotifier implements NotificationChannel {
  private apiKey: string;
  private from: string;
  private to: string;

  /**
   * @param apiKey - Resend API key, sent as a bearer token.
   * @param from - Sender address.
   * @param to - Recipient address.
   */
  constructor(apiKey: string, from: string, to: string) {
    this.apiKey = apiKey;
    this.from = from;
    this.to = to;
  }

  /**
   * Email the incident summary.
   *
   * Incident fields originate from alert payloads, so every value placed in
   * the HTML body is escaped first.
   *
   * @param incident - The incident to announce.
   * @returns `true` when Resend answered with a 2xx status, otherwise `false`.
   */
  async send(incident: IncidentNotification): Promise<boolean> {
    try {
      const html = `
        <h2>${escapeHtml(incident.title)}</h2>
        <p><strong>Severity:</strong> ${escapeHtml(incident.severity)}</p>
        <p><strong>Service:</strong> ${escapeHtml(incident.service)}</p>
        <p><strong>Alerts grouped:</strong> ${incident.alertCount}</p>
        <p><a href="${escapeHtml(incident.dashboardUrl)}">View in Dashboard</a></p>
      `;

      const res = await fetch('https://api.resend.com/emails', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${this.apiKey}`,
        },
        body: JSON.stringify({
          from: this.from,
          to: this.to,
          // The subject is plain text, so it is intentionally not HTML-escaped.
          subject: `[dd0c/alert] ${incident.severity.toUpperCase()}: ${incident.title}`,
          html,
        }),
      });

      if (!res.ok) {
        logger.warn({ status: res.status, incidentId: incident.incidentId }, 'Email notification failed');
        return false;
      }
      return true;
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      logger.error({ error: message, incidentId: incident.incidentId }, 'Email send error');
      return false;
    }
  }
}
|
||||
|
||||
// --- Webhook (generic) ---
|
||||
|
||||
/**
 * Sends the raw incident as a JSON POST to an arbitrary HTTP endpoint.
 */
export class WebhookNotifier implements NotificationChannel {
  private url: string;

  /**
   * @param url - Endpoint that receives the JSON-serialised incident.
   */
  constructor(url: string) {
    this.url = url;
  }

  /**
   * POST the incident to the configured endpoint.
   *
   * @param incident - The incident to announce; serialised with `JSON.stringify`,
   *   so `firstAlertAt` is transmitted as an ISO-8601 string.
   * @returns `true` when the endpoint answered with a 2xx status, otherwise `false`.
   */
  async send(incident: IncidentNotification): Promise<boolean> {
    try {
      const res = await fetch(this.url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(incident),
      });

      // Log rejected deliveries the same way the Slack and email channels do,
      // otherwise a misconfigured endpoint fails silently.
      if (!res.ok) {
        logger.warn({ status: res.status, incidentId: incident.incidentId }, 'Webhook notification failed');
        return false;
      }
      return true;
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      logger.error({ error: message, incidentId: incident.incidentId }, 'Webhook send error');
      return false;
    }
  }
}
|
||||
|
||||
// --- Dispatcher ---
|
||||
|
||||
/** Rank of each known severity; higher is more severe. Unknown labels rank as 0. */
const SEVERITY_ORDER: Record<string, number> = { critical: 5, high: 4, medium: 3, low: 2, info: 1 };

/**
 * Fans an incident out to every registered channel whose severity threshold
 * the incident meets.
 */
export class NotificationDispatcher {
  private channels: Array<{ channel: NotificationChannel; minSeverity: string }> = [];

  /**
   * Register a channel.
   *
   * @param channel - Delivery target.
   * @param minSeverity - Lowest severity this channel should receive
   *   (defaults to `'medium'`). An unrecognised label ranks as 0, which means
   *   the channel receives every incident; a warning is logged in that case.
   */
  addChannel(channel: NotificationChannel, minSeverity = 'medium') {
    if (SEVERITY_ORDER[minSeverity] === undefined) {
      logger.warn({ minSeverity }, 'Unknown minSeverity; channel will receive all incidents');
    }
    this.channels.push({ channel, minSeverity });
  }

  /**
   * Send the incident to all eligible channels.
   *
   * Channels are independent, so they are contacted concurrently and a
   * channel that throws or rejects is logged and counted as not delivered
   * instead of preventing the remaining channels from being tried.
   *
   * @param incident - The incident to announce.
   * @returns Number of channels that reported successful delivery.
   */
  async dispatch(incident: IncidentNotification): Promise<number> {
    const incidentLevel = SEVERITY_ORDER[incident.severity] ?? 0;
    const eligible = this.channels.filter(
      ({ minSeverity }) => incidentLevel >= (SEVERITY_ORDER[minSeverity] ?? 0),
    );

    // The async wrapper turns a synchronous throw inside send() into a rejection.
    const outcomes = await Promise.allSettled(
      eligible.map(async ({ channel }) => channel.send(incident)),
    );

    let sent = 0;
    for (const outcome of outcomes) {
      if (outcome.status === 'fulfilled') {
        if (outcome.value) sent++;
      } else {
        const reason: unknown = outcome.reason;
        logger.error(
          { incidentId: incident.incidentId, error: reason instanceof Error ? reason.message : String(reason) },
          'Notification channel threw',
        );
      }
    }

    logger.info(
      { incidentId: incident.incidentId, sent, eligible: eligible.length, total: this.channels.length },
      'Notifications dispatched',
    );
    return sent;
  }

  /**
   * Convert a shipped correlation window into a notification.
   *
   * The severity is the highest-ranked severity among the window's alerts
   * (`'info'` when none rank higher); the title is taken from the first alert.
   *
   * @param window - Correlation window to summarise.
   * @param baseUrl - Dashboard base URL; trailing slashes are ignored.
   * @returns The notification describing the window.
   */
  static fromWindow(window: CorrelationWindow, baseUrl: string): IncidentNotification {
    const topSeverity = window.alerts.reduce<string>((max, a) => {
      return (SEVERITY_ORDER[a.severity] ?? 0) > (SEVERITY_ORDER[max] ?? 0) ? a.severity : max;
    }, 'info');

    // Resolve the id once so the notification id and the dashboard link agree
    // (previously a missing id produced ".../incidents/undefined").
    const incidentId = window.incidentId ?? 'unknown';
    const dashboardBase = baseUrl.replace(/\/+$/, '');

    return {
      incidentId,
      title: window.alerts[0]?.title ?? 'Alert Incident',
      severity: topSeverity,
      service: window.service,
      alertCount: window.alerts.length,
      firstAlertAt: new Date(window.openedAt),
      fingerprint: window.fingerprint,
      dashboardUrl: `${dashboardBase}/incidents/${encodeURIComponent(incidentId)}`,
    };
  }
}
|
||||
89
products/05-aws-cost-anomaly/src/notifications/slack.ts
Normal file
89
products/05-aws-cost-anomaly/src/notifications/slack.ts
Normal file
@@ -0,0 +1,89 @@
|
||||
import pino from 'pino';
|
||||
|
||||
const logger = pino({ name: 'cost-notifications' });
|
||||
|
||||
/**
 * Data needed to announce a single cost anomaly.
 */
export interface CostAnomalyNotification {
  /** Identifier of the anomaly; also embedded in Slack action ids. */
  anomalyId: string;

  /** Account the anomalous spend belongs to. */
  accountId: string;

  /** Resource type label shown in the message. */
  resourceType: string;

  /** Region label shown in the message. */
  region: string;

  /** Observed hourly cost, rendered with a `$` prefix. */
  hourlyCost: number;

  /** Anomaly score, rendered as `score/100`. */
  score: number;

  /** Mean of the cost baseline. */
  baselineMean: number;

  /** Standard deviation of the cost baseline. */
  baselineStddev: number;

  /** Link target for the "View Details" button. */
  dashboardUrl: string;
}
|
||||
|
||||
// --- Slack Block Kit for Cost Anomalies ---
|
||||
|
||||
/**
 * Escape the three characters Slack treats as control characters in mrkdwn
 * (`&`, `<`, `>`).
 */
function escapeCostMrkdwn(text: string): string {
  return text.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;');
}

/**
 * Express how far `cost` is from `mean` in units of `stddev`.
 *
 * With a positive stddev this is the usual z-score with one decimal. With a
 * zero (or otherwise unusable) stddev the ratio is undefined, so the sign of
 * the difference decides: above the mean is `∞`, below is `-∞`, equal is `0.0`.
 */
function formatDeviation(cost: number, mean: number, stddev: number): string {
  const delta = cost - mean;
  if (stddev > 0) return (delta / stddev).toFixed(1);
  if (delta > 0) return '∞';
  if (delta < 0) return '-∞';
  return delta === 0 ? '0.0' : 'n/a';
}

/**
 * Sends cost-anomaly notifications to a Slack incoming webhook as a Block Kit
 * message (header, anomaly details, and view/expected/snooze buttons).
 */
export class CostSlackNotifier {
  private webhookUrl: string;

  /**
   * @param webhookUrl - Slack incoming-webhook URL the message is POSTed to.
   */
  constructor(webhookUrl: string) {
    this.webhookUrl = webhookUrl;
  }

  /**
   * Post the anomaly to Slack.
   *
   * Never rejects: payload-building problems, transport errors and non-2xx
   * responses are logged and reported as `false`.
   *
   * @param anomaly - The anomaly to announce.
   * @returns `true` when Slack answered with a 2xx status, otherwise `false`.
   */
  async send(anomaly: CostAnomalyNotification): Promise<boolean> {
    try {
      const res = await fetch(this.webhookUrl, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(this.buildPayload(anomaly)),
      });

      if (!res.ok) {
        logger.warn({ status: res.status, anomalyId: anomaly.anomalyId }, 'Slack cost notification failed');
        return false;
      }
      return true;
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      logger.error({ error: message, anomalyId: anomaly.anomalyId }, 'Slack cost send error');
      return false;
    }
  }

  /** Assemble the webhook body: a plain-text fallback plus the Block Kit blocks. */
  private buildPayload(anomaly: CostAnomalyNotification): { text: string; blocks: unknown[] } {
    // Score bands: 75+ red, 50+ orange, everything else yellow.
    const scoreEmoji = anomaly.score >= 75 ? '🔴' : anomaly.score >= 50 ? '🟠' : '🟡';
    const deviation = formatDeviation(anomaly.hourlyCost, anomaly.baselineMean, anomaly.baselineStddev);
    const resource = escapeCostMrkdwn(anomaly.resourceType);

    const blocks = [
      {
        type: 'header',
        text: { type: 'plain_text', text: `${scoreEmoji} Cost Anomaly Detected`, emoji: true },
      },
      {
        type: 'section',
        fields: [
          { type: 'mrkdwn', text: `*Resource:*\n\`${resource}\`` },
          { type: 'mrkdwn', text: `*Account:*\n${escapeCostMrkdwn(anomaly.accountId)}` },
          { type: 'mrkdwn', text: `*Region:*\n${escapeCostMrkdwn(anomaly.region)}` },
          { type: 'mrkdwn', text: `*Score:*\n${anomaly.score}/100` },
          { type: 'mrkdwn', text: `*Hourly Cost:*\n$${anomaly.hourlyCost.toFixed(4)}` },
          { type: 'mrkdwn', text: `*Baseline:*\n$${anomaly.baselineMean.toFixed(4)} ± ${anomaly.baselineStddev.toFixed(4)}` },
          { type: 'mrkdwn', text: `*Deviation:*\n${deviation}σ` },
        ],
      },
      {
        type: 'actions',
        elements: [
          {
            type: 'button',
            text: { type: 'plain_text', text: '📊 View Details' },
            url: anomaly.dashboardUrl,
            action_id: `view_anomaly:${anomaly.anomalyId}`,
          },
          {
            type: 'button',
            text: { type: 'plain_text', text: '✅ Expected' },
            action_id: `mark_expected:${anomaly.anomalyId}`,
          },
          {
            type: 'button',
            text: { type: 'plain_text', text: '😴 Snooze 24h' },
            action_id: `snooze_anomaly:${anomaly.anomalyId}`,
          },
        ],
      },
    ];

    return {
      // Fallback shown in push notifications and clients that cannot render blocks.
      text: `${scoreEmoji} Cost anomaly: ${resource} at $${anomaly.hourlyCost.toFixed(4)}/h (${deviation}σ)`,
      blocks,
    };
  }
}
|
||||
@@ -1,4 +1,5 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use anyhow::{Context, Result};
|
||||
|
||||
/// Parsed runbook step.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -6,47 +7,217 @@ pub struct RunbookStep {
|
||||
pub index: usize,
|
||||
pub description: String,
|
||||
pub command: String,
|
||||
#[serde(default = "default_timeout")]
|
||||
pub timeout_seconds: u64,
|
||||
#[serde(default)]
|
||||
pub on_failure: FailureAction,
|
||||
pub condition: Option<String>, // Optional: only run if previous step output matches
|
||||
pub condition: Option<String>,
|
||||
}
|
||||
|
||||
fn default_timeout() -> u64 { 300 }
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "action", rename_all = "snake_case")]
|
||||
pub enum FailureAction {
|
||||
Abort,
|
||||
Continue,
|
||||
Retry { max_attempts: u32 },
|
||||
}
|
||||
|
||||
impl Default for FailureAction {
|
||||
fn default() -> Self { FailureAction::Abort }
|
||||
}
|
||||
|
||||
/// Parsed runbook.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Runbook {
|
||||
pub name: String,
|
||||
#[serde(default)]
|
||||
pub description: String,
|
||||
#[serde(default = "default_version")]
|
||||
pub version: String,
|
||||
pub steps: Vec<RunbookStep>,
|
||||
#[serde(default)]
|
||||
pub variables: std::collections::HashMap<String, VariableSpec>,
|
||||
}
|
||||
|
||||
fn default_version() -> String { "0.1.0".into() }
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct VariableSpec {
|
||||
pub description: String,
|
||||
pub default: Option<String>,
|
||||
#[serde(default)]
|
||||
pub required: bool,
|
||||
}
|
||||
|
||||
/// Parse a YAML runbook into structured steps.
|
||||
pub fn parse_yaml(content: &str) -> anyhow::Result<Runbook> {
|
||||
// TODO: Full YAML parsing with serde_yaml
|
||||
// For now, return a placeholder
|
||||
Ok(Runbook {
|
||||
name: "placeholder".into(),
|
||||
description: "TODO: implement YAML parser".into(),
|
||||
version: "0.1.0".into(),
|
||||
steps: vec![],
|
||||
})
|
||||
///
|
||||
/// Example YAML:
|
||||
/// ```yaml
|
||||
/// name: restart-service
|
||||
/// description: Restart a stuck ECS service
|
||||
/// variables:
|
||||
/// cluster:
|
||||
/// description: ECS cluster name
|
||||
/// required: true
|
||||
/// service:
|
||||
/// description: ECS service name
|
||||
/// required: true
|
||||
/// steps:
|
||||
/// - description: Check current task count
|
||||
/// command: "aws ecs describe-services --cluster {{cluster}} --services {{service}}"
|
||||
/// timeout_seconds: 30
|
||||
/// - description: Force new deployment
|
||||
/// command: "aws ecs update-service --cluster {{cluster}} --service {{service}} --force-new-deployment"
|
||||
/// timeout_seconds: 60
|
||||
/// on_failure:
|
||||
/// action: abort
|
||||
/// - description: Wait for stable
|
||||
/// command: "aws ecs wait services-stable --cluster {{cluster}} --services {{service}}"
|
||||
/// timeout_seconds: 300
|
||||
/// on_failure:
|
||||
/// action: retry
|
||||
/// max_attempts: 3
|
||||
/// ```
|
||||
pub fn parse_yaml(content: &str) -> Result<Runbook> {
|
||||
let mut runbook: Runbook = serde_yaml::from_str(content)
|
||||
.context("Failed to parse runbook YAML")?;
|
||||
|
||||
// Assign step indices
|
||||
for (i, step) in runbook.steps.iter_mut().enumerate() {
|
||||
step.index = i;
|
||||
}
|
||||
|
||||
// Validate: at least one step
|
||||
if runbook.steps.is_empty() {
|
||||
anyhow::bail!("Runbook must have at least one step");
|
||||
}
|
||||
|
||||
Ok(runbook)
|
||||
}
|
||||
|
||||
/// Substitute variables in a command string.
|
||||
/// Variables use `{{name}}` syntax.
|
||||
pub fn substitute_variables(
|
||||
command: &str,
|
||||
variables: &std::collections::HashMap<String, String>,
|
||||
) -> Result<String> {
|
||||
let mut result = command.to_string();
|
||||
for (key, value) in variables {
|
||||
result = result.replace(&format!("{{{{{}}}}}", key), value);
|
||||
}
|
||||
|
||||
// Check for unresolved variables
|
||||
if result.contains("{{") {
|
||||
let unresolved: Vec<&str> = result
|
||||
.match_indices("{{")
|
||||
.filter_map(|(start, _)| {
|
||||
result[start..].find("}}").map(|end| &result[start..start + end + 2])
|
||||
})
|
||||
.collect();
|
||||
anyhow::bail!("Unresolved variables: {:?}", unresolved);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Three-step runbook with two required variables, exercising explicit
    /// timeouts and both the `abort` and `retry` failure actions.
    const SAMPLE_YAML: &str = r#"
name: restart-service
description: Restart a stuck ECS service
variables:
  cluster:
    description: ECS cluster name
    required: true
  service:
    description: ECS service name
    required: true
steps:
  - description: Check current task count
    command: "aws ecs describe-services --cluster {{cluster}} --services {{service}}"
    timeout_seconds: 30
  - description: Force new deployment
    command: "aws ecs update-service --cluster {{cluster}} --service {{service}} --force-new-deployment"
    timeout_seconds: 60
    on_failure:
      action: abort
  - description: Wait for stable
    command: "aws ecs wait services-stable --cluster {{cluster}} --services {{service}}"
    timeout_seconds: 300
    on_failure:
      action: retry
      max_attempts: 3
"#;

    /// Minimal runbook: one step relying on every default.
    const MINIMAL_YAML: &str = r#"
name: simple
steps:
  - description: test
    command: echo hello
"#;

    #[test]
    fn test_parse_valid_yaml() {
        let runbook = parse_yaml(SAMPLE_YAML).unwrap();
        assert_eq!(runbook.name, "restart-service");
        assert_eq!(runbook.steps.len(), 3);
        assert_eq!(runbook.steps[0].index, 0);
        assert_eq!(runbook.steps[2].timeout_seconds, 300);
        assert_eq!(runbook.variables.len(), 2);
        assert!(runbook.variables.get("cluster").unwrap().required);
    }

    #[test]
    fn test_parse_empty_steps_fails() {
        let yaml = "name: empty\nsteps: []";
        assert!(parse_yaml(yaml).is_err());
    }

    #[test]
    fn test_parse_invalid_yaml_fails() {
        assert!(parse_yaml("not: [valid: yaml: {{").is_err());
    }

    #[test]
    fn test_substitute_variables() {
        let mut vars = HashMap::new();
        vars.insert("cluster".into(), "prod".into());
        vars.insert("service".into(), "api".into());

        let result = substitute_variables(
            "aws ecs describe-services --cluster {{cluster}} --services {{service}}",
            &vars,
        )
        .unwrap();

        assert_eq!(result, "aws ecs describe-services --cluster prod --services api");
    }

    #[test]
    fn test_unresolved_variable_fails() {
        let vars = HashMap::new();
        let result = substitute_variables("echo {{missing}}", &vars);
        assert!(result.is_err());
    }

    #[test]
    fn test_default_failure_action_is_abort() {
        let runbook = parse_yaml(MINIMAL_YAML).unwrap();
        assert!(matches!(runbook.steps[0].on_failure, FailureAction::Abort));
    }

    #[test]
    fn test_default_timeout() {
        let runbook = parse_yaml(MINIMAL_YAML).unwrap();
        assert_eq!(runbook.steps[0].timeout_seconds, 300);
    }
}
|
||||
|
||||
Reference in New Issue
Block a user