import pino from 'pino';
import { Redis } from 'ioredis';
import { Pool } from 'pg';
import { AwsDiscoveryScanner } from './aws-scanner.js';
import { GitHubDiscoveryScanner } from './github-scanner.js';
import { CloudFormationDiscoveryScanner } from './cloudformation-scanner.js';
import { ApiGatewayDiscoveryScanner } from './apigateway-scanner.js';
import { CatalogService } from '../catalog/service.js';
import { withTenant } from '../data/db.js';

const logger = pino({ name: 'scheduled-discovery' });

export type ScannerType = 'aws' | 'github' | 'cloudformation' | 'apigateway';

/** Region handed to the AWS-backed scanners when the caller does not supply one. */
const DEFAULT_AWS_REGION = 'us-east-1';

/**
 * Lua script that deletes KEYS[1] only if it still holds ARGV[1].
 * Running the compare and the delete in one script keeps them atomic, so a
 * worker whose lock already expired cannot delete a lock that someone else
 * has acquired in the meantime.
 */
const RELEASE_IF_OWNER_SCRIPT =
  "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

/** Intermediate result of running one scanner and merging its findings. */
interface ScanOutcome {
  /** Status reported by the scanner. */
  status: string;
  /** Count reported by the scanner (written to scan_history). */
  discovered: number;
  /** Error messages reported by the scanner. */
  errors: string[];
  /** Count returned by the catalog merge (returned to the caller of runScan). */
  merged: number;
}

/** Extracts a loggable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Runs discovery scans for a tenant under a per-(tenant, scanner) Redis lock
 * and records each run in the `scan_history` table.
 */
export class ScheduledDiscovery {
  private redis: Redis;
  private pool: Pool;
  private lockTtlMs: number;
  private awsRegion: string;

  /**
   * @param redis - Redis client used for the scan locks.
   * @param pool - Postgres pool handed to `CatalogService`.
   * @param lockTtlMs - Lock expiry in milliseconds (default: 10 minutes).
   * @param awsRegion - Region passed to the AWS, CloudFormation and API Gateway
   *   scanners (default: `us-east-1`).
   */
  constructor(
    redis: Redis,
    pool: Pool,
    lockTtlMs = 10 * 60 * 1000,
    awsRegion = DEFAULT_AWS_REGION,
  ) {
    this.redis = redis;
    this.pool = pool;
    this.lockTtlMs = lockTtlMs;
    this.awsRegion = awsRegion;
  }

  /**
   * Tries to take the scan lock for a tenant/scanner pair.
   *
   * The key is set only if absent (`NX`) and expires after `lockTtlMs` (`PX`).
   * `runScan` does not use this method; it takes the same key with a per-run
   * owner token so that it can release safely.
   *
   * @returns `true` when the lock was acquired, `false` when it is already held.
   */
  async acquireLock(tenantId: string, scanner: string): Promise<boolean> {
    const result = await this.redis.set(
      this.lockKey(tenantId, scanner),
      Date.now().toString(),
      'PX',
      this.lockTtlMs,
      'NX',
    );
    return result === 'OK';
  }

  /**
   * Unconditionally deletes the scan lock for a tenant/scanner pair,
   * regardless of which holder currently owns it.
   */
  async releaseLock(tenantId: string, scanner: string): Promise<void> {
    await this.redis.del(this.lockKey(tenantId, scanner));
  }

  /**
   * Runs one scan for `tenantId` with the given scanner.
   *
   * Flow: acquire the lock (or skip), insert a `running` row into
   * `scan_history`, run the scanner and merge its findings into the catalog,
   * then update the history row. The lock is released in `finally`.
   *
   * Errors raised after the lock is taken are logged and turned into a
   * `failed` result rather than a rejection; only a failure to acquire the
   * lock itself (e.g. Redis unreachable) rejects.
   *
   * @returns `skipped` when another scan holds the lock, `failed` on error,
   *   otherwise the scanner's own status with the merged count.
   */
  async runScan(
    tenantId: string,
    scanner: ScannerType,
  ): Promise<{ status: string; discovered: number }> {
    const lockToken = await this.acquireOwnedLock(tenantId, scanner);
    if (lockToken === null) {
      logger.info({ tenantId, scanner }, 'Scan already in progress — skipping');
      return { status: 'skipped', discovered: 0 };
    }

    // Tracks whether this run wrote its own 'running' row, so the failure path
    // never completes a row that belongs to some other run.
    let historyStarted = false;

    try {
      // Record scan start. This lives inside the try block so that a database
      // failure here still releases the lock in `finally`.
      await withTenant(tenantId, async (client) => {
        await client.query(
          `INSERT INTO scan_history (tenant_id, scanner, status, started_at) VALUES ($1, $2, 'running', now())`,
          [tenantId, scanner],
        );
      });
      historyStarted = true;

      logger.info({ tenantId, scanner }, 'Starting scheduled scan');
      const catalog = new CatalogService(this.pool);
      const outcome = await this.scanAndMerge(tenantId, scanner, catalog);

      await this.recordScanResult(
        tenantId,
        scanner,
        outcome.status,
        outcome.discovered,
        outcome.errors,
      );
      return { status: outcome.status, discovered: outcome.merged };
    } catch (err) {
      const message = errorMessage(err);
      logger.error({ tenantId, scanner, error: message }, 'Scheduled scan failed');

      if (historyStarted) {
        try {
          await this.recordScanResult(tenantId, scanner, 'failed', 0, [message]);
        } catch (recordErr) {
          // The scan already failed; a bookkeeping error must not turn the
          // documented 'failed' result into an unhandled rejection.
          logger.error(
            { tenantId, scanner, error: errorMessage(recordErr) },
            'Failed to record scan failure',
          );
        }
      }
      return { status: 'failed', discovered: 0 };
    } finally {
      try {
        await this.releaseOwnedLock(tenantId, scanner, lockToken);
      } catch (releaseErr) {
        // The key expires on its own after lockTtlMs, so a failed release is
        // logged instead of replacing the scan result with a rejection.
        logger.warn(
          { tenantId, scanner, error: errorMessage(releaseErr) },
          'Failed to release scan lock',
        );
      }
    }
  }

  /** Builds the Redis key that guards one tenant/scanner pair. */
  private lockKey(tenantId: string, scanner: string): string {
    return `scan_lock:${tenantId}:${scanner}`;
  }

  /**
   * Takes the scan lock with a per-run token as its value.
   *
   * @returns the token on success, or `null` when the lock is already held.
   */
  private async acquireOwnedLock(tenantId: string, scanner: string): Promise<string | null> {
    // The token only has to differ between holders; it is not a secret.
    const token = `${Date.now()}:${Math.random().toString(36).slice(2)}`;
    const result = await this.redis.set(
      this.lockKey(tenantId, scanner),
      token,
      'PX',
      this.lockTtlMs,
      'NX',
    );
    return result === 'OK' ? token : null;
  }

  /**
   * Releases the scan lock only if it still holds `token`, i.e. only if this
   * run still owns it. If the lock expired and another run acquired it, the
   * key is left untouched.
   */
  private async releaseOwnedLock(tenantId: string, scanner: string, token: string): Promise<void> {
    await this.redis.eval(RELEASE_IF_OWNER_SCRIPT, 1, this.lockKey(tenantId, scanner), token);
  }

  /**
   * Runs the scanner selected by `scanner` and merges what it found into the
   * catalog. A scanner status of `partial_failure` is forwarded to the merge
   * call as its `isPartial` flag.
   *
   * @throws Error when `scanner` is not one of the supported scanner types.
   */
  private async scanAndMerge(
    tenantId: string,
    scanner: ScannerType,
    catalog: CatalogService,
  ): Promise<ScanOutcome> {
    switch (scanner) {
      case 'aws': {
        const result = await new AwsDiscoveryScanner(this.awsRegion).scan(tenantId);
        const isPartial = result.status === 'partial_failure';
        const merged = await catalog.mergeAwsDiscovery(tenantId, result.services, isPartial);
        return {
          status: result.status,
          discovered: result.discovered,
          errors: result.errors,
          merged,
        };
      }
      case 'github': {
        // Loaded lazily so the Octokit dependency is only imported when a
        // GitHub scan actually runs.
        const { Octokit } = await import('@octokit/rest');
        const result = await new GitHubDiscoveryScanner(new Octokit()).scan(tenantId);
        const isPartial = result.status === 'partial_failure';
        const merged = await catalog.mergeGitHubDiscovery(tenantId, result.repos, isPartial);
        return {
          status: result.status,
          discovered: result.discovered,
          errors: result.errors,
          merged,
        };
      }
      case 'cloudformation': {
        const result = await new CloudFormationDiscoveryScanner(this.awsRegion).scan(tenantId);
        const isPartial = result.status === 'partial_failure';
        const merged = await catalog.mergeAwsDiscovery(tenantId, result.services, isPartial);
        return {
          status: result.status,
          discovered: result.discovered,
          errors: result.errors,
          merged,
        };
      }
      case 'apigateway': {
        const result = await new ApiGatewayDiscoveryScanner(this.awsRegion).scan(tenantId);
        const isPartial = result.status === 'partial_failure';
        const merged = await catalog.mergeAwsDiscovery(tenantId, result.services, isPartial);
        return {
          status: result.status,
          discovered: result.discovered,
          errors: result.errors,
          merged,
        };
      }
      default: {
        // Compile-time exhaustiveness check; at runtime an unexpected value is
        // routed through runScan's failure path so the history row is closed.
        const unsupported: never = scanner;
        throw new Error(`Unsupported scanner type: ${String(unsupported)}`);
      }
    }
  }

  /**
   * Completes the most recently started, still-open `scan_history` row for the
   * tenant/scanner pair with the final status, count and errors.
   *
   * PostgreSQL's UPDATE has no ORDER BY / LIMIT clause, so the newest open row
   * is picked in a sub-select and addressed through its `ctid`; the filter is
   * repeated on the outer statement so only a matching row can be touched.
   */
  private async recordScanResult(
    tenantId: string,
    scanner: string,
    status: string,
    discovered: number,
    errors: string[],
  ): Promise<void> {
    await withTenant(tenantId, async (client) => {
      await client.query(
        `UPDATE scan_history
            SET status = $1, discovered = $2, errors = $3, completed_at = now()
          WHERE ctid = (
                  SELECT ctid
                    FROM scan_history
                   WHERE tenant_id = $4 AND scanner = $5 AND completed_at IS NULL
                   ORDER BY started_at DESC
                   LIMIT 1
                )
            AND tenant_id = $4 AND scanner = $5 AND completed_at IS NULL`,
        [status, discovered, errors, tenantId, scanner],
      );
    });
  }
}