"""Handoff Pro MCP Server — SSE transport for mcporter compatibility.""" import json import os import sys import queue import threading import uuid from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn from urllib.parse import urlparse, parse_qs sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from lib import db # Import tool handlers from scripts import pricing_catalog, project_mgmt, jobtread_sync from scripts import estimate_generator, proposal_drafter, invoice_mgmt from scripts import scope_generator, material_list, daily_log from scripts import site_analyzer, budget_tracker, project_scheduler from scripts import signature_mgmt, activity_logger PORT = int(os.environ.get("HANDOFF_PORT", 3101)) # Tool registry: name -> {description, input_schema, handler} TOOLS = {} def tool(name, description, schema): """Decorator to register an MCP tool.""" def decorator(fn): TOOLS[name] = {"description": description, "inputSchema": schema, "handler": fn} return fn return decorator # --- Pricing Catalog Tools --- @tool("list_catalogs", "List all pricing catalogs", {"type": "object", "properties": {}}) def _list_catalogs(args): return pricing_catalog.list_catalogs() @tool("create_catalog", "Create a pricing catalog", { "type": "object", "properties": { "name": {"type": "string"}, "labor_rates": {"type": "object"}, "material_markups": {"type": "object"} }, "required": ["name", "labor_rates"]}) def _create_catalog(args): return pricing_catalog.create_catalog(**args) @tool("get_catalog", "Get a pricing catalog by ID", { "type": "object", "properties": {"catalog_id": {"type": "string"}}, "required": ["catalog_id"]}) def _get_catalog(args): return pricing_catalog.get_catalog(args["catalog_id"]) @tool("update_catalog", "Update a pricing catalog", { "type": "object", "properties": { "catalog_id": {"type": "string"}, "name": {"type": "string"}, "labor_rates": {"type": "object"}, "material_markups": {"type": "object"} }, "required": ["catalog_id"]}) def _update_catalog(args): return pricing_catalog.update_catalog(**args) @tool("delete_catalog", "Delete a pricing catalog", { "type": "object", "properties": {"catalog_id": {"type": "string"}}, "required": ["catalog_id"]}) def _delete_catalog(args): return pricing_catalog.delete_catalog(args["catalog_id"]) # --- Project Management Tools --- @tool("create_job", "Create a new job", { "type": "object", "properties": { "client_name": {"type": "string"}, "address": {"type": "string"}, "description": {"type": "string"} }, "required": ["client_name"]}) def _create_job(args): return project_mgmt.create_job(**args) @tool("list_jobs", "List all jobs", { "type": "object", "properties": {"status": {"type": "string"}}}) def _list_jobs(args): return project_mgmt.list_jobs(args.get("status")) @tool("get_job", "Get job details", { "type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]}) def _get_job(args): return project_mgmt.get_job(args["job_id"]) @tool("update_job", "Update a job", { "type": "object", "properties": { "job_id": {"type": "string"}, "status": {"type": "string"}, "client_name": {"type": "string"}, "address": {"type": "string"}, "description": {"type": "string"} }, "required": ["job_id"]}) def _update_job(args): return project_mgmt.update_job(**args) @tool("create_estimate", "Create an estimate for a job", { "type": "object", "properties": { "job_id": {"type": "string"}, "line_items": {"type": "array", "items": {"type": "object"}}, "labor_hours": {"type": "number"}, "labor_rate": {"type": "number"}, "markup_percent": {"type": "number"} }, "required": ["job_id", "line_items"]}) def _create_estimate(args): return project_mgmt.create_estimate(**args) @tool("get_estimate", "Get estimate details with line items", { "type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]}) def _get_estimate(args): return project_mgmt.get_estimate(args["estimate_id"]) # --- JobTread Sync Tools --- @tool("jt_list_customers", "List customers from JobTread", {"type": "object", "properties": {}}) def _jt_customers(args): return jobtread_sync.list_customers() @tool("jt_pull_jobs", "Pull all jobs from JobTread into local DB", {"type": "object", "properties": {}}) def _jt_pull(args): return jobtread_sync.pull_jobs() @tool("jt_push_estimate", "Push estimate to JobTread", { "type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]}) def _jt_push_est(args): return jobtread_sync.push_estimate(args["estimate_id"]) @tool("jt_push_invoice", "Push invoice to JobTread", { "type": "object", "properties": {"invoice_id": {"type": "string"}}, "required": ["invoice_id"]}) def _jt_push_inv(args): return jobtread_sync.push_invoice(args["invoice_id"]) @tool("jt_sync_status", "Get sync status for a job", { "type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]}) def _jt_status(args): return jobtread_sync.get_sync_status(args["job_id"]) # --- Estimate Generator --- @tool("generate_estimate", "Generate estimate from job description", { "type": "object", "properties": { "description": {"type": "string"}, "job_id": {"type": "string"}, "catalog_id": {"type": "string"}, "markup_percent": {"type": "number"} }, "required": ["description"]}) def _gen_estimate(args): return estimate_generator.generate(**args) # --- Proposal Drafter --- @tool("draft_proposal", "Generate a professional proposal from an estimate", { "type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]}) def _draft_proposal(args): return proposal_drafter.draft(args["estimate_id"]) # --- Invoice Management --- @tool("create_invoice", "Create invoice from estimate", { "type": "object", "properties": { "estimate_id": {"type": "string"}, "due_date": {"type": "string"} }, "required": ["estimate_id"]}) def _create_invoice(args): return invoice_mgmt.create(**args) @tool("list_invoices", "List invoices", { "type": "object", "properties": {"job_id": {"type": "string"}}}) def _list_invoices(args): return invoice_mgmt.list_invoices(args.get("job_id")) @tool("mark_invoice_paid", "Mark invoice as paid", { "type": "object", "properties": { "invoice_id": {"type": "string"}, "amount_paid": {"type": "number"} }, "required": ["invoice_id"]}) def _mark_paid(args): return invoice_mgmt.mark_paid(**args) @tool("sync_invoice_qbo", "Sync invoice to QuickBooks Online", { "type": "object", "properties": {"invoice_id": {"type": "string"}}, "required": ["invoice_id"]}) def _sync_qbo(args): return invoice_mgmt.sync_to_qbo(args["invoice_id"]) # --- Scope & Materials --- @tool("generate_scope", "Generate scope of work from estimate", { "type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]}) def _gen_scope(args): return scope_generator.generate(args["estimate_id"]) @tool("generate_material_list", "Generate material shopping list from estimate", { "type": "object", "properties": {"estimate_id": {"type": "string"}}, "required": ["estimate_id"]}) def _gen_materials(args): return material_list.generate(args["estimate_id"]) # --- Daily Logs --- @tool("create_daily_log", "Create a daily job log entry", { "type": "object", "properties": { "job_id": {"type": "string"}, "crew_names": {"type": "string"}, "hours_worked": {"type": "number"}, "notes": {"type": "string"}, "photos": {"type": "array", "items": {"type": "string"}} }, "required": ["job_id", "notes"]}) def _create_log(args): return daily_log.create(**args) @tool("list_daily_logs", "List daily logs for a job", { "type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]}) def _list_logs(args): return daily_log.list_logs(args["job_id"]) # --- Site Analyzer --- @tool("analyze_site", "Analyze site conditions from observations", { "type": "object", "properties": { "job_id": {"type": "string"}, "observations": {"type": "string"}, "photos_description": {"type": "string"} }, "required": ["observations"]}) def _analyze_site(args): return site_analyzer.analyze(**args) # --- Budget Tracker --- @tool("get_budget_report", "Get budget vs actual report for a job", { "type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]}) def _budget_report(args): return budget_tracker.report(args["job_id"]) # --- Project Scheduler --- @tool("set_milestone", "Set a project milestone", { "type": "object", "properties": { "job_id": {"type": "string"}, "phase_name": {"type": "string"}, "start_date": {"type": "string"}, "end_date": {"type": "string"} }, "required": ["job_id", "phase_name"]}) def _set_milestone(args): return project_scheduler.set_milestone(**args) @tool("get_schedule", "Get project schedule/timeline", { "type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]}) def _get_schedule(args): return project_scheduler.get_schedule(args["job_id"]) # --- Signatures & Activity --- @tool("request_signature", "Request client signature on estimate", { "type": "object", "properties": { "estimate_id": {"type": "string"}, "client_name": {"type": "string"} }, "required": ["estimate_id", "client_name"]}) def _req_sig(args): return signature_mgmt.request_signature(**args) @tool("record_signature", "Record a client signature", { "type": "object", "properties": { "signature_id": {"type": "string"}, "signature_data": {"type": "string"} }, "required": ["signature_id"]}) def _rec_sig(args): return signature_mgmt.record_signature(**args) @tool("log_activity", "Log client activity (viewed, approved, signed, paid)", { "type": "object", "properties": { "estimate_id": {"type": "string"}, "job_id": {"type": "string"}, "action": {"type": "string"} }, "required": ["action"]}) def _log_activity(args): return activity_logger.log(**args) @tool("get_activity", "Get activity log for an estimate or job", { "type": "object", "properties": { "estimate_id": {"type": "string"}, "job_id": {"type": "string"} }}) def _get_activity(args): return activity_logger.get(**args) # --- MCP SSE Server Implementation --- class SSEClient: def __init__(self): self.queue = queue.Queue() self.id = str(uuid.uuid4()) clients = {} def send_sse(client_id, event, data): if client_id in clients: clients[client_id].queue.put(f"event: {event}\ndata: {json.dumps(data)}\n\n") class MCPHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): pass def do_GET(self): path = urlparse(self.path).path if path == "/sse": self._handle_sse() elif path == "/health": self._json_response({"ok": True, "tools": len(TOOLS)}) else: self.send_error(404) def do_POST(self): path = urlparse(self.path).path if path == "/messages": self._handle_message() else: self.send_error(404) def _handle_sse(self): client = SSEClient() clients[client.id] = client self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.flush() # Send endpoint event immediately per MCP SSE spec msg_url = f"http://192.168.86.11:{PORT}/messages?client_id={client.id}" self.wfile.write(f"event: endpoint\ndata: {msg_url}\n\n".encode()) self.wfile.flush() try: while True: try: msg = client.queue.get(timeout=30) self.wfile.write(msg.encode()) self.wfile.flush() except queue.Empty: self.wfile.write(": keepalive\n\n".encode()) self.wfile.flush() except (BrokenPipeError, ConnectionResetError): pass finally: clients.pop(client.id, None) def _handle_message(self): params = parse_qs(urlparse(self.path).query) client_id = params.get("client_id", [None])[0] length = int(self.headers.get("Content-Length", 0)) body = json.loads(self.rfile.read(length)) if length else {} response = self._process_jsonrpc(body) if client_id and client_id in clients: send_sse(client_id, "message", response) self._json_response(response) def _process_jsonrpc(self, msg): method = msg.get("method", "") msg_id = msg.get("id") if method == "initialize": return {"jsonrpc": "2.0", "id": msg_id, "result": { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "handoff-pro", "version": "1.0.0"} }} elif method == "notifications/initialized": return {"jsonrpc": "2.0", "id": msg_id, "result": None} elif method == "tools/list": tools_list = [{"name": k, "description": v["description"], "inputSchema": v["inputSchema"]} for k, v in TOOLS.items()] return {"jsonrpc": "2.0", "id": msg_id, "result": {"tools": tools_list}} elif method == "tools/call": tool_name = msg.get("params", {}).get("name", "") arguments = msg.get("params", {}).get("arguments", {}) if tool_name not in TOOLS: return {"jsonrpc": "2.0", "id": msg_id, "error": { "code": -32601, "message": f"Unknown tool: {tool_name}"}} try: result = TOOLS[tool_name]["handler"](arguments) return {"jsonrpc": "2.0", "id": msg_id, "result": { "content": [{"type": "text", "text": json.dumps(result, default=str)}] }} except Exception as e: return {"jsonrpc": "2.0", "id": msg_id, "result": { "content": [{"type": "text", "text": json.dumps({"error": str(e)})}], "isError": True }} return {"jsonrpc": "2.0", "id": msg_id, "error": { "code": -32601, "message": f"Unknown method: {method}"}} def _json_response(self, data): body = json.dumps(data).encode() self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", len(body)) self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(body) class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): daemon_threads = True def main(): db.init_db() server = ThreadingHTTPServer(("0.0.0.0", PORT), MCPHandler) print(f"Handoff Pro MCP server running on port {PORT}") print(f"SSE endpoint: http://0.0.0.0:{PORT}/sse") print(f"Tools registered: {len(TOOLS)}") sys.stdout.flush() server.serve_forever() if __name__ == "__main__": main()