diff --git a/products/01-llm-cost-router/Cargo.toml b/products/01-llm-cost-router/Cargo.toml index d28e3ce..40b8066 100644 --- a/products/01-llm-cost-router/Cargo.toml +++ b/products/01-llm-cost-router/Cargo.toml @@ -68,6 +68,10 @@ serde_yaml = "0.9" chrono = { version = "0.4", features = ["serde"] } thiserror = "1" anyhow = "1" +getrandom = "0.2" +regex = "1" +async-trait = "0.1" +rand = "0.8" [dev-dependencies] # Testing diff --git a/products/01-llm-cost-router/src/api/handler.rs b/products/01-llm-cost-router/src/api/handler.rs new file mode 100644 index 0000000..de312af --- /dev/null +++ b/products/01-llm-cost-router/src/api/handler.rs @@ -0,0 +1,593 @@ +use axum::{ + extract::{Path, Query, State}, + http::{HeaderMap, StatusCode}, + response::IntoResponse, + routing::{get, post, put, delete}, + Json, Router, +}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use uuid::Uuid; + +use crate::auth::{AuthContext, AuthError, AuthProvider, Role}; +use crate::config::AppConfig; + +pub struct ApiState { + pub auth: Arc, + pub pg_pool: sqlx::PgPool, + pub ts_pool: sqlx::PgPool, + pub config: Arc, +} + +pub fn create_api_router(state: Arc) -> Router { + Router::new() + // Dashboard analytics + .route("/api/v1/analytics/summary", get(get_analytics_summary)) + .route("/api/v1/analytics/timeseries", get(get_analytics_timeseries)) + .route("/api/v1/analytics/models", get(get_model_breakdown)) + // Routing rules + .route("/api/v1/rules", get(list_rules).post(create_rule)) + .route("/api/v1/rules/:id", put(update_rule).delete(delete_rule)) + // API keys + .route("/api/v1/keys", get(list_keys).post(create_key)) + .route("/api/v1/keys/:id", delete(revoke_key)) + // Provider configs + .route("/api/v1/providers", get(list_providers).post(upsert_provider)) + // Org settings + .route("/api/v1/org", get(get_org)) + // Health + .route("/health", get(health)) + .with_state(state) +} + +async fn health() -> &'static str { "ok" } + +// --- Analytics Endpoints --- + 
+/// Query-string parameters accepted by the analytics endpoints.
+///
+/// NOTE(review): the field types were missing from the reviewed text and are
+/// assumed to be `Option<String>` — confirm against the original source.
+#[derive(Deserialize)]
+pub struct TimeRange {
+    /// Start of the window. Not applied by any query yet (see `get_analytics_summary`).
+    pub from: Option<String>,
+    /// End of the window. Not applied by any query yet.
+    pub to: Option<String>,
+    pub interval: Option<String>, // "hour" or "day"
+}
+
+/// Response body of `GET /api/v1/analytics/summary`.
+#[derive(Serialize)]
+pub struct AnalyticsSummary {
+    pub total_requests: i64,
+    pub total_cost_original: f64,
+    pub total_cost_actual: f64,
+    pub total_cost_saved: f64,
+    /// `total_cost_saved / total_cost_original * 100`, or `0` when the original cost is not positive.
+    pub savings_pct: f64,
+    pub avg_latency_ms: i32,
+    pub p99_latency_ms: i32,
+    pub routing_decisions: RoutingBreakdown,
+}
+
+/// Number of requests handled by each routing strategy.
+#[derive(Serialize)]
+pub struct RoutingBreakdown {
+    pub passthrough: i64,
+    pub cheapest: i64,
+    pub cascading: i64,
+}
+
+/// `GET /api/v1/analytics/summary` — totals for the caller's organization over
+/// the last 7 days.
+///
+/// # Errors
+/// Returns `ApiError::AuthError` when authentication fails and
+/// `ApiError::Internal` when the org id is not a UUID or the query fails.
+async fn get_analytics_summary(
+    State(state): State<Arc<ApiState>>,
+    headers: HeaderMap,
+    // TODO: honour `from`/`to`; the window below is fixed at 7 days. The extractor
+    // is kept so the endpoint keeps accepting the same query string.
+    Query(_range): Query<TimeRange>,
+) -> Result<Json<AnalyticsSummary>, ApiError> {
+    let auth = state.auth.authenticate(&headers).await?;
+    // Fail loudly instead of falling back to the nil UUID, which would silently
+    // query a tenant that is shared by every malformed org id.
+    let org_id = auth
+        .org_id
+        .parse::<Uuid>()
+        .map_err(|e| ApiError::Internal(format!("auth context has a malformed org id: {}", e)))?;
+
+    let (
+        total_requests,
+        total_cost_original,
+        total_cost_actual,
+        total_cost_saved,
+        avg_latency_ms,
+        p99_latency_ms,
+        passthrough,
+        cheapest,
+        cascading,
+    ) = sqlx::query_as::<_, (i64, f64, f64, f64, i32, i32, i64, i64, i64)>(
+        "SELECT
+            COUNT(*),
+            COALESCE(SUM(cost_original), 0)::float8,
+            COALESCE(SUM(cost_actual), 0)::float8,
+            COALESCE(SUM(cost_saved), 0)::float8,
+            COALESCE(AVG(latency_ms), 0)::int,
+            COALESCE(PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY latency_ms), 0)::int,
+            COUNT(*) FILTER (WHERE strategy = 'passthrough'),
+            COUNT(*) FILTER (WHERE strategy = 'cheapest'),
+            COUNT(*) FILTER (WHERE strategy = 'cascading')
+         FROM request_events
+         WHERE org_id = $1 AND time >= now() - interval '7 days'"
+    )
+    .bind(org_id)
+    .fetch_one(&state.ts_pool)
+    .await
+    .map_err(|e| ApiError::Internal(e.to_string()))?;
+
+    // Guard the division: with no spend there is nothing to have saved.
+    let savings_pct = if total_cost_original > 0.0 {
+        (total_cost_saved / total_cost_original) * 100.0
+    } else {
+        0.0
+    };
+
+    Ok(Json(AnalyticsSummary {
+        total_requests,
+        total_cost_original,
+        total_cost_actual,
+        total_cost_saved,
+        savings_pct,
+        avg_latency_ms,
+        p99_latency_ms,
+        routing_decisions: RoutingBreakdown {
+            passthrough,
+            cheapest,
+            cascading,
+        },
+    }))
+}
+
+/// One bucket of `GET /api/v1/analytics/timeseries`.
+#[derive(Serialize)]
+pub struct TimeseriesPoint {
+    /// Bucket start, formatted as RFC 3339.
+    pub bucket: String,
+    pub request_count: i64,
+    pub cost_saved: f64,
+    pub avg_latency_ms: i32,
+}
+
+/// `GET /api/v1/analytics/timeseries` — per-bucket metrics for the caller's
+/// organization over the last 7 days.
+///
+/// `interval=day` reads the daily view; any other value, or none, reads the
+/// hourly view.
+///
+/// # Errors
+/// Returns `ApiError::AuthError` when authentication fails and
+/// `ApiError::Internal` when the org id is not a UUID or the query fails.
+async fn get_analytics_timeseries(
+    State(state): State<Arc<ApiState>>,
+    headers: HeaderMap,
+    Query(range): Query<TimeRange>,
+) -> Result<Json<Vec<TimeseriesPoint>>, ApiError> {
+    let auth = state.auth.authenticate(&headers).await?;
+    let org_id = auth
+        .org_id
+        .parse::<Uuid>()
+        .map_err(|e| ApiError::Internal(format!("auth context has a malformed org id: {}", e)))?;
+
+    // Only these two literals are ever interpolated into the SQL text, so the
+    // `format!` below cannot be influenced by the caller.
+    let view = if range.interval.as_deref() == Some("day") {
+        "request_events_daily"
+    } else {
+        "request_events_hourly"
+    };
+
+    let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, i64, f64, i32)>(
+        &format!(
+            "SELECT bucket, request_count, COALESCE(total_cost_saved, 0)::float8, COALESCE(avg_latency_ms, 0)::int
+             FROM {}
+             WHERE org_id = $1 AND bucket >= now() - interval '7 days'
+             ORDER BY bucket",
+            view
+        )
+    )
+    .bind(org_id)
+    .fetch_all(&state.ts_pool)
+    .await
+    .map_err(|e| ApiError::Internal(e.to_string()))?;
+
+    let points = rows
+        .into_iter()
+        .map(|(bucket, request_count, cost_saved, avg_latency_ms)| TimeseriesPoint {
+            bucket: bucket.to_rfc3339(),
+            request_count,
+            cost_saved,
+            avg_latency_ms,
+        })
+        .collect();
+
+    Ok(Json(points))
+}
+
+/// One row of `GET /api/v1/analytics/models`.
+#[derive(Serialize)]
+pub struct ModelBreakdown {
+    pub model: String,
+    pub request_count: i64,
+    /// Prompt plus completion tokens.
+    pub total_tokens: i64,
+    pub total_cost: f64,
+}
+
+/// `GET /api/v1/analytics/models` — per-model request, token and cost totals
+/// for the caller's organization over the last 7 days, busiest model first.
+///
+/// # Errors
+/// Returns `ApiError::AuthError` when authentication fails and
+/// `ApiError::Internal` when the org id is not a UUID or the query fails.
+async fn get_model_breakdown(
+    State(state): State<Arc<ApiState>>,
+    headers: HeaderMap,
+) -> Result<Json<Vec<ModelBreakdown>>, ApiError> {
+    let auth = state.auth.authenticate(&headers).await?;
+    let org_id = auth
+        .org_id
+        .parse::<Uuid>()
+        .map_err(|e| ApiError::Internal(format!("auth context has a malformed org id: {}", e)))?;
+
+    // The token sum is coalesced and cast like the cost columns, so a NULL or
+    // numeric aggregate still decodes into `i64`.
+    let rows = sqlx::query_as::<_, (String, i64, i64, f64)>(
+        "SELECT original_model, COUNT(*), COALESCE(SUM(prompt_tokens + completion_tokens), 0)::bigint, COALESCE(SUM(cost_actual), 0)::float8
+         FROM request_events
+         WHERE org_id = $1 AND time >= now() - interval '7 days'
+         GROUP BY original_model
+         ORDER BY COUNT(*) DESC"
+    )
+    .bind(org_id)
+    .fetch_all(&state.ts_pool)
+    .await
+    .map_err(|e| ApiError::Internal(e.to_string()))?;
+
+    let breakdown = rows
+        .into_iter()
+        .map(|(model, request_count, total_tokens, total_cost)| ModelBreakdown {
+            model,
+            request_count,
+            total_tokens,
+            total_cost,
+        })
+        .collect();
+
+    Ok(Json(breakdown))
+}
+
+// --- Routing Rules CRUD
--- + +#[derive(Serialize, Deserialize)] +pub struct RoutingRuleDto { + pub id: Option, + pub priority: i32, + pub name: String, + pub match_model: Option, + pub match_feature: Option, + pub match_team: Option, + pub match_complexity: Option, + pub strategy: String, + pub target_model: Option, + pub target_provider: Option, + pub fallback_models: Option>, + pub enabled: bool, +} + +async fn list_rules( + State(state): State>, + headers: HeaderMap, +) -> Result>, ApiError> { + let auth = state.auth.authenticate(&headers).await?; + + let rows = sqlx::query_as::<_, (Uuid, i32, String, Option, Option, Option, Option, String, Option, Option, Option>, bool)>( + "SELECT id, priority, name, match_model, match_feature, match_team, match_complexity, strategy, target_model, target_provider, fallback_models, enabled + FROM routing_rules WHERE org_id = $1 ORDER BY priority" + ) + .bind(auth.org_id.parse::().unwrap_or_default()) + .fetch_all(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + Ok(Json(rows.iter().map(|r| RoutingRuleDto { + id: Some(r.0), + priority: r.1, + name: r.2.clone(), + match_model: r.3.clone(), + match_feature: r.4.clone(), + match_team: r.5.clone(), + match_complexity: r.6.clone(), + strategy: r.7.clone(), + target_model: r.8.clone(), + target_provider: r.9.clone(), + fallback_models: r.10.clone(), + enabled: r.11, + }).collect())) +} + +async fn create_rule( + State(state): State>, + headers: HeaderMap, + Json(rule): Json, +) -> Result<(StatusCode, Json), ApiError> { + let auth = state.auth.authenticate(&headers).await?; + require_role(&auth, Role::Owner)?; + + let id = Uuid::new_v4(); + sqlx::query( + "INSERT INTO routing_rules (id, org_id, priority, name, match_model, match_feature, match_team, match_complexity, strategy, target_model, target_provider, fallback_models, enabled) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)" + ) + .bind(id) + .bind(auth.org_id.parse::().unwrap_or_default()) + 
.bind(rule.priority) + .bind(&rule.name) + .bind(&rule.match_model) + .bind(&rule.match_feature) + .bind(&rule.match_team) + .bind(&rule.match_complexity) + .bind(&rule.strategy) + .bind(&rule.target_model) + .bind(&rule.target_provider) + .bind(&rule.fallback_models) + .bind(rule.enabled) + .execute(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + let mut created = rule; + created.id = Some(id); + Ok((StatusCode::CREATED, Json(created))) +} + +async fn update_rule( + State(state): State>, + headers: HeaderMap, + Path(id): Path, + Json(rule): Json, +) -> Result { + let auth = state.auth.authenticate(&headers).await?; + require_role(&auth, Role::Owner)?; + + let result = sqlx::query( + "UPDATE routing_rules SET priority=$1, name=$2, match_model=$3, match_feature=$4, match_team=$5, match_complexity=$6, strategy=$7, target_model=$8, target_provider=$9, fallback_models=$10, enabled=$11 + WHERE id=$12 AND org_id=$13" + ) + .bind(rule.priority) + .bind(&rule.name) + .bind(&rule.match_model) + .bind(&rule.match_feature) + .bind(&rule.match_team) + .bind(&rule.match_complexity) + .bind(&rule.strategy) + .bind(&rule.target_model) + .bind(&rule.target_provider) + .bind(&rule.fallback_models) + .bind(rule.enabled) + .bind(id) + .bind(auth.org_id.parse::().unwrap_or_default()) + .execute(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + if result.rows_affected() == 0 { + return Err(ApiError::NotFound); + } + Ok(StatusCode::NO_CONTENT) +} + +async fn delete_rule( + State(state): State>, + headers: HeaderMap, + Path(id): Path, +) -> Result { + let auth = state.auth.authenticate(&headers).await?; + require_role(&auth, Role::Owner)?; + + let result = sqlx::query("DELETE FROM routing_rules WHERE id=$1 AND org_id=$2") + .bind(id) + .bind(auth.org_id.parse::().unwrap_or_default()) + .execute(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + if result.rows_affected() == 0 { + return 
Err(ApiError::NotFound); + } + Ok(StatusCode::NO_CONTENT) +} + +// --- API Keys --- + +#[derive(Serialize)] +pub struct ApiKeyDto { + pub id: Uuid, + pub name: String, + pub key_prefix: String, + pub scopes: Vec, + pub last_used_at: Option>, + pub created_at: chrono::DateTime, +} + +#[derive(Serialize)] +pub struct ApiKeyCreated { + pub id: Uuid, + pub key: String, // Only returned on creation + pub name: String, +} + +#[derive(Deserialize)] +pub struct CreateKeyRequest { + pub name: String, + pub scopes: Option>, +} + +async fn list_keys( + State(state): State>, + headers: HeaderMap, +) -> Result>, ApiError> { + let auth = state.auth.authenticate(&headers).await?; + + let rows = sqlx::query_as::<_, (Uuid, String, String, Vec, Option>, chrono::DateTime)>( + "SELECT id, name, key_prefix, scopes, last_used_at, created_at + FROM api_keys WHERE org_id = $1 AND revoked_at IS NULL ORDER BY created_at DESC" + ) + .bind(auth.org_id.parse::().unwrap_or_default()) + .fetch_all(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + Ok(Json(rows.iter().map(|r| ApiKeyDto { + id: r.0, + name: r.1.clone(), + key_prefix: r.2.clone(), + scopes: r.3.clone(), + last_used_at: r.4, + created_at: r.5, + }).collect())) +} + +async fn create_key( + State(state): State>, + headers: HeaderMap, + Json(req): Json, +) -> Result<(StatusCode, Json), ApiError> { + let auth = state.auth.authenticate(&headers).await?; + require_role(&auth, Role::Owner)?; + + // Generate key: dd0c_ + 32 random chars + let raw_key = format!("dd0c_{}", generate_random_key(32)); + let prefix = &raw_key[..8]; + let hash = bcrypt::hash(&raw_key, 10).map_err(|e| ApiError::Internal(e.to_string()))?; + + let id = Uuid::new_v4(); + let scopes = req.scopes.unwrap_or_else(|| vec!["proxy".to_string()]); + + sqlx::query( + "INSERT INTO api_keys (id, org_id, key_prefix, key_hash, name, scopes) VALUES ($1, $2, $3, $4, $5, $6)" + ) + .bind(id) + .bind(auth.org_id.parse::().unwrap_or_default()) + 
.bind(prefix) + .bind(&hash) + .bind(&req.name) + .bind(&scopes) + .execute(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + Ok((StatusCode::CREATED, Json(ApiKeyCreated { + id, + key: raw_key, + name: req.name, + }))) +} + +async fn revoke_key( + State(state): State>, + headers: HeaderMap, + Path(id): Path, +) -> Result { + let auth = state.auth.authenticate(&headers).await?; + require_role(&auth, Role::Owner)?; + + let result = sqlx::query( + "UPDATE api_keys SET revoked_at = now() WHERE id = $1 AND org_id = $2 AND revoked_at IS NULL" + ) + .bind(id) + .bind(auth.org_id.parse::().unwrap_or_default()) + .execute(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + if result.rows_affected() == 0 { + return Err(ApiError::NotFound); + } + Ok(StatusCode::NO_CONTENT) +} + +// --- Providers --- + +#[derive(Serialize, Deserialize)] +pub struct ProviderDto { + pub provider: String, + pub base_url: Option, + pub is_default: bool, + pub has_key: bool, +} + +async fn list_providers( + State(state): State>, + headers: HeaderMap, +) -> Result>, ApiError> { + let auth = state.auth.authenticate(&headers).await?; + + let rows = sqlx::query_as::<_, (String, Option, bool)>( + "SELECT provider, base_url, is_default FROM provider_configs WHERE org_id = $1" + ) + .bind(auth.org_id.parse::().unwrap_or_default()) + .fetch_all(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + Ok(Json(rows.iter().map(|r| ProviderDto { + provider: r.0.clone(), + base_url: r.1.clone(), + is_default: r.2, + has_key: true, + }).collect())) +} + +#[derive(Deserialize)] +pub struct UpsertProviderRequest { + pub provider: String, + pub api_key: String, + pub base_url: Option, + pub is_default: bool, +} + +async fn upsert_provider( + State(state): State>, + headers: HeaderMap, + Json(req): Json, +) -> Result { + let auth = state.auth.authenticate(&headers).await?; + require_role(&auth, Role::Owner)?; + + // TODO: Encrypt API 
key with AES-256-GCM before storing + let encrypted = req.api_key.as_bytes().to_vec(); + + sqlx::query( + "INSERT INTO provider_configs (id, org_id, provider, encrypted_api_key, base_url, is_default) + VALUES (gen_random_uuid(), $1, $2, $3, $4, $5) + ON CONFLICT (org_id, provider) DO UPDATE SET encrypted_api_key = $3, base_url = $4, is_default = $5" + ) + .bind(auth.org_id.parse::().unwrap_or_default()) + .bind(&req.provider) + .bind(&encrypted) + .bind(&req.base_url) + .bind(req.is_default) + .execute(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + Ok(StatusCode::OK) +} + +// --- Org --- + +#[derive(Serialize)] +pub struct OrgDto { + pub id: Uuid, + pub name: String, + pub slug: String, + pub tier: String, +} + +async fn get_org( + State(state): State>, + headers: HeaderMap, +) -> Result, ApiError> { + let auth = state.auth.authenticate(&headers).await?; + + let row = sqlx::query_as::<_, (Uuid, String, String, String)>( + "SELECT id, name, slug, tier FROM organizations WHERE id = $1" + ) + .bind(auth.org_id.parse::().unwrap_or_default()) + .fetch_optional(&state.pg_pool) + .await + .map_err(|e| ApiError::Internal(e.to_string()))? 
+ .ok_or(ApiError::NotFound)?; + + Ok(Json(OrgDto { + id: row.0, + name: row.1, + slug: row.2, + tier: row.3, + })) +} + +// --- Helpers --- + +fn require_role(auth: &AuthContext, required: Role) -> Result<(), ApiError> { + if auth.role != required && auth.role != Role::Owner { + return Err(ApiError::Forbidden); + } + Ok(()) +} + +fn generate_random_key(len: usize) -> String { + use std::fmt::Write; + let mut key = String::with_capacity(len); + for _ in 0..len { + let byte: u8 = rand_byte(); + let _ = write!(key, "{:x}", byte % 16); + } + key +} + +fn rand_byte() -> u8 { + // Simple random byte using getrandom + let mut buf = [0u8; 1]; + getrandom::getrandom(&mut buf).unwrap_or_default(); + buf[0] +} + +// --- Error types --- + +#[derive(Debug, thiserror::Error)] +pub enum ApiError { + #[error("Authentication failed")] + AuthError(#[from] AuthError), + #[error("Forbidden")] + Forbidden, + #[error("Not found")] + NotFound, + #[error("Internal error: {0}")] + Internal(String), +} + +impl IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + let (status, msg) = match &self { + ApiError::AuthError(_) => (StatusCode::UNAUTHORIZED, self.to_string()), + ApiError::Forbidden => (StatusCode::FORBIDDEN, self.to_string()), + ApiError::NotFound => (StatusCode::NOT_FOUND, self.to_string()), + ApiError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Internal error".to_string()), + }; + (status, serde_json::json!({"error": msg}).to_string()).into_response() + } +} diff --git a/products/01-llm-cost-router/src/api/main.rs b/products/01-llm-cost-router/src/api/main.rs index d0a64e4..7a5297c 100644 --- a/products/01-llm-cost-router/src/api/main.rs +++ b/products/01-llm-cost-router/src/api/main.rs @@ -1,4 +1,44 @@ -fn main() { - println!("dd0c/route API server — not yet implemented"); - // TODO: Dashboard API (Epic 4) +use std::sync::Arc; +use tracing::info; + +use dd0c_route::{AppConfig, LocalAuthProvider}; + +mod handler; +use 
handler::{create_api_router, ApiState}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "dd0c_route=info,tower_http=info".into()), + ) + .json() + .init(); + + let config = Arc::new(AppConfig::from_env()?); + info!(port = config.api_port, "Starting dd0c/route Dashboard API"); + + let pg_pool = sqlx::PgPool::connect(&config.database_url).await?; + let ts_pool = sqlx::PgPool::connect(&config.timescale_url).await?; + let redis_cfg = deadpool_redis::Config::from_url(&config.redis_url); + let redis_pool = redis_cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; + + let state = Arc::new(ApiState { + auth: Arc::new(LocalAuthProvider::new( + pg_pool.clone(), + config.jwt_secret.clone(), + redis_pool, + )), + pg_pool, + ts_pool, + config: config.clone(), + }); + + let app = create_api_router(state); + let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.api_port)).await?; + info!(port = config.api_port, "Dashboard API listening"); + axum::serve(listener, app).await?; + + Ok(()) } diff --git a/products/01-llm-cost-router/src/api/mod.rs b/products/01-llm-cost-router/src/api/mod.rs new file mode 100644 index 0000000..fe00e34 --- /dev/null +++ b/products/01-llm-cost-router/src/api/mod.rs @@ -0,0 +1,3 @@ +pub mod handler; + +pub use handler::{create_api_router, ApiState, ApiError}; diff --git a/products/01-llm-cost-router/src/lib.rs b/products/01-llm-cost-router/src/lib.rs index b8d5bcf..7f4025e 100644 --- a/products/01-llm-cost-router/src/lib.rs +++ b/products/01-llm-cost-router/src/lib.rs @@ -1,8 +1,8 @@ -mod auth; -mod config; -mod data; -mod proxy; -mod router; +pub mod auth; +pub mod config; +pub mod data; +pub mod proxy; +pub mod router; pub use auth::{AuthProvider, AuthContext, AuthError, LocalAuthProvider, Role}; pub use config::AppConfig; diff --git a/products/01-llm-cost-router/src/proxy/handler.rs 
b/products/01-llm-cost-router/src/proxy/handler.rs index e144865..9842e3e 100644 --- a/products/01-llm-cost-router/src/proxy/handler.rs +++ b/products/01-llm-cost-router/src/proxy/handler.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use tokio::sync::mpsc; use tracing::{info, warn}; -mod middleware; +use super::middleware; use crate::auth::AuthProvider; use crate::config::AppConfig; diff --git a/products/01-llm-cost-router/src/proxy/mod.rs b/products/01-llm-cost-router/src/proxy/mod.rs index f516748..a95466c 100644 --- a/products/01-llm-cost-router/src/proxy/mod.rs +++ b/products/01-llm-cost-router/src/proxy/mod.rs @@ -1,4 +1,4 @@ -mod handler; -mod middleware; +pub mod handler; +pub mod middleware; pub use handler::{create_router, ProxyState, ProxyError};