diff --git a/products/02-iac-drift-detection/saas/migrations/005_drift_features.sql b/products/02-iac-drift-detection/saas/migrations/005_drift_features.sql
new file mode 100644
index 0000000..3e82bf1
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/migrations/005_drift_features.sql
@@ -0,0 +1,39 @@
+-- 005: Canonical resources, remediations, drift acceptances, analytics indexes
+
+-- Add canonical_resources JSONB column to drift_reports
+ALTER TABLE drift_reports ADD COLUMN IF NOT EXISTS canonical_resources JSONB;
+
+-- Remediations (from Slack interactive actions)
+CREATE TABLE IF NOT EXISTS remediations (
+  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+  tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
+  stack_name TEXT NOT NULL,
+  status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'in_progress', 'completed', 'failed', 'cancelled')),
+  requested_by TEXT NOT NULL,
+  requested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  completed_at TIMESTAMPTZ
+);
+CREATE INDEX IF NOT EXISTS idx_remediations_tenant_stack ON remediations(tenant_id, stack_name);
+
+ALTER TABLE remediations ENABLE ROW LEVEL SECURITY;
+CREATE POLICY tenant_iso_remediations ON remediations
+  USING (tenant_id::text = current_setting('app.tenant_id', true));
+
+-- Drift acceptances (from Slack interactive actions)
+CREATE TABLE IF NOT EXISTS drift_acceptances (
+  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+  tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
+  report_id UUID NOT NULL REFERENCES drift_reports(id) ON DELETE CASCADE,
+  accepted_by TEXT NOT NULL,
+  accepted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  reason TEXT
+);
+CREATE INDEX IF NOT EXISTS idx_drift_acceptances_tenant ON drift_acceptances(tenant_id, report_id);
+
+ALTER TABLE drift_acceptances ENABLE ROW LEVEL SECURITY;
+CREATE POLICY tenant_iso_drift_acceptances ON drift_acceptances
+  USING (tenant_id::text = current_setting('app.tenant_id', true));
+
+-- Indexes for time-series analytics queries
+CREATE INDEX IF NOT EXISTS idx_drift_reports_tenant_scanned ON drift_reports(tenant_id, scanned_at DESC);
+CREATE INDEX IF NOT EXISTS idx_drift_reports_score_time ON drift_reports(tenant_id, stack_name, scanned_at DESC, drift_score);
diff --git a/products/02-iac-drift-detection/saas/src/api/analytics.ts b/products/02-iac-drift-detection/saas/src/api/analytics.ts
new file mode 100644
index 0000000..50d7f87
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/api/analytics.ts
@@ -0,0 +1,106 @@
+import { FastifyInstance } from 'fastify';
+import { withTenant } from '../data/db.js';
+
+export async function registerAnalyticsRoutes(app: FastifyInstance) {
+  // Drift score trends per stack over last 30 days
+  app.get('/api/v1/analytics/trends', async (request, reply) => {
+    const tenantId = (request as any).tenantId;
+    const pool = (app as any).pool;
+    const { stack_name, days } = request.query as { stack_name?: string; days?: string };
+    // Clamp to [1, 90] so a negative/zero ?days= cannot produce an empty, future-looking window
+    const lookbackDays = Math.min(Math.max(parseInt(days ?? '30') || 30, 1), 90);
+
+    const result = await withTenant(pool, tenantId, async (client) => {
+      if (stack_name) {
+        const { rows } = await client.query(
+          `SELECT stack_name, drift_score, total_resources, scanned_at::date as date,
+                  drift_score as score
+           FROM drift_reports
+           WHERE tenant_id = $1 AND stack_name = $2
+             AND scanned_at >= NOW() - make_interval(days => $3)
+           ORDER BY scanned_at ASC`,
+          [tenantId, stack_name, lookbackDays],
+        );
+        return rows;
+      }
+
+      // All stacks: daily max drift score per stack
+      const { rows } = await client.query(
+        `SELECT stack_name, scanned_at::date as date,
+                MAX(drift_score) as max_score,
+                MIN(drift_score) as min_score,
+                AVG(drift_score)::numeric(5,2) as avg_score,
+                COUNT(*) as scan_count
+         FROM drift_reports
+         WHERE tenant_id = $1
+           AND scanned_at >= NOW() - make_interval(days => $2)
+         GROUP BY stack_name, scanned_at::date
+         ORDER BY date ASC, stack_name`,
+        [tenantId, lookbackDays],
+      );
+      return rows;
+    });
+
+    return reply.send({ days: lookbackDays, trends: result });
+  });
+
+  // Overall drift health summary
+  app.get('/api/v1/analytics/summary', async (request, reply) => {
+    const tenantId = (request as any).tenantId;
+    const pool = (app as any).pool;
+
+    const result = await withTenant(pool, tenantId, async (client) => {
+      // Latest report per stack
+      const latestRes = await client.query(
+        `SELECT DISTINCT ON (stack_name) stack_name, drift_score, total_resources, scanned_at
+         FROM drift_reports
+         WHERE tenant_id = $1
+         ORDER BY stack_name, scanned_at DESC`,
+        [tenantId],
+      );
+      const latest = latestRes.rows;
+
+      const totalStacks = latest.length;
+      // pg returns NUMERIC columns as strings, so coerce before comparing ('0' !== 0)
+      const cleanStacks = latest.filter((r: any) => parseFloat(r.drift_score) === 0).length;
+      const cleanPct = totalStacks > 0 ? Math.round((cleanStacks / totalStacks) * 100) : 100;
+      const avgScore = totalStacks > 0
+        ? parseFloat((latest.reduce((sum: number, r: any) => sum + parseFloat(r.drift_score), 0) / totalStacks).toFixed(2))
+        : 0;
+
+      const worstOffenders = [...latest]
+        .sort((a: any, b: any) => b.drift_score - a.drift_score)
+        .slice(0, 5)
+        .map((r: any) => ({
+          stack_name: r.stack_name,
+          drift_score: parseFloat(r.drift_score),
+          total_resources: r.total_resources,
+          last_scanned: r.scanned_at,
+        }));
+
+      // 24h trend
+      const dayAgoRes = await client.query(
+        `SELECT AVG(drift_score)::numeric(5,2) as avg_score
+         FROM (
+           SELECT DISTINCT ON (stack_name) stack_name, drift_score
+           FROM drift_reports
+           WHERE tenant_id = $1 AND scanned_at < NOW() - INTERVAL '24 hours'
+           ORDER BY stack_name, scanned_at DESC
+         ) prev`,
+        [tenantId],
+      );
+      const prevAvg = parseFloat(dayAgoRes.rows[0]?.avg_score ?? '0');
+      const trend = avgScore - prevAvg;
+
+      return {
+        total_stacks: totalStacks,
+        clean_stacks: cleanStacks,
+        clean_pct: cleanPct,
+        avg_drift_score: avgScore,
+        score_trend_24h: parseFloat(trend.toFixed(2)),
+        worst_offenders: worstOffenders,
+      };
+    });
+
+    return reply.send(result);
+  });
+}
diff --git a/products/02-iac-drift-detection/saas/src/api/slack-interactions.ts b/products/02-iac-drift-detection/saas/src/api/slack-interactions.ts
new file mode 100644
index 0000000..0adf7ed
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/api/slack-interactions.ts
@@ -0,0 +1,122 @@
+import { FastifyInstance } from 'fastify';
+import crypto from 'node:crypto';
+import pino from 'pino';
+
+const logger = pino({ name: 'slack-interactions' });
+
+function verifySlackSignature(signingSecret: string, timestamp: string, body: string, signature: string): boolean {
+  // Reject replays: Slack says to discard requests more than 5 minutes from now.
+  // Number.isFinite also rejects a non-numeric timestamp header (NaN comparisons are always false).
+  const ts = Number.parseInt(timestamp, 10);
+  if (!Number.isFinite(ts) || Math.abs(Math.floor(Date.now() / 1000) - ts) > 300) return false;
+
+  const baseString = `v0:${timestamp}:${body}`;
+  const hmac = crypto.createHmac('sha256', signingSecret).update(baseString).digest('hex');
+  const computed = `v0=${hmac}`;
+
+  // timingSafeEqual throws on length mismatch — guard so a short header yields 401, not a 500
+  const a = Buffer.from(computed);
+  const b = Buffer.from(signature);
+  return a.length === b.length && crypto.timingSafeEqual(a, b);
+}
+
+export async function registerSlackInteractionRoutes(app: FastifyInstance) {
+  // Must add raw body support for signature verification
+  app.addContentTypeParser('application/x-www-form-urlencoded', { parseAs: 'string' }, (req, body, done) => {
+    done(null, body);
+  });
+
+  app.post('/api/v1/slack/interactions', async (request, reply) => {
+    const config = (app as any).config;
+    const signingSecret = config.slackSigningSecret;
+
+    if (!signingSecret) {
+      logger.warn('SLACK_SIGNING_SECRET not configured');
+      return reply.status(500).send({ error: 'Slack signing secret not configured' });
+    }
+
+    const timestamp = request.headers['x-slack-request-timestamp'] as string;
+    const signature = request.headers['x-slack-signature'] as string;
+    const rawBody = request.body as string;
+
+    if (!timestamp || !signature || !rawBody) {
+      return reply.status(400).send({ error: 'Missing Slack headers' });
+    }
+
+    if (!verifySlackSignature(signingSecret, timestamp, rawBody, signature)) {
+      return reply.status(401).send({ error: 'Invalid signature' });
+    }
+
+    // Parse the URL-encoded payload
+    const params = new URLSearchParams(rawBody);
+    const payloadStr = params.get('payload');
+    if (!payloadStr) {
+      return reply.status(400).send({ error: 'Missing payload' });
+    }
+
+    const payload = JSON.parse(payloadStr);
+    const actions = payload.actions ?? [];
+    const userId = payload.user?.id ?? 'unknown';
+    const userName = payload.user?.name ?? 'unknown';
+    const pool = (app as any).pool;
+
+    for (const action of actions) {
+      const actionId: string = action.action_id ?? '';
+
+      if (actionId.startsWith('remediate_')) {
+        const stackName = actionId.replace('remediate_', '');
+        logger.info({ stackName, userId: userName }, 'Remediation requested via Slack');
+
+        // NOTE(review): picks the first tenant owning this stack name — stack names are not
+        // globally unique, so resolve tenant via notification_configs channel metadata instead.
+        const tenantRes = await pool.query(
+          `SELECT DISTINCT tenant_id FROM drift_reports WHERE stack_name = $1 LIMIT 1`,
+          [stackName],
+        );
+
+        if (tenantRes.rows[0]) {
+          const tenantId = tenantRes.rows[0].tenant_id;
+          await pool.query(
+            `INSERT INTO remediations (tenant_id, stack_name, status, requested_by)
+             VALUES ($1, $2, 'pending', $3)`,
+            [tenantId, stackName, userName],
+          );
+        }
+
+        return reply.send({
+          response_type: 'ephemeral',
+          text: `✅ Remediation plan queued for *${stackName}*. Requested by <@${userId}>.`,
+        });
+      }
+
+      if (actionId.startsWith('accept_')) {
+        const stackName = actionId.replace('accept_', '');
+        logger.info({ stackName, userId: userName }, 'Drift acceptance via Slack');
+
+        // Find latest report for this stack
+        const reportRes = await pool.query(
+          `SELECT id, tenant_id FROM drift_reports
+           WHERE stack_name = $1
+           ORDER BY scanned_at DESC LIMIT 1`,
+          [stackName],
+        );
+
+        if (reportRes.rows[0]) {
+          const { id: reportId, tenant_id: tenantId } = reportRes.rows[0];
+          await pool.query(
+            `INSERT INTO drift_acceptances (tenant_id, report_id, accepted_by, reason)
+             VALUES ($1, $2, $3, $4)`,
+            [tenantId, reportId, userName, 'Accepted via Slack'],
+          );
+        }
+
+        return reply.send({
+          response_type: 'ephemeral',
+          text: `✅ Drift accepted for *${stackName}*. Marked by <@${userId}>.`,
+        });
+      }
+    }
+
+    // Unknown action — acknowledge
+    return reply.send({ response_type: 'ephemeral', text: 'Action received.' });
+  });
+}
diff --git a/products/02-iac-drift-detection/saas/src/config/index.ts b/products/02-iac-drift-detection/saas/src/config/index.ts
index 7a2397a..d3ccd28 100644
--- a/products/02-iac-drift-detection/saas/src/config/index.ts
+++ b/products/02-iac-drift-detection/saas/src/config/index.ts
@@ -11,6 +11,8 @@ const envSchema = z.object({
   SQS_QUEUE_URL: z.string().optional(),
   S3_BUCKET: z.string().default('dd0c-drift-snapshots'),
   SLACK_WEBHOOK_URL: z.string().optional(),
+  SLACK_SIGNING_SECRET: z.string().optional(),
+  DAILY_DIGEST_CRON: z.string().default('0 9 * * *'),
 });
 
 const parsed = envSchema.safeParse(process.env);
@@ -30,6 +32,8 @@ export const config = {
   sqsQueueUrl: parsed.data.SQS_QUEUE_URL,
   s3Bucket: parsed.data.S3_BUCKET,
   slackWebhookUrl: parsed.data.SLACK_WEBHOOK_URL,
+  slackSigningSecret: parsed.data.SLACK_SIGNING_SECRET,
+  dailyDigestCron: parsed.data.DAILY_DIGEST_CRON,
 };
 
 export type Config = typeof config;
diff --git a/products/02-iac-drift-detection/saas/src/index.ts b/products/02-iac-drift-detection/saas/src/index.ts
index 472eca4..b451c31 100644
--- a/products/02-iac-drift-detection/saas/src/index.ts
+++ b/products/02-iac-drift-detection/saas/src/index.ts
@@ -7,7 +7,10 @@ import { createPool } from './data/db.js';
 import { createRedis } from './data/redis.js';
 import { registerProcessorRoutes } from './processor/routes.js';
 import { registerApiRoutes } from './api/routes.js';
+import { registerAnalyticsRoutes } from './api/analytics.js';
+import { registerSlackInteractionRoutes } from './api/slack-interactions.js';
 import { authHook, decorateAuth, registerAuthRoutes, registerProtectedAuthRoutes } from './auth/middleware.js';
+import { runDailyDigest } from './notifications/daily-digest.js';
 
 const app = Fastify({
   logger: {
@@ -38,16 +41,47 @@ async function start() {
   // Auth routes (public - login/signup)
   registerAuthRoutes(app, config.jwtSecret, pool);
 
+  // Slack interactions (public — verified via Slack signing secret, not JWT)
+  await app.register(async function slackRoutes(slackApp) {
+    await registerSlackInteractionRoutes(slackApp);
+  });
+
   // Protected routes (auth required)
   app.register(async function protectedRoutes(protectedApp) {
     protectedApp.addHook('onRequest', authHook(config.jwtSecret, pool));
     registerProtectedAuthRoutes(protectedApp, config.jwtSecret, pool);
     await registerProcessorRoutes(protectedApp);
     await registerApiRoutes(protectedApp);
+    await registerAnalyticsRoutes(protectedApp);
   });
 
   await app.listen({ port: config.port, host: '0.0.0.0' });
   app.log.info(`dd0c/drift SaaS listening on :${config.port}`);
+
+  // Schedule daily digest
+  scheduleDailyDigest(pool);
+}
+
+function scheduleDailyDigest(pool: ReturnType<typeof createPool>) {
+  // Simple interval-based scheduler: poll every 5 minutes and fire at most once per day.
+  // In production, use a proper cron library or external scheduler (config.dailyDigestCron).
+  const DIGEST_HOUR = 9; // 9 AM UTC by default
+  let lastRunDay = ''; // guards against double-firing when two polls land in the same window
+
+  const check = () => {
+    const now = new Date();
+    const today = now.toISOString().slice(0, 10);
+    if (now.getUTCHours() === DIGEST_HOUR && lastRunDay !== today) {
+      lastRunDay = today;
+      runDailyDigest(pool).catch((err) => {
+        app.log.error({ error: (err as Error).message }, 'Daily digest failed');
+      });
+    }
+  };
+
+  // Check every 5 minutes
+  setInterval(check, 5 * 60 * 1000);
+  app.log.info({ digestHour: DIGEST_HOUR }, 'Daily digest scheduler started');
+}
 
 start().catch((err) => {
diff --git a/products/02-iac-drift-detection/saas/src/notifications/daily-digest.ts b/products/02-iac-drift-detection/saas/src/notifications/daily-digest.ts
new file mode 100644
index 0000000..c6f7501
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/notifications/daily-digest.ts
@@ -0,0 +1,175 @@
+import type pg from 'pg';
+import pino from 'pino';
+
+const logger = pino({ name: 'daily-digest' });
+
+interface DigestData {
+  tenantId: string;
+  totalStacks: number;
+  totalDrifted: number;
+  worstDriftScores: Array<{ stack_name: string; drift_score: number }>;
+  newDrifts: number;
+  resolvedDrifts: number;
+}
+
+function buildDigestBlocks(digest: DigestData) {
+  const cleanPct = digest.totalStacks > 0
+    ? Math.round(((digest.totalStacks - digest.totalDrifted) / digest.totalStacks) * 100)
+    : 100;
+
+  const worstList = digest.worstDriftScores
+    .slice(0, 5)
+    .map((w) => `• *${w.stack_name}*: ${w.drift_score}/100`)
+    .join('\n') || '_None_';
+
+  return {
+    blocks: [
+      {
+        type: 'header',
+        text: { type: 'plain_text', text: '📊 Daily Drift Digest' },
+      },
+      {
+        type: 'section',
+        fields: [
+          { type: 'mrkdwn', text: `*Stacks Scanned:* ${digest.totalStacks}` },
+          { type: 'mrkdwn', text: `*Currently Drifted:* ${digest.totalDrifted}` },
+          { type: 'mrkdwn', text: `*Clean:* ${cleanPct}%` },
+          { type: 'mrkdwn', text: `*New Drifts:* ${digest.newDrifts}` },
+        ],
+      },
+      {
+        type: 'section',
+        text: { type: 'mrkdwn', text: `*Resolved (last 24h):* ${digest.resolvedDrifts}` },
+      },
+      {
+        type: 'section',
+        text: { type: 'mrkdwn', text: `*Worst Offenders:*\n${worstList}` },
+      },
+      {
+        type: 'context',
+        elements: [
+          { type: 'mrkdwn', text: `Report generated at ${new Date().toISOString()}` },
+        ],
+      },
+    ],
+  };
+}
+
+async function gatherDigest(pool: pg.Pool, tenantId: string): Promise<DigestData> {
+  const client = await pool.connect();
+  try {
+    await client.query('BEGIN');
+    await client.query("SELECT set_config('app.tenant_id', $1, true)", [tenantId]);
+
+    // Total distinct stacks
+    const stacksRes = await client.query(
+      'SELECT COUNT(DISTINCT stack_name) as cnt FROM drift_reports WHERE tenant_id = $1',
+      [tenantId],
+    );
+    const totalStacks = parseInt(stacksRes.rows[0]?.cnt ?? '0');
+
+    // Currently drifted (latest report per stack with score > 0)
+    const driftedRes = await client.query(
+      `SELECT COUNT(*) as cnt FROM (
+         SELECT DISTINCT ON (stack_name) stack_name, drift_score
+         FROM drift_reports WHERE tenant_id = $1
+         ORDER BY stack_name, scanned_at DESC
+       ) latest WHERE drift_score > 0`,
+      [tenantId],
+    );
+    const totalDrifted = parseInt(driftedRes.rows[0]?.cnt ?? '0');
+
+    // Worst drift scores (latest per stack, top 5)
+    const worstRes = await client.query(
+      `SELECT DISTINCT ON (stack_name) stack_name, drift_score
+       FROM drift_reports WHERE tenant_id = $1
+       ORDER BY stack_name, scanned_at DESC`,
+      [tenantId],
+    );
+    const worstDriftScores = worstRes.rows
+      .filter((r: any) => r.drift_score > 0)
+      .sort((a: any, b: any) => b.drift_score - a.drift_score)
+      .slice(0, 5);
+
+    // New drifts: drifted reports received in the last 24h
+    const newRes = await client.query(
+      `SELECT COUNT(*) as cnt FROM drift_reports
+       WHERE tenant_id = $1 AND drift_score > 0
+         AND scanned_at >= NOW() - INTERVAL '24 hours'`,
+      [tenantId],
+    );
+    const newDrifts = parseInt(newRes.rows[0]?.cnt ?? '0');
+
+    // Resolved: stacks whose latest report is clean but that had drift in the last 48h
+    const resolvedRes = await client.query(
+      `SELECT COUNT(*) as cnt FROM (
+         SELECT DISTINCT ON (stack_name) stack_name, drift_score
+         FROM drift_reports WHERE tenant_id = $1
+         ORDER BY stack_name, scanned_at DESC
+       ) latest
+       WHERE drift_score = 0
+         AND stack_name IN (
+           SELECT DISTINCT stack_name FROM drift_reports
+           WHERE tenant_id = $1 AND drift_score > 0
+             AND scanned_at >= NOW() - INTERVAL '48 hours'
+         )`,
+      [tenantId],
+    );
+    const resolvedDrifts = parseInt(resolvedRes.rows[0]?.cnt ?? '0');
+
+    await client.query('COMMIT');
+
+    return { tenantId, totalStacks, totalDrifted, worstDriftScores, newDrifts, resolvedDrifts };
+  } catch (err) {
+    await client.query('ROLLBACK');
+    throw err;
+  } finally {
+    client.release();
+  }
+}
+
+export async function runDailyDigest(pool: pg.Pool): Promise<void> {
+  logger.info('Starting daily digest run');
+
+  // Get all tenants with Slack notification configured
+  const tenantsRes = await pool.query(
+    `SELECT t.id as tenant_id, nc.config->>'webhook_url' as webhook_url
+     FROM tenants t
+     JOIN notification_configs nc ON nc.tenant_id = t.id
+     WHERE nc.channel = 'slack' AND nc.enabled = true`,
+  );
+
+  for (const row of tenantsRes.rows) {
+    try {
+      if (!row.webhook_url) {
+        logger.warn({ tenantId: row.tenant_id }, 'Slack config has no webhook_url — skipping');
+        continue;
+      }
+
+      const digest = await gatherDigest(pool, row.tenant_id);
+
+      if (digest.totalStacks === 0) {
+        logger.info({ tenantId: row.tenant_id }, 'No stacks — skipping digest');
+        continue;
+      }
+
+      const payload = buildDigestBlocks(digest);
+      const resp = await fetch(row.webhook_url, {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        body: JSON.stringify(payload),
+      });
+
+      if (!resp.ok) {
+        const text = await resp.text();
+        logger.error({ tenantId: row.tenant_id, status: resp.status, text }, 'Digest Slack send failed');
+      } else {
+        logger.info({ tenantId: row.tenant_id }, 'Daily digest sent');
+      }
+    } catch (err) {
+      logger.error({ tenantId: row.tenant_id, error: (err as Error).message }, 'Digest failed for tenant');
+    }
+  }
+
+  logger.info('Daily digest run complete');
+}
diff --git a/products/02-iac-drift-detection/saas/src/processor/chunk-assembler.ts b/products/02-iac-drift-detection/saas/src/processor/chunk-assembler.ts
new file mode 100644
index 0000000..9bc0060
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/processor/chunk-assembler.ts
@@ -0,0 +1,99 @@
+import type Redis from 'ioredis';
+import pino from 'pino';
+
+const logger = pino({ name: 'chunk-assembler' });
+
+const CHUNK_TTL = 600; // 10 minutes
+
+function chunkKey(reportId: string, index: number): string {
+  return `chunk:${reportId}:${index}`;
+}
+
+function metaKey(reportId: string): string {
+  return `chunk:${reportId}:meta`;
+}
+
+export interface ChunkInput {
+  report_id: string;
+  chunk_index: number;
+  total_chunks: number;
+  data: Record<string, unknown>;
+}
+
+export interface AssemblyResult {
+  complete: boolean;
+  received: number;
+  total: number;
+  assembled?: Record<string, unknown>;
+}
+
+export async function storeChunk(redis: Redis, chunk: ChunkInput): Promise<AssemblyResult> {
+  const { report_id, chunk_index, total_chunks, data } = chunk;
+
+  // Store chunk data with TTL
+  const key = chunkKey(report_id, chunk_index);
+  await redis.setex(key, CHUNK_TTL, JSON.stringify(data));
+
+  // Track metadata: total_chunks and received set
+  const meta = metaKey(report_id);
+  await redis.hset(meta, 'total', total_chunks.toString());
+  await redis.hset(meta, `idx:${chunk_index}`, '1');
+  await redis.expire(meta, CHUNK_TTL);
+
+  // Count received chunks
+  // NOTE(review): hset + hgetall is not atomic — two concurrent deliveries of the
+  // final chunk can both observe received === total_chunks and assemble twice;
+  // use a Lua script or a SETNX assembly lock if duplicate processing matters.
+  const fields = await redis.hgetall(meta);
+  const received = Object.keys(fields).filter((k) => k.startsWith('idx:')).length;
+
+  logger.info({ report_id, chunk_index, received, total_chunks }, 'Chunk stored');
+
+  if (received < total_chunks) {
+    return { complete: false, received, total: total_chunks };
+  }
+
+  // All chunks received — reassemble
+  const assembled = await reassemble(redis, report_id, total_chunks);
+  await cleanup(redis, report_id, total_chunks);
+
+  return { complete: true, received, total: total_chunks, assembled };
+}
+
+async function reassemble(redis: Redis, reportId: string, totalChunks: number): Promise<Record<string, unknown>> {
+  const parts: Record<string, unknown>[] = [];
+
+  for (let i = 0; i < totalChunks; i++) {
+    const raw = await redis.get(chunkKey(reportId, i));
+    if (!raw) throw new Error(`Missing chunk ${i} for report ${reportId}`);
+    parts.push(JSON.parse(raw));
+  }
+
+  // First chunk is the base, subsequent chunks append drifted_resources
+  const base = { ...parts[0] };
+  const allResources: unknown[] = (base.drifted_resources as unknown[]) ?? [];
+
+  for (let i = 1; i < parts.length; i++) {
+    const chunk = parts[i];
+    if (Array.isArray(chunk.drifted_resources)) {
+      allResources.push(...chunk.drifted_resources);
+    }
+    for (const [k, v] of Object.entries(chunk)) {
+      if (k !== 'drifted_resources') {
+        base[k] = v;
+      }
+    }
+  }
+
+  base.drifted_resources = allResources;
+  logger.info({ reportId, totalChunks, resources: allResources.length }, 'Report reassembled');
+  return base;
+}
+
+async function cleanup(redis: Redis, reportId: string, totalChunks: number): Promise<void> {
+  const keys = [metaKey(reportId)];
+  for (let i = 0; i < totalChunks; i++) {
+    keys.push(chunkKey(reportId, i));
+  }
+  await redis.del(...keys);
+}
diff --git a/products/02-iac-drift-detection/saas/src/processor/normalizer.ts b/products/02-iac-drift-detection/saas/src/processor/normalizer.ts
new file mode 100644
index 0000000..4e495a7
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/processor/normalizer.ts
@@ -0,0 +1,141 @@
+import crypto from 'node:crypto';
+
+// --- Provider → Canonical resource type mapping ---
+
+const RESOURCE_TYPE_MAP: Record<string, string> = {
+  // Compute
+  aws_instance: 'compute/instance',
+  aws_launch_template: 'compute/launch-template',
+  google_compute_instance: 'compute/instance',
+  google_compute_instance_template: 'compute/launch-template',
+  azurerm_virtual_machine: 'compute/instance',
+  azurerm_linux_virtual_machine: 'compute/instance',
+  azurerm_windows_virtual_machine: 'compute/instance',
+
+  // Networking
+  aws_vpc: 'network/vpc',
+  google_compute_network: 'network/vpc',
+  azurerm_virtual_network: 'network/vpc',
+  aws_subnet: 'network/subnet',
+  google_compute_subnetwork: 'network/subnet',
+  azurerm_subnet: 'network/subnet',
+  aws_security_group: 'network/firewall',
+  google_compute_firewall: 'network/firewall',
+  azurerm_network_security_group: 'network/firewall',
+  aws_lb: 'network/load-balancer',
+  google_compute_forwarding_rule: 'network/load-balancer',
+  azurerm_lb: 'network/load-balancer',
+
+  // Storage
+  aws_s3_bucket: 'storage/bucket',
+  google_storage_bucket: 'storage/bucket',
+  azurerm_storage_account: 'storage/bucket',
+  aws_ebs_volume: 'storage/disk',
+  google_compute_disk: 'storage/disk',
+  azurerm_managed_disk: 'storage/disk',
+
+  // Database
+  aws_db_instance: 'database/instance',
+  google_sql_database_instance: 'database/instance',
+  azurerm_mssql_server: 'database/instance',
+  azurerm_postgresql_server: 'database/instance',
+
+  // IAM
+  aws_iam_role: 'iam/role',
+  google_project_iam_member: 'iam/role',
+  azurerm_role_assignment: 'iam/role',
+  aws_iam_policy: 'iam/policy',
+  google_project_iam_policy: 'iam/policy',
+
+  // DNS
+  aws_route53_record: 'dns/record',
+  google_dns_record_set: 'dns/record',
+  azurerm_dns_a_record: 'dns/record',
+
+  // Kubernetes
+  kubernetes_deployment: 'k8s/deployment',
+  kubernetes_service: 'k8s/service',
+  kubernetes_namespace: 'k8s/namespace',
+  kubernetes_config_map: 'k8s/configmap',
+  kubernetes_secret: 'k8s/secret',
+};
+
+const PROVIDER_PREFIXES = ['aws_', 'google_', 'azurerm_', 'gcp_'];
+
+export interface DriftedResource {
+  resource_type: string;
+  resource_address: string;
+  module?: string;
+  provider: string;
+  severity: string;
+  diffs: Array<{
+    attribute_name: string;
+    old_value: string;
+    new_value: string;
+    sensitive: boolean;
+  }>;
+}
+
+export interface CanonicalResource {
+  canonical_type: string;
+  original_type: string;
+  resource_address: string;
+  provider: string;
+  severity: string;
+  fingerprint: string;
+  normalized_diffs: Array<{
+    attribute: string;
+    original_attribute: string;
+    old_value: string;
+    new_value: string;
+    sensitive: boolean;
+  }>;
+}
+
+function normalizeResourceType(resourceType: string): string {
+  return RESOURCE_TYPE_MAP[resourceType] ?? `other/${resourceType}`;
+}
+
+function normalizeAttributeName(attr: string): string {
+  let normalized = attr;
+  for (const prefix of PROVIDER_PREFIXES) {
+    if (normalized.startsWith(prefix)) {
+      normalized = normalized.slice(prefix.length);
+      break;
+    }
+  }
+  return normalized.replace(/\./g, '/');
+}
+
+function computeFingerprint(resourceAddress: string, diffKeys: string[]): string {
+  const sorted = [...diffKeys].sort();
+  const input = `${resourceAddress}:${sorted.join(',')}`;
+  return crypto.createHash('sha256').update(input).digest('hex').slice(0, 16);
+}
+
+export function normalizeResource(resource: DriftedResource): CanonicalResource {
+  const normalizedDiffs = resource.diffs.map((d) => ({
+    attribute: normalizeAttributeName(d.attribute_name),
+    original_attribute: d.attribute_name,
+    old_value: d.old_value,
+    new_value: d.new_value,
+    sensitive: d.sensitive,
+  }));
+
+  const diffKeys = normalizedDiffs.map((d) => d.attribute);
+  const fingerprint = computeFingerprint(resource.resource_address, diffKeys);
+
+  return {
+    canonical_type: normalizeResourceType(resource.resource_type),
+    original_type: resource.resource_type,
+    resource_address: resource.resource_address,
+    provider: resource.provider,
+    severity: resource.severity,
+    fingerprint,
+    normalized_diffs: normalizedDiffs,
+  };
+}
+
+export function normalizeReport(driftedResources: DriftedResource[]): CanonicalResource[] {
+  return driftedResources.map(normalizeResource);
+}
diff --git a/products/02-iac-drift-detection/saas/src/processor/routes.ts b/products/02-iac-drift-detection/saas/src/processor/routes.ts
index 7b6af21..6f526b4 100644
--- a/products/02-iac-drift-detection/saas/src/processor/routes.ts
+++ b/products/02-iac-drift-detection/saas/src/processor/routes.ts
@@ -1,6 +1,8 @@
 import { FastifyInstance } from 'fastify';
 import { z } from 'zod';
 import { withTenant } from '../data/db.js';
+import { normalizeReport } from './normalizer.js';
+import { storeChunk } from './chunk-assembler.js';
 
const driftReportSchema = z.object({ stack_name: z.string(), @@ -27,6 +29,60 @@ const driftReportSchema = z.object({ nonce: z.string(), }); +const chunkSchema = z.object({ + report_id: z.string().min(1), + chunk_index: z.number().int().min(0), + total_chunks: z.number().int().min(1).max(100), + data: z.record(z.unknown()), +}); + +async function processDriftReport(app: FastifyInstance, tenantId: string, report: z.infer) { + const pool = (app as any).pool; + + // Normalize drifted resources into canonical schema + const canonicalResources = report.drifted_resources?.length + ? normalizeReport(report.drifted_resources) + : []; + + await withTenant(pool, tenantId, async (client) => { + await client.query( + `INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report, canonical_resources) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, + [tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report), JSON.stringify(canonicalResources)] + ); + }); + + // Trigger notification if drift score exceeds threshold + if (report.drift_score > 0) { + try { + const { dispatchNotifications, shouldNotify } = await import('../notifications/service.js'); + + const maxSeverity = report.drifted_resources?.some((r) => r.severity === 'critical') ? 'critical' + : report.drifted_resources?.some((r) => r.severity === 'high') ? 
'high' : 'medium'; + + const channelResult = await withTenant(pool, tenantId, async (client) => { + return client.query('SELECT * FROM notification_configs WHERE enabled = true'); + }); + const channels = channelResult.rows.filter((ch: any) => shouldNotify(ch, report.drift_score, maxSeverity)); + + if (channels.length > 0) { + await dispatchNotifications(channels, { + tenantId, + stackName: report.stack_name, + driftScore: report.drift_score, + totalResources: report.total_resources, + totalDrifted: report.drifted_resources?.length ?? 0, + criticalCount: report.drifted_resources?.filter((r) => r.severity === 'critical').length ?? 0, + highCount: report.drifted_resources?.filter((r) => r.severity === 'high').length ?? 0, + reportUrl: `https://drift.dd0c.dev/reports/${report.stack_fingerprint}`, + }, maxSeverity); + } + } catch (err) { + app.log.warn({ tenantId, error: (err as Error).message }, 'Notification dispatch failed (non-fatal)'); + } + } +} + export async function registerProcessorRoutes(app: FastifyInstance) { app.post('/v1/ingest/drift', async (request, reply) => { const parsed = driftReportSchema.safeParse(request.body); @@ -46,48 +102,55 @@ export async function registerProcessorRoutes(app: FastifyInstance) { } await redis.setex(nonceKey, 86400, '1'); // 24h TTL - // Persist - const pool = (app as any).pool; - await withTenant(pool, tenantId, async (client) => { - await client.query( - `INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`, - [tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report)] - ); - }); - - // Trigger notification if drift score exceeds threshold - if (report.drift_score > 0) { - try { - const { 
dispatchNotifications, shouldNotify } = await import('../notifications/service.js'); - const { withTenant } = await import('../data/db.js'); - - const maxSeverity = report.drifted_resources?.some((r: any) => r.severity === 'critical') ? 'critical' - : report.drifted_resources?.some((r: any) => r.severity === 'high') ? 'high' : 'medium'; - - // Fetch tenant notification channels - const channelResult = await withTenant(pool, tenantId, async (client: any) => { - return client.query('SELECT * FROM notification_channels WHERE enabled = true'); - }); - const channels = channelResult.rows.filter((ch: any) => shouldNotify(ch, report.drift_score, maxSeverity)); - - if (channels.length > 0) { - await dispatchNotifications(channels, { - tenantId, - stackName: report.stack_name, - driftScore: report.drift_score, - totalResources: report.total_resources, - totalDrifted: report.drifted_resources?.length ?? 0, - criticalCount: report.drifted_resources?.filter((r: any) => r.severity === 'critical').length ?? 0, - highCount: report.drifted_resources?.filter((r: any) => r.severity === 'high').length ?? 
0, - reportUrl: `https://drift.dd0c.dev/reports/${report.stack_fingerprint}`, - }, maxSeverity); - } - } catch (err) { - app.log.warn({ tenantId, error: (err as Error).message }, 'Notification dispatch failed (non-fatal)'); - } - } + await processDriftReport(app, tenantId, report); return reply.status(201).send({ status: 'accepted' }); }); + + // Chunked report ingestion + app.post('/v1/ingest/drift/chunk', async (request, reply) => { + const parsed = chunkSchema.safeParse(request.body); + if (!parsed.success) { + return reply.status(400).send({ error: 'Invalid chunk', details: parsed.error.flatten() }); + } + + const tenantId = (request as any).tenantId; + const redis = (app as any).redis; + const chunk = parsed.data; + + const result = await storeChunk(redis, chunk); + + if (!result.complete) { + return reply.status(202).send({ + status: 'chunk_received', + received: result.received, + total: result.total, + }); + } + + // All chunks received — validate and process the assembled report + const assembled = result.assembled!; + const reportParsed = driftReportSchema.safeParse(assembled); + if (!reportParsed.success) { + return reply.status(400).send({ error: 'Assembled report invalid', details: reportParsed.error.flatten() }); + } + + const report = reportParsed.data; + + // Nonce replay prevention on assembled report + const nonceKey = `nonce:${report.nonce}`; + const exists = await redis.exists(nonceKey); + if (exists) { + return reply.status(409).send({ error: 'Nonce already used (replay rejected)' }); + } + await redis.setex(nonceKey, 86400, '1'); + + await processDriftReport(app, tenantId, report); + + return reply.status(201).send({ + status: 'assembled_and_accepted', + received: result.received, + total: result.total, + }); + }); }