Implement review remediation + PLG analytics SDK

- All 6 test architectures patched with Section 11 addendums
- P5 (cost) fully rewritten from 232 to ~600 lines
- PLG brainstorm + party mode advisory board results
- Analytics SDK v2 (PostHog Cloud, Zod strict, Lambda-safe)
- Analytics tests v2 (safeParse, no $set, no timestamp, no PII)
- Addresses all Gemini review findings across P1-P6
This commit is contained in:
2026-03-01 01:42:49 +00:00
parent 2fe0ed856e
commit 03bfe931fc
9 changed files with 2950 additions and 85 deletions

View File

@@ -0,0 +1,138 @@
import { PostHog } from 'posthog-node';
import { z } from 'zod';
// ---------------------------------------------------------
// 1. Unified Event Taxonomy (Zod Enforced, Strictly Typed)
// ---------------------------------------------------------
/**
 * Closed set of analytics event names.
 *
 * Each member's string value is what `Analytics.track()` forwards to the
 * client as the `event` field, and each member is a key of `PropertiesMap`,
 * which selects the strict property schema for that event.
 */
export enum EventName {
// Validated against SignupProperties.
SignupCompleted = 'account.signup.completed',
// Validated against ActivationProperties.
FirstDollarSaved = 'routing.savings.first_dollar',
// Validated against UpgradeProperties.
UpgradeCompleted = 'billing.upgrade.completed',
}
// Per-event property schemas — no z.any() PII loophole
// Literal value sets, hoisted so each schema below stays a one-glance read.
const SIGNUP_METHODS = ['github_sso', 'google_sso', 'email'] as const;
const PAID_PLANS = ['pro', 'business'] as const;

/** Properties allowed on `account.signup.completed`; unknown keys are rejected. */
const SignupProperties = z
  .object({
    method: z.enum(SIGNUP_METHODS),
  })
  .strict();

/** Properties allowed on `routing.savings.first_dollar`; unknown keys are rejected. */
const ActivationProperties = z
  .object({
    savings_amount: z.number().nonnegative(),
  })
  .strict();

/** Properties allowed on `billing.upgrade.completed`; unknown keys are rejected. */
const UpgradeProperties = z
  .object({
    plan: z.enum(PAID_PLANS),
    mrr_increase: z.number().nonnegative(),
  })
  .strict();

/** Lookup from event name to the strict schema its `properties` must satisfy. */
const PropertiesMap = {
  [EventName.SignupCompleted]: SignupProperties,
  [EventName.FirstDollarSaved]: ActivationProperties,
  [EventName.UpgradeCompleted]: UpgradeProperties,
} as const;
/**
 * Base envelope shared by every analytics event.
 *
 * This schema only checks that `properties` is a string-keyed record (any
 * values) and substitutes `{}` when it is omitted. The per-event, strict
 * property check is a separate step performed in `Analytics.track()` using
 * `PropertiesMap`.
 */
export const EventSchema = z.object({
// Must be one of the EventName enum values.
name: z.nativeEnum(EventName),
// Non-empty string; the custom message applies to the min-length check.
tenant_id: z.string().min(1, 'tenant_id is required'),
// Only the literal 'route' is accepted.
product: z.literal('route'),
properties: z.record(z.unknown()).optional().default({}),
});
/**
 * Parsed (output) shape of `EventSchema`. Because of the `.default({})`
 * above, `properties` is always present on this type.
 */
export type AnalyticsEvent = z.infer<typeof EventSchema>;
// ---------------------------------------------------------
// 2. NoOp Client for local/test environments
// ---------------------------------------------------------
/**
 * Stand-in client whose methods all do nothing. `Analytics` installs it when
 * no API key is configured, so callers can use the SDK without any events
 * being sent anywhere.
 */
class NoOpPostHog {
  /** Discards the event. */
  capture(): void {
    return;
  }

  /** Discards the identify call. */
  identify(): void {
    return;
  }

  /** Nothing is ever queued, so this resolves immediately. */
  flushAsync(): Promise<void> {
    return Promise.resolve();
  }

  /** Nothing to tear down, so this resolves immediately. */
  shutdown(): Promise<void> {
    return Promise.resolve();
  }
}
// ---------------------------------------------------------
// 3. Analytics SDK (PostHog Cloud, Lambda-Safe)
// ---------------------------------------------------------
/**
 * Event shape accepted by `Analytics.track()`: the same as `AnalyticsEvent`
 * except that `properties` may be omitted, mirroring the `{}` default that
 * `EventSchema` applies while parsing.
 */
type TrackableEvent = Omit<AnalyticsEvent, 'properties'> &
  Partial<Pick<AnalyticsEvent, 'properties'>>;

/**
 * Thin, validating wrapper around a PostHog client.
 *
 * Events are validated with `EventSchema` and the per-event schema from
 * `PropertiesMap` before being handed to the client. Nothing is flushed
 * automatically by this class; call `flush()` (or `shutdown()`) yourself.
 */
export class Analytics {
  private readonly client: PostHog | NoOpPostHog;

  /** Always `false`; exposed so callers and tests can assert on it. */
  public readonly isSessionReplayEnabled = false;

  /**
   * @param client Optional pre-built client (used by tests). When omitted, a
   *   client is created from `POSTHOG_API_KEY`; if that variable is unset or
   *   empty, a no-op client is used and a warning is logged.
   */
  constructor(client?: PostHog) {
    if (client) {
      this.client = client;
      return;
    }

    const apiKey = process.env.POSTHOG_API_KEY;
    if (!apiKey) {
      // No key = NoOp. Never silently send to a mock key.
      console.warn('[Analytics] POSTHOG_API_KEY not set — using NoOp client');
      this.client = new NoOpPostHog();
      return;
    }

    this.client = new PostHog(apiKey, {
      host: 'https://us.i.posthog.com',
      flushAt: 20, // Batch up to 20 events
      flushInterval: 5000, // Or flush every 5s
    });
  }

  /**
   * Identify a tenant once (on signup). Sets person properties via the
   * client's `identify` call; use this instead of embedding `$set` in every
   * `track()` call.
   *
   * @param tenantId Used as the client's `distinctId`.
   * @param properties Extra person properties. A `tenant_id` key supplied
   *   here is ignored: the stored `tenant_id` always equals `tenantId`.
   */
  public identify(tenantId: string, properties?: Record<string, unknown>): void {
    this.client.identify({
      distinctId: tenantId,
      // tenant_id goes last so caller-supplied properties can never make it
      // disagree with distinctId.
      properties: { ...properties, tenant_id: tenantId },
    });
  }

  /**
   * Validate and queue one event. Does NOT flush.
   *
   * @param event Event envelope; `properties` may be omitted and is then
   *   treated as `{}`.
   * @returns `true` when the event was handed to the client; `false` when
   *   validation failed or the client threw. Failures are logged with
   *   `console.error` and never thrown to the caller.
   */
  public track(event: TrackableEvent): boolean {
    // 1. Base schema validation
    const baseResult = EventSchema.safeParse(event);
    if (!baseResult.success) {
      console.error('[Analytics] Invalid event (base):', baseResult.error.format());
      return false;
    }
    const { name, tenant_id: tenantId, product, properties } = baseResult.data;

    // 2. Per-event property validation (strict, no PII loophole)
    const propSchema = PropertiesMap[name];
    if (propSchema) {
      const propResult = propSchema.safeParse(properties);
      if (!propResult.success) {
        console.error('[Analytics] Invalid properties:', propResult.error.format());
        return false;
      }
    }

    // 3. Capture — no timestamp is passed, so the client/server assigns it.
    try {
      this.client.capture({
        distinctId: tenantId,
        event: name,
        properties: { product, ...properties },
      });
    } catch (err) {
      // Analytics is best-effort: a failing client must not take down the caller.
      console.error('[Analytics] capture failed:', err);
      return false;
    }
    return true;
  }

  /**
   * Flush all queued events by awaiting the client's `flushAsync()`.
   * Intended to be called once at the end of an invocation (e.g. in a
   * middleware or a handler's `finally` block).
   */
  public async flush(): Promise<void> {
    await this.client.flushAsync();
  }

  /** Await the client's `shutdown()`. */
  public async shutdown(): Promise<void> {
    await this.client.shutdown();
  }
}

View File

@@ -2239,3 +2239,315 @@ Before writing any new function, ask:
*Test Architecture document generated for dd0c/route V1 MVP.*
*Total estimated test count at V1 launch: ~400 tests.*
*Target CI runtime: <8 minutes (unit + integration), <15 minutes (full pipeline with E2E).*
---
## 11. Review Remediation Addendum (Post-Gemini Review)
### 11.1 Replace MockKeyCache/MockKeyStore with Testcontainers
```rust
// BEFORE (anti-pattern — mocks hide real latency):
// let cache = MockKeyCache::new();
// let store = MockKeyStore::new();
// AFTER: Use Testcontainers for hot-path auth tests
#[tokio::test]
async fn auth_middleware_validates_key_under_5ms_with_real_redis() {
    // Containers are started first so the cache and store talk to real services.
    let redis_container = TestcontainersRedis::start().await;
    let pg_container = TestcontainersPostgres::start().await;
    let key_cache = RedisKeyCache::new(redis_container.connection_string());
    let key_store = PgKeyStore::new(pg_container.connection_string());

    // Only the middleware call itself is timed.
    let timer = Instant::now();
    let outcome = auth_middleware(&key_cache, &key_store, "sk-valid-key").await;
    let took = timer.elapsed();

    assert!(took < Duration::from_millis(5));
    assert!(outcome.is_ok());
}
#[tokio::test]
async fn auth_middleware_handles_redis_connection_pool_exhaustion() {
// Exhaust all connections, verify fallback to PG
let redis = TestcontainersRedis::start().await;
let cache = RedisKeyCache::with_pool_size(redis.connection_string(), 1);
// Hold the single connection
let _held = cache.raw_connection().await;
// Auth must still work via PG fallback
let result = auth_middleware(&cache, &pg_store, "sk-valid-key").await;
assert!(result.is_ok());
}
```
### 11.2 Fix Encryption Test (Decrypt, Don't Just Assert Non-Plaintext)
```rust
// BEFORE (anti-pattern — passes if stored as random garbage):
// assert_ne!(stored.encrypted_key, b"sk-plaintext-key");
// AFTER: Full round-trip encryption test
#[tokio::test]
async fn provider_credential_encrypts_and_decrypts_correctly() {
let kms = LocalStackKMS::start().await;
let key_id = kms.create_key().await;
let store = CredentialStore::new(pg.pool(), kms.client(), key_id);
let original = "sk-live-abc123xyz";
store.save_credential("org-1", "openai", original).await.unwrap();
// Read raw from DB — must NOT be plaintext
let raw = pg.query_raw("SELECT encrypted_key FROM credentials LIMIT 1").await;
assert!(!String::from_utf8_lossy(&raw).contains(original));
// Decrypt via the store — must match original
let decrypted = store.get_credential("org-1", "openai").await.unwrap();
assert_eq!(decrypted, original);
}
#[tokio::test]
async fn kms_key_rotation_old_deks_still_decrypt_old_credentials() {
let kms = LocalStackKMS::start().await;
let key_id = kms.create_key().await;
let store = CredentialStore::new(pg.pool(), kms.client(), key_id);
// Save with original key
store.save_credential("org-1", "openai", "sk-old").await.unwrap();
// Rotate KMS key
kms.rotate_key(key_id).await;
// Old credential must still decrypt
let decrypted = store.get_credential("org-1", "openai").await.unwrap();
assert_eq!(decrypted, "sk-old");
// New credential uses new DEK
store.save_credential("org-1", "anthropic", "sk-new").await.unwrap();
let decrypted_new = store.get_credential("org-1", "anthropic").await.unwrap();
assert_eq!(decrypted_new, "sk-new");
}
```
### 11.3 Slow Dependency Chaos Test
```rust
#[tokio::test]
async fn chaos_slow_db_does_not_block_proxy_hot_path() {
    let sla = Duration::from_millis(50);
    let env = E2EStack::start().await;

    // Inject 5-second network delay on TimescaleDB port via tc netem.
    env.inject_latency("timescaledb", Duration::from_secs(5)).await;

    // Time one proxied request while the database is slow.
    let clock = Instant::now();
    let response = env
        .proxy()
        .post("/v1/chat/completions")
        .header("Authorization", "Bearer sk-valid")
        .json(&chat_request())
        .send()
        .await;
    let latency = clock.elapsed();

    // Telemetry is dropped, but routing works and stays within the SLA.
    assert_eq!(response.status(), 200);
    assert!(latency < sla, "Proxy blocked by slow DB: {:?}", latency);
}
#[tokio::test]
async fn chaos_slow_redis_falls_back_to_pg_for_auth() {
    let env = E2EStack::start().await;
    // Make Redis 3 seconds slow before sending an authenticated request.
    let redis_delay = Duration::from_secs(3);
    env.inject_latency("redis", redis_delay).await;

    let response = env
        .proxy()
        .post("/v1/chat/completions")
        .header("Authorization", "Bearer sk-valid")
        .json(&chat_request())
        .send()
        .await;

    // The request must still succeed.
    assert_eq!(response.status(), 200);
}
```
### 11.4 IDOR / Cross-Tenant Test Suite
```rust
// tests/integration/idor_test.rs
#[tokio::test]
async fn idor_org_a_cannot_read_org_b_routing_rules() {
    let env = E2EStack::start().await;
    let attacker_token = env.create_org_and_token("org-a").await;
    let victim_token = env.create_org_and_token("org-b").await;

    // Org B creates a routing rule.
    let rule_body = json!({ "name": "secret-rule", "model": "gpt-4" });
    let victim_rule = env
        .api()
        .post("/v1/routing-rules")
        .bearer_auth(&victim_token)
        .json(&rule_body)
        .send()
        .await
        .json::<RoutingRule>()
        .await;

    // Org A tries to read it by id.
    let rule_url = format!("/v1/routing-rules/{}", victim_rule.id);
    let response = env
        .api()
        .get(&rule_url)
        .bearer_auth(&attacker_token)
        .send()
        .await;

    // Not 403 — don't leak existence.
    assert_eq!(response.status(), 404);
}
// The three specs below are placeholders. An empty test body passes, which
// would report cross-tenant isolation as covered when nothing is checked, so
// each one fails loudly with `todo!` until it is implemented.
#[tokio::test]
async fn idor_org_a_cannot_read_org_b_api_keys() {
    // Same pattern — create key as org B, attempt read as org A
    todo!("create an API key as org B, read it as org A, expect 404");
}
#[tokio::test]
async fn idor_org_a_cannot_read_org_b_telemetry() {
    todo!("emit telemetry as org B, query it as org A, expect no org B data");
}
#[tokio::test]
async fn idor_org_a_cannot_mutate_org_b_routing_rules() {
    todo!("create a rule as org B, attempt update/delete as org A, expect 404");
}
```
### 11.5 SSE Connection Drop / Billing Leak Test
```rust
#[tokio::test]
async fn sse_client_disconnect_aborts_upstream_provider_request() {
    let env = E2EStack::start().await;
    let upstream = env.mock_provider();

    // Configure provider to stream slowly (1 token/sec for 60 tokens).
    upstream.configure_slow_stream(60, Duration::from_secs(1));

    // Start streaming request.
    let request_body = json!({ "stream": true, "model": "gpt-4" });
    let mut sse = env
        .proxy()
        .post("/v1/chat/completions")
        .json(&request_body)
        .send()
        .await
        .bytes_stream();

    // Read 5 tokens then drop the connection.
    let mut chunks_read = 0;
    while chunks_read < 5 {
        sse.next().await;
        chunks_read += 1;
    }
    drop(sse);

    // Wait briefly for cleanup.
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Provider connection must be aborted — not still streaming.
    assert_eq!(upstream.active_connections(), 0);

    // Billing: customer should only be charged for 5 tokens, not 60.
    let billed = env.get_last_usage_record().await;
    assert!(billed.completion_tokens <= 10); // Some buffer for in-flight
}
```
### 11.6 Concurrent Circuit Breaker Race Condition
```rust
#[tokio::test]
async fn circuit_breaker_handles_50_concurrent_failures_cleanly() {
    let redis = TestcontainersRedis::start().await;
    let breaker = RedisCircuitBreaker::new(redis.connection_string(), "openai", 10);

    // Spawn 50 tasks that each record one failure on a clone of the breaker.
    let tasks: Vec<_> = (0..50)
        .map(|_| {
            let handle = breaker.clone();
            tokio::spawn(async move {
                handle.record_failure().await;
            })
        })
        .collect();
    futures::future::join_all(tasks).await;

    // Breaker must be open — no race condition leaving it closed.
    assert_eq!(breaker.state().await, CircuitState::Open);
    // Failure count must be exactly 50 (atomic increments).
    assert_eq!(breaker.failure_count().await, 50);
}
```
### 11.7 Trace Context Propagation
```rust
#[tokio::test]
async fn otel_trace_propagates_from_client_through_proxy_to_provider() {
    // Trace id carried in the incoming W3C `traceparent` header below.
    let trace_id = "4bf92f3577b34da6a3ce929d0e0e4736";

    let stack = E2EStack::start().await;
    let tracer = stack.in_memory_tracer();
    let resp = stack.proxy()
        .post("/v1/chat/completions")
        .header("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
        .json(&chat_request())
        .send().await;
    // Check the request succeeded first, so a routing failure is reported as
    // such rather than as a missing-span `unwrap` panic further down.
    assert_eq!(resp.status(), 200);

    let spans = tracer.finished_spans();
    let proxy_span = spans.iter().find(|s| s.name == "proxy.route").unwrap();
    // Proxy span must be child of the incoming trace.
    assert_eq!(proxy_span.trace_id, trace_id);

    // Provider request must carry the same trace_id.
    let provider_req = stack.mock_provider().last_request();
    assert!(provider_req.headers["traceparent"].contains(trace_id));
}
```
### 11.8 Flag Provider Fallback Test
```rust
#[test]
fn flag_provider_unreachable_falls_back_to_safe_default() {
    const FLAG: &str = "enable_new_router";
    const SAFE_DEFAULT: bool = false;

    // Simulate missing/corrupt flag config file.
    let broken_provider = JsonFileProvider::new("/nonexistent/flags.json");
    let evaluated = broken_provider.evaluate(FLAG, SAFE_DEFAULT);

    // Must return the safe default (false), not panic or error.
    assert_eq!(evaluated, SAFE_DEFAULT);
}
#[test]
fn flag_provider_malformed_json_falls_back_to_safe_default() {
    const SAFE_DEFAULT: bool = false;

    // Provider built from text that is not valid JSON.
    let malformed = "{ invalid json }}}";
    let evaluated = JsonFileProvider::from_string(malformed)
        .evaluate("enable_new_router", SAFE_DEFAULT);

    assert_eq!(evaluated, SAFE_DEFAULT);
}
```
### 11.9 24-Hour Soak Test Spec
```rust
// tests/soak/long_running_latency.rs
// Run manually: cargo test --test soak -- --ignored
#[tokio::test]
#[ignore] // Excluded from the default test run; execute explicitly with `--ignored`
async fn soak_24h_proxy_latency_stays_under_5ms_p99() {
    // k6 config: 10 RPS sustained for 24 hours
    // Assert: p99 < 5ms, no memory growth > 50MB, no connection leaks
    // This catches memory fragmentation and connection pool exhaustion
    //
    // Spec only: fail explicitly until the load driver and assertions exist,
    // so that a run with `--ignored` cannot report a vacuous pass.
    todo!("drive 10 RPS for 24h and assert the thresholds listed above");
}
```
### 11.10 Panic Mode Authorization
```rust
#[tokio::test]
async fn panic_mode_requires_owner_role() {
    let env = E2EStack::start().await;
    // A Viewer-role token must not be able to trigger panic mode.
    let token = env.create_token_with_role("org-1", Role::Viewer).await;

    let response = env
        .api()
        .post("/admin/panic")
        .bearer_auth(&token)
        .send()
        .await;

    assert_eq!(response.status(), 403);
}
#[tokio::test]
async fn panic_mode_allowed_for_owner_role() {
let owner_token = stack.create_token_with_role("org-1", Role::Owner).await;
let resp = stack.api()
.post("/admin/panic")
.bearer_auth(&owner_token)
.send().await;
assert_eq!(resp.status(), 200);
}
```
*End of P1 Review Remediation Addendum*

View File

@@ -0,0 +1,204 @@
import { PostHog } from 'posthog-node';
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { Mocked } from 'vitest';
import { Analytics, EventSchema, EventName } from '../../src/analytics';
vi.mock('posthog-node');
describe('Analytics SDK (PostHog Cloud — v2 Post-Review)', () => {
let analytics: Analytics;
// `vi` is a runtime object, not a type namespace, so the mocked-instance type
// comes from vitest's standalone `Mocked<T>` export.
let mockPostHog: Mocked<PostHog>;

// Fresh mock client and SDK instance for every test, so call counts start at zero.
beforeEach(() => {
  vi.clearAllMocks();
  // `posthog-node` is auto-mocked at the top of this file, so this instance's
  // methods are mock functions.
  mockPostHog = new PostHog('phc_test_key', {
    host: 'https://us.i.posthog.com',
  }) as Mocked<PostHog>;
  analytics = new Analytics(mockPostHog);
});
// ── Schema Validation (Zod) ──────────────────────────────
describe('Event Taxonomy Validation', () => {
  // Wraps a parse attempt in a thunk so it can be handed to expect(...).toThrow().
  const parsing = (candidate: unknown) => () => EventSchema.parse(candidate);
  const tenant = 'tenant-123';

  it('accepts valid account.signup.completed event', () => {
    const signup = {
      name: EventName.SignupCompleted,
      tenant_id: tenant,
      product: 'route' as const,
      properties: { method: 'github_sso' },
    };
    expect(parsing(signup)).not.toThrow();
  });

  it('rejects events missing tenant_id', () => {
    // Deliberately incomplete envelope: no tenant_id key at all.
    const withoutTenant = {
      name: EventName.SignupCompleted,
      product: 'route',
      properties: { method: 'email' },
    };
    expect(parsing(withoutTenant)).toThrow(/tenant_id/);
  });

  it('accepts valid activation event', () => {
    const activation = {
      name: EventName.FirstDollarSaved,
      tenant_id: tenant,
      product: 'route' as const,
      properties: { savings_amount: 1.50 },
    };
    expect(parsing(activation)).not.toThrow();
  });

  it('accepts valid upgrade event', () => {
    const upgrade = {
      name: EventName.UpgradeCompleted,
      tenant_id: tenant,
      product: 'route' as const,
      properties: { plan: 'pro', mrr_increase: 49 },
    };
    expect(parsing(upgrade)).not.toThrow();
  });
});
// ── track() Behavior ─────────────────────────────────────
describe('track()', () => {
  // Builds a signup event for the shared test tenant with the given properties.
  const signupWith = (properties: Record<string, unknown>) => ({
    name: EventName.SignupCompleted,
    tenant_id: 'tenant-123',
    product: 'route' as const,
    properties,
  });

  it('captures valid events via PostHog client', () => {
    const accepted = analytics.track(signupWith({ method: 'email' }));

    expect(accepted).toBe(true);
    const expectedPayload = expect.objectContaining({
      distinctId: 'tenant-123',
      event: 'account.signup.completed',
      properties: expect.objectContaining({
        product: 'route',
        method: 'email',
      }),
    });
    expect(mockPostHog.capture).toHaveBeenCalledWith(expectedPayload);
  });

  it('does NOT include $set in track calls (use identify instead)', () => {
    analytics.track(signupWith({ method: 'github_sso' }));

    const [[payload]] = mockPostHog.capture.mock.calls;
    expect(payload.properties).not.toHaveProperty('$set');
  });

  it('does NOT pass timestamp (let PostHog handle it to avoid clock skew)', () => {
    analytics.track(signupWith({ method: 'email' }));

    const [[payload]] = mockPostHog.capture.mock.calls;
    expect(payload).not.toHaveProperty('timestamp');
  });

  it('returns false and does NOT call PostHog if base validation fails', () => {
    // Event name outside the enum, and no properties key at all.
    const accepted = analytics.track({
      name: 'invalid.event' as any,
      tenant_id: 'tenant-123',
      product: 'route',
    });

    expect(accepted).toBe(false);
    expect(mockPostHog.capture).not.toHaveBeenCalled();
  });

  it('returns false if per-event property validation fails (strict schema)', () => {
    // 'invalid_method' is not one of the allowed signup methods.
    const accepted = analytics.track(signupWith({ method: 'invalid_method' }));

    expect(accepted).toBe(false);
    expect(mockPostHog.capture).not.toHaveBeenCalled();
  });

  it('rejects unknown properties (strict mode — no PII loophole)', () => {
    // PII leak attempt: an extra `email` key next to a valid `method`.
    const leaky = signupWith({ method: 'email', email: 'user@example.com' });

    const accepted = analytics.track(leaky);

    expect(accepted).toBe(false);
    expect(mockPostHog.capture).not.toHaveBeenCalled();
  });

  it('does NOT flush after each track call (Lambda batching)', () => {
    analytics.track(signupWith({ method: 'email' }));

    expect(mockPostHog.flushAsync).not.toHaveBeenCalled();
  });
});
// ── identify() ───────────────────────────────────────────
describe('identify()', () => {
  it('calls PostHog identify with tenant_id as distinctId', () => {
    const tenant = 'tenant-123';

    analytics.identify(tenant, { company: 'Acme' });

    // The tenant id must appear both as distinctId and as a person property.
    const expectedProperties = expect.objectContaining({
      tenant_id: tenant,
      company: 'Acme',
    });
    const expectedMessage = expect.objectContaining({
      distinctId: tenant,
      properties: expectedProperties,
    });
    expect(mockPostHog.identify).toHaveBeenCalledWith(expectedMessage);
  });
});
// ── flush() ──────────────────────────────────────────────
describe('flush()', () => {
  it('calls flushAsync on the PostHog client', async () => {
    const pendingFlush = analytics.flush();
    await pendingFlush;

    // Exactly one delegation to the underlying client.
    expect(mockPostHog.flushAsync.mock.calls).toHaveLength(1);
  });
});
// ── NoOp Client ──────────────────────────────────────────
describe('NoOp Client (missing API key)', () => {
  it('does not throw when tracking without API key', () => {
    // Make the precondition explicit: without this, a POSTHOG_API_KEY present
    // in the developer's or CI environment would send the test down the real
    // client path while still passing.
    const savedKey = process.env.POSTHOG_API_KEY;
    delete process.env.POSTHOG_API_KEY;
    // Silence (and observe) the "key not set" warning emitted by the constructor.
    const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
    // beforeEach already constructed one mocked PostHog; record the baseline.
    const constructedBefore = vi.mocked(PostHog).mock.calls.length;

    try {
      const noopAnalytics = new Analytics(); // No client, no env var
      const result = noopAnalytics.track({
        name: EventName.SignupCompleted,
        tenant_id: 'tenant-123',
        product: 'route',
        properties: { method: 'email' },
      });

      // Validation still runs; a valid event is accepted and then discarded.
      expect(result).toBe(true);
      // No additional PostHog client was constructed, so the NoOp path was taken.
      expect(vi.mocked(PostHog).mock.calls.length).toBe(constructedBefore);
      expect(warnSpy).toHaveBeenCalledTimes(1);
    } finally {
      warnSpy.mockRestore();
      if (savedKey !== undefined) {
        process.env.POSTHOG_API_KEY = savedKey;
      }
    }
  });
});
// ── Session Replay ───────────────────────────────────────
describe('Security', () => {
  it('session replay is disabled', () => {
    // The SDK exposes this flag as a constant; it must stay off.
    const { isSessionReplayEnabled } = analytics;
    expect(isSessionReplayEnabled).toBe(false);
  });
});
});