diff --git a/products/05-aws-cost-anomaly/src/api/anomalies.ts b/products/05-aws-cost-anomaly/src/api/anomalies.ts new file mode 100644 index 0000000..42f8d23 --- /dev/null +++ b/products/05-aws-cost-anomaly/src/api/anomalies.ts @@ -0,0 +1,100 @@ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import { withTenant } from '../data/db.js'; + +const listQuerySchema = z.object({ + page: z.coerce.number().min(1).default(1), + limit: z.coerce.number().min(1).max(100).default(20), + status: z.enum(['open', 'acknowledged', 'snoozed', 'expected', 'resolved']).optional(), + account_id: z.string().optional(), + min_score: z.coerce.number().min(0).max(100).optional(), +}); + +export function registerAnomalyRoutes(app: FastifyInstance) { + // List anomalies + app.get('/api/v1/anomalies', async (req, reply) => { + const query = listQuerySchema.parse(req.query); + const tenantId = (req as any).tenantId; + const offset = (query.page - 1) * query.limit; + + const result = await withTenant(tenantId, async (client) => { + let sql = 'SELECT * FROM anomalies WHERE 1=1'; + const params: any[] = []; + let idx = 1; + + if (query.status) { sql += ` AND status = $${idx++}`; params.push(query.status); } + if (query.account_id) { sql += ` AND account_id = $${idx++}`; params.push(query.account_id); } + if (query.min_score) { sql += ` AND score >= $${idx++}`; params.push(query.min_score); } + + sql += ` ORDER BY detected_at DESC LIMIT $${idx++} OFFSET $${idx++}`; + params.push(query.limit, offset); + + return client.query(sql, params); + }); + + return { anomalies: result.rows, page: query.page, limit: query.limit }; + }); + + // Acknowledge anomaly + app.post('/api/v1/anomalies/:id/acknowledge', async (req, reply) => { + const { id } = req.params as { id: string }; + const tenantId = (req as any).tenantId; + + await withTenant(tenantId, async (client) => { + await client.query("UPDATE anomalies SET status = 'acknowledged' WHERE id = $1 AND status = 'open'", [id]); + }); + return { status: 'acknowledged' }; + }); + + // Snooze anomaly + app.post('/api/v1/anomalies/:id/snooze', async (req, reply) => { + const { id } = req.params as { id: string }; + const { hours } = z.object({ hours: z.coerce.number().min(1).max(168).default(24) }).parse(req.body ?? {}); + const tenantId = (req as any).tenantId; + + await withTenant(tenantId, async (client) => { + await client.query( + "UPDATE anomalies SET status = 'snoozed', snoozed_until = now() + $1 * interval '1 hour' WHERE id = $2", + [hours, id], + ); + }); + return { status: 'snoozed', until_hours: hours }; + }); + + // Mark as expected (recurring cost) + app.post('/api/v1/anomalies/:id/expected', async (req, reply) => { + const { id } = req.params as { id: string }; + const tenantId = (req as any).tenantId; + + await withTenant(tenantId, async (client) => { + await client.query("UPDATE anomalies SET status = 'expected' WHERE id = $1", [id]); + }); + return { status: 'expected' }; + }); + + // Dashboard summary + app.get('/api/v1/dashboard', async (req, reply) => { + const tenantId = (req as any).tenantId; + + const result = await withTenant(tenantId, async (client) => { + const openCount = await client.query("SELECT COUNT(*)::int as count FROM anomalies WHERE status = 'open'"); + const topResources = await client.query(` + SELECT resource_type, COUNT(*)::int as anomaly_count, AVG(score)::numeric(5,2) as avg_score + FROM anomalies WHERE status = 'open' + GROUP BY resource_type ORDER BY anomaly_count DESC LIMIT 10 + `); + const recentTrend = await client.query(` + SELECT date_trunc('hour', detected_at) as hour, COUNT(*)::int as count + FROM anomalies WHERE detected_at > now() - interval '24 hours' + GROUP BY hour ORDER BY hour + `); + return { + open_anomalies: openCount.rows[0]?.count ?? 0, + top_resources: topResources.rows, + hourly_trend: recentTrend.rows, + }; + }); + + return result; + }); +} diff --git a/products/05-aws-cost-anomaly/src/api/baselines.ts b/products/05-aws-cost-anomaly/src/api/baselines.ts new file mode 100644 index 0000000..70dd620 --- /dev/null +++ b/products/05-aws-cost-anomaly/src/api/baselines.ts @@ -0,0 +1,38 @@ +import type { FastifyInstance } from 'fastify'; +import { withTenant } from '../data/db.js'; + +export function registerBaselineRoutes(app: FastifyInstance) { + // List baselines + app.get('/api/v1/baselines', async (req, reply) => { + const tenantId = (req as any).tenantId; + + const result = await withTenant(tenantId, async (client) => { + return client.query(` + SELECT account_id, resource_type, welford_count as sample_count, + welford_mean::numeric(12,4) as mean, + CASE WHEN welford_count > 1 + THEN sqrt(welford_m2 / welford_count)::numeric(12,4) + ELSE 0 END as stddev, + updated_at + FROM baselines ORDER BY account_id, resource_type + `); + }); + + return { baselines: result.rows }; + }); + + // Reset baseline for a specific resource + app.delete('/api/v1/baselines/:accountId/:resourceType', async (req, reply) => { + const { accountId, resourceType } = req.params as { accountId: string; resourceType: string }; + const tenantId = (req as any).tenantId; + + await withTenant(tenantId, async (client) => { + await client.query( + 'DELETE FROM baselines WHERE account_id = $1 AND resource_type = $2', + [accountId, resourceType], + ); + }); + + return { status: 'reset', accountId, resourceType }; + }); +} diff --git a/products/05-aws-cost-anomaly/src/api/governance.ts b/products/05-aws-cost-anomaly/src/api/governance.ts new file mode 100644 index 0000000..b12b83e --- /dev/null +++ b/products/05-aws-cost-anomaly/src/api/governance.ts @@ -0,0 +1,59 @@ +import type { FastifyInstance } from 'fastify'; +import { withTenant } from '../data/db.js'; +import { GovernanceEngine } from '../governance/engine.js'; + +const engine = new GovernanceEngine(); + +export function registerGovernanceRoutes(app: FastifyInstance) { + // Get governance status + app.get('/api/v1/governance', async (req, reply) => { + const tenantId = (req as any).tenantId; + + const result = await withTenant(tenantId, async (client) => { + return client.query('SELECT governance_mode, governance_started_at FROM tenants WHERE id = $1', [tenantId]); + }); + + const tenant = result.rows[0]; + if (!tenant) return reply.status(404).send({ error: 'Tenant not found' }); + + const daysInMode = Math.floor((Date.now() - new Date(tenant.governance_started_at).getTime()) / (86400 * 1000)); + + return { + mode: tenant.governance_mode, + days_in_mode: daysInMode, + started_at: tenant.governance_started_at, + }; + }); + + // Check promotion eligibility + app.get('/api/v1/governance/promotion', async (req, reply) => { + const tenantId = (req as any).tenantId; + + const result = await withTenant(tenantId, async (client) => { + const tenant = await client.query('SELECT governance_mode, governance_started_at FROM tenants WHERE id = $1', [tenantId]); + const t = tenant.rows[0]; + if (!t) return null; + + // Calculate FP rate from last 30 days + const fpStats = await client.query(` + SELECT + COUNT(*) FILTER (WHERE status = 'expected')::float / + NULLIF(COUNT(*)::float, 0) as fp_rate + FROM anomalies + WHERE detected_at > now() - interval '30 days' + `); + + return { tenant: t, fpRate: fpStats.rows[0]?.fp_rate ?? 0 }; + }); + + if (!result) return reply.status(404).send({ error: 'Tenant not found' }); + + const daysInMode = Math.floor((Date.now() - new Date(result.tenant.governance_started_at).getTime()) / (86400 * 1000)); + const evaluation = engine.evaluatePromotion(tenantId, { + fpRate: result.fpRate, + daysInCurrentMode: daysInMode, + }); + + return { ...evaluation, current_mode: result.tenant.governance_mode, fp_rate: result.fpRate, days_in_mode: daysInMode }; + }); +} diff --git a/products/05-aws-cost-anomaly/src/api/ingestion.ts b/products/05-aws-cost-anomaly/src/api/ingestion.ts new file mode 100644 index 0000000..9d26d9e --- /dev/null +++ b/products/05-aws-cost-anomaly/src/api/ingestion.ts @@ -0,0 +1,103 @@ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import pino from 'pino'; +import { withTenant } from '../data/db.js'; +import { WelfordBaseline, scoreAnomaly, shouldAlert, type CostEvent } from '../detection/scorer.js'; +import { config } from '../config/index.js'; + +const logger = pino({ name: 'api-ingestion' }); + +const costEventSchema = z.object({ + account_id: z.string(), + resource_type: z.string(), + hourly_cost: z.number().min(0), + region: z.string().default('us-east-1'), + tags: z.record(z.string()).default({}), +}); + +const batchSchema = z.object({ + events: z.array(costEventSchema).min(1).max(100), +}); + +export function registerIngestionRoutes(app: FastifyInstance) { + // Ingest cost events (from agent or Cost Explorer poller) + app.post('/api/v1/ingest', async (req, reply) => { + const tenantId = (req as any).tenantId; + const { events } = batchSchema.parse(req.body); + + const results = await withTenant(tenantId, async (client) => { + const anomalies = []; + + for (const event of events) { + // Fetch or create baseline + const baselineRow = await client.query( + `SELECT welford_count, welford_mean, welford_m2, version + FROM baselines WHERE account_id = $1 AND resource_type = $2`, + [event.account_id, event.resource_type], + ); + + let baseline: WelfordBaseline; + let version: number; + + if (baselineRow.rows[0]) { + baseline = WelfordBaseline.fromJSON({ + count: baselineRow.rows[0].welford_count, + mean: parseFloat(baselineRow.rows[0].welford_mean), + m2: parseFloat(baselineRow.rows[0].welford_m2), + }); + version = baselineRow.rows[0].version; + } else { + baseline = new WelfordBaseline(); + version = 0; + } + + // Score before updating baseline + const score = scoreAnomaly({ + cost: event.hourly_cost, + mean: baseline.mean, + stddev: baseline.stddev, + }); + + const isAnomaly = shouldAlert(score, config.ANOMALY_THRESHOLD); + + // Update baseline with optimistic locking (BMad must-have: concurrent Welford corruption) + baseline.update(event.hourly_cost); + const bj = baseline.toJSON(); + + if (version === 0) { + await client.query( + `INSERT INTO baselines (tenant_id, account_id, resource_type, welford_count, welford_mean, welford_m2, version) + VALUES ($1, $2, $3, $4, $5, $6, 1) + ON CONFLICT (tenant_id, account_id, resource_type) DO NOTHING`, + [tenantId, event.account_id, event.resource_type, bj.count, bj.mean, bj.m2], + ); + } else { + const updated = await client.query( + `UPDATE baselines SET welford_count = $1, welford_mean = $2, welford_m2 = $3, + version = version + 1, updated_at = now() + WHERE account_id = $4 AND resource_type = $5 AND version = $6`, + [bj.count, bj.mean, bj.m2, event.account_id, event.resource_type, version], + ); + if (updated.rowCount === 0) { + logger.warn({ accountId: event.account_id, resourceType: event.resource_type }, + 'Optimistic lock conflict — baseline update skipped'); + } + } + + // Record anomaly if threshold exceeded + if (isAnomaly) { + await client.query( + `INSERT INTO anomalies (tenant_id, account_id, resource_type, region, hourly_cost, score, baseline_mean, baseline_stddev, tags) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + [tenantId, event.account_id, event.resource_type, event.region, event.hourly_cost, score, baseline.mean, baseline.stddev, JSON.stringify(event.tags)], + ); + anomalies.push({ ...event, score, isAnomaly: true }); + } + } + + return anomalies; + }); + + return { ingested: events.length, anomalies: results.length, details: results }; + }); +} diff --git a/products/05-aws-cost-anomaly/src/config/index.ts b/products/05-aws-cost-anomaly/src/config/index.ts new file mode 100644 index 0000000..3dbc8ef --- /dev/null +++ b/products/05-aws-cost-anomaly/src/config/index.ts @@ -0,0 +1,14 @@ +import { z } from 'zod'; + +const envSchema = z.object({ + PORT: z.coerce.number().default(3000), + DATABASE_URL: z.string().default('postgresql://localhost:5432/dd0c_cost'), + REDIS_URL: z.string().default('redis://localhost:6379'), + JWT_SECRET: z.string().min(32).default('dev-secret-change-me-in-production!!'), + CORS_ORIGIN: z.string().default('*'), + LOG_LEVEL: z.string().default('info'), + ANOMALY_THRESHOLD: z.coerce.number().default(50), +}); + +export const config = envSchema.parse(process.env); +export type Config = z.infer; diff --git a/products/05-aws-cost-anomaly/src/data/db.ts b/products/05-aws-cost-anomaly/src/data/db.ts new file mode 100644 index 0000000..85cd7f5 --- /dev/null +++ b/products/05-aws-cost-anomaly/src/data/db.ts @@ -0,0 +1,24 @@ +import pg from 'pg'; +import pino from 'pino'; +import { config } from '../config/index.js'; + +const logger = pino({ name: 'data' }); + +export const pool = new pg.Pool({ connectionString: config.DATABASE_URL }); + +export async function withTenant(tenantId: string, fn: (client: pg.PoolClient) => Promise): Promise { + const client = await pool.connect(); + try { + await client.query('BEGIN'); + await client.query(`SET LOCAL app.tenant_id = '${tenantId}'`); + const result = await fn(client); + await client.query('COMMIT'); + return result; + } catch (err) { + await client.query('ROLLBACK'); + throw err; + } finally { + await client.query('RESET app.tenant_id'); + client.release(); + } +} diff --git a/products/05-aws-cost-anomaly/src/index.ts b/products/05-aws-cost-anomaly/src/index.ts new file mode 100644 index 0000000..92d9ba4 --- /dev/null +++ b/products/05-aws-cost-anomaly/src/index.ts @@ -0,0 +1,29 @@ +import Fastify from 'fastify'; +import cors from '@fastify/cors'; +import pino from 'pino'; +import { config } from './config/index.js'; +import { registerAnomalyRoutes } from './api/anomalies.js'; +import { registerBaselineRoutes } from './api/baselines.js'; +import { registerGovernanceRoutes } from './api/governance.js'; +import { registerIngestionRoutes } from './api/ingestion.js'; + +const logger = pino({ name: 'dd0c-cost', level: config.LOG_LEVEL }); + +const app = Fastify({ logger: true }); + +await app.register(cors, { origin: config.CORS_ORIGIN }); + +app.get('/health', async () => ({ status: 'ok', service: 'dd0c-cost' })); + +registerIngestionRoutes(app); +registerAnomalyRoutes(app); +registerBaselineRoutes(app); +registerGovernanceRoutes(app); + +try { + await app.listen({ port: config.PORT, host: '0.0.0.0' }); + logger.info({ port: config.PORT }, 'dd0c/cost started'); +} catch (err) { + logger.fatal(err, 'Failed to start'); + process.exit(1); +}