Scaffold dd0c/drift SaaS backend: Fastify, RLS, ingestion, dashboard API
- Fastify server with Zod validation, pino logging, CORS/helmet
- Drift report ingestion endpoint with nonce replay prevention
- Dashboard API: stacks list, drift history, report detail, summary stats
- PostgreSQL schema with RLS: tenants, users, agent_keys, drift_reports, remediation_actions
- withTenant() helper for safe connection pool tenant context management
- Config via Zod-validated env vars
This commit is contained in:
102
products/02-iac-drift-detection/saas/src/api/routes.ts
Normal file
102
products/02-iac-drift-detection/saas/src/api/routes.ts
Normal file
@@ -0,0 +1,102 @@
|
||||
import { FastifyInstance } from 'fastify';
|
||||
import { withTenant } from '../data/db.js';
|
||||
|
||||
export async function registerApiRoutes(app: FastifyInstance) {
|
||||
// List stacks
|
||||
app.get('/api/v1/stacks', async (request, reply) => {
|
||||
const tenantId = (request as any).tenantId;
|
||||
const pool = (app as any).pool;
|
||||
|
||||
const result = await withTenant(pool, tenantId, async (client) => {
|
||||
const { rows } = await client.query(
|
||||
`SELECT DISTINCT ON (stack_name)
|
||||
stack_name, stack_fingerprint, drift_score, total_resources, scanned_at, state_serial
|
||||
FROM drift_reports
|
||||
WHERE tenant_id = $1
|
||||
ORDER BY stack_name, scanned_at DESC`,
|
||||
[tenantId]
|
||||
);
|
||||
return rows;
|
||||
});
|
||||
|
||||
return reply.send(result);
|
||||
});
|
||||
|
||||
// Get stack drift history
|
||||
app.get('/api/v1/stacks/:stackName/history', async (request, reply) => {
|
||||
const tenantId = (request as any).tenantId;
|
||||
const { stackName } = request.params as { stackName: string };
|
||||
const pool = (app as any).pool;
|
||||
|
||||
const result = await withTenant(pool, tenantId, async (client) => {
|
||||
const { rows } = await client.query(
|
||||
`SELECT id, drift_score, total_resources, scanned_at, state_serial,
|
||||
(raw_report->'drifted_resources') as drifted_resources
|
||||
FROM drift_reports
|
||||
WHERE tenant_id = $1 AND stack_name = $2
|
||||
ORDER BY scanned_at DESC
|
||||
LIMIT 50`,
|
||||
[tenantId, stackName]
|
||||
);
|
||||
return rows;
|
||||
});
|
||||
|
||||
return reply.send(result);
|
||||
});
|
||||
|
||||
// Get single drift report
|
||||
app.get('/api/v1/reports/:reportId', async (request, reply) => {
|
||||
const tenantId = (request as any).tenantId;
|
||||
const { reportId } = request.params as { reportId: string };
|
||||
const pool = (app as any).pool;
|
||||
|
||||
const result = await withTenant(pool, tenantId, async (client) => {
|
||||
const { rows } = await client.query(
|
||||
`SELECT * FROM drift_reports WHERE tenant_id = $1 AND id = $2`,
|
||||
[tenantId, reportId]
|
||||
);
|
||||
return rows[0] ?? null;
|
||||
});
|
||||
|
||||
if (!result) {
|
||||
return reply.status(404).send({ error: 'Not found' });
|
||||
}
|
||||
|
||||
return reply.send(result);
|
||||
});
|
||||
|
||||
// Dashboard summary
|
||||
app.get('/api/v1/dashboard', async (request, reply) => {
|
||||
const tenantId = (request as any).tenantId;
|
||||
const pool = (app as any).pool;
|
||||
|
||||
const result = await withTenant(pool, tenantId, async (client) => {
|
||||
const stacks = await client.query(
|
||||
`SELECT COUNT(DISTINCT stack_name) as stack_count FROM drift_reports WHERE tenant_id = $1`,
|
||||
[tenantId]
|
||||
);
|
||||
const drifted = await client.query(
|
||||
`SELECT COUNT(DISTINCT stack_name) as drifted_count
|
||||
FROM drift_reports
|
||||
WHERE tenant_id = $1 AND drift_score > 0
|
||||
AND scanned_at = (SELECT MAX(scanned_at) FROM drift_reports r2 WHERE r2.stack_name = drift_reports.stack_name AND r2.tenant_id = $1)`,
|
||||
[tenantId]
|
||||
);
|
||||
const critical = await client.query(
|
||||
`SELECT COUNT(*) as critical_count
|
||||
FROM drift_reports
|
||||
WHERE tenant_id = $1 AND drift_score >= 80
|
||||
AND scanned_at >= NOW() - INTERVAL '24 hours'`,
|
||||
[tenantId]
|
||||
);
|
||||
|
||||
return {
|
||||
total_stacks: parseInt(stacks.rows[0]?.stack_count ?? '0'),
|
||||
drifted_stacks: parseInt(drifted.rows[0]?.drifted_count ?? '0'),
|
||||
critical_last_24h: parseInt(critical.rows[0]?.critical_count ?? '0'),
|
||||
};
|
||||
});
|
||||
|
||||
return reply.send(result);
|
||||
});
|
||||
}
|
||||
35
products/02-iac-drift-detection/saas/src/config/index.ts
Normal file
35
products/02-iac-drift-detection/saas/src/config/index.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
const envSchema = z.object({
|
||||
NODE_ENV: z.enum(['development', 'production', 'test']).default('development'),
|
||||
PORT: z.coerce.number().default(3000),
|
||||
DATABASE_URL: z.string().default('postgres://dd0c:dd0c@localhost:5432/dd0c_drift'),
|
||||
REDIS_URL: z.string().default('redis://localhost:6379'),
|
||||
JWT_SECRET: z.string().default('dev-secret-change-me'),
|
||||
CORS_ORIGIN: z.string().default('*'),
|
||||
LOG_LEVEL: z.string().default('info'),
|
||||
SQS_QUEUE_URL: z.string().optional(),
|
||||
S3_BUCKET: z.string().default('dd0c-drift-snapshots'),
|
||||
SLACK_WEBHOOK_URL: z.string().optional(),
|
||||
});
|
||||
|
||||
const parsed = envSchema.safeParse(process.env);
|
||||
if (!parsed.success) {
|
||||
console.error('Invalid environment:', parsed.error.flatten());
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
export const config = {
|
||||
nodeEnv: parsed.data.NODE_ENV,
|
||||
port: parsed.data.PORT,
|
||||
databaseUrl: parsed.data.DATABASE_URL,
|
||||
redisUrl: parsed.data.REDIS_URL,
|
||||
jwtSecret: parsed.data.JWT_SECRET,
|
||||
corsOrigin: parsed.data.CORS_ORIGIN,
|
||||
logLevel: parsed.data.LOG_LEVEL,
|
||||
sqsQueueUrl: parsed.data.SQS_QUEUE_URL,
|
||||
s3Bucket: parsed.data.S3_BUCKET,
|
||||
slackWebhookUrl: parsed.data.SLACK_WEBHOOK_URL,
|
||||
};
|
||||
|
||||
export type Config = typeof config;
|
||||
51
products/02-iac-drift-detection/saas/src/data/db.ts
Normal file
51
products/02-iac-drift-detection/saas/src/data/db.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import pg from 'pg';
|
||||
|
||||
export function createPool(connectionString: string): pg.Pool {
|
||||
return new pg.Pool({
|
||||
connectionString,
|
||||
max: 20,
|
||||
idleTimeoutMillis: 30000,
|
||||
connectionTimeoutMillis: 5000,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set tenant context on a pooled connection for RLS.
|
||||
* MUST be called before any query in a multi-tenant context.
|
||||
* MUST be cleared when returning the connection to the pool.
|
||||
*/
|
||||
export async function setTenantContext(client: pg.PoolClient, tenantId: string): Promise<void> {
|
||||
await client.query('SET LOCAL app.tenant_id = $1', [tenantId]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear tenant context — call this in a finally block before releasing the client.
|
||||
*/
|
||||
export async function clearTenantContext(client: pg.PoolClient): Promise<void> {
|
||||
await client.query('RESET app.tenant_id');
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a query within a tenant-scoped transaction.
|
||||
* Handles SET LOCAL + RESET automatically to prevent RLS leakage via connection pooling.
|
||||
*/
|
||||
export async function withTenant<T>(
|
||||
pool: pg.Pool,
|
||||
tenantId: string,
|
||||
fn: (client: pg.PoolClient) => Promise<T>,
|
||||
): Promise<T> {
|
||||
const client = await pool.connect();
|
||||
try {
|
||||
await client.query('BEGIN');
|
||||
await setTenantContext(client, tenantId);
|
||||
const result = await fn(client);
|
||||
await client.query('COMMIT');
|
||||
return result;
|
||||
} catch (err) {
|
||||
await client.query('ROLLBACK');
|
||||
throw err;
|
||||
} finally {
|
||||
await clearTenantContext(client);
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
10
products/02-iac-drift-detection/saas/src/data/redis.ts
Normal file
10
products/02-iac-drift-detection/saas/src/data/redis.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import Redis from 'ioredis';
|
||||
|
||||
export function createRedis(url: string): Redis {
|
||||
return new Redis(url, {
|
||||
maxRetriesPerRequest: 3,
|
||||
retryStrategy(times) {
|
||||
return Math.min(times * 200, 3000);
|
||||
},
|
||||
});
|
||||
}
|
||||
48
products/02-iac-drift-detection/saas/src/index.ts
Normal file
48
products/02-iac-drift-detection/saas/src/index.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import Fastify from 'fastify';
|
||||
import cors from '@fastify/cors';
|
||||
import helmet from '@fastify/helmet';
|
||||
import { config } from './config/index.js';
|
||||
import { registerProcessorRoutes } from './processor/routes.js';
|
||||
import { registerApiRoutes } from './api/routes.js';
|
||||
import { createPool } from './data/db.js';
|
||||
import { createRedis } from './data/redis.js';
|
||||
|
||||
const app = Fastify({
|
||||
logger: {
|
||||
level: config.logLevel,
|
||||
transport: config.nodeEnv === 'development'
|
||||
? { target: 'pino-pretty' }
|
||||
: undefined,
|
||||
},
|
||||
});
|
||||
|
||||
async function start() {
|
||||
await app.register(cors, { origin: config.corsOrigin });
|
||||
await app.register(helmet);
|
||||
|
||||
// Data layer
|
||||
const pool = createPool(config.databaseUrl);
|
||||
const redis = createRedis(config.redisUrl);
|
||||
|
||||
// Decorate fastify instance
|
||||
app.decorate('pool', pool);
|
||||
app.decorate('redis', redis);
|
||||
app.decorate('config', config);
|
||||
|
||||
// Routes
|
||||
await registerProcessorRoutes(app);
|
||||
await registerApiRoutes(app);
|
||||
|
||||
// Health
|
||||
app.get('/health', async () => ({ status: 'ok' }));
|
||||
|
||||
await app.listen({ port: config.port, host: '0.0.0.0' });
|
||||
app.log.info(`dd0c/drift SaaS listening on :${config.port}`);
|
||||
}
|
||||
|
||||
start().catch((err) => {
|
||||
console.error('Failed to start:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
export { app };
|
||||
64
products/02-iac-drift-detection/saas/src/processor/routes.ts
Normal file
64
products/02-iac-drift-detection/saas/src/processor/routes.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import { FastifyInstance } from 'fastify';
|
||||
import { z } from 'zod';
|
||||
import { withTenant } from '../data/db.js';
|
||||
|
||||
const driftReportSchema = z.object({
|
||||
stack_name: z.string(),
|
||||
stack_fingerprint: z.string(),
|
||||
agent_version: z.string(),
|
||||
scanned_at: z.string(),
|
||||
state_serial: z.number(),
|
||||
lineage: z.string(),
|
||||
total_resources: z.number(),
|
||||
drifted_resources: z.array(z.object({
|
||||
resource_type: z.string(),
|
||||
resource_address: z.string(),
|
||||
module: z.string().optional(),
|
||||
provider: z.string(),
|
||||
severity: z.enum(['critical', 'high', 'medium', 'low']),
|
||||
diffs: z.array(z.object({
|
||||
attribute_name: z.string(),
|
||||
old_value: z.string(),
|
||||
new_value: z.string(),
|
||||
sensitive: z.boolean(),
|
||||
})),
|
||||
})),
|
||||
drift_score: z.number(),
|
||||
nonce: z.string(),
|
||||
});
|
||||
|
||||
export async function registerProcessorRoutes(app: FastifyInstance) {
|
||||
app.post('/v1/ingest/drift', async (request, reply) => {
|
||||
const parsed = driftReportSchema.safeParse(request.body);
|
||||
if (!parsed.success) {
|
||||
return reply.status(400).send({ error: 'Invalid drift report', details: parsed.error.flatten() });
|
||||
}
|
||||
|
||||
const report = parsed.data;
|
||||
const tenantId = (request as any).tenantId;
|
||||
|
||||
// Nonce replay prevention
|
||||
const redis = (app as any).redis;
|
||||
const nonceKey = `nonce:${report.nonce}`;
|
||||
const exists = await redis.exists(nonceKey);
|
||||
if (exists) {
|
||||
return reply.status(409).send({ error: 'Nonce already used (replay rejected)' });
|
||||
}
|
||||
await redis.setex(nonceKey, 86400, '1'); // 24h TTL
|
||||
|
||||
// Persist
|
||||
const pool = (app as any).pool;
|
||||
await withTenant(pool, tenantId, async (client) => {
|
||||
await client.query(
|
||||
`INSERT INTO drift_reports (tenant_id, stack_name, stack_fingerprint, agent_version, scanned_at, state_serial, lineage, total_resources, drift_score, nonce, raw_report)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`,
|
||||
[tenantId, report.stack_name, report.stack_fingerprint, report.agent_version, report.scanned_at, report.state_serial, report.lineage, report.total_resources, report.drift_score, report.nonce, JSON.stringify(report)]
|
||||
);
|
||||
});
|
||||
|
||||
// TODO: Trigger notification if drift_score > threshold
|
||||
// TODO: Trigger remediation workflow if auto-remediate enabled
|
||||
|
||||
return reply.status(201).send({ status: 'accepted' });
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user