feat: add find_path and get_file_signatures MCP tools
- find_path: BFS traversal to trace relationship chains between two files (max N hops)
- get_file_signatures: lightweight context mode returning just function/type names
- db.find_path(): BFS over relationships followed in both directions, with shortest-path tracking
- db.get_all_files(): list all files with docs and staleness
- db.get_file_signatures(): return functions list without full doc payload

Inspired by Octocode's GraphRAG path-finding pattern. Addresses Mike/Dmitry feedback on usable roll-up summaries.
This commit is contained in:
56
db.py
56
db.py
@@ -203,6 +203,62 @@ class GraphDB:
|
|||||||
).fetchone()
|
).fetchone()
|
||||||
return dict(row) if row else None
|
return dict(row) if row else None
|
||||||
|
|
||||||
|
def find_path(self, source: str, target: str, max_depth: int = 4) -> list[list[str]]:
    """Return every shortest path between two files in the relationship graph.

    Edges are read from the ``relationships`` table (``from_file`` /
    ``to_file``) and followed in both directions, so a path may walk an
    edge "backwards".

    Args:
        source: File the paths start from.
        target: File the paths end at.
        max_depth: Maximum number of hops (edges) a path may contain.

    Returns:
        All paths of minimal length from ``source`` to ``target``, each a list
        of file names including both endpoints, sorted lexicographically so
        the output is stable between runs. Empty when either file has no
        relationships, when no path exists within ``max_depth`` hops, or when
        ``max_depth`` is negative. ``source == target`` yields ``[[source]]``.
    """
    from collections import deque

    # Undirected adjacency list built from the whole relationships table.
    adj: dict[str, set[str]] = {}
    rows = self.conn.execute("SELECT from_file, to_file FROM relationships").fetchall()
    for r in rows:
        adj.setdefault(r["from_file"], set()).add(r["to_file"])
        adj.setdefault(r["to_file"], set()).add(r["from_file"])

    if source not in adj or target not in adj:
        return []
    if max_depth < 0:
        return []

    # Level-wise BFS over nodes. Besides the hop distance we keep *every*
    # predecessor that reaches a node at its minimal distance; marking a node
    # visited after its first discovery would discard the alternative
    # shortest routes through it.
    dist: dict[str, int] = {source: 0}
    parents: dict[str, list[str]] = {source: []}
    queue: deque[str] = deque([source])

    while queue:
        node = queue.popleft()
        depth = dist[node]
        # FIFO order means everything still queued is at least this deep, so
        # once we hit the hop limit or the target's own level we can stop.
        if depth >= max_depth or (target in dist and depth >= dist[target]):
            break
        for neighbor in adj[node]:
            if neighbor not in dist:
                dist[neighbor] = depth + 1
                parents[neighbor] = [node]
                queue.append(neighbor)
            elif dist[neighbor] == depth + 1:
                # Another equally short way to reach this neighbor.
                parents[neighbor].append(node)

    if target not in dist:
        return []

    # Walk the predecessor lists back from the target to enumerate the paths.
    found_paths: list[list[str]] = []
    stack: list[list[str]] = [[target]]
    while stack:
        partial = stack.pop()
        head = partial[-1]
        if head == source:
            found_paths.append(partial[::-1])
            continue
        for parent in parents[head]:
            stack.append(partial + [parent])

    # Set iteration order varies between processes; sort for stable output.
    found_paths.sort()
    return found_paths
|
||||||
|
|
||||||
|
def get_all_files(self) -> list[dict]:
    """Return one dict per row of the ``files`` table, ordered by path.

    Each dict carries the ``path``, ``repo``, ``language``, ``documentation``
    and ``staleness`` columns.
    """
    query = "SELECT path, repo, language, documentation, staleness FROM files ORDER BY path"
    cursor = self.conn.execute(query)
    return list(map(dict, cursor.fetchall()))
|
||||||
|
|
||||||
|
def get_file_signatures(self, path: str) -> dict | None:
|
||||||
|
"""Get file with just functions list (signatures) — lightweight context."""
|
||||||
|
row = self.conn.execute(
|
||||||
|
"SELECT path, language, functions, staleness FROM files WHERE path=?", (path,)
|
||||||
|
).fetchone()
|
||||||
|
return dict(row) if row else None
|
||||||
|
|
||||||
def search_docs(self, query: str, limit: int = 10) -> list[dict]:
|
def search_docs(self, query: str, limit: int = 10) -> list[dict]:
|
||||||
rows = self.conn.execute(
|
rows = self.conn.execute(
|
||||||
"""SELECT path, documentation, staleness FROM files
|
"""SELECT path, documentation, staleness FROM files
|
||||||
|
|||||||
@@ -143,6 +143,46 @@ def get_stale_docs() -> str:
|
|||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
def find_path(source: str, target: str, max_depth: int = 4) -> str:
    """Trace the relationship chain between two files. Shows how file A connects to file B through imports/dependencies. Useful for understanding impact radius and architectural coupling."""
    # NOTE(review): the docstring above is presumably surfaced to clients as
    # the tool description, so it is left unchanged — confirm before editing.
    db = _get_db()
    try:
        paths = db.find_path(source, target, max_depth)
    finally:
        # Release the handle even when the lookup raises.
        db.close()

    if not paths:
        return f"No connection found between {source} and {target} within {max_depth} hops."

    lines = [f"Connection paths from {source} → {target} ({len(paths)} found):\n"]
    # Only the first five paths are listed; the rest are summarized below.
    for i, path in enumerate(paths[:5]):
        chain = " → ".join(path)
        lines.append(f" Path {i+1} ({len(path)-1} hops): {chain}")
    if len(paths) > 5:
        lines.append(f" ... and {len(paths) - 5} more paths")
    return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
def get_file_signatures(path: str) -> str:
    """Get just the function/type signatures for a file — lightweight context without full documentation. Useful when you need a quick map of what a file exports."""
    # NOTE(review): the docstring above is presumably surfaced to clients as
    # the tool description, so it is left unchanged — confirm before editing.
    import json

    db = _get_db()
    try:
        record = db.get_file_signatures(path)
    finally:
        # Release the handle even when the lookup raises.
        db.close()

    if not record:
        return f"File not found: {path}"

    # The ``functions`` column is decoded as JSON; a missing or malformed
    # value is reported as "no functions" rather than as an error.
    try:
        funcs = json.loads(record["functions"]) if record["functions"] else []
    except (json.JSONDecodeError, TypeError):
        funcs = []

    staleness = " [STALE]" if record["staleness"] == "stale" else ""
    lines = [f"Signatures for {path}{staleness} ({record['language']}):\n"]
    if funcs:
        for fn in funcs:
            lines.append(f" • {fn}")
    else:
        lines.append(" (no functions extracted)")
    return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def get_graph_stats() -> str:
|
def get_graph_stats() -> str:
|
||||||
"""Get overall knowledge graph statistics — file count, relationship count, staleness."""
|
"""Get overall knowledge graph statistics — file count, relationship count, staleness."""
|
||||||
|
|||||||
Reference in New Issue
Block a user