Receiving Replies
After sending an event to Relay, you receive replies through multiple messages. Relay streams the agent's response token-by-token, then sends a final reply message with full metadata.
Message Types
Token Stream Messages
As the agent generates a response, you receive individual tokens:
{ "type": "token", "event_id": "evt_k9p2m", "agent_id": "athena", "token": "Here" }
{ "type": "token", "event_id": "evt_k9p2m", "agent_id": "athena", "token": "'s the" }
{ "type": "token", "event_id": "evt_k9p2m", "agent_id": "athena", "token": " summary" }
Each token message contains:
| Field | Type | Notes |
|---|---|---|
| type | string | Always "token" |
| event_id | string | Matches the original event's event_id |
| agent_id | string | The agent that generated this token |
| token | string | A fragment of the response (1-5 words typically) |
Final Reply Message
After all tokens are streamed, you receive a final reply with the complete response:
{
"type": "reply",
"event_id": "evt_k9p2m",
"agent_id": "athena",
"thread_id": "task-123",
"reply": "Here's the summary of the Q2 Roadmap Review: The team has identified three key deliverables for Q2...",
"payload": {
"event": "comment.mention",
"message": "@athena summarize this task",
"task_id": "task-123",
"task_title": "Q2 Roadmap Review"
},
"metadata": {
"tokens_used": 1500,
"model": "claude-sonnet",
"latency_ms": 2300,
"session_key": "relay:athena:portal:task-123"
}
}
The final reply contains:
| Field | Type | Notes |
|---|---|---|
| type | string | Always "reply" |
| event_id | string | Matches the original event |
| agent_id | string | Which agent replied |
| thread_id | string | The thread_id from the original event |
| reply | string | The complete generated response |
| payload | object | Echo of your original payload |
| metadata | object | Performance and model info |
Error Messages
If the agent encounters an error, you receive an error message:
{
"type": "error",
"event_id": "evt_k9p2m",
"agent_id": "athena",
"error": "Agent not connected",
"code": "AGENT_OFFLINE"
}
Each error message contains:
| Field | Type | Notes |
|---|---|---|
| type | string | Always "error" |
| event_id | string | Matches the original event |
| agent_id | string | Which agent had the error |
| error | string | Human-readable description |
| code | string | Error code (see Error Codes) |
Correlating Replies with Events
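Use event_id to match tokens and replies to their original events. The examples below rely on Relay first acknowledging each event with an "accepted" message that carries the event_id; every later token, reply, or error for that event repeats the same event_id: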
import asyncio
import json
import time
# Events that Relay has accepted but not yet finished, keyed by event_id.
pending_events = {}


async def send_event_and_track(websocket, agent_id, thread_id, payload):
    """Send one event to an agent over the websocket.

    The event is serialized as JSON with the fields ``type`` (always
    ``"event"``), ``agent_id``, ``thread_id`` and ``payload``.

    Args:
        websocket: Connection object exposing an awaitable ``send(str)``.
        agent_id: Identifier of the agent that should handle the event.
        thread_id: Identifier of the thread the event belongs to.
        payload: JSON-serializable body of the event.

    Returns:
        None. Nothing is recorded here; tracking starts when the
        "accepted" message for the event is received.
    """
    event = {
        "type": "event",
        "agent_id": agent_id,
        "thread_id": thread_id,
        "payload": payload,
    }
    await websocket.send(json.dumps(event))
    print(f"Sent event to {agent_id}")
async def receive_replies(websocket):
    """Read messages from the websocket and correlate them by event_id.

    Handles four message types:

    * ``accepted`` - starts tracking the event in ``pending_events``.
    * ``token``    - appends the token to the tracked event and echoes it.
    * ``reply``    - stops tracking the event and prints the final reply.
    * ``error``    - stops tracking the event (if tracked) and prints the error.

    Tokens and replies for events that were never accepted are ignored.
    Messages without a ``type`` are ignored. Runs until the websocket
    iterator is exhausted.

    Args:
        websocket: Async-iterable connection yielding JSON-encoded messages.
    """
    async for message in websocket:
        data = json.loads(message)
        event_id = data.get("event_id")
        # .get() so a frame without a "type" is skipped instead of raising.
        msg_type = data.get("type")

        if msg_type == "accepted":
            print(f"Event {event_id} accepted")
            pending_events[event_id] = {
                "tokens": [],
                "agent_id": data["agent_id"],
                "accepted_at": time.time(),
            }

        elif msg_type == "token":
            if event_id in pending_events:
                pending_events[event_id]["tokens"].append(data["token"])
                # Display streaming token in real-time
                print(data["token"], end="", flush=True)

        elif msg_type == "reply":
            # pop() both tests for and clears the tracked entry in one step.
            if pending_events.pop(event_id, None) is not None:
                print()  # Newline after tokens
                print(f"Final reply: {data['reply']}")
                print(f"Metadata: {data['metadata']}")

        elif msg_type == "error":
            # Always surface the error, even for an event we were not tracking.
            pending_events.pop(event_id, None)
            print(f"Error for event {event_id}: {data['error']} ({data['code']})")
Building a "Typing..." Indicator
Use the token stream to create a live updating UI:
Python Example
import asyncio
import websockets
import json
import time
async def handle_reply_stream(event_id, websocket, on_token, on_reply, on_error):
    """
    Forward the message stream of a single event to UI callbacks.

    Messages whose event_id differs from ``event_id`` are read and skipped.
    Returns after the first "reply" or "error" message for the event, or
    when the websocket iterator is exhausted.

    on_token: callback(token: str)
    on_reply: callback(reply: str, metadata: dict)
    on_error: callback(error: str, code: str)
    """
    async for message in websocket:
        data = json.loads(message)
        if data.get("event_id") != event_id:
            continue

        # .get() so a frame without a "type" is skipped instead of raising.
        msg_type = data.get("type")
        if msg_type == "token":
            on_token(data["token"])
        elif msg_type == "reply":
            on_reply(data["reply"], data["metadata"])
            return
        elif msg_type == "error":
            on_error(data["error"], data["code"])
            return
# Usage with UI callbacks
def display_token(token):
    """Write a streamed token to stdout immediately, without a newline."""
    print(token, end="", flush=True)
def display_reply(reply, metadata):
    """Print the final reply and its token count after the streamed tokens.

    Args:
        reply: The complete reply text.
        metadata: Mapping that must contain a ``tokens_used`` entry.
    """
    print()  # Finish the line the streamed tokens were written on.
    print(f"\nFinal reply: {reply}")
    print(f"Tokens: {metadata['tokens_used']}")
def display_error(error, code):
    """Print an error description and its code on a fresh line."""
    print(f"\nError: {error} ({code})")
# In your app
# NOTE(review): these statements use `await`, so they must run inside a
# coroutine (or an asyncio-aware REPL), not at plain module level.
# NOTE(review): `send_event`, `ws` and `payload` are not defined in this
# snippet - presumably they come from your own connection/setup code; confirm.
await send_event(ws, "athena", "task-123", payload)
# Assumes the next message on the socket is the "accepted" acknowledgement
# for the event just sent - TODO confirm when other traffic shares the socket.
accepted = await ws.recv() # Get "accepted" message
# The event_id from the acknowledgement is used to filter the reply stream.
event_id = json.loads(accepted)["event_id"]
# Stream tokens to the display callbacks until a reply or error arrives.
await handle_reply_stream(
event_id,
ws,
on_token=display_token,
on_reply=display_reply,
on_error=display_error
)
JavaScript/React Example
import { useState, useRef } from 'react';
function AIComment({ agentId, threadId, payload, ws }) {
const [tokens, setTokens] = useState('');
const [isLoading, setIsLoading] = useState(true);
const [metadata, setMetadata] = useState(null);
const [error, setError] = useState(null);
const eventIdRef = useRef(null);
const sendEvent = async () => {
const event = {
type: 'event',
agent_id: agentId,
thread_id: threadId,
payload: payload
};
ws.send(JSON.stringify(event));
// Listen for responses
const handleMessage = (event) => {
const data = JSON.parse(event.data);
if (data.event_id !== eventIdRef.current) return;
if (data.type === 'accepted') {
eventIdRef.current = data.event_id;
setIsLoading(true);
}
if (data.type === 'token') {
setTokens((prev) => prev + data.token);
}
if (data.type === 'reply') {
setTokens(data.reply);
setMetadata(data.metadata);
setIsLoading(false);
ws.removeEventListener('message', handleMessage);
}
if (data.type === 'error') {
setError(`${data.error} (${data.code})`);
setIsLoading(false);
ws.removeEventListener('message', handleMessage);
}
};
ws.addEventListener('message', handleMessage);
};
return (
<div className="ai-comment">
<button onClick={sendEvent} disabled={isLoading}>
Ask AI
</button>
{isLoading && <p className="typing">Athena is thinking...</p>}
{tokens && <p className="reply">{tokens}</p>}
{metadata && (
<p className="metadata">
{metadata.tokens_used} tokens · {metadata.latency_ms}ms
</p>
)}
{error && <p className="error">{error}</p>}
</div>
);
}
export default AIComment;
Best Practices
Do
- Always use event_id to match replies to events
- Display tokens in real-time as they arrive
- Handle token, reply, and error message types
- Store metadata after reply completes
- Implement timeout for orphaned events
Don't
- Mix tokens from different events
- Wait for final reply before showing feedback
- Rely blindly on token ordering (tokens do arrive in order, but code defensively)
- Forget to clear pending events after reply
- Ignore error messages
Handling Multiple Concurrent Events
If you send multiple events in quick succession, use event_id to keep them separate:
import asyncio
import json
# Per-event state for every event seen on the socket, keyed by event_id.
pending_events = {}


async def send_multiple_events(websocket, events):
    """Send several events back-to-back without waiting for replies.

    Args:
        websocket: Connection object exposing an awaitable ``send(str)``.
        events: Iterable of ``(agent_id, thread_id, payload)`` tuples; each
            one is sent as a JSON message with ``type`` set to ``"event"``.
    """
    for agent_id, thread_id, payload in events:
        event = {
            "type": "event",
            "agent_id": agent_id,
            "thread_id": thread_id,
            "payload": payload,
        }
        await websocket.send(json.dumps(event))
async def receive_all_replies(websocket):
    """Receive and demux all concurrent replies.

    Every message that carries an ``event_id`` is folded into
    ``pending_events[event_id]``:

    * the entry is created on first sight with ``status`` ``"pending"``;
    * ``token`` messages append to the entry's ``tokens`` list;
    * ``reply`` sets ``status`` to ``"complete"`` and stores ``reply`` and
      ``metadata``;
    * ``error`` sets ``status`` to ``"error"`` and stores ``error`` and
      ``code``.

    Messages without an ``event_id`` are skipped. Runs until the websocket
    iterator is exhausted.

    Args:
        websocket: Async-iterable connection yielding JSON-encoded messages.
    """
    async for message in websocket:
        data = json.loads(message)
        event_id = data.get("event_id")
        if event_id is None:
            # Nothing to correlate with; avoid creating a bogus None entry.
            continue

        entry = pending_events.setdefault(
            event_id, {"tokens": [], "status": "pending"}
        )

        msg_type = data.get("type")
        if msg_type == "token":
            entry["tokens"].append(data["token"])
        elif msg_type == "reply":
            entry.update({
                "status": "complete",
                "reply": data["reply"],
                "metadata": data["metadata"],
            })
        elif msg_type == "error":
            entry.update({
                "status": "error",
                "error": data["error"],
                "code": data["code"],
            })
Timeouts
If an event doesn't receive a reply within a reasonable time, treat it as a timeout:
async def wait_for_reply(event_id, websocket, timeout_seconds=30):
    """Wait for a reply with timeout.

    Reads messages until a "reply" or "error" message for ``event_id``
    arrives. Messages for other events, and non-terminal messages such as
    tokens, are read and discarded.

    The deadline is enforced on the whole wait, so it fires even when the
    socket is silent or only carries traffic for other events.

    Args:
        event_id: The event whose terminal message is awaited.
        websocket: Async-iterable connection yielding JSON-encoded messages.
        timeout_seconds: Maximum time to wait, in seconds.

    Returns:
        The decoded "reply" or "error" message, or None if the websocket
        iterator ends before one arrives.

    Raises:
        TimeoutError: No terminal message arrived within ``timeout_seconds``.
    """

    async def _next_terminal_message():
        async for message in websocket:
            data = json.loads(message)
            if data.get("event_id") != event_id:
                continue
            if data.get("type") in ("reply", "error"):
                return data
        return None

    try:
        return await asyncio.wait_for(_next_terminal_message(), timeout_seconds)
    except asyncio.TimeoutError:
        # Re-raise as the builtin with the descriptive message; before
        # Python 3.11 asyncio.TimeoutError is a different class.
        raise TimeoutError(
            f"No reply for event {event_id} after {timeout_seconds}s"
        ) from None