Add dd0c/route worker: weekly digest generation + hourly anomaly detection

- digest.rs: Weekly cost savings digest per org, top models, top routing savings
- anomaly.rs: Threshold-based anomaly detection (3x hourly average = alert)
- main.rs: Periodic task scheduler (hourly anomaly, weekly digest, daily cost refresh)
- next_monday_9am() with unit tests for scheduling
This commit is contained in:
2026-03-01 02:32:28 +00:00
parent e234f66b9b
commit 0fe25b8aa6
3 changed files with 341 additions and 3 deletions

View File

@@ -0,0 +1,83 @@
use sqlx::PgPool;
use tracing::{info, warn};
use uuid::Uuid;
/// Simple threshold-based anomaly detection.
/// Compares last hour's spend to the 7-day hourly average.
/// If current hour > 3x average, fire an alert.
pub async fn check_anomalies(ts_pool: &PgPool, pg_pool: &PgPool) -> anyhow::Result<()> {
    // Only orgs that actually produced events in the last hour are worth checking.
    let active_orgs = sqlx::query_as::<_, (Uuid,)>(
        "SELECT DISTINCT org_id FROM request_events WHERE time >= now() - interval '1 hour'"
    )
    .fetch_all(ts_pool)
    .await?;
    // Run the per-org check sequentially; the first query error aborts the sweep.
    for (org,) in active_orgs {
        check_org_anomaly(ts_pool, pg_pool, org).await?;
    }
    Ok(())
}
/// Compare one org's spend in the most recent hour against its trailing
/// 7-day hourly average and log a warning when it exceeds 3x that average.
///
/// `_pg_pool` is not used yet — it is reserved for the alert-delivery TODOs
/// below; the leading underscore silences the unused-parameter warning.
async fn check_org_anomaly(ts_pool: &PgPool, _pg_pool: &PgPool, org_id: Uuid) -> anyhow::Result<()> {
    // Last hour's cost
    let current = sqlx::query_as::<_, (f64,)>(
        "SELECT COALESCE(SUM(cost_actual), 0)::float8
         FROM request_events
         WHERE org_id = $1 AND time >= now() - interval '1 hour'"
    )
    .bind(org_id)
    .fetch_one(ts_pool)
    .await?;
    // 7-day hourly average, excluding the hour currently under test.
    let avg = sqlx::query_as::<_, (f64,)>(
        "SELECT COALESCE(AVG(hourly_cost), 0)::float8 FROM (
            SELECT SUM(cost_actual) as hourly_cost
            FROM request_events
            WHERE org_id = $1 AND time >= now() - interval '7 days' AND time < now() - interval '1 hour'
            GROUP BY time_bucket('1 hour', time)
        ) sub"
    )
    .bind(org_id)
    .fetch_one(ts_pool)
    .await?;
    let current_cost = current.0;
    let avg_cost = avg.0;
    // Strict 3x threshold; an average of 0 (org with no prior history)
    // never fires, so brand-new orgs don't alert on their first hour.
    if avg_cost > 0.0 && current_cost > avg_cost * 3.0 {
        let spike_factor = current_cost / avg_cost;
        warn!(
            org_id = %org_id,
            current_cost = format!("${:.4}", current_cost),
            avg_cost = format!("${:.4}", avg_cost),
            spike = format!("{:.1}x", spike_factor),
            "Cost anomaly detected"
        );
        // TODO: Send Slack webhook alert
        // TODO: Send email alert to org owner
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    /// Mirrors the threshold rule in `check_org_anomaly`:
    /// strict `> 3x average`, and a zero average never fires.
    fn is_anomaly(current: f64, avg: f64) -> bool {
        avg > 0.0 && current > avg * 3.0
    }
    #[test]
    fn anomaly_threshold_3x() {
        assert!(is_anomaly(35.0, 10.0));
    }
    #[test]
    fn no_anomaly_under_threshold() {
        assert!(!is_anomaly(25.0, 10.0));
    }
    #[test]
    fn exactly_3x_does_not_fire() {
        // Production check uses a strict `>`, so exactly 3x is not an anomaly.
        assert!(!is_anomaly(30.0, 10.0));
    }
    #[test]
    fn zero_average_never_fires() {
        // New orgs with no 7-day history must not alert on their first spend.
        assert!(!is_anomaly(100.0, 0.0));
    }
}

View File

@@ -0,0 +1,181 @@
use chrono::{DateTime, Datelike, NaiveTime, Utc, Weekday};
use sqlx::PgPool;
use tracing::{info, warn};
use uuid::Uuid;
/// Calculate next Monday 9 AM UTC from a given time.
///
/// If `from` is already a Monday at or after 09:00 UTC, the result is the
/// following Monday (so a scheduler that wakes at 09:00 advances a week).
pub fn next_monday_9am(from: DateTime<Utc>) -> DateTime<Utc> {
    let nine_am = NaiveTime::from_hms_opt(9, 0, 0).expect("09:00:00 is a valid time");
    let offset = (7 - from.weekday().num_days_from_monday()) % 7;
    let days_ahead = match offset {
        // It's Monday: before 9 AM we fire today, otherwise next week.
        0 if from.time() >= nine_am => 7,
        d => d,
    };
    (from.date_naive() + chrono::Duration::days(days_ahead as i64))
        .and_time(nine_am)
        .and_utc()
}
/// Weekly per-org cost digest, assembled by `generate_digest` from the
/// last 7 days of `request_events`.
#[derive(Debug)]
pub struct DigestData {
    // Org the digest summarizes.
    pub org_id: Uuid,
    // Display name; used in log output and (eventually) the digest email.
    pub org_name: String,
    // Total request count over the 7-day window.
    pub total_requests: i64,
    // Sum of cost_original over the window (pre-routing cost, in dollars).
    pub total_cost_original: f64,
    // Sum of cost_actual over the window (post-routing cost, in dollars).
    pub total_cost_actual: f64,
    // Sum of cost_saved over the window, in dollars.
    pub total_cost_saved: f64,
    // Savings as a percentage of original cost; 0.0 when original cost is 0.
    pub savings_pct: f64,
    // Up to 5 models ranked by actual cost (descending).
    pub top_models: Vec<ModelUsage>,
    // Up to 5 (original -> routed) model pairs ranked by cost saved.
    pub top_savings: Vec<RoutingSaving>,
}
/// One row of the digest's "top models by cost" ranking.
#[derive(Debug)]
pub struct ModelUsage {
    // Model identifier (taken from the original_model column).
    pub model: String,
    // Requests for this model in the 7-day window.
    pub request_count: i64,
    // Actual spend for this model in the window, in dollars.
    pub cost: f64,
}
/// One row of the digest's "top routing savings" ranking: how much was
/// saved by routing requests from one model to another.
#[derive(Debug)]
pub struct RoutingSaving {
    // Model the client originally requested.
    pub original_model: String,
    // Model the request was actually routed to.
    pub routed_model: String,
    // Number of requests routed along this (original -> routed) pair.
    pub requests_routed: i64,
    // Total dollars saved by this pair over the window.
    pub cost_saved: f64,
}
/// Generate the weekly digest for every org with request activity in the
/// last 7 days.
///
/// Org metadata and the owner email come from Postgres (`pg_pool`); usage
/// stats come from TimescaleDB (`ts_pool`). A failure for one org is logged
/// and does not abort the remaining orgs.
pub async fn generate_all_digests(ts_pool: &PgPool, pg_pool: &PgPool) -> anyhow::Result<()> {
    // One row per (org, owner). NOTE(review): an org with several 'owner'
    // users would appear once per owner — confirm owner is unique per org.
    let orgs = sqlx::query_as::<_, (Uuid, String, String)>(
        "SELECT o.id, o.name, u.email
         FROM organizations o
         JOIN users u ON u.org_id = o.id AND u.role = 'owner'
         WHERE o.id IN (
             SELECT DISTINCT org_id FROM request_events
             WHERE time >= now() - interval '7 days'
         )"
    )
    .fetch_all(pg_pool)
    .await?;
    info!(org_count = orgs.len(), "Generating digests");
    // `_owner_email` is unused until delivery is wired up (TODO below);
    // the underscore prefix silences the unused-variable warning.
    for (org_id, org_name, _owner_email) in &orgs {
        match generate_digest(ts_pool, *org_id, org_name).await {
            Ok(digest) => {
                // TODO: Send via SES to `_owner_email`
                info!(
                    org = %org_name,
                    requests = digest.total_requests,
                    saved = format!("${:.2}", digest.total_cost_saved),
                    "Digest generated"
                );
            }
            Err(e) => {
                warn!(org = %org_name, error = %e, "Failed to generate digest");
            }
        }
    }
    Ok(())
}
/// Build the 7-day `DigestData` for a single org from TimescaleDB.
///
/// Runs three queries: overall totals, top-5 models by actual cost, and
/// top-5 (original -> routed) pairs by cost saved. Any query error is
/// propagated to the caller.
async fn generate_digest(ts_pool: &PgPool, org_id: Uuid, org_name: &str) -> anyhow::Result<DigestData> {
    // Summary stats: (request_count, cost_original, cost_actual, cost_saved)
    let summary = sqlx::query_as::<_, (i64, f64, f64, f64)>(
        "SELECT COUNT(*),
                COALESCE(SUM(cost_original), 0)::float8,
                COALESCE(SUM(cost_actual), 0)::float8,
                COALESCE(SUM(cost_saved), 0)::float8
         FROM request_events
         WHERE org_id = $1 AND time >= now() - interval '7 days'"
    )
    .bind(org_id)
    .fetch_one(ts_pool)
    .await?;
    // Top models by cost
    let top_models = sqlx::query_as::<_, (String, i64, f64)>(
        "SELECT original_model, COUNT(*), COALESCE(SUM(cost_actual), 0)::float8
         FROM request_events
         WHERE org_id = $1 AND time >= now() - interval '7 days'
         GROUP BY original_model
         ORDER BY SUM(cost_actual) DESC
         LIMIT 5"
    )
    .bind(org_id)
    .fetch_all(ts_pool)
    .await?;
    // Top routing savings; only rerouted requests (strategy != 'passthrough') count.
    let top_savings = sqlx::query_as::<_, (String, String, i64, f64)>(
        "SELECT original_model, routed_model, COUNT(*), COALESCE(SUM(cost_saved), 0)::float8
         FROM request_events
         WHERE org_id = $1 AND time >= now() - interval '7 days' AND strategy != 'passthrough'
         GROUP BY original_model, routed_model
         ORDER BY SUM(cost_saved) DESC
         LIMIT 5"
    )
    .bind(org_id)
    .fetch_all(ts_pool)
    .await?;
    // Guard against divide-by-zero for orgs with rows but zero original cost.
    let savings_pct = if summary.1 > 0.0 { (summary.3 / summary.1) * 100.0 } else { 0.0 };
    Ok(DigestData {
        org_id,
        org_name: org_name.to_string(),
        total_requests: summary.0,
        total_cost_original: summary.1,
        total_cost_actual: summary.2,
        total_cost_saved: summary.3,
        savings_pct,
        // `into_iter` moves the Strings out of the query rows instead of
        // cloning them — the row Vecs are not used again.
        top_models: top_models
            .into_iter()
            .map(|(model, request_count, cost)| ModelUsage { model, request_count, cost })
            .collect(),
        top_savings: top_savings
            .into_iter()
            .map(|(original_model, routed_model, requests_routed, cost_saved)| RoutingSaving {
                original_model,
                routed_model,
                requests_routed,
                cost_saved,
            })
            .collect(),
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn next_monday_from_wednesday() {
        let wed = chrono::NaiveDate::from_ymd_opt(2026, 3, 4).unwrap() // Wednesday
            .and_hms_opt(14, 0, 0).unwrap().and_utc();
        let next = next_monday_9am(wed);
        assert_eq!(next.weekday(), Weekday::Mon);
        assert_eq!(next.date_naive(), chrono::NaiveDate::from_ymd_opt(2026, 3, 9).unwrap());
        assert_eq!(next.time(), NaiveTime::from_hms_opt(9, 0, 0).unwrap());
    }
    #[test]
    fn next_monday_from_monday_before_9am() {
        let mon = chrono::NaiveDate::from_ymd_opt(2026, 3, 2).unwrap() // Monday
            .and_hms_opt(8, 0, 0).unwrap().and_utc();
        let next = next_monday_9am(mon);
        assert_eq!(next.date_naive(), chrono::NaiveDate::from_ymd_opt(2026, 3, 2).unwrap());
        assert_eq!(next.time(), NaiveTime::from_hms_opt(9, 0, 0).unwrap());
    }
    #[test]
    fn next_monday_from_monday_after_9am() {
        let mon = chrono::NaiveDate::from_ymd_opt(2026, 3, 2).unwrap()
            .and_hms_opt(10, 0, 0).unwrap().and_utc();
        let next = next_monday_9am(mon);
        assert_eq!(next.date_naive(), chrono::NaiveDate::from_ymd_opt(2026, 3, 9).unwrap());
    }
    #[test]
    fn next_monday_from_monday_at_exactly_9am() {
        // The implementation uses `>=`, so exactly 09:00 rolls to next week —
        // this is what stops the scheduler loop firing twice after it wakes.
        let mon = chrono::NaiveDate::from_ymd_opt(2026, 3, 2).unwrap()
            .and_hms_opt(9, 0, 0).unwrap().and_utc();
        let next = next_monday_9am(mon);
        assert_eq!(next.date_naive(), chrono::NaiveDate::from_ymd_opt(2026, 3, 9).unwrap());
    }
    #[test]
    fn next_monday_from_sunday() {
        let sun = chrono::NaiveDate::from_ymd_opt(2026, 3, 8).unwrap() // Sunday
            .and_hms_opt(23, 0, 0).unwrap().and_utc();
        let next = next_monday_9am(sun);
        assert_eq!(next.date_naive(), chrono::NaiveDate::from_ymd_opt(2026, 3, 9).unwrap());
        assert_eq!(next.time(), NaiveTime::from_hms_opt(9, 0, 0).unwrap());
    }
}

View File

@@ -1,4 +1,78 @@
fn main() {
println!("dd0c/route worker — not yet implemented");
// TODO: Background worker for digests, aggregations (Epic 7)
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{info, warn, error};
use chrono::{Utc, Duration};
use dd0c_route::{AppConfig, TelemetryEvent};
mod digest;
mod anomaly;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "dd0c_route=info".into()),
)
.json()
.init();
let config = Arc::new(AppConfig::from_env()?);
info!("Starting dd0c/route worker");
let ts_pool = sqlx::PgPool::connect(&config.timescale_url).await?;
let pg_pool = sqlx::PgPool::connect(&config.database_url).await?;
// Spawn periodic tasks
let ts1 = ts_pool.clone();
let pg1 = pg_pool.clone();
let config1 = config.clone();
// Hourly: check for cost anomalies
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
loop {
interval.tick().await;
if let Err(e) = anomaly::check_anomalies(&ts1, &pg1).await {
error!(error = %e, "Anomaly check failed");
}
}
});
// Weekly: generate cost digest emails
let ts2 = ts_pool.clone();
let pg2 = pg_pool.clone();
tokio::spawn(async move {
// Run every Monday at 9 AM UTC
loop {
let now = Utc::now();
let next_monday = digest::next_monday_9am(now);
let sleep_duration = (next_monday - now).to_std().unwrap_or(std::time::Duration::from_secs(3600));
tokio::time::sleep(sleep_duration).await;
info!("Generating weekly digests");
if let Err(e) = digest::generate_all_digests(&ts2, &pg2).await {
error!(error = %e, "Digest generation failed");
}
}
});
// Cost table refresh: daily at midnight UTC
let pg3 = pg_pool.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(86400));
loop {
interval.tick().await;
info!("Refreshing cost tables");
// TODO: Fetch latest pricing from provider APIs
}
});
info!("Worker running — anomaly checks hourly, digests weekly");
// Keep the main task alive
tokio::signal::ctrl_c().await?;
info!("Worker shutting down");
Ok(())
}