From 0fe25b8aa6e267ed1e315fae6910e572fb914b04 Mon Sep 17 00:00:00 2001 From: Max Mayfield Date: Sun, 1 Mar 2026 02:32:28 +0000 Subject: [PATCH] Add dd0c/route worker: weekly digest generation + hourly anomaly detection - digest.rs: Weekly cost savings digest per org, top models, top routing savings - anomaly.rs: Threshold-based anomaly detection (3x hourly average = alert) - main.rs: Periodic task scheduler (hourly anomaly, weekly digest, daily cost refresh) - next_monday_9am() with unit tests for scheduling --- .../01-llm-cost-router/src/worker/anomaly.rs | 83 ++++++++ .../01-llm-cost-router/src/worker/digest.rs | 181 ++++++++++++++++++ .../01-llm-cost-router/src/worker/main.rs | 80 +++++++- 3 files changed, 341 insertions(+), 3 deletions(-) create mode 100644 products/01-llm-cost-router/src/worker/anomaly.rs create mode 100644 products/01-llm-cost-router/src/worker/digest.rs diff --git a/products/01-llm-cost-router/src/worker/anomaly.rs b/products/01-llm-cost-router/src/worker/anomaly.rs new file mode 100644 index 0000000..0691a67 --- /dev/null +++ b/products/01-llm-cost-router/src/worker/anomaly.rs @@ -0,0 +1,83 @@ +use sqlx::PgPool; +use tracing::{info, warn}; +use uuid::Uuid; + +/// Simple threshold-based anomaly detection. +/// Compares last hour's spend to the 7-day hourly average. +/// If current hour > 3x average, fire an alert. 
+///
+/// Every org is checked even if an earlier one fails; failures are logged per org.
+///
+/// # Errors
+/// Returns an error if listing the active orgs fails, or, once all orgs have been
+/// processed, if any individual check failed.
+pub async fn check_anomalies(ts_pool: &PgPool, pg_pool: &PgPool) -> anyhow::Result<()> {
+    // Get orgs with recent activity
+    let orgs = sqlx::query_as::<_, (Uuid,)>(
+        "SELECT DISTINCT org_id FROM request_events WHERE time >= now() - interval '1 hour'"
+    )
+    .fetch_all(ts_pool)
+    .await?;
+
+    // Keep going after a failure so one org's query error cannot hide a spike in
+    // another org; the failure count is reported once every org has been checked.
+    let mut failed = 0usize;
+    for (org_id,) in &orgs {
+        if let Err(e) = check_org_anomaly(ts_pool, pg_pool, *org_id).await {
+            warn!(org_id = %org_id, error = %e, "Anomaly check failed for org");
+            failed += 1;
+        }
+    }
+
+    if failed > 0 {
+        anyhow::bail!("anomaly check failed for {} of {} orgs", failed, orgs.len());
+    }
+    Ok(())
+}
+
+/// Multiple of the 7-day hourly average that the last hour's spend must exceed.
+const SPIKE_THRESHOLD: f64 = 3.0;
+
+/// Returns `current_cost / avg_cost` when the current spend counts as a spike.
+///
+/// Yields `None` when there is no positive baseline, or when the spend does not
+/// exceed `SPIKE_THRESHOLD` times the average.
+fn spike_factor(current_cost: f64, avg_cost: f64) -> Option<f64> {
+    (avg_cost > 0.0 && current_cost > avg_cost * SPIKE_THRESHOLD)
+        .then(|| current_cost / avg_cost)
+}
+
+/// Compares one org's spend over the last hour with its 7-day hourly average and
+/// logs a warning when it is a spike.
+///
+/// `pg_pool` is not used yet.
+///
+/// # Errors
+/// Returns an error if either cost query fails.
+async fn check_org_anomaly(
+    ts_pool: &PgPool,
+    pg_pool: &PgPool,
+    org_id: Uuid,
+) -> anyhow::Result<()> {
+    // Last hour's cost
+    let (current_cost,) = sqlx::query_as::<_, (f64,)>(
+        "SELECT COALESCE(SUM(cost_actual), 0)::float8
+         FROM request_events
+         WHERE org_id = $1 AND time >= now() - interval '1 hour'"
+    )
+    .bind(org_id)
+    .fetch_one(ts_pool)
+    .await?;
+
+    // 7-day hourly average. Only hour buckets that contain events produce a row,
+    // so idle hours do not pull the baseline down.
+    let (avg_cost,) = sqlx::query_as::<_, (f64,)>(
+        "SELECT COALESCE(AVG(hourly_cost), 0)::float8 FROM (
+            SELECT SUM(cost_actual) as hourly_cost
+            FROM request_events
+            WHERE org_id = $1 AND time >= now() - interval '7 days' AND time < now() - interval '1 hour'
+            GROUP BY time_bucket('1 hour', time)
+         ) sub"
+    )
+    .bind(org_id)
+    .fetch_one(ts_pool)
+    .await?;
+
+    if let Some(spike) = spike_factor(current_cost, avg_cost) {
+        warn!(
+            org_id = %org_id,
+            current_cost = format!("${:.4}", current_cost),
+            avg_cost = format!("${:.4}", avg_cost),
+            spike = format!("{:.1}x", spike),
+            "Cost anomaly detected"
+        );
+
+        // TODO: Send Slack webhook alert
+        // TODO: Send email alert to org owner
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn anomaly_threshold_3x() {
+        assert_eq!(spike_factor(35.0, 10.0), Some(3.5));
+    }
+
+    #[test]
+    fn no_anomaly_under_threshold() {
+        assert_eq!(spike_factor(25.0, 10.0), None);
+    }
+
+    #[test]
+    fn exactly_at_threshold_is_not_an_anomaly() {
+        assert_eq!(spike_factor(30.0, 10.0), None);
+    }
+
+    #[test]
+    fn no_baseline_never_alerts() {
+        assert_eq!(spike_factor(100.0, 0.0), None);
+    }
+}
diff --git a/products/01-llm-cost-router/src/worker/digest.rs b/products/01-llm-cost-router/src/worker/digest.rs
new file
mode 100644
index 0000000..57aa22a
--- /dev/null
+++ b/products/01-llm-cost-router/src/worker/digest.rs
@@ -0,0 +1,239 @@
+use chrono::{DateTime, Datelike, NaiveTime, Utc, Weekday};
+use sqlx::PgPool;
+use tracing::{info, warn};
+use uuid::Uuid;
+
+/// Returns the first Monday 09:00 UTC strictly after `from`.
+///
+/// On a Monday before 09:00 that is 09:00 the same day; on a Monday at or after
+/// 09:00 it is 09:00 one week later.
+pub fn next_monday_9am(from: DateTime<Utc>) -> DateTime<Utc> {
+    let nine_am = NaiveTime::from_hms_opt(9, 0, 0).expect("09:00:00 is a valid time");
+    let days_ahead = match from.weekday() {
+        Weekday::Mon if from.time() < nine_am => 0,
+        Weekday::Mon => 7,
+        other => 7 - i64::from(other.num_days_from_monday()),
+    };
+
+    (from.date_naive() + chrono::Duration::days(days_ahead))
+        .and_time(nine_am)
+        .and_utc()
+}
+
+/// Seven-day cost summary for one organization.
+#[derive(Debug, Clone, PartialEq)]
+pub struct DigestData {
+    pub org_id: Uuid,
+    pub org_name: String,
+    /// Number of `request_events` rows in the window.
+    pub total_requests: i64,
+    /// Sum of `cost_original` over the window.
+    pub total_cost_original: f64,
+    /// Sum of `cost_actual` over the window.
+    pub total_cost_actual: f64,
+    /// Sum of `cost_saved` over the window.
+    pub total_cost_saved: f64,
+    /// Percentage of `total_cost_original` that was saved; 0 when that cost is
+    /// not positive.
+    pub savings_pct: f64,
+    /// Up to five `original_model` values, highest `cost_actual` first.
+    pub top_models: Vec<ModelUsage>,
+    /// Up to five (original, routed) model pairs, highest `cost_saved` first.
+    pub top_savings: Vec<RoutingSaving>,
+}
+
+/// Request count and actual cost for one `original_model`.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ModelUsage {
+    pub model: String,
+    pub request_count: i64,
+    pub cost: f64,
+}
+
+/// Requests and savings for one (original, routed) model pair.
+#[derive(Debug, Clone, PartialEq)]
+pub struct RoutingSaving {
+    pub original_model: String,
+    pub routed_model: String,
+    pub requests_routed: i64,
+    pub cost_saved: f64,
+}
+
+/// Generates a digest for every org with `request_events` in the last 7 days.
+///
+/// A failure for one org is logged and skipped so the remaining orgs are still
+/// processed.
+///
+/// # Errors
+/// Returns an error if listing the active orgs or their owners fails.
+pub async fn generate_all_digests(ts_pool: &PgPool, pg_pool: &PgPool) -> anyhow::Result<()> {
+    // `request_events` is read through `ts_pool` by every other query in the
+    // worker, so the activity lookup runs there; as a subquery on `pg_pool` it
+    // fails whenever the two pools point at different databases.
+    let active_org_ids: Vec<Uuid> = sqlx::query_as::<_, (Uuid,)>(
+        "SELECT DISTINCT org_id FROM request_events WHERE time >= now() - interval '7 days'"
+    )
+    .fetch_all(ts_pool)
+    .await?
+    .into_iter()
+    .map(|(org_id,)| org_id)
+    .collect();
+
+    // One row per (org, owner): an org with several owners appears once per owner.
+    let orgs = sqlx::query_as::<_, (Uuid, String, String)>(
+        "SELECT o.id, o.name, u.email
+         FROM organizations o
+         JOIN users u ON u.org_id = o.id AND u.role = 'owner'
+         WHERE o.id = ANY($1)"
+    )
+    .bind(active_org_ids)
+    .fetch_all(pg_pool)
+    .await?;
+
+    info!(org_count = orgs.len(), "Generating digests");
+
+    // `_owner_email` is not used until sending is implemented.
+    for (org_id, org_name, _owner_email) in &orgs {
+        match generate_digest(ts_pool, *org_id, org_name).await {
+            Ok(digest) => {
+                // TODO: Send via SES
+                info!(
+                    org = %org_name,
+                    requests = digest.total_requests,
+                    saved = format!("${:.2}", digest.total_cost_saved),
+                    "Digest generated"
+                );
+            }
+            Err(e) => {
+                warn!(org = %org_name, error = %e, "Failed to generate digest");
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Builds the seven-day digest for one org from `request_events`.
+///
+/// # Errors
+/// Returns an error if any of the three aggregate queries fails.
+async fn generate_digest(
+    ts_pool: &PgPool,
+    org_id: Uuid,
+    org_name: &str,
+) -> anyhow::Result<DigestData> {
+    // Summary stats
+    let (total_requests, total_cost_original, total_cost_actual, total_cost_saved) =
+        sqlx::query_as::<_, (i64, f64, f64, f64)>(
+            "SELECT COUNT(*),
+                    COALESCE(SUM(cost_original), 0)::float8,
+                    COALESCE(SUM(cost_actual), 0)::float8,
+                    COALESCE(SUM(cost_saved), 0)::float8
+             FROM request_events
+             WHERE org_id = $1 AND time >= now() - interval '7 days'"
+        )
+        .bind(org_id)
+        .fetch_one(ts_pool)
+        .await?;
+
+    // Top models by cost
+    let top_models: Vec<ModelUsage> = sqlx::query_as::<_, (String, i64, f64)>(
+        "SELECT original_model, COUNT(*), COALESCE(SUM(cost_actual), 0)::float8
+         FROM request_events
+         WHERE org_id = $1 AND time >= now() - interval '7 days'
+         GROUP BY original_model
+         ORDER BY SUM(cost_actual) DESC
+         LIMIT 5"
+    )
+    .bind(org_id)
+    .fetch_all(ts_pool)
+    .await?
+    .into_iter()
+    .map(|(model, request_count, cost)| ModelUsage { model, request_count, cost })
+    .collect();
+
+    // Top routing savings
+    let top_savings: Vec<RoutingSaving> = sqlx::query_as::<_, (String, String, i64, f64)>(
+        "SELECT original_model, routed_model, COUNT(*), COALESCE(SUM(cost_saved), 0)::float8
+         FROM request_events
+         WHERE org_id = $1 AND time >= now() - interval '7 days' AND strategy != 'passthrough'
+         GROUP BY original_model, routed_model
+         ORDER BY SUM(cost_saved) DESC
+         LIMIT 5"
+    )
+    .bind(org_id)
+    .fetch_all(ts_pool)
+    .await?
+    .into_iter()
+    .map(|(original_model, routed_model, requests_routed, cost_saved)| RoutingSaving {
+        original_model,
+        routed_model,
+        requests_routed,
+        cost_saved,
+    })
+    .collect();
+
+    // With no original cost there is no meaningful percentage, so report 0.
+    let savings_pct = if total_cost_original > 0.0 {
+        (total_cost_saved / total_cost_original) * 100.0
+    } else {
+        0.0
+    };
+
+    Ok(DigestData {
+        org_id,
+        org_name: org_name.to_string(),
+        total_requests,
+        total_cost_original,
+        total_cost_actual,
+        total_cost_saved,
+        savings_pct,
+        top_models,
+        top_savings,
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Builds a UTC timestamp on the given day of March 2026, on the hour.
+    fn march_2026(day: u32, hour: u32) -> DateTime<Utc> {
+        chrono::NaiveDate::from_ymd_opt(2026, 3, day)
+            .unwrap()
+            .and_hms_opt(hour, 0, 0)
+            .unwrap()
+            .and_utc()
+    }
+
+    #[test]
+    fn next_monday_from_wednesday() {
+        let next = next_monday_9am(march_2026(4, 14)); // Wednesday
+        assert_eq!(next.weekday(), Weekday::Mon);
+        assert_eq!(next, march_2026(9, 9));
+    }
+
+    #[test]
+    fn next_monday_from_monday_before_9am() {
+        let next = next_monday_9am(march_2026(2, 8)); // Monday
+        assert_eq!(next, march_2026(2, 9));
+    }
+
+    #[test]
+    fn next_monday_from_monday_after_9am() {
+        let next = next_monday_9am(march_2026(2, 10));
+        assert_eq!(next, march_2026(9, 9));
+    }
+
+    #[test]
+    fn next_monday_from_monday_at_exactly_9am() {
+        let next = next_monday_9am(march_2026(2, 9));
+        assert_eq!(next, march_2026(9, 9));
+    }
+
+    #[test]
+    fn next_monday_from_sunday() {
+        let next = next_monday_9am(march_2026(8, 23));
+        assert_eq!(next, march_2026(9, 9));
+    }
+}
diff --git a/products/01-llm-cost-router/src/worker/main.rs b/products/01-llm-cost-router/src/worker/main.rs
index 57fcb5f..f9c23b4 100644
--- a/products/01-llm-cost-router/src/worker/main.rs
+++ b/products/01-llm-cost-router/src/worker/main.rs
@@ -1,4 +1,86 @@
-fn main() {
-    println!("dd0c/route worker — not yet implemented");
-    // TODO: Background worker for digests, aggregations (Epic 7)
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tracing::{info, warn, error};
+use chrono::{Utc, Duration};
+
+use dd0c_route::{AppConfig, TelemetryEvent};
+
+mod digest;
+mod anomaly;
+
+/// Starts the dd0c/route background worker.
+///
+/// Connects to both databases, spawns the hourly anomaly check, the weekly digest
+/// job and the 24-hour cost-table refresh, then waits for Ctrl-C.
+///
+/// # Errors
+/// Returns an error if configuration cannot be loaded, a database connection
+/// fails, or waiting for Ctrl-C fails.
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    tracing_subscriber::fmt()
+        .with_env_filter(
+            tracing_subscriber::EnvFilter::try_from_default_env()
+                .unwrap_or_else(|_| "dd0c_route=info".into()),
+        )
+        .json()
+        .init();
+
+    let config = Arc::new(AppConfig::from_env()?);
+    info!("Starting dd0c/route worker");
+
+    let ts_pool = sqlx::PgPool::connect(&config.timescale_url).await?;
+    let pg_pool = sqlx::PgPool::connect(&config.database_url).await?;
+
+    // Hourly: check for cost anomalies. The first tick completes immediately, so
+    // a check also runs at startup.
+    let ts1 = ts_pool.clone();
+    let pg1 = pg_pool.clone();
+    tokio::spawn(async move {
+        let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
+        loop {
+            interval.tick().await;
+            if let Err(e) = anomaly::check_anomalies(&ts1, &pg1).await {
+                error!(error = %e, "Anomaly check failed");
+            }
+        }
+    });
+
+    // Weekly: generate cost digests every Monday at 9 AM UTC.
+    let ts2 = ts_pool.clone();
+    let pg2 = pg_pool.clone();
+    tokio::spawn(async move {
+        loop {
+            let now = Utc::now();
+            let next_monday = digest::next_monday_9am(now);
+            // `to_std` fails only for a negative duration; fall back to a one-hour sleep.
+            let sleep_duration = (next_monday - now)
+                .to_std()
+                .unwrap_or(std::time::Duration::from_secs(3600));
+            tokio::time::sleep(sleep_duration).await;
+
+            info!("Generating weekly digests");
+            if let Err(e) = digest::generate_all_digests(&ts2, &pg2).await {
+                error!(error = %e, "Digest generation failed");
+            }
+        }
+    });
+
+    // Cost table refresh: every 24 hours counted from startup (the first tick is
+    // immediate), not aligned to midnight UTC.
+    tokio::spawn(async move {
+        let mut interval = tokio::time::interval(std::time::Duration::from_secs(86400));
+        loop {
+            interval.tick().await;
+            info!("Refreshing cost tables");
+            // TODO: Fetch latest pricing from provider APIs
+        }
+    });
+
+    info!("Worker running — anomaly checks hourly, digests weekly");
+
+    // Keep the main task alive until Ctrl-C.
+    tokio::signal::ctrl_c().await?;
+    info!("Worker shutting down");
+    Ok(())
 }