Webhook Integration Guide
This guide walks through setting up webhooks for asynchronous response delivery.
When to Use Webhooks
Use webhooks when:
- Agent tasks may take > 30 seconds
- You want non-blocking request handling
- Your architecture is event-driven
- You need guaranteed delivery with retries
Architecture Overview
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    Your App     │────►│  AG2Trust API   │────►│      Agent      │
│                 │     │                 │     │                 │
│  POST /message  │     │  202 Accepted   │     │  Processing...  │
│  webhook_url:X  │◄────│  callback_id    │     │                 │
└─────────────────┘     └─────────────────┘     └────────┬────────┘
                                                         │
                                                         │ Done
                                                         ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Your Webhook   │◄────│     Webhook     │◄────│    Response     │
│    Endpoint     │     │    Delivery     │     │      Ready      │
│                 │     │                 │     │                 │
│     200 OK      │────►│   Delivered!    │     │                 │
└─────────────────┘     └─────────────────┘     └─────────────────┘
Step 1: Create a Webhook Endpoint
Your webhook endpoint must:
- Accept POST requests
- Use HTTPS
- Return a 2xx status within 10 seconds
- Verify the signature in the X-Webhook-Signature header
Example Implementations
Python (Flask):
from flask import Flask, request
import hmac
import hashlib

app = Flask(__name__)
API_KEY = "cust_your_api_key_here"


def verify_signature(body: bytes, signature: str) -> bool:
    # HMAC-SHA256 of the raw request body, keyed with the API key
    expected = hmac.new(
        API_KEY.encode(),
        body,
        hashlib.sha256
    ).hexdigest()
    provided = signature.replace("sha256=", "")
    return hmac.compare_digest(expected, provided)


@app.route("/webhooks/ag2trust", methods=["POST"])
def handle_webhook():
    # Verify signature against the raw body, before parsing it
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(request.data, signature):
        return "Invalid signature", 401

    # Process payload
    payload = request.json
    callback_id = payload["callback_id"]
    status = payload["status"]
    if status == "completed":
        content = payload["content"]
        # Handle successful response
        process_response(callback_id, content)
    else:
        # Handle failure
        handle_failure(callback_id, payload)
    return "OK", 200


def process_response(callback_id, content):
    for block in content:
        if block["type"] == "text":
            print(f"Response: {block['text']}")


def handle_failure(callback_id, payload):
    print(f"Request {callback_id} failed")
Python (FastAPI):
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
import hmac
import hashlib

app = FastAPI()
API_KEY = "cust_your_api_key_here"


def verify_signature(body: bytes, signature: str) -> bool:
    # HMAC-SHA256 of the raw request body, keyed with the API key
    expected = hmac.new(
        API_KEY.encode(),
        body,
        hashlib.sha256
    ).hexdigest()
    provided = signature.replace("sha256=", "")
    return hmac.compare_digest(expected, provided)


@app.post("/webhooks/ag2trust")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    body = await request.body()
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(body, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    payload = await request.json()
    # Schedule processing to run after the response has been sent
    background_tasks.add_task(process_webhook_async, payload)
    return {"status": "received"}


async def process_webhook_async(payload):
    # Implement your processing logic
    pass
Node.js (Express):
const express = require('express');
const crypto = require('crypto');

const app = express();
const API_KEY = 'cust_your_api_key_here';

// Use raw body for signature verification
app.use('/webhooks', express.raw({ type: '*/*' }));

function verifySignature(body, signature) {
  const expected = crypto
    .createHmac('sha256', API_KEY)
    .update(body)
    .digest('hex');
  const provided = signature.replace('sha256=', '');
  const expectedBuf = Buffer.from(expected);
  const providedBuf = Buffer.from(provided);
  // timingSafeEqual throws if the lengths differ, so check them first
  return (
    expectedBuf.length === providedBuf.length &&
    crypto.timingSafeEqual(expectedBuf, providedBuf)
  );
}

app.post('/webhooks/ag2trust', (req, res) => {
  const signature = req.headers['x-webhook-signature'] || '';
  if (!verifySignature(req.body, signature)) {
    return res.status(401).send('Invalid signature');
  }
  // req.body is a Buffer because of express.raw()
  const payload = JSON.parse(req.body.toString('utf8'));
  // Process async
  processWebhook(payload);
  res.status(200).send('OK');
});

function processWebhook(payload) {
  console.log(`Received callback: ${payload.callback_id}`);
  // Implement processing
}

app.listen(3000);
Go:
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"strings"
)

const apiKey = "cust_your_api_key_here"

func verifySignature(body []byte, signature string) bool {
	mac := hmac.New(sha256.New, []byte(apiKey))
	mac.Write(body)
	expected := hex.EncodeToString(mac.Sum(nil))
	provided := strings.TrimPrefix(signature, "sha256=")
	return hmac.Equal([]byte(expected), []byte(provided))
}

func webhookHandler(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Cannot read body", http.StatusBadRequest)
		return
	}
	signature := r.Header.Get("X-Webhook-Signature")
	if !verifySignature(body, signature) {
		http.Error(w, "Invalid signature", http.StatusUnauthorized)
		return
	}
	var payload map[string]interface{}
	if err := json.Unmarshal(body, &payload); err != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}
	// Process webhook in the background so the response returns quickly
	go processWebhook(payload)
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("OK"))
}

func processWebhook(payload map[string]interface{}) {
	// Implement processing
}

func main() {
	http.HandleFunc("/webhooks/ag2trust", webhookHandler)
	log.Fatal(http.ListenAndServe(":3000", nil))
}
Step 2: Test Your Endpoint
Local Testing
Use a tunneling service for local development:
# Install ngrok
brew install ngrok
# Start tunnel
ngrok http 3000
# Note the HTTPS URL
# https://abc123.ngrok.io
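Before involving the tunnel, it can help to exercise the handler with a request you sign yourself. The sketch below assumes the scheme used by the `verify_signature` examples in Step 1: the header value is `sha256=` followed by the hex HMAC-SHA256 of the raw body, keyed with your API key. The `sign_body` helper and the sample payload are illustrative and not part of the AG2Trust API.

```python
import hashlib
import hmac
import json

API_KEY = "cust_your_api_key_here"  # placeholder key, as in the Step 1 examples


def sign_body(body: bytes, api_key: str = API_KEY) -> str:
    """Return a header value in the assumed 'sha256=<hex digest>' form."""
    digest = hmac.new(api_key.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


# Hypothetical payload carrying only the fields the handlers read first
body = json.dumps({"callback_id": "cb_test", "status": "failed"}).encode()
headers = {
    "Content-Type": "application/json",
    "X-Webhook-Signature": sign_body(body),
}
print(headers["X-Webhook-Signature"])
```

Send `body` and `headers` to your local endpoint with any HTTP client. The bytes you sign must be exactly the bytes you send, or verification will fail.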
Test with AG2Trust
curl -X POST https://agents.ag2trust.com/api/v1/webhook/test \
-H "X-API-Key: cust_your_api_key" \
-H "Content-Type: application/json" \
-d '{"webhook_url": "https://abc123.ngrok.io/webhooks/ag2trust"}'
Expected response:
Step 3: Send Async Requests
Once your webhook is working, send async requests:
curl -X POST https://agents.ag2trust.com/api/v1/agents/{agent_id}/messages \
-H "X-API-Key: cust_your_api_key" \
-H "Content-Type: application/json" \
-d '{
"message": "Analyze this large dataset and provide insights...",
"webhook_url": "https://your-server.com/webhooks/ag2trust"
}'
Response (immediate):
{
  "callback_id": "cb_xyz789",
  "status": "processing",
  "agent_id": "uuid",
  "timestamp": "2025-01-15T10:30:00Z"
}
Webhook delivery (later):
{
  "callback_id": "cb_xyz789",
  "message_id": "msg_abc123",
  "agent_id": "uuid",
  "timestamp": "2025-01-15T10:30:15Z",
  "status": "completed",
  "content": [
    {
      "type": "text",
      "text": "Based on my analysis..."
    }
  ],
  "metadata": {
    "tokens_used": 450,
    "model": "gpt-4o",
    "duration_ms": 15000
  }
}
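The `callback_id` in the immediate response is what ties a request to its later webhook delivery. A minimal sketch of that bookkeeping follows, using an in-memory dict as a stand-in for your database. The `record_request` and `resolve_delivery` helper names are hypothetical.

```python
from typing import Optional

# In-memory stand-in for a database table keyed by callback_id
pending: dict[str, dict] = {}


def record_request(accepted: dict, context: dict) -> None:
    """Store application context under the callback_id from the 202 response."""
    pending[accepted["callback_id"]] = context


def resolve_delivery(delivery: dict) -> Optional[dict]:
    """Return and remove the stored context; None if the id is unknown."""
    return pending.pop(delivery["callback_id"], None)


record_request({"callback_id": "cb_xyz789", "status": "processing"}, {"user": "alice"})
context = resolve_delivery({"callback_id": "cb_xyz789", "status": "completed"})
print(context)  # {'user': 'alice'}
```

A delivery whose `callback_id` is unknown resolves to `None`, which is a reasonable point to log and drop the payload.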
Step 4: Set Default Webhook
Instead of specifying webhook_url per request, set a default:
curl -X PUT https://agents.ag2trust.com/api/v1/webhook \
-H "X-API-Key: cust_your_api_key" \
-H "Content-Type: application/json" \
-d '{"webhook_url": "https://your-server.com/webhooks/ag2trust"}'
Now async requests use this URL automatically.
Handling Webhook Payloads
Success Payload
{
  "callback_id": "cb_xyz789",
  "message_id": "msg_abc123",
  "session_id": "session_456",
  "agent_id": "uuid",
  "timestamp": "2025-01-15T10:30:15Z",
  "status": "completed",
  "content": [
    {"type": "text", "text": "Response content..."}
  ],
  "metadata": {
    "tokens_used": 450,
    "model": "gpt-4o",
    "duration_ms": 15000,
    "tool_calls": []
  }
}
Failure Payload
{
  "callback_id": "cb_xyz789",
  "agent_id": "uuid",
  "timestamp": "2025-01-15T10:30:15Z",
  "status": "failed",
  "error": "Agent timeout",
  "error_code": "AGENT_TIMEOUT"
}
Processing Example
def process_webhook(payload):
    # log_metrics, update_request_status, and alert_operations are
    # application-defined helpers
    callback_id = payload["callback_id"]
    status = payload["status"]
    if status == "completed":
        content = payload["content"]
        metadata = payload["metadata"]
        # Extract text response
        text_blocks = [
            b["text"] for b in content
            if b["type"] == "text"
        ]
        response_text = "\n".join(text_blocks)
        # Log metrics
        log_metrics(
            callback_id=callback_id,
            tokens=metadata["tokens_used"],
            duration_ms=metadata["duration_ms"]
        )
        # Update your database
        update_request_status(callback_id, "completed", response_text)
    elif status == "failed":
        error = payload.get("error", "Unknown error")
        update_request_status(callback_id, "failed", error)
        alert_operations(callback_id, error)
Idempotency
The same webhook may be delivered more than once (for example, when a retry follows a response that timed out), so make processing idempotent:
# Using Redis for tracking
import redis

r = redis.Redis()


def process_webhook(payload):
    callback_id = payload["callback_id"]
    # Check if already processed
    if r.get(f"processed:{callback_id}"):
        return  # Already handled
    # Process the webhook
    handle_webhook(payload)
    # Mark as processed (24h TTL)
    r.setex(f"processed:{callback_id}", 86400, "1")
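A check followed by a separate write leaves a small window in which two concurrent deliveries of the same `callback_id` can both pass the check. One way to close it is to claim the id atomically before processing. The sketch below uses an in-memory set guarded by a lock so that it runs without Redis; with redis-py, the equivalent claim would be `r.set(key, "1", nx=True, ex=86400)`. The `claim` and `process_once` names are hypothetical.

```python
import threading

_seen: set[str] = set()
_lock = threading.Lock()


def claim(callback_id: str) -> bool:
    """Atomically record callback_id; True only for the first caller."""
    with _lock:
        if callback_id in _seen:
            return False
        _seen.add(callback_id)
        return True


def process_once(payload: dict, handler) -> bool:
    """Run handler at most once per callback_id; return whether it ran."""
    callback_id = payload["callback_id"]
    if not claim(callback_id):
        return False  # duplicate delivery, already claimed
    try:
        handler(payload)
    except Exception:
        # Release the claim so a later retry can be processed
        with _lock:
            _seen.discard(callback_id)
        raise
    return True
```

Claiming before processing removes the race, at the cost of having to release the claim when the handler fails. Whether to release or to keep the claim and alert is a design choice that depends on how safe a repeat run is.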
Retry Behavior
AG2Trust retries failed webhook deliveries:
| Attempt | Delay | Cumulative |
|---|---|---|
| 1 | Immediate | 0s |
| 2 | 1s | 1s |
| 3 | 5s | 6s |
| 4 | 30s | 36s |
| 5 | 5m | 5m 36s |
After 5 failed attempts, the webhook is marked as failed.
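The cumulative column is the running sum of the delays. A short sketch that reproduces it is shown below; the `RETRY_DELAYS` list simply transcribes the table, with 5m written as 300 seconds.

```python
from itertools import accumulate

# Delay before each attempt, in seconds, as listed in the retry table
RETRY_DELAYS = [0, 1, 5, 30, 300]

# Time elapsed since the first attempt when each attempt is made
cumulative = list(accumulate(RETRY_DELAYS))

for attempt, (delay, total) in enumerate(zip(RETRY_DELAYS, cumulative), start=1):
    print(f"attempt {attempt}: delay {delay}s, elapsed {total}s")

# The final attempt happens 336 seconds (5m 36s) after the first
assert cumulative[-1] == 336
```

Under this schedule, an endpoint that stays unavailable for the full 5m 36s after the first attempt will miss all five deliveries.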
Security Best Practices
1. Always Verify Signatures
Never skip signature verification:
# WRONG - Don't do this
@app.route("/webhooks/ag2trust", methods=["POST"])
def webhook():
    payload = request.json  # No verification!
    process(payload)
    return "OK", 200

# CORRECT
@app.route("/webhooks/ag2trust", methods=["POST"])
def webhook():
    # Default to "" so a missing header fails verification instead of raising
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(request.data, signature):
        return "Unauthorized", 401
    process(request.json)
    return "OK", 200
2. Use Timing-Safe Comparison
Prevent timing attacks:
# WRONG - Timing attack vulnerable
def verify(expected, provided):
    return expected == provided

# CORRECT - Timing-safe
import hmac

def verify(expected, provided):
    return hmac.compare_digest(expected, provided)
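3. Respond Quickly
Acknowledge the delivery first and process it asynchronously, so the endpoint returns within the 10-second limit:
@app.route("/webhooks/ag2trust", methods=["POST"])
def webhook():
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(request.data, signature):
        return "Unauthorized", 401
    queue.add(request.json)  # Queue for async processing (queue is your task queue client)
    return "OK", 200  # Return immediately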
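To make the queue hand-off concrete, here is a minimal sketch using only the standard library: the request path enqueues the payload and returns, while a background thread does the work. The `accept` and `worker` names are hypothetical, and the in-process `queue.Queue` is an assumption chosen so the example runs on its own.

```python
import queue
import threading

jobs = queue.Queue()
results: list[str] = []


def worker() -> None:
    """Drain the queue in the background; None is the shutdown signal."""
    while True:
        payload = jobs.get()
        if payload is None:
            jobs.task_done()
            break
        results.append(payload["callback_id"])  # stand-in for real processing
        jobs.task_done()


threading.Thread(target=worker, daemon=True).start()


def accept(payload: dict) -> tuple[str, int]:
    """What the request handler does after verifying the signature."""
    jobs.put(payload)  # enqueue only; no slow work on the request path
    return "OK", 200


print(accept({"callback_id": "cb_xyz789", "status": "completed"}))
jobs.join()  # only for the demo: wait until the worker has caught up
print(results)
```

An in-process queue loses pending jobs when the process restarts, so a durable queue or task runner is usually the better fit in production.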
4. Use HTTPS Only
Never use HTTP webhook URLs in production.
Monitoring
Track Webhook Metrics
Monitor:
- Delivery success rate
- Average processing time
- Retry frequency
- Failed deliveries
Example Dashboard Queries
-- Failed webhooks in last hour
SELECT callback_id, error, timestamp
FROM webhook_deliveries
WHERE status = 'failed'
  AND timestamp > NOW() - INTERVAL '1 hour';

-- Average processing time
SELECT AVG(duration_ms)
FROM webhook_deliveries
WHERE status = 'delivered';
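These queries assume a `webhook_deliveries` table that your application maintains. The same metrics can be computed over in-memory records, as in the sketch below. The record fields (`status`, `attempts`, `duration_ms`) and the `summarize` helper are assumptions for illustration, not an AG2Trust schema.

```python
from statistics import mean

# Hypothetical delivery records; the field names are assumptions
deliveries = [
    {"callback_id": "cb_1", "status": "delivered", "attempts": 1, "duration_ms": 120},
    {"callback_id": "cb_2", "status": "delivered", "attempts": 3, "duration_ms": 480},
    {"callback_id": "cb_3", "status": "failed", "attempts": 5, "duration_ms": None},
]


def summarize(rows: list[dict]) -> dict:
    """Compute success rate, retry rate, and mean processing time."""
    delivered = [r for r in rows if r["status"] == "delivered"]
    retried = [r for r in rows if r["attempts"] > 1]
    return {
        "success_rate": len(delivered) / len(rows) if rows else 0.0,
        "retry_rate": len(retried) / len(rows) if rows else 0.0,
        "avg_duration_ms": mean(r["duration_ms"] for r in delivered) if delivered else None,
        "failed": [r["callback_id"] for r in rows if r["status"] == "failed"],
    }


print(summarize(deliveries))
```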
Troubleshooting
Webhook not received
- Check that the webhook URL is correct
- Verify that HTTPS is working
- Check that your firewall allows AG2Trust IPs
- Review server logs
Signature verification fails
- Ensure you verify against the raw request body, not a parsed or re-serialized copy
- Check that the API key matches
- Verify that no middleware modifies the body before verification
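The raw-body requirement matters because parsing and re-serializing JSON can change the bytes, and the HMAC covers bytes, not the parsed structure. The sketch below illustrates this with a placeholder key and a hypothetical payload; the compact wire format shown is an assumption, not a statement about how AG2Trust serializes its payloads.

```python
import hashlib
import hmac
import json

API_KEY = "cust_your_api_key_here"  # placeholder key


def sign(body: bytes) -> str:
    """Hex HMAC-SHA256 of the given bytes, keyed with the API key."""
    return hmac.new(API_KEY.encode(), body, hashlib.sha256).hexdigest()


# Bytes as they might arrive on the wire (compact separators)
raw = b'{"callback_id":"cb_xyz789","status":"completed"}'

# Bytes after a parse and re-serialize round trip (default separators add spaces)
reserialized = json.dumps(json.loads(raw)).encode()

print(raw == reserialized)              # False
print(sign(raw) == sign(reserialized))  # False
```

Both byte strings parse to the same object, yet their signatures differ, which is the usual symptom when body-parsing middleware runs before verification.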
Timeouts
- Return 200 sooner by moving processing off the request path
- Increase server timeout limits
- Check for blocking operations in the handler
Next Steps
- API Reference: Webhooks - Technical details
- Agent Pools - Load-balanced agents
- Error Handling - Handle failures