- parser.py renamed to go_parser.py (avoids shadowing the Python stdlib `parser` module); docgen.py was missing from the flat structure; added pyproject.toml for uv; updated .mcp.json to use `uv run`; updated README for the uv workflow
150 lines
5.3 KiB
Python
"""Main ingestion script. Clone repo, parse, generate docs, load SQLite."""
|
|
|
|
import os
|
|
import subprocess
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from go_parser import parse_go_file, filter_imports, get_repo_module, resolve_import_to_file
|
|
from docgen import generate_file_doc, generate_repo_doc, generate_docs_batch
|
|
from db import GraphDB
|
|
|
|
# Git URL of the repository to ingest; override via the TARGET_REPO env var.
TARGET_REPO = os.environ.get("TARGET_REPO", "https://github.com/labstack/echo.git")
|
|
# Local checkout directory for the clone; override via the REPO_DIR env var.
REPO_DIR = os.environ.get("REPO_DIR", os.path.join(os.path.dirname(__file__), "repos", "target"))
|
|
|
|
|
|
def clone_repo():
    """Ensure TARGET_REPO is checked out at REPO_DIR and return its HEAD hash.

    If a git checkout already exists there, pull the latest changes;
    otherwise perform a fresh shallow clone.

    Returns:
        The full HEAD commit hash of the checkout.

    Raises:
        subprocess.CalledProcessError: if any git command fails.
    """
    repo_path = Path(REPO_DIR)
    already_cloned = repo_path.exists() and (repo_path / ".git").exists()

    if already_cloned:
        print(f"Repo already cloned at {REPO_DIR}, pulling latest...")
        subprocess.run(["git", "-C", REPO_DIR, "pull"], check=True)
    else:
        print(f"Cloning {TARGET_REPO}...")
        # Make sure the parent directory exists before git creates REPO_DIR.
        repo_path.parent.mkdir(parents=True, exist_ok=True)
        subprocess.run(["git", "clone", "--depth=1", TARGET_REPO, REPO_DIR], check=True)

    head = subprocess.run(
        ["git", "-C", REPO_DIR, "rev-parse", "HEAD"],
        capture_output=True,
        text=True,
        check=True,
    )
    return head.stdout.strip()
|
|
|
|
|
|
def discover_go_files(repo_dir=None) -> dict[str, str]:
    """Collect the contents of all non-test, non-vendored .go files.

    Args:
        repo_dir: Directory to scan; defaults to the module-level REPO_DIR
            (the default keeps the original zero-argument call sites working).

    Returns:
        Mapping of repo-relative path -> file contents. Unreadable files are
        skipped with a warning rather than aborting the scan.
    """
    repo = Path(repo_dir if repo_dir is not None else REPO_DIR)
    files: dict[str, str] = {}
    for gofile in repo.rglob("*.go"):
        rel_parts = gofile.relative_to(repo).parts
        # Skip vendored dependencies and Go test files. Checking path
        # components (rather than a substring of the joined string, as
        # before) keeps the filter correct on Windows separators and
        # avoids false mid-path substring matches.
        if "vendor" in rel_parts[:-1] or gofile.name.endswith("_test.go"):
            continue
        rel = str(gofile.relative_to(repo))
        try:
            files[rel] = gofile.read_text(errors="replace")
        except Exception as e:
            # Best-effort: one unreadable file should not kill ingestion.
            print(f" Skipping {rel}: {e}")
    return files
|
|
|
|
|
|
def run():
    """Run the full ingestion pipeline.

    Steps: clone the target repo, discover Go sources, parse them with
    tree-sitter, generate documentation via Ollama, and load file nodes
    plus import edges into the SQLite graph database.
    """
    start = time.time()
    print("=" * 60)
    print("Developer Intelligence POC — Ingestion")
    print("=" * 60)

    # Step 1: Clone
    print("\n[1/6] Cloning repository...")
    commit = clone_repo()
    # Derive e.g. "echo" from "https://github.com/labstack/echo.git".
    repo_name = TARGET_REPO.rstrip("/").split("/")[-1].replace(".git", "")
    print(f" Repo: {repo_name}, commit: {commit[:8]}")

    # Step 2: Discover files
    print("\n[2/6] Discovering Go files...")
    go_files = discover_go_files()
    print(f" Found {len(go_files)} Go files (excluding tests and vendor)")

    # Step 3: Parse AST
    print("\n[3/6] Parsing AST (tree-sitter)...")
    repo_module = get_repo_module(REPO_DIR)
    print(f" Module: {repo_module}")

    parsed = {}
    all_imports = {}
    for rel_path, content in go_files.items():
        info = parse_go_file(rel_path, content, repo_module)
        parsed[rel_path] = info
        all_imports[rel_path] = filter_imports(info.imports, repo_module)

    total_imports = sum(len(v) for v in all_imports.values())
    first_party = sum(1 for v in all_imports.values() for i in v if i.startswith(repo_module))
    print(f" Parsed {len(parsed)} files, {total_imports} filtered imports ({first_party} first-party)")

    # Step 4: Generate file docs
    print("\n[4/6] Generating file documentation (Ollama)...")
    file_items = list(go_files.items())
    file_docs = generate_docs_batch(file_items, generate_file_doc)
    # Pair each path with its generated doc; zip keeps order and avoids
    # the index arithmetic the original used.
    file_doc_map = {path: doc for (path, _), doc in zip(file_items, file_docs)}

    # Step 5: Load into SQLite
    print("\n[5/6] Loading into database...")
    db = GraphDB()

    for rel_path in go_files:
        info = parsed[rel_path]
        db.create_file(
            path=rel_path,
            repo=repo_name,
            language="go",
            documentation=file_doc_map.get(rel_path, ""),
            functions=info.functions,
            commit=commit[:8],
        )

    edges_created = 0
    # Dedupe: the suffix match below can resolve the same (from, to) pair
    # through more than one import, which previously double-created edges.
    seen_edges = set()
    for from_file, imports in all_imports.items():
        for imp in imports:
            if not imp.startswith(repo_module):
                continue
            target_dir = resolve_import_to_file(imp, repo_module, go_files)
            if not target_dir:
                continue
            # Link to every .go file living in the resolved directory.
            for fpath in go_files:
                fdir = str(Path(fpath).parent)
                if fdir != target_dir and not fdir.endswith("/" + target_dir):
                    continue
                edge = (from_file, fpath)
                if edge in seen_edges:
                    continue
                seen_edges.add(edge)
                db.create_relationship(from_file, fpath)
                edges_created += 1

    print(f" Created {len(go_files)} file nodes, {edges_created} import edges")

    # Step 6: Generate repo-level doc
    print("\n[6/6] Generating repo-level documentation...")
    readme_path = Path(REPO_DIR) / "README.md"
    readme = readme_path.read_text(errors="replace") if readme_path.exists() else ""

    # Seed the repo-level doc with the README plus a few well-known entry
    # files (candidates tailored to the default echo repo).
    entry_candidates = ["echo.go", "router.go", "context.go", "group.go", "middleware.go"]
    entry_files = []
    for candidate in entry_candidates:
        for path, content in go_files.items():
            # Match the exact basename: endswith() alone would also accept
            # e.g. "myecho.go" for candidate "echo.go".
            if Path(path).name == candidate:
                entry_files.append((path, content))
                break

    repo_doc = generate_repo_doc(readme, entry_files)
    db.create_repo(name=repo_name, url=TARGET_REPO, language="go", documentation=repo_doc)

    stats = db.get_stats()
    elapsed = time.time() - start
    print("\n" + "=" * 60)
    print("Ingestion complete!")
    print(f" Files: {stats['files']}")
    print(f" Relationships: {stats['relationships']}")
    print(f" Time: {elapsed:.1f}s ({elapsed/60:.1f}m)")
    # Resolve the on-disk path of the main database file via SQLite itself.
    print(f" Database: {os.path.abspath(db.conn.execute('PRAGMA database_list').fetchone()[2])}")
    print("=" * 60)

    db.close()
|
|
|
|
|
|
# Script entry point: run the full pipeline when executed directly
# (e.g. `uv run ingest.py`); importing the module has no side effects.
if __name__ == "__main__":
    run()
|