"""JobTread bidirectional sync tool.""" import json, sys, os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from lib.db import get_db, new_id, now, row_to_dict, rows_to_list from lib.pave import pave_query, get_org_id def list_customers(): org_id = get_org_id() result = pave_query({"organization": {"$": {"id": org_id}, "accounts": {"nodes": {"id": {}, "name": {}, "type": {}}}}}) accounts = result.get("organization", {}).get("accounts", {}).get("nodes", []) return {"customers": [a for a in accounts if a.get("type") == "customer"]} def pull_jobs(): """Pull all jobs from JobTread and upsert to local DB.""" org_id = get_org_id() result = pave_query({"organization": {"$": {"id": org_id}, "jobs": {"nodes": {"id": {}, "name": {}, "status": {}, "createdAt": {}, "account": {"id": {}, "name": {}}, "location": {"name": {}, "address": {}}}}}}) jobs = result.get("organization", {}).get("jobs", {}).get("nodes", []) conn = get_db() synced = 0 for j in jobs: jt_id = j.get("id") existing = conn.execute("SELECT id FROM jobs WHERE jobtread_job_id=?", (jt_id,)).fetchone() if existing: conn.execute("UPDATE jobs SET status=?, updated_at=?, synced_to_jobtread_at=? WHERE jobtread_job_id=?", (j.get("status","active"), now(), now(), jt_id)) else: conn.execute("""INSERT INTO jobs (id, client_name, address, description, jobtread_job_id, jobtread_customer_id, status, created_at, updated_at, synced_to_jobtread_at) VALUES (?,?,?,?,?,?,?,?,?,?)""", (new_id(), j.get("account",{}).get("name",""), j.get("location",{}).get("address",""), j.get("name",""), jt_id, j.get("account",{}).get("id",""), j.get("status","active"), j.get("createdAt", now()), now(), now())) synced += 1 conn.commit(); conn.close() _log_sync(None, None, "job", "pull", "success") return {"ok": True, "synced": synced, "total_jobs": len(jobs)} def push_estimate(estimate_id): """Push local estimate to JobTread. NOTE: Pave create mutations need verification.""" conn = get_db() est = row_to_dict(conn.execute("SELECT * FROM estimates WHERE id=?", (estimate_id,)).fetchone()) if not est: conn.close(); return {"error": "estimate not found"} job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (est["job_id"],)).fetchone()) if not job or not job.get("jobtread_job_id"): conn.close(); return {"error": "job not linked to JobTread"} items = rows_to_list(conn.execute("SELECT * FROM line_items WHERE estimate_id=?", (estimate_id,)).fetchall()) conn.close() # Log as pending — actual Pave create mutation TBD (needs Pave Explorer verification) _log_sync(estimate_id, job["jobtread_job_id"], "estimate", "push", "pending", "Pave create document mutation needs verification") return {"ok": True, "status": "pending", "msg": "Estimate queued for JobTread sync. Pave create mutation needs verification via Pave Explorer."} def push_invoice(invoice_id): """Push local invoice to JobTread.""" conn = get_db() inv = row_to_dict(conn.execute("SELECT * FROM invoices WHERE id=?", (invoice_id,)).fetchone()) if not inv: conn.close(); return {"error": "invoice not found"} job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (inv["job_id"],)).fetchone()) conn.close() if not job or not job.get("jobtread_job_id"): return {"error": "job not linked to JobTread"} _log_sync(invoice_id, job["jobtread_job_id"], "invoice", "push", "pending", "Pave create invoice mutation needs verification") return {"ok": True, "status": "pending", "msg": "Invoice queued for JobTread sync."} def get_sync_status(job_id): conn = get_db() job = row_to_dict(conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()) logs = rows_to_list(conn.execute("SELECT * FROM jobtread_sync_log WHERE local_id=? ORDER BY synced_at DESC LIMIT 10", (job_id,)).fetchall()) conn.close() return {"job": job, "sync_log": logs} def _log_sync(local_id, jt_id, entity_type, action, status, error=None): conn = get_db() conn.execute("INSERT INTO jobtread_sync_log (id, local_id, jobtread_id, entity_type, action, status, error_message, synced_at) VALUES (?,?,?,?,?,?,?,?)", (new_id(), local_id, jt_id, entity_type, action, status, error, now())) conn.commit(); conn.close() if __name__ == "__main__": from lib.db import init_db; init_db() cmd = sys.argv[1] if len(sys.argv) > 1 else "list-customers" if cmd == "list-customers": print(json.dumps(list_customers(), indent=2)) elif cmd == "pull-jobs": print(json.dumps(pull_jobs(), indent=2)) elif cmd == "push-estimate": print(json.dumps(push_estimate(sys.argv[2]))) elif cmd == "push-invoice": print(json.dumps(push_invoice(sys.argv[2]))) elif cmd == "sync-status": print(json.dumps(get_sync_status(sys.argv[2]), indent=2)) else: print(json.dumps({"error": f"unknown: {cmd}"}))