Files
pm-template/openwebui/pipelines/gainsight_pipeline.py

77 lines
2.4 KiB
Python
Raw Permalink Normal View History

"""
title: Gainsight PX Pipeline
author: PM Factory
version: 0.1.0
description: Query Gainsight PX product analytics.
requirements: requests
"""
import os
import json
import requests
from typing import Optional
class Tools:
def __init__(self):
self.valves = self.Valves()
class Valves:
GAINSIGHT_PX_API_KEY: str = os.environ.get("GAINSIGHT_PX_API_KEY", "")
GAINSIGHT_PX_REGION: str = os.environ.get("GAINSIGHT_PX_REGION", "US")
def _base_url(self) -> str:
if self.valves.GAINSIGHT_PX_REGION.upper() == "EU":
return "https://api-eu.aptrinsic.com/v1"
return "https://api.aptrinsic.com/v1"
def _headers(self) -> dict:
return {
"X-APTRINSIC-API-KEY": self.valves.GAINSIGHT_PX_API_KEY,
"Content-Type": "application/json"
}
def gainsight_query_users(self, filter_expression: str = "", page_size: int = 20, __user__: dict = {}) -> str:
"""
Query Gainsight PX users/accounts.
:param filter_expression: Optional filter (e.g. 'propertyName=value')
:param page_size: Number of results to return. Default: 20
:return: JSON string of user data
"""
try:
params = {"pageSize": page_size}
if filter_expression:
params["filter"] = filter_expression
resp = requests.get(
f"{self._base_url()}/users",
headers=self._headers(),
params=params,
timeout=15
)
resp.raise_for_status()
return json.dumps(resp.json(), indent=2)
except Exception as e:
return f"Error: {str(e)}"
def gainsight_feature_usage(self, feature_id: str, days: int = 30, __user__: dict = {}) -> str:
"""
Get feature usage stats from Gainsight PX.
:param feature_id: The feature ID to query
:param days: Lookback period in days. Default: 30
:return: JSON string of usage data
"""
try:
resp = requests.get(
f"{self._base_url()}/feature/{feature_id}/usage",
headers=self._headers(),
params={"days": days},
timeout=15
)
resp.raise_for_status()
return json.dumps(resp.json(), indent=2)
except Exception as e:
return f"Error: {str(e)}"