Files
dev-intel-poc/db.py
Jarvis Prime 20253329e4 Add db migrations, update watch.sh for uv + idempotent runs
- Schema versioned with migrations table
- watch.sh runs db migrations on every pull
- watch.sh uses uv run/sync instead of pip
- Fixed parser.py reference in watch.sh
2026-03-04 04:32:17 +00:00

263 lines
10 KiB
Python

"""SQLite backend for Developer Intelligence POC."""
import sqlite3
import os
from datetime import datetime
# Location of the SQLite database file. The DEVINTEL_DB environment variable
# takes precedence; otherwise "devintel.db" next to this module is used.
_DEFAULT_DB_FILE = os.path.join(os.path.dirname(__file__), "devintel.db")
DB_PATH = os.environ.get("DEVINTEL_DB", _DEFAULT_DB_FILE)
def get_db() -> sqlite3.Connection:
    """Open a new connection to the database at ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` objects, and the connection is
    switched to WAL journaling with foreign-key enforcement turned on.
    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    # Per-connection settings, applied in the same order every time.
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
# Schema version that init_db() migrates the database up to.
SCHEMA_VERSION = 2

# v1: the repos / files / relationships tables and their lookup indexes.
_MIGRATION_V1_SQL = """
CREATE TABLE IF NOT EXISTS repos (
name TEXT PRIMARY KEY,
url TEXT,
language TEXT,
documentation TEXT,
staleness TEXT DEFAULT 'fresh',
updated_at TEXT
);
CREATE TABLE IF NOT EXISTS files (
path TEXT PRIMARY KEY,
repo TEXT REFERENCES repos(name),
language TEXT,
documentation TEXT,
prev_documentation TEXT,
functions TEXT,
last_commit TEXT,
staleness TEXT DEFAULT 'fresh',
updated_at TEXT
);
CREATE TABLE IF NOT EXISTS relationships (
from_file TEXT REFERENCES files(path),
to_file TEXT REFERENCES files(path),
rel_type TEXT DEFAULT 'IMPORTS',
documentation TEXT,
staleness TEXT DEFAULT 'fresh',
updated_at TEXT,
PRIMARY KEY (from_file, to_file, rel_type)
);
CREATE INDEX IF NOT EXISTS idx_files_repo ON files(repo);
CREATE INDEX IF NOT EXISTS idx_files_staleness ON files(staleness);
CREATE INDEX IF NOT EXISTS idx_rels_staleness ON relationships(staleness);
CREATE INDEX IF NOT EXISTS idx_rels_to ON relationships(to_file);
"""

# v2: the table that records which schema version has been applied.
_MIGRATION_V2_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY
);
"""

# SQL scripts keyed by the schema version they bring the database to.
MIGRATIONS = {
    1: _MIGRATION_V1_SQL,
    2: _MIGRATION_V2_SQL,
}
def init_db():
    """Create or migrate the database schema.

    Reads the highest version recorded in ``schema_version`` (a missing or
    empty table counts as version 0) and runs every pending entry of
    ``MIGRATIONS`` in ascending order up to ``SCHEMA_VERSION``. Each version
    is recorded as soon as its migration succeeds, so a failure part-way
    through does not cause already-applied steps to be replayed on the next
    run. Does nothing when the database is already at ``SCHEMA_VERSION`` or
    newer.

    The connection is always closed, including when a migration raises.

    Raises:
        sqlite3.Error: if a migration script or the version bookkeeping fails.
    """
    conn = get_db()
    try:
        # Check current version. A database without the schema_version table
        # makes the SELECT raise OperationalError.
        try:
            row = conn.execute("SELECT MAX(version) AS v FROM schema_version").fetchone()
            current = row["v"] or 0  # MAX() over an empty table is NULL
        except sqlite3.OperationalError:
            current = 0

        if current >= SCHEMA_VERSION:
            return

        # The bookkeeping table must exist before the first step is recorded.
        conn.execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)")

        # Run pending migrations, recording each one once it has been applied.
        for v in range(current + 1, SCHEMA_VERSION + 1):
            if v in MIGRATIONS:
                print(f" [db] Running migration v{v}...")
                conn.executescript(MIGRATIONS[v])
                conn.execute("INSERT OR REPLACE INTO schema_version (version) VALUES (?)", (v,))
                conn.commit()

        # Record the target version even if MIGRATIONS has no entry for it.
        conn.execute("INSERT OR REPLACE INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,))
        conn.commit()
        print(f" [db] Schema at v{SCHEMA_VERSION}")
    finally:
        conn.close()
class GraphDB:
def __init__(self):
init_db()
self.conn = get_db()
def create_repo(self, name: str, url: str, language: str, documentation: str):
self.conn.execute(
"""INSERT INTO repos (name, url, language, documentation, staleness, updated_at)
VALUES (?, ?, ?, ?, 'fresh', ?)
ON CONFLICT(name) DO UPDATE SET
url=excluded.url, language=excluded.language,
documentation=excluded.documentation, staleness='fresh',
updated_at=excluded.updated_at""",
(name, url, language, documentation, datetime.utcnow().isoformat()),
)
self.conn.commit()
def create_file(self, path: str, repo: str, language: str, documentation: str,
functions: list[str], commit: str = "initial"):
import json
self.conn.execute(
"""INSERT INTO files (path, repo, language, documentation, functions, last_commit, staleness, updated_at)
VALUES (?, ?, ?, ?, ?, ?, 'fresh', ?)
ON CONFLICT(path) DO UPDATE SET
repo=excluded.repo, language=excluded.language,
documentation=excluded.documentation, functions=excluded.functions,
last_commit=excluded.last_commit, staleness='fresh',
updated_at=excluded.updated_at""",
(path, repo, language, documentation, json.dumps(functions), commit,
datetime.utcnow().isoformat()),
)
self.conn.commit()
def create_relationship(self, from_file: str, to_file: str, rel_type: str = "IMPORTS",
documentation: str = ""):
self.conn.execute(
"""INSERT INTO relationships (from_file, to_file, rel_type, documentation, staleness, updated_at)
VALUES (?, ?, ?, ?, 'fresh', ?)
ON CONFLICT(from_file, to_file, rel_type) DO UPDATE SET
documentation=excluded.documentation, staleness=excluded.staleness,
updated_at=excluded.updated_at""",
(from_file, to_file, rel_type, documentation, datetime.utcnow().isoformat()),
)
self.conn.commit()
def mark_relationships_stale(self, file_path: str):
now = datetime.utcnow().isoformat()
self.conn.execute(
"UPDATE relationships SET staleness='stale', updated_at=? WHERE from_file=? OR to_file=?",
(now, file_path, file_path),
)
self.conn.commit()
def mark_repo_stale(self, repo: str):
self.conn.execute(
"UPDATE repos SET staleness='stale', updated_at=? WHERE name=?",
(datetime.utcnow().isoformat(), repo),
)
self.conn.commit()
def update_file_doc(self, path: str, documentation: str, commit: str):
self.conn.execute(
"""UPDATE files SET prev_documentation=documentation,
documentation=?, staleness='fresh', last_commit=?, updated_at=?
WHERE path=?""",
(documentation, commit, datetime.utcnow().isoformat(), path),
)
self.conn.commit()
def get_file(self, path: str) -> dict | None:
row = self.conn.execute("SELECT * FROM files WHERE path=?", (path,)).fetchone()
return dict(row) if row else None
def get_repo(self, name: str = None) -> dict | None:
if name:
row = self.conn.execute("SELECT * FROM repos WHERE name=?", (name,)).fetchone()
else:
row = self.conn.execute("SELECT * FROM repos LIMIT 1").fetchone()
return dict(row) if row else None
def get_dependents(self, path: str) -> list[dict]:
rows = self.conn.execute(
"""SELECT r.from_file, r.documentation AS rel_doc, r.staleness AS rel_staleness,
f.documentation AS file_doc
FROM relationships r
JOIN files f ON f.path = r.from_file
WHERE r.to_file = ?
ORDER BY r.from_file""",
(path,),
).fetchall()
return [dict(r) for r in rows]
def get_dependencies(self, path: str) -> list[dict]:
rows = self.conn.execute(
"""SELECT r.to_file, r.documentation AS rel_doc, r.staleness AS rel_staleness,
f.documentation AS file_doc
FROM relationships r
JOIN files f ON f.path = r.to_file
WHERE r.from_file = ?
ORDER BY r.to_file""",
(path,),
).fetchall()
return [dict(r) for r in rows]
def get_relationship(self, from_file: str, to_file: str) -> dict | None:
row = self.conn.execute(
"SELECT * FROM relationships WHERE from_file=? AND to_file=?",
(from_file, to_file),
).fetchone()
return dict(row) if row else None
def search_docs(self, query: str, limit: int = 10) -> list[dict]:
rows = self.conn.execute(
"""SELECT path, documentation, staleness FROM files
WHERE LOWER(documentation) LIKE LOWER(?)
ORDER BY path LIMIT ?""",
(f"%{query}%", limit),
).fetchall()
return [dict(r) for r in rows]
def get_stale_relationships(self) -> list[dict]:
rows = self.conn.execute(
"""SELECT r.from_file, r.to_file,
f1.documentation AS from_doc, f2.documentation AS to_doc
FROM relationships r
JOIN files f1 ON f1.path = r.from_file
JOIN files f2 ON f2.path = r.to_file
WHERE r.staleness = 'stale'"""
).fetchall()
return [dict(r) for r in rows]
def get_stale_repos(self) -> list[dict]:
rows = self.conn.execute(
"SELECT name, url FROM repos WHERE staleness='stale'"
).fetchall()
return [dict(r) for r in rows]
def get_repo_files_docs(self, repo: str) -> list[tuple[str, str]]:
rows = self.conn.execute(
"SELECT path, documentation FROM files WHERE repo=? ORDER BY path",
(repo,),
).fetchall()
return [(r["path"], r["documentation"]) for r in rows]
def update_relationship_doc(self, from_file: str, to_file: str, documentation: str):
self.conn.execute(
"""UPDATE relationships SET documentation=?, staleness='fresh', updated_at=?
WHERE from_file=? AND to_file=?""",
(documentation, datetime.utcnow().isoformat(), from_file, to_file),
)
self.conn.commit()
def update_repo_doc(self, name: str, documentation: str):
self.conn.execute(
"UPDATE repos SET documentation=?, staleness='fresh', updated_at=? WHERE name=?",
(documentation, datetime.utcnow().isoformat(), name),
)
self.conn.commit()
def get_stats(self) -> dict:
files = self.conn.execute("SELECT count(*) AS c FROM files").fetchone()["c"]
rels = self.conn.execute("SELECT count(*) AS c FROM relationships").fetchone()["c"]
stale_f = self.conn.execute("SELECT count(*) AS c FROM files WHERE staleness='stale'").fetchone()["c"]
stale_r = self.conn.execute("SELECT count(*) AS c FROM relationships WHERE staleness='stale'").fetchone()["c"]
return {"files": files, "relationships": rels, "stale_files": stale_f, "stale_relationships": stale_r}
def close(self):
self.conn.close()