Files
handoff-pro/scripts/jobtread_sync.py

91 lines
5.0 KiB
Python
Raw Permalink Normal View History

"""JobTread bidirectional sync tool."""
import json, sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.db import get_db, new_id, now, row_to_dict, rows_to_list
from lib.pave import pave_query, get_org_id
def list_customers():
    """Return the organization's accounts whose type is "customer".

    Asks Pave for the id, name and type of every account under the
    organization returned by ``get_org_id()`` and keeps only the nodes
    whose ``type`` equals ``"customer"``.

    Returns:
        dict: ``{"customers": [...]}`` holding the matching account nodes.
    """
    account_fields = {"id": {}, "name": {}, "type": {}}
    query = {
        "organization": {
            "$": {"id": get_org_id()},
            "accounts": {"nodes": account_fields},
        }
    }
    response = pave_query(query)
    organization = response.get("organization", {})
    nodes = organization.get("accounts", {}).get("nodes", [])

    customers = []
    for account in nodes:
        if account.get("type") == "customer":
            customers.append(account)
    return {"customers": customers}
def pull_jobs():
    """Pull all jobs from JobTread and upsert them into the local DB.

    Fetches id, name, status, createdAt, account (id, name) and location
    (name, address) for every job under the organization. Each job is
    matched against the local ``jobs`` table on ``jobtread_job_id``:

    * if a row exists, only its status and timestamps are updated;
    * otherwise a new row is inserted, with the account name stored as
      ``client_name`` and the JobTread job name stored as ``description``.

    The connection is always closed, even when a statement raises; in that
    case ``commit()`` is never reached and no sync-log entry is written.

    Returns:
        dict: ``{"ok": True, "synced": <rows written>, "total_jobs": <jobs fetched>}``.
    """
    org_id = get_org_id()
    result = pave_query({"organization": {"$": {"id": org_id},
        "jobs": {"nodes": {"id": {}, "name": {}, "status": {}, "createdAt": {},
            "account": {"id": {}, "name": {}},
            "location": {"name": {}, "address": {}}}}}})
    jobs = result.get("organization", {}).get("jobs", {}).get("nodes", [])

    conn = get_db()
    synced = 0
    try:
        for j in jobs:
            jt_id = j.get("id")
            # dict.get's default only applies when the key is absent; a key
            # present with a null value yields None and would break the
            # chained .get().  `or {}` covers both the missing and null cases
            # (presumably jobs without an account/location — confirm with API).
            account = j.get("account") or {}
            location = j.get("location") or {}
            status = j.get("status", "active")
            # One timestamp per row so updated_at and synced_to_jobtread_at agree.
            stamp = now()

            existing = conn.execute(
                "SELECT id FROM jobs WHERE jobtread_job_id=?", (jt_id,)
            ).fetchone()
            if existing:
                conn.execute(
                    "UPDATE jobs SET status=?, updated_at=?, synced_to_jobtread_at=? WHERE jobtread_job_id=?",
                    (status, stamp, stamp, jt_id),
                )
            else:
                conn.execute(
                    """INSERT INTO jobs (id, client_name, address, description, jobtread_job_id, jobtread_customer_id, status, created_at, updated_at, synced_to_jobtread_at)
                    VALUES (?,?,?,?,?,?,?,?,?,?)""",
                    (new_id(), account.get("name", ""), location.get("address", ""),
                     j.get("name", ""), jt_id, account.get("id", ""),
                     status, j.get("createdAt", stamp), stamp, stamp),
                )
            synced += 1
        conn.commit()
    finally:
        conn.close()

    _log_sync(None, None, "job", "pull", "success")
    return {"ok": True, "synced": synced, "total_jobs": len(jobs)}
def push_estimate(estimate_id):
    """Queue a local estimate for pushing to JobTread.

    NOTE: the Pave create mutation still needs verification, so nothing is
    sent to JobTread here. The function only checks that the estimate exists
    and that its job is linked to JobTread, then records a "pending" entry
    in the sync log.

    Args:
        estimate_id: Local id of the row in ``estimates``.

    Returns:
        dict: ``{"error": ...}`` when the estimate is missing or its job has
        no ``jobtread_job_id``; otherwise
        ``{"ok": True, "status": "pending", "msg": ...}``.
    """
    conn = get_db()
    try:
        est = row_to_dict(
            conn.execute("SELECT * FROM estimates WHERE id=?", (estimate_id,)).fetchone()
        )
        if not est:
            return {"error": "estimate not found"}
        job = row_to_dict(
            conn.execute("SELECT * FROM jobs WHERE id=?", (est["job_id"],)).fetchone()
        )
    finally:
        # Close on every path, including when a query or lookup raises.
        conn.close()

    if not job or not job.get("jobtread_job_id"):
        return {"error": "job not linked to JobTread"}

    # Line items are intentionally not loaded: nothing consumes them until the
    # Pave create mutation is implemented, so the query was a dead round trip.

    # Log as pending — actual Pave create mutation TBD (needs Pave Explorer verification)
    _log_sync(estimate_id, job["jobtread_job_id"], "estimate", "push", "pending",
              "Pave create document mutation needs verification")
    return {"ok": True, "status": "pending", "msg": "Estimate queued for JobTread sync. Pave create mutation needs verification via Pave Explorer."}
def push_invoice(invoice_id):
    """Queue a local invoice for pushing to JobTread.

    Looks up the invoice and its job, and records a "pending" sync-log entry
    when the job carries a ``jobtread_job_id``. No Pave mutation is issued.

    Args:
        invoice_id: Local id of the row in ``invoices``.

    Returns:
        dict: ``{"error": ...}`` when the invoice is missing or its job is
        not linked to JobTread; otherwise
        ``{"ok": True, "status": "pending", "msg": ...}``.
    """
    db = get_db()
    invoice_row = db.execute("SELECT * FROM invoices WHERE id=?", (invoice_id,)).fetchone()
    invoice = row_to_dict(invoice_row)
    if not invoice:
        db.close()
        return {"error": "invoice not found"}

    job_row = db.execute("SELECT * FROM jobs WHERE id=?", (invoice["job_id"],)).fetchone()
    linked_job = row_to_dict(job_row)
    db.close()

    # A missing job and a job without a JobTread id are reported the same way.
    jobtread_job_id = linked_job.get("jobtread_job_id") if linked_job else None
    if not jobtread_job_id:
        return {"error": "job not linked to JobTread"}

    _log_sync(
        invoice_id,
        jobtread_job_id,
        "invoice",
        "push",
        "pending",
        "Pave create invoice mutation needs verification",
    )
    return {"ok": True, "status": "pending", "msg": "Invoice queued for JobTread sync."}
def get_sync_status(job_id):
    """Return a local job together with its ten most recent sync-log entries.

    Log rows are matched on ``local_id = job_id`` and, when the job has a
    ``jobtread_job_id``, also on ``jobtread_id = <that id>``. The second
    condition is what finds estimate/invoice pushes: those are logged with
    the estimate/invoice id as ``local_id`` and the job's JobTread id as
    ``jobtread_id``, so filtering on ``local_id`` alone would miss them.

    Args:
        job_id: Local id of the row in ``jobs``.

    Returns:
        dict: ``{"job": <job dict or falsy if not found>, "sync_log": [...]}``
        with log entries ordered newest first (by ``synced_at``).
    """
    conn = get_db()
    try:
        job = row_to_dict(
            conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
        )
        jt_job_id = job.get("jobtread_job_id") if job else None
        if jt_job_id:
            cursor = conn.execute(
                "SELECT * FROM jobtread_sync_log WHERE local_id=? OR jobtread_id=? ORDER BY synced_at DESC LIMIT 10",
                (job_id, jt_job_id),
            )
        else:
            # Unlinked or unknown job: only entries keyed by the local id can apply.
            cursor = conn.execute(
                "SELECT * FROM jobtread_sync_log WHERE local_id=? ORDER BY synced_at DESC LIMIT 10",
                (job_id,),
            )
        logs = rows_to_list(cursor.fetchall())
    finally:
        # Close on every path, including when a query raises.
        conn.close()
    return {"job": job, "sync_log": logs}
def _log_sync(local_id, jt_id, entity_type, action, status, error=None):
    """Insert one row into the ``jobtread_sync_log`` table and commit it.

    Args:
        local_id: Local record id the entry refers to (may be None).
        jt_id: JobTread-side id the entry refers to (may be None).
        entity_type: Kind of record, e.g. "job", "estimate" or "invoice".
        action: Sync direction, e.g. "pull" or "push".
        status: Outcome label, e.g. "success" or "pending".
        error: Optional message stored in the ``error_message`` column.
    """
    insert_sql = "INSERT INTO jobtread_sync_log (id, local_id, jobtread_id, entity_type, action, status, error_message, synced_at) VALUES (?,?,?,?,?,?,?,?)"
    db = get_db()
    # A fresh id and timestamp are generated for every log row.
    values = (new_id(), local_id, jt_id, entity_type, action, status, error, now())
    db.execute(insert_sql, values)
    db.commit()
    db.close()
if __name__ == "__main__":
    # CLI entry point: `jobtread_sync.py [command] [id]`.
    # With no command, "list-customers" is run. Results are printed as JSON.
    from lib.db import init_db
    init_db()  # runs before any command is dispatched

    cmd = sys.argv[1] if len(sys.argv) > 1 else "list-customers"

    # command -> (handler, takes an id argument, json indent)
    commands = {
        "list-customers": (list_customers, False, 2),
        "pull-jobs": (pull_jobs, False, 2),
        "push-estimate": (push_estimate, True, None),
        "push-invoice": (push_invoice, True, None),
        "sync-status": (get_sync_status, True, 2),
    }

    if cmd not in commands:
        print(json.dumps({"error": f"unknown: {cmd}"}))
    else:
        handler, takes_id, indent = commands[cmd]
        if not takes_id:
            print(json.dumps(handler(), indent=indent))
        elif len(sys.argv) > 2:
            print(json.dumps(handler(sys.argv[2]), indent=indent))
        else:
            # Previously this case crashed with IndexError on sys.argv[2];
            # report it as JSON and keep the non-zero exit status.
            print(json.dumps({"error": f"{cmd} requires an id argument"}))
            sys.exit(1)