v2: Forge Console + Open WebUI artifacts + Docker
- web/: Local chat UI (Express + WS → Codex bridge) - openwebui/: Preset, pipelines, knowledge manifest - Dockerfile + docker-compose.yml - Updated README with 3 frontend options - CLI-agnostic: works with Codex, Claude Code, Kiro, Gemini
This commit is contained in:
76
openwebui/pipelines/gainsight_pipeline.py
Normal file
76
openwebui/pipelines/gainsight_pipeline.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""
|
||||
title: Gainsight PX Pipeline
|
||||
author: PM Factory
|
||||
version: 0.1.0
|
||||
description: Query Gainsight PX product analytics.
|
||||
requirements: requests
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class Tools:
|
||||
def __init__(self):
|
||||
self.valves = self.Valves()
|
||||
|
||||
class Valves:
|
||||
GAINSIGHT_PX_API_KEY: str = os.environ.get("GAINSIGHT_PX_API_KEY", "")
|
||||
GAINSIGHT_PX_REGION: str = os.environ.get("GAINSIGHT_PX_REGION", "US")
|
||||
|
||||
def _base_url(self) -> str:
|
||||
if self.valves.GAINSIGHT_PX_REGION.upper() == "EU":
|
||||
return "https://api-eu.aptrinsic.com/v1"
|
||||
return "https://api.aptrinsic.com/v1"
|
||||
|
||||
def _headers(self) -> dict:
|
||||
return {
|
||||
"X-APTRINSIC-API-KEY": self.valves.GAINSIGHT_PX_API_KEY,
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
def gainsight_query_users(self, filter_expression: str = "", page_size: int = 20, __user__: dict = {}) -> str:
|
||||
"""
|
||||
Query Gainsight PX users/accounts.
|
||||
|
||||
:param filter_expression: Optional filter (e.g. 'propertyName=value')
|
||||
:param page_size: Number of results to return. Default: 20
|
||||
:return: JSON string of user data
|
||||
"""
|
||||
try:
|
||||
params = {"pageSize": page_size}
|
||||
if filter_expression:
|
||||
params["filter"] = filter_expression
|
||||
|
||||
resp = requests.get(
|
||||
f"{self._base_url()}/users",
|
||||
headers=self._headers(),
|
||||
params=params,
|
||||
timeout=15
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return json.dumps(resp.json(), indent=2)
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}"
|
||||
|
||||
def gainsight_feature_usage(self, feature_id: str, days: int = 30, __user__: dict = {}) -> str:
|
||||
"""
|
||||
Get feature usage stats from Gainsight PX.
|
||||
|
||||
:param feature_id: The feature ID to query
|
||||
:param days: Lookback period in days. Default: 30
|
||||
:return: JSON string of usage data
|
||||
"""
|
||||
try:
|
||||
resp = requests.get(
|
||||
f"{self._base_url()}/feature/{feature_id}/usage",
|
||||
headers=self._headers(),
|
||||
params={"days": days},
|
||||
timeout=15
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return json.dumps(resp.json(), indent=2)
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}"
|
||||
Reference in New Issue
Block a user