P1: API key redaction, SSE billing leak, token math edge cases, CI runner config P2: mTLS revocation lockout, terraform state lock recovery, RLS pool leak, entropy scrubber, pgmq visibility P3: HMAC replay prevention, cross-tenant negative tests, correlation window edge cases, SQS claim-check, free tier P4: Discovery partial failure recovery, ownership conflict integration test, VCR freshness CI, Meilisearch rebuild, Cmd+K latency P5: Concurrent baseline conflicts, remediation RBAC, Clock interface for governance, 10K property-based runs, Redis panic fallback P6: Cryptographic agent update signatures, streaming audit logs with WAL, shell AST parsing (mvdan/sh), intervention deadlock TTL, canary suite CI gate
53 KiB
dd0c/portal — Test Architecture & TDD Strategy
Product: dd0c/portal — Lightweight Internal Developer Platform Author: Test Architecture Phase Date: February 28, 2026 Status: V1 MVP — Solo Founder Scope
Section 1: Testing Philosophy & TDD Workflow
1.1 Core Philosophy
dd0c/portal is a trust-critical catalog tool — if auto-discovery assigns a service to the wrong team, or misses a service entirely, the platform loses credibility instantly. The >80% auto-discovery accuracy target from the party mode review is a hard gate, not a suggestion.
Guiding principle: tests validate what the platform engineer sees in the catalog. Every test should map to a visible outcome — a service appearing, an ownership assignment, a scorecard grade.
1.2 Red-Green-Refactor Adapted to dd0c/portal
RED → Write a failing test that describes the desired catalog state
(e.g., "after scanning an AWS account with 3 ECS services,
the catalog should contain 3 services with correct names")
GREEN → Write the minimum code to make it pass
REFACTOR → Extract the discovery logic, add confidence scoring,
optimize the scan parallelism
When to write tests first (strict TDD):
- All ownership inference logic (CODEOWNERS parsing, git blame weighting, signal merging)
- All service reconciliation (AWS + GitHub cross-referencing)
- All confidence scoring calculations
- All governance policy enforcement (strict suggest-only vs. audit auto-mutate)
- All phantom service quarantine logic
When integration tests lead:
- AWS scanner (implement against LocalStack, then lock in contract tests)
- GitHub GraphQL scanner (implement against recorded responses, then contract test)
- Meilisearch indexing (build the index, then test search relevance)
When E2E tests lead:
- 5-minute auto-discovery journey — define the expected catalog state, build backward
- Cmd+K search experience — define expected search results, then build the index
1.3 Test Naming Conventions
# Python unit tests (pytest) — AWS/GitHub scanners
class TestAWSScanner:
def test_discovers_ecs_services_from_cluster_listing(self): ...
def test_groups_resources_by_cloudformation_stack_name(self): ...
def test_assigns_confidence_095_to_cfn_stack_services(self): ...
class TestOwnershipInference:
def test_codeowners_signal_weighted_040(self): ...
def test_top_committer_signal_weighted_030(self): ...
def test_returns_ambiguous_when_top_scores_tied_under_050(self): ...
// TypeScript tests (vitest) — API, frontend
describe('CatalogAPI', () => {
it('returns services sorted by confidence score descending', () => {});
it('filters services by team ownership', () => {});
});
describe('OwnershipInference', () => {
it('merges CODEOWNERS + git blame + PR reviewer signals', () => {});
it('flags service as ambiguous when confidence < 0.50', () => {});
});
Section 2: Test Pyramid
2.1 Ratio
| Level | Target | Count (V1) | Runtime |
|---|---|---|---|
| Unit | 70% | ~300 tests | <30s |
| Integration | 20% | ~85 tests | <5min |
| E2E/Smoke | 10% | ~15 tests | <10min |
2.2 Unit Test Targets
| Component | Key Behaviors | Est. Tests |
|---|---|---|
| AWS Scanner (CloudFormation, ECS, Lambda, RDS) | Resource enumeration, tag extraction, service grouping | 50 |
| GitHub Scanner (repos, CODEOWNERS, workflows) | GraphQL parsing, CODEOWNERS parsing, CI/CD target extraction | 40 |
| Reconciliation Engine | AWS↔GitHub cross-reference, confidence scoring, dedup | 35 |
| Ownership Inference | Signal weighting, ambiguity detection, team resolution | 40 |
| Catalog API | CRUD, search, filtering, pagination | 30 |
| Governance Policy | Strict/audit modes, panic mode, per-team overrides | 25 |
| Feature Flags | Phantom quarantine circuit breaker, flag lifecycle | 15 |
| Scorecard Engine (V1 basic) | Criteria evaluation, grade calculation | 20 |
| Template Engine | Service template generation from catalog data | 15 |
| Slack Bot | Command parsing, response formatting | 30 |
2.3 Integration Test Boundaries
| Boundary | What's Tested | Infrastructure |
|---|---|---|
| AWS Scanner → AWS APIs | STS assume role, CloudFormation, ECS, Lambda, RDS listing | LocalStack |
| GitHub Scanner → GitHub API | GraphQL queries, rate limiting, pagination | WireMock (recorded responses) |
| Reconciler → PostgreSQL | Service upsert, ownership writes, conflict resolution | Testcontainers PostgreSQL |
| API → PostgreSQL | Catalog queries, tenant isolation, search | Testcontainers PostgreSQL |
| API → Meilisearch | Index sync, full-text search, faceted filtering | Testcontainers Meilisearch |
| API → Redis | Session management, cache invalidation, rate limiting | Testcontainers Redis |
| Slack Bot → Slack API | Command handling, block formatting | WireMock |
| Step Functions → Lambdas | Discovery orchestration flow | LocalStack |
2.4 E2E/Smoke Scenarios
- 5-Minute Miracle: Connect AWS + GitHub → auto-discover services → catalog populated with >80% accuracy
- Cmd+K Search: Type service name → results appear in <200ms with correct ranking
- Ownership Assignment: Discover services → infer ownership → correct team assigned
- Phantom Quarantine: Bad discovery rule → phantom services quarantined, not added to catalog
- Panic Mode: Enable panic → all discovery halts → catalog frozen read-only
Section 3: Unit Test Strategy
3.1 AWS Scanner
# tests/unit/scanners/test_aws_scanner.py
class TestCloudFormationScanner:
def test_lists_all_stacks_with_pagination(self): ...
def test_extracts_service_name_from_stack_name(self): ...
def test_maps_stack_resources_to_service_components(self): ...
def test_assigns_confidence_095_to_cfn_discovered_services(self): ...
def test_handles_deleted_stacks_gracefully(self): ...
def test_extracts_service_team_project_tags(self): ...
class TestECSScanner:
def test_lists_all_clusters_and_services(self): ...
def test_extracts_container_image_from_task_definition(self): ...
def test_maps_ecs_service_to_cfn_stack_when_tagged(self): ...
def test_standalone_ecs_service_without_cfn_gets_confidence_070(self): ...
def test_handles_empty_cluster_without_error(self): ...
class TestLambdaScanner:
def test_lists_all_functions_with_pagination(self): ...
def test_extracts_api_gateway_event_source_mapping(self): ...
def test_links_lambda_to_api_gateway_route(self): ...
def test_standalone_lambda_without_trigger_still_discovered(self): ...
class TestRDSScanner:
def test_lists_rds_instances_with_tags(self): ...
def test_maps_database_to_service_by_naming_prefix(self): ...
def test_maps_database_to_service_by_cfn_stack_membership(self): ...
def test_marks_rds_as_infrastructure_not_service(self): ...
class TestSTSRoleAssumption:
def test_assumes_cross_account_role_with_external_id(self): ...
def test_raises_clear_error_on_role_not_found(self): ...
def test_raises_clear_error_on_invalid_external_id(self): ...
def test_caches_credentials_until_expiry(self): ...
Mocking strategy: moto library for AWS API mocking in unit tests. LocalStack for integration tests.
3.2 GitHub Scanner
# tests/unit/scanners/test_github_scanner.py
class TestRepoScanner:
def test_lists_active_non_archived_non_forked_repos(self): ...
def test_extracts_primary_language(self): ...
def test_extracts_top_5_committers(self): ...
def test_batches_graphql_queries_at_100_repos_per_call(self): ...
def test_handles_rate_limit_with_retry_after(self): ...
def test_paginates_through_large_orgs(self): ...
class TestCodeownersParser:
def test_parses_team_ownership_from_codeowners(self): ...
def test_handles_wildcard_pattern_matching(self): ...
def test_handles_multiple_owners_per_path(self): ...
def test_returns_empty_when_codeowners_missing(self): ...
def test_handles_comment_lines_and_blank_lines(self): ...
def test_resolves_github_team_to_display_name(self): ...
class TestWorkflowParser:
def test_extracts_ecs_deploy_action_target(self): ...
def test_extracts_lambda_deploy_action_target(self): ...
def test_links_repo_to_aws_service_by_task_definition_name(self): ...
def test_handles_matrix_strategy_with_multiple_targets(self): ...
def test_ignores_non_deploy_workflows(self): ...
class TestReadmeExtractor:
def test_extracts_first_descriptive_paragraph(self): ...
def test_skips_badges_and_header_images(self): ...
def test_returns_empty_for_missing_readme(self): ...
def test_truncates_at_500_characters(self): ...
Mocking strategy: Recorded GraphQL responses in fixtures/github/. Use responses library for HTTP mocking.
3.3 Reconciliation Engine
# tests/unit/test_reconciler.py
class TestReconciler:
def test_matches_github_repo_to_aws_service_by_deploy_target(self): ...
def test_matches_github_repo_to_aws_service_by_naming_convention(self): ...
def test_merges_aws_and_github_metadata_into_single_service(self): ...
def test_deduplicates_services_discovered_from_multiple_sources(self): ...
def test_assigns_higher_confidence_when_both_sources_agree(self): ...
def test_creates_separate_services_when_no_cross_reference_found(self): ...
def test_preserves_manual_overrides_during_rescan(self): ...
def test_marks_previously_discovered_service_as_stale_when_missing(self): ...
3.4 Ownership Inference
The highest-risk logic in the product. Exhaustive testing required.
# tests/unit/test_ownership_inference.py
class TestOwnershipInference:
# Signal weighting
def test_codeowners_signal_weighted_040(self): ...
def test_top_committer_signal_weighted_030(self): ...
def test_pr_reviewer_signal_weighted_020(self): ...
def test_aws_tag_signal_weighted_010(self): ...
# Confidence calculation
def test_single_strong_signal_produces_moderate_confidence(self): ...
def test_multiple_agreeing_signals_produce_high_confidence(self): ...
def test_conflicting_signals_produce_low_confidence(self): ...
def test_returns_ambiguous_when_top_scores_tied(self): ...
def test_returns_ambiguous_when_confidence_under_050(self): ...
def test_flags_unowned_when_no_signals_found(self): ...
# Edge cases
def test_handles_individual_owner_not_in_any_team(self): ...
def test_handles_deleted_github_team(self): ...
def test_handles_repo_with_single_committer(self): ...
def test_handles_repo_with_no_codeowners_file(self): ...
def test_manual_override_always_wins_regardless_of_signals(self): ...
# Table-driven: signal combinations
@pytest.mark.parametrize("signals,expected_team,expected_confidence", [
({"codeowners": "team-a", "committers": "team-a", "reviewers": "team-a"}, "team-a", 0.90),
({"codeowners": "team-a", "committers": "team-b", "reviewers": "team-a"}, "team-a", 0.60),
({"codeowners": None, "committers": "team-b", "reviewers": "team-b"}, "team-b", 0.50),
({"codeowners": "team-a", "committers": "team-b", "reviewers": "team-c"}, None, None), # ambiguous
])
def test_signal_combination_produces_expected_ownership(self, signals, expected_team, expected_confidence): ...
3.5 Catalog API
// tests/unit/api/catalog.test.ts
describe('CatalogAPI', () => {
describe('Service CRUD', () => {
it('creates service with all required fields', () => {});
it('returns 404 for non-existent service', () => {});
it('updates service metadata without overwriting ownership', () => {});
it('soft-deletes service (marks stale, does not remove)', () => {});
});
describe('Search & Filtering', () => {
it('returns services sorted by confidence descending', () => {});
it('filters by team ownership', () => {});
it('filters by language', () => {});
it('filters by discovery source (aws/github/manual)', () => {});
it('paginates with cursor-based pagination', () => {});
});
describe('Tenant Isolation', () => {
it('never returns services from another tenant', () => {});
it('enforces tenant_id on all queries', () => {});
});
});
3.6 Governance Policy Engine
describe('GovernancePolicy', () => {
describe('Mode Enforcement', () => {
it('strict mode: discovery populates pending review queue', () => {});
it('strict mode: never auto-mutates catalog', () => {});
it('audit mode: auto-applies discoveries with logging', () => {});
it('defaults new tenants to strict mode', () => {});
});
describe('Panic Mode', () => {
it('halts all discovery scans when panic=true', () => {});
it('freezes catalog as read-only', () => {});
it('API returns 503 for write operations during panic', () => {});
it('shows maintenance banner in API response headers', () => {});
});
describe('Per-Team Override', () => {
it('team can lock services to strict even when system is audit', () => {});
it('team cannot downgrade from system strict to audit', () => {});
it('merge logic: max_restrictive(system, team)', () => {});
});
});
3.7 Feature Flag Circuit Breaker
describe('PhantomQuarantineBreaker', () => {
it('allows service creation when discovery rate is normal', () => {});
it('trips breaker when >5 unconfirmed services created in single scan', () => {});
it('quarantines phantom services instead of deleting them', () => {});
it('auto-disables the discovery flag when breaker trips', () => {});
it('quarantined services have status=quarantined, not active', () => {});
it('quarantined services visible in admin review queue', () => {});
});
3.8 Slack Bot
describe('SlackBot', () => {
describe('Command Parsing', () => {
it('parses /portal search <query> command', () => {});
it('parses /portal service <name> command', () => {});
it('parses /portal owner <service> command', () => {});
it('returns help text for unknown commands', () => {});
});
describe('Response Formatting', () => {
it('formats service card with name, team, language, links', () => {});
it('formats search results as compact list (max 10)', () => {});
it('formats ownership info with confidence badge', () => {});
it('includes "View in Portal" button link', () => {});
});
});
Section 4: Integration Test Strategy
4.1 AWS Scanner → LocalStack
# tests/integration/scanners/test_aws_integration.py
class TestAWSIntegration:
@pytest.fixture(autouse=True)
def setup_localstack(self, localstack_endpoint):
"""Create test resources in LocalStack."""
self.cfn = boto3.client('cloudformation', endpoint_url=localstack_endpoint)
self.ecs = boto3.client('ecs', endpoint_url=localstack_endpoint)
# Create test stacks, clusters, services, lambdas
self.cfn.create_stack(StackName='payment-api', TemplateBody=MINIMAL_TEMPLATE)
self.ecs.create_cluster(clusterName='prod')
self.ecs.create_service(cluster='prod', serviceName='payment-api', ...)
def test_full_aws_scan_discovers_all_resource_types(self): ...
def test_scan_groups_resources_by_cfn_stack(self): ...
def test_scan_handles_cross_region_resources(self): ...
def test_scan_respects_api_rate_limits(self): ...
def test_scan_completes_within_60_seconds_for_50_resources(self): ...
4.2 GitHub Scanner → WireMock
# tests/integration/scanners/test_github_integration.py
class TestGitHubIntegration:
@pytest.fixture(autouse=True)
def setup_wiremock(self, wiremock_url):
"""Load recorded GitHub GraphQL responses."""
# Stub: POST /graphql → recorded response with 10 repos
wiremock.stub_for(post('/graphql').will_return(
json_response(load_fixture('github/org-repos-page1.json'))
))
def test_full_github_scan_discovers_repos_with_metadata(self): ...
def test_scan_extracts_codeowners_for_each_repo(self): ...
def test_scan_extracts_deploy_workflows(self): ...
def test_scan_handles_graphql_rate_limit_with_retry(self): ...
def test_scan_paginates_through_100_plus_repos(self): ...
4.3 Reconciler → PostgreSQL
# tests/integration/test_reconciler_db.py
class TestReconcilerDB:
@pytest.fixture(autouse=True)
def setup_db(self, pg_container):
"""Run migrations against Testcontainers PostgreSQL."""
run_migrations(pg_container.get_connection_url())
def test_upserts_discovered_service_without_duplicates(self): ...
def test_preserves_manual_ownership_override_on_rescan(self): ...
def test_marks_missing_services_as_stale(self): ...
def test_tenant_isolation_enforced_at_db_level(self): ...
def test_concurrent_scans_for_different_tenants_dont_conflict(self): ...
4.4 API → Meilisearch
// tests/integration/search/meilisearch.test.ts
describe('Meilisearch Integration', () => {
let meili: StartedTestContainer;
beforeAll(async () => {
meili = await new GenericContainer('getmeili/meilisearch:v1')
.withExposedPorts(7700)
.start();
// Index test services
await indexServices(testCatalog);
});
it('returns relevant results for service name search', async () => {
const results = await search('payment');
expect(results[0].name).toContain('payment');
});
it('returns results within 200ms for 1000-service catalog', async () => {
await indexServices(generate1000Services());
const start = performance.now();
await search('api');
expect(performance.now() - start).toBeLessThan(200);
});
it('supports faceted filtering by team and language', async () => {
const results = await search('', { filters: { team: 'platform', language: 'TypeScript' } });
expect(results.every(r => r.team === 'platform')).toBe(true);
});
});
4.5 Step Functions → Lambda Orchestration (LocalStack)
# tests/integration/test_discovery_orchestration.py
class TestDiscoveryOrchestration:
def test_step_function_executes_aws_then_github_then_reconcile(self): ...
def test_step_function_retries_failed_scanner_once(self): ...
def test_step_function_completes_within_5_minutes(self): ...
def test_step_function_sends_completion_event_to_sqs(self): ...
Section 5: E2E & Smoke Tests
5.1 The 5-Minute Miracle
// tests/e2e/journeys/five-minute-miracle.test.ts
describe('5-Minute Auto-Discovery', () => {
it('discovers >80% of services from AWS + GitHub within 5 minutes', async () => {
// Setup: LocalStack with 20 known services, WireMock GitHub with 15 repos
const knownServices = await setupTestInfrastructure(20);
const knownRepos = await setupTestGitHub(15);
// Trigger discovery
const start = Date.now();
await triggerDiscovery('e2e-tenant');
await waitForDiscoveryComplete('e2e-tenant', { timeoutMs: 5 * 60 * 1000 });
const elapsed = Date.now() - start;
// Validate
expect(elapsed).toBeLessThan(5 * 60 * 1000);
const catalog = await getCatalog('e2e-tenant');
const matchedServices = catalog.filter(s =>
knownServices.some(k => s.name === k.name)
);
const accuracy = matchedServices.length / knownServices.length;
expect(accuracy).toBeGreaterThan(0.80);
});
});
5.2 Cmd+K Search
describe('Cmd+K Search Experience', () => {
it('returns search results within 200ms', async () => {
await populateCatalog(100);
const start = performance.now();
const results = await searchAPI('payment');
expect(performance.now() - start).toBeLessThan(200);
expect(results.length).toBeGreaterThan(0);
});
it('ranks exact name match above partial match', async () => {
await populateCatalog([
{ name: 'payment-api' },
{ name: 'payment-processor' },
{ name: 'api-gateway' },
]);
const results = await searchAPI('payment-api');
expect(results[0].name).toBe('payment-api');
});
});
5.3 Phantom Quarantine Journey
describe('Phantom Quarantine', () => {
it('quarantines phantom services when discovery rule misfires', async () => {
// Enable a bad discovery flag that creates phantom services
await enableFlag('experimental-tag-scanner');
// Trigger discovery — bad rule creates 8 phantom services
await triggerDiscovery('e2e-tenant');
await waitForDiscoveryComplete('e2e-tenant');
// Circuit breaker should have tripped (>5 unconfirmed)
const catalog = await getCatalog('e2e-tenant');
const quarantined = catalog.filter(s => s.status === 'quarantined');
expect(quarantined.length).toBeGreaterThanOrEqual(5);
// Flag should be auto-disabled
const flagState = await getFlagState('experimental-tag-scanner');
expect(flagState.enabled).toBe(false);
});
});
5.4 E2E Infrastructure
# docker-compose.e2e.yml
services:
localstack:
image: localstack/localstack:3
environment:
SERVICES: sts,cloudformation,ecs,lambda,rds,s3,sqs,stepfunctions
ports: ["4566:4566"]
postgres:
image: postgres:16-alpine
environment:
POSTGRES_PASSWORD: test
ports: ["5432:5432"]
redis:
image: redis:7-alpine
ports: ["6379:6379"]
meilisearch:
image: getmeili/meilisearch:v1
ports: ["7700:7700"]
wiremock:
image: wiremock/wiremock:3
ports: ["8080:8080"]
volumes:
- ./fixtures/wiremock:/home/wiremock/mappings
app:
build: .
environment:
AWS_ENDPOINT: http://localstack:4566
DATABASE_URL: postgres://postgres:test@postgres:5432/test
REDIS_URL: redis://redis:6379
MEILI_URL: http://meilisearch:7700
GITHUB_API_URL: http://wiremock:8080
SLACK_API_URL: http://wiremock:8080
depends_on: [localstack, postgres, redis, meilisearch, wiremock]
Section 6: Performance & Load Testing
6.1 Discovery Scan Benchmarks
# tests/perf/test_discovery_performance.py
class TestDiscoveryPerformance:
def test_aws_scan_completes_within_60s_for_50_resources(self): ...
def test_aws_scan_completes_within_3min_for_500_resources(self): ...
def test_github_scan_completes_within_60s_for_100_repos(self): ...
def test_github_scan_completes_within_3min_for_500_repos(self): ...
def test_full_discovery_pipeline_completes_within_5min_for_medium_org(self):
"""Medium org: 200 AWS resources + 150 GitHub repos."""
...
def test_reconciliation_completes_within_30s_for_200_services(self): ...
6.2 Catalog Query Latency
describe('Catalog Query Performance', () => {
it('returns service list in <100ms with 1000 services', async () => {
await populateCatalog(1000);
const start = performance.now();
await getCatalog('perf-tenant', { limit: 50 });
expect(performance.now() - start).toBeLessThan(100);
});
it('Meilisearch returns results in <200ms with 5000 services', async () => {
await indexServices(generate5000Services());
const start = performance.now();
await search('payment');
expect(performance.now() - start).toBeLessThan(200);
});
it('concurrent 50 catalog queries complete within 500ms p95', async () => {
await populateCatalog(1000);
const results = await Promise.all(
Array.from({ length: 50 }, () => timedQuery('perf-tenant'))
);
const p95 = percentile(results.map(r => r.elapsed), 95);
expect(p95).toBeLessThan(500);
});
});
6.3 Ownership Inference at Scale
class TestOwnershipPerformance:
def test_infers_ownership_for_200_services_within_60s(self): ...
def test_memory_stays_under_256mb_during_500_service_inference(self): ...
def test_handles_org_with_50_teams_without_degradation(self): ...
Section 7: CI/CD Pipeline Integration
7.1 Pipeline Stages
┌─────────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Pre-Commit │───▶│ PR Gate │───▶│ Merge │───▶│ Staging │───▶│ Prod │
│ (local) │ │ (CI) │ │ (CI) │ │ (CD) │ │ (CD) │
└─────────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘
lint + type unit tests full suite E2E + perf smoke + canary
<10s <5min <10min <15min 5-min miracle
7.2 Coverage Thresholds
| Component | Minimum | Target |
|---|---|---|
| Ownership Inference | 90% | 95% |
| Reconciliation Engine | 85% | 90% |
| AWS Scanner | 80% | 85% |
| GitHub Scanner | 80% | 85% |
| Governance Policy | 90% | 95% |
| Catalog API | 80% | 85% |
| Overall | 80% | 85% |
7.3 Test Parallelization
# .github/workflows/test.yml
jobs:
unit-python:
runs-on: ubuntu-latest
strategy:
matrix:
suite: [scanners, reconciler, ownership, governance]
steps:
- run: pytest tests/unit/${{ matrix.suite }} -x --tb=short
unit-typescript:
runs-on: ubuntu-latest
strategy:
matrix:
shard: [1, 2, 3]
steps:
- run: vitest --shard=${{ matrix.shard }}/3
integration:
runs-on: ubuntu-latest
services:
localstack: { image: localstack/localstack:3 }
postgres: { image: postgres:16-alpine }
redis: { image: redis:7-alpine }
meilisearch: { image: getmeili/meilisearch:v1 }
steps:
- run: pytest tests/integration/ -x
- run: vitest --project=integration
e2e:
needs: [unit-python, unit-typescript, integration]
runs-on: ubuntu-latest
steps:
- run: docker compose -f docker-compose.e2e.yml up -d
- run: vitest --project=e2e
Section 8: Transparent Factory Tenet Testing
8.1 Atomic Flagging — Phantom Quarantine Circuit Breaker
describe('Atomic Flagging', () => {
describe('Flag Lifecycle', () => {
it('new discovery source flag defaults to false', () => {});
it('flag has owner and ttl metadata (max 14 days)', () => {});
it('CI blocks when flag at 100% exceeds TTL', () => {});
});
describe('Phantom Quarantine Breaker', () => {
it('allows service creation when <5 unconfirmed per scan', () => {});
it('trips breaker when >5 unconfirmed services in single scan', () => {});
it('quarantines phantom services (status=quarantined)', () => {});
it('auto-disables the discovery flag', () => {});
it('quarantined services appear in admin review queue', () => {});
it('admin can approve quarantined services into catalog', () => {});
it('admin can purge quarantined services', () => {});
});
describe('Local Evaluation', () => {
it('flag check does not make network calls during scan', () => {});
it('flag state refreshed from file/env every 60s', () => {});
});
});
8.2 Elastic Schema — Migration Validation
class TestElasticSchema:
def test_rejects_migration_with_drop_column(self): ...
def test_rejects_migration_with_alter_column_type(self): ...
def test_rejects_migration_with_rename_column(self): ...
def test_accepts_migration_with_add_nullable_column(self): ...
def test_accepts_migration_with_new_table(self): ...
def test_v1_code_ignores_v2_columns_without_error(self): ...
def test_every_migration_has_sunset_date_comment(self):
for f in glob.glob('migrations/*.sql'):
content = open(f).read()
assert re.search(r'-- sunset_date: \d{4}-\d{2}-\d{2}', content)
def test_ci_warns_on_past_sunset_migrations(self): ...
8.3 Cognitive Durability — Decision Log Validation
describe('Cognitive Durability', () => {
it('decision_log.json required for PRs touching ownership inference', () => {});
it('decision_log.json required for PRs touching reconciliation', () => {});
it('decision log has all required fields', () => {
const logs = glob.sync('docs/decisions/*.json');
for (const log of logs) {
const entry = JSON.parse(fs.readFileSync(log, 'utf-8'));
expect(entry).toHaveProperty('reasoning');
expect(entry).toHaveProperty('alternatives_considered');
expect(entry).toHaveProperty('confidence');
expect(entry).toHaveProperty('timestamp');
expect(entry).toHaveProperty('author');
}
});
it('ownership signal weight changes include before/after examples', () => {
// Decision logs for ownership changes must include sample scenarios
});
});
8.4 Semantic Observability — OTEL Span Assertions
describe('Semantic Observability', () => {
let spanExporter: InMemorySpanExporter;
describe('Discovery Scan Spans', () => {
it('emits parent catalog_scan span', async () => {
await triggerDiscovery('test-tenant');
const spans = spanExporter.getFinishedSpans();
expect(spans.find(s => s.name === 'catalog_scan')).toBeDefined();
});
it('emits child aws_scan and github_scan spans', async () => {
await triggerDiscovery('test-tenant');
const spans = spanExporter.getFinishedSpans();
expect(spans.find(s => s.name === 'aws_scan')).toBeDefined();
expect(spans.find(s => s.name === 'github_scan')).toBeDefined();
});
});
describe('Ownership Inference Spans', () => {
it('emits ownership_inference span with all signals considered', async () => {
await inferOwnership('test-service');
const span = spanExporter.getFinishedSpans().find(s => s.name === 'ownership_inference');
expect(span.attributes['catalog.ownership_signals']).toBeDefined();
expect(span.attributes['catalog.confidence_score']).toBeGreaterThanOrEqual(0);
});
it('includes rejected signals in span attributes', async () => {
await inferOwnership('test-service');
const span = spanExporter.getFinishedSpans().find(s => s.name === 'ownership_inference');
const signals = JSON.parse(span.attributes['catalog.ownership_signals']);
expect(signals.length).toBeGreaterThan(0);
// Each signal should have: source, team, weight, accepted/rejected
});
});
describe('PII Protection', () => {
it('hashes repo names in span attributes', async () => {
await triggerDiscovery('test-tenant');
const spans = spanExporter.getFinishedSpans();
for (const span of spans) {
const attrs = JSON.stringify(span.attributes);
expect(attrs).not.toContain('payment-api'); // real name
}
});
it('hashes team names in ownership spans', async () => {
await inferOwnership('test-service');
const span = spanExporter.getFinishedSpans().find(s => s.name === 'ownership_inference');
expect(span.attributes['catalog.service_id']).toMatch(/^[a-f0-9]+$/);
});
});
});
8.5 Configurable Autonomy — Governance Tests
describe('Configurable Autonomy', () => {
describe('Strict Mode (suggest-only)', () => {
it('discovery results go to pending review queue', async () => {
setPolicy({ governance_mode: 'strict' });
await triggerDiscovery('test-tenant');
const pending = await getPendingReview('test-tenant');
expect(pending.length).toBeGreaterThan(0);
const catalog = await getCatalog('test-tenant');
expect(catalog.length).toBe(0); // Nothing auto-added
});
});
describe('Audit Mode (auto-mutate)', () => {
it('discovery results auto-applied to catalog with logging', async () => {
setPolicy({ governance_mode: 'audit' });
await triggerDiscovery('test-tenant');
const catalog = await getCatalog('test-tenant');
expect(catalog.length).toBeGreaterThan(0);
const logs = await getPolicyLogs('test-tenant');
expect(logs.some(l => l.includes('auto-created in audit mode'))).toBe(true);
});
});
describe('Panic Mode', () => {
it('halts discovery scans immediately', async () => {
await activatePanic();
const result = await triggerDiscovery('test-tenant');
expect(result.status).toBe('halted');
});
it('catalog API returns 503 for writes', async () => {
await activatePanic();
const res = await fetch('/api/services', { method: 'POST', body: '{}' });
expect(res.status).toBe(503);
});
it('catalog API allows reads during panic', async () => {
await activatePanic();
const res = await fetch('/api/services');
expect(res.status).toBe(200);
});
});
describe('Per-Team Override', () => {
it('team strict lock prevents auto-mutation even in audit mode', async () => {
setPolicy({ governance_mode: 'audit' });
setTeamPolicy('platform-team', { governance_mode: 'strict' });
await triggerDiscovery('test-tenant');
const platformServices = (await getCatalog('test-tenant'))
.filter(s => s.team === 'platform-team');
expect(platformServices.length).toBe(0); // Blocked by team lock
});
});
});
Section 9: Test Data & Fixtures
9.1 Directory Structure
tests/
fixtures/
aws/
cloudformation/
payment-api-stack.json
user-service-stack.json
empty-stack.json
ecs/
prod-cluster-services.json
staging-cluster-services.json
lambda/
functions-list.json
api-gateway-mappings.json
rds/
instances-list.json
github/
graphql/
org-repos-page1.json
org-repos-page2.json
repo-details-with-codeowners.json
repo-details-no-codeowners.json
codeowners/
simple-team-ownership.txt
multi-path-ownership.txt
wildcard-patterns.txt
empty.txt
workflows/
ecs-deploy.yml
lambda-deploy.yml
matrix-deploy.yml
non-deploy-ci.yml
scenarios/
medium-org-200-resources.json
large-org-500-resources.json
conflicting-ownership.json
no-github-match.json
slack/
service-card-blocks.json
search-results-blocks.json
ownership-info-blocks.json
9.2 Service Factory
# tests/helpers/factories.py
def make_aws_service(overrides=None):
defaults = {
"name": f"service-{fake.word()}",
"source": "aws",
"aws_resources": [
{"type": "ecs-service", "arn": f"arn:aws:ecs:us-east-1:123456789:service/prod/{fake.word()}"},
],
"tags": {"service": fake.word(), "team": fake.word()},
"confidence": 0.85,
"discovered_at": datetime.utcnow().isoformat(),
}
return {**defaults, **(overrides or {})}
def make_github_repo(overrides=None):
defaults = {
"name": f"{fake.word()}-{fake.word()}",
"language": random.choice(["TypeScript", "Python", "Go", "Java"]),
"codeowners": [{"path": "*", "owners": [f"@org/{fake.word()}-team"]}],
"top_committers": [fake.name() for _ in range(5)],
"has_deploy_workflow": random.choice([True, False]),
"deploy_target": None,
}
return {**defaults, **(overrides or {})}
def make_catalog_service(overrides=None):
defaults = {
"service_id": str(uuid4()),
"tenant_id": "test-tenant",
"name": f"{fake.word()}-{random.choice(['api', 'service', 'worker', 'lambda'])}",
"team": f"{fake.word()}-team",
"language": random.choice(["TypeScript", "Python", "Go"]),
"sources": random.sample(["aws", "github"], k=random.randint(1, 2)),
"confidence": round(random.uniform(0.5, 1.0), 2),
"status": "active",
"ownership_signals": [],
}
return {**defaults, **(overrides or {})}
9.3 Synthetic Org Topology Generator
# tests/helpers/org_generator.py
def generate_org_topology(num_teams=5, services_per_team=10, repos_per_service=1.5):
"""Generate a realistic org with teams, services, repos, and dependencies."""
teams = [f"team-{fake.word()}" for _ in range(num_teams)]
services = []
repos = []
for team in teams:
for i in range(services_per_team):
svc_name = f"{team.split('-')[1]}-{fake.word()}-{random.choice(['api', 'worker', 'lambda'])}"
services.append(make_aws_service({"name": svc_name, "tags": {"team": team}}))
# Each service has 1-2 repos
for j in range(int(repos_per_service)):
repos.append(make_github_repo({
"name": svc_name if j == 0 else f"{svc_name}-lib",
"codeowners": [{"path": "*", "owners": [f"@org/{team}"]}],
"deploy_target": svc_name if j == 0 else None,
}))
return {"teams": teams, "services": services, "repos": repos}
Section 10: TDD Implementation Order
10.1 Bootstrap Sequence
Phase 0: Test Infrastructure (Week 0)
├── 0.1 pytest + vitest config
├── 0.2 LocalStack helper (STS, CFN, ECS, Lambda, RDS, SQS, Step Functions)
├── 0.3 Testcontainers helpers (PostgreSQL, Redis, Meilisearch)
├── 0.4 WireMock GitHub GraphQL stubs
├── 0.5 Factory functions (make_aws_service, make_github_repo, make_catalog_service)
├── 0.6 Org topology generator
└── 0.7 CI pipeline with test stages
10.2 Epic-by-Epic TDD Order
Phase 1: AWS Discovery (Epic 1) — Tests First for STS, Integration-Led for Scanners
├── 1.1 RED: STS role assumption tests (security-critical)
├── 1.2 GREEN: Implement STS client
├── 1.3 Implement CFN scanner against LocalStack
├── 1.4 RED: CFN scanner unit tests (lock in behavior)
├── 1.5 Implement ECS + Lambda + RDS scanners
├── 1.6 RED: Scanner unit tests for each resource type
├── 1.7 INTEGRATION: Full AWS scan against LocalStack
└── 1.8 REFACTOR: Extract scanner interface, add parallelism
Phase 2: GitHub Discovery (Epic 2) — Integration-Led
├── 2.1 Implement repo scanner against WireMock
├── 2.2 RED: CODEOWNERS parser tests (strict TDD)
├── 2.3 GREEN: Implement CODEOWNERS parser
├── 2.4 RED: Workflow parser tests
├── 2.5 GREEN: Implement workflow parser
├── 2.6 INTEGRATION: Full GitHub scan against WireMock
└── 2.7 RED: Rate limit handling tests
Phase 3: Reconciliation (Epic 3) — Tests First
├── 3.1 RED: Cross-reference matching tests
├── 3.2 GREEN: Implement reconciler
├── 3.3 RED: Deduplication tests
├── 3.4 GREEN: Implement dedup logic
├── 3.5 INTEGRATION: Reconciler → PostgreSQL
└── 3.6 REFACTOR: Confidence scoring pipeline
Phase 4: Ownership Inference (Epic 4) — Strict TDD
├── 4.1 RED: Signal weighting tests (all combinations)
├── 4.2 GREEN: Implement inference engine
├── 4.3 RED: Ambiguity detection tests
├── 4.4 GREEN: Implement ambiguity logic
├── 4.5 RED: Manual override tests
├── 4.6 GREEN: Implement override handling
└── 4.7 INTEGRATION: Inference → PostgreSQL
Phase 5: Catalog API + Search (Epics 5-6) — Integration-Led
├── 5.1 Implement API endpoints
├── 5.2 RED: API unit tests (CRUD, filtering, pagination)
├── 5.3 INTEGRATION: API → PostgreSQL
├── 5.4 INTEGRATION: API → Meilisearch
└── 5.5 RED: Tenant isolation tests
Phase 6: Governance (Epic 10) — Strict TDD
├── 6.1 RED: Strict/audit mode tests
├── 6.2 GREEN: Implement policy engine
├── 6.3 RED: Panic mode tests
├── 6.4 GREEN: Implement panic mode
├── 6.5 RED: Phantom quarantine circuit breaker tests
├── 6.6 GREEN: Implement circuit breaker
├── 6.7 RED: OTEL span assertion tests
└── 6.8 GREEN: Instrument all components
Phase 7: E2E Validation
├── 7.1 5-Minute Miracle journey (>80% accuracy gate)
├── 7.2 Cmd+K search journey (<200ms gate)
├── 7.3 Phantom quarantine journey
├── 7.4 Panic mode journey
└── 7.5 Performance benchmarks
10.3 "Never Ship Without" Checklist
- All STS role assumption tests (security gate)
- All ownership inference tests (accuracy gate — >80%)
- All CODEOWNERS parser tests (correctness gate)
- All governance policy tests (compliance gate)
- Phantom quarantine circuit breaker test (safety gate)
- 5-Minute Miracle E2E journey (product promise gate)
- PII protection span tests (privacy gate)
- Schema migration lint (no breaking changes)
- Coverage ≥80% overall, ≥90% on ownership inference
- Meilisearch search latency <200ms with 1000 services
End of dd0c/portal Test Architecture
11. Review Remediation Addendum (Post-Gemini Review)
11.1 Resolve Database Misalignment (PostgreSQL vs DynamoDB)
Epic 10.2 specified DynamoDB Single-Table, but the Architecture and Test Architecture are fundamentally built around PostgreSQL (Aurora Serverless v2) with pgvector. Resolution: The IDP requires relational joins and vector search. PostgreSQL is the definitive catalog database. DynamoDB references are removed.
// tests/schema/migration_validation_test.rs
#[tokio::test]
async fn elastic_schema_postgres_migration_is_additive_only() {
let migrations = read_sql_migrations("./migrations");
for migration in migrations {
assert!(!migration.contains("DROP COLUMN"), "Destructive schema change detected");
assert!(!migration.contains("ALTER COLUMN"), "Type modification detected");
assert!(!migration.contains("RENAME COLUMN"), "Column rename detected");
}
}
#[tokio::test]
async fn migration_does_not_hold_exclusive_locks_on_reads() {
// Concurrent index creation tests
assert!(migration_contains("CREATE INDEX CONCURRENTLY"),
"Indexes must be created concurrently to avoid locking the catalog");
}
11.2 Invert the Test Pyramid (Integration Honeycomb)
Shift from 70% Unit (with heavy moto/responses mocking) to 30/60/10 with VCR and LocalStack.
# tests/integration/scanners/test_aws_scanner.py
@pytest.mark.vcr()
def test_aws_scanner_discovers_ecs_services_and_api_gateways(vcr_cassette):
# Uses real recorded AWS API responses, not moto mocks
# Validates actual boto3 parsing against real-world AWS shapes
scanner = AWSDiscoveryScanner(account_id="123456789012", region="us-east-1")
services = scanner.scan()
assert len(services) > 0
assert any(s.type == "ecs_service" for s in services)
@pytest.mark.vcr()
def test_github_scanner_handles_graphql_pagination(vcr_cassette):
# Validates real GitHub GraphQL paginated responses
scanner = GitHubDiscoveryScanner(org_name="dd0c")
repos = scanner.scan()
assert len(repos) > 100 # Proves pagination logic works
11.3 Missing Epic Coverage
Epic 3.4: PagerDuty & OpsGenie Integrations
# tests/integration/test_pagerduty_sync.py
@pytest.mark.vcr()
def test_pagerduty_sync_maps_schedules_to_catalog_teams():
sync = PagerDutySyncer(api_key="sk-test-key")
teams = sync.fetch_oncall_schedules()
assert teams[0].oncall_email is not None
def test_pagerduty_credentials_are_encrypted_at_rest():
# Verify KMS envelope encryption for 3rd party API keys
pass
Epic 4.3: Redis Prefix Caching for Cmd+K
# tests/integration/test_search_cache.py
def test_cmd_k_search_hits_redis_cache_before_postgres():
redis_client.set("search:auth", json.dumps([{"name": "auth-service"}]))
# Must return < 5ms from Redis, skipping DB
result = search_api.query("auth")
assert result[0]['name'] == "auth-service"
def test_catalog_update_invalidates_search_cache():
# Create new service
catalog_api.create_service("billing-api")
# Prefix cache must be purged
assert redis_client.keys("search:*") == []
Epics 5 & 6: UI and Dashboards (Playwright)
// tests/e2e/ui/catalog.spec.ts
test('service catalog renders progressive disclosure UI', async ({ page }) => {
await page.goto('/catalog');
// Click expands details instead of navigating away
await page.click('[data-testid="service-row-auth-api"]');
await expect(page.locator('[data-testid="service-drawer"]')).toBeVisible();
});
test('dashboard KPI aggregation shows total services and ownership coverage', async ({ page }) => {
await page.goto('/dashboard');
await expect(page.locator('[data-testid="kpi-total-services"]')).toHaveText("150");
await expect(page.locator('[data-testid="kpi-ownership"]')).toHaveText("85%");
});
Epic 9: Onboarding & Stripe
# tests/integration/test_stripe_webhooks.py
def test_stripe_checkout_completed_upgrades_tenant_tier():
payload = load_fixture("stripe_checkout_completed.json")
signature = generate_stripe_signature(payload, secret)
response = api_client.post("/webhooks/stripe", data=payload, headers={"Stripe-Signature": signature})
assert response.status_code == 200
tenant = db.get_tenant("t-123")
assert tenant.tier == "pro"
def test_websocket_streams_discovery_progress_during_onboarding():
# Connect WS client, trigger discovery, assert WS receives "discovering AWS...", "found 50 resources..."
pass
11.4 Scaled Performance Benchmarks
# tests/performance/test_discovery_scale.py
def test_discovery_pipeline_handles_10000_aws_resources_without_step_functions_payload_limit():
# Simulate an AWS environment with 10k resources
# Must chunk state machine transitions to stay under 256KB Step Functions limit
pass
def test_discovery_pipeline_handles_1000_github_repos():
# Verify GraphQL batching and rate limit backoff
pass
11.5 Edge Case Resilience
def test_github_graphql_concurrent_rate_limiting():
# If 5 tenants scan concurrently, respect Retry-After headers across workers
pass
def test_partial_discovery_scan_does_not_corrupt_catalog():
# If GitHub scan times out halfway, existing services must NOT be marked stale
pass
def test_ownership_conflict_resolution():
# If two discovery sources claim the same repo, prioritize Explicit (Config) over Implicit (Tags)
pass
def test_meilisearch_index_rebuild_does_not_drop_search():
# Verify zero-downtime index swapping during mapping updates
pass
12. BMad Review Implementation (Must-Have Before Launch)
12.1 Discovery Scan Timeout / Partial Failure Recovery
# tests/integration/test_discovery_resilience.py
def test_partial_aws_scan_does_not_delete_existing_services():
"""If AWS scanner times out after discovering 500 of 1000 resources,
existing catalog entries must NOT be marked stale or deleted."""
# Seed catalog with 1000 services
seed_catalog(count=1000)
# Simulate scanner timeout after 500 resources
with mock_aws_timeout_after(500):
result = run_aws_discovery_scan()
assert result.status == "partial_failure"
assert result.discovered == 500
# All 1000 services must still exist in catalog
services = catalog_api.list_services()
assert len(services) == 1000 # NOT 500
# Partial results should be staged, not committed
staged = catalog_api.list_staged_updates()
assert len(staged) == 500
def test_partial_github_scan_does_not_corrupt_ownership():
"""If GitHub scanner hits rate limit mid-scan, existing ownership
mappings must be preserved."""
seed_catalog_with_ownership(count=100)
with mock_github_rate_limit_after(50):
result = run_github_discovery_scan()
assert result.status == "partial_failure"
# All 100 ownership mappings intact
services = catalog_api.list_services()
owned = [s for s in services if s.owner is not None]
assert len(owned) == 100 # NOT 50
def test_scan_failure_triggers_alert_not_silent_failure():
result = run_aws_discovery_scan_with_invalid_credentials()
assert result.status == "failed"
# Must alert the admin
alerts = get_admin_alerts()
assert any("discovery scan failed" in a.message for a in alerts)
12.2 Ownership Conflict Resolution Integration Test
# tests/integration/test_ownership_conflict.py
def test_explicit_config_overrides_implicit_tag():
"""Explicit (CODEOWNERS/config) > Implicit (AWS tags) > Heuristic (commits)"""
# AWS tag says owner is "team-infra"
aws_scanner.discover_service("auth-api", owner_tag="team-infra")
# GitHub CODEOWNERS says owner is "team-platform"
github_scanner.discover_service("auth-api", codeowners="team-platform")
# Resolve conflict
service = catalog_api.get_service("auth-api")
assert service.owner == "team-platform" # Explicit wins
assert service.owner_source == "codeowners"
def test_concurrent_discovery_sources_do_not_race():
"""Two scanners discovering the same service simultaneously
must not create duplicate entries."""
import asyncio
async def run_both():
await asyncio.gather(
aws_scanner.discover_service_async("billing-api"),
github_scanner.discover_service_async("billing-api"),
)
asyncio.run(run_both())
services = catalog_api.search("billing-api")
assert len(services) == 1 # No duplicates
def test_heuristic_ownership_does_not_override_explicit():
# Explicit owner set via config
catalog_api.set_owner("auth-api", "team-platform", source="config")
# Heuristic scanner infers different owner from commit history
github_scanner.infer_ownership("auth-api", top_committer="dev@other-team.com")
service = catalog_api.get_service("auth-api")
assert service.owner == "team-platform" # Explicit preserved
12.3 VCR Cassette Freshness Validation
# .github/workflows/vcr-refresh.yml
# Weekly job to re-record VCR cassettes against real AWS
name: VCR Cassette Freshness
on:
schedule:
- cron: '0 6 * * 1' # Every Monday 6 AM UTC
jobs:
refresh:
runs-on: self-hosted
steps:
- uses: actions/checkout@v4
- name: Re-record cassettes
env:
AWS_ACCESS_KEY_ID: ${{ secrets.VCR_AWS_KEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.VCR_AWS_SECRET }}
run: |
VCR_RECORD=all pytest tests/integration/scanners/ -v
- name: Diff cassettes
run: |
git diff --stat tests/cassettes/
CHANGED=$(git diff --name-only tests/cassettes/ | wc -l)
if [ "$CHANGED" -gt 0 ]; then
echo "⚠️ $CHANGED cassettes changed — AWS API responses have drifted"
echo "Review and commit updated cassettes"
fi
- name: Create PR if cassettes changed
uses: peter-evans/create-pull-request@v6
with:
title: "chore: refresh VCR cassettes (AWS API drift)"
branch: vcr-refresh
12.4 Meilisearch Zero-Downtime Index Rebuild
# tests/integration/test_meilisearch_rebuild.py
def test_search_returns_results_during_index_rebuild():
"""Cmd+K search must work during index rebuild (zero downtime)."""
# Seed index with 100 services
meili.index("services").add_documents(make_services(100))
meili.index("services").wait_for_pending_update()
# Start rebuild (creates services_v2, swaps when ready)
rebuild_task = start_index_rebuild()
# Search must still work during rebuild
results = meili.index("services").search("auth")
assert len(results["hits"]) > 0
# Wait for rebuild to complete
rebuild_task.wait()
# Search still works after swap
results = meili.index("services").search("auth")
assert len(results["hits"]) > 0
def test_index_rebuild_failure_does_not_corrupt_active_index():
meili.index("services").add_documents(make_services(50))
# Simulate rebuild failure (e.g., OOM during indexing)
with mock_meili_oom_during_rebuild():
result = start_index_rebuild()
assert result.status == "failed"
# Active index must be untouched
results = meili.index("services").search("billing")
assert len(results["hits"]) > 0 # Still works
12.5 Cmd+K Search Latency from Redis Cache
# tests/performance/test_search_latency.py
def test_cmd_k_search_under_10ms_from_redis():
"""Prefix cache hit must return in <10ms."""
# Warm the cache
redis_client.set("search:prefix:auth", json.dumps([
{"name": "auth-service", "owner": "team-platform"},
{"name": "auth-proxy", "owner": "team-infra"},
]))
import time
start = time.perf_counter_ns()
results = search_api.prefix_search("auth")
elapsed_ms = (time.perf_counter_ns() - start) / 1_000_000
assert elapsed_ms < 10, f"Cmd+K search took {elapsed_ms:.1f}ms — exceeds 10ms SLA"
assert len(results) == 2
12.6 Free Tier Enforcement (50 Services)
def test_free_tier_allows_50_services():
tenant = create_tenant(tier="free")
for i in range(50):
resp = catalog_api.create_service(tenant, f"service-{i}")
assert resp.status_code == 201
def test_free_tier_rejects_51st_service():
tenant = create_tenant(tier="free")
seed_services(tenant, count=50)
resp = catalog_api.create_service(tenant, "service-51")
assert resp.status_code == 403
assert "upgrade" in resp.json()["error"].lower()
End of P4 BMad Implementation