
Agent Pool Endpoint

The pool endpoint provides load-balanced access to agents with automatic conversation thread management.

Endpoint

POST /api/v1/ask/{endpoint_slug}

Overview

The pool endpoint:

  • Routes messages to available agents automatically
  • Maintains conversation context across messages
  • Provides sticky routing for conversation continuity
  • Handles load balancing across replicas

Request

Headers

| Header | Required | Description |
|---|---|---|
| X-API-Key | Yes | Your API key |
| Content-Type | Yes | application/json |

Path Parameters

| Parameter | Type | Description |
|---|---|---|
| endpoint_slug | string | The agent type's endpoint slug |

Body

```json
{
  "content": "Your message here",
  "thread_id": "optional-thread-id"
}
```

| Field | Type | Required | Description |
|---|---|---|---|
| content | string | Yes | Message content (max 10,000 characters) |
| thread_id | string | No | Thread ID for conversation continuity |
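The body rules above can be checked on the client before a request is sent. The following is a minimal sketch, and `build_payload` and `MAX_CONTENT_CHARS` are hypothetical names. It assumes the server counts characters the same way Python's `len()` does (code points), which the docs do not state. It leaves `thread_id` out entirely on the first message so that the server creates a new thread.

```python
from typing import Optional

MAX_CONTENT_CHARS = 10_000  # documented limit for "content"


def build_payload(content: str, thread_id: Optional[str] = None) -> dict:
    """Build a request body for POST /api/v1/ask/{endpoint_slug} (hypothetical helper)."""
    if not content:
        raise ValueError("content is required")
    # Assumption: the server's character count matches len() (Unicode code points)
    if len(content) > MAX_CONTENT_CHARS:
        raise ValueError(f"content exceeds {MAX_CONTENT_CHARS} characters")
    payload = {"content": content}
    if thread_id:
        # Only send thread_id when continuing an existing conversation
        payload["thread_id"] = thread_id
    return payload
```

For example, `build_payload("Order number is 12345", "thread_abc123xyz")` produces the follow-up body used in the examples below.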

Response

Success (200 OK)

```json
{
  "thread_id": "thread_abc123xyz",
  "agent_id": "550e8400-e29b-41d4-a716-446655440000",
  "content": "Hello! I'd be happy to help you. What can I assist you with today?",
  "timestamp": "2025-01-15T10:30:00Z"
}
```

| Field | Type | Description |
|---|---|---|
| thread_id | string | Thread ID (use it for follow-up messages) |
| agent_id | string | ID of the agent that handled the request |
| content | string | The agent's response |
| timestamp | string | ISO 8601 timestamp |
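The response fields above can be mapped onto a typed object. The following is a sketch only, and `AskResponse` and `parse_ask_response` are hypothetical names. One detail is easy to miss. `datetime.fromisoformat()` accepts a trailing `Z` only from Python 3.11 onward, so the sketch rewrites it as an explicit `+00:00` offset first.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class AskResponse:
    """Hypothetical typed view of a 200 OK pool-endpoint response."""
    thread_id: str
    agent_id: str
    content: str
    timestamp: datetime


def parse_ask_response(data: dict) -> AskResponse:
    ts = data["timestamp"]
    # fromisoformat() rejects a trailing "Z" before Python 3.11; use an explicit UTC offset
    if ts.endswith("Z"):
        ts = ts[:-1] + "+00:00"
    return AskResponse(
        thread_id=data["thread_id"],
        agent_id=data["agent_id"],
        content=data["content"],
        timestamp=datetime.fromisoformat(ts),
    )
```

With the example response above, `parse_ask_response(response.json()).thread_id` returns `"thread_abc123xyz"`.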

Examples

First Message (New Thread)

```bash
curl -X POST https://agents.ag2trust.com/api/v1/ask/support \
  -H "X-API-Key: cust_your_api_key" \
  -H "Content-Type: application/json" \
  -d '{"content": "Hello, I need help with my order"}'
```

```python
import requests

response = requests.post(
    "https://agents.ag2trust.com/api/v1/ask/support",
    headers={
        "X-API-Key": "cust_your_api_key",
        "Content-Type": "application/json"
    },
    json={"content": "Hello, I need help with my order"}
)

data = response.json()
print(f"Thread ID: {data['thread_id']}")
print(f"Response: {data['content']}")
```

```javascript
const response = await fetch(
  'https://agents.ag2trust.com/api/v1/ask/support',
  {
    method: 'POST',
    headers: {
      'X-API-Key': 'cust_your_api_key',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      content: 'Hello, I need help with my order'
    })
  }
);

const data = await response.json();
console.log(`Thread ID: ${data.thread_id}`);
console.log(`Response: ${data.content}`);
```

Follow-up Message (Same Thread)

```bash
curl -X POST https://agents.ag2trust.com/api/v1/ask/support \
  -H "X-API-Key: cust_your_api_key" \
  -H "Content-Type: application/json" \
  -d '{
    "thread_id": "thread_abc123xyz",
    "content": "Order number is 12345"
  }'
```

```python
import requests

# Continue the conversation using thread_id
response = requests.post(
    "https://agents.ag2trust.com/api/v1/ask/support",
    headers={
        "X-API-Key": "cust_your_api_key",
        "Content-Type": "application/json"
    },
    json={
        "thread_id": "thread_abc123xyz",
        "content": "Order number is 12345"
    }
)
```

```javascript
const response = await fetch(
  'https://agents.ag2trust.com/api/v1/ask/support',
  {
    method: 'POST',
    headers: {
      'X-API-Key': 'cust_your_api_key',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      thread_id: 'thread_abc123xyz',
      content: 'Order number is 12345'
    })
  }
);
```

Thread Management

How Threads Work

  1. First message: No thread_id → new thread created
  2. Subsequent messages: Include thread_id → continues conversation
  3. Context maintained: Agent receives conversation history

Thread Behavior

| Aspect | Value |
|---|---|
| Thread TTL | 1 hour (sliding window) |
| Max messages | 100 per thread |
| Context passed | The most recent messages that fit in the agent's context window |
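A client can track both documented limits (the sliding 1-hour TTL and the 100-message cap) and drop its `thread_id` before the server expires it. The following is a hypothetical sketch, and `ThreadTracker`, `SAFETY_MARGIN_SECONDS`, and `MESSAGES_PER_EXCHANGE` are illustrative names. It relies on two assumptions that the docs do not confirm. First, both your message and the agent's reply count toward the cap. Second, giving up 10 minutes early is an acceptable safety margin. The clock is injectable so that the logic can be tested without waiting.

```python
import time
from typing import Callable, Optional

THREAD_TTL_SECONDS = 60 * 60     # documented: 1-hour sliding window
MAX_THREAD_MESSAGES = 100        # documented: 100 messages per thread
SAFETY_MARGIN_SECONDS = 10 * 60  # hypothetical: abandon a thread 10 minutes early
MESSAGES_PER_EXCHANGE = 2        # assumption: your message and the reply both count


class ThreadTracker:
    """Hypothetical client-side bookkeeping for a single pool-endpoint thread."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self.thread_id: Optional[str] = None
        self._last_activity = 0.0
        self._message_count = 0

    def reset(self) -> None:
        """Forget the current thread so the next request starts a new one."""
        self.thread_id = None
        self._message_count = 0

    def thread_for_next_request(self) -> Optional[str]:
        """Return the thread_id to send, or None if a new thread should be started."""
        if self.thread_id is not None:
            idle = self._clock() - self._last_activity
            near_ttl = idle > THREAD_TTL_SECONDS - SAFETY_MARGIN_SECONDS
            near_cap = self._message_count + MESSAGES_PER_EXCHANGE > MAX_THREAD_MESSAGES
            if near_ttl or near_cap:
                self.reset()
        return self.thread_id

    def record_response(self, thread_id: str) -> None:
        """Call after every successful response with the thread_id it returned."""
        if thread_id != self.thread_id:
            self._message_count = 0  # the server started a new thread
        self.thread_id = thread_id
        self._last_activity = self._clock()
        self._message_count += MESSAGES_PER_EXCHANGE
```

To use it, call `thread_for_next_request()` when building each request body and `record_response(data["thread_id"])` after each reply.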

Sticky Routing

When you include a thread_id:

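  1. The system checks whether the agent that handled the thread's previous message is available.
  2. If that agent is available (queue depth below 3), the message is routed to it.
  3. If that agent is unavailable, the message is routed to the least busy agent.
  4. The conversation context is passed regardless of which agent handles the message.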

Load Balancing

Routing Logic

Priority 1: Sticky (same agent, queue < 3)
Priority 2: Available (any agent, queue < 3)
Priority 3: Overflow (any agent, queue < 10)
Reject: 503 Service Unavailable

Example Scenarios

| Scenario | Result |
|---|---|
| 3 agents, all idle | Routes to any agent |
| 3 agents, 1 busy | Routes to an idle agent |
| All agents at queue depth 5 | Routes to the least busy agent |
| All agents at queue depth 10 or more | 503 error |
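The routing priorities above can be modeled as a small pure function, which is useful for reasoning about how a pool of a given size behaves under load. This is a hypothetical model, not the service's actual implementation, and `choose_agent` and its `queues` input are illustrative names. The model treats agents absent from `queues` as offline. Priorities 2 and 3 collapse into "pick the least busy agent", and ties are broken by name only to keep the sketch deterministic, whereas the docs say an idle pool "routes to any".

```python
from typing import Dict, Optional

AVAILABLE_BELOW = 3  # documented: an agent is available while its queue is < 3
OVERFLOW_BELOW = 10  # documented: overflow accepts agents whose queue is < 10


def choose_agent(queues: Dict[str, int], previous_agent: Optional[str] = None) -> Optional[str]:
    """Return the agent that would handle the message, or None for a 503.

    queues maps each running agent to its current queue depth (hypothetical input).
    """
    # Priority 1: sticky routing to the agent that handled the thread before
    if previous_agent in queues and queues[previous_agent] < AVAILABLE_BELOW:
        return previous_agent
    if not queues:
        return None  # no running agents at all
    # Priorities 2 and 3 both resolve to the least busy agent
    least_busy = min(queues, key=lambda agent: (queues[agent], agent))
    if queues[least_busy] < OVERFLOW_BELOW:
        return least_busy
    return None  # Reject: 503 Service Unavailable
```

Running the four scenarios from the table through this function reproduces the "Result" column. For example, `choose_agent({"support-1": 10, "support-2": 12})` returns `None`, which corresponds to the 503 case.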

Setup Requirements

Before using the pool endpoint:

  1. Create an Agent Type with an endpoint_slug
  2. Create agents of that type (or deploy via team)
  3. Start the agents

```text
Agent Type: Customer Support
  └── endpoint_slug: "support"
  └── Agents: support-1, support-2, support-3

API URL: POST /api/v1/ask/support
```

Error Responses

404 Not Found

```json
{
  "error": "Endpoint not found",
  "error_code": "ENDPOINT_NOT_FOUND"
}
```

The endpoint slug doesn't exist for your organization.

503 Service Unavailable

```json
{
  "error": "No agents available",
  "error_code": "NO_AGENTS_AVAILABLE"
}
```

All agents are busy or offline.

Response headers:

```text
Retry-After: 5
```

Handling:

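```python
import time
import requests

for attempt in range(3):
    response = requests.post(url, json=payload, headers=headers, timeout=65)
    if response.status_code != 503 or attempt == 2:
        break  # success, a non-retryable error, or out of attempts
    # Wait as long as the server asks (default 5 seconds), then retry
    time.sleep(int(response.headers.get("Retry-After", 5)))
```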
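The docs show `Retry-After` as a number of seconds. HTTP also allows the header to carry an HTTP-date (RFC 9110), however, and in that case `int()` would raise. The following is a more defensive parser, offered as a sketch. `parse_retry_after` is a hypothetical helper, and the 5-second fallback mirrors the default used above.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float = 5.0) -> float:
    """Seconds to wait, from a Retry-After value in delta-seconds or HTTP-date form."""
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form, e.g. "5"
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):  # older Pythons raise TypeError on bad input
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    # A date in the past means "retry now"
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
```

In the loop above, `time.sleep(parse_retry_after(response.headers.get("Retry-After")))` would replace the `int(...)` call.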

504 Gateway Timeout

```json
{
  "error": "Agent did not respond in time",
  "error_code": "AGENT_TIMEOUT"
}
```

The agent did not respond within 60 seconds.
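The three documented error responses share the same `error` / `error_code` body, so a client can turn them into one exception type. The following is a sketch under stated assumptions, and `PoolEndpointError` and `check_pool_response` are hypothetical names. Only `NO_AGENTS_AVAILABLE` is flagged as retryable, because the docs recommend retrying 503 responses but say nothing about whether resending after a 504 is safe.

```python
from typing import Optional


class PoolEndpointError(Exception):
    """Hypothetical exception for the documented pool-endpoint error responses."""

    def __init__(self, status: int, error_code: Optional[str], message: str):
        super().__init__(f"{status} {error_code}: {message}")
        self.status = status
        self.error_code = error_code
        # Only the 503 case is documented as retryable; 504 is left to the caller
        self.retryable = error_code == "NO_AGENTS_AVAILABLE"


def check_pool_response(status: int, body: dict) -> dict:
    """Return the parsed body for 200 OK, otherwise raise PoolEndpointError."""
    if status == 200:
        return body
    raise PoolEndpointError(status, body.get("error_code"), body.get("error", "Unknown error"))
```

With `requests`, this would be called as `data = check_pool_response(response.status_code, response.json())`.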

Best Practices

1. Always Store thread_id

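```python
import requests

url = "https://agents.ag2trust.com/api/v1/ask/support"
headers = {"X-API-Key": "cust_your_api_key", "Content-Type": "application/json"}

# Store thread_id for conversation continuity
thread_id = None

def send_message(content):
    global thread_id
    payload = {"content": content}
    if thread_id:
        payload["thread_id"] = thread_id

    response = requests.post(url, json=payload, headers=headers, timeout=65)
    response.raise_for_status()  # surface 404/503/504 instead of parsing an error body
    data = response.json()

    thread_id = data["thread_id"]  # Save for next message
    return data["content"]
```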

2. Handle 503 with Retry

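```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[503],
    # urllib3 does not retry POST by default, so opt in explicitly (urllib3 >= 1.26)
    allowed_methods=frozenset({"POST"}),
    respect_retry_after_header=True,  # honor the Retry-After header on 503
)
session.mount('https://', HTTPAdapter(max_retries=retries))
```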

3. Set Reasonable Timeouts

```python
response = requests.post(
    url,
    json=payload,
    headers=headers,
    timeout=65  # Slightly longer than the 60-second agent timeout (504)
)
```

4. Monitor Thread Expiration

Threads expire after 1 hour of inactivity. For long sessions:

```python
import time

# Start a new conversation if the thread might have expired
# (last_message_time is a time.time() value you record after each reply)
if time.time() - last_message_time > 3000:  # 50 minutes, safely under the 1-hour TTL
    thread_id = None  # Start fresh
```

Comparison with Direct Endpoint

| Feature | Pool Endpoint | Direct Endpoint |
|---|---|---|
| Load balancing | Yes | No |
| Conversation context | Built-in | Manual |
| Specify agent | No | Yes |
| Best for | Production | Testing/specific agents |

Next Steps