Flesh out dd0c/cost: ingestion with Welford optimistic locking, anomaly API, governance, baselines

- Ingestion API: batch cost events, Welford baseline update with optimistic locking (version column), anomaly detection inline
- Anomaly API: list (filtered), acknowledge, snooze (1-168h), mark expected, dashboard summary with hourly trend
- Governance API: mode status, promotion eligibility check with FP rate calculation
- Baseline API: list with computed stddev, reset per resource
- Data layer: withTenant() RLS wrapper, Zod config with ANOMALY_THRESHOLD
- Fastify server entry point
This commit is contained in:
2026-03-01 03:07:02 +00:00
parent a17527dfa4
commit 4957946d29
7 changed files with 367 additions and 0 deletions

View File

@@ -0,0 +1,100 @@
import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import { withTenant } from '../data/db.js';
/**
 * Query-string schema for `GET /api/v1/anomalies`.
 *
 * Query values arrive as strings, so the numeric fields are coerced.
 * `page` and `limit` must be whole numbers because they are turned into the
 * SQL LIMIT/OFFSET values; a fractional value is rejected here instead of
 * failing later inside the database.
 */
const listQuerySchema = z.object({
  // 1-based page number.
  page: z.coerce.number().int().min(1).default(1),
  // Page size, capped at 100 rows per request.
  limit: z.coerce.number().int().min(1).max(100).default(20),
  // Optional exact-match filter on the anomaly status.
  status: z.enum(['open', 'acknowledged', 'snoozed', 'expected', 'resolved']).optional(),
  // Optional exact-match filter on the account.
  account_id: z.string().optional(),
  // Optional inclusive lower bound on the anomaly score (0-100).
  min_score: z.coerce.number().min(0).max(100).optional(),
});
/**
 * Registers the anomaly endpoints on `app`:
 *
 *   GET  /api/v1/anomalies                  - paginated, filterable list
 *   POST /api/v1/anomalies/:id/acknowledge  - open -> acknowledged
 *   POST /api/v1/anomalies/:id/snooze       - snooze for 1-168 hours (default 24)
 *   POST /api/v1/anomalies/:id/expected     - mark as expected
 *   GET  /api/v1/dashboard                  - open count, top resource types, 24h hourly trend
 *
 * Every handler reads the tenant from `req.tenantId` and runs its SQL through
 * `withTenant`. The state-changing routes answer 404 when the id matches no
 * row, instead of reporting success for an anomaly that does not exist.
 *
 * @param app Fastify instance the routes are attached to.
 */
export function registerAnomalyRoutes(app: FastifyInstance) {
  const notFound = { error: 'Anomaly not found' };

  // List anomalies
  app.get('/api/v1/anomalies', async (req) => {
    const query = listQuerySchema.parse(req.query);
    const tenantId = (req as any).tenantId;
    const offset = (query.page - 1) * query.limit;

    const result = await withTenant(tenantId, async (client) => {
      // Filters are appended as numbered placeholders; values never enter the SQL text.
      let sql = 'SELECT * FROM anomalies WHERE 1=1';
      const params: any[] = [];
      let idx = 1;
      if (query.status) { sql += ` AND status = $${idx++}`; params.push(query.status); }
      if (query.account_id) { sql += ` AND account_id = $${idx++}`; params.push(query.account_id); }
      // Compare against undefined so an explicit min_score=0 is still applied.
      if (query.min_score !== undefined) { sql += ` AND score >= $${idx++}`; params.push(query.min_score); }
      sql += ` ORDER BY detected_at DESC LIMIT $${idx++} OFFSET $${idx++}`;
      params.push(query.limit, offset);
      return client.query(sql, params);
    });

    return { anomalies: result.rows, page: query.page, limit: query.limit };
  });

  // Acknowledge anomaly
  app.post('/api/v1/anomalies/:id/acknowledge', async (req, reply) => {
    const { id } = req.params as { id: string };
    const tenantId = (req as any).tenantId;

    const exists = await withTenant(tenantId, async (client) => {
      const updated = await client.query("UPDATE anomalies SET status = 'acknowledged' WHERE id = $1 AND status = 'open'", [id]);
      if (updated.rowCount !== 0) return true;
      // Nothing changed: either the anomaly is not open (still a no-op, as before)
      // or the id is unknown to this tenant. Only the latter is an error.
      const found = await client.query('SELECT 1 FROM anomalies WHERE id = $1', [id]);
      return found.rows.length > 0;
    });

    if (!exists) return reply.status(404).send(notFound);
    return { status: 'acknowledged' };
  });

  // Snooze anomaly
  app.post('/api/v1/anomalies/:id/snooze', async (req, reply) => {
    const { id } = req.params as { id: string };
    const { hours } = z.object({ hours: z.coerce.number().min(1).max(168).default(24) }).parse(req.body ?? {});
    const tenantId = (req as any).tenantId;

    const matched = await withTenant(tenantId, async (client) => {
      const updated = await client.query(
        "UPDATE anomalies SET status = 'snoozed', snoozed_until = now() + $1 * interval '1 hour' WHERE id = $2",
        [hours, id],
      );
      return updated.rowCount !== 0;
    });

    if (!matched) return reply.status(404).send(notFound);
    return { status: 'snoozed', until_hours: hours };
  });

  // Mark as expected (recurring cost)
  app.post('/api/v1/anomalies/:id/expected', async (req, reply) => {
    const { id } = req.params as { id: string };
    const tenantId = (req as any).tenantId;

    const matched = await withTenant(tenantId, async (client) => {
      const updated = await client.query("UPDATE anomalies SET status = 'expected' WHERE id = $1", [id]);
      return updated.rowCount !== 0;
    });

    if (!matched) return reply.status(404).send(notFound);
    return { status: 'expected' };
  });

  // Dashboard summary
  app.get('/api/v1/dashboard', async (req) => {
    const tenantId = (req as any).tenantId;

    const result = await withTenant(tenantId, async (client) => {
      // The three reads share one client/transaction, so they are issued one after another.
      const openCount = await client.query("SELECT COUNT(*)::int as count FROM anomalies WHERE status = 'open'");
      const topResources = await client.query(`
SELECT resource_type, COUNT(*)::int as anomaly_count, AVG(score)::numeric(5,2) as avg_score
FROM anomalies WHERE status = 'open'
GROUP BY resource_type ORDER BY anomaly_count DESC LIMIT 10
`);
      const recentTrend = await client.query(`
SELECT date_trunc('hour', detected_at) as hour, COUNT(*)::int as count
FROM anomalies WHERE detected_at > now() - interval '24 hours'
GROUP BY hour ORDER BY hour
`);
      return {
        open_anomalies: openCount.rows[0]?.count ?? 0,
        top_resources: topResources.rows,
        hourly_trend: recentTrend.rows,
      };
    });

    return result;
  });
}

View File

@@ -0,0 +1,38 @@
import type { FastifyInstance } from 'fastify';
import { withTenant } from '../data/db.js';
/**
 * Attaches the baseline endpoints to `app`:
 *
 *   GET    /api/v1/baselines                            - every baseline with its derived stddev
 *   DELETE /api/v1/baselines/:accountId/:resourceType   - drop one baseline so it starts over
 *
 * Both handlers take the tenant from `req.tenantId` and execute through `withTenant`.
 *
 * @param app Fastify instance the routes are attached to.
 */
export function registerBaselineRoutes(app: FastifyInstance) {
  // stddev is derived in SQL as sqrt(m2 / count) once more than one sample exists, else 0.
  // NOTE(review): this divides by count (not count - 1); confirm it matches the scorer's stddev.
  const listBaselinesSql = `
SELECT account_id, resource_type, welford_count as sample_count,
welford_mean::numeric(12,4) as mean,
CASE WHEN welford_count > 1
THEN sqrt(welford_m2 / welford_count)::numeric(12,4)
ELSE 0 END as stddev,
updated_at
FROM baselines ORDER BY account_id, resource_type
`;
  const deleteBaselineSql = 'DELETE FROM baselines WHERE account_id = $1 AND resource_type = $2';

  // List baselines
  app.get('/api/v1/baselines', async (req, reply) => {
    const { tenantId } = req as any;
    const { rows } = await withTenant(tenantId, (client) => client.query(listBaselinesSql));
    return { baselines: rows };
  });

  // Reset baseline for a specific resource
  app.delete('/api/v1/baselines/:accountId/:resourceType', async (req, reply) => {
    const params = req.params as { accountId: string; resourceType: string };
    const { accountId, resourceType } = params;
    const { tenantId } = req as any;

    await withTenant(tenantId, async (client) => {
      await client.query(deleteBaselineSql, [accountId, resourceType]);
    });

    // The same body is returned whether or not a row was actually deleted.
    return { status: 'reset', accountId, resourceType };
  });
}

View File

@@ -0,0 +1,59 @@
import type { FastifyInstance } from 'fastify';
import { withTenant } from '../data/db.js';
import { GovernanceEngine } from '../governance/engine.js';
// Single GovernanceEngine instance, created at module load and shared by the routes registered below.
const engine = new GovernanceEngine();
/**
 * Attaches the governance endpoints to `app`:
 *
 *   GET /api/v1/governance            - current mode and how long the tenant has been in it
 *   GET /api/v1/governance/promotion  - promotion evaluation from the 30-day false-positive rate
 *
 * Both handlers answer 404 with `{ error: 'Tenant not found' }` when the tenant row is missing.
 *
 * @param app Fastify instance the routes are attached to.
 */
export function registerGovernanceRoutes(app: FastifyInstance) {
  const tenantSql = 'SELECT governance_mode, governance_started_at FROM tenants WHERE id = $1';
  const msPerDay = 86400 * 1000;

  // Whole days elapsed between `startedAt` and now, rounded down.
  const wholeDaysSince = (startedAt: string | number | Date) =>
    Math.floor((Date.now() - new Date(startedAt).getTime()) / msPerDay);

  // Get governance status
  app.get('/api/v1/governance', async (req, reply) => {
    const { tenantId } = req as any;
    const { rows } = await withTenant(tenantId, (client) => client.query(tenantSql, [tenantId]));

    const [tenant] = rows;
    if (!tenant) return reply.status(404).send({ error: 'Tenant not found' });

    return {
      mode: tenant.governance_mode,
      days_in_mode: wholeDaysSince(tenant.governance_started_at),
      started_at: tenant.governance_started_at,
    };
  });

  // Check promotion eligibility
  app.get('/api/v1/governance/promotion', async (req, reply) => {
    const { tenantId } = req as any;

    const snapshot = await withTenant(tenantId, async (client) => {
      const tenantResult = await client.query(tenantSql, [tenantId]);
      const tenantRow = tenantResult.rows[0];
      if (!tenantRow) return null;

      // Calculate FP rate from last 30 days: share of anomalies marked 'expected'.
      // NULLIF turns a zero denominator into NULL, which falls back to 0 below.
      const statsResult = await client.query(`
SELECT
COUNT(*) FILTER (WHERE status = 'expected')::float /
NULLIF(COUNT(*)::float, 0) as fp_rate
FROM anomalies
WHERE detected_at > now() - interval '30 days'
`);
      return { tenant: tenantRow, fpRate: statsResult.rows[0]?.fp_rate ?? 0 };
    });

    if (!snapshot) return reply.status(404).send({ error: 'Tenant not found' });

    const { tenant, fpRate } = snapshot;
    const daysInMode = wholeDaysSince(tenant.governance_started_at);
    const evaluation = engine.evaluatePromotion(tenantId, {
      fpRate,
      daysInCurrentMode: daysInMode,
    });

    return { ...evaluation, current_mode: tenant.governance_mode, fp_rate: fpRate, days_in_mode: daysInMode };
  });
}

View File

@@ -0,0 +1,103 @@
import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import pino from 'pino';
import { withTenant } from '../data/db.js';
import { WelfordBaseline, scoreAnomaly, shouldAlert, type CostEvent } from '../detection/scorer.js';
import { config } from '../config/index.js';
// Module-level pino logger; records emitted from this file carry the name 'api-ingestion'.
const logger = pino({ name: 'api-ingestion' });
/**
 * Shape of one cost observation accepted by the ingest endpoint.
 *
 * `account_id` and `resource_type` together identify the baseline the event is
 * folded into, so neither may be empty. `hourly_cost` must be a finite,
 * non-negative number: JSON such as `1e999` parses to Infinity, and feeding
 * that into the running mean/variance would poison the stored baseline.
 */
const costEventSchema = z.object({
  account_id: z.string().min(1),
  resource_type: z.string().min(1),
  hourly_cost: z.number().finite().min(0),
  // Region defaults to 'us-east-1' when the sender omits it.
  region: z.string().default('us-east-1'),
  // Free-form string-to-string tags; defaults to an empty object.
  tags: z.record(z.string()).default({}),
});
/** Request body of the ingest endpoint: between 1 and 100 cost events per call. */
const batchSchema = z.object({
  events: costEventSchema.array().min(1).max(100),
});
/**
 * Registers `POST /api/v1/ingest`.
 *
 * For every event in the batch the handler, inside one `withTenant` transaction:
 *   1. loads the baseline row for (account_id, resource_type), or starts an empty one;
 *   2. scores the event against that baseline *before* folding the event in;
 *   3. writes the updated baseline, guarded by the `version` column;
 *   4. inserts an anomaly row when `shouldAlert` says the score crosses
 *      `config.ANOMALY_THRESHOLD`.
 *
 * If another writer changed the baseline between the read and the write (the
 * guarded UPDATE matches no row, or the first-time INSERT hits the unique key),
 * the event is re-read and re-scored a bounded number of times. After the last
 * attempt the baseline update is skipped with a warning; the anomaly decision
 * from that attempt is still recorded.
 *
 * The anomaly row stores the mean/stddev the score was computed against, not
 * the values after the event was folded into the baseline.
 *
 * Responds with `{ ingested, anomalies, details }`, where `details` lists the
 * events that were flagged.
 *
 * @param app Fastify instance the route is attached to.
 */
export function registerIngestionRoutes(app: FastifyInstance) {
  // Upper bound on read -> score -> write attempts for a single event.
  const maxBaselineAttempts = 3;

  // Ingest cost events (from agent or Cost Explorer poller)
  app.post('/api/v1/ingest', async (req, reply) => {
    const tenantId = (req as any).tenantId;
    const { events } = batchSchema.parse(req.body);

    const results = await withTenant(tenantId, async (client) => {
      const anomalies = [];

      for (const event of events) {
        for (let attempt = 1; ; attempt++) {
          // Fetch or create baseline
          const baselineRow = await client.query(
            `SELECT welford_count, welford_mean, welford_m2, version
FROM baselines WHERE account_id = $1 AND resource_type = $2`,
            [event.account_id, event.resource_type],
          );
          // Row presence, not a magic version number, decides between INSERT and UPDATE.
          const existing = baselineRow.rows[0];
          const baseline: WelfordBaseline = existing
            ? WelfordBaseline.fromJSON({
                count: existing.welford_count,
                mean: parseFloat(existing.welford_mean),
                m2: parseFloat(existing.welford_m2),
              })
            : new WelfordBaseline();

          // Score before updating baseline, and keep the statistics the score is based on.
          const scoredMean = baseline.mean;
          const scoredStddev = baseline.stddev;
          const score = scoreAnomaly({
            cost: event.hourly_cost,
            mean: scoredMean,
            stddev: scoredStddev,
          });

          // Update baseline with optimistic locking (BMad must-have: concurrent Welford corruption)
          baseline.update(event.hourly_cost);
          const bj = baseline.toJSON();

          const written = existing
            ? await client.query(
                `UPDATE baselines SET welford_count = $1, welford_mean = $2, welford_m2 = $3,
version = version + 1, updated_at = now()
WHERE account_id = $4 AND resource_type = $5 AND version = $6`,
                [bj.count, bj.mean, bj.m2, event.account_id, event.resource_type, existing.version],
              )
            : await client.query(
                `INSERT INTO baselines (tenant_id, account_id, resource_type, welford_count, welford_mean, welford_m2, version)
VALUES ($1, $2, $3, $4, $5, $6, 1)
ON CONFLICT (tenant_id, account_id, resource_type) DO NOTHING`,
                [tenantId, event.account_id, event.resource_type, bj.count, bj.mean, bj.m2],
              );

          // Zero affected rows means a concurrent writer got there first.
          const conflicted = written.rowCount === 0;
          if (conflicted && attempt < maxBaselineAttempts) {
            continue; // re-read the fresh baseline and score again
          }
          if (conflicted) {
            logger.warn({ accountId: event.account_id, resourceType: event.resource_type, attempts: attempt },
              'Optimistic lock conflict — baseline update skipped');
          }

          // Record anomaly if threshold exceeded
          if (shouldAlert(score, config.ANOMALY_THRESHOLD)) {
            await client.query(
              `INSERT INTO anomalies (tenant_id, account_id, resource_type, region, hourly_cost, score, baseline_mean, baseline_stddev, tags)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
              [tenantId, event.account_id, event.resource_type, event.region, event.hourly_cost, score, scoredMean, scoredStddev, JSON.stringify(event.tags)],
            );
            anomalies.push({ ...event, score, isAnomaly: true });
          }
          break;
        }
      }

      return anomalies;
    });

    return { ingested: events.length, anomalies: results.length, details: results };
  });
}

View File

@@ -0,0 +1,14 @@
import { z } from 'zod';
/**
 * Schema for the environment variables this service reads.
 *
 * Every variable has a default, so an empty environment still parses. Numeric
 * variables are coerced from their string form.
 */
const envSchema = z.object({
  // TCP port to listen on; must be a whole number in the valid port range (0 lets the OS pick).
  PORT: z.coerce.number().int().min(0).max(65535).default(3000),
  DATABASE_URL: z.string().default('postgresql://localhost:5432/dd0c_cost'),
  REDIS_URL: z.string().default('redis://localhost:6379'),
  // NOTE(review): the fallback is a well-known development secret; make sure production always sets JWT_SECRET.
  JWT_SECRET: z.string().min(32).default('dev-secret-change-me-in-production!!'),
  // NOTE(review): '*' allows every origin by default; confirm that is intended outside development.
  CORS_ORIGIN: z.string().default('*'),
  LOG_LEVEL: z.string().default('info'),
  // Score threshold handed to the anomaly alerting check during ingestion.
  ANOMALY_THRESHOLD: z.coerce.number().default(50),
});
// Validated once at module load; `parse` throws if any variable fails the schema.
export const config = envSchema.parse(process.env);
// Static type of the validated configuration, derived from the schema.
export type Config = z.infer<typeof envSchema>;

View File

@@ -0,0 +1,24 @@
import pg from 'pg';
import pino from 'pino';
import { config } from '../config/index.js';
// Module-level pino logger for the data layer; records carry the name 'data'.
const logger = pino({ name: 'data' });

// Shared PostgreSQL connection pool, configured from DATABASE_URL.
export const pool = new pg.Pool({ connectionString: config.DATABASE_URL });

// A pooled client that fails while idle (backend restart, dropped connection) is
// reported as an 'error' event on the pool. Without a listener that event is
// unhandled and takes the whole process down, so log it instead.
pool.on('error', (err) => {
  logger.error({ err }, 'Unexpected error on idle PostgreSQL client');
});
/**
 * Runs `fn` inside a transaction whose `app.tenant_id` setting is `tenantId`.
 *
 * The setting is applied transaction-locally, so it disappears on COMMIT or
 * ROLLBACK and never leaks to the next user of the pooled connection.
 *
 * @param tenantId Tenant identifier to expose to the database session; must be a non-empty string.
 * @param fn Work to perform with the transaction's client.
 * @returns Whatever `fn` resolves to, after the transaction has been committed.
 * @throws Error if `tenantId` is missing, or the error raised by `fn` / the database
 *         (the transaction is rolled back first).
 */
export async function withTenant<T>(tenantId: string, fn: (client: pg.PoolClient) => Promise<T>): Promise<T> {
  // Fail closed: never open a tenant-scoped transaction without a tenant.
  if (typeof tenantId !== 'string' || tenantId.length === 0) {
    throw new Error('withTenant: tenantId is required');
  }

  const client = await pool.connect();
  // Set only when ROLLBACK itself fails; tells the pool to discard the connection.
  let brokenConnection: Error | undefined;
  try {
    await client.query('BEGIN');
    // set_config(..., true) is the parameterised equivalent of SET LOCAL, so the
    // tenant id is bound as a value and never spliced into the SQL text.
    await client.query("SELECT set_config('app.tenant_id', $1, true)", [tenantId]);
    const result = await fn(client);
    await client.query('COMMIT');
    return result;
  } catch (err) {
    try {
      await client.query('ROLLBACK');
    } catch (rollbackErr) {
      // The connection is in an unknown state; keep the original error for the
      // caller and make sure this client is destroyed rather than reused.
      brokenConnection = rollbackErr instanceof Error ? rollbackErr : new Error(String(rollbackErr));
    }
    throw err;
  } finally {
    // Always hand the client back; passing an error makes the pool drop it.
    client.release(brokenConnection);
  }
}

View File

@@ -0,0 +1,29 @@
import Fastify from 'fastify';
import cors from '@fastify/cors';
import pino from 'pino';
import { config } from './config/index.js';
import { registerAnomalyRoutes } from './api/anomalies.js';
import { registerBaselineRoutes } from './api/baselines.js';
import { registerGovernanceRoutes } from './api/governance.js';
import { registerIngestionRoutes } from './api/ingestion.js';
// Process-level logger for startup and fatal messages.
const logger = pino({ name: 'dd0c-cost', level: config.LOG_LEVEL });

// Fastify's own request logger is given the same level so LOG_LEVEL governs both.
const app = Fastify({ logger: { level: config.LOG_LEVEL } });

await app.register(cors, { origin: config.CORS_ORIGIN });

// Liveness probe; does not touch the database.
app.get('/health', async () => ({ status: 'ok', service: 'dd0c-cost' }));

// NOTE(review): the route modules read `req.tenantId`, but nothing registered in this
// file sets it. Presumably an auth hook is added elsewhere; confirm before exposing this.
registerIngestionRoutes(app);
registerAnomalyRoutes(app);
registerBaselineRoutes(app);
registerGovernanceRoutes(app);

try {
  // Bind on all interfaces so the service is reachable from outside its host/container.
  await app.listen({ port: config.PORT, host: '0.0.0.0' });
  logger.info({ port: config.PORT }, 'dd0c/cost started');
} catch (err) {
  logger.fatal(err, 'Failed to start');
  process.exit(1);
}