Skip to content

Python Examples

Complete Python examples for AG2Trust API integration.

Setup

Installation

pip install requests
# or
pip install httpx  # for async support

Configuration

import os

API_KEY = os.environ["AG2TRUST_API_KEY"]
BASE_URL = "https://agents.ag2trust.com"

Basic Usage

Send Message to Pool

import requests

def send_message(content: str, thread_id: str = None) -> dict:
    """Send a message to the support agent pool."""
    payload = {"content": content}
    if thread_id:
        payload["thread_id"] = thread_id

    response = requests.post(
        f"{BASE_URL}/api/v1/ask/support",
        headers={
            "X-API-Key": API_KEY,
            "Content-Type": "application/json"
        },
        json=payload,
        timeout=65
    )
    response.raise_for_status()
    return response.json()

# Usage
result = send_message("Hello, I need help with my order")
print(f"Response: {result['content']}")
print(f"Thread ID: {result['thread_id']}")

Send Message to Specific Agent

def send_to_agent(agent_id: str, message: str) -> dict:
    """Send a message to a specific agent."""
    response = requests.post(
        f"{BASE_URL}/api/v1/agents/{agent_id}/messages",
        headers={
            "X-API-Key": API_KEY,
            "Content-Type": "application/json"
        },
        json={"message": message},
        timeout=65
    )
    response.raise_for_status()
    return response.json()

# Usage
result = send_to_agent("agent-uuid", "Hello!")
for block in result["content"]:
    if block["type"] == "text":
        print(block["text"])

Conversation Management

Multi-Turn Conversation

class Conversation:
    def __init__(self, endpoint: str = "support"):
        self.endpoint = endpoint
        self.thread_id = None

    def send(self, content: str) -> str:
        payload = {"content": content}
        if self.thread_id:
            payload["thread_id"] = self.thread_id

        response = requests.post(
            f"{BASE_URL}/api/v1/ask/{self.endpoint}",
            headers={
                "X-API-Key": API_KEY,
                "Content-Type": "application/json"
            },
            json=payload,
            timeout=65
        )
        response.raise_for_status()
        data = response.json()

        self.thread_id = data["thread_id"]
        return data["content"]

# Usage
conv = Conversation("support")
print(conv.send("I have a billing question"))
print(conv.send("Can you show me my recent invoices?"))
print(conv.send("The one from January looks wrong"))

Error Handling

Comprehensive Error Handler

import time
from typing import Optional

class AG2TrustError(Exception):
    """Base exception for AG2Trust API errors."""
    def __init__(self, message: str, error_code: str = None, details: dict = None):
        self.message = message
        self.error_code = error_code
        self.details = details or {}
        super().__init__(message)

class RateLimitError(AG2TrustError):
    """Rate limit exceeded."""
    def __init__(self, retry_after: int, **kwargs):
        self.retry_after = retry_after
        super().__init__("Rate limit exceeded", "RATE_LIMIT_EXCEEDED", **kwargs)

class AgentNotAvailableError(AG2TrustError):
    """No agents available."""
    pass

def handle_response(response: requests.Response) -> dict:
    """Handle API response and raise appropriate exceptions."""
    if response.status_code == 200:
        return response.json()

    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 5))
        raise RateLimitError(retry_after)

    if response.status_code == 503:
        error_data = response.json()
        raise AgentNotAvailableError(
            error_data.get("error", "No agents available"),
            error_data.get("error_code")
        )

    if response.status_code >= 400:
        error_data = response.json()
        raise AG2TrustError(
            error_data.get("error", "Unknown error"),
            error_data.get("error_code"),
            error_data.get("details")
        )

    return response.json()

def send_message_with_retry(
    content: str,
    max_retries: int = 3,
    thread_id: str = None
) -> dict:
    """Send message with automatic retry on rate limits."""
    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{BASE_URL}/api/v1/ask/support",
                headers={
                    "X-API-Key": API_KEY,
                    "Content-Type": "application/json"
                },
                json={"content": content, "thread_id": thread_id},
                timeout=65
            )
            return handle_response(response)

        except RateLimitError as e:
            if attempt < max_retries - 1:
                print(f"Rate limited, waiting {e.retry_after}s...")
                time.sleep(e.retry_after)
            else:
                raise

    raise AG2TrustError("Max retries exceeded")

Async Support

Using httpx

import httpx
import asyncio

async def send_message_async(content: str, thread_id: str = None) -> dict:
    """Send message asynchronously using httpx."""
    payload = {"content": content}
    if thread_id:
        payload["thread_id"] = thread_id

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{BASE_URL}/api/v1/ask/support",
            headers={
                "X-API-Key": API_KEY,
                "Content-Type": "application/json"
            },
            json=payload,
            timeout=65
        )
        response.raise_for_status()
        return response.json()

# Usage
async def main():
    result = await send_message_async("Hello!")
    print(result["content"])

asyncio.run(main())

Concurrent Requests

async def send_multiple_messages(messages: list[str]) -> list[dict]:
    """Send multiple messages concurrently."""
    async with httpx.AsyncClient() as client:
        tasks = [
            client.post(
                f"{BASE_URL}/api/v1/ask/support",
                headers={
                    "X-API-Key": API_KEY,
                    "Content-Type": "application/json"
                },
                json={"content": msg},
                timeout=65
            )
            for msg in messages
        ]
        responses = await asyncio.gather(*tasks)
        return [r.json() for r in responses]

# Usage
async def main():
    messages = ["Question 1", "Question 2", "Question 3"]
    results = await send_multiple_messages(messages)
    for result in results:
        print(result["content"])

Webhook Handling

Flask Webhook Server

from flask import Flask, request
import hmac
import hashlib

app = Flask(__name__)

def verify_signature(body: bytes, signature: str) -> bool:
    """Verify webhook signature."""
    expected = hmac.new(
        API_KEY.encode(),
        body,
        hashlib.sha256
    ).hexdigest()
    provided = signature.replace("sha256=", "")
    return hmac.compare_digest(expected, provided)

@app.route("/webhooks/ag2trust", methods=["POST"])
def handle_webhook():
    # Verify signature
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(request.data, signature):
        return "Invalid signature", 401

    # Process payload
    payload = request.json
    callback_id = payload["callback_id"]

    if payload["status"] == "completed":
        content = payload["content"]
        process_response(callback_id, content)
    else:
        handle_failure(callback_id, payload)

    return "OK", 200

def process_response(callback_id: str, content: list):
    """Process successful response."""
    for block in content:
        if block["type"] == "text":
            print(f"[{callback_id}] {block['text']}")

def handle_failure(callback_id: str, payload: dict):
    """Handle failed request."""
    print(f"[{callback_id}] Failed: {payload.get('error')}")

if __name__ == "__main__":
    app.run(port=3000)

Send Async Request

def send_async_message(message: str, webhook_url: str) -> dict:
    """Send message with webhook callback."""
    response = requests.post(
        f"{BASE_URL}/api/v1/agents/{{agent_id}}/messages",
        headers={
            "X-API-Key": API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "message": message,
            "webhook_url": webhook_url
        }
    )
    response.raise_for_status()
    return response.json()

# Usage
result = send_async_message(
    "Analyze this large dataset...",
    "https://your-server.com/webhooks/ag2trust"
)
print(f"Processing started: {result['callback_id']}")

Complete Client Class

import requests
import time
from typing import Optional, List
from dataclasses import dataclass

@dataclass
class Message:
    content: str
    thread_id: Optional[str] = None
    agent_id: Optional[str] = None
    tokens_used: int = 0

class AG2TrustClient:
    """AG2Trust API client."""

    def __init__(self, api_key: str, base_url: str = "https://agents.ag2trust.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
            "Content-Type": "application/json"
        })

    def ask(
        self,
        endpoint: str,
        content: str,
        thread_id: str = None
    ) -> Message:
        """Send message to agent pool."""
        payload = {"content": content}
        if thread_id:
            payload["thread_id"] = thread_id

        response = self._request(
            "POST",
            f"/api/v1/ask/{endpoint}",
            json=payload
        )

        return Message(
            content=response["content"],
            thread_id=response["thread_id"],
            agent_id=response.get("agent_id")
        )

    def send_to_agent(
        self,
        agent_id: str,
        message: str
    ) -> Message:
        """Send message to specific agent."""
        response = self._request(
            "POST",
            f"/api/v1/agents/{agent_id}/messages",
            json={"message": message}
        )

        content_text = ""
        for block in response.get("content", []):
            if block["type"] == "text":
                content_text += block["text"]

        return Message(
            content=content_text,
            agent_id=agent_id,
            tokens_used=response.get("metadata", {}).get("tokens_used", 0)
        )

    def list_agents(self) -> List[dict]:
        """List available agents."""
        response = self._request("GET", "/api/v1/agents")
        return response.get("agents", [])

    def _request(
        self,
        method: str,
        path: str,
        max_retries: int = 3,
        **kwargs
    ) -> dict:
        """Make HTTP request with retry logic."""
        kwargs.setdefault("timeout", 65)

        for attempt in range(max_retries):
            response = self.session.request(
                method,
                f"{self.base_url}{path}",
                **kwargs
            )

            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 5))
                if attempt < max_retries - 1:
                    time.sleep(retry_after)
                    continue
                raise Exception("Rate limit exceeded")

            response.raise_for_status()
            return response.json()

        raise Exception("Max retries exceeded")

# Usage
client = AG2TrustClient(os.environ["AG2TRUST_API_KEY"])

# Send to pool
response = client.ask("support", "Hello!")
print(response.content)

# Continue conversation
response = client.ask("support", "Tell me more", thread_id=response.thread_id)
print(response.content)

# List agents
agents = client.list_agents()
for agent in agents:
    print(f"{agent['name']}: {agent['status']}")