Skip to main content

Receiving Replies

After sending an event to Relay, you receive replies through multiple messages. Relay streams the agent's response token-by-token, then sends a final reply message with full metadata.

Message Types

Token Stream Messages

As the agent generates a response, you receive individual tokens:

{ "type": "token", "event_id": "evt_k9p2m", "agent_id": "athena", "token": "Here" }
{ "type": "token", "event_id": "evt_k9p2m", "agent_id": "athena", "token": "'s the" }
{ "type": "token", "event_id": "evt_k9p2m", "agent_id": "athena", "token": " summary" }

Each token message contains:

FieldTypeNotes
typestringAlways "token"
event_idstringMatches the original event's event_id
agent_idstringThe agent that generated this token
tokenstringA fragment of the response (1-5 words typically)

Final Reply Message

After all tokens are streamed, you receive a final reply with the complete response:

{
"type": "reply",
"event_id": "evt_k9p2m",
"agent_id": "athena",
"thread_id": "task-123",
"reply": "Here's the summary of the Q2 Roadmap Review: The team has identified three key deliverables for Q2...",
"payload": {
"event": "comment.mention",
"message": "@athena summarize this task",
"task_id": "task-123",
"task_title": "Q2 Roadmap Review"
},
"metadata": {
"tokens_used": 1500,
"model": "claude-sonnet",
"latency_ms": 2300,
"session_key": "relay:athena:portal:task-123"
}
}

The final reply contains:

FieldTypeNotes
typestringAlways "reply"
event_idstringMatches the original event
agent_idstringWhich agent replied
thread_idstringThe thread_id from the original event
replystringThe complete generated response
payloadobjectEcho of your original payload
metadataobjectPerformance and model info

Error Messages

If the agent encounters an error, you receive an error message:

{
"type": "error",
"event_id": "evt_k9p2m",
"agent_id": "athena",
"error": "Agent not connected",
"code": "AGENT_OFFLINE"
}

Error messages:

FieldTypeNotes
typestringAlways "error"
event_idstringMatches the original event
agent_idstringWhich agent had the error
errorstringHuman-readable description
codestringError code (see Error Codes)

Correlating Replies with Events

Use event_id to match tokens and replies to their original events:

import asyncio
import json

# Track pending events
pending_events = {}

async def send_event_and_track(websocket, agent_id, thread_id, payload):
event = {
"type": "event",
"agent_id": agent_id,
"thread_id": thread_id,
"payload": payload
}

await websocket.send(json.dumps(event))
print(f"Sent event to {agent_id}")


async def receive_replies(websocket):
async for message in websocket:
data = json.loads(message)
event_id = data.get("event_id")

if data["type"] == "accepted":
print(f"Event {event_id} accepted")
pending_events[event_id] = {
"tokens": [],
"agent_id": data["agent_id"],
"accepted_at": time.time()
}

elif data["type"] == "token":
if event_id in pending_events:
pending_events[event_id]["tokens"].append(data["token"])
# Display streaming token in real-time
print(data["token"], end="", flush=True)

elif data["type"] == "reply":
if event_id in pending_events:
reply_data = pending_events.pop(event_id)
print() # Newline after tokens
print(f"Final reply: {data['reply']}")
print(f"Metadata: {data['metadata']}")

elif data["type"] == "error":
if event_id in pending_events:
pending_events.pop(event_id)
print(f"Error for event {event_id}: {data['error']} ({data['code']})")

Building a "Typing..." Indicator

Use the token stream to create a live updating UI:

Python Example

import asyncio
import websockets
import json
import time

async def handle_reply_stream(event_id, websocket, on_token, on_reply, on_error):
"""
on_token: callback(token: str)
on_reply: callback(reply: str, metadata: dict)
on_error: callback(error: str, code: str)
"""
full_reply = []

async for message in websocket:
data = json.loads(message)

if data.get("event_id") != event_id:
continue

if data["type"] == "token":
full_reply.append(data["token"])
on_token(data["token"])

elif data["type"] == "reply":
on_reply(data["reply"], data["metadata"])
return

elif data["type"] == "error":
on_error(data["error"], data["code"])
return


# Usage with UI callbacks
def display_token(token):
print(token, end="", flush=True)

def display_reply(reply, metadata):
print()
print(f"\nFinal reply: {reply}")
print(f"Tokens: {metadata['tokens_used']}")

def display_error(error, code):
print(f"\nError: {error} ({code})")

# In your app
await send_event(ws, "athena", "task-123", payload)
accepted = await ws.recv() # Get "accepted" message
event_id = json.loads(accepted)["event_id"]

await handle_reply_stream(
event_id,
ws,
on_token=display_token,
on_reply=display_reply,
on_error=display_error
)

JavaScript/React Example

import { useState, useRef } from 'react';

function AIComment({ agentId, threadId, payload, ws }) {
const [tokens, setTokens] = useState('');
const [isLoading, setIsLoading] = useState(true);
const [metadata, setMetadata] = useState(null);
const [error, setError] = useState(null);
const eventIdRef = useRef(null);

const sendEvent = async () => {
const event = {
type: 'event',
agent_id: agentId,
thread_id: threadId,
payload: payload
};

ws.send(JSON.stringify(event));

// Listen for responses
const handleMessage = (event) => {
const data = JSON.parse(event.data);

if (data.event_id !== eventIdRef.current) return;

if (data.type === 'accepted') {
eventIdRef.current = data.event_id;
setIsLoading(true);
}

if (data.type === 'token') {
setTokens((prev) => prev + data.token);
}

if (data.type === 'reply') {
setTokens(data.reply);
setMetadata(data.metadata);
setIsLoading(false);
ws.removeEventListener('message', handleMessage);
}

if (data.type === 'error') {
setError(`${data.error} (${data.code})`);
setIsLoading(false);
ws.removeEventListener('message', handleMessage);
}
};

ws.addEventListener('message', handleMessage);
};

return (
<div className="ai-comment">
<button onClick={sendEvent} disabled={isLoading}>
Ask AI
</button>

{isLoading && <p className="typing">Athena is thinking...</p>}

{tokens && <p className="reply">{tokens}</p>}

{metadata && (
<p className="metadata">
{metadata.tokens_used} tokens · {metadata.latency_ms}ms
</p>
)}

{error && <p className="error">{error}</p>}
</div>
);
}

export default AIComment;

Best Practices

Do

  • Always use event_id to match replies to events
  • Display tokens in real-time as they arrive
  • Handle token, reply, and error message types
  • Store metadata after reply completes
  • Implement timeout for orphaned events

Don't

  • Mix tokens from different events
  • Wait for final reply before showing feedback
  • Assume tokens arrive in order (they do, but be defensive)
  • Forget to clear pending events after reply
  • Ignore error messages

Handling Multiple Concurrent Events

If you send multiple events in quick succession, use event_id to keep them separate:

import asyncio
import json

pending_events = {}

async def send_multiple_events(websocket, events):
for agent_id, thread_id, payload in events:
event = {
"type": "event",
"agent_id": agent_id,
"thread_id": thread_id,
"payload": payload
}
await websocket.send(json.dumps(event))


async def receive_all_replies(websocket):
"""Receive and demux all concurrent replies"""
async for message in websocket:
data = json.loads(message)
event_id = data.get("event_id")

if event_id not in pending_events:
pending_events[event_id] = {"tokens": [], "status": "pending"}

if data["type"] == "token":
pending_events[event_id]["tokens"].append(data["token"])

elif data["type"] == "reply":
pending_events[event_id].update({
"status": "complete",
"reply": data["reply"],
"metadata": data["metadata"]
})

elif data["type"] == "error":
pending_events[event_id].update({
"status": "error",
"error": data["error"],
"code": data["code"]
})

Timeouts

If an event doesn't receive a reply within a reasonable time, treat it as a timeout:

async def wait_for_reply(event_id, websocket, timeout_seconds=30):
"""Wait for a reply with timeout"""
start_time = time.time()

async for message in websocket:
data = json.loads(message)

if data.get("event_id") != event_id:
continue

if data["type"] == "reply" or data["type"] == "error":
return data

# Check timeout
if time.time() - start_time > timeout_seconds:
raise TimeoutError(f"No reply for event {event_id} after {timeout_seconds}s")