Initial commit: Handoff Pro MCP server for Kellow Construction

This commit is contained in:
brian
2026-05-29 14:38:57 -07:00
commit 3a62428751
28 changed files with 1860 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
__pycache__/
*.pyc
data/*.db
.env

8
Dockerfile Normal file
View File

@@ -0,0 +1,8 @@
FROM python:3.12-slim
WORKDIR /app
COPY lib/ lib/
COPY scripts/ scripts/
COPY server.py .
RUN python3 scripts/init_db.py
EXPOSE 3101
CMD ["python3", "server.py"]

53
README.md Normal file
View File

@@ -0,0 +1,53 @@
# Handoff Pro — Construction Project Management MCP Server
MCP server extending Tom (Kellow Construction AI agent on GW-2) with estimation, proposals, invoicing, and project management — synced with JobTread + QuickBooks Online.
## Architecture
```
Tom (Telegram) → mcporter → handoff-pro MCP (port 3101)
→ design-agent MCP (port 3100)
SQLite (local) ←→ JobTread API
←→ QuickBooks Online API
```
## Quick Start
```bash
# Run tests
cd handoff-pro && python3 -m pytest tests/
# Run server locally
python3 server.py
# Deploy to GW-2
chmod +x deploy.sh && ./deploy.sh
# Or run as Docker container
docker build -t handoff-pro .
docker run -d --name handoff-pro -p 3101:3101 \
-v /path/to/.jobtread-api.json:/app/data/.jobtread-api.json:ro \
handoff-pro
```
## Tools (34 total)
See `SKILL.md` for full tool reference and workflow patterns.
## Dependencies
- Python 3.10+ (stdlib only — no pip install needed)
- SQLite3 (bundled with Python)
- Network access to JobTread API and QBO token server
## Deployment Options
1. **In-container** (recommended): Deploy scripts directly into Tom's workspace via `deploy.sh`
2. **Standalone Docker**: Run as separate container on TrueNAS, register in mcporter
3. **Hybrid**: Scripts in workspace + server as sidecar container
## Credentials
- **JobTread**: `.jobtread-api.json` with `grantKey` and `organizationId`
- **QBO**: Token server at `http://192.168.86.11:18801/kellow-tokens.json`

107
SKILL.md Normal file
View File

@@ -0,0 +1,107 @@
---
name: handoff-pro
description: "Construction estimation, proposals, invoicing, and project management for Kellow Construction. Syncs with JobTread and QuickBooks Online. Use for any estimate, proposal, invoice, budget, or project tracking request."
---
# Handoff Pro — Construction Project Management
MCP server at `http://192.168.86.11:3101/sse` (registered in mcporter as `handoff-pro`).
## Quick Reference
Call tools via: `mcporter call handoff-pro.<tool> key=value`
### Estimation Workflow
1. User describes job → `generate_estimate description="kitchen remodel, 200sqft, new cabinets and countertops"`
2. Optionally enrich with design-agent: `mcporter call design-agent.estimate_cost ...` and `design-agent.get_permit_matrix ...`
3. Review estimate → `get_estimate estimate_id=<id>`
4. Generate proposal → `draft_proposal estimate_id=<id>`
5. Request signature → `request_signature estimate_id=<id> client_name="John Smith"`
### Invoice Workflow
1. Create invoice → `create_invoice estimate_id=<id>`
2. Sync to JobTread → `jt_push_invoice invoice_id=<id>`
3. Sync to QBO → `sync_invoice_qbo invoice_id=<id>`
4. Mark paid → `mark_invoice_paid invoice_id=<id>`
### JobTread Sync
- Pull all jobs: `jt_pull_jobs`
- List customers: `jt_list_customers`
- Push estimate: `jt_push_estimate estimate_id=<id>`
- Check sync: `jt_sync_status job_id=<id>`
### Daily Operations
- Log work: `create_daily_log job_id=<id> notes="framed walls" crew_names="Mike, Sarah" hours_worked=8`
- Analyze site: `analyze_site observations="water damage in corner, cracked foundation" job_id=<id>`
- Budget check: `get_budget_report job_id=<id>`
- Set milestone: `set_milestone job_id=<id> phase_name="Framing" start_date="2026-06-01"`
### Pricing Catalogs
- List: `list_catalogs`
- Create: `create_catalog name="Commercial" labor_rates={"framing":95,"electrical":110}`
## All Tools (30)
| Tool | Purpose |
|------|---------|
| `list_catalogs` | List pricing catalogs |
| `create_catalog` | Create pricing catalog |
| `get_catalog` | Get catalog by ID |
| `update_catalog` | Update catalog rates |
| `delete_catalog` | Delete catalog |
| `create_job` | Create local job |
| `list_jobs` | List jobs (optional status filter) |
| `get_job` | Get job + estimates + invoices |
| `update_job` | Update job fields |
| `create_estimate` | Create estimate with line items |
| `get_estimate` | Get estimate + line items |
| `generate_estimate` | AI: parse description → estimate |
| `draft_proposal` | Generate proposal from estimate |
| `create_invoice` | Create invoice from estimate |
| `list_invoices` | List invoices |
| `mark_invoice_paid` | Mark invoice paid |
| `sync_invoice_qbo` | Sync invoice to QuickBooks |
| `jt_list_customers` | List JobTread customers |
| `jt_pull_jobs` | Pull jobs from JobTread |
| `jt_push_estimate` | Push estimate to JobTread |
| `jt_push_invoice` | Push invoice to JobTread |
| `jt_sync_status` | Get sync status |
| `generate_scope` | Generate scope of work |
| `generate_material_list` | Generate shopping list |
| `create_daily_log` | Log daily work |
| `list_daily_logs` | List logs for job |
| `analyze_site` | Analyze site conditions |
| `get_budget_report` | Budget vs actual report |
| `set_milestone` | Set project milestone |
| `get_schedule` | Get project timeline |
| `request_signature` | Request e-signature |
| `record_signature` | Record signature |
| `log_activity` | Log client action |
| `get_activity` | Get activity history |
## Design-Agent Integration
For SB-area jobs, enrich estimates with:
- `mcporter call design-agent.estimate_cost` — local cost ranges
- `mcporter call design-agent.get_permit_matrix` — permit fees + timeline
- `mcporter call design-agent.lookup_parcel` — lot data affecting scope
## Conversation Patterns
**"I need an estimate for X"** → Ask for address + details, then `generate_estimate`. If SB area, also call design-agent.estimate_cost for validation.
**"Send a proposal"** → `draft_proposal` → format nicely → send to client. Ask if they want signature collection.
**"Invoice this job"** → `create_invoice` → ask about syncing to JobTread/QBO.
**"What's the budget looking like?"** → `get_budget_report` → present variance and flags.
**"Log today's work"** → Ask for crew, hours, notes, photos → `create_daily_log`.
**"Show me my jobs"** → `list_jobs` or `jt_pull_jobs` for fresh data from JobTread.
## Error Handling
- If JobTread sync fails: tool returns error, work continues locally. Retry later.
- If QBO sync fails: check token with `python3 skills/kellow-qbo/scripts/check-token.py`
- All data persists in local SQLite regardless of sync status.

66
deploy.sh Normal file
View File

@@ -0,0 +1,66 @@
#!/bin/bash
# Deploy Handoff Pro to Tom's workspace on GW-2
# Usage: ./deploy.sh [--dry-run]
set -e
CONTAINER="openclaw-repo-openclaw-gateway-2-1"
TARGET="/home/node/.openclaw/workspace/agents/tom/skills/handoff-pro"
SOURCE="$(cd "$(dirname "$0")" && pwd)"
DRY_RUN=""
if [ "$1" = "--dry-run" ]; then DRY_RUN="echo [DRY-RUN]"; fi
echo "=== Handoff Pro Deployment ==="
echo "Source: $SOURCE"
echo "Target: $CONTAINER:$TARGET"
echo ""
# Create target directory
$DRY_RUN docker exec $CONTAINER mkdir -p $TARGET/{scripts,lib,templates,data,tests}
# Copy core files
for f in SKILL.md server.py; do
$DRY_RUN docker cp "$SOURCE/$f" "$CONTAINER:$TARGET/$f"
done
# Copy lib
for f in "$SOURCE"/lib/*.py; do
$DRY_RUN docker cp "$f" "$CONTAINER:$TARGET/lib/$(basename $f)"
done
# Copy scripts
for f in "$SOURCE"/scripts/*.py; do
$DRY_RUN docker cp "$f" "$CONTAINER:$TARGET/scripts/$(basename $f)"
done
# Copy templates
for f in "$SOURCE"/templates/*; do
[ -f "$f" ] && $DRY_RUN docker cp "$f" "$CONTAINER:$TARGET/templates/$(basename $f)"
done
# Fix permissions (no world-writable)
$DRY_RUN docker exec $CONTAINER find $TARGET -type f -exec chmod 644 {} \;
$DRY_RUN docker exec $CONTAINER find $TARGET -type d -exec chmod 755 {} \;
$DRY_RUN docker exec $CONTAINER chown -R node:node $TARGET
# Initialize database
$DRY_RUN docker exec -u node $CONTAINER python3 $TARGET/scripts/init_db.py
# Add to mcporter config
$DRY_RUN docker exec -u node $CONTAINER sh -c "
CONFIG=/home/node/.openclaw/workspace/config/mcporter.json
python3 -c \"
import json
with open('\$CONFIG') as f: cfg = json.load(f)
cfg['mcpServers']['handoff-pro'] = {'url': 'http://127.0.0.1:3101/sse'}
with open('\$CONFIG','w') as f: json.dump(cfg, f, indent=4)
print('mcporter.json updated')
\"
"
echo ""
echo "=== Deployment complete ==="
echo "Next steps:"
echo " 1. Start the MCP server: docker exec -u node -d $CONTAINER python3 $TARGET/server.py"
echo " 2. Verify: docker exec $CONTAINER curl -s http://127.0.0.1:3101/health"
echo " 3. Test: mcporter call handoff-pro.list_catalogs"

0
lib/__init__.py Normal file
View File

156
lib/db.py Normal file
View File

@@ -0,0 +1,156 @@
"""SQLite database initialization and access for Handoff Pro."""
import json
import os
import sqlite3
import uuid
from datetime import datetime
DB_PATH = os.environ.get("HANDOFF_DB",
os.path.join(os.path.dirname(__file__), "..", "data", "handoff.db"))
SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
user_id TEXT DEFAULT 'kellow',
jobtread_job_id TEXT UNIQUE,
client_name TEXT,
jobtread_customer_id TEXT,
address TEXT,
description TEXT,
status TEXT DEFAULT 'proposal',
created_at TEXT,
updated_at TEXT,
synced_to_jobtread_at TEXT
);
CREATE TABLE IF NOT EXISTS estimates (
id TEXT PRIMARY KEY,
job_id TEXT REFERENCES jobs(id),
jobtread_estimate_id TEXT UNIQUE,
labor_hours REAL DEFAULT 0,
labor_rate REAL DEFAULT 0,
materials_cost REAL DEFAULT 0,
markup_percent REAL DEFAULT 20,
total_cost REAL DEFAULT 0,
notes TEXT,
status TEXT DEFAULT 'draft',
created_at TEXT,
synced_to_jobtread_at TEXT
);
CREATE TABLE IF NOT EXISTS line_items (
id TEXT PRIMARY KEY,
estimate_id TEXT REFERENCES estimates(id),
description TEXT,
category TEXT,
quantity REAL DEFAULT 1,
unit TEXT DEFAULT 'ea',
unit_cost REAL DEFAULT 0,
total REAL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS pricing_catalogs (
id TEXT PRIMARY KEY,
user_id TEXT DEFAULT 'kellow',
name TEXT,
labor_rates TEXT,
material_markups TEXT,
created_at TEXT
);
CREATE TABLE IF NOT EXISTS invoices (
id TEXT PRIMARY KEY,
estimate_id TEXT REFERENCES estimates(id),
job_id TEXT REFERENCES jobs(id),
invoice_number TEXT,
jobtread_invoice_id TEXT UNIQUE,
qbo_invoice_id TEXT,
amount_due REAL DEFAULT 0,
amount_paid REAL DEFAULT 0,
due_date TEXT,
status TEXT DEFAULT 'draft',
created_at TEXT,
synced_to_jobtread_at TEXT,
synced_to_qbo_at TEXT
);
CREATE TABLE IF NOT EXISTS daily_logs (
id TEXT PRIMARY KEY,
job_id TEXT REFERENCES jobs(id),
log_date TEXT,
crew_names TEXT,
hours_worked REAL DEFAULT 0,
notes TEXT,
photos TEXT,
created_at TEXT
);
CREATE TABLE IF NOT EXISTS signatures (
id TEXT PRIMARY KEY,
estimate_id TEXT REFERENCES estimates(id),
client_name TEXT,
signature_data TEXT,
signed_at TEXT
);
CREATE TABLE IF NOT EXISTS client_activity (
id TEXT PRIMARY KEY,
estimate_id TEXT REFERENCES estimates(id),
job_id TEXT REFERENCES jobs(id),
action TEXT,
metadata TEXT,
timestamp TEXT
);
CREATE TABLE IF NOT EXISTS jobtread_sync_log (
id TEXT PRIMARY KEY,
local_id TEXT,
jobtread_id TEXT,
entity_type TEXT,
action TEXT,
status TEXT DEFAULT 'pending',
error_message TEXT,
synced_at TEXT
);
CREATE TABLE IF NOT EXISTS project_milestones (
id TEXT PRIMARY KEY,
job_id TEXT REFERENCES jobs(id),
phase_name TEXT,
start_date TEXT,
end_date TEXT,
status TEXT DEFAULT 'pending'
);
"""
def get_db() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH, timeout=10)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
return conn
def init_db():
conn = get_db()
conn.executescript(SCHEMA)
conn.commit()
conn.close()
def new_id():
return str(uuid.uuid4())[:8]
def now():
return datetime.now().astimezone().isoformat()
def row_to_dict(row):
if row is None:
return None
return dict(row)
def rows_to_list(rows):
return [dict(r) for r in rows]
if __name__ == "__main__":
init_db()
print(json.dumps({"ok": True, "db_path": DB_PATH}))

41
lib/pave.py Normal file
View File

@@ -0,0 +1,41 @@
"""JobTread Pave API helper. Reads creds from .jobtread-api.json."""
import json
import os
import urllib.request
PAVE_URL = "https://api.jobtread.com/pave"
_creds = None
def _get_creds():
global _creds
if _creds:
return _creds
paths = [
os.environ.get("JOBTREAD_CREDS"),
os.path.join(os.path.dirname(__file__), "..", "data", ".jobtread-api.json"),
"/home/node/.openclaw/workspace/agents/tom/.jobtread-api.json",
]
for p in paths:
if p and os.path.exists(p):
with open(p) as f:
_creds = json.load(f)
return _creds
raise RuntimeError("No .jobtread-api.json found")
def get_grant_key():
return _get_creds()["grantKey"]
def get_org_id():
return _get_creds()["organizationId"]
def pave_query(query: dict) -> dict:
"""Execute a Pave API query. Injects grantKey automatically."""
payload = {"query": {"$": {"grantKey": get_grant_key()}, **query}}
data = json.dumps(payload).encode()
req = urllib.request.Request(PAVE_URL, data=data,
headers={"Content-Type": "application/json"}, method="POST")
with urllib.request.urlopen(req, timeout=30) as resp:
result = json.loads(resp.read())
if "errors" in result:
raise RuntimeError(f"Pave API error: {result['errors']}")
return result

54
lib/qbo.py Normal file
View File

@@ -0,0 +1,54 @@
"""QuickBooks Online API helper. Tokens from http://192.168.86.11:18801."""
import json
import time
import urllib.request
TOKEN_URL = "http://192.168.86.11:18801/kellow-tokens.json"
REFRESH_URL = "http://192.168.86.11:18803/qbo/refresh?tenant=kellow"
QBO_BASE = "https://quickbooks.api.intuit.com/v3/company"
def get_token():
"""Fetch valid QBO access token + realm_id, auto-refresh if expired."""
with urllib.request.urlopen(TOKEN_URL, timeout=10) as resp:
tokens = json.loads(resp.read())
if time.time() >= tokens.get("expires_at_epoch", 0):
req = urllib.request.Request(REFRESH_URL, method="POST")
urllib.request.urlopen(req, timeout=15)
with urllib.request.urlopen(TOKEN_URL, timeout=10) as resp:
tokens = json.loads(resp.read())
return tokens["access_token"], tokens["realm_id"]
def qbo_request(method, endpoint, body=None):
"""Make authenticated QBO API request. Returns parsed JSON."""
access_token, realm_id = get_token()
url = f"{QBO_BASE}/{realm_id}/{endpoint}"
data = json.dumps(body).encode() if body else None
req = urllib.request.Request(url, data=data, method=method, headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
def create_invoice(customer_id, line_items, due_date=None):
"""Create invoice in QBO. line_items: [{description, amount, qty}]"""
lines = []
for i, item in enumerate(line_items, 1):
lines.append({
"LineNum": i,
"Amount": item["amount"],
"DetailType": "SalesItemLineDetail",
"Description": item.get("description", ""),
"SalesItemLineDetail": {
"Qty": item.get("qty", 1),
"UnitPrice": item["amount"] / item.get("qty", 1),
}
})
invoice = {
"CustomerRef": {"value": customer_id},
"Line": lines,
}
if due_date:
invoice["DueDate"] = due_date
return qbo_request("POST", "invoice", invoice)

0
scripts/__init__.py Normal file
View File

View File

@@ -0,0 +1,30 @@
"""Client activity logger: track views, approvals, signatures, payments."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, now, rows_to_list
def log(action, estimate_id=None, job_id=None, metadata=None):
aid = new_id()
conn = get_db()
conn.execute("INSERT INTO client_activity (id, estimate_id, job_id, action, metadata, timestamp) VALUES (?,?,?,?,?,?)",
(aid, estimate_id, job_id, action, json.dumps(metadata) if metadata else None, now()))
conn.commit(); conn.close()
return {"id": aid, "action": action, "timestamp": now()}
def get(estimate_id=None, job_id=None):
conn = get_db()
if estimate_id:
rows = conn.execute("SELECT * FROM client_activity WHERE estimate_id=? ORDER BY timestamp DESC", (estimate_id,)).fetchall()
elif job_id:
rows = conn.execute("SELECT * FROM client_activity WHERE job_id=? ORDER BY timestamp DESC", (job_id,)).fetchall()
else:
rows = conn.execute("SELECT * FROM client_activity ORDER BY timestamp DESC LIMIT 50").fetchall()
conn.close()
return {"activity": rows_to_list(rows)}
if __name__ == "__main__":
from lib.db import init_db; init_db()
cmd = sys.argv[1] if len(sys.argv) > 1 else "get"
if cmd == "log": print(json.dumps(log(sys.argv[2], estimate_id=sys.argv[3] if len(sys.argv)>3 else None)))
elif cmd == "get": print(json.dumps(get(estimate_id=sys.argv[2] if len(sys.argv)>2 else None), indent=2))
else: print(json.dumps({"error": f"unknown: {cmd}"}))

40
scripts/budget_tracker.py Normal file
View File

@@ -0,0 +1,40 @@
"""Budget tracker: compare estimated vs actual costs."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, row_to_dict, rows_to_list
def report(job_id):
conn = get_db()
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone())
if not job: conn.close(); return {"error": "job not found"}
estimates = rows_to_list(conn.execute("SELECT * FROM estimates WHERE job_id=?", (job_id,)).fetchall())
logs = rows_to_list(conn.execute("SELECT * FROM daily_logs WHERE job_id=?", (job_id,)).fetchall())
invoices = rows_to_list(conn.execute("SELECT * FROM invoices WHERE job_id=?", (job_id,)).fetchall())
conn.close()
budgeted = sum(e["total_cost"] for e in estimates) if estimates else 0
actual_hours = sum(l["hours_worked"] for l in logs)
budgeted_hours = sum(e["labor_hours"] for e in estimates) if estimates else 0
invoiced = sum(i["amount_due"] for i in invoices)
paid = sum(i["amount_paid"] or 0 for i in invoices)
hours_variance = actual_hours - budgeted_hours
hours_pct = (hours_variance / budgeted_hours * 100) if budgeted_hours else 0
flags = []
if hours_pct > 10: flags.append(f"⚠️ Labor {hours_pct:.0f}% over budget")
if paid < invoiced * 0.5 and invoices: flags.append("⚠️ Less than 50% collected")
return {
"job_id": job_id, "client": job.get("client_name"),
"budget": {"total_estimated": budgeted, "budgeted_hours": budgeted_hours},
"actual": {"hours_worked": actual_hours, "log_entries": len(logs)},
"financial": {"invoiced": invoiced, "collected": paid, "outstanding": invoiced - paid},
"variance": {"hours": hours_variance, "hours_pct": round(hours_pct, 1)},
"flags": flags
}
if __name__ == "__main__":
from lib.db import init_db; init_db()
if len(sys.argv) < 2: print(json.dumps({"error": "usage: budget_tracker.py <job_id>"}))
else: print(json.dumps(report(sys.argv[1]), indent=2))

43
scripts/daily_log.py Normal file
View File

@@ -0,0 +1,43 @@
"""Daily job log management."""
import json, sys, os
from datetime import date
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, now, row_to_dict, rows_to_list
def create(job_id, notes, crew_names=None, hours_worked=None, photos=None):
lid = new_id()
conn = get_db()
conn.execute("""INSERT INTO daily_logs (id, job_id, log_date, crew_names, hours_worked, notes, photos, created_at)
VALUES (?,?,?,?,?,?,?,?)""",
(lid, job_id, date.today().isoformat(), crew_names, hours_worked or 0, notes,
json.dumps(photos) if photos else None, now()))
conn.commit(); conn.close()
return {"id": lid, "job_id": job_id, "date": date.today().isoformat()}
def list_logs(job_id):
conn = get_db()
rows = conn.execute("SELECT * FROM daily_logs WHERE job_id=? ORDER BY log_date DESC", (job_id,)).fetchall()
conn.close()
result = rows_to_list(rows)
for r in result:
r["photos"] = json.loads(r["photos"]) if r["photos"] else []
return {"logs": result}
def get_log(log_id):
conn = get_db()
row = row_to_dict(conn.execute("SELECT * FROM daily_logs WHERE id=?", (log_id,)).fetchone())
conn.close()
if not row: return {"error": "log not found"}
row["photos"] = json.loads(row["photos"]) if row["photos"] else []
return row
if __name__ == "__main__":
from lib.db import init_db; init_db()
cmd = sys.argv[1] if len(sys.argv) > 1 else "list"
if cmd == "create":
print(json.dumps(create(sys.argv[2], sys.argv[3],
sys.argv[4] if len(sys.argv)>4 else None,
float(sys.argv[5]) if len(sys.argv)>5 else None)))
elif cmd == "list": print(json.dumps(list_logs(sys.argv[2]), indent=2))
elif cmd == "get": print(json.dumps(get_log(sys.argv[2]), indent=2))
else: print(json.dumps({"error": f"unknown: {cmd}"}))

View File

@@ -0,0 +1,108 @@
"""Estimate generator: parse job description into structured estimate."""
import json, sys, os, re
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, init_db
from scripts.pricing_catalog import get_catalog, list_catalogs
from scripts.project_mgmt import create_job, create_estimate
# Common construction scope patterns → (category, unit, base_hours_per_unit)
SCOPE_PATTERNS = {
r"cabinet": ("finish_carpentry", "lf", 2.5),
r"countertop": ("tile", "sf", 0.5),
r"floor": ("flooring", "sf", 0.15),
r"tile": ("tile", "sf", 0.25),
r"paint": ("painting", "sf", 0.05),
r"drywall": ("drywall", "sf", 0.1),
r"electric": ("electrical", "ea", 4),
r"plumb": ("plumbing", "ea", 6),
r"frame|wall": ("framing", "lf", 1.5),
r"demo": ("demolition", "sf", 0.08),
r"roof": ("roofing", "sf", 0.12),
r"concrete|foundation": ("concrete", "cy", 8),
r"window": ("finish_carpentry", "ea", 3),
r"door": ("finish_carpentry", "ea", 4),
r"deck": ("framing", "sf", 0.3),
r"bathroom": ("plumbing", "ea", 40),
r"kitchen": ("general_labor", "ea", 120),
}
# Material cost estimates per unit
MATERIAL_COSTS = {
"cabinet": 250, "countertop": 75, "flooring": 8, "tile": 12,
"paint": 0.50, "drywall": 3, "electrical": 150, "plumbing": 200,
"framing": 5, "demolition": 2, "roofing": 6, "concrete": 180,
"window": 600, "door": 400, "deck": 15, "bathroom": 3000, "kitchen": 8000,
}
def generate(description, job_id=None, catalog_id=None, markup_percent=20):
"""Generate estimate from text description."""
# Get pricing catalog
if catalog_id:
catalog = get_catalog(catalog_id)
else:
cats = list_catalogs()
catalog = cats["catalogs"][0] if cats["catalogs"] else None
rates = catalog.get("labor_rates", {}) if catalog else {}
markups = catalog.get("material_markups", {}) if catalog else {}
# Extract quantities from description
sqft_match = re.search(r"(\d+)\s*(?:sq\s*ft|sqft|sf)", description, re.I)
sqft = int(sqft_match.group(1)) if sqft_match else 200 # default
# Build line items from description
line_items = []
desc_lower = description.lower()
matched = False
for pattern, (category, unit, hours_per) in SCOPE_PATTERNS.items():
if re.search(pattern, desc_lower):
matched = True
qty = sqft if unit == "sf" else (sqft / 10 if unit == "lf" else 1)
labor_rate = rates.get(category, 75)
labor_hours = qty * hours_per
labor_cost = labor_hours * labor_rate
# Material cost
mat_key = next((k for k in MATERIAL_COSTS if re.search(pattern, k)), None)
mat_cost = MATERIAL_COSTS.get(mat_key, 10) * qty if mat_key else qty * 10
mat_markup = markups.get(category, 1.15)
mat_cost *= mat_markup
line_items.append({
"description": f"{category.replace('_',' ').title()}{pattern.replace('|','/')}",
"category": "labor", "quantity": round(labor_hours, 1),
"unit": "hr", "unit_cost": labor_rate
})
line_items.append({
"description": f"Materials — {pattern.replace('|','/')}",
"category": "materials", "quantity": round(qty, 1),
"unit": unit, "unit_cost": round(mat_cost / qty, 2)
})
if not matched:
# Generic estimate based on sqft
line_items = [
{"description": "General Labor", "category": "labor", "quantity": sqft * 0.2, "unit": "hr", "unit_cost": rates.get("general_labor", 55)},
{"description": "Materials (general)", "category": "materials", "quantity": sqft, "unit": "sf", "unit_cost": 15},
]
# Create job if needed
if not job_id:
job_result = create_job("TBD", description=description)
job_id = job_result["id"]
# Create estimate
result = create_estimate(job_id, line_items, markup_percent=markup_percent)
result["line_items_count"] = len(line_items)
result["description"] = description
return result
if __name__ == "__main__":
init_db()
if len(sys.argv) < 2:
print(json.dumps({"error": "usage: estimate_generator.py <description> [catalog_id] [job_id]"}))
else:
desc = sys.argv[1]
cat_id = sys.argv[2] if len(sys.argv) > 2 else None
jid = sys.argv[3] if len(sys.argv) > 3 else None
print(json.dumps(generate(desc, job_id=jid, catalog_id=cat_id), indent=2))

11
scripts/init_db.py Normal file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python3
"""Initialize Handoff Pro database and seed default data."""
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import init_db
from scripts.pricing_catalog import seed_default
if __name__ == "__main__":
init_db()
result = seed_default()
print(f"Database initialized. Default catalog: {result}")

81
scripts/invoice_mgmt.py Normal file
View File

@@ -0,0 +1,81 @@
"""Invoice management: create, track, sync to JobTread + QBO."""
import json, sys, os
from datetime import datetime, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, now, row_to_dict, rows_to_list
def create(estimate_id, due_date=None):
conn = get_db()
est = row_to_dict(conn.execute("SELECT * FROM estimates WHERE id=?", (estimate_id,)).fetchone())
if not est: conn.close(); return {"error": "estimate not found"}
# Generate invoice number
count = conn.execute("SELECT COUNT(*) as c FROM invoices").fetchone()["c"]
inv_num = f"KC-{count + 1001}"
if not due_date:
due_date = (datetime.now() + timedelta(days=30)).strftime("%Y-%m-%d")
inv_id = new_id()
conn.execute("""INSERT INTO invoices (id, estimate_id, job_id, invoice_number, amount_due, due_date, status, created_at)
VALUES (?,?,?,?,?,?,?,?)""",
(inv_id, estimate_id, est["job_id"], inv_num, est["total_cost"], due_date, "draft", now()))
conn.commit(); conn.close()
return {"id": inv_id, "invoice_number": inv_num, "amount_due": est["total_cost"], "due_date": due_date, "status": "draft"}
def list_invoices(job_id=None):
conn = get_db()
if job_id:
rows = conn.execute("SELECT * FROM invoices WHERE job_id=? ORDER BY created_at DESC", (job_id,)).fetchall()
else:
rows = conn.execute("SELECT * FROM invoices ORDER BY created_at DESC").fetchall()
conn.close()
return {"invoices": rows_to_list(rows)}
def get(invoice_id):
conn = get_db()
inv = row_to_dict(conn.execute("SELECT * FROM invoices WHERE id=?", (invoice_id,)).fetchone())
if not inv: conn.close(); return {"error": "invoice not found"}
est = row_to_dict(conn.execute("SELECT * FROM estimates WHERE id=?", (inv["estimate_id"],)).fetchone())
if est:
inv["line_items"] = rows_to_list(conn.execute("SELECT * FROM line_items WHERE estimate_id=?", (inv["estimate_id"],)).fetchall())
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (inv["job_id"],)).fetchone())
inv["job"] = job
conn.close()
return inv
def mark_paid(invoice_id, amount_paid=None):
conn = get_db()
inv = row_to_dict(conn.execute("SELECT * FROM invoices WHERE id=?", (invoice_id,)).fetchone())
if not inv: conn.close(); return {"error": "invoice not found"}
paid = amount_paid if amount_paid else inv["amount_due"]
conn.execute("UPDATE invoices SET amount_paid=?, status='paid' WHERE id=?", (paid, invoice_id))
conn.commit(); conn.close()
return {"ok": True, "id": invoice_id, "amount_paid": paid, "status": "paid"}
def sync_to_qbo(invoice_id):
"""Sync invoice to QuickBooks Online."""
conn = get_db()
inv = row_to_dict(conn.execute("SELECT * FROM invoices WHERE id=?", (invoice_id,)).fetchone())
if not inv: conn.close(); return {"error": "invoice not found"}
items = rows_to_list(conn.execute("SELECT * FROM line_items WHERE estimate_id=?", (inv["estimate_id"],)).fetchall())
conn.close()
try:
from lib.qbo import create_invoice as qbo_create
qbo_lines = [{"description": i["description"], "amount": i["total"], "qty": i["quantity"]} for i in items]
# Note: customer_id mapping needed — for now use placeholder
result = qbo_create("1", qbo_lines, inv["due_date"])
qbo_id = result.get("Invoice", {}).get("Id")
conn = get_db()
conn.execute("UPDATE invoices SET qbo_invoice_id=?, synced_to_qbo_at=? WHERE id=?", (qbo_id, now(), invoice_id))
conn.commit(); conn.close()
return {"ok": True, "qbo_invoice_id": qbo_id}
except Exception as e:
return {"error": str(e), "msg": "QBO sync failed — check token status"}
if __name__ == "__main__":
from lib.db import init_db; init_db()
cmd = sys.argv[1] if len(sys.argv) > 1 else "list"
if cmd == "create": print(json.dumps(create(sys.argv[2], sys.argv[3] if len(sys.argv)>3 else None)))
elif cmd == "list": print(json.dumps(list_invoices(sys.argv[2] if len(sys.argv)>2 else None), indent=2))
elif cmd == "get": print(json.dumps(get(sys.argv[2]), indent=2))
elif cmd == "mark-paid": print(json.dumps(mark_paid(sys.argv[2], float(sys.argv[3]) if len(sys.argv)>3 else None)))
elif cmd == "sync-qbo": print(json.dumps(sync_to_qbo(sys.argv[2])))
else: print(json.dumps({"error": f"unknown: {cmd}"}))

90
scripts/jobtread_sync.py Normal file
View File

@@ -0,0 +1,90 @@
"""JobTread bidirectional sync tool."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, now, row_to_dict, rows_to_list
from lib.pave import pave_query, get_org_id
def list_customers():
org_id = get_org_id()
result = pave_query({"organization": {"$": {"id": org_id},
"accounts": {"nodes": {"id": {}, "name": {}, "type": {}}}}})
accounts = result.get("organization", {}).get("accounts", {}).get("nodes", [])
return {"customers": [a for a in accounts if a.get("type") == "customer"]}
def pull_jobs():
"""Pull all jobs from JobTread and upsert to local DB."""
org_id = get_org_id()
result = pave_query({"organization": {"$": {"id": org_id},
"jobs": {"nodes": {"id": {}, "name": {}, "status": {}, "createdAt": {},
"account": {"id": {}, "name": {}},
"location": {"name": {}, "address": {}}}}}})
jobs = result.get("organization", {}).get("jobs", {}).get("nodes", [])
conn = get_db()
synced = 0
for j in jobs:
jt_id = j.get("id")
existing = conn.execute("SELECT id FROM jobs WHERE jobtread_job_id=?", (jt_id,)).fetchone()
if existing:
conn.execute("UPDATE jobs SET status=?, updated_at=?, synced_to_jobtread_at=? WHERE jobtread_job_id=?",
(j.get("status","active"), now(), now(), jt_id))
else:
conn.execute("""INSERT INTO jobs (id, client_name, address, description, jobtread_job_id, jobtread_customer_id, status, created_at, updated_at, synced_to_jobtread_at)
VALUES (?,?,?,?,?,?,?,?,?,?)""",
(new_id(), j.get("account",{}).get("name",""), j.get("location",{}).get("address",""),
j.get("name",""), jt_id, j.get("account",{}).get("id",""),
j.get("status","active"), j.get("createdAt", now()), now(), now()))
synced += 1
conn.commit(); conn.close()
_log_sync(None, None, "job", "pull", "success")
return {"ok": True, "synced": synced, "total_jobs": len(jobs)}
def push_estimate(estimate_id):
"""Push local estimate to JobTread. NOTE: Pave create mutations need verification."""
conn = get_db()
est = row_to_dict(conn.execute("SELECT * FROM estimates WHERE id=?", (estimate_id,)).fetchone())
if not est: conn.close(); return {"error": "estimate not found"}
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (est["job_id"],)).fetchone())
if not job or not job.get("jobtread_job_id"):
conn.close(); return {"error": "job not linked to JobTread"}
items = rows_to_list(conn.execute("SELECT * FROM line_items WHERE estimate_id=?", (estimate_id,)).fetchall())
conn.close()
# Log as pending — actual Pave create mutation TBD (needs Pave Explorer verification)
_log_sync(estimate_id, job["jobtread_job_id"], "estimate", "push", "pending",
"Pave create document mutation needs verification")
return {"ok": True, "status": "pending", "msg": "Estimate queued for JobTread sync. Pave create mutation needs verification via Pave Explorer."}
def push_invoice(invoice_id):
"""Push local invoice to JobTread."""
conn = get_db()
inv = row_to_dict(conn.execute("SELECT * FROM invoices WHERE id=?", (invoice_id,)).fetchone())
if not inv: conn.close(); return {"error": "invoice not found"}
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (inv["job_id"],)).fetchone())
conn.close()
if not job or not job.get("jobtread_job_id"):
return {"error": "job not linked to JobTread"}
_log_sync(invoice_id, job["jobtread_job_id"], "invoice", "push", "pending",
"Pave create invoice mutation needs verification")
return {"ok": True, "status": "pending", "msg": "Invoice queued for JobTread sync."}
def get_sync_status(job_id):
conn = get_db()
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone())
logs = rows_to_list(conn.execute("SELECT * FROM jobtread_sync_log WHERE local_id=? ORDER BY synced_at DESC LIMIT 10", (job_id,)).fetchall())
conn.close()
return {"job": job, "sync_log": logs}
def _log_sync(local_id, jt_id, entity_type, action, status, error=None):
conn = get_db()
conn.execute("INSERT INTO jobtread_sync_log (id, local_id, jobtread_id, entity_type, action, status, error_message, synced_at) VALUES (?,?,?,?,?,?,?,?)",
(new_id(), local_id, jt_id, entity_type, action, status, error, now()))
conn.commit(); conn.close()
if __name__ == "__main__":
from lib.db import init_db; init_db()
cmd = sys.argv[1] if len(sys.argv) > 1 else "list-customers"
if cmd == "list-customers": print(json.dumps(list_customers(), indent=2))
elif cmd == "pull-jobs": print(json.dumps(pull_jobs(), indent=2))
elif cmd == "push-estimate": print(json.dumps(push_estimate(sys.argv[2])))
elif cmd == "push-invoice": print(json.dumps(push_invoice(sys.argv[2])))
elif cmd == "sync-status": print(json.dumps(get_sync_status(sys.argv[2]), indent=2))
else: print(json.dumps({"error": f"unknown: {cmd}"}))

35
scripts/material_list.py Normal file
View File

@@ -0,0 +1,35 @@
"""Material list generator from estimates."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, row_to_dict, rows_to_list
def generate(estimate_id):
conn = get_db()
est = row_to_dict(conn.execute("SELECT * FROM estimates WHERE id=?", (estimate_id,)).fetchone())
if not est: conn.close(); return {"error": "estimate not found"}
items = rows_to_list(conn.execute("SELECT * FROM line_items WHERE estimate_id=? AND category='materials'", (estimate_id,)).fetchall())
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (est["job_id"],)).fetchone())
conn.close()
materials = []
total = 0
for item in items:
materials.append({
"item": item["description"],
"quantity": item["quantity"],
"unit": item["unit"],
"est_cost": round(item["total"], 2)
})
total += item["total"]
return {
"job": job.get("description", "") if job else "",
"materials": materials,
"total_materials_cost": round(total, 2),
"items_count": len(materials)
}
if __name__ == "__main__":
from lib.db import init_db; init_db()
if len(sys.argv) < 2: print(json.dumps({"error": "usage: material_list.py <estimate_id>"}))
else: print(json.dumps(generate(sys.argv[1]), indent=2))

View File

@@ -0,0 +1,71 @@
"""Pricing catalog CRUD operations."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, now, row_to_dict, rows_to_list
def list_catalogs():
conn = get_db()
rows = conn.execute("SELECT * FROM pricing_catalogs ORDER BY created_at DESC").fetchall()
conn.close()
result = rows_to_list(rows)
for r in result:
r["labor_rates"] = json.loads(r["labor_rates"]) if r["labor_rates"] else {}
r["material_markups"] = json.loads(r["material_markups"]) if r["material_markups"] else {}
return {"catalogs": result}
def create_catalog(name, labor_rates, material_markups=None):
cid = new_id()
conn = get_db()
conn.execute("INSERT INTO pricing_catalogs (id, name, labor_rates, material_markups, created_at) VALUES (?,?,?,?,?)",
(cid, name, json.dumps(labor_rates), json.dumps(material_markups or {}), now()))
conn.commit(); conn.close()
return {"id": cid, "name": name}
def get_catalog(catalog_id):
conn = get_db()
row = conn.execute("SELECT * FROM pricing_catalogs WHERE id=?", (catalog_id,)).fetchone()
conn.close()
if not row: return {"error": "not found"}
r = row_to_dict(row)
r["labor_rates"] = json.loads(r["labor_rates"]) if r["labor_rates"] else {}
r["material_markups"] = json.loads(r["material_markups"]) if r["material_markups"] else {}
return r
def update_catalog(catalog_id, name=None, labor_rates=None, material_markups=None):
conn = get_db()
if name: conn.execute("UPDATE pricing_catalogs SET name=? WHERE id=?", (name, catalog_id))
if labor_rates: conn.execute("UPDATE pricing_catalogs SET labor_rates=? WHERE id=?", (json.dumps(labor_rates), catalog_id))
if material_markups: conn.execute("UPDATE pricing_catalogs SET material_markups=? WHERE id=?", (json.dumps(material_markups), catalog_id))
conn.commit(); conn.close()
return {"ok": True, "id": catalog_id}
def delete_catalog(catalog_id):
conn = get_db()
conn.execute("DELETE FROM pricing_catalogs WHERE id=?", (catalog_id,))
conn.commit(); conn.close()
return {"ok": True, "deleted": catalog_id}
def seed_default():
"""Seed default Kellow Construction pricing catalog."""
existing = list_catalogs()
if existing["catalogs"]: return {"ok": True, "msg": "already seeded"}
return create_catalog("Kellow Default", {
"general_labor": 55, "framing": 85, "electrical": 95,
"plumbing": 90, "painting": 65, "drywall": 75,
"tile": 80, "flooring": 70, "roofing": 85,
"concrete": 75, "demolition": 60, "finish_carpentry": 90,
"project_management": 95, "design": 110
}, {"lumber": 1.15, "drywall": 1.20, "tile": 1.25,
"plumbing_fixtures": 1.20, "electrical_fixtures": 1.15,
"paint": 1.10, "hardware": 1.15, "appliances": 1.10})
if __name__ == "__main__":
from lib.db import init_db; init_db()
cmd = sys.argv[1] if len(sys.argv) > 1 else "list"
if cmd == "list": print(json.dumps(list_catalogs(), indent=2))
elif cmd == "create": print(json.dumps(create_catalog(sys.argv[2], json.loads(sys.argv[3]), json.loads(sys.argv[4]) if len(sys.argv) > 4 else None)))
elif cmd == "get": print(json.dumps(get_catalog(sys.argv[2]), indent=2))
elif cmd == "update": print(json.dumps(update_catalog(sys.argv[2], labor_rates=json.loads(sys.argv[3]) if len(sys.argv) > 3 else None)))
elif cmd == "delete": print(json.dumps(delete_catalog(sys.argv[2])))
elif cmd == "seed": print(json.dumps(seed_default()))
else: print(json.dumps({"error": f"unknown command: {cmd}"}))

102
scripts/project_mgmt.py Normal file
View File

@@ -0,0 +1,102 @@
"""Project management: jobs and estimates CRUD."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, now, row_to_dict, rows_to_list
def create_job(client_name, address=None, description=None, jobtread_job_id=None, jobtread_customer_id=None):
jid = new_id()
conn = get_db()
conn.execute("""INSERT INTO jobs (id, client_name, address, description, jobtread_job_id, jobtread_customer_id, status, created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?)""",
(jid, client_name, address, description, jobtread_job_id, jobtread_customer_id, "proposal", now(), now()))
conn.commit(); conn.close()
return {"id": jid, "client_name": client_name, "status": "proposal"}
def list_jobs(status=None):
conn = get_db()
if status:
rows = conn.execute("SELECT * FROM jobs WHERE status=? ORDER BY created_at DESC", (status,)).fetchall()
else:
rows = conn.execute("SELECT * FROM jobs ORDER BY created_at DESC").fetchall()
conn.close()
return {"jobs": rows_to_list(rows)}
def get_job(job_id):
conn = get_db()
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone())
if not job: conn.close(); return {"error": "job not found"}
job["estimates"] = rows_to_list(conn.execute("SELECT * FROM estimates WHERE job_id=?", (job_id,)).fetchall())
job["invoices"] = rows_to_list(conn.execute("SELECT * FROM invoices WHERE job_id=?", (job_id,)).fetchall())
conn.close()
return job
def update_job(job_id, status=None, client_name=None, address=None, description=None):
conn = get_db()
updates = []
params = []
for field, val in [("status", status), ("client_name", client_name), ("address", address), ("description", description)]:
if val is not None:
updates.append(f"{field}=?"); params.append(val)
if updates:
updates.append("updated_at=?"); params.append(now())
params.append(job_id)
conn.execute(f"UPDATE jobs SET {','.join(updates)} WHERE id=?", params)
conn.commit()
conn.close()
return {"ok": True, "id": job_id}
def create_estimate(job_id, line_items, labor_hours=None, labor_rate=None, markup_percent=20):
eid = new_id()
conn = get_db()
materials_cost = 0
total_labor_hours = labor_hours or 0
# Calculate totals first
item_rows = []
for item in line_items:
lid = new_id()
total = item.get("quantity", 1) * item.get("unit_cost", 0)
item_rows.append((lid, eid, item.get("description",""), item.get("category",""), item.get("quantity",1), item.get("unit","ea"), item.get("unit_cost",0), total))
if item.get("category") == "labor":
total_labor_hours += item.get("quantity", 0)
else:
materials_cost += total
effective_rate = labor_rate or 85
labor_cost = total_labor_hours * effective_rate
subtotal = labor_cost + materials_cost
markup = subtotal * (markup_percent / 100)
total_cost = subtotal + markup
# Insert estimate first, then line items
conn.execute("""INSERT INTO estimates (id, job_id, labor_hours, labor_rate, materials_cost, markup_percent, total_cost, status, created_at)
VALUES (?,?,?,?,?,?,?,?,?)""",
(eid, job_id, total_labor_hours, effective_rate, materials_cost, markup_percent, total_cost, "draft", now()))
for row in item_rows:
conn.execute("INSERT INTO line_items (id, estimate_id, description, category, quantity, unit, unit_cost, total) VALUES (?,?,?,?,?,?,?,?)", row)
conn.commit(); conn.close()
return {"id": eid, "job_id": job_id, "labor_hours": total_labor_hours, "labor_rate": effective_rate,
"materials_cost": materials_cost, "markup_percent": markup_percent, "total_cost": total_cost}
def get_estimate(estimate_id):
conn = get_db()
est = row_to_dict(conn.execute("SELECT * FROM estimates WHERE id=?", (estimate_id,)).fetchone())
if not est: conn.close(); return {"error": "estimate not found"}
est["line_items"] = rows_to_list(conn.execute("SELECT * FROM line_items WHERE estimate_id=?", (estimate_id,)).fetchall())
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (est["job_id"],)).fetchone())
est["job"] = job
conn.close()
return est
def update_estimate_status(estimate_id, status):
conn = get_db()
conn.execute("UPDATE estimates SET status=? WHERE id=?", (status, estimate_id))
conn.commit(); conn.close()
return {"ok": True, "id": estimate_id, "status": status}
if __name__ == "__main__":
from lib.db import init_db; init_db()
cmd = sys.argv[1] if len(sys.argv) > 1 else "list-jobs"
if cmd == "create-job": print(json.dumps(create_job(sys.argv[2], sys.argv[3] if len(sys.argv)>3 else None, sys.argv[4] if len(sys.argv)>4 else None)))
elif cmd == "list-jobs": print(json.dumps(list_jobs(sys.argv[2] if len(sys.argv)>2 else None), indent=2))
elif cmd == "get-job": print(json.dumps(get_job(sys.argv[2]), indent=2))
elif cmd == "get-estimate": print(json.dumps(get_estimate(sys.argv[2]), indent=2))
elif cmd == "create-estimate": print(json.dumps(create_estimate(sys.argv[2], json.loads(sys.argv[3]), markup_percent=float(sys.argv[4]) if len(sys.argv)>4 else 20)))
else: print(json.dumps({"error": f"unknown: {cmd}"}))

View File

@@ -0,0 +1,35 @@
"""Project scheduler: milestones and timeline management."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, row_to_dict, rows_to_list
def set_milestone(job_id, phase_name, start_date=None, end_date=None, status="pending"):
mid = new_id()
conn = get_db()
conn.execute("INSERT INTO project_milestones (id, job_id, phase_name, start_date, end_date, status) VALUES (?,?,?,?,?,?)",
(mid, job_id, phase_name, start_date, end_date, status))
conn.commit(); conn.close()
return {"id": mid, "job_id": job_id, "phase": phase_name}
def update_milestone(milestone_id, status=None, start_date=None, end_date=None):
conn = get_db()
if status: conn.execute("UPDATE project_milestones SET status=? WHERE id=?", (status, milestone_id))
if start_date: conn.execute("UPDATE project_milestones SET start_date=? WHERE id=?", (start_date, milestone_id))
if end_date: conn.execute("UPDATE project_milestones SET end_date=? WHERE id=?", (end_date, milestone_id))
conn.commit(); conn.close()
return {"ok": True, "id": milestone_id}
def get_schedule(job_id):
conn = get_db()
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone())
milestones = rows_to_list(conn.execute("SELECT * FROM project_milestones WHERE job_id=? ORDER BY start_date", (job_id,)).fetchall())
conn.close()
if not job: return {"error": "job not found"}
return {"job_id": job_id, "client": job.get("client_name"), "milestones": milestones}
if __name__ == "__main__":
from lib.db import init_db; init_db()
cmd = sys.argv[1] if len(sys.argv) > 1 else "get"
if cmd == "set": print(json.dumps(set_milestone(sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv)>4 else None, sys.argv[5] if len(sys.argv)>5 else None)))
elif cmd == "get": print(json.dumps(get_schedule(sys.argv[2]), indent=2))
else: print(json.dumps({"error": f"unknown: {cmd}"}))

104
scripts/proposal_drafter.py Normal file
View File

@@ -0,0 +1,104 @@
"""Proposal drafter: generate professional proposals from estimates."""
import json, sys, os
from datetime import datetime, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, row_to_dict, rows_to_list
TEMPLATE = """# Proposal — {client_name}
## {job_description}
**Prepared by:** Kellow Construction
**Date:** {date}
**Valid for:** 30 days
---
## Scope of Work
{scope_items}
## Investment Summary
| Category | Amount |
|----------|--------|
| Labor ({labor_hours} hrs @ ${labor_rate}/hr) | ${labor_cost:,.2f} |
| Materials | ${materials_cost:,.2f} |
| Subtotal | ${subtotal:,.2f} |
| Overhead & Profit ({markup_percent}%) | ${markup:,.2f} |
| **Total** | **${total_cost:,.2f}** |
## Timeline
Estimated duration: {timeline}
Projected start: Upon approval and permitting
## Payment Terms
- 30% deposit upon contract signing
- Progress payments at milestones
- Final 10% upon completion and walkthrough
## Exclusions
- Permits and fees (handled separately)
- Unforeseen structural issues
- Owner-supplied materials
- Landscaping and exterior work (unless specified)
## Warranty
Kellow Construction provides a 1-year workmanship warranty on all labor performed.
---
*This proposal is valid for 30 days from the date above. Acceptance constitutes agreement to the terms outlined herein.*
"""
def draft(estimate_id):
conn = get_db()
est = row_to_dict(conn.execute("SELECT * FROM estimates WHERE id=?", (estimate_id,)).fetchone())
if not est: conn.close(); return {"error": "estimate not found"}
items = rows_to_list(conn.execute("SELECT * FROM line_items WHERE estimate_id=?", (estimate_id,)).fetchall())
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (est["job_id"],)).fetchone())
conn.close()
labor_cost = est["labor_hours"] * est["labor_rate"]
subtotal = labor_cost + est["materials_cost"]
markup = subtotal * (est["markup_percent"] / 100)
# Build scope items
scope_lines = []
for item in items:
scope_lines.append(f"- {item['description']}: {item['quantity']} {item['unit']} @ ${item['unit_cost']:.2f}/{item['unit']}")
# Estimate timeline from labor hours
weeks = max(1, int(est["labor_hours"] / 40))
timeline = f"{weeks} week{'s' if weeks > 1 else ''}"
proposal = TEMPLATE.format(
client_name=job.get("client_name", "Client") if job else "Client",
job_description=job.get("description", "Project") if job else "Project",
date=datetime.now().strftime("%B %d, %Y"),
scope_items="\n".join(scope_lines) if scope_lines else "- As discussed",
labor_hours=round(est["labor_hours"], 1),
labor_rate=est["labor_rate"],
labor_cost=labor_cost,
materials_cost=est["materials_cost"],
subtotal=subtotal,
markup_percent=est["markup_percent"],
markup=markup,
total_cost=est["total_cost"],
timeline=timeline,
)
return {"proposal": proposal, "estimate_id": estimate_id, "total": est["total_cost"]}
if __name__ == "__main__":
from lib.db import init_db; init_db()
if len(sys.argv) < 2:
print(json.dumps({"error": "usage: proposal_drafter.py <estimate_id>"}))
else:
result = draft(sys.argv[1])
if "proposal" in result:
print(result["proposal"])
else:
print(json.dumps(result))

View File

@@ -0,0 +1,50 @@
"""Scope of work generator from estimates."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, row_to_dict, rows_to_list
STANDARD_CLAUSES = """
## General Conditions
- All work performed in accordance with local building codes and regulations
- Contractor to obtain all necessary permits unless otherwise noted
- Work area to be kept clean and debris removed daily
- Final cleanup included upon project completion
## Warranty
- 1-year workmanship warranty on all labor
- Manufacturer warranties apply to all materials and fixtures
## Exclusions
- Unforeseen conditions (mold, asbestos, structural damage) — addressed via change order
- Owner-supplied materials — contractor not responsible for defects
- Landscaping, exterior painting, or work not explicitly listed above
- Furniture moving or storage
"""
def generate(estimate_id):
conn = get_db()
est = row_to_dict(conn.execute("SELECT * FROM estimates WHERE id=?", (estimate_id,)).fetchone())
if not est: conn.close(); return {"error": "estimate not found"}
items = rows_to_list(conn.execute("SELECT * FROM line_items WHERE estimate_id=?", (estimate_id,)).fetchall())
job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (est["job_id"],)).fetchone())
conn.close()
lines = [f"# Scope of Work — {job.get('client_name','Client') if job else 'Client'}",
f"**Project:** {job.get('description','') if job else ''}",
f"**Address:** {job.get('address','') if job else ''}", "", "## Work Items", ""]
for i, item in enumerate(items, 1):
if item["category"] == "labor":
lines.append(f"{i}. **{item['description']}** — {item['quantity']} {item['unit']}")
lines.append(STANDARD_CLAUSES)
scope = "\n".join(lines)
return {"scope": scope, "estimate_id": estimate_id, "items_count": len(items)}
if __name__ == "__main__":
from lib.db import init_db; init_db()
if len(sys.argv) < 2: print(json.dumps({"error": "usage: scope_generator.py <estimate_id>"}))
else:
r = generate(sys.argv[1])
print(r.get("scope", json.dumps(r)))

35
scripts/signature_mgmt.py Normal file
View File

@@ -0,0 +1,35 @@
"""Signature management: request and record client signatures."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, now, row_to_dict, rows_to_list
def request_signature(estimate_id, client_name):
sid = new_id()
conn = get_db()
conn.execute("INSERT INTO signatures (id, estimate_id, client_name) VALUES (?,?,?)",
(sid, estimate_id, client_name))
conn.commit(); conn.close()
return {"signature_id": sid, "estimate_id": estimate_id, "client_name": client_name, "status": "pending"}
def record_signature(signature_id, signature_data="approved_via_telegram"):
conn = get_db()
conn.execute("UPDATE signatures SET signature_data=?, signed_at=? WHERE id=?",
(signature_data, now(), signature_id))
conn.commit(); conn.close()
# Also log as activity
sig = row_to_dict(conn.execute("SELECT * FROM signatures WHERE id=?", (signature_id,)).fetchone()) if False else None
return {"ok": True, "signature_id": signature_id, "signed_at": now()}
def get_signature(estimate_id):
conn = get_db()
row = row_to_dict(conn.execute("SELECT * FROM signatures WHERE estimate_id=? ORDER BY signed_at DESC LIMIT 1", (estimate_id,)).fetchone())
conn.close()
return row if row else {"status": "no signature found"}
if __name__ == "__main__":
from lib.db import init_db; init_db()
cmd = sys.argv[1] if len(sys.argv) > 1 else "get"
if cmd == "request": print(json.dumps(request_signature(sys.argv[2], sys.argv[3])))
elif cmd == "record": print(json.dumps(record_signature(sys.argv[2], sys.argv[3] if len(sys.argv)>3 else "approved")))
elif cmd == "get": print(json.dumps(get_signature(sys.argv[2]), indent=2))
else: print(json.dumps({"error": f"unknown: {cmd}"}))

44
scripts/site_analyzer.py Normal file
View File

@@ -0,0 +1,44 @@
"""Site walkthrough analyzer: structured report from observations."""
import json, sys, os, re
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import now
# Issue patterns to flag
ISSUE_PATTERNS = {
r"water\s*damage|moisture|leak|mold": {"severity": "high", "category": "water_damage", "cost_impact": 1200},
r"crack|structural|foundation\s*issue|settling": {"severity": "high", "category": "structural", "cost_impact": 3000},
r"rot|decay|termite|pest": {"severity": "high", "category": "pest_damage", "cost_impact": 2000},
r"asbestos|lead\s*paint|hazmat": {"severity": "critical", "category": "hazmat", "cost_impact": 5000},
r"outdated\s*wiring|knob.and.tube|electrical\s*issue": {"severity": "medium", "category": "electrical", "cost_impact": 2500},
r"rust|corrosion|pipe": {"severity": "medium", "category": "plumbing", "cost_impact": 1500},
}
def analyze(observations, job_id=None, photos_description=None):
"""Analyze site observations and flag issues."""
combined = f"{observations} {photos_description or ''}"
issues = []
total_impact = 0
for pattern, info in ISSUE_PATTERNS.items():
if re.search(pattern, combined, re.I):
issues.append({
"category": info["category"],
"severity": info["severity"],
"estimated_cost_impact": info["cost_impact"],
"matched": re.search(pattern, combined, re.I).group(0)
})
total_impact += info["cost_impact"]
return {
"job_id": job_id,
"analyzed_at": now(),
"observations": observations,
"photos_description": photos_description,
"issues_found": len(issues),
"issues": issues,
"total_cost_impact": total_impact,
"recommendation": f"Add ${total_impact:,.0f} to estimate for remediation" if issues else "No significant issues detected"
}
if __name__ == "__main__":
if len(sys.argv) < 2: print(json.dumps({"error": "usage: site_analyzer.py <observations> [photos_desc]"}))
else: print(json.dumps(analyze(sys.argv[1], photos_description=sys.argv[2] if len(sys.argv)>2 else None), indent=2))

343
server.py Normal file
View File

@@ -0,0 +1,343 @@
"""Handoff Pro MCP Server — SSE transport for mcporter compatibility."""
import json
import os
import sys
import queue
import threading
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from lib import db
# Import tool handlers
from scripts import pricing_catalog, project_mgmt, jobtread_sync
from scripts import estimate_generator, proposal_drafter, invoice_mgmt
from scripts import scope_generator, material_list, daily_log
from scripts import site_analyzer, budget_tracker, project_scheduler
from scripts import signature_mgmt, activity_logger
PORT = int(os.environ.get("HANDOFF_PORT", 3101))
# Tool registry: name -> {description, input_schema, handler}
TOOLS = {}
def tool(name, description, schema):
"""Decorator to register an MCP tool."""
def decorator(fn):
TOOLS[name] = {"description": description, "inputSchema": schema, "handler": fn}
return fn
return decorator
# --- Pricing Catalog Tools ---
@tool("list_catalogs", "List all pricing catalogs", {"type": "object", "properties": {}})
def _list_catalogs(args): return pricing_catalog.list_catalogs()
@tool("create_catalog", "Create a pricing catalog", {
"type": "object", "properties": {
"name": {"type": "string"}, "labor_rates": {"type": "object"},
"material_markups": {"type": "object"}
}, "required": ["name", "labor_rates"]})
def _create_catalog(args): return pricing_catalog.create_catalog(**args)
@tool("get_catalog", "Get a pricing catalog by ID", {
"type": "object", "properties": {"catalog_id": {"type": "string"}}, "required": ["catalog_id"]})
def _get_catalog(args): return pricing_catalog.get_catalog(args["catalog_id"])
@tool("update_catalog", "Update a pricing catalog", {
"type": "object", "properties": {
"catalog_id": {"type": "string"}, "name": {"type": "string"},
"labor_rates": {"type": "object"}, "material_markups": {"type": "object"}
}, "required": ["catalog_id"]})
def _update_catalog(args): return pricing_catalog.update_catalog(**args)
@tool("delete_catalog", "Delete a pricing catalog", {
"type": "object", "properties": {"catalog_id": {"type": "string"}}, "required": ["catalog_id"]})
def _delete_catalog(args): return pricing_catalog.delete_catalog(args["catalog_id"])
# --- Project Management Tools ---
@tool("create_job", "Create a new job", {
"type": "object", "properties": {
"client_name": {"type": "string"}, "address": {"type": "string"},
"description": {"type": "string"}
}, "required": ["client_name"]})
def _create_job(args): return project_mgmt.create_job(**args)
@tool("list_jobs", "List all jobs", {
"type": "object", "properties": {"status": {"type": "string"}}})
def _list_jobs(args): return project_mgmt.list_jobs(args.get("status"))
@tool("get_job", "Get job details", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _get_job(args): return project_mgmt.get_job(args["job_id"])
@tool("update_job", "Update a job", {
"type": "object", "properties": {
"job_id": {"type": "string"}, "status": {"type": "string"},
"client_name": {"type": "string"}, "address": {"type": "string"},
"description": {"type": "string"}
}, "required": ["job_id"]})
def _update_job(args): return project_mgmt.update_job(**args)
@tool("create_estimate", "Create an estimate for a job", {
"type": "object", "properties": {
"job_id": {"type": "string"},
"line_items": {"type": "array", "items": {"type": "object"}},
"labor_hours": {"type": "number"}, "labor_rate": {"type": "number"},
"markup_percent": {"type": "number"}
}, "required": ["job_id", "line_items"]})
def _create_estimate(args): return project_mgmt.create_estimate(**args)
@tool("get_estimate", "Get estimate details with line items", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _get_estimate(args): return project_mgmt.get_estimate(args["estimate_id"])
# --- JobTread Sync Tools ---
@tool("jt_list_customers", "List customers from JobTread", {"type": "object", "properties": {}})
def _jt_customers(args): return jobtread_sync.list_customers()
@tool("jt_pull_jobs", "Pull all jobs from JobTread into local DB", {"type": "object", "properties": {}})
def _jt_pull(args): return jobtread_sync.pull_jobs()
@tool("jt_push_estimate", "Push estimate to JobTread", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _jt_push_est(args): return jobtread_sync.push_estimate(args["estimate_id"])
@tool("jt_push_invoice", "Push invoice to JobTread", {
"type": "object", "properties": {"invoice_id": {"type": "string"}}, "required": ["invoice_id"]})
def _jt_push_inv(args): return jobtread_sync.push_invoice(args["invoice_id"])
@tool("jt_sync_status", "Get sync status for a job", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _jt_status(args): return jobtread_sync.get_sync_status(args["job_id"])
# --- Estimate Generator ---
@tool("generate_estimate", "Generate estimate from job description", {
"type": "object", "properties": {
"description": {"type": "string"}, "job_id": {"type": "string"},
"catalog_id": {"type": "string"}, "markup_percent": {"type": "number"}
}, "required": ["description"]})
def _gen_estimate(args): return estimate_generator.generate(**args)
# --- Proposal Drafter ---
@tool("draft_proposal", "Generate a professional proposal from an estimate", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _draft_proposal(args): return proposal_drafter.draft(args["estimate_id"])
# --- Invoice Management ---
@tool("create_invoice", "Create invoice from estimate", {
"type": "object", "properties": {
"estimate_id": {"type": "string"}, "due_date": {"type": "string"}
}, "required": ["estimate_id"]})
def _create_invoice(args): return invoice_mgmt.create(**args)
@tool("list_invoices", "List invoices", {
"type": "object", "properties": {"job_id": {"type": "string"}}})
def _list_invoices(args): return invoice_mgmt.list_invoices(args.get("job_id"))
@tool("mark_invoice_paid", "Mark invoice as paid", {
"type": "object", "properties": {
"invoice_id": {"type": "string"}, "amount_paid": {"type": "number"}
}, "required": ["invoice_id"]})
def _mark_paid(args): return invoice_mgmt.mark_paid(**args)
@tool("sync_invoice_qbo", "Sync invoice to QuickBooks Online", {
"type": "object", "properties": {"invoice_id": {"type": "string"}}, "required": ["invoice_id"]})
def _sync_qbo(args): return invoice_mgmt.sync_to_qbo(args["invoice_id"])
# --- Scope & Materials ---
@tool("generate_scope", "Generate scope of work from estimate", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _gen_scope(args): return scope_generator.generate(args["estimate_id"])
@tool("generate_material_list", "Generate material shopping list from estimate", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _gen_materials(args): return material_list.generate(args["estimate_id"])
# --- Daily Logs ---
@tool("create_daily_log", "Create a daily job log entry", {
"type": "object", "properties": {
"job_id": {"type": "string"}, "crew_names": {"type": "string"},
"hours_worked": {"type": "number"}, "notes": {"type": "string"},
"photos": {"type": "array", "items": {"type": "string"}}
}, "required": ["job_id", "notes"]})
def _create_log(args): return daily_log.create(**args)
@tool("list_daily_logs", "List daily logs for a job", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _list_logs(args): return daily_log.list_logs(args["job_id"])
# --- Site Analyzer ---
@tool("analyze_site", "Analyze site conditions from observations", {
"type": "object", "properties": {
"job_id": {"type": "string"}, "observations": {"type": "string"},
"photos_description": {"type": "string"}
}, "required": ["observations"]})
def _analyze_site(args): return site_analyzer.analyze(**args)
# --- Budget Tracker ---
@tool("get_budget_report", "Get budget vs actual report for a job", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _budget_report(args): return budget_tracker.report(args["job_id"])
# --- Project Scheduler ---
@tool("set_milestone", "Set a project milestone", {
"type": "object", "properties": {
"job_id": {"type": "string"}, "phase_name": {"type": "string"},
"start_date": {"type": "string"}, "end_date": {"type": "string"}
}, "required": ["job_id", "phase_name"]})
def _set_milestone(args): return project_scheduler.set_milestone(**args)
@tool("get_schedule", "Get project schedule/timeline", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _get_schedule(args): return project_scheduler.get_schedule(args["job_id"])
# --- Signatures & Activity ---
@tool("request_signature", "Request client signature on estimate", {
"type": "object", "properties": {
"estimate_id": {"type": "string"}, "client_name": {"type": "string"}
}, "required": ["estimate_id", "client_name"]})
def _req_sig(args): return signature_mgmt.request_signature(**args)
@tool("record_signature", "Record a client signature", {
"type": "object", "properties": {
"signature_id": {"type": "string"}, "signature_data": {"type": "string"}
}, "required": ["signature_id"]})
def _rec_sig(args): return signature_mgmt.record_signature(**args)
@tool("log_activity", "Log client activity (viewed, approved, signed, paid)", {
"type": "object", "properties": {
"estimate_id": {"type": "string"}, "job_id": {"type": "string"},
"action": {"type": "string"}
}, "required": ["action"]})
def _log_activity(args): return activity_logger.log(**args)
@tool("get_activity", "Get activity log for an estimate or job", {
"type": "object", "properties": {
"estimate_id": {"type": "string"}, "job_id": {"type": "string"}
}})
def _get_activity(args): return activity_logger.get(**args)
# --- MCP SSE Server Implementation ---
class SSEClient:
def __init__(self):
self.queue = queue.Queue()
self.id = str(uuid.uuid4())
clients = {}
def send_sse(client_id, event, data):
if client_id in clients:
clients[client_id].queue.put(f"event: {event}\ndata: {json.dumps(data)}\n\n")
class MCPHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args): pass
def do_GET(self):
path = urlparse(self.path).path
if path == "/sse":
self._handle_sse()
elif path == "/health":
self._json_response({"ok": True, "tools": len(TOOLS)})
else:
self.send_error(404)
def do_POST(self):
path = urlparse(self.path).path
if path == "/messages":
self._handle_message()
else:
self.send_error(404)
def _handle_sse(self):
client = SSEClient()
clients[client.id] = client
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
# Send endpoint info
endpoint_msg = {"jsonrpc": "2.0", "method": "endpoint",
"params": {"uri": f"http://0.0.0.0:{PORT}/messages?client_id={client.id}"}}
self.wfile.write(f"event: endpoint\ndata: {json.dumps(endpoint_msg)}\n\n".encode())
self.wfile.flush()
try:
while True:
try:
msg = client.queue.get(timeout=30)
self.wfile.write(msg.encode())
self.wfile.flush()
except queue.Empty:
self.wfile.write(": keepalive\n\n".encode())
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
pass
finally:
clients.pop(client.id, None)
def _handle_message(self):
params = parse_qs(urlparse(self.path).query)
client_id = params.get("client_id", [None])[0]
length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(length)) if length else {}
response = self._process_jsonrpc(body)
if client_id and client_id in clients:
send_sse(client_id, "message", response)
self._json_response(response)
def _process_jsonrpc(self, msg):
method = msg.get("method", "")
msg_id = msg.get("id")
if method == "initialize":
return {"jsonrpc": "2.0", "id": msg_id, "result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "handoff-pro", "version": "1.0.0"}
}}
elif method == "notifications/initialized":
return {"jsonrpc": "2.0", "id": msg_id, "result": None}
elif method == "tools/list":
tools_list = [{"name": k, "description": v["description"],
"inputSchema": v["inputSchema"]} for k, v in TOOLS.items()]
return {"jsonrpc": "2.0", "id": msg_id, "result": {"tools": tools_list}}
elif method == "tools/call":
tool_name = msg.get("params", {}).get("name", "")
arguments = msg.get("params", {}).get("arguments", {})
if tool_name not in TOOLS:
return {"jsonrpc": "2.0", "id": msg_id, "error": {
"code": -32601, "message": f"Unknown tool: {tool_name}"}}
try:
result = TOOLS[tool_name]["handler"](arguments)
return {"jsonrpc": "2.0", "id": msg_id, "result": {
"content": [{"type": "text", "text": json.dumps(result, default=str)}]
}}
except Exception as e:
return {"jsonrpc": "2.0", "id": msg_id, "result": {
"content": [{"type": "text", "text": json.dumps({"error": str(e)})}],
"isError": True
}}
return {"jsonrpc": "2.0", "id": msg_id, "error": {
"code": -32601, "message": f"Unknown method: {method}"}}
def _json_response(self, data):
body = json.dumps(data).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", len(body))
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(body)
def main():
db.init_db()
server = HTTPServer(("0.0.0.0", PORT), MCPHandler)
print(f"Handoff Pro MCP server running on port {PORT}")
print(f"SSE endpoint: http://0.0.0.0:{PORT}/sse")
print(f"Tools registered: {len(TOOLS)}")
server.serve_forever()
if __name__ == "__main__":
main()

0
tests/__init__.py Normal file
View File

149
tests/test_workflows.py Normal file
View File

@@ -0,0 +1,149 @@
"""End-to-end workflow tests for Handoff Pro."""
import json, os, sys, tempfile, unittest
from unittest.mock import patch
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib import db as db_mod
from scripts import pricing_catalog, project_mgmt, estimate_generator
from scripts import proposal_drafter, invoice_mgmt, daily_log
from scripts import scope_generator, material_list, budget_tracker
from scripts import project_scheduler, signature_mgmt, activity_logger, site_analyzer
def fresh_db():
"""Point to a fresh temp DB and initialize it."""
import tempfile as tf
path = tf.mktemp(suffix=".db")
db_mod.DB_PATH = path
db_mod.init_db()
return path
class TestWorkflow1_NewEstimate(unittest.TestCase):
def setUp(self):
self.db = fresh_db()
pricing_catalog.seed_default()
def tearDown(self):
os.unlink(self.db)
def test_full_estimate_workflow(self):
# Generate estimate from description
result = estimate_generator.generate("kitchen remodel, 200sqft, new cabinets, countertops, flooring")
self.assertIn("id", result)
self.assertGreater(result["total_cost"], 0)
# Verify estimate retrievable
est = project_mgmt.get_estimate(result["id"])
self.assertEqual(est["id"], result["id"])
self.assertGreater(len(est["line_items"]), 0)
class TestWorkflow2_Proposal(unittest.TestCase):
def setUp(self):
self.db = fresh_db()
pricing_catalog.seed_default()
def tearDown(self):
os.unlink(self.db)
def test_proposal_generation(self):
est = estimate_generator.generate("bathroom renovation, tile, plumbing, 100sqft")
result = proposal_drafter.draft(est["id"])
self.assertIn("proposal", result)
self.assertIn("Kellow Construction", result["proposal"])
self.assertIn("Payment Terms", result["proposal"])
class TestWorkflow3_DailyLog(unittest.TestCase):
def setUp(self):
self.db = fresh_db()
def tearDown(self):
os.unlink(self.db)
def test_daily_log_workflow(self):
job = project_mgmt.create_job("Test Client", "123 Main St")
log = daily_log.create(job["id"], "Framed exterior walls", crew_names="Mike, Sarah", hours_worked=8)
self.assertIn("id", log)
logs = daily_log.list_logs(job["id"])
self.assertEqual(len(logs["logs"]), 1)
class TestWorkflow4_Invoice(unittest.TestCase):
def setUp(self):
self.db = fresh_db()
pricing_catalog.seed_default()
def tearDown(self):
os.unlink(self.db)
def test_invoice_lifecycle(self):
est = estimate_generator.generate("deck build, 300sqft")
inv = invoice_mgmt.create(est["id"])
self.assertIn("invoice_number", inv)
self.assertGreater(inv["amount_due"], 0)
# Mark paid
paid = invoice_mgmt.mark_paid(inv["id"])
self.assertEqual(paid["status"], "paid")
class TestWorkflow5_JobTreadSync(unittest.TestCase):
def setUp(self):
self.db = fresh_db()
def tearDown(self):
os.unlink(self.db)
@patch("scripts.jobtread_sync.pave_query")
@patch("scripts.jobtread_sync.get_org_id", return_value="test_org")
def test_pull_jobs(self, mock_org, mock_pave):
mock_pave.return_value = {"organization": {"jobs": {"nodes": [
{"id": "jt123", "name": "Test Job", "status": "active", "createdAt": "2026-01-01",
"account": {"id": "acc1", "name": "John Smith"}, "location": {"address": "456 Oak Ave"}}
]}}}
from scripts import jobtread_sync
result = jobtread_sync.pull_jobs()
self.assertEqual(result["synced"], 1)
# Verify in local DB
jobs = project_mgmt.list_jobs()
self.assertGreater(len(jobs["jobs"]), 0)
class TestBudgetTracker(unittest.TestCase):
def setUp(self):
self.db = fresh_db()
pricing_catalog.seed_default()
def tearDown(self):
os.unlink(self.db)
def test_budget_report(self):
est = estimate_generator.generate("framing project, 500sqft")
job_id = project_mgmt.get_estimate(est["id"])["job_id"]
daily_log.create(job_id, "Day 1 framing", hours_worked=10)
report = budget_tracker.report(job_id)
self.assertIn("budget", report)
self.assertIn("actual", report)
self.assertEqual(report["actual"]["hours_worked"], 10)
class TestSiteAnalyzer(unittest.TestCase):
def test_issue_detection(self):
result = site_analyzer.analyze("Found water damage in the corner near the window, also some cracking in foundation")
self.assertGreater(result["issues_found"], 0)
self.assertGreater(result["total_cost_impact"], 0)
class TestScopeAndMaterials(unittest.TestCase):
def setUp(self):
self.db = fresh_db()
pricing_catalog.seed_default()
def tearDown(self):
os.unlink(self.db)
def test_scope_generation(self):
est = estimate_generator.generate("bathroom tile, plumbing fixtures")
scope = scope_generator.generate(est["id"])
self.assertIn("scope", scope)
self.assertIn("Scope of Work", scope["scope"])
def test_material_list(self):
est = estimate_generator.generate("kitchen cabinets, countertops, flooring")
mats = material_list.generate(est["id"])
self.assertGreater(mats["items_count"], 0)
if __name__ == "__main__":
unittest.main()