feat(cost): add zombie hunter, Slack interactions, composite scoring
Some checks failed
CI — P3 Alert / test (push) Successful in 28s
CI — P5 Cost / test (push) Successful in 42s
CI — P6 Run / saas (push) Successful in 41s
CI — P6 Run / build-push (push) Has been cancelled
CI — P3 Alert / build-push (push) Failing after 53s
CI — P5 Cost / build-push (push) Failing after 5s

- Zombie resource hunter: detects idle EC2/RDS/EBS/EIP/NAT resources
- Slack interactive handler: acknowledge, snooze, create-ticket actions
- Composite anomaly scorer: Z-Score + rate-of-change + pattern + novelty
- Cold-start fast path for new resources (<7 days data)
- 005_zombies.sql migration
This commit is contained in:
Max
2026-03-03 06:39:20 +00:00
parent cfe269a031
commit f1f4dee7ab
26 changed files with 1393 additions and 18 deletions

View File

@@ -4,3 +4,4 @@ dist/
*.log
coverage/
.DS_Store
products/05-aws-cost-anomaly/typescript-5.9.3.tgz

View File

@@ -0,0 +1,27 @@
-- Migration 005: zombie resource detection + composite anomaly scoring.
-- Creates the per-tenant zombie_resources table (with row level security) and
-- adds two scoring columns to the existing anomalies table.
--
-- NOTE(review): CREATE TABLE / CREATE INDEX / CREATE POLICY below carry no
-- IF NOT EXISTS guard (unlike the ALTER TABLE statements at the end), so
-- re-running this file fails; confirm the migration runner applies it once.

-- Zombie resources table: one row per (tenant, resource_id, resource_type).
-- NOTE(review): uuid_generate_v4() is provided by the uuid-ossp extension;
-- presumably an earlier migration enables it — confirm.
CREATE TABLE zombie_resources (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- Rows are removed automatically when the owning tenant is deleted.
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
resource_id TEXT NOT NULL,
-- Closed set of resource kinds; any other value is rejected by the CHECK.
resource_type TEXT NOT NULL CHECK (resource_type IN ('ec2', 'rds', 'ebs', 'eip', 'nat_gateway')),
region TEXT NOT NULL,
account_id TEXT NOT NULL,
-- Two decimal places, at most 8 digits before the decimal point.
estimated_monthly_waste NUMERIC(10,2) NOT NULL DEFAULT 0,
-- Nullable: no activity timestamp may be known for a resource.
last_activity TIMESTAMPTZ,
recommendation TEXT NOT NULL,
-- Lifecycle of a finding; new rows start as 'open'.
status TEXT NOT NULL DEFAULT 'open' CHECK (status IN ('open', 'dismissed', 'remediated')),
detected_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- Conflict target for upserts: one finding per tenant/resource/type.
UNIQUE(tenant_id, resource_id, resource_type)
);
-- Supports listing a tenant's findings filtered by status, newest first.
CREATE INDEX idx_zombie_resources_tenant ON zombie_resources(tenant_id, status, detected_at DESC);
-- RLS: a session only sees rows whose tenant_id matches the app.tenant_id setting.
-- current_setting(..., true) yields NULL when the setting is absent, so the
-- comparison is not true and no rows match. With only a USING clause, PostgreSQL
-- applies the same expression to writes as well.
ALTER TABLE zombie_resources ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_iso_zombies ON zombie_resources
USING (tenant_id::text = current_setting('app.tenant_id', true));
-- Composite scoring columns on anomalies (both nullable, added only if missing).
-- composite_score holds values up to 999.99; score_breakdown is free-form JSON.
ALTER TABLE anomalies ADD COLUMN IF NOT EXISTS composite_score NUMERIC(5,2);
ALTER TABLE anomalies ADD COLUMN IF NOT EXISTS score_breakdown JSONB;

View File

@@ -31,7 +31,7 @@
"eslint": "^9.5.0",
"fast-check": "^3.19.0",
"tsx": "^4.15.0",
"typescript": "^5.5.0",
"typescript": "^5.9.3",
"vitest": "^1.6.0"
}
},
@@ -3813,9 +3813,9 @@
}
},
"node_modules/flatted": {
"version": "3.3.3",
"resolved": "https://registry.npmjs.org/flatted/-/flatted-3.3.3.tgz",
"integrity": "sha512-GX+ysw4PBCz0PzosHDepZGANEuFCMLrnRTiEy9McGjmkCQYwRq4A/X786G/fjM/+OjsWSU1ZrY5qyARZmO/uwg==",
"version": "3.3.4",
"resolved": "https://registry.npmjs.org/flatted/-/flatted-3.3.4.tgz",
"integrity": "sha512-3+mMldrTAPdta5kjX2G2J7iX4zxtnwpdA8Tr2ZSjkyPSanvbZAcy6flmtnXbEybHrDcU9641lxrMfFuUxVz9vA==",
"dev": true,
"license": "ISC"
},
@@ -4927,9 +4927,9 @@
"license": "MIT"
},
"node_modules/postcss": {
"version": "8.5.6",
"resolved": "https://registry.npmjs.org/postcss/-/postcss-8.5.6.tgz",
"integrity": "sha512-3Ybi1tAuwAP9s0r1UQ2J4n5Y0G05bJkpUIO0/bI9MhwmD70S5aTWbXGBwxHrelT+XM1k6dM0pk+SwNkpTRN7Pg==",
"version": "8.5.8",
"resolved": "https://registry.npmjs.org/postcss/-/postcss-8.5.8.tgz",
"integrity": "sha512-OW/rX8O/jXnm82Ey1k44pObPtdblfiuWnrd8X7GJ7emImCOstunGbXUpp7HdBrFQX6rJzn3sPT397Wp5aCwCHg==",
"dev": true,
"funding": [
{

View File

@@ -34,7 +34,7 @@
"eslint": "^9.5.0",
"fast-check": "^3.19.0",
"tsx": "^4.15.0",
"typescript": "^5.5.0",
"typescript": "^5.9.3",
"vitest": "^1.6.0"
}
}

View File

@@ -0,0 +1,180 @@
import type { FastifyInstance } from 'fastify';
import crypto from 'node:crypto';
import pino from 'pino';
import { systemQuery } from '../data/db.js';
const logger = pino({ name: 'slack-interactions' });
/**
 * Verify a Slack request signature (the `v0` HMAC-SHA256 scheme).
 * See: https://api.slack.com/authentication/verifying-requests-from-slack
 *
 * @param signingSecret - the app's Slack signing secret (HMAC key)
 * @param signature - value of the `x-slack-signature` header, if present
 * @param timestamp - value of the `x-slack-request-timestamp` header, if present
 * @param rawBody - the exact request body text the signature was computed over
 * @returns `true` only when both headers are present, the timestamp is a decimal
 *   epoch-seconds value within 5 minutes of now, and the signature matches.
 *   Never throws on malformed header values.
 */
function verifySlackSignature(
  signingSecret: string,
  signature: string | undefined,
  timestamp: string | undefined,
  rawBody: string,
): boolean {
  if (!signature || !timestamp) return false;

  // The timestamp must be plain decimal digits. A non-numeric value would parse
  // to NaN, and every comparison against NaN is false, which would silently
  // skip the replay-window check below.
  if (!/^\d+$/.test(timestamp)) return false;

  // Reject requests older (or newer) than 5 minutes (replay protection)
  const now = Math.floor(Date.now() / 1000);
  if (Math.abs(now - Number(timestamp)) > 300) return false;

  const sigBasestring = `v0:${timestamp}:${rawBody}`;
  const hmac = crypto.createHmac('sha256', signingSecret).update(sigBasestring).digest('hex');
  const expected = Buffer.from(`v0=${hmac}`);
  const received = Buffer.from(signature);

  // timingSafeEqual throws a RangeError when the buffers differ in length, so a
  // truncated or padded signature must be rejected before the comparison.
  if (received.length !== expected.length) return false;
  return crypto.timingSafeEqual(received, expected);
}
/** One activated interactive element inside a Slack interaction payload. */
interface SlackAction {
/** Encoded as "action_name:anomaly_uuid"; the part before the first ':' selects the handler. */
action_id: string;
/** Declared for completeness; not read anywhere in this module. */
value?: string;
}
/**
 * Subset of the JSON carried in the `payload` form field of a Slack
 * interaction request — only the fields this module reads.
 */
interface SlackInteractionPayload {
/** Interaction kind; only 'block_actions' is processed, everything else is ignored. */
type: string;
/** Acting Slack user; `username` is used in log entries and confirmation messages. */
user: { id: string; username: string };
/** Actions to dispatch, processed in order. */
actions: SlackAction[];
/** Logged when a ticket is requested; not otherwise used here. */
trigger_id: string;
/** URL that confirmation messages are POSTed to. */
response_url: string;
}
/**
 * Send an ephemeral response back to Slack via response_url.
 *
 * Best-effort: this never throws. Both transport failures and non-2xx replies
 * from Slack are logged and otherwise ignored, so a failed confirmation cannot
 * break the caller.
 *
 * @param responseUrl - the `response_url` taken from the interaction payload
 * @param text - message text shown only to the acting user
 */
async function sendEphemeral(responseUrl: string, text: string): Promise<void> {
  try {
    const res = await fetch(responseUrl, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ response_type: 'ephemeral', replace_original: false, text }),
    });
    // fetch() only rejects on network-level failures; an HTTP error status still
    // resolves, so it has to be checked explicitly or it goes unnoticed.
    if (!res.ok) {
      logger.error({ status: res.status }, 'Slack rejected ephemeral response');
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    logger.error({ error: message }, 'Failed to send ephemeral response');
  }
}
/**
 * Extract the anomaly ID from an action_id shaped like "action_name:anomaly_uuid".
 *
 * Everything after the first ':' is returned verbatim (it may itself contain
 * ':' or be empty). Returns null when the action_id contains no ':' at all.
 */
function parseAnomalyId(actionId: string): string | null {
  const separatorAt = actionId.indexOf(':');
  if (separatorAt === -1) {
    return null;
  }
  return actionId.slice(separatorAt + 1);
}
/**
 * Register the public Slack interactivity webhook on `app`.
 *
 * Installs a string parser for `application/x-www-form-urlencoded` (the raw
 * text is needed to verify the signature) and `POST /api/v1/slack/interactions`,
 * which:
 *   1. returns 500 when SLACK_SIGNING_SECRET is not set,
 *   2. returns 401 when the Slack signature does not verify,
 *   3. returns 400 when the `payload` form field is missing or is not a JSON object,
 *   4. ignores (200) every interaction type other than `block_actions`,
 *   5. dispatches each action by the name before the first ':' in its action_id.
 *
 * Once a payload has been accepted the handler always answers 200: a failure
 * while applying one action is logged and reported to the user, and does not
 * prevent the remaining actions from running.
 *
 * NOTE(review): the UPDATE statements go through systemQuery and are keyed by
 * anomaly id only — confirm that bypassing tenant scoping here is intended.
 */
export function registerSlackInteractionRoutes(app: FastifyInstance) {
  // Slack sends interactive payloads as application/x-www-form-urlencoded with a `payload` field.
  // The body is kept as the raw string because the signature covers the exact bytes sent.
  app.addContentTypeParser(
    'application/x-www-form-urlencoded',
    { parseAs: 'string' },
    (_req, body, done) => {
      done(null, body);
    },
  );

  // A header may legally arrive more than once, in which case it is an array.
  const firstHeaderValue = (value: string | string[] | undefined): string | undefined =>
    Array.isArray(value) ? value[0] : value;

  app.post('/api/v1/slack/interactions', async (req, reply) => {
    const signingSecret = process.env.SLACK_SIGNING_SECRET;
    if (!signingSecret) {
      logger.error('SLACK_SIGNING_SECRET not configured');
      return reply.status(500).send({ error: 'Slack signing secret not configured' });
    }

    // Verify signature. A body that did not come through the string parser above
    // (e.g. another content type) cannot carry a valid signature, so it is
    // treated as empty and rejected by the check.
    const rawBody = typeof req.body === 'string' ? req.body : '';
    const signature = firstHeaderValue(req.headers['x-slack-signature']);
    const timestamp = firstHeaderValue(req.headers['x-slack-request-timestamp']);
    if (!verifySlackSignature(signingSecret, signature, timestamp, rawBody)) {
      logger.warn('Invalid Slack signature');
      return reply.status(401).send({ error: 'Invalid signature' });
    }

    // Parse the URL-encoded payload
    const params = new URLSearchParams(rawBody);
    const payloadStr = params.get('payload');
    if (!payloadStr) {
      return reply.status(400).send({ error: 'Missing payload' });
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(payloadStr);
    } catch {
      return reply.status(400).send({ error: 'Invalid payload JSON' });
    }
    // JSON.parse also accepts null, numbers and strings; only an object can be a payload.
    if (typeof parsed !== 'object' || parsed === null) {
      return reply.status(400).send({ error: 'Invalid payload JSON' });
    }
    // Every field is treated as optional from here on: the JSON is external input.
    const payload = parsed as Partial<SlackInteractionPayload>;

    if (payload.type !== 'block_actions') {
      logger.info({ type: payload.type }, 'Ignoring non-block_actions interaction');
      return reply.status(200).send();
    }

    const actions = Array.isArray(payload.actions) ? payload.actions : [];
    const username = payload.user?.username ?? payload.user?.id ?? 'unknown';
    const responseUrl = payload.response_url;

    // Confirmation messages are only possible when the payload carried a response_url.
    const notify = async (text: string): Promise<void> => {
      if (typeof responseUrl === 'string' && responseUrl !== '') {
        await sendEphemeral(responseUrl, text);
      }
    };

    for (const action of actions) {
      const actionId: unknown = action?.action_id;
      const anomalyId = typeof actionId === 'string' ? parseAnomalyId(actionId) : null;
      if (typeof actionId !== 'string' || !anomalyId) {
        logger.warn({ actionId }, 'Could not parse anomaly ID from action');
        continue;
      }

      const actionName = actionId.split(':')[0];
      const shortId = anomalyId.slice(0, 8);

      try {
        switch (actionName) {
          case 'mark_expected': {
            // Acknowledge / mark as expected
            await systemQuery(
              `UPDATE anomalies SET status = 'expected' WHERE id = $1 AND status IN ('open', 'acknowledged')`,
              [anomalyId],
            );
            logger.info({ anomalyId, user: username }, 'Anomaly marked as expected');
            await notify(`✅ Anomaly \`${shortId}\` marked as expected by @${username}`);
            break;
          }
          case 'snooze_anomaly': {
            // Snooze for 24 hours
            await systemQuery(
              `UPDATE anomalies SET status = 'snoozed', snoozed_until = now() + interval '24 hours'
               WHERE id = $1 AND status IN ('open', 'acknowledged')`,
              [anomalyId],
            );
            logger.info({ anomalyId, user: username }, 'Anomaly snoozed 24h');
            await notify(`😴 Anomaly \`${shortId}\` snoozed for 24 hours by @${username}`);
            break;
          }
          case 'create_ticket': {
            // Log intent — doesn't actually create a ticket
            logger.info(
              { anomalyId, user: username, trigger: payload.trigger_id },
              'Ticket creation requested (logged only)',
            );
            await notify(
              `🎫 Ticket creation logged for anomaly \`${shortId}\` by @${username}. Integration pending.`,
            );
            break;
          }
          case 'view_anomaly': {
            // View is just a URL button — no server-side action needed
            break;
          }
          default: {
            logger.warn({ actionName, actionId }, 'Unknown Slack action');
            await notify(`⚠️ Unknown action: ${actionName}`);
          }
        }
      } catch (err) {
        // A failing action (e.g. a database error) must not turn the whole
        // request into a 500 or stop the remaining actions from being applied.
        const message = err instanceof Error ? err.message : String(err);
        logger.error({ anomalyId, actionName, error: message }, 'Slack action failed');
        await notify(`⚠️ Could not apply \`${actionName}\` to anomaly \`${shortId}\`. Please try again.`);
      }
    }

    // Slack expects a 200 within 3 seconds
    return reply.status(200).send();
  });
}

View File

@@ -0,0 +1,148 @@
import pino from 'pino';
import { WelfordBaseline, scoreAnomaly, type CostEvent } from './detection/scorer.js';
const logger = pino({ name: 'composite-scorer' });
/** Per-signal scores that make up a composite anomaly score. */
export interface ScoreBreakdown {
/** Result of scoreAnomaly() on the normal path; on the cold-start path it equals the overall score. */
zScore: number;
/** Rate-of-change signal; always 0 on the cold-start path. */
rateOfChange: number;
/** Historical-pattern signal (higher = fewer similar recent spikes). */
historicalPattern: number;
/** Resource-novelty signal (higher = more recently first seen). */
resourceNovelty: number;
/** Same value as the overall `score` of the enclosing result. */
composite: number;
}
/** Outcome of computeCompositeScore(). */
export interface CompositeResult {
/** Final anomaly score. */
score: number;
/** The individual signal values the score was derived from. */
breakdown: ScoreBreakdown;
/** True when the fixed-threshold cold-start path produced the score. */
isColdStart: boolean;
}
/**
 * Weight of each signal in the composite score.
 * The four values add up to 1.0 and must keep doing so if they are retuned.
 */
const WEIGHTS = {
zScore: 0.40,
rateOfChange: 0.25,
historicalPattern: 0.20,
resourceNovelty: 0.15,
};
/**
 * Cold-start window in days. computeCompositeScore() multiplies this by 24
 * and compares the result against the baseline sample count.
 */
const COLD_START_THRESHOLD_DAYS = 7;
/** During cold start, a cost of at least this many times the mean counts as anomalous. */
const COLD_START_COST_MULTIPLIER = 2.0; // 2x average = anomaly
/**
 * Compute rate-of-change score (0-100).
 * Compares current cost to the previous cost to measure acceleration.
 *
 * The relative increase is mapped linearly so that +300% saturates at 100:
 * no change or a decrease → 0, +50% → 16.67, +100% → 33.33, +200% → 66.67,
 * +300% or more → 100.
 *
 * @param currentCost - cost for the period being scored
 * @param previousCost - cost for the preceding period, or null when unknown
 * @returns score in [0, 100], rounded to two decimals
 */
function scoreRateOfChange(currentCost: number, previousCost: number | null): number {
  // A missing, zero or negative previous cost is not a usable base for a ratio
  // (a negative base would flip the sign of the change and hide a real increase).
  // In that case any positive current cost gets a fixed mid-range score.
  if (previousCost === null || previousCost <= 0) {
    return currentCost > 0 ? 50 : 0;
  }

  const changeRate = (currentCost - previousCost) / previousCost;
  if (changeRate <= 0) return 0;

  // 100 / 3 points per 100% of increase, so that exactly +300% reaches 100.
  const score = Math.min(100, (changeRate * 100) / 3);
  return Math.round(score * 100) / 100;
}
/**
 * Score how unprecedented a spike is, given how often similar spikes happened recently.
 *
 * Frequent spikes suggest a recurring, expected pattern (e.g. a monthly batch
 * job) and score low; a spike with no recent precedent scores high.
 * 0 prior spike days → 100, falling linearly to 0 at 5 or more.
 *
 * @param recentSpikeDays - number of days in the last 30 that had similar spikes
 */
function scoreHistoricalPattern(recentSpikeDays: number): number {
  const saturationDays = 5;
  return recentSpikeDays >= saturationDays
    ? 0
    : Math.round((1 - recentSpikeDays / saturationDays) * 100);
}
/**
 * Score how new a resource type is.
 *
 * Recently first-seen resource types score higher: brand new (0 days) → 100,
 * falling linearly to 0 at 7 or more days.
 *
 * @param daysSinceFirstSeen - how many days since this resource type first appeared
 */
function scoreResourceNovelty(daysSinceFirstSeen: number): number {
  const noveltyWindowDays = 7;
  if (daysSinceFirstSeen >= noveltyWindowDays) {
    return 0;
  }
  const remainingFraction = 1 - daysSinceFirstSeen / noveltyWindowDays;
  return Math.round(remainingFraction * 100);
}
/**
 * Cold-start fast path: score against fixed thresholds instead of statistics.
 *
 * - mean is exactly 0 (no baseline): any positive cost scores 75, otherwise 0;
 *   the pattern and novelty signals are reported as 100.
 * - otherwise: a cost below COLD_START_COST_MULTIPLIER × mean scores 0; at or
 *   above it the score is (ratio − 1) × 50, rounded and capped at 100; the
 *   pattern and novelty signals are reported as 0.
 *
 * @param cost - cost being scored
 * @param mean - baseline mean cost
 * @returns a result flagged `isColdStart: true`
 */
function coldStartScore(cost: number, mean: number): CompositeResult {
  const hasNoBaseline = mean === 0;

  let score = 0;
  if (hasNoBaseline) {
    // No data at all — any cost is novel
    if (cost > 0) {
      score = 75;
    }
  } else {
    const ratio = cost / mean;
    if (ratio >= COLD_START_COST_MULTIPLIER) {
      score = Math.min(100, Math.round((ratio - 1) * 50));
    }
  }

  const unseenSignal = hasNoBaseline ? 100 : 0;
  return {
    score,
    breakdown: {
      zScore: score,
      rateOfChange: 0,
      historicalPattern: unseenSignal,
      resourceNovelty: unseenSignal,
      composite: score,
    },
    isColdStart: true,
  };
}
/**
 * Combine several anomaly signals into one composite score (0-100).
 *
 * With fewer than COLD_START_THRESHOLD_DAYS × 24 baseline samples the
 * fixed-threshold cold-start path is used (presumably samples are hourly —
 * confirm against the caller). Otherwise the z-score, rate-of-change,
 * historical-pattern and novelty signals are combined using WEIGHTS, clamped
 * to [0, 100] and rounded to two decimals.
 *
 * @param input - current cost, baseline statistics and context for the signals
 * @returns the composite score, its per-signal breakdown, and whether the
 *   cold-start path was taken
 */
export function computeCompositeScore(input: {
  cost: number;
  mean: number;
  stddev: number;
  baselineCount: number;
  previousCost: number | null;
  recentSpikeDays: number;
  daysSinceFirstSeen: number;
}): CompositeResult {
  const { cost, mean, stddev, baselineCount, previousCost, recentSpikeDays, daysSinceFirstSeen } = input;

  // Not enough history for statistics yet: fall back to fixed thresholds.
  const minimumSamples = COLD_START_THRESHOLD_DAYS * 24;
  if (baselineCount < minimumSamples) {
    return coldStartScore(cost, mean);
  }

  // Evaluate every signal independently.
  const signals = {
    zScore: scoreAnomaly({ cost, mean, stddev }),
    rateOfChange: scoreRateOfChange(cost, previousCost),
    historicalPattern: scoreHistoricalPattern(recentSpikeDays),
    resourceNovelty: scoreResourceNovelty(daysSinceFirstSeen),
  };

  // Weighted sum, then clamp into [0, 100] and keep two decimals.
  const weighted =
    signals.zScore * WEIGHTS.zScore +
    signals.rateOfChange * WEIGHTS.rateOfChange +
    signals.historicalPattern * WEIGHTS.historicalPattern +
    signals.resourceNovelty * WEIGHTS.resourceNovelty;
  const clamped = Math.min(100, Math.max(0, weighted));
  const score = Math.round(clamped * 100) / 100;

  return {
    score,
    breakdown: { ...signals, composite: score },
    isColdStart: false,
  };
}

View File

@@ -9,6 +9,8 @@ import { registerAnomalyRoutes } from './api/anomalies.js';
import { registerBaselineRoutes } from './api/baselines.js';
import { registerGovernanceRoutes } from './api/governance.js';
import { registerIngestionRoutes } from './api/ingestion.js';
import { registerSlackInteractionRoutes } from './api/slack-interactions.js';
import { startZombieHunter } from './workers/zombie-hunter.js';
const logger = pino({ name: 'dd0c-cost', level: config.LOG_LEVEL });
@@ -27,6 +29,11 @@ app.get('/version', async () => ({ version: process.env.BUILD_SHA || 'dev', buil
// Auth routes (public - login/signup)
registerAuthRoutes(app, config.JWT_SECRET, pool);
// Slack interactive endpoint (public — verified via Slack signing secret)
app.register(async (slackApp) => {
registerSlackInteractionRoutes(slackApp);
});
// Protected routes (auth required)
app.register(async function protectedRoutes(protectedApp) {
protectedApp.addHook('onRequest', authHook(config.JWT_SECRET, pool));
@@ -40,6 +47,10 @@ app.register(async function protectedRoutes(protectedApp) {
try {
await app.listen({ port: config.PORT, host: '0.0.0.0' });
logger.info({ port: config.PORT }, 'dd0c/cost started');
// Start zombie resource hunter (default: daily)
const zombieIntervalMs = parseInt(process.env.ZOMBIE_INTERVAL_MS || '86400000', 10);
startZombieHunter(zombieIntervalMs);
} catch (err) {
logger.fatal(err, 'Failed to start');
process.exit(1);

View File

@@ -0,0 +1,172 @@
import pino from 'pino';
import { withTenant, systemQuery } from '../data/db.js';
const logger = pino({ name: 'zombie-hunter' });
/** An idle ("zombie") resource finding; field names mirror the zombie_resources columns in camelCase. */
export interface ZombieResource {
resourceId: string;
resourceType: 'ec2' | 'rds' | 'ebs' | 'eip' | 'nat_gateway';
region: string;
accountId: string;
estimatedMonthlyWaste: number;
/** null when no activity timestamp is known. */
lastActivity: Date | null;
recommendation: string;
}
/** One detection rule evaluated by runZombieHunt() for every tenant. */
interface ZombieRule {
/** Value written to zombie_resources.resource_type for every match of this rule. */
resourceType: ZombieResource['resourceType'];
/** ILIKE pattern, passed as a bound parameter and matched against the queried table's resource_type column */
resourcePattern: string;
/** Number of days to look back */
lookbackDays: number;
/** Human-readable summary of the rule; not read by runZombieHunt() */
description: string;
/**
 * SQL condition that identifies the resource as a zombie.
 * Interpolated verbatim into the HAVING clause, so it must only ever be a
 * trusted constant — never caller- or user-supplied text.
 */
zombieCondition: string;
/** Recommendation text; stored as-is (no placeholder substitution is performed) */
recommendationTemplate: string;
}
/**
 * Detection rules, evaluated in order for every tenant.
 *
 * NOTE(review): every zombieCondition below is expressed purely in terms of
 * hourly_cost, while the descriptions and recommendations speak of CPU,
 * connections, attachment state and bytes processed. Those metrics are not
 * consulted by these conditions — confirm the cost-based conditions are the
 * intended proxies.
 */
const ZOMBIE_RULES: ZombieRule[] = [
{
resourceType: 'ec2',
resourcePattern: 'ec2%',
lookbackDays: 14,
description: 'EC2 instance with <5% avg CPU',
// Matches when the average hourly cost over the window is below 0.05.
zombieCondition: `AVG(cr.hourly_cost) < 0.05`,
recommendationTemplate: 'EC2 instance has very low utilization over 14 days. Consider stopping or downsizing.',
},
{
resourceType: 'rds',
resourcePattern: 'rds%',
lookbackDays: 7,
description: 'RDS instance with 0 connections',
// Both clauses hold exactly when at least one row in the window has a positive cost.
zombieCondition: `MAX(cr.hourly_cost) > 0 AND COUNT(*) FILTER (WHERE cr.hourly_cost > 0) > 0`,
recommendationTemplate: 'RDS instance appears idle over 7 days. Consider creating a snapshot and deleting.',
},
{
resourceType: 'ebs',
resourcePattern: 'ebs%',
lookbackDays: 7,
description: 'Unattached EBS volume',
// Matches any group with a positive cost in the window.
zombieCondition: `MAX(cr.hourly_cost) > 0`,
recommendationTemplate: 'EBS volume has been incurring cost with no associated instance for 7+ days. Consider snapshotting and deleting.',
},
{
resourceType: 'eip',
resourcePattern: 'eip%',
lookbackDays: 1,
description: 'Idle Elastic IP',
// Matches any group with a positive cost in the window.
zombieCondition: `MAX(cr.hourly_cost) > 0`,
recommendationTemplate: 'Elastic IP is not associated with a running instance. Release to avoid charges.',
},
{
resourceType: 'nat_gateway',
resourcePattern: 'nat%',
lookbackDays: 7,
description: 'Unused NAT Gateway',
// Matches any group with a positive cost in the window.
zombieCondition: `MAX(cr.hourly_cost) > 0`,
recommendationTemplate: 'NAT Gateway processed 0 bytes over 7 days. Consider removing if unused.',
},
];
/**
 * Scan for zombie resources across all tenants.
 * For each tenant, every rule in ZOMBIE_RULES runs one aggregate query and each
 * resulting row is upserted into zombie_resources.
 *
 * A failure for one tenant is logged and does not stop the remaining tenants.
 *
 * NOTE(review): the rule docs refer to cost_records, but the query below reads
 * `FROM anomalies cr` — confirm which table is intended and that it has the
 * hourly_cost / detected_at columns used here.
 *
 * @returns the number of rows upserted across all tenants (inserts and
 *   updates are both counted)
 */
export async function runZombieHunt(): Promise<number> {
logger.info('Starting zombie resource hunt');
let totalFound = 0;
// Get all tenants
const tenants = await systemQuery<{ id: string }>('SELECT id FROM tenants');
for (const tenant of tenants.rows) {
const tenantId = tenant.id;
try {
// presumably withTenant scopes the client to this tenant (e.g. for RLS) — confirm in ../data/db.js
const found = await withTenant(tenantId, async (client) => {
let count = 0;
for (const rule of ZOMBIE_RULES) {
// Find resources that match zombie criteria.
// $1 is the ILIKE pattern and $2 the look-back in days; rule.zombieCondition is
// interpolated directly into HAVING and is safe only because it is a constant.
// NOTE(review): SUM(hourly_cost) * 730 adds up every row in the window before
// scaling to a month — confirm an average hourly rate was not intended.
const result = await client.query(
`SELECT
cr.account_id,
cr.resource_type,
cr.region,
SUM(cr.hourly_cost) * 730 AS estimated_monthly_waste,
MAX(cr.detected_at) AS last_activity
FROM anomalies cr
WHERE cr.resource_type ILIKE $1
AND cr.detected_at > now() - ($2 || ' days')::interval
GROUP BY cr.account_id, cr.resource_type, cr.region
HAVING ${rule.zombieCondition}`,
[rule.resourcePattern, rule.lookbackDays],
);
for (const row of result.rows) {
// Synthetic key: rows are aggregated per type/account/region, so this identifies
// that group rather than an individual cloud resource.
const resourceId = `${row.resource_type}:${row.account_id}:${row.region}`;
// Upsert: an existing finding keeps its status and recommendation; only the
// waste estimate, last activity and detected_at are refreshed.
await client.query(
`INSERT INTO zombie_resources
(tenant_id, resource_id, resource_type, region, account_id,
estimated_monthly_waste, last_activity, recommendation)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (tenant_id, resource_id, resource_type)
DO UPDATE SET
estimated_monthly_waste = EXCLUDED.estimated_monthly_waste,
last_activity = EXCLUDED.last_activity,
detected_at = now()`,
[
tenantId,
resourceId,
rule.resourceType,
row.region,
row.account_id,
// Falls back to 0 when the value does not parse as a number.
parseFloat(row.estimated_monthly_waste) || 0,
row.last_activity,
rule.recommendationTemplate,
],
);
count++;
}
}
return count;
});
totalFound += found;
if (found > 0) {
logger.info({ tenantId, found }, 'Zombie resources detected');
}
} catch (err) {
// Best-effort per tenant: log and continue with the next one.
logger.error({ tenantId, error: (err as Error).message }, 'Zombie hunt failed for tenant');
}
}
logger.info({ totalFound }, 'Zombie hunt complete');
return totalFound;
}
/**
 * Start the zombie hunter on a recurring interval.
 * Default: every 24 hours (86400000 ms).
 *
 * One hunt is also triggered 30 seconds after this call. Errors from a hunt
 * are logged and never propagate out of the timers.
 *
 * @param intervalMs - delay between hunts in milliseconds. A value that is not
 *   a finite number >= 1 (for example NaN from a malformed environment
 *   variable) falls back to the 24 hour default, and a value above the
 *   largest delay timers support is clamped to it. Without this, Node would
 *   coerce such a delay to 1 ms and run the hunt continuously.
 * @returns the interval handle, which can be passed to clearInterval()
 */
export function startZombieHunter(intervalMs = 86_400_000): NodeJS.Timeout {
  const defaultIntervalMs = 86_400_000;
  // Largest delay setInterval honours (2^31 - 1 ms, about 24.8 days).
  const maxTimerDelayMs = 2_147_483_647;

  let effectiveIntervalMs = intervalMs;
  if (!Number.isFinite(intervalMs) || intervalMs < 1) {
    logger.warn({ intervalMs }, 'Invalid zombie hunter interval, using default');
    effectiveIntervalMs = defaultIntervalMs;
  } else if (intervalMs > maxTimerDelayMs) {
    logger.warn({ intervalMs }, 'Zombie hunter interval too large, clamping');
    effectiveIntervalMs = maxTimerDelayMs;
  }

  logger.info({ intervalMs: effectiveIntervalMs }, 'Scheduling zombie hunter');

  const describe = (err: unknown): string => (err instanceof Error ? err.message : String(err));

  // Run once on startup (delayed 30s to let the server settle)
  setTimeout(() => {
    runZombieHunt().catch((err: unknown) =>
      logger.error({ error: describe(err) }, 'Initial zombie hunt failed'),
    );
  }, 30_000);

  // Then run on interval
  return setInterval(() => {
    runZombieHunt().catch((err: unknown) =>
      logger.error({ error: describe(err) }, 'Scheduled zombie hunt failed'),
    );
  }, effectiveIntervalMs);
}