feat(drift): add normalizer, chunk assembly, daily digest, Slack interactions, analytics
Some checks failed
CI — P2 Drift (Go + Node) / agent (push) Successful in 49s
CI — P2 Drift (Go + Node) / saas (push) Successful in 29s
CI — P2 Drift (Go + Node) / build-push (push) Failing after 48s

- Canonical schema normalizer: cross-provider resource type mapping
- Chunked report reassembly via Redis (10min TTL, out-of-order safe)
- Daily drift digest worker with Slack Block Kit summary
- Slack interactive handler: remediate + accept drift actions
- Analytics API: drift trends and health summary
- 005_drift_features.sql migration (remediations, acceptances, indexes)
Max
2026-03-03 06:56:44 +00:00
parent ef3d00f124
commit f133ca8ff6
9 changed files with 810 additions and 41 deletions

View File

@@ -0,0 +1,39 @@
-- 005: Canonical resources, remediations, drift acceptances, analytics indexes
-- Add canonical_resources JSONB column to drift_reports
ALTER TABLE drift_reports ADD COLUMN IF NOT EXISTS canonical_resources JSONB;
-- Remediations (from Slack interactive actions)
CREATE TABLE IF NOT EXISTS remediations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
stack_name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'in_progress', 'completed', 'failed', 'cancelled')),
requested_by TEXT NOT NULL,
requested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_remediations_tenant_stack ON remediations(tenant_id, stack_name);
ALTER TABLE remediations ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_iso_remediations ON remediations
USING (tenant_id::text = current_setting('app.tenant_id', true));
-- Drift acceptances (from Slack interactive actions)
CREATE TABLE IF NOT EXISTS drift_acceptances (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
report_id UUID NOT NULL REFERENCES drift_reports(id) ON DELETE CASCADE,
accepted_by TEXT NOT NULL,
accepted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_drift_acceptances_tenant ON drift_acceptances(tenant_id, report_id);
ALTER TABLE drift_acceptances ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_iso_drift_acceptances ON drift_acceptances
USING (tenant_id::text = current_setting('app.tenant_id', true));
-- Indexes for time-series analytics queries
CREATE INDEX IF NOT EXISTS idx_drift_reports_tenant_scanned ON drift_reports(tenant_id, scanned_at DESC);
CREATE INDEX IF NOT EXISTS idx_drift_reports_score_time ON drift_reports(tenant_id, stack_name, scanned_at DESC, drift_score);
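Note: the RLS policies above compare `tenant_id` against the `app.tenant_id` GUC, so every tenant-scoped query must set it first. A minimal sketch of the kind of helper the code calls `withTenant` (assuming node-postgres; the repo's actual implementation may differ):

```typescript
import pg from 'pg';

// Sketch of a tenant-scoping helper in the spirit of withTenant.
// set_config(..., true) makes app.tenant_id transaction-local, so the
// RLS policies above see it only for the duration of this transaction.
export async function withTenant<T>(
  pool: pg.Pool,
  tenantId: string,
  fn: (client: pg.PoolClient) => Promise<T>,
): Promise<T> {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    await client.query("SELECT set_config('app.tenant_id', $1, true)", [tenantId]);
    const result = await fn(client);
    await client.query('COMMIT');
    return result;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}
```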

View File

@@ -0,0 +1,105 @@
import { FastifyInstance } from 'fastify';
import { withTenant } from '../data/db.js';
export async function registerAnalyticsRoutes(app: FastifyInstance) {
// Drift score trends per stack over last 30 days
app.get('/api/v1/analytics/trends', async (request, reply) => {
const tenantId = (request as any).tenantId;
const pool = (app as any).pool;
const { stack_name, days } = request.query as { stack_name?: string; days?: string };
const lookbackDays = Math.min(parseInt(days ?? '30', 10) || 30, 90);
const result = await withTenant(pool, tenantId, async (client) => {
if (stack_name) {
const { rows } = await client.query(
`SELECT stack_name, drift_score, total_resources, scanned_at::date as date
FROM drift_reports
WHERE tenant_id = $1 AND stack_name = $2
AND scanned_at >= NOW() - make_interval(days => $3)
ORDER BY scanned_at ASC`,
[tenantId, stack_name, lookbackDays],
);
return rows;
}
// All stacks: daily max drift score per stack
const { rows } = await client.query(
`SELECT stack_name, scanned_at::date as date,
MAX(drift_score) as max_score,
MIN(drift_score) as min_score,
AVG(drift_score)::numeric(5,2) as avg_score,
COUNT(*) as scan_count
FROM drift_reports
WHERE tenant_id = $1
AND scanned_at >= NOW() - make_interval(days => $2)
GROUP BY stack_name, scanned_at::date
ORDER BY date ASC, stack_name`,
[tenantId, lookbackDays],
);
return rows;
});
return reply.send({ days: lookbackDays, trends: result });
});
// Overall drift health summary
app.get('/api/v1/analytics/summary', async (request, reply) => {
const tenantId = (request as any).tenantId;
const pool = (app as any).pool;
const result = await withTenant(pool, tenantId, async (client) => {
// Latest report per stack
const latestRes = await client.query(
`SELECT DISTINCT ON (stack_name) stack_name, drift_score, total_resources, scanned_at
FROM drift_reports
WHERE tenant_id = $1
ORDER BY stack_name, scanned_at DESC`,
[tenantId],
);
const latest = latestRes.rows;
const totalStacks = latest.length;
const cleanStacks = latest.filter((r: any) => parseFloat(r.drift_score) === 0).length;
const cleanPct = totalStacks > 0 ? Math.round((cleanStacks / totalStacks) * 100) : 100;
const avgScore = totalStacks > 0
? parseFloat((latest.reduce((sum: number, r: any) => sum + parseFloat(r.drift_score), 0) / totalStacks).toFixed(2))
: 0;
const worstOffenders = [...latest]
.sort((a: any, b: any) => b.drift_score - a.drift_score)
.slice(0, 5)
.map((r: any) => ({
stack_name: r.stack_name,
drift_score: parseFloat(r.drift_score),
total_resources: r.total_resources,
last_scanned: r.scanned_at,
}));
// 24h trend
const dayAgoRes = await client.query(
`SELECT AVG(drift_score)::numeric(5,2) as avg_score
FROM (
SELECT DISTINCT ON (stack_name) stack_name, drift_score
FROM drift_reports
WHERE tenant_id = $1 AND scanned_at < NOW() - INTERVAL '24 hours'
ORDER BY stack_name, scanned_at DESC
) prev`,
[tenantId],
);
const prevAvg = parseFloat(dayAgoRes.rows[0]?.avg_score ?? '0');
const trend = avgScore - prevAvg;
return {
total_stacks: totalStacks,
clean_stacks: cleanStacks,
clean_pct: cleanPct,
avg_drift_score: avgScore,
score_trend_24h: parseFloat(trend.toFixed(2)),
worst_offenders: worstOffenders,
};
});
return reply.send(result);
});
}
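For reference, a hypothetical client call against these routes (base URL and token are placeholders; the response shapes follow the handlers above):

```typescript
const base = 'https://drift.example.com'; // placeholder host
const headers = { Authorization: `Bearer ${process.env.DRIFT_JWT}` }; // placeholder auth

// Per-stack trend; the route caps the lookback at 90 days.
const trends = await fetch(
  `${base}/api/v1/analytics/trends?stack_name=prod-vpc&days=14`,
  { headers },
).then((r) => r.json());
// => { days: 14, trends: [{ stack_name, drift_score, total_resources, date }, ...] }

// Fleet-wide health summary.
const summary = await fetch(`${base}/api/v1/analytics/summary`, { headers })
  .then((r) => r.json());
// => { total_stacks, clean_stacks, clean_pct, avg_drift_score,
//      score_trend_24h, worst_offenders: [...] }
```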

View File

@@ -0,0 +1,118 @@
import { FastifyInstance } from 'fastify';
import crypto from 'node:crypto';
import pino from 'pino';
const logger = pino({ name: 'slack-interactions' });
function verifySlackSignature(signingSecret: string, timestamp: string, body: string, signature: string): boolean {
const fiveMinAgo = Math.floor(Date.now() / 1000) - 300;
if (parseInt(timestamp, 10) < fiveMinAgo) return false;
const baseString = `v0:${timestamp}:${body}`;
const hmac = crypto.createHmac('sha256', signingSecret).update(baseString).digest('hex');
const computed = Buffer.from(`v0=${hmac}`);
const provided = Buffer.from(signature);
// timingSafeEqual throws if lengths differ, so reject mismatched lengths first
if (computed.length !== provided.length) return false;
return crypto.timingSafeEqual(computed, provided);
}
export async function registerSlackInteractionRoutes(app: FastifyInstance) {
// Must add raw body support for signature verification
app.addContentTypeParser('application/x-www-form-urlencoded', { parseAs: 'string' }, (req, body, done) => {
done(null, body);
});
app.post('/api/v1/slack/interactions', async (request, reply) => {
const config = (app as any).config;
const signingSecret = config.slackSigningSecret;
if (!signingSecret) {
logger.warn('SLACK_SIGNING_SECRET not configured');
return reply.status(500).send({ error: 'Slack signing secret not configured' });
}
const timestamp = request.headers['x-slack-request-timestamp'] as string;
const signature = request.headers['x-slack-signature'] as string;
const rawBody = request.body as string;
if (!timestamp || !signature || !rawBody) {
return reply.status(400).send({ error: 'Missing Slack headers' });
}
if (!verifySlackSignature(signingSecret, timestamp, rawBody, signature)) {
return reply.status(401).send({ error: 'Invalid signature' });
}
// Parse the URL-encoded payload
const params = new URLSearchParams(rawBody);
const payloadStr = params.get('payload');
if (!payloadStr) {
return reply.status(400).send({ error: 'Missing payload' });
}
const payload = JSON.parse(payloadStr);
const actions = payload.actions ?? [];
const userId = payload.user?.id ?? 'unknown';
const userName = payload.user?.name ?? 'unknown';
const pool = (app as any).pool;
for (const action of actions) {
const actionId: string = action.action_id ?? '';
if (actionId.startsWith('remediate_')) {
const stackName = actionId.replace('remediate_', '');
logger.info({ stackName, userId, user: userName }, 'Remediation requested via Slack');
// Find tenant from the stack — look up via notification_configs channel metadata
// For now, use the first tenant that has this stack
const tenantRes = await pool.query(
`SELECT DISTINCT tenant_id FROM drift_reports WHERE stack_name = $1 LIMIT 1`,
[stackName],
);
if (tenantRes.rows[0]) {
const tenantId = tenantRes.rows[0].tenant_id;
await pool.query(
`INSERT INTO remediations (tenant_id, stack_name, status, requested_by)
VALUES ($1, $2, 'pending', $3)`,
[tenantId, stackName, userName],
);
}
return reply.send({
response_type: 'ephemeral',
text: `✅ Remediation plan queued for *${stackName}*. Requested by <@${userId}>.`,
});
}
if (actionId.startsWith('accept_')) {
const stackName = actionId.replace('accept_', '');
logger.info({ stackName, userId, user: userName }, 'Drift acceptance via Slack');
// Find latest report for this stack
const reportRes = await pool.query(
`SELECT id, tenant_id FROM drift_reports
WHERE stack_name = $1
ORDER BY scanned_at DESC LIMIT 1`,
[stackName],
);
if (reportRes.rows[0]) {
const { id: reportId, tenant_id: tenantId } = reportRes.rows[0];
await pool.query(
`INSERT INTO drift_acceptances (tenant_id, report_id, accepted_by, reason)
VALUES ($1, $2, $3, $4)`,
[tenantId, reportId, userName, 'Accepted via Slack'],
);
}
return reply.send({
response_type: 'ephemeral',
text: `✅ Drift accepted for *${stackName}*. Marked by <@${userId}>.`,
});
}
}
// Unknown action — acknowledge
return reply.send({ response_type: 'ephemeral', text: 'Action received.' });
});
}
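Since verification uses Slack's standard v0 signing scheme, a test can forge a valid request against a known secret. A test-only sketch:

```typescript
import crypto from 'node:crypto';

// Build a body + headers pair that verifySlackSignature will accept.
function signSlackRequest(signingSecret: string, body: string) {
  const timestamp = Math.floor(Date.now() / 1000).toString();
  const hmac = crypto.createHmac('sha256', signingSecret)
    .update(`v0:${timestamp}:${body}`)
    .digest('hex');
  return { timestamp, signature: `v0=${hmac}` };
}

// Example payload mimicking a Slack block action (IDs are made up):
const body = 'payload=' + encodeURIComponent(JSON.stringify({
  user: { id: 'U123', name: 'max' },
  actions: [{ action_id: 'remediate_prod-vpc' }],
}));
const { timestamp, signature } = signSlackRequest('test-secret', body);
// POST with x-slack-request-timestamp and x-slack-signature headers set.
```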

View File

@@ -11,6 +11,8 @@ const envSchema = z.object({
SQS_QUEUE_URL: z.string().optional(),
S3_BUCKET: z.string().default('dd0c-drift-snapshots'),
SLACK_WEBHOOK_URL: z.string().optional(),
SLACK_SIGNING_SECRET: z.string().optional(),
DAILY_DIGEST_CRON: z.string().default('0 9 * * *'),
});
const parsed = envSchema.safeParse(process.env);
@@ -30,6 +32,8 @@ export const config = {
sqsQueueUrl: parsed.data.SQS_QUEUE_URL,
s3Bucket: parsed.data.S3_BUCKET,
slackWebhookUrl: parsed.data.SLACK_WEBHOOK_URL,
slackSigningSecret: parsed.data.SLACK_SIGNING_SECRET,
dailyDigestCron: parsed.data.DAILY_DIGEST_CRON,
};
export type Config = typeof config;

View File

@@ -7,7 +7,10 @@ import { createPool } from './data/db.js';
import { createRedis } from './data/redis.js';
import { registerProcessorRoutes } from './processor/routes.js';
import { registerApiRoutes } from './api/routes.js';
import { registerAnalyticsRoutes } from './api/analytics.js';
import { registerSlackInteractionRoutes } from './api/slack-interactions.js';
import { authHook, decorateAuth, registerAuthRoutes, registerProtectedAuthRoutes } from './auth/middleware.js';
import { runDailyDigest } from './notifications/daily-digest.js';
const app = Fastify({
logger: {
@@ -38,16 +41,44 @@ async function start() {
// Auth routes (public - login/signup)
registerAuthRoutes(app, config.jwtSecret, pool);
// Slack interactions (public — verified via Slack signing secret, not JWT)
await app.register(async function slackRoutes(slackApp) {
await registerSlackInteractionRoutes(slackApp);
});
// Protected routes (auth required)
app.register(async function protectedRoutes(protectedApp) {
protectedApp.addHook('onRequest', authHook(config.jwtSecret, pool));
registerProtectedAuthRoutes(protectedApp, config.jwtSecret, pool);
await registerProcessorRoutes(protectedApp);
await registerApiRoutes(protectedApp);
await registerAnalyticsRoutes(protectedApp);
});
await app.listen({ port: config.port, host: '0.0.0.0' });
app.log.info(`dd0c/drift SaaS listening on :${config.port}`);
// Schedule daily digest
scheduleDailyDigest(pool);
}
function scheduleDailyDigest(pool: ReturnType<typeof createPool>) {
// Simple interval-based scheduler: run every hour, check if it's the right time
// In production, use a proper cron library or external scheduler
const DIGEST_HOUR = 9; // 9 AM UTC by default
const check = () => {
const now = new Date();
if (now.getUTCHours() === DIGEST_HOUR && now.getUTCMinutes() < 5) {
runDailyDigest(pool).catch((err) => {
app.log.error({ error: (err as Error).message }, 'Daily digest failed');
});
}
};
// Check every 5 minutes
setInterval(check, 5 * 60 * 1000);
app.log.info({ digestHour: DIGEST_HOUR }, 'Daily digest scheduler started');
}
start().catch((err) => {
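As the inline comment notes, the interval loop is a stopgap and the new DAILY_DIGEST_CRON setting is not yet consumed. A sketch of honoring it with the node-cron package (an extra dependency, not part of this commit):

```typescript
import cron from 'node-cron';

// Hypothetical replacement for scheduleDailyDigest, driven by the cron
// expression from config (defaults to '0 9 * * *').
cron.schedule(config.dailyDigestCron, () => {
  runDailyDigest(pool).catch((err) => {
    app.log.error({ error: (err as Error).message }, 'Daily digest failed');
  });
}, { timezone: 'UTC' });
```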

View File

@@ -0,0 +1,172 @@
import type pg from 'pg';
import pino from 'pino';
import { sendSlackNotification } from './service.js';
const logger = pino({ name: 'daily-digest' });
interface DigestData {
tenantId: string;
totalStacks: number;
totalDrifted: number;
worstDriftScores: Array<{ stack_name: string; drift_score: number }>;
newDrifts: number;
resolvedDrifts: number;
}
function buildDigestBlocks(digest: DigestData) {
const cleanPct = digest.totalStacks > 0
? Math.round(((digest.totalStacks - digest.totalDrifted) / digest.totalStacks) * 100)
: 100;
const worstList = digest.worstDriftScores
.slice(0, 5)
.map((w) => `• *${w.stack_name}*: ${w.drift_score}/100`)
.join('\n') || '_None_';
return {
blocks: [
{
type: 'header',
text: { type: 'plain_text', text: '📊 Daily Drift Digest' },
},
{
type: 'section',
fields: [
{ type: 'mrkdwn', text: `*Stacks Scanned:* ${digest.totalStacks}` },
{ type: 'mrkdwn', text: `*Currently Drifted:* ${digest.totalDrifted}` },
{ type: 'mrkdwn', text: `*Clean:* ${cleanPct}%` },
{ type: 'mrkdwn', text: `*New Drifts:* ${digest.newDrifts}` },
],
},
{
type: 'section',
text: { type: 'mrkdwn', text: `*Resolved (last 24h):* ${digest.resolvedDrifts}` },
},
{
type: 'section',
text: { type: 'mrkdwn', text: `*Worst Offenders:*\n${worstList}` },
},
{
type: 'context',
elements: [
{ type: 'mrkdwn', text: `Report generated at ${new Date().toISOString()}` },
],
},
],
};
}
async function gatherDigest(pool: pg.Pool, tenantId: string): Promise<DigestData> {
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query("SELECT set_config('app.tenant_id', $1, true)", [tenantId]);
// Total distinct stacks
const stacksRes = await client.query(
'SELECT COUNT(DISTINCT stack_name) as cnt FROM drift_reports WHERE tenant_id = $1',
[tenantId],
);
const totalStacks = parseInt(stacksRes.rows[0]?.cnt ?? '0');
// Currently drifted (latest report per stack with score > 0)
const driftedRes = await client.query(
`SELECT COUNT(*) as cnt FROM (
SELECT DISTINCT ON (stack_name) stack_name, drift_score
FROM drift_reports WHERE tenant_id = $1
ORDER BY stack_name, scanned_at DESC
) latest WHERE drift_score > 0`,
[tenantId],
);
const totalDrifted = parseInt(driftedRes.rows[0]?.cnt ?? '0');
// Worst drift scores (latest per stack, top 5)
const worstRes = await client.query(
`SELECT DISTINCT ON (stack_name) stack_name, drift_score
FROM drift_reports WHERE tenant_id = $1
ORDER BY stack_name, scanned_at DESC`,
[tenantId],
);
const worstDriftScores = worstRes.rows
.filter((r: any) => r.drift_score > 0)
.sort((a: any, b: any) => b.drift_score - a.drift_score)
.slice(0, 5);
// Drifted reports observed in the last 24h (approximates newly detected drift)
const newRes = await client.query(
`SELECT COUNT(*) as cnt FROM drift_reports
WHERE tenant_id = $1 AND drift_score > 0
AND scanned_at >= NOW() - INTERVAL '24 hours'`,
[tenantId],
);
const newDrifts = parseInt(newRes.rows[0]?.cnt ?? '0');
// Resolved: stacks whose latest report is clean but that had drift within the last 48 hours
const resolvedRes = await client.query(
`SELECT COUNT(*) as cnt FROM (
SELECT DISTINCT ON (stack_name) stack_name, drift_score
FROM drift_reports WHERE tenant_id = $1
ORDER BY stack_name, scanned_at DESC
) latest
WHERE drift_score = 0
AND stack_name IN (
SELECT DISTINCT stack_name FROM drift_reports
WHERE tenant_id = $1 AND drift_score > 0
AND scanned_at >= NOW() - INTERVAL '48 hours'
)`,
[tenantId],
);
const resolvedDrifts = parseInt(resolvedRes.rows[0]?.cnt ?? '0');
await client.query('COMMIT');
return { tenantId, totalStacks, totalDrifted, worstDriftScores, newDrifts, resolvedDrifts };
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
export async function runDailyDigest(pool: pg.Pool): Promise<void> {
logger.info('Starting daily digest run');
// Get all tenants with Slack notification configured
const tenantsRes = await pool.query(
`SELECT t.id as tenant_id, nc.config->>'webhook_url' as webhook_url
FROM tenants t
JOIN notification_configs nc ON nc.tenant_id = t.id
WHERE nc.channel = 'slack' AND nc.enabled = true`,
);
for (const row of tenantsRes.rows) {
try {
const digest = await gatherDigest(pool, row.tenant_id);
if (digest.totalStacks === 0) {
logger.info({ tenantId: row.tenant_id }, 'No stacks — skipping digest');
continue;
}
const payload = buildDigestBlocks(digest);
const resp = await fetch(row.webhook_url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
if (!resp.ok) {
const text = await resp.text();
logger.error({ tenantId: row.tenant_id, status: resp.status, text }, 'Digest Slack send failed');
} else {
logger.info({ tenantId: row.tenant_id }, 'Daily digest sent');
}
} catch (err) {
logger.error({ tenantId: row.tenant_id, error: (err as Error).message }, 'Digest failed for tenant');
}
}
logger.info('Daily digest run complete');
}
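The tenant query above assumes notification_configs rows with a channel, an enabled flag, and a JSONB config containing webhook_url. A hedged example of seeding one (only the columns the JOIN references are shown; the real table may have more):

```typescript
// Assumed shape only — grounded in the JOIN above, not the full schema.
await pool.query(
  `INSERT INTO notification_configs (tenant_id, channel, enabled, config)
   VALUES ($1, 'slack', true, $2)`,
  [tenantId, JSON.stringify({ webhook_url: 'https://hooks.slack.com/services/T000/B000/XXXX' })],
);
```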

View File

@@ -0,0 +1,96 @@
import type Redis from 'ioredis';
import pino from 'pino';
const logger = pino({ name: 'chunk-assembler' });
const CHUNK_TTL = 600; // 10 minutes
function chunkKey(reportId: string, index: number): string {
return `chunk:${reportId}:${index}`;
}
function metaKey(reportId: string): string {
return `chunk:${reportId}:meta`;
}
export interface ChunkInput {
report_id: string;
chunk_index: number;
total_chunks: number;
data: Record<string, unknown>;
}
export interface AssemblyResult {
complete: boolean;
received: number;
total: number;
assembled?: Record<string, unknown>;
}
export async function storeChunk(redis: Redis, chunk: ChunkInput): Promise<AssemblyResult> {
const { report_id, chunk_index, total_chunks, data } = chunk;
// Store chunk data with TTL
const key = chunkKey(report_id, chunk_index);
await redis.setex(key, CHUNK_TTL, JSON.stringify(data));
// Track metadata: total_chunks and received set
const meta = metaKey(report_id);
await redis.hset(meta, 'total', total_chunks.toString());
await redis.hset(meta, `idx:${chunk_index}`, '1');
await redis.expire(meta, CHUNK_TTL);
// Count received chunks
const fields = await redis.hgetall(meta);
const received = Object.keys(fields).filter((k) => k.startsWith('idx:')).length;
logger.info({ report_id, chunk_index, received, total_chunks }, 'Chunk stored');
if (received < total_chunks) {
return { complete: false, received, total: total_chunks };
}
// All chunks received — reassemble
const assembled = await reassemble(redis, report_id, total_chunks);
await cleanup(redis, report_id, total_chunks);
return { complete: true, received, total: total_chunks, assembled };
}
async function reassemble(redis: Redis, reportId: string, totalChunks: number): Promise<Record<string, unknown>> {
const parts: Record<string, unknown>[] = [];
for (let i = 0; i < totalChunks; i++) {
const raw = await redis.get(chunkKey(reportId, i));
if (!raw) throw new Error(`Missing chunk ${i} for report ${reportId}`);
parts.push(JSON.parse(raw));
}
// First chunk is the base, subsequent chunks append drifted_resources
const base = { ...parts[0] };
const allResources: unknown[] = (base.drifted_resources as unknown[]) ?? [];
for (let i = 1; i < parts.length; i++) {
const chunk = parts[i];
if (Array.isArray(chunk.drifted_resources)) {
allResources.push(...chunk.drifted_resources);
}
for (const [k, v] of Object.entries(chunk)) {
if (k !== 'drifted_resources') {
base[k] = v;
}
}
}
base.drifted_resources = allResources;
logger.info({ reportId, totalChunks, resources: allResources.length }, 'Report reassembled');
return base;
}
async function cleanup(redis: Redis, reportId: string, totalChunks: number): Promise<void> {
const keys = [metaKey(reportId)];
for (let i = 0; i < totalChunks; i++) {
keys.push(chunkKey(reportId, i));
}
await redis.del(...keys);
}
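For the producing side, a sketch of how an agent might split a large report to match ChunkInput. The strategy mirrors reassemble: chunk 0 carries the base fields, later chunks carry only drifted_resources slices (and note the ingest schema caps total_chunks at 100):

```typescript
import crypto from 'node:crypto';
import type { ChunkInput } from './chunk-assembler.js'; // illustrative import

function splitReport(report: Record<string, unknown>, maxPerChunk = 500): ChunkInput[] {
  const resources = (report.drifted_resources as unknown[]) ?? [];
  const total = Math.max(1, Math.ceil(resources.length / maxPerChunk));
  const reportId = crypto.randomUUID(); // correlates the chunks server-side
  return Array.from({ length: total }, (_, i) => ({
    report_id: reportId,
    chunk_index: i,
    total_chunks: total,
    // Chunk 0 is the base report with the first resource slice;
    // later chunks only append more drifted_resources.
    data: i === 0
      ? { ...report, drifted_resources: resources.slice(0, maxPerChunk) }
      : { drifted_resources: resources.slice(i * maxPerChunk, (i + 1) * maxPerChunk) },
  }));
}
```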

View File

@@ -0,0 +1,141 @@
import crypto from 'node:crypto';
// --- Provider → Canonical resource type mapping ---
const RESOURCE_TYPE_MAP: Record<string, string> = {
// Compute
aws_instance: 'compute/instance',
aws_launch_template: 'compute/launch-template',
google_compute_instance: 'compute/instance',
google_compute_instance_template: 'compute/launch-template',
azurerm_virtual_machine: 'compute/instance',
azurerm_linux_virtual_machine: 'compute/instance',
azurerm_windows_virtual_machine: 'compute/instance',
// Networking
aws_vpc: 'network/vpc',
google_compute_network: 'network/vpc',
azurerm_virtual_network: 'network/vpc',
aws_subnet: 'network/subnet',
google_compute_subnetwork: 'network/subnet',
azurerm_subnet: 'network/subnet',
aws_security_group: 'network/firewall',
google_compute_firewall: 'network/firewall',
azurerm_network_security_group: 'network/firewall',
aws_lb: 'network/load-balancer',
google_compute_forwarding_rule: 'network/load-balancer',
azurerm_lb: 'network/load-balancer',
// Storage
aws_s3_bucket: 'storage/bucket',
google_storage_bucket: 'storage/bucket',
azurerm_storage_account: 'storage/bucket',
aws_ebs_volume: 'storage/disk',
google_compute_disk: 'storage/disk',
azurerm_managed_disk: 'storage/disk',
// Database
aws_db_instance: 'database/instance',
google_sql_database_instance: 'database/instance',
azurerm_mssql_server: 'database/instance',
azurerm_postgresql_server: 'database/instance',
// IAM
aws_iam_role: 'iam/role',
google_project_iam_member: 'iam/role',
azurerm_role_assignment: 'iam/role',
aws_iam_policy: 'iam/policy',
google_project_iam_policy: 'iam/policy',
// DNS
aws_route53_record: 'dns/record',
google_dns_record_set: 'dns/record',
azurerm_dns_a_record: 'dns/record',
// Kubernetes
kubernetes_deployment: 'k8s/deployment',
kubernetes_service: 'k8s/service',
kubernetes_namespace: 'k8s/namespace',
kubernetes_config_map: 'k8s/configmap',
kubernetes_secret: 'k8s/secret',
};
const PROVIDER_PREFIXES = ['aws_', 'google_', 'azurerm_', 'gcp_'];
export interface DriftedResource {
resource_type: string;
resource_address: string;
module?: string;
provider: string;
severity: string;
diffs: Array<{
attribute_name: string;
old_value: string;
new_value: string;
sensitive: boolean;
}>;
}
export interface CanonicalResource {
canonical_type: string;
original_type: string;
resource_address: string;
provider: string;
severity: string;
fingerprint: string;
normalized_diffs: Array<{
attribute: string;
original_attribute: string;
old_value: string;
new_value: string;
sensitive: boolean;
}>;
}
function normalizeResourceType(resourceType: string): string {
return RESOURCE_TYPE_MAP[resourceType] ?? `other/${resourceType}`;
}
function normalizeAttributeName(attr: string): string {
let normalized = attr;
for (const prefix of PROVIDER_PREFIXES) {
if (normalized.startsWith(prefix)) {
normalized = normalized.slice(prefix.length);
break;
}
}
return normalized.replace(/\./g, '/');
}
function computeFingerprint(resourceAddress: string, diffKeys: string[]): string {
const sorted = [...diffKeys].sort();
const input = `${resourceAddress}:${sorted.join(',')}`;
return crypto.createHash('sha256').update(input).digest('hex').slice(0, 16);
}
export function normalizeResource(resource: DriftedResource): CanonicalResource {
const normalizedDiffs = resource.diffs.map((d) => ({
attribute: normalizeAttributeName(d.attribute_name),
original_attribute: d.attribute_name,
old_value: d.old_value,
new_value: d.new_value,
sensitive: d.sensitive,
}));
const diffKeys = normalizedDiffs.map((d) => d.attribute);
const fingerprint = computeFingerprint(resource.resource_address, diffKeys);
return {
canonical_type: normalizeResourceType(resource.resource_type),
original_type: resource.resource_type,
resource_address: resource.resource_address,
provider: resource.provider,
severity: resource.severity,
fingerprint,
normalized_diffs: normalizedDiffs,
};
}
export function normalizeReport(driftedResources: DriftedResource[]): CanonicalResource[] {
return driftedResources.map(normalizeResource);
}
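A quick usage example of the normalizer (values invented for illustration):

```typescript
import { normalizeResource } from './normalizer.js';

const canonical = normalizeResource({
  resource_type: 'aws_s3_bucket',
  resource_address: 'aws_s3_bucket.assets',
  provider: 'aws',
  severity: 'high',
  diffs: [
    { attribute_name: 'acl', old_value: 'private', new_value: 'public-read', sensitive: false },
  ],
});
// canonical.canonical_type === 'storage/bucket' (same as google_storage_bucket)
// canonical.fingerprint is a stable 16-hex-char id derived from the resource
// address plus the sorted set of changed attribute names.
```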

View File

@@ -1,6 +1,8 @@
import { FastifyInstance } from 'fastify';
import { z } from 'zod';
import { withTenant } from '../data/db.js';
import { normalizeReport } from './normalizer.js';
import { storeChunk } from './chunk-assembler.js';
const driftReportSchema = z.object({
stack_name: z.string(),
@@ -27,6 +29,60 @@ const driftReportSchema = z.object({
nonce: z.string(),
});
const chunkSchema = z.object({
report_id: z.string().min(1),
chunk_index: z.number().int().min(0),
total_chunks: z.number().int().min(1).max(100),
data: z.record(z.unknown()),
});
async function processDriftReport(app: FastifyInstance, tenantId: string, report: z.infer<typeof driftReportSchema>) {
const pool = (app as any).pool;
// Normalize drifted resources into canonical schema
const canonicalResources = report.drifted_resources?.length
? normalizeReport(report.drifted_resources)
: [];
await withTenant(pool, tenantId, async (client) => {
await client.query(
`INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report, canonical_resources)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
[tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report), JSON.stringify(canonicalResources)]
);
});
// Trigger notification if drift score exceeds threshold
if (report.drift_score > 0) {
try {
const { dispatchNotifications, shouldNotify } = await import('../notifications/service.js');
const maxSeverity = report.drifted_resources?.some((r) => r.severity === 'critical') ? 'critical'
: report.drifted_resources?.some((r) => r.severity === 'high') ? 'high' : 'medium';
const channelResult = await withTenant(pool, tenantId, async (client) => {
return client.query('SELECT * FROM notification_configs WHERE enabled = true');
});
const channels = channelResult.rows.filter((ch: any) => shouldNotify(ch, report.drift_score, maxSeverity));
if (channels.length > 0) {
await dispatchNotifications(channels, {
tenantId,
stackName: report.stack_name,
driftScore: report.drift_score,
totalResources: report.total_resources,
totalDrifted: report.drifted_resources?.length ?? 0,
criticalCount: report.drifted_resources?.filter((r) => r.severity === 'critical').length ?? 0,
highCount: report.drifted_resources?.filter((r) => r.severity === 'high').length ?? 0,
reportUrl: `https://drift.dd0c.dev/reports/${report.stack_fingerprint}`,
}, maxSeverity);
}
} catch (err) {
app.log.warn({ tenantId, error: (err as Error).message }, 'Notification dispatch failed (non-fatal)');
}
}
}
export async function registerProcessorRoutes(app: FastifyInstance) {
app.post('/v1/ingest/drift', async (request, reply) => {
const parsed = driftReportSchema.safeParse(request.body);
@@ -46,48 +102,55 @@ export async function registerProcessorRoutes(app: FastifyInstance) {
}
await redis.setex(nonceKey, 86400, '1'); // 24h TTL
await processDriftReport(app, tenantId, report);
// Persist
const pool = (app as any).pool;
await withTenant(pool, tenantId, async (client) => {
await client.query(
`INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`,
[tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report)]
);
});
// Trigger notification if drift score exceeds threshold
if (report.drift_score > 0) {
try {
const { dispatchNotifications, shouldNotify } = await import('../notifications/service.js');
const { withTenant } = await import('../data/db.js');
const maxSeverity = report.drifted_resources?.some((r: any) => r.severity === 'critical') ? 'critical'
: report.drifted_resources?.some((r: any) => r.severity === 'high') ? 'high' : 'medium';
// Fetch tenant notification channels
const channelResult = await withTenant(pool, tenantId, async (client: any) => {
return client.query('SELECT * FROM notification_channels WHERE enabled = true');
});
const channels = channelResult.rows.filter((ch: any) => shouldNotify(ch, report.drift_score, maxSeverity));
if (channels.length > 0) {
await dispatchNotifications(channels, {
tenantId,
stackName: report.stack_name,
driftScore: report.drift_score,
totalResources: report.total_resources,
totalDrifted: report.drifted_resources?.length ?? 0,
criticalCount: report.drifted_resources?.filter((r: any) => r.severity === 'critical').length ?? 0,
highCount: report.drifted_resources?.filter((r: any) => r.severity === 'high').length ?? 0,
reportUrl: `https://drift.dd0c.dev/reports/${report.stack_fingerprint}`,
}, maxSeverity);
}
} catch (err) {
app.log.warn({ tenantId, error: (err as Error).message }, 'Notification dispatch failed (non-fatal)');
}
}
return reply.status(201).send({ status: 'accepted' });
});
// Chunked report ingestion
app.post('/v1/ingest/drift/chunk', async (request, reply) => {
const parsed = chunkSchema.safeParse(request.body);
if (!parsed.success) {
return reply.status(400).send({ error: 'Invalid chunk', details: parsed.error.flatten() });
}
const tenantId = (request as any).tenantId;
const redis = (app as any).redis;
const chunk = parsed.data;
const result = await storeChunk(redis, chunk);
if (!result.complete) {
return reply.status(202).send({
status: 'chunk_received',
received: result.received,
total: result.total,
});
}
// All chunks received — validate and process the assembled report
const assembled = result.assembled!;
const reportParsed = driftReportSchema.safeParse(assembled);
if (!reportParsed.success) {
return reply.status(400).send({ error: 'Assembled report invalid', details: reportParsed.error.flatten() });
}
const report = reportParsed.data;
// Nonce replay prevention on assembled report
const nonceKey = `nonce:${report.nonce}`;
const exists = await redis.exists(nonceKey);
if (exists) {
return reply.status(409).send({ error: 'Nonce already used (replay rejected)' });
}
await redis.setex(nonceKey, 86400, '1');
await processDriftReport(app, tenantId, report);
return reply.status(201).send({
status: 'assembled_and_accepted',
received: result.received,
total: result.total,
});
});
}
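End to end, a hypothetical agent-side upload loop for the chunk endpoint (host and token are placeholders; status codes match the handler above):

```typescript
import type { ChunkInput } from './chunk-assembler.js'; // illustrative import

async function uploadChunks(chunks: ChunkInput[], token: string): Promise<void> {
  for (const chunk of chunks) {
    const resp = await fetch('https://drift.example.com/v1/ingest/drift/chunk', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` },
      body: JSON.stringify(chunk),
    });
    // 202 = chunk stored, waiting for the rest; 201 = assembled and accepted;
    // 409 = nonce replay on the assembled report; 400 = invalid chunk/report.
    if (resp.status !== 202 && resp.status !== 201) {
      throw new Error(`chunk ${chunk.chunk_index} rejected: ${resp.status} ${await resp.text()}`);
    }
  }
}
```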