Files
handoff-pro/server.py

349 lines
15 KiB
Python
Raw Normal View History

"""Handoff Pro MCP Server — SSE transport for mcporter compatibility."""
import json
import os
import sys
import queue
import threading
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from urllib.parse import urlparse, parse_qs
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from lib import db
# Import tool handlers
from scripts import pricing_catalog, project_mgmt, jobtread_sync
from scripts import estimate_generator, proposal_drafter, invoice_mgmt
from scripts import scope_generator, material_list, daily_log
from scripts import site_analyzer, budget_tracker, project_scheduler
from scripts import signature_mgmt, activity_logger
PORT = int(os.environ.get("HANDOFF_PORT", 3101))
# Tool registry: name -> {description, input_schema, handler}
TOOLS = {}
def tool(name, description, schema):
"""Decorator to register an MCP tool."""
def decorator(fn):
TOOLS[name] = {"description": description, "inputSchema": schema, "handler": fn}
return fn
return decorator
# --- Pricing Catalog Tools ---
@tool("list_catalogs", "List all pricing catalogs", {"type": "object", "properties": {}})
def _list_catalogs(args): return pricing_catalog.list_catalogs()
@tool("create_catalog", "Create a pricing catalog", {
"type": "object", "properties": {
"name": {"type": "string"}, "labor_rates": {"type": "object"},
"material_markups": {"type": "object"}
}, "required": ["name", "labor_rates"]})
def _create_catalog(args): return pricing_catalog.create_catalog(**args)
@tool("get_catalog", "Get a pricing catalog by ID", {
"type": "object", "properties": {"catalog_id": {"type": "string"}}, "required": ["catalog_id"]})
def _get_catalog(args): return pricing_catalog.get_catalog(args["catalog_id"])
@tool("update_catalog", "Update a pricing catalog", {
"type": "object", "properties": {
"catalog_id": {"type": "string"}, "name": {"type": "string"},
"labor_rates": {"type": "object"}, "material_markups": {"type": "object"}
}, "required": ["catalog_id"]})
def _update_catalog(args): return pricing_catalog.update_catalog(**args)
@tool("delete_catalog", "Delete a pricing catalog", {
"type": "object", "properties": {"catalog_id": {"type": "string"}}, "required": ["catalog_id"]})
def _delete_catalog(args): return pricing_catalog.delete_catalog(args["catalog_id"])
# --- Project Management Tools ---
@tool("create_job", "Create a new job", {
"type": "object", "properties": {
"client_name": {"type": "string"}, "address": {"type": "string"},
"description": {"type": "string"}
}, "required": ["client_name"]})
def _create_job(args): return project_mgmt.create_job(**args)
@tool("list_jobs", "List all jobs", {
"type": "object", "properties": {"status": {"type": "string"}}})
def _list_jobs(args): return project_mgmt.list_jobs(args.get("status"))
@tool("get_job", "Get job details", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _get_job(args): return project_mgmt.get_job(args["job_id"])
@tool("update_job", "Update a job", {
"type": "object", "properties": {
"job_id": {"type": "string"}, "status": {"type": "string"},
"client_name": {"type": "string"}, "address": {"type": "string"},
"description": {"type": "string"}
}, "required": ["job_id"]})
def _update_job(args): return project_mgmt.update_job(**args)
@tool("create_estimate", "Create an estimate for a job", {
"type": "object", "properties": {
"job_id": {"type": "string"},
"line_items": {"type": "array", "items": {"type": "object"}},
"labor_hours": {"type": "number"}, "labor_rate": {"type": "number"},
"markup_percent": {"type": "number"}
}, "required": ["job_id", "line_items"]})
def _create_estimate(args): return project_mgmt.create_estimate(**args)
@tool("get_estimate", "Get estimate details with line items", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _get_estimate(args): return project_mgmt.get_estimate(args["estimate_id"])
# --- JobTread Sync Tools ---
@tool("jt_list_customers", "List customers from JobTread", {"type": "object", "properties": {}})
def _jt_customers(args): return jobtread_sync.list_customers()
@tool("jt_pull_jobs", "Pull all jobs from JobTread into local DB", {"type": "object", "properties": {}})
def _jt_pull(args): return jobtread_sync.pull_jobs()
@tool("jt_push_estimate", "Push estimate to JobTread", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _jt_push_est(args): return jobtread_sync.push_estimate(args["estimate_id"])
@tool("jt_push_invoice", "Push invoice to JobTread", {
"type": "object", "properties": {"invoice_id": {"type": "string"}}, "required": ["invoice_id"]})
def _jt_push_inv(args): return jobtread_sync.push_invoice(args["invoice_id"])
@tool("jt_sync_status", "Get sync status for a job", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _jt_status(args): return jobtread_sync.get_sync_status(args["job_id"])
# --- Estimate Generator ---
@tool("generate_estimate", "Generate estimate from job description", {
"type": "object", "properties": {
"description": {"type": "string"}, "job_id": {"type": "string"},
"catalog_id": {"type": "string"}, "markup_percent": {"type": "number"}
}, "required": ["description"]})
def _gen_estimate(args): return estimate_generator.generate(**args)
# --- Proposal Drafter ---
@tool("draft_proposal", "Generate a professional proposal from an estimate", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _draft_proposal(args): return proposal_drafter.draft(args["estimate_id"])
# --- Invoice Management ---
@tool("create_invoice", "Create invoice from estimate", {
"type": "object", "properties": {
"estimate_id": {"type": "string"}, "due_date": {"type": "string"}
}, "required": ["estimate_id"]})
def _create_invoice(args): return invoice_mgmt.create(**args)
@tool("list_invoices", "List invoices", {
"type": "object", "properties": {"job_id": {"type": "string"}}})
def _list_invoices(args): return invoice_mgmt.list_invoices(args.get("job_id"))
@tool("mark_invoice_paid", "Mark invoice as paid", {
"type": "object", "properties": {
"invoice_id": {"type": "string"}, "amount_paid": {"type": "number"}
}, "required": ["invoice_id"]})
def _mark_paid(args): return invoice_mgmt.mark_paid(**args)
@tool("sync_invoice_qbo", "Sync invoice to QuickBooks Online", {
"type": "object", "properties": {"invoice_id": {"type": "string"}}, "required": ["invoice_id"]})
def _sync_qbo(args): return invoice_mgmt.sync_to_qbo(args["invoice_id"])
# --- Scope & Materials ---
@tool("generate_scope", "Generate scope of work from estimate", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _gen_scope(args): return scope_generator.generate(args["estimate_id"])
@tool("generate_material_list", "Generate material shopping list from estimate", {
"type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]})
def _gen_materials(args): return material_list.generate(args["estimate_id"])
# --- Daily Logs ---
@tool("create_daily_log", "Create a daily job log entry", {
"type": "object", "properties": {
"job_id": {"type": "string"}, "crew_names": {"type": "string"},
"hours_worked": {"type": "number"}, "notes": {"type": "string"},
"photos": {"type": "array", "items": {"type": "string"}}
}, "required": ["job_id", "notes"]})
def _create_log(args): return daily_log.create(**args)
@tool("list_daily_logs", "List daily logs for a job", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _list_logs(args): return daily_log.list_logs(args["job_id"])
# --- Site Analyzer ---
@tool("analyze_site", "Analyze site conditions from observations", {
"type": "object", "properties": {
"job_id": {"type": "string"}, "observations": {"type": "string"},
"photos_description": {"type": "string"}
}, "required": ["observations"]})
def _analyze_site(args): return site_analyzer.analyze(**args)
# --- Budget Tracker ---
@tool("get_budget_report", "Get budget vs actual report for a job", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _budget_report(args): return budget_tracker.report(args["job_id"])
# --- Project Scheduler ---
@tool("set_milestone", "Set a project milestone", {
"type": "object", "properties": {
"job_id": {"type": "string"}, "phase_name": {"type": "string"},
"start_date": {"type": "string"}, "end_date": {"type": "string"}
}, "required": ["job_id", "phase_name"]})
def _set_milestone(args): return project_scheduler.set_milestone(**args)
@tool("get_schedule", "Get project schedule/timeline", {
"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]})
def _get_schedule(args): return project_scheduler.get_schedule(args["job_id"])
# --- Signatures & Activity ---
@tool("request_signature", "Request client signature on estimate", {
"type": "object", "properties": {
"estimate_id": {"type": "string"}, "client_name": {"type": "string"}
}, "required": ["estimate_id", "client_name"]})
def _req_sig(args): return signature_mgmt.request_signature(**args)
@tool("record_signature", "Record a client signature", {
"type": "object", "properties": {
"signature_id": {"type": "string"}, "signature_data": {"type": "string"}
}, "required": ["signature_id"]})
def _rec_sig(args): return signature_mgmt.record_signature(**args)
@tool("log_activity", "Log client activity (viewed, approved, signed, paid)", {
"type": "object", "properties": {
"estimate_id": {"type": "string"}, "job_id": {"type": "string"},
"action": {"type": "string"}
}, "required": ["action"]})
def _log_activity(args): return activity_logger.log(**args)
@tool("get_activity", "Get activity log for an estimate or job", {
"type": "object", "properties": {
"estimate_id": {"type": "string"}, "job_id": {"type": "string"}
}})
def _get_activity(args): return activity_logger.get(**args)
# --- MCP SSE Server Implementation ---
class SSEClient:
def __init__(self):
self.queue = queue.Queue()
self.id = str(uuid.uuid4())
clients = {}
def send_sse(client_id, event, data):
if client_id in clients:
clients[client_id].queue.put(f"event: {event}\ndata: {json.dumps(data)}\n\n")
class MCPHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args): pass
def do_GET(self):
path = urlparse(self.path).path
if path == "/sse":
self._handle_sse()
elif path == "/health":
self._json_response({"ok": True, "tools": len(TOOLS)})
else:
self.send_error(404)
def do_POST(self):
path = urlparse(self.path).path
if path == "/messages":
self._handle_message()
else:
self.send_error(404)
def _handle_sse(self):
client = SSEClient()
clients[client.id] = client
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.flush()
# Send endpoint event immediately per MCP SSE spec
msg_url = f"http://192.168.86.11:{PORT}/messages?client_id={client.id}"
self.wfile.write(f"event: endpoint\ndata: {msg_url}\n\n".encode())
self.wfile.flush()
try:
while True:
try:
msg = client.queue.get(timeout=30)
self.wfile.write(msg.encode())
self.wfile.flush()
except queue.Empty:
self.wfile.write(": keepalive\n\n".encode())
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
pass
finally:
clients.pop(client.id, None)
def _handle_message(self):
params = parse_qs(urlparse(self.path).query)
client_id = params.get("client_id", [None])[0]
length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(length)) if length else {}
response = self._process_jsonrpc(body)
if client_id and client_id in clients:
send_sse(client_id, "message", response)
self._json_response(response)
def _process_jsonrpc(self, msg):
method = msg.get("method", "")
msg_id = msg.get("id")
if method == "initialize":
return {"jsonrpc": "2.0", "id": msg_id, "result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "handoff-pro", "version": "1.0.0"}
}}
elif method == "notifications/initialized":
return {"jsonrpc": "2.0", "id": msg_id, "result": None}
elif method == "tools/list":
tools_list = [{"name": k, "description": v["description"],
"inputSchema": v["inputSchema"]} for k, v in TOOLS.items()]
return {"jsonrpc": "2.0", "id": msg_id, "result": {"tools": tools_list}}
elif method == "tools/call":
tool_name = msg.get("params", {}).get("name", "")
arguments = msg.get("params", {}).get("arguments", {})
if tool_name not in TOOLS:
return {"jsonrpc": "2.0", "id": msg_id, "error": {
"code": -32601, "message": f"Unknown tool: {tool_name}"}}
try:
result = TOOLS[tool_name]["handler"](arguments)
return {"jsonrpc": "2.0", "id": msg_id, "result": {
"content": [{"type": "text", "text": json.dumps(result, default=str)}]
}}
except Exception as e:
return {"jsonrpc": "2.0", "id": msg_id, "result": {
"content": [{"type": "text", "text": json.dumps({"error": str(e)})}],
"isError": True
}}
return {"jsonrpc": "2.0", "id": msg_id, "error": {
"code": -32601, "message": f"Unknown method: {method}"}}
def _json_response(self, data):
body = json.dumps(data).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", len(body))
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(body)
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True
def main():
db.init_db()
server = ThreadingHTTPServer(("0.0.0.0", PORT), MCPHandler)
print(f"Handoff Pro MCP server running on port {PORT}")
print(f"SSE endpoint: http://0.0.0.0:{PORT}/sse")
print(f"Tools registered: {len(TOOLS)}")
sys.stdout.flush()
server.serve_forever()
if __name__ == "__main__":
main()