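Python Examples
Complete Python examples for AG2Trust API integration.
Setup
Installation
Configuration
The examples below assume that `BASE_URL` and `API_KEY` are already defined, for example loaded from environment variables.
Basic Usage
Send Message to Pool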
import requests
def send_message(content: str, thread_id: str = None) -> dict:
    """POST a message to the ``support`` pool endpoint and return the JSON reply.

    Args:
        content: Message text, sent under the ``content`` key.
        thread_id: Optional thread identifier; only included in the request
            body when it is truthy.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    body = {"content": content}
    if thread_id:
        body["thread_id"] = thread_id

    request_headers = {
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
    }
    reply = requests.post(
        f"{BASE_URL}/api/v1/ask/support",
        headers=request_headers,
        json=body,
        timeout=65,
    )
    reply.raise_for_status()
    return reply.json()
# Usage: no thread_id is passed, so the request body carries only "content".
result = send_message("Hello, I need help with my order")
print(f"Response: {result['content']}")
# The thread_id from the reply can be handed back as send_message's thread_id argument.
print(f"Thread ID: {result['thread_id']}")
Send Message to Specific Agent
def send_to_agent(agent_id: str, message: str) -> dict:
    """POST a message to one agent's ``messages`` endpoint and return the JSON reply.

    Args:
        agent_id: Identifier interpolated into the request path.
        message: Message text, sent under the ``message`` key.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    url = f"{BASE_URL}/api/v1/agents/{agent_id}/messages"
    request_headers = {
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
    }
    reply = requests.post(
        url,
        headers=request_headers,
        json={"message": message},
        timeout=65,
    )
    reply.raise_for_status()
    return reply.json()
# Usage: the reply's "content" is iterated as a list of typed blocks;
# only blocks whose type is "text" are printed.
result = send_to_agent("agent-uuid", "Hello!")
for block in result["content"]:
    if block["type"] != "text":
        continue
    print(block["text"])
Conversation Management
Multi-Turn Conversation
class Conversation:
    """Multi-turn helper that remembers the thread ID between calls to ``send``."""

    def __init__(self, endpoint: str = "support"):
        # No thread exists until the first reply has been received.
        self.thread_id = None
        self.endpoint = endpoint

    def send(self, content: str) -> str:
        """Send one message on this conversation and return the reply's ``content``.

        The stored ``thread_id`` is included in the request once one is known,
        and is replaced by the ``thread_id`` of every reply.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        body = {"content": content}
        if self.thread_id:
            body["thread_id"] = self.thread_id

        url = f"{BASE_URL}/api/v1/ask/{self.endpoint}"
        request_headers = {
            "X-API-Key": API_KEY,
            "Content-Type": "application/json",
        }
        reply = requests.post(url, headers=request_headers, json=body, timeout=65)
        reply.raise_for_status()

        decoded = reply.json()
        self.thread_id = decoded["thread_id"]
        return decoded["content"]
# Usage: each send() after the first includes the thread_id stored from the previous reply.
conv = Conversation("support")
print(conv.send("I have a billing question"))
print(conv.send("Can you show me my recent invoices?"))
print(conv.send("The one from January looks wrong"))
Error Handling
Comprehensive Error Handler
import time
from typing import Optional
class AG2TrustError(Exception):
    """Root of the AG2Trust exception hierarchy.

    Attributes:
        message: Human-readable description (also the exception's only arg).
        error_code: Optional error code string, ``None`` when not supplied.
        details: Extra error information; an empty dict when none is supplied.
    """

    def __init__(self, message: str, error_code: str = None, details: dict = None):
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        # Any falsy value (None, {}) is normalised to a fresh empty dict.
        self.details = details if details else {}
class RateLimitError(AG2TrustError):
    """Raised when the API reports that the rate limit was exceeded.

    Attributes:
        retry_after: Value supplied by the caller as the wait before retrying.
    """

    def __init__(self, retry_after: int, **kwargs):
        # Message and error code are fixed; extra keyword arguments are
        # forwarded to the base class unchanged.
        super().__init__("Rate limit exceeded", "RATE_LIMIT_EXCEEDED", **kwargs)
        self.retry_after = retry_after
class AgentNotAvailableError(AG2TrustError):
    """Raised when no agent is available to handle the request."""
def handle_response(response: requests.Response) -> dict:
    """Translate an API response into its JSON body or a typed exception.

    Args:
        response: The completed HTTP response.

    Returns:
        The decoded JSON body for any status below 400.

    Raises:
        RateLimitError: On HTTP 429, carrying the parsed ``Retry-After`` delay.
        AgentNotAvailableError: On HTTP 503.
        AG2TrustError: On any other status of 400 or above.
    """
    status = response.status_code
    if status == 200:
        return response.json()

    if status == 429:
        raise RateLimitError(_retry_after_seconds(response))

    if status >= 400:
        # Error bodies are parsed defensively so that a non-JSON body (for
        # example an HTML page from a proxy) still produces the intended
        # exception instead of a JSON decoding error.
        error_data = _error_payload(response)
        if status == 503:
            raise AgentNotAvailableError(
                error_data.get("error", "No agents available"),
                error_data.get("error_code"),
            )
        raise AG2TrustError(
            error_data.get("error", "Unknown error"),
            error_data.get("error_code"),
            error_data.get("details"),
        )

    # Remaining non-error statuses (other 2xx/3xx) are returned as JSON.
    return response.json()


def _retry_after_seconds(response, default: int = 5) -> int:
    """Return the ``Retry-After`` header as non-negative seconds, or *default*."""
    raw = response.headers.get("Retry-After", default)
    try:
        # Clamp at zero: a negative delay would make time.sleep() raise.
        return max(0, int(raw))
    except (TypeError, ValueError):
        # Retry-After may also be an HTTP-date; fall back rather than crash.
        return default


def _error_payload(response) -> dict:
    """Return the error body as a dict, or an empty dict if it is not a JSON object."""
    try:
        data = response.json()
    except ValueError:
        return {}
    return data if isinstance(data, dict) else {}
def send_message_with_retry(
    content: str,
    max_retries: int = 3,
    thread_id: str = None
) -> dict:
    """Send a message to the ``support`` pool, retrying when rate limited.

    Args:
        content: Message text, sent under the ``content`` key.
        max_retries: Maximum number of attempts.
        thread_id: Optional thread identifier; only included in the request
            body when it is truthy.

    Returns:
        The decoded JSON body of the first response that is not rate limited.

    Raises:
        RateLimitError: If the final attempt is still rate limited.
        AG2TrustError: For other error responses, or when no attempt was made
            (``max_retries`` below 1).
    """
    # Omit thread_id when unset, as the other examples do, instead of
    # sending an explicit null.
    payload = {"content": content}
    if thread_id:
        payload["thread_id"] = thread_id

    for attempt in range(max_retries):
        response = requests.post(
            f"{BASE_URL}/api/v1/ask/support",
            headers={
                "X-API-Key": API_KEY,
                "Content-Type": "application/json",
            },
            json=payload,
            timeout=65,
        )
        try:
            return handle_response(response)
        except RateLimitError as exc:
            if attempt >= max_retries - 1:
                raise
            print(f"Rate limited, waiting {exc.retry_after}s...")
            time.sleep(exc.retry_after)

    raise AG2TrustError("Max retries exceeded")
Async Support
Using httpx
import httpx
import asyncio
async def send_message_async(content: str, thread_id: str = None) -> dict:
    """Asynchronously POST a message to the ``support`` pool using httpx.

    Args:
        content: Message text, sent under the ``content`` key.
        thread_id: Optional thread identifier; only included in the request
            body when it is truthy.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    body = {"content": content}
    if thread_id:
        body["thread_id"] = thread_id

    request_headers = {
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
    }
    # A client is opened for this single call and closed on exit.
    async with httpx.AsyncClient() as http:
        reply = await http.post(
            f"{BASE_URL}/api/v1/ask/support",
            headers=request_headers,
            json=body,
            timeout=65,
        )
        reply.raise_for_status()
        return reply.json()
# Usage: run one async request to completion and print the reply content.
async def main():
    reply = await send_message_async("Hello!")
    print(reply["content"])


asyncio.run(main())
Concurrent Requests
async def send_multiple_messages(messages: list[str]) -> list[dict]:
    """Send several messages to the ``support`` pool concurrently.

    Args:
        messages: Message texts; one request is issued per entry.

    Returns:
        The decoded JSON bodies, in the same order as *messages*.

    Raises:
        httpx.HTTPStatusError: If any response has a 4xx/5xx status.
    """
    request_headers = {
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient() as client:
        pending = [
            client.post(
                f"{BASE_URL}/api/v1/ask/support",
                headers=request_headers,
                json={"content": msg},
                timeout=65,
            )
            for msg in messages
        ]
        responses = await asyncio.gather(*pending)

        results = []
        for response in responses:
            # Fail loudly on error statuses, as send_message_async does,
            # rather than returning an error body as if it were a reply.
            response.raise_for_status()
            results.append(response.json())
        return results
# Usage: send three questions concurrently and print each reply's content.
# This snippet only defines main(); it is not invoked here.
async def main():
    questions = ["Question 1", "Question 2", "Question 3"]
    replies = await send_multiple_messages(questions)
    for reply in replies:
        print(reply["content"])
Webhook Handling
Flask Webhook Server
from flask import Flask, request
import hmac
import hashlib
# Flask application object; the webhook route below is registered on it.
app = Flask(__name__)
def verify_signature(body: bytes, signature: str) -> bool:
    """Check a webhook signature against an HMAC-SHA256 of the raw body.

    The expected value is the hex HMAC-SHA256 of *body* keyed with
    ``API_KEY``. A leading ``sha256=`` on *signature* is ignored.

    Args:
        body: Raw request body bytes.
        signature: Signature header value, with or without ``sha256=``.

    Returns:
        True when the provided signature matches the expected one.
    """
    if not signature:
        return False

    expected = hmac.new(API_KEY.encode(), body, hashlib.sha256).hexdigest()
    # Strip only a leading scheme marker, not every occurrence of it.
    provided = signature.removeprefix("sha256=")
    # compare_digest() rejects str arguments containing non-ASCII characters
    # with TypeError; comparing bytes keeps the check constant-time while
    # letting arbitrary header input simply fail verification.
    return hmac.compare_digest(expected.encode(), provided.encode())
@app.route("/webhooks/ag2trust", methods=["POST"])
def handle_webhook():
    """Receive a webhook callback, verify it and dispatch on its status.

    Returns:
        ``("Invalid signature", 401)`` when verification fails,
        ``("Malformed payload", 400)`` when the body is not a JSON object
        containing ``callback_id``, otherwise ``("OK", 200)``.
    """
    # Verify the signature over the raw body bytes.
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(request.get_data(), signature):
        return "Invalid signature", 401

    # Parse leniently so a bad body yields a 400 instead of an unhandled error.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or "callback_id" not in payload:
        return "Malformed payload", 400

    callback_id = payload["callback_id"]
    if payload.get("status") == "completed":
        # Treat missing or null content as "no blocks".
        process_response(callback_id, payload.get("content") or [])
    else:
        handle_failure(callback_id, payload)
    return "OK", 200
def process_response(callback_id: str, content: list):
    """Print every text block of a completed callback, tagged with its ID.

    Args:
        callback_id: Identifier printed in brackets before each line.
        content: Blocks with a ``type`` key; blocks whose type is ``"text"``
            have their ``text`` value printed, all others are skipped.
    """
    text_blocks = (item for item in content if item["type"] == "text")
    for item in text_blocks:
        print(f"[{callback_id}] {item['text']}")
def handle_failure(callback_id: str, payload: dict):
    """Print the ``error`` field of a non-completed callback, tagged with its ID.

    Args:
        callback_id: Identifier printed in brackets before the message.
        payload: Callback payload; a missing ``error`` key prints ``None``.
    """
    error = payload.get("error")
    print(f"[{callback_id}] Failed: {error}")
# Start the Flask development server on port 3000 when run as a script.
if __name__ == "__main__":
    app.run(port=3000)
Send Async Request
def send_async_message(
    message: str,
    webhook_url: str,
    agent_id: str = "agent-uuid",
) -> dict:
    """Send a message to an agent, asking for the result via webhook callback.

    Args:
        message: Message text, sent under the ``message`` key.
        webhook_url: URL sent under the ``webhook_url`` key.
        agent_id: Identifier interpolated into the request path. The default
            is only a placeholder; pass the real agent ID.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    response = requests.post(
        # Interpolate the agent ID; doubled braces here would put the
        # literal text "{agent_id}" into the URL.
        f"{BASE_URL}/api/v1/agents/{agent_id}/messages",
        headers={
            "X-API-Key": API_KEY,
            "Content-Type": "application/json",
        },
        json={
            "message": message,
            "webhook_url": webhook_url,
        },
        # Same timeout as the other examples; without one the call can block
        # indefinitely.
        timeout=65,
    )
    response.raise_for_status()
    return response.json()
# Usage: submit the message together with the URL that should receive the callback.
result = send_async_message(
    "Analyze this large dataset...",
    "https://your-server.com/webhooks/ag2trust"
)
# The callback_id from the reply is the same key the webhook handler reads from its payload.
print(f"Processing started: {result['callback_id']}")
Complete Client Class
import requests
import time
from typing import Optional, List
from dataclasses import dataclass
@dataclass
class Message:
    """Reply returned by the client's send methods."""

    # Reply text.
    content: str
    # Thread the reply belongs to, when one was returned.
    thread_id: Optional[str] = None
    # Agent associated with the reply, when known.
    agent_id: Optional[str] = None
    # Token count reported for the reply; 0 when not reported.
    tokens_used: int = 0
class AG2TrustClient:
    """AG2Trust API client built on a shared ``requests.Session``.

    The session carries the API key and JSON content-type headers for every
    request. The client can be used as a context manager to close the session.
    """

    def __init__(self, api_key: str, base_url: str = "https://agents.ag2trust.com"):
        self.api_key = api_key
        # Request paths start with "/", so drop a trailing slash to avoid "//".
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
            "Content-Type": "application/json",
        })

    def __enter__(self) -> "AG2TrustClient":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def ask(
        self,
        endpoint: str,
        content: str,
        thread_id: str = None
    ) -> "Message":
        """Send a message to an agent pool.

        Args:
            endpoint: Pool name interpolated into the request path.
            content: Message text, sent under the ``content`` key.
            thread_id: Optional thread identifier; only sent when truthy.

        Returns:
            A ``Message`` built from the reply's ``content``, ``thread_id``
            and (if present) ``agent_id``.
        """
        payload = {"content": content}
        if thread_id:
            payload["thread_id"] = thread_id

        response = self._request(
            "POST",
            f"/api/v1/ask/{endpoint}",
            json=payload,
        )
        return Message(
            content=response["content"],
            thread_id=response["thread_id"],
            agent_id=response.get("agent_id"),
        )

    def send_to_agent(
        self,
        agent_id: str,
        message: str
    ) -> "Message":
        """Send a message to a specific agent.

        Returns:
            A ``Message`` whose content is the concatenated text of all
            ``"text"`` blocks in the reply, with ``tokens_used`` taken from
            the reply's ``metadata`` (0 when absent).
        """
        response = self._request(
            "POST",
            f"/api/v1/agents/{agent_id}/messages",
            json={"message": message},
        )

        content_text = "".join(
            block["text"]
            for block in response.get("content", [])
            if block["type"] == "text"
        )
        return Message(
            content=content_text,
            agent_id=agent_id,
            tokens_used=response.get("metadata", {}).get("tokens_used", 0),
        )

    def list_agents(self) -> List[dict]:
        """Return the ``agents`` list from the agents endpoint (empty if absent)."""
        response = self._request("GET", "/api/v1/agents")
        return response.get("agents", [])

    def _request(
        self,
        method: str,
        path: str,
        max_retries: int = 3,
        **kwargs
    ) -> dict:
        """Make an HTTP request, retrying while the server answers 429.

        Args:
            method: HTTP method name.
            path: Path appended to ``base_url``.
            max_retries: Maximum number of attempts.
            **kwargs: Passed to ``Session.request``; ``timeout`` defaults to 65.

        Returns:
            The decoded JSON body of the first response that is not a 429.

        Raises:
            RuntimeError: If every attempt was rate limited, or no attempt
                was made (``max_retries`` below 1).
            requests.HTTPError: For other 4xx/5xx responses.
        """
        kwargs.setdefault("timeout", 65)
        url = f"{self.base_url}{path}"

        for attempt in range(max_retries):
            response = self.session.request(method, url, **kwargs)
            if response.status_code != 429:
                response.raise_for_status()
                return response.json()

            if attempt >= max_retries - 1:
                raise RuntimeError("Rate limit exceeded")
            time.sleep(self._retry_delay(response.headers))

        raise RuntimeError("Max retries exceeded")

    @staticmethod
    def _retry_delay(headers, default: int = 5) -> int:
        """Return ``Retry-After`` as non-negative seconds, or *default* if unusable."""
        try:
            # Clamp at zero: a negative delay would make time.sleep() raise.
            return max(0, int(headers.get("Retry-After", default)))
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP-date; fall back rather than crash.
            return default
# Usage
# os is not among this snippet's imports, so bring it into scope here.
import os

client = AG2TrustClient(os.environ["AG2TRUST_API_KEY"])

# Send to pool
response = client.ask("support", "Hello!")
print(response.content)

# Continue conversation by passing back the thread_id of the previous reply
response = client.ask("support", "Tell me more", thread_id=response.thread_id)
print(response.content)

# List agents
agents = client.list_agents()
for agent in agents:
    print(f"{agent['name']}: {agent['status']}")