diff --git a/products/02-iac-drift-detection/saas/migrations/006_noisy_neighbor.sql b/products/02-iac-drift-detection/saas/migrations/006_noisy_neighbor.sql
new file mode 100644
index 0000000..58fabc9
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/migrations/006_noisy_neighbor.sql
@@ -0,0 +1,29 @@
+-- 006: Noisy neighbor protection, remediation locks, fair-share processing
+
+-- Add during_remediation flag to drift_reports
+ALTER TABLE drift_reports ADD COLUMN IF NOT EXISTS during_remediation BOOLEAN NOT NULL DEFAULT false;
+
+-- Add processing_priority to stacks (tracked via drift_reports for now)
+-- We use a dedicated lightweight table so we can set priority per-stack without a full stacks table
+CREATE TABLE IF NOT EXISTS stack_settings (
+  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+  tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
+  stack_name TEXT NOT NULL,
+  processing_priority TEXT NOT NULL DEFAULT 'normal' CHECK (processing_priority IN ('low', 'normal', 'high')),
+  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  UNIQUE(tenant_id, stack_name)
+);
+
+ALTER TABLE stack_settings ENABLE ROW LEVEL SECURITY;
+CREATE POLICY tenant_iso_stack_settings ON stack_settings
+  USING (tenant_id::text = current_setting('app.tenant_id', true));
+
+-- Index for fair-share queue queries: find reports per tenant ordered by time
+CREATE INDEX IF NOT EXISTS idx_drift_reports_tenant_created
+  ON drift_reports(tenant_id, created_at);
+
+-- Index for remediation lock lookups
+CREATE INDEX IF NOT EXISTS idx_remediations_active
+  ON remediations(tenant_id, stack_name, status)
+  WHERE status IN ('pending', 'in_progress');
diff --git a/products/02-iac-drift-detection/saas/src/auth/ownership-guard.ts b/products/02-iac-drift-detection/saas/src/auth/ownership-guard.ts
new file mode 100644
index 0000000..6da6cd1
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/auth/ownership-guard.ts
@@ -0,0 +1,49 @@
+import type { FastifyRequest, FastifyReply } from 'fastify';
+import type { Pool } from 'pg';
+import pino from 'pino';
+
+const logger = pino({ name: 'ownership-guard' });
+
+/**
+ * Middleware that validates the authenticated API key's org (tenant) owns
+ * the stack_name referenced in the request payload.
+ *
+ * Runs on ingestion routes. Extracts tenantId from the already-authenticated
+ * request and cross-checks against the stack's tenant in the database.
+ *
+ * For new stacks (first report), we allow the request through — the stack
+ * will be created under this tenant. For existing stacks, we enforce ownership.
+ */
+export function ownershipGuard(pool: Pool) {
+  return async (req: FastifyRequest, reply: FastifyReply) => {
+    const tenantId = (req as any).tenantId;
+    if (!tenantId) {
+      return reply.status(401).send({ error: 'Missing authentication context' });
+    }
+
+    const body = req.body as Record<string, unknown> | undefined;
+    if (!body) return; // No body to validate
+
+    const stackName = body.stack_name as string | undefined;
+    if (!stackName) return; // No stack_name in payload — let route validation handle it
+
+    try {
+      // Check if this stack_name has ever been reported by a DIFFERENT tenant
+      const result = await pool.query(
+        `SELECT tenant_id FROM drift_reports
+         WHERE stack_name = $1 AND tenant_id != $2
+         LIMIT 1`,
+        [stackName, tenantId],
+      );
+
+      if (result.rows.length > 0) {
+        logger.warn({ tenantId, stackName, ownerTenantId: result.rows[0].tenant_id },
+          'Stack ownership mismatch — rejecting');
+        return reply.status(403).send({ error: 'Stack not owned by this organization' });
+      }
+    } catch (err) {
+      logger.error({ error: (err as Error).message }, 'Ownership check failed');
+      return reply.status(500).send({ error: 'Ownership validation failed' });
+    }
+  };
+}
diff --git a/products/02-iac-drift-detection/saas/src/execution/remediation-lock.ts b/products/02-iac-drift-detection/saas/src/execution/remediation-lock.ts
new file mode 100644
index 0000000..7e698a3
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/execution/remediation-lock.ts
@@ -0,0 +1,58 @@
+import type Redis from 'ioredis';
+import pino from 'pino';
+
+const logger = pino({ name: 'remediation-lock' });
+
+const LOCK_PREFIX = 'lock:remediation:';
+const DEFAULT_TTL_SECONDS = 30 * 60; // 30 minutes
+
+/**
+ * Acquire a remediation lock on a stack.
+ * Returns true if lock was acquired, false if already locked.
+ */
+export async function acquireRemediationLock(
+  redis: Redis,
+  stackName: string,
+  ttlSeconds: number = DEFAULT_TTL_SECONDS,
+): Promise<boolean> {
+  const key = `${LOCK_PREFIX}${stackName}`;
+  const result = await redis.set(key, Date.now().toString(), 'EX', ttlSeconds, 'NX');
+  if (result) {
+    logger.info({ stackName, ttlSeconds }, 'Remediation lock acquired');
+    return true;
+  }
+  logger.warn({ stackName }, 'Remediation lock already held');
+  return false;
+}
+
+/**
+ * Release a remediation lock on a stack.
+ */
+export async function releaseRemediationLock(
+  redis: Redis,
+  stackName: string,
+): Promise<void> {
+  const key = `${LOCK_PREFIX}${stackName}`;
+  await redis.del(key);
+  logger.info({ stackName }, 'Remediation lock released');
+}
+
+/**
+ * Check if a stack is currently under remediation.
+ */
+export async function isRemediationLocked(
+  redis: Redis,
+  stackName: string,
+): Promise<{ locked: boolean; lockedSince?: number; ttlRemaining?: number }> {
+  const key = `${LOCK_PREFIX}${stackName}`;
+  const value = await redis.get(key);
+  if (!value) {
+    return { locked: false };
+  }
+  const ttl = await redis.ttl(key);
+  return {
+    locked: true,
+    lockedSince: parseInt(value, 10),
+    ttlRemaining: ttl > 0 ? ttl : undefined,
+  };
+}
diff --git a/products/02-iac-drift-detection/saas/src/index.ts b/products/02-iac-drift-detection/saas/src/index.ts
index b451c31..5b93d94 100644
--- a/products/02-iac-drift-detection/saas/src/index.ts
+++ b/products/02-iac-drift-detection/saas/src/index.ts
@@ -9,8 +9,10 @@ import { registerProcessorRoutes } from './processor/routes.js';
 import { registerApiRoutes } from './api/routes.js';
 import { registerAnalyticsRoutes } from './api/analytics.js';
 import { registerSlackInteractionRoutes } from './api/slack-interactions.js';
-import { authHook, decorateAuth, registerAuthRoutes, registerProtectedAuthRoutes } from './auth/middleware.js';
+import { authHook, decorateAuth, registerAuthRoutes, registerProtectedAuthRoutes, requireRole } from './auth/middleware.js';
 import { runDailyDigest } from './notifications/daily-digest.js';
+import { getQueueDepthStats } from './processor/fair-share.js';
+import { isRemediationLocked, acquireRemediationLock, releaseRemediationLock } from './execution/remediation-lock.js';
 
 const app = Fastify({
   logger: {
@@ -53,6 +55,44 @@ async function start() {
     await registerProcessorRoutes(protectedApp);
     await registerApiRoutes(protectedApp);
     await registerAnalyticsRoutes(protectedApp);
+
+    // --- Admin: fair-share queue depth ---
+    protectedApp.get('/api/v1/admin/queue-depth', async (request, reply) => {
+      if (!requireRole(request, reply, 'admin')) return;
+      const stats = await getQueueDepthStats(redis);
+      return reply.send({ queue_depth: stats });
+    });
+
+    // --- Stack remediation lock status ---
+    protectedApp.get('/api/v1/stacks/:name/lock', async (request, reply) => {
+      const { name } = request.params as { name: string };
+      const status = await isRemediationLocked(redis, name);
+      return reply.send({
+        stack_name: name,
+        locked: status.locked,
+        locked_since: status.lockedSince ? new Date(status.lockedSince).toISOString() : null,
+        ttl_remaining_seconds: status.ttlRemaining ?? null,
+      });
+    });
+
+    // --- Acquire remediation lock (used when starting remediation) ---
+    protectedApp.post('/api/v1/stacks/:name/lock', async (request, reply) => {
+      if (!requireRole(request, reply, 'admin')) return;
+      const { name } = request.params as { name: string };
+      const acquired = await acquireRemediationLock(redis, name);
+      if (!acquired) {
+        return reply.status(409).send({ error: 'Stack is already locked for remediation' });
+      }
+      return reply.status(201).send({ stack_name: name, locked: true });
+    });
+
+    // --- Release remediation lock ---
+    protectedApp.delete('/api/v1/stacks/:name/lock', async (request, reply) => {
+      if (!requireRole(request, reply, 'admin')) return;
+      const { name } = request.params as { name: string };
+      await releaseRemediationLock(redis, name);
+      return reply.send({ stack_name: name, locked: false });
+    });
   });
 
   await app.listen({ port: config.port, host: '0.0.0.0' });
diff --git a/products/02-iac-drift-detection/saas/src/processor/fair-share.ts b/products/02-iac-drift-detection/saas/src/processor/fair-share.ts
new file mode 100644
index 0000000..15c58fe
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/processor/fair-share.ts
@@ -0,0 +1,97 @@
+import type Redis from 'ioredis';
+import type { Pool } from 'pg';
+import pino from 'pino';
+
+const logger = pino({ name: 'fair-share' });
+
+const QUEUE_KEY = 'fairshare:queue';
+const DEPTH_KEY_PREFIX = 'fairshare:depth:';
+const DEFAULT_MAX_PER_CYCLE = 1000;
+
+export interface FairShareConfig {
+  maxResourcesPerTenantPerCycle: number;
+}
+
+const defaultConfig: FairShareConfig = {
+  maxResourcesPerTenantPerCycle: DEFAULT_MAX_PER_CYCLE,
+};
+
+/**
+ * Enqueue a drift report for fair-share processing.
+ * Uses Redis sorted sets keyed per-tenant to track queue depth.
+ */
+export async function enqueueForProcessing(
+  redis: Redis,
+  tenantId: string,
+  reportId: string,
+  resourceCount: number,
+): Promise<void> {
+  const now = Date.now();
+  // Per-tenant sorted set: score = timestamp, member = reportId
+  const tenantQueueKey = `${QUEUE_KEY}:${tenantId}`;
+  await redis.zadd(tenantQueueKey, now.toString(), reportId);
+
+  // Track tenant in the global tenant set
+  await redis.sadd(QUEUE_KEY + ':tenants', tenantId);
+
+  // Track queue depth (resource count) per tenant
+  await redis.incrby(`${DEPTH_KEY_PREFIX}${tenantId}`, resourceCount);
+
+  logger.info({ tenantId, reportId, resourceCount }, 'Enqueued for fair-share processing');
+}
+
+/**
+ * Dequeue reports using weighted round-robin across tenants.
+ * Each tenant gets up to maxResourcesPerTenantPerCycle resources processed per cycle.
+ */
+export async function dequeueRoundRobin(
+  redis: Redis,
+  config: FairShareConfig = defaultConfig,
+): Promise<Array<{ tenantId: string; reportId: string }>> {
+  const tenants = await redis.smembers(QUEUE_KEY + ':tenants');
+  if (tenants.length === 0) return [];
+
+  const results: Array<{ tenantId: string; reportId: string }> = [];
+
+  for (const tenantId of tenants) {
+    const tenantQueueKey = `${QUEUE_KEY}:${tenantId}`;
+
+    // Pop oldest entries up to the per-tenant limit
+    // We use ZPOPMIN to get the oldest (lowest score = earliest timestamp)
+    const entries = await redis.zpopmin(tenantQueueKey, config.maxResourcesPerTenantPerCycle);
+
+    // zpopmin returns [member, score, member, score, ...]
+    for (let i = 0; i < entries.length; i += 2) {
+      const reportId = entries[i];
+      if (reportId) {
+        results.push({ tenantId, reportId });
+      }
+    }
+
+    // Check if tenant queue is now empty
+    const remaining = await redis.zcard(tenantQueueKey);
+    if (remaining === 0) {
+      await redis.srem(QUEUE_KEY + ':tenants', tenantId);
+      await redis.del(`${DEPTH_KEY_PREFIX}${tenantId}`);
+    }
+  }
+
+  return results;
+}
+
+/**
+ * Get per-tenant queue depth stats for the admin endpoint.
+ */
+export async function getQueueDepthStats(redis: Redis): Promise<Record<string, { pending: number; resourceCount: number }>> {
+  const tenants = await redis.smembers(QUEUE_KEY + ':tenants');
+  const stats: Record<string, { pending: number; resourceCount: number }> = {};
+
+  for (const tenantId of tenants) {
+    const tenantQueueKey = `${QUEUE_KEY}:${tenantId}`;
+    const pending = await redis.zcard(tenantQueueKey);
+    const resourceCount = parseInt(await redis.get(`${DEPTH_KEY_PREFIX}${tenantId}`) ?? '0', 10);
+    stats[tenantId] = { pending, resourceCount };
+  }
+
+  return stats;
+}
diff --git a/products/02-iac-drift-detection/saas/src/processor/replay-guard.ts b/products/02-iac-drift-detection/saas/src/processor/replay-guard.ts
new file mode 100644
index 0000000..bcf8834
--- /dev/null
+++ b/products/02-iac-drift-detection/saas/src/processor/replay-guard.ts
@@ -0,0 +1,76 @@
+import type { FastifyRequest, FastifyReply } from 'fastify';
+import type Redis from 'ioredis';
+import type { Pool } from 'pg';
+import pino from 'pino';
+
+const logger = pino({ name: 'replay-guard' });
+
+const MAX_AGE_MS = 5 * 60 * 1000; // 5 minutes
+const NONCE_TTL_SECONDS = 600; // 10 minutes
+
+/**
+ * Enhanced replay attack prevention middleware.
+ * Checks:
+ * 1. Timestamp freshness (reject if older than 5 minutes)
+ * 2. Nonce uniqueness via Redis SET with 10min TTL
+ * 3. report_id deduplication against the database
+ *
+ * This replaces the inline nonce check in the ingestion route with a
+ * more comprehensive guard.
+ */
+export function replayGuard(redis: Redis, pool: Pool) {
+  return async (req: FastifyRequest, reply: FastifyReply) => {
+    const body = req.body as Record<string, unknown> | undefined;
+    if (!body) return;
+
+    const nonce = body.nonce as string | undefined;
+    const scannedAt = body.scanned_at as string | undefined;
+    const reportId = body.report_id as string | undefined;
+
+    // --- 1. Timestamp freshness ---
+    if (scannedAt) {
+      const ts = new Date(scannedAt).getTime();
+      if (isNaN(ts)) {
+        return reply.status(400).send({ error: 'Invalid scanned_at timestamp' });
+      }
+      const age = Date.now() - ts;
+      if (age > MAX_AGE_MS) {
+        logger.warn({ nonce, scannedAt, ageMs: age }, 'Replay rejected: timestamp too old');
+        return reply.status(409).send({ error: 'Report timestamp too old (max 5 minutes)' });
+      }
+      // Also reject future timestamps (clock skew tolerance: 30s)
+      if (age < -30_000) {
+        logger.warn({ nonce, scannedAt, ageMs: age }, 'Replay rejected: timestamp in the future');
+        return reply.status(409).send({ error: 'Report timestamp is in the future' });
+      }
+    }
+
+    // --- 2. Nonce uniqueness (Redis) ---
+    if (nonce) {
+      const nonceKey = `nonce:${nonce}`;
+      // SETNX returns 1 if key was set (new nonce), 0 if already exists
+      const wasSet = await redis.set(nonceKey, '1', 'EX', NONCE_TTL_SECONDS, 'NX');
+      if (!wasSet) {
+        logger.warn({ nonce }, 'Replay rejected: nonce already seen');
+        return reply.status(409).send({ error: 'Nonce already used (replay rejected)' });
+      }
+    }
+
+    // --- 3. report_id deduplication (database) ---
+    if (reportId) {
+      try {
+        const result = await pool.query(
+          'SELECT 1 FROM drift_reports WHERE id = $1 LIMIT 1',
+          [reportId],
+        );
+        if (result.rows.length > 0) {
+          logger.warn({ reportId }, 'Replay rejected: report_id already exists');
+          return reply.status(409).send({ error: 'Report already processed (duplicate report_id)' });
+        }
+      } catch (err) {
+        logger.error({ error: (err as Error).message }, 'report_id dedup check failed');
+        // Non-fatal — continue processing
+      }
+    }
+  };
+}
diff --git a/products/02-iac-drift-detection/saas/src/processor/routes.ts b/products/02-iac-drift-detection/saas/src/processor/routes.ts
index 6f526b4..5d37774 100644
--- a/products/02-iac-drift-detection/saas/src/processor/routes.ts
+++ b/products/02-iac-drift-detection/saas/src/processor/routes.ts
@@ -3,6 +3,10 @@ import { z } from 'zod';
 import { withTenant } from '../data/db.js';
 import { normalizeReport } from './normalizer.js';
 import { storeChunk } from './chunk-assembler.js';
+import { replayGuard } from './replay-guard.js';
+import { ownershipGuard } from '../auth/ownership-guard.js';
+import { isRemediationLocked } from '../execution/remediation-lock.js';
+import { enqueueForProcessing } from './fair-share.js';
 
 const driftReportSchema = z.object({
   stack_name: z.string(),
@@ -36,8 +40,14 @@ const chunkSchema = z.object({
   data: z.record(z.unknown()),
 });
 
-async function processDriftReport(app: FastifyInstance, tenantId: string, report: z.infer<typeof driftReportSchema>) {
+async function processDriftReport(
+  app: FastifyInstance,
+  tenantId: string,
+  report: z.infer<typeof driftReportSchema>,
+  duringRemediation: boolean = false,
+) {
   const pool = (app as any).pool;
+  const redis = (app as any).redis;
 
   // Normalize drifted resources into canonical schema
   const canonicalResources = report.drifted_resources?.length
@@ -46,12 +56,26 @@ async function processDriftReport(app: FastifyInstance, tenantId: string, report
 
   await withTenant(pool, tenantId, async (client) => {
     await client.query(
-      `INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report, canonical_resources)
-       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
-      [tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report), JSON.stringify(canonicalResources)]
+      `INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report, canonical_resources, during_remediation)
+       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`,
+      [tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report), JSON.stringify(canonicalResources), duringRemediation]
     );
   });
 
+  // Enqueue for fair-share processing
+  await enqueueForProcessing(
+    redis,
+    tenantId,
+    report.nonce, // use nonce as unique report identifier
+    report.drifted_resources?.length ?? 0,
+  );
0, + ); + + // Skip notifications and scoring if during active remediation + if (duringRemediation) { + app.log.info({ tenantId, stackName: report.stack_name }, 'Report during remediation — skipping alerts/scoring'); + return; + } + // Trigger notification if drift score exceeds threshold if (report.drift_score > 0) { try { @@ -84,7 +108,16 @@ async function processDriftReport(app: FastifyInstance, tenantId: string, report } export async function registerProcessorRoutes(app: FastifyInstance) { - app.post('/v1/ingest/drift', async (request, reply) => { + const pool = (app as any).pool; + const redis = (app as any).redis; + + // Wire ownership guard + replay guard on ingestion routes + const ownership = ownershipGuard(pool); + const replay = replayGuard(redis, pool); + + app.post('/v1/ingest/drift', { + preHandler: [ownership, replay], + }, async (request, reply) => { const parsed = driftReportSchema.safeParse(request.body); if (!parsed.success) { return reply.status(400).send({ error: 'Invalid drift report', details: parsed.error.flatten() }); @@ -93,29 +126,28 @@ export async function registerProcessorRoutes(app: FastifyInstance) { const report = parsed.data; const tenantId = (request as any).tenantId; - // Nonce replay prevention - const redis = (app as any).redis; - const nonceKey = `nonce:${report.nonce}`; - const exists = await redis.exists(nonceKey); - if (exists) { - return reply.status(409).send({ error: 'Nonce already used (replay rejected)' }); - } - await redis.setex(nonceKey, 86400, '1'); // 24h TTL + // Check remediation lock + const lockStatus = await isRemediationLocked(redis, report.stack_name); + const duringRemediation = lockStatus.locked; - await processDriftReport(app, tenantId, report); + await processDriftReport(app, tenantId, report, duringRemediation); - return reply.status(201).send({ status: 'accepted' }); + return reply.status(201).send({ + status: 'accepted', + during_remediation: duringRemediation, + }); }); // Chunked report ingestion - 
app.post('/v1/ingest/drift/chunk', async (request, reply) => { + app.post('/v1/ingest/drift/chunk', { + preHandler: [ownership], + }, async (request, reply) => { const parsed = chunkSchema.safeParse(request.body); if (!parsed.success) { return reply.status(400).send({ error: 'Invalid chunk', details: parsed.error.flatten() }); } const tenantId = (request as any).tenantId; - const redis = (app as any).redis; const chunk = parsed.data; const result = await storeChunk(redis, chunk); @@ -137,20 +169,24 @@ export async function registerProcessorRoutes(app: FastifyInstance) { const report = reportParsed.data; - // Nonce replay prevention on assembled report + // Enhanced replay check on assembled report const nonceKey = `nonce:${report.nonce}`; - const exists = await redis.exists(nonceKey); - if (exists) { + const wasSet = await redis.set(nonceKey, '1', 'EX', 600, 'NX'); + if (!wasSet) { return reply.status(409).send({ error: 'Nonce already used (replay rejected)' }); } - await redis.setex(nonceKey, 86400, '1'); - await processDriftReport(app, tenantId, report); + // Check remediation lock + const lockStatus = await isRemediationLocked(redis, report.stack_name); + const duringRemediation = lockStatus.locked; + + await processDriftReport(app, tenantId, report, duringRemediation); return reply.status(201).send({ status: 'assembled_and_accepted', received: result.received, total: result.total, + during_remediation: duringRemediation, }); }); }