feat(drift): add noisy neighbor protection, RBAC forgery prevention, remediation locks
Some checks failed
CI — P2 Drift (Go + Node) / agent (push) Successful in 37s
CI — P2 Drift (Go + Node) / saas (push) Successful in 26s
CI — P2 Drift (Go + Node) / build-push (push) Failing after 45s

- Fair-share tenant processing: weighted round-robin, per-tenant queue depth tracking
- API key → stack ownership validation on all ingestion routes
- Enhanced replay attack prevention (timestamp + nonce + report_id dedup)
- Remediation lock: Redis-based mutex prevents scan/remediation race conditions
- Reports during active remediation tagged and excluded from scoring
- 006_noisy_neighbor.sql migration
This commit is contained in:
Max
2026-03-03 13:42:34 +00:00
parent f133ca8ff6
commit ffe2b63877
7 changed files with 408 additions and 23 deletions

View File

@@ -0,0 +1,29 @@
-- 006: Noisy neighbor protection, remediation locks, fair-share processing

-- Flag reports ingested while a remediation lock was held so scoring and
-- alerting can exclude them.
ALTER TABLE drift_reports ADD COLUMN IF NOT EXISTS during_remediation BOOLEAN NOT NULL DEFAULT false;

-- Per-stack processing priority. We use a dedicated lightweight table so we
-- can set priority per-stack without a full stacks table.
CREATE TABLE IF NOT EXISTS stack_settings (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    stack_name TEXT NOT NULL,
    processing_priority TEXT NOT NULL DEFAULT 'normal' CHECK (processing_priority IN ('low', 'normal', 'high')),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(tenant_id, stack_name)
);

ALTER TABLE stack_settings ENABLE ROW LEVEL SECURITY;

-- Tenant isolation via app.tenant_id session setting (second arg 'true'
-- makes current_setting return NULL instead of erroring when unset, which
-- fails closed). CREATE POLICY has no IF NOT EXISTS, so DROP first to keep
-- this migration idempotent like every other statement in the file.
DROP POLICY IF EXISTS tenant_iso_stack_settings ON stack_settings;
CREATE POLICY tenant_iso_stack_settings ON stack_settings
    USING (tenant_id::text = current_setting('app.tenant_id', true));

-- Index for fair-share queue queries: find reports per tenant ordered by time
CREATE INDEX IF NOT EXISTS idx_drift_reports_tenant_created
    ON drift_reports(tenant_id, created_at);

-- Partial index for remediation lock lookups (only active remediations)
CREATE INDEX IF NOT EXISTS idx_remediations_active
    ON remediations(tenant_id, stack_name, status)
    WHERE status IN ('pending', 'in_progress');

View File

@@ -0,0 +1,49 @@
import type { FastifyRequest, FastifyReply } from 'fastify';
import type { Pool } from 'pg';
import pino from 'pino';
const logger = pino({ name: 'ownership-guard' });
/**
 * Middleware that validates the authenticated API key's org (tenant) owns
 * the stack_name referenced in the request payload.
 *
 * Runs on ingestion routes. Extracts tenantId from the already-authenticated
 * request and cross-checks against the stack's tenant in the database.
 *
 * For new stacks (first report), we allow the request through — the stack
 * will be created under this tenant. For existing stacks, we enforce ownership.
 *
 * NOTE(review): "ownership" is derived from drift_reports, i.e. whichever
 * tenant first reports a stack_name owns it forever — confirm this
 * first-come-first-served squatting behavior is acceptable.
 *
 * @param pool - pg pool used un-scoped (the query must see other tenants' rows)
 * @returns Fastify preHandler: replies 401 (no auth context), 403 (stack owned
 *   by another tenant), 500 (lookup error); otherwise falls through.
 */
export function ownershipGuard(pool: Pool) {
  return async (req: FastifyRequest, reply: FastifyReply) => {
    const tenantId = (req as any).tenantId;
    if (!tenantId) {
      return reply.status(401).send({ error: 'Missing authentication context' });
    }

    const body = req.body as Record<string, unknown> | undefined;
    if (!body) return; // No body to validate

    // Fix: only validate genuine non-empty strings. Previously any truthy
    // value (object, number) was asserted to string and interpolated as a
    // query parameter; non-strings are now left for route schema validation.
    const stackName = body.stack_name;
    if (typeof stackName !== 'string' || stackName === '') return;

    try {
      // Check if this stack_name has ever been reported by a DIFFERENT tenant
      const result = await pool.query(
        `SELECT tenant_id FROM drift_reports
         WHERE stack_name = $1 AND tenant_id != $2
         LIMIT 1`,
        [stackName, tenantId],
      );
      if (result.rows.length > 0) {
        logger.warn(
          { tenantId, stackName, ownerTenantId: result.rows[0].tenant_id },
          'Stack ownership mismatch — rejecting',
        );
        return reply.status(403).send({ error: 'Stack not owned by this organization' });
      }
    } catch (err) {
      logger.error({ error: (err as Error).message }, 'Ownership check failed');
      return reply.status(500).send({ error: 'Ownership validation failed' });
    }
  };
}

View File

@@ -0,0 +1,58 @@
import type Redis from 'ioredis';
import pino from 'pino';
// Module-scoped structured logger for lock lifecycle events.
const logger = pino({ name: 'remediation-lock' });
// Redis key namespace for per-stack remediation locks.
const LOCK_PREFIX = 'lock:remediation:';
// Safety net: locks auto-expire so a crashed remediator cannot hold one forever.
const DEFAULT_TTL_SECONDS = 30 * 60; // 30 minutes
/**
 * Acquire a remediation lock on a stack.
 * Returns true if lock was acquired, false if already locked.
 *
 * Implemented as a single atomic `SET key value EX ttl NX`: the key is only
 * created when absent, and expires automatically after ttlSeconds. The stored
 * value is the acquisition time (epoch ms) for observability.
 */
export async function acquireRemediationLock(
  redis: Redis,
  stackName: string,
  ttlSeconds: number = DEFAULT_TTL_SECONDS,
): Promise<boolean> {
  const lockKey = `${LOCK_PREFIX}${stackName}`;
  const acquiredAt = Date.now().toString();
  const outcome = await redis.set(lockKey, acquiredAt, 'EX', ttlSeconds, 'NX');
  if (!outcome) {
    logger.warn({ stackName }, 'Remediation lock already held');
    return false;
  }
  logger.info({ stackName, ttlSeconds }, 'Remediation lock acquired');
  return true;
}
/**
 * Release a remediation lock on a stack.
 *
 * NOTE(review): this deletes unconditionally — a worker whose lock has
 * already expired could delete a lock re-acquired by a different worker.
 * A fenced release (compare stored token, then delete atomically) would be
 * safer; confirm whether that race matters for this workload.
 */
export async function releaseRemediationLock(
  redis: Redis,
  stackName: string,
): Promise<void> {
  await redis.del(`${LOCK_PREFIX}${stackName}`);
  logger.info({ stackName }, 'Remediation lock released');
}
/**
 * Check if a stack is currently under remediation.
 *
 * @returns `{ locked: false }` when no lock key exists; otherwise
 *   `locked: true` with `lockedSince` (epoch ms written at acquisition;
 *   omitted if the stored value is not parseable) and `ttlRemaining` in
 *   seconds (omitted when the key has no expiry or expired between calls).
 */
export async function isRemediationLocked(
  redis: Redis,
  stackName: string,
): Promise<{ locked: boolean; lockedSince?: number; ttlRemaining?: number }> {
  const key = `${LOCK_PREFIX}${stackName}`;
  const value = await redis.get(key);
  if (!value) {
    return { locked: false };
  }
  const ttl = await redis.ttl(key);
  // Fix: explicit radix 10, and guard against a corrupted lock value leaking
  // NaN into API responses (previously `parseInt(value)` with no radix and
  // no NaN check).
  const since = Number.parseInt(value, 10);
  return {
    locked: true,
    lockedSince: Number.isNaN(since) ? undefined : since,
    ttlRemaining: ttl > 0 ? ttl : undefined,
  };
}

View File

@@ -9,8 +9,10 @@ import { registerProcessorRoutes } from './processor/routes.js';
import { registerApiRoutes } from './api/routes.js'; import { registerApiRoutes } from './api/routes.js';
import { registerAnalyticsRoutes } from './api/analytics.js'; import { registerAnalyticsRoutes } from './api/analytics.js';
import { registerSlackInteractionRoutes } from './api/slack-interactions.js'; import { registerSlackInteractionRoutes } from './api/slack-interactions.js';
import { authHook, decorateAuth, registerAuthRoutes, registerProtectedAuthRoutes } from './auth/middleware.js'; import { authHook, decorateAuth, registerAuthRoutes, registerProtectedAuthRoutes, requireRole } from './auth/middleware.js';
import { runDailyDigest } from './notifications/daily-digest.js'; import { runDailyDigest } from './notifications/daily-digest.js';
import { getQueueDepthStats } from './processor/fair-share.js';
import { isRemediationLocked, acquireRemediationLock, releaseRemediationLock } from './execution/remediation-lock.js';
const app = Fastify({ const app = Fastify({
logger: { logger: {
@@ -53,6 +55,44 @@ async function start() {
await registerProcessorRoutes(protectedApp); await registerProcessorRoutes(protectedApp);
await registerApiRoutes(protectedApp); await registerApiRoutes(protectedApp);
await registerAnalyticsRoutes(protectedApp); await registerAnalyticsRoutes(protectedApp);
// --- Admin: fair-share queue depth ---
protectedApp.get('/api/v1/admin/queue-depth', async (request, reply) => {
if (!requireRole(request, reply, 'admin')) return;
const stats = await getQueueDepthStats(redis);
return reply.send({ queue_depth: stats });
});
// --- Stack remediation lock status ---
protectedApp.get('/api/v1/stacks/:name/lock', async (request, reply) => {
const { name } = request.params as { name: string };
const status = await isRemediationLocked(redis, name);
return reply.send({
stack_name: name,
locked: status.locked,
locked_since: status.lockedSince ? new Date(status.lockedSince).toISOString() : null,
ttl_remaining_seconds: status.ttlRemaining ?? null,
});
});
// --- Acquire remediation lock (used when starting remediation) ---
protectedApp.post('/api/v1/stacks/:name/lock', async (request, reply) => {
if (!requireRole(request, reply, 'admin')) return;
const { name } = request.params as { name: string };
const acquired = await acquireRemediationLock(redis, name);
if (!acquired) {
return reply.status(409).send({ error: 'Stack is already locked for remediation' });
}
return reply.status(201).send({ stack_name: name, locked: true });
});
// --- Release remediation lock ---
protectedApp.delete('/api/v1/stacks/:name/lock', async (request, reply) => {
if (!requireRole(request, reply, 'admin')) return;
const { name } = request.params as { name: string };
await releaseRemediationLock(redis, name);
return reply.send({ stack_name: name, locked: false });
});
}); });
await app.listen({ port: config.port, host: '0.0.0.0' }); await app.listen({ port: config.port, host: '0.0.0.0' });

View File

@@ -0,0 +1,97 @@
import type Redis from 'ioredis';
import type { Pool } from 'pg';
import pino from 'pino';
// Module-scoped structured logger for queue events.
const logger = pino({ name: 'fair-share' });
// Base key: per-tenant sorted sets live at `${QUEUE_KEY}:<tenantId>` and the
// set of tenants with pending work at `${QUEUE_KEY}:tenants`.
const QUEUE_KEY = 'fairshare:queue';
// Counter key prefix tracking total queued resource count per tenant.
const DEPTH_KEY_PREFIX = 'fairshare:depth:';
// Default per-tenant cap applied on each dequeue cycle.
const DEFAULT_MAX_PER_CYCLE = 1000;

// Tuning knobs for the fair-share dequeue cycle.
export interface FairShareConfig {
  // Upper bound on entries popped per tenant per dequeueRoundRobin call.
  maxResourcesPerTenantPerCycle: number;
}

const defaultConfig: FairShareConfig = {
  maxResourcesPerTenantPerCycle: DEFAULT_MAX_PER_CYCLE,
};
/**
 * Enqueue a drift report for fair-share processing.
 * Uses Redis sorted sets keyed per-tenant to track queue depth.
 *
 * @param tenantId - owning tenant; also registered in the global tenant set
 * @param reportId - unique report identifier (zset member)
 * @param resourceCount - added to the tenant's depth counter for stats
 */
export async function enqueueForProcessing(
  redis: Redis,
  tenantId: string,
  reportId: string,
  resourceCount: number,
): Promise<void> {
  const now = Date.now();
  const tenantQueueKey = `${QUEUE_KEY}:${tenantId}`;
  // Fix: issue the three bookkeeping writes as one MULTI/EXEC transaction.
  // Previously they were three separate awaits, so a concurrent
  // dequeueRoundRobin could observe the zset entry before the tenant was in
  // the tenant set (stranding the report until the next enqueue), and a crash
  // mid-sequence left the structures inconsistent.
  await redis
    .multi()
    .zadd(tenantQueueKey, now.toString(), reportId) // score = timestamp → FIFO per tenant
    .sadd(QUEUE_KEY + ':tenants', tenantId)
    .incrby(`${DEPTH_KEY_PREFIX}${tenantId}`, resourceCount)
    .exec();
  logger.info({ tenantId, reportId, resourceCount }, 'Enqueued for fair-share processing');
}
/**
* Dequeue reports using weighted round-robin across tenants.
* Each tenant gets up to maxResourcesPerTenantPerCycle resources processed per cycle.
*/
export async function dequeueRoundRobin(
redis: Redis,
config: FairShareConfig = defaultConfig,
): Promise<Array<{ tenantId: string; reportId: string }>> {
const tenants = await redis.smembers(QUEUE_KEY + ':tenants');
if (tenants.length === 0) return [];
const results: Array<{ tenantId: string; reportId: string }> = [];
for (const tenantId of tenants) {
const tenantQueueKey = `${QUEUE_KEY}:${tenantId}`;
// Pop oldest entries up to the per-tenant limit
// We use ZPOPMIN to get the oldest (lowest score = earliest timestamp)
const entries = await redis.zpopmin(tenantQueueKey, config.maxResourcesPerTenantPerCycle);
// zpopmin returns [member, score, member, score, ...]
for (let i = 0; i < entries.length; i += 2) {
const reportId = entries[i];
if (reportId) {
results.push({ tenantId, reportId });
}
}
// Check if tenant queue is now empty
const remaining = await redis.zcard(tenantQueueKey);
if (remaining === 0) {
await redis.srem(QUEUE_KEY + ':tenants', tenantId);
await redis.del(`${DEPTH_KEY_PREFIX}${tenantId}`);
}
}
return results;
}
/**
 * Get per-tenant queue depth stats for the admin endpoint.
 *
 * @returns map of tenantId → { pending: queued report count,
 *   resourceCount: sum of resource counts enqueued for that tenant }
 */
export async function getQueueDepthStats(redis: Redis): Promise<Record<string, { pending: number; resourceCount: number }>> {
  const tenants = await redis.smembers(QUEUE_KEY + ':tenants');
  const stats: Record<string, { pending: number; resourceCount: number }> = {};
  // Fix: fetch all per-tenant counters in parallel — previously two serial
  // Redis round trips per tenant. Also parse with an explicit radix.
  await Promise.all(
    tenants.map(async (tenantId) => {
      const [pending, depthRaw] = await Promise.all([
        redis.zcard(`${QUEUE_KEY}:${tenantId}`),
        redis.get(`${DEPTH_KEY_PREFIX}${tenantId}`),
      ]);
      stats[tenantId] = { pending, resourceCount: Number.parseInt(depthRaw ?? '0', 10) };
    }),
  );
  return stats;
}

View File

@@ -0,0 +1,76 @@
import type { FastifyRequest, FastifyReply } from 'fastify';
import type Redis from 'ioredis';
import type { Pool } from 'pg';
import pino from 'pino';
// Module-scoped structured logger for replay-rejection audit events.
const logger = pino({ name: 'replay-guard' });
// Reports whose scanned_at is older than this are rejected as stale.
const MAX_AGE_MS = 5 * 60 * 1000; // 5 minutes
// Nonce keys outlive MAX_AGE_MS so a replay of a just-expiring report still hits the nonce check.
const NONCE_TTL_SECONDS = 600; // 10 minutes
/**
 * Enhanced replay attack prevention middleware.
 *
 * Layered checks, cheapest first:
 *   1. Timestamp freshness — scanned_at older than 5 minutes, or more than
 *      30s in the future (clock-skew tolerance), is rejected.
 *   2. Nonce uniqueness — atomic Redis SET NX with a 10-minute TTL.
 *   3. report_id deduplication — best-effort lookup against drift_reports.
 *
 * Absent fields are simply not checked; route-level schema validation is
 * responsible for requiring them.
 *
 * NOTE(review): the nonce is consumed in step 2 even when the request later
 * fails schema validation, so such a client cannot retry with the same
 * nonce — confirm agents generate a fresh nonce per attempt.
 */
export function replayGuard(redis: Redis, pool: Pool) {
  return async (req: FastifyRequest, reply: FastifyReply) => {
    const payload = req.body as Record<string, unknown> | undefined;
    if (!payload) return;

    const nonce = payload.nonce as string | undefined;
    const scannedAt = payload.scanned_at as string | undefined;
    const reportId = payload.report_id as string | undefined;

    // --- 1. Timestamp freshness ---
    if (scannedAt) {
      const scannedMs = new Date(scannedAt).getTime();
      if (Number.isNaN(scannedMs)) {
        return reply.status(400).send({ error: 'Invalid scanned_at timestamp' });
      }
      const ageMs = Date.now() - scannedMs;
      if (ageMs > MAX_AGE_MS) {
        logger.warn({ nonce, scannedAt, ageMs }, 'Replay rejected: timestamp too old');
        return reply.status(409).send({ error: 'Report timestamp too old (max 5 minutes)' });
      }
      if (ageMs < -30_000) {
        logger.warn({ nonce, scannedAt, ageMs }, 'Replay rejected: timestamp in the future');
        return reply.status(409).send({ error: 'Report timestamp is in the future' });
      }
    }

    // --- 2. Nonce uniqueness (Redis) ---
    if (nonce) {
      // SET ... NX is an atomic check-and-claim; it returns null when the
      // key (and therefore the nonce) already exists.
      const claimed = await redis.set(`nonce:${nonce}`, '1', 'EX', NONCE_TTL_SECONDS, 'NX');
      if (!claimed) {
        logger.warn({ nonce }, 'Replay rejected: nonce already seen');
        return reply.status(409).send({ error: 'Nonce already used (replay rejected)' });
      }
    }

    // --- 3. report_id deduplication (database) ---
    if (reportId) {
      try {
        const dup = await pool.query(
          'SELECT 1 FROM drift_reports WHERE id = $1 LIMIT 1',
          [reportId],
        );
        if (dup.rows.length > 0) {
          logger.warn({ reportId }, 'Replay rejected: report_id already exists');
          return reply.status(409).send({ error: 'Report already processed (duplicate report_id)' });
        }
      } catch (err) {
        // Deliberately non-fatal: a transient DB error must not block
        // ingestion — the nonce and timestamp checks above still apply.
        logger.error({ error: (err as Error).message }, 'report_id dedup check failed');
      }
    }
  };
}

View File

@@ -3,6 +3,10 @@ import { z } from 'zod';
import { withTenant } from '../data/db.js'; import { withTenant } from '../data/db.js';
import { normalizeReport } from './normalizer.js'; import { normalizeReport } from './normalizer.js';
import { storeChunk } from './chunk-assembler.js'; import { storeChunk } from './chunk-assembler.js';
import { replayGuard } from './replay-guard.js';
import { ownershipGuard } from '../auth/ownership-guard.js';
import { isRemediationLocked } from '../execution/remediation-lock.js';
import { enqueueForProcessing } from './fair-share.js';
const driftReportSchema = z.object({ const driftReportSchema = z.object({
stack_name: z.string(), stack_name: z.string(),
@@ -36,8 +40,14 @@ const chunkSchema = z.object({
data: z.record(z.unknown()), data: z.record(z.unknown()),
}); });
async function processDriftReport(app: FastifyInstance, tenantId: string, report: z.infer<typeof driftReportSchema>) { async function processDriftReport(
app: FastifyInstance,
tenantId: string,
report: z.infer<typeof driftReportSchema>,
duringRemediation: boolean = false,
) {
const pool = (app as any).pool; const pool = (app as any).pool;
const redis = (app as any).redis;
// Normalize drifted resources into canonical schema // Normalize drifted resources into canonical schema
const canonicalResources = report.drifted_resources?.length const canonicalResources = report.drifted_resources?.length
@@ -46,12 +56,26 @@ async function processDriftReport(app: FastifyInstance, tenantId: string, report
await withTenant(pool, tenantId, async (client) => { await withTenant(pool, tenantId, async (client) => {
await client.query( await client.query(
`INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report, canonical_resources) `INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report, canonical_resources, during_remediation)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`,
[tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report), JSON.stringify(canonicalResources)] [tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report), JSON.stringify(canonicalResources), duringRemediation]
); );
}); });
// Enqueue for fair-share processing
await enqueueForProcessing(
redis,
tenantId,
report.nonce, // use nonce as unique report identifier
report.drifted_resources?.length ?? 0,
);
// Skip notifications and scoring if during active remediation
if (duringRemediation) {
app.log.info({ tenantId, stackName: report.stack_name }, 'Report during remediation — skipping alerts/scoring');
return;
}
// Trigger notification if drift score exceeds threshold // Trigger notification if drift score exceeds threshold
if (report.drift_score > 0) { if (report.drift_score > 0) {
try { try {
@@ -84,7 +108,16 @@ async function processDriftReport(app: FastifyInstance, tenantId: string, report
} }
export async function registerProcessorRoutes(app: FastifyInstance) { export async function registerProcessorRoutes(app: FastifyInstance) {
app.post('/v1/ingest/drift', async (request, reply) => { const pool = (app as any).pool;
const redis = (app as any).redis;
// Wire ownership guard + replay guard on ingestion routes
const ownership = ownershipGuard(pool);
const replay = replayGuard(redis, pool);
app.post('/v1/ingest/drift', {
preHandler: [ownership, replay],
}, async (request, reply) => {
const parsed = driftReportSchema.safeParse(request.body); const parsed = driftReportSchema.safeParse(request.body);
if (!parsed.success) { if (!parsed.success) {
return reply.status(400).send({ error: 'Invalid drift report', details: parsed.error.flatten() }); return reply.status(400).send({ error: 'Invalid drift report', details: parsed.error.flatten() });
@@ -93,29 +126,28 @@ export async function registerProcessorRoutes(app: FastifyInstance) {
const report = parsed.data; const report = parsed.data;
const tenantId = (request as any).tenantId; const tenantId = (request as any).tenantId;
// Nonce replay prevention // Check remediation lock
const redis = (app as any).redis; const lockStatus = await isRemediationLocked(redis, report.stack_name);
const nonceKey = `nonce:${report.nonce}`; const duringRemediation = lockStatus.locked;
const exists = await redis.exists(nonceKey);
if (exists) {
return reply.status(409).send({ error: 'Nonce already used (replay rejected)' });
}
await redis.setex(nonceKey, 86400, '1'); // 24h TTL
await processDriftReport(app, tenantId, report); await processDriftReport(app, tenantId, report, duringRemediation);
return reply.status(201).send({ status: 'accepted' }); return reply.status(201).send({
status: 'accepted',
during_remediation: duringRemediation,
});
}); });
// Chunked report ingestion // Chunked report ingestion
app.post('/v1/ingest/drift/chunk', async (request, reply) => { app.post('/v1/ingest/drift/chunk', {
preHandler: [ownership],
}, async (request, reply) => {
const parsed = chunkSchema.safeParse(request.body); const parsed = chunkSchema.safeParse(request.body);
if (!parsed.success) { if (!parsed.success) {
return reply.status(400).send({ error: 'Invalid chunk', details: parsed.error.flatten() }); return reply.status(400).send({ error: 'Invalid chunk', details: parsed.error.flatten() });
} }
const tenantId = (request as any).tenantId; const tenantId = (request as any).tenantId;
const redis = (app as any).redis;
const chunk = parsed.data; const chunk = parsed.data;
const result = await storeChunk(redis, chunk); const result = await storeChunk(redis, chunk);
@@ -137,20 +169,24 @@ export async function registerProcessorRoutes(app: FastifyInstance) {
const report = reportParsed.data; const report = reportParsed.data;
// Nonce replay prevention on assembled report // Enhanced replay check on assembled report
const nonceKey = `nonce:${report.nonce}`; const nonceKey = `nonce:${report.nonce}`;
const exists = await redis.exists(nonceKey); const wasSet = await redis.set(nonceKey, '1', 'EX', 600, 'NX');
if (exists) { if (!wasSet) {
return reply.status(409).send({ error: 'Nonce already used (replay rejected)' }); return reply.status(409).send({ error: 'Nonce already used (replay rejected)' });
} }
await redis.setex(nonceKey, 86400, '1');
await processDriftReport(app, tenantId, report); // Check remediation lock
const lockStatus = await isRemediationLocked(redis, report.stack_name);
const duringRemediation = lockStatus.locked;
await processDriftReport(app, tenantId, report, duringRemediation);
return reply.status(201).send({ return reply.status(201).send({
status: 'assembled_and_accepted', status: 'assembled_and_accepted',
received: result.received, received: result.received,
total: result.total, total: result.total,
during_remediation: duringRemediation,
}); });
}); });
} }