"""SQLite backend for Developer Intelligence POC.""" import sqlite3 import os from datetime import datetime DB_PATH = os.environ.get("DEVINTEL_DB", os.path.join(os.path.dirname(__file__), "devintel.db")) def get_db() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(): """Create tables if they don't exist.""" conn = get_db() conn.executescript(""" CREATE TABLE IF NOT EXISTS repos ( name TEXT PRIMARY KEY, url TEXT, language TEXT, documentation TEXT, staleness TEXT DEFAULT 'fresh', updated_at TEXT ); CREATE TABLE IF NOT EXISTS files ( path TEXT PRIMARY KEY, repo TEXT REFERENCES repos(name), language TEXT, documentation TEXT, prev_documentation TEXT, functions TEXT, -- JSON array last_commit TEXT, staleness TEXT DEFAULT 'fresh', updated_at TEXT ); CREATE TABLE IF NOT EXISTS relationships ( from_file TEXT REFERENCES files(path), to_file TEXT REFERENCES files(path), rel_type TEXT DEFAULT 'IMPORTS', documentation TEXT, staleness TEXT DEFAULT 'fresh', updated_at TEXT, PRIMARY KEY (from_file, to_file, rel_type) ); CREATE INDEX IF NOT EXISTS idx_files_repo ON files(repo); CREATE INDEX IF NOT EXISTS idx_files_staleness ON files(staleness); CREATE INDEX IF NOT EXISTS idx_rels_staleness ON relationships(staleness); CREATE INDEX IF NOT EXISTS idx_rels_to ON relationships(to_file); """) conn.commit() conn.close() class GraphDB: def __init__(self): init_db() self.conn = get_db() def create_repo(self, name: str, url: str, language: str, documentation: str): self.conn.execute( """INSERT INTO repos (name, url, language, documentation, staleness, updated_at) VALUES (?, ?, ?, ?, 'fresh', ?) ON CONFLICT(name) DO UPDATE SET url=excluded.url, language=excluded.language, documentation=excluded.documentation, staleness='fresh', updated_at=excluded.updated_at""", (name, url, language, documentation, datetime.utcnow().isoformat()), ) self.conn.commit() def create_file(self, path: str, repo: str, language: str, documentation: str, functions: list[str], commit: str = "initial"): import json self.conn.execute( """INSERT INTO files (path, repo, language, documentation, functions, last_commit, staleness, updated_at) VALUES (?, ?, ?, ?, ?, ?, 'fresh', ?) ON CONFLICT(path) DO UPDATE SET repo=excluded.repo, language=excluded.language, documentation=excluded.documentation, functions=excluded.functions, last_commit=excluded.last_commit, staleness='fresh', updated_at=excluded.updated_at""", (path, repo, language, documentation, json.dumps(functions), commit, datetime.utcnow().isoformat()), ) self.conn.commit() def create_relationship(self, from_file: str, to_file: str, rel_type: str = "IMPORTS", documentation: str = ""): self.conn.execute( """INSERT INTO relationships (from_file, to_file, rel_type, documentation, staleness, updated_at) VALUES (?, ?, ?, ?, 'fresh', ?) ON CONFLICT(from_file, to_file, rel_type) DO UPDATE SET documentation=excluded.documentation, staleness=excluded.staleness, updated_at=excluded.updated_at""", (from_file, to_file, rel_type, documentation, datetime.utcnow().isoformat()), ) self.conn.commit() def mark_relationships_stale(self, file_path: str): now = datetime.utcnow().isoformat() self.conn.execute( "UPDATE relationships SET staleness='stale', updated_at=? WHERE from_file=? OR to_file=?", (now, file_path, file_path), ) self.conn.commit() def mark_repo_stale(self, repo: str): self.conn.execute( "UPDATE repos SET staleness='stale', updated_at=? WHERE name=?", (datetime.utcnow().isoformat(), repo), ) self.conn.commit() def update_file_doc(self, path: str, documentation: str, commit: str): self.conn.execute( """UPDATE files SET prev_documentation=documentation, documentation=?, staleness='fresh', last_commit=?, updated_at=? WHERE path=?""", (documentation, commit, datetime.utcnow().isoformat(), path), ) self.conn.commit() def get_file(self, path: str) -> dict | None: row = self.conn.execute("SELECT * FROM files WHERE path=?", (path,)).fetchone() return dict(row) if row else None def get_repo(self, name: str = None) -> dict | None: if name: row = self.conn.execute("SELECT * FROM repos WHERE name=?", (name,)).fetchone() else: row = self.conn.execute("SELECT * FROM repos LIMIT 1").fetchone() return dict(row) if row else None def get_dependents(self, path: str) -> list[dict]: rows = self.conn.execute( """SELECT r.from_file, r.documentation AS rel_doc, r.staleness AS rel_staleness, f.documentation AS file_doc FROM relationships r JOIN files f ON f.path = r.from_file WHERE r.to_file = ? ORDER BY r.from_file""", (path,), ).fetchall() return [dict(r) for r in rows] def get_dependencies(self, path: str) -> list[dict]: rows = self.conn.execute( """SELECT r.to_file, r.documentation AS rel_doc, r.staleness AS rel_staleness, f.documentation AS file_doc FROM relationships r JOIN files f ON f.path = r.to_file WHERE r.from_file = ? ORDER BY r.to_file""", (path,), ).fetchall() return [dict(r) for r in rows] def get_relationship(self, from_file: str, to_file: str) -> dict | None: row = self.conn.execute( "SELECT * FROM relationships WHERE from_file=? AND to_file=?", (from_file, to_file), ).fetchone() return dict(row) if row else None def search_docs(self, query: str, limit: int = 10) -> list[dict]: rows = self.conn.execute( """SELECT path, documentation, staleness FROM files WHERE LOWER(documentation) LIKE LOWER(?) ORDER BY path LIMIT ?""", (f"%{query}%", limit), ).fetchall() return [dict(r) for r in rows] def get_stale_relationships(self) -> list[dict]: rows = self.conn.execute( """SELECT r.from_file, r.to_file, f1.documentation AS from_doc, f2.documentation AS to_doc FROM relationships r JOIN files f1 ON f1.path = r.from_file JOIN files f2 ON f2.path = r.to_file WHERE r.staleness = 'stale'""" ).fetchall() return [dict(r) for r in rows] def get_stale_repos(self) -> list[dict]: rows = self.conn.execute( "SELECT name, url FROM repos WHERE staleness='stale'" ).fetchall() return [dict(r) for r in rows] def get_repo_files_docs(self, repo: str) -> list[tuple[str, str]]: rows = self.conn.execute( "SELECT path, documentation FROM files WHERE repo=? ORDER BY path", (repo,), ).fetchall() return [(r["path"], r["documentation"]) for r in rows] def update_relationship_doc(self, from_file: str, to_file: str, documentation: str): self.conn.execute( """UPDATE relationships SET documentation=?, staleness='fresh', updated_at=? WHERE from_file=? AND to_file=?""", (documentation, datetime.utcnow().isoformat(), from_file, to_file), ) self.conn.commit() def update_repo_doc(self, name: str, documentation: str): self.conn.execute( "UPDATE repos SET documentation=?, staleness='fresh', updated_at=? WHERE name=?", (documentation, datetime.utcnow().isoformat(), name), ) self.conn.commit() def get_stats(self) -> dict: files = self.conn.execute("SELECT count(*) AS c FROM files").fetchone()["c"] rels = self.conn.execute("SELECT count(*) AS c FROM relationships").fetchone()["c"] stale_f = self.conn.execute("SELECT count(*) AS c FROM files WHERE staleness='stale'").fetchone()["c"] stale_r = self.conn.execute("SELECT count(*) AS c FROM relationships WHERE staleness='stale'").fetchone()["c"] return {"files": files, "relationships": rels, "stale_files": stale_f, "stale_relationships": stale_r} def close(self): self.conn.close()