diff --git a/products/04-lightweight-idp/src/discovery/scheduler.ts b/products/04-lightweight-idp/src/discovery/scheduler.ts index 4001c7f..6ed586d 100644 --- a/products/04-lightweight-idp/src/discovery/scheduler.ts +++ b/products/04-lightweight-idp/src/discovery/scheduler.ts @@ -1,26 +1,24 @@ import pino from 'pino'; import Redis from 'ioredis'; -import { config } from '../config/index.js'; +import { Pool } from 'pg'; +import { AwsDiscoveryScanner } from './aws-scanner.js'; +import { GitHubDiscoveryScanner } from './github-scanner.js'; +import { CatalogService } from '../catalog/service.js'; +import { withTenant } from '../data/db.js'; const logger = pino({ name: 'scheduled-discovery' }); -/** - * Scheduled discovery job — runs AWS + GitHub scans on a cron schedule. - * Uses Redis-based distributed lock to prevent concurrent scans. - */ export class ScheduledDiscovery { private redis: Redis; + private pool: Pool; private lockTtlMs: number; - constructor(redis: Redis, lockTtlMs = 10 * 60 * 1000) { + constructor(redis: Redis, pool: Pool, lockTtlMs = 10 * 60 * 1000) { this.redis = redis; + this.pool = pool; this.lockTtlMs = lockTtlMs; } - /** - * Attempt to acquire a distributed lock for a tenant scan. - * Returns true if lock acquired, false if another scan is running. - */ async acquireLock(tenantId: string, scanner: string): Promise { const key = `scan_lock:${tenantId}:${scanner}`; const result = await this.redis.set(key, Date.now().toString(), 'PX', this.lockTtlMs, 'NX'); @@ -32,10 +30,6 @@ export class ScheduledDiscovery { await this.redis.del(key); } - /** - * Run a scheduled scan for a tenant. - * Called by cron job or manual trigger. - */ async runScan(tenantId: string, scanner: 'aws' | 'github'): Promise<{ status: string; discovered: number }> { const locked = await this.acquireLock(tenantId, scanner); if (!locked) { @@ -43,22 +37,58 @@ export class ScheduledDiscovery { return { status: 'skipped', discovered: 0 }; } + // Record scan start + await withTenant(tenantId, async (client) => { + await client.query( + `INSERT INTO scan_history (tenant_id, scanner, status, started_at) VALUES ($1, $2, 'running', now())`, + [tenantId, scanner], + ); + }); + try { logger.info({ tenantId, scanner }, 'Starting scheduled scan'); + const catalog = new CatalogService(this.pool); - // TODO: Instantiate appropriate scanner and run - // const result = scanner === 'aws' - // ? await awsScanner.scan(region, account) - // : await githubScanner.scan(org); + if (scanner === 'aws') { + // Get tenant's AWS config (region, credentials would come from tenant settings) + const awsScanner = new AwsDiscoveryScanner('us-east-1'); + const result = await awsScanner.scan(tenantId); + const isPartial = result.status === 'partial_failure'; - // TODO: Merge results into catalog via CatalogService + const merged = await catalog.mergeAwsDiscovery(tenantId, result.services, isPartial); - return { status: 'completed', discovered: 0 }; + await this.recordScanResult(tenantId, scanner, result.status, result.discovered, result.errors); + return { status: result.status, discovered: merged }; + } else { + // GitHub scan — would need org name + token from tenant settings + const { Octokit } = await import('@octokit/rest'); + const octokit = new Octokit(); // Would use tenant's GitHub token + const ghScanner = new GitHubDiscoveryScanner(octokit); + const result = await ghScanner.scan(tenantId); // Would use tenant's org name + const isPartial = result.status === 'partial_failure'; + + const merged = await catalog.mergeGitHubDiscovery(tenantId, result.repos, isPartial); + + await this.recordScanResult(tenantId, scanner, result.status, result.discovered, result.errors); + return { status: result.status, discovered: merged }; + } } catch (err) { logger.error({ tenantId, scanner, error: (err as Error).message }, 'Scheduled scan failed'); + await this.recordScanResult(tenantId, scanner, 'failed', 0, [(err as Error).message]); return { status: 'failed', discovered: 0 }; } finally { await this.releaseLock(tenantId, scanner); } } + + private async recordScanResult(tenantId: string, scanner: string, status: string, discovered: number, errors: string[]): Promise { + await withTenant(tenantId, async (client) => { + await client.query( + `UPDATE scan_history SET status = $1, discovered = $2, errors = $3, completed_at = now() + WHERE tenant_id = $4 AND scanner = $5 AND completed_at IS NULL + ORDER BY started_at DESC LIMIT 1`, + [status, discovered, errors, tenantId, scanner], + ); + }); + } }