Agent Pool Endpoint¶
The pool endpoint provides load-balanced access to agents with automatic conversation thread management.
Endpoint¶
POST /api/v1/ask/{endpoint_slug}
Overview¶
The pool endpoint:
- Routes messages to available agents automatically
- Maintains conversation context across messages
- Provides sticky routing for conversation continuity
- Handles load balancing across replicas
Request¶
Headers¶
| Header | Required | Description |
|---|---|---|
| X-API-Key | Yes | Your API key |
| Content-Type | Yes | application/json |
Path Parameters¶
| Parameter | Type | Description |
|---|---|---|
| endpoint_slug | string | The agent type's endpoint slug |
Body¶
| Field | Type | Required | Description |
|---|---|---|---|
| content | string | Yes | Message content (max 10,000 characters) |
| thread_id | string | No | Thread ID for conversation continuity |
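A small client-side helper can enforce the body constraints from the table before a request is sent, so that obviously invalid messages fail fast. This is a sketch: `build_ask_payload` is a hypothetical name, and it assumes the 10,000-character limit corresponds to Python string length (Unicode code points); the server's exact counting rule is not stated here.

```python
from typing import Optional

MAX_CONTENT_CHARS = 10_000  # documented limit for the "content" field


def build_ask_payload(content: str, thread_id: Optional[str] = None) -> dict:
    """Build a request body for the pool endpoint (hypothetical helper).

    Assumes the character limit is measured as len(content) in Python.
    """
    if not isinstance(content, str):
        raise ValueError("content is required and must be a string")
    if len(content) > MAX_CONTENT_CHARS:
        raise ValueError(
            f"content is {len(content)} characters; limit is {MAX_CONTENT_CHARS}"
        )
    payload = {"content": content}
    # Only send thread_id when continuing an existing conversation
    if thread_id is not None:
        payload["thread_id"] = thread_id
    return payload
```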
Response¶
Success (200 OK)¶
```json
{
  "thread_id": "thread_abc123xyz",
  "agent_id": "550e8400-e29b-41d4-a716-446655440000",
  "content": "Hello! I'd be happy to help you. What can I assist you with today?",
  "timestamp": "2025-01-15T10:30:00Z"
}
```
| Field | Type | Description |
|---|---|---|
| thread_id | string | Thread ID (use for follow-up messages) |
| agent_id | string | ID of the agent that handled the request |
| content | string | Agent's response |
| timestamp | string | ISO 8601 timestamp |
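If you prefer typed access over raw dictionaries, the fields above map naturally onto a small dataclass. The `AskResponse` and `parse_ask_response` names are hypothetical; the sketch assumes the body contains the four documented fields and that `timestamp` is UTC with a `Z` suffix, as in the sample response.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass
class AskResponse:
    """Hypothetical typed view of a pool endpoint response."""
    thread_id: str        # send this back on follow-up messages
    agent_id: str         # the agent that handled the request
    content: str          # the agent's reply
    timestamp: datetime   # parsed from the ISO 8601 string


def parse_ask_response(body: str) -> AskResponse:
    data = json.loads(body)
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11,
    # so normalise it to an explicit UTC offset first.
    ts = datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00"))
    return AskResponse(
        thread_id=data["thread_id"],
        agent_id=data["agent_id"],
        content=data["content"],
        timestamp=ts,
    )
```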
Examples¶
First Message (New Thread)¶
```python
import requests

response = requests.post(
    "https://agents.ag2trust.com/api/v1/ask/support",
    headers={
        "X-API-Key": "cust_your_api_key",
        "Content-Type": "application/json"
    },
    json={"content": "Hello, I need help with my order"}
)
data = response.json()
print(f"Thread ID: {data['thread_id']}")
print(f"Response: {data['content']}")
```

```javascript
const response = await fetch(
  'https://agents.ag2trust.com/api/v1/ask/support',
  {
    method: 'POST',
    headers: {
      'X-API-Key': 'cust_your_api_key',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      content: 'Hello, I need help with my order'
    })
  }
);
const data = await response.json();
console.log(`Thread ID: ${data.thread_id}`);
console.log(`Response: ${data.content}`);
```
Follow-up Message (Same Thread)¶
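A follow-up is the same POST with the `thread_id` from the previous response added to the body. A minimal sketch, assuming a `requests`-style session object is passed in; `send_follow_up` is a hypothetical name, not part of any SDK. Reuse the `thread_id` from each reply for the next message.

```python
def send_follow_up(session, url: str, api_key: str, thread_id: str, content: str) -> dict:
    """Send a message into an existing thread (hypothetical helper).

    `session` is anything with a requests-style .post() method, typically a
    requests.Session; injecting it keeps the sketch easy to test.
    """
    response = session.post(
        url,
        headers={"X-API-Key": api_key, "Content-Type": "application/json"},
        # Including thread_id continues the conversation instead of starting a new one
        json={"content": content, "thread_id": thread_id},
        timeout=65,  # slightly longer than the 60-second agent timeout
    )
    response.raise_for_status()
    return response.json()


# Usage, with `data` holding the JSON from the first-message example:
#   import requests
#   with requests.Session() as session:
#       reply = send_follow_up(
#           session, "https://agents.ag2trust.com/api/v1/ask/support",
#           "cust_your_api_key", data["thread_id"],
#           "Can you check the shipping status?")
```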
Thread Management¶
How Threads Work¶
- First message: no `thread_id` → a new thread is created
- Subsequent messages: include `thread_id` → the conversation continues
- Context maintained: the agent receives the conversation history
Thread Behavior¶
| Aspect | Value |
|---|---|
| Thread TTL | 1 hour (sliding window) |
| Max messages | 100 per thread |
| Context passed | The most recent messages that fit in the context window |
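"Last N messages" means N is not fixed: it is however many of the most recent messages fit. The sketch below is an illustrative model only, not the platform's implementation; `fit_recent_messages` is hypothetical, and it assumes a simple character budget as a stand-in for the model's real token limit.

```python
def fit_recent_messages(messages: list[str], budget_chars: int) -> list[str]:
    """Illustrative model: keep the newest messages whose total size fits a budget.

    Characters stand in for tokens; the real budget and counting rule are
    platform details not documented here.
    """
    kept: list[str] = []
    used = 0
    for message in reversed(messages):  # walk from newest to oldest
        if used + len(message) > budget_chars:
            break  # stop at the first message that would overflow
        kept.append(message)
        used += len(message)
    kept.reverse()  # restore chronological order
    return kept
```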
Sticky Routing¶
When you include a thread_id:
- The system checks whether the previous agent is available
- If it is (queue depth < 3), the message is routed to the same agent
- If it is not, the message is routed to the least busy agent
- Context is passed regardless of which agent handles the message
Load Balancing¶
Routing Logic¶
```
Priority 1: Sticky (same agent, queue < 3)
    ↓
Priority 2: Available (any agent, queue < 3)
    ↓
Priority 3: Overflow (any agent, queue < 10)
    ↓
Reject: 503 Service Unavailable
```
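The priority chain can be expressed as a small selection function. This is an illustrative model of the documented behavior, not the service's code: `select_agent` is hypothetical, it sees only running agents and their queue depths, and picking the least busy agent within the "available" tier is an assumption (the chain above says only "any agent").

```python
from typing import Optional

STICKY_LIMIT = 3     # sticky and "available" routing require queue depth < 3
OVERFLOW_LIMIT = 10  # overflow routing accepts queue depth < 10


def select_agent(queues: dict[str, int], previous_agent: Optional[str] = None) -> Optional[str]:
    """Hypothetical model of the documented routing priorities.

    `queues` maps agent ID -> current queue depth for running agents.
    Returns the chosen agent ID, or None when the request would be
    rejected with 503 Service Unavailable.
    """
    # Priority 1: sticky, reuse the thread's previous agent if it is not backed up
    if previous_agent in queues and queues[previous_agent] < STICKY_LIMIT:
        return previous_agent
    # Priority 2: any available agent; Priority 3: overflow to any agent under 10
    for limit in (STICKY_LIMIT, OVERFLOW_LIMIT):
        candidates = [a for a, depth in queues.items() if depth < limit]
        if candidates:
            return min(candidates, key=lambda a: queues[a])  # least busy first
    return None  # Reject: every agent is at queue depth 10 or more
```

Feeding the Example Scenarios through this model gives matching results: `select_agent({"a": 5, "b": 5, "c": 4})` returns `"c"` via overflow, and `select_agent({"a": 10, "b": 12, "c": 10})` returns `None`.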
Example Scenarios¶
| Scenario | Result |
|---|---|
| 3 agents, all idle | Routes to any agent |
| 3 agents, 1 busy | Routes to one of the idle agents |
| All agents at queue 5 | Overflow: routes to the least busy agent |
| All agents at queue 10+ | 503 error |
Setup Requirements¶
Before using the pool endpoint:
- Create an Agent Type with an `endpoint_slug`
- Create agents of that type (or deploy them via a team)
- Start the agents
For example:
```
Agent Type: Customer Support
├── endpoint_slug: "support"
└── Agents: support-1, support-2, support-3

API URL: POST /api/v1/ask/support
```
Error Responses¶
404 Not Found¶
The endpoint slug doesn't exist for your organization.
503 Service Unavailable¶
All agents are busy or offline.
Headers: the response includes a `Retry-After` header giving the number of seconds to wait before retrying.
Handling:
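```python
import time
import requests
# url, payload, and headers as in the examples above
for attempt in range(3):  # give up after three attempts
    response = requests.post(url, json=payload, headers=headers, timeout=65)
    if response.status_code != 503 or attempt == 2:
        break
    # Wait for the server-suggested delay (default 5 seconds), then retry
    time.sleep(int(response.headers.get("Retry-After", 5)))
```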
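The snippet above assumes `Retry-After` is a whole number of seconds. HTTP also permits an HTTP-date in this header (RFC 9110); whether this API ever sends one is not documented, so a defensive parser such as the hypothetical `retry_after_seconds` below avoids an `int()` crash either way.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: Optional[str], default: float = 5.0) -> float:
    """Convert a Retry-After header value into a delay in seconds (hypothetical helper).

    HTTP allows either delta-seconds ("5") or an HTTP-date; missing or
    malformed values fall back to `default`.
    """
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # older Pythons raise TypeError on bad input
        return default
    if when.tzinfo is None:  # treat naive dates as UTC
        when = when.replace(tzinfo=timezone.utc)
    # Never return a negative delay for dates already in the past
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
```

It drops in for the `int(...)` call: `time.sleep(retry_after_seconds(response.headers.get("Retry-After")))`.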
504 Gateway Timeout¶
Agent didn't respond within 60 seconds.
Best Practices¶
1. Always Store thread_id¶
```python
import requests

# url and headers as in the examples above
# Store thread_id for conversation continuity
thread_id = None

def send_message(content):
    global thread_id
    payload = {"content": content}
    if thread_id:
        payload["thread_id"] = thread_id
    response = requests.post(url, json=payload, headers=headers, timeout=65)
    response.raise_for_status()
    data = response.json()
    thread_id = data["thread_id"]  # Save for the next message
    return data["content"]
```
2. Handle 503 with Retry¶
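```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[503],
    # urllib3 does not retry POST by default; opt in (urllib3 >= 1.26)
    allowed_methods=frozenset({"POST"}),
)
session.mount("https://", HTTPAdapter(max_retries=retries))

# Use the session instead of requests.post; Retry-After on 503 is honored.
# If every retry fails, requests raises requests.exceptions.RetryError.
response = session.post(url, json=payload, headers=headers, timeout=65)
```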
3. Set Reasonable Timeouts¶
```python
response = requests.post(
    url,
    json=payload,
    headers=headers,
    timeout=65  # Slightly longer than the server's 60-second agent timeout
)
```
4. Monitor Thread Expiration¶
Threads expire after 1 hour of inactivity. For long sessions:
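```python
import time

# Start a new conversation if the thread might have expired
last_message_time = get_last_message_time()  # your own helper: Unix time of the last message
if time.time() - last_message_time > 3000:  # 50 minutes, safely under the 1-hour TTL
    thread_id = None  # Start fresh
```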
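The expiry check above can be combined with the 100-message cap from the Thread Behavior table into a small tracker. A sketch under assumptions: `ThreadTracker` is hypothetical, the 10-minute safety margin mirrors the 50-minute threshold above, and the cap is assumed to count only the messages this client sends (if the server also counts agent replies, the effective limit would be lower).

```python
import time
from typing import Callable, Optional

THREAD_TTL_SECONDS = 60 * 60     # documented sliding TTL: 1 hour
MAX_MESSAGES_PER_THREAD = 100    # documented cap; assumed to count client sends
SAFETY_MARGIN_SECONDS = 10 * 60  # start fresh 10 minutes early (client choice)


class ThreadTracker:
    """Client-side bookkeeping for one conversation (hypothetical helper)."""

    def __init__(self, clock: Callable[[], float] = time.time):
        self.clock = clock
        self.thread_id: Optional[str] = None
        self.last_activity = 0.0
        self.sent = 0  # messages sent in the current thread

    def current_thread(self) -> Optional[str]:
        """Return the thread_id to send, or None to start a new thread."""
        if self.thread_id is None:
            return None
        idle = self.clock() - self.last_activity
        likely_expired = idle > THREAD_TTL_SECONDS - SAFETY_MARGIN_SECONDS
        if likely_expired or self.sent >= MAX_MESSAGES_PER_THREAD:
            self.thread_id, self.sent = None, 0
        return self.thread_id

    def record_reply(self, thread_id: str) -> None:
        """Call after each successful response; the TTL slides on activity."""
        if thread_id != self.thread_id:
            self.sent = 0  # the server started a new thread
        self.thread_id = thread_id
        self.last_activity = self.clock()
        self.sent += 1
```

Call `current_thread()` before each request and include its result as `thread_id` (omitting the field when it is `None`), then pass the response's `thread_id` to `record_reply()`.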
Comparison with Direct Endpoint¶
| Feature | Pool Endpoint | Direct Endpoint |
|---|---|---|
| Load balancing | Yes | No |
| Conversation context | Built-in | Manual |
| Specify agent | No | Yes |
| Best for | Production | Testing/specific agents |
Next Steps¶
- Direct Agent Endpoint - Target specific agents
- Webhooks - Async response delivery
- Error Codes - Complete error reference