Streaming Replies
After receiving an event, your agent processes it and streams back a response. Relay forwards tokens in real time, creating a "typing..." effect in the app.
Token Stream Format
As your agent generates text, send individual tokens:
```json
{ "type": "token", "event_id": "evt_k9p2m", "token": "Here" }
{ "type": "token", "event_id": "evt_k9p2m", "token": "'s the" }
{ "type": "token", "event_id": "evt_k9p2m", "token": " summary" }
```
Each token message contains:
| Field | Type | Notes |
|---|---|---|
| type | string | Always "token" |
| event_id | string | Echo the event_id from the incoming event |
| token | string | A fragment of text (typically 1-5 words) |
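Putting these fields together, a small helper can build and serialize a token message (the `make_token_message` name is mine, not part of the protocol):

```python
import json

def make_token_message(event_id: str, token: str) -> str:
    """Build a serialized token message for Relay.

    event_id must echo the id from the incoming event;
    token is a short text fragment (ideally 1-5 words).
    """
    return json.dumps({
        "type": "token",
        "event_id": event_id,
        "token": token,
    })

msg = make_token_message("evt_k9p2m", "Here")
```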
Token Size Recommendations
- Too small (1 character): High overhead, many messages
- Too large (entire paragraph): Feels less like streaming
- Sweet spot (1-5 words): Balanced, feels responsive
```python
# Good token sizes
tokens = [
    "Here",
    "'s the",
    " summary",
    " of the",
    " Q2 Roadmap"
]
```
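If your model emits very small fragments (single characters or subwords), a simple re-chunker can group them toward the sweet spot before sending. This is a sketch; `rechunk` is a hypothetical helper, not part of the Relay protocol:

```python
from typing import Iterable, Iterator

def rechunk(fragments: Iterable[str], max_words: int = 5) -> Iterator[str]:
    """Group small model fragments into tokens of up to max_words words.

    Buffers incoming text and flushes whenever the buffer reaches the
    word limit, so Relay sees fewer, larger token messages.
    """
    buffer = ""
    for fragment in fragments:
        buffer += fragment
        # Flush once the buffer holds enough whitespace-separated words
        if len(buffer.split()) >= max_words:
            yield buffer
            buffer = ""
    if buffer:  # flush any trailing partial chunk
        yield buffer

chunks = list(rechunk(["He", "re", "'s ", "the ", "sum", "mary"]))
```

Joining the emitted chunks always reproduces the original text, so the final reply's `content` stays consistent.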
Final Reply Format
After all tokens are streamed, send a final reply:
```json
{
  "type": "reply",
  "event_id": "evt_k9p2m",
  "content": "Here's the summary of the Q2 Roadmap Review: The team has identified three key deliverables for Q2. First, we need to finalize the timeline. Second, allocate resources across projects. Third, define success metrics.",
  "done": true,
  "metadata": {
    "tokens_used": 1500,
    "model": "claude-sonnet",
    "latency_ms": 2300
  }
}
```
| Field | Type | Notes |
|---|---|---|
| type | string | Always "reply" |
| event_id | string | Echo the event_id |
| content | string | The complete response text |
| done | boolean | Always true on final reply |
| metadata | object | Performance metrics (optional) |
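A builder for the final reply can mirror the token helper; the `make_reply_message` name is mine, and the metadata dict is optional per the table above:

```python
import json

def make_reply_message(event_id, content, metadata=None):
    """Build the serialized final reply.

    metadata is an optional dict of performance metrics
    (tokens_used, model, latency_ms).
    """
    message = {
        "type": "reply",
        "event_id": event_id,
        "content": content,
        "done": True,  # always true on the final reply
    }
    if metadata:
        message["metadata"] = metadata
    return json.dumps(message)
```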
Metadata Fields
| Field | Type | Example | Notes |
|---|---|---|---|
| tokens_used | number | 1500 | Total tokens in response |
| model | string | "claude-sonnet" | Which model generated this |
| latency_ms | number | 2300 | End-to-end processing time |
Error Messages
If something goes wrong during processing, send an error:
```json
{
  "type": "error",
  "event_id": "evt_k9p2m",
  "error": "Session timed out while processing",
  "code": "AGENT_TIMEOUT"
}
```
| Field | Type | Notes |
|---|---|---|
| type | string | Always "error" |
| event_id | string | Echo the event_id |
| error | string | Human-readable description |
| code | string | Error code (see Error Codes) |
Common error codes:
- AGENT_TIMEOUT: Processing took too long
- AGENT_PROCESSING_ERROR: Exception during processing
- INVALID_PAYLOAD: Payload format unexpected
- RATE_LIMITED: Too many responses in quick succession
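One way to pick a code when catching exceptions in your handler is a small mapping function. This is a sketch; the exception classes chosen here are my assumptions — substitute whatever your stack actually raises:

```python
import asyncio
import json

def error_code_for(exc: Exception) -> str:
    """Map a caught exception to a Relay error code (illustrative)."""
    if isinstance(exc, asyncio.TimeoutError):
        return "AGENT_TIMEOUT"
    if isinstance(exc, (KeyError, ValueError)):
        # e.g. a missing payload field or malformed JSON
        return "INVALID_PAYLOAD"
    return "AGENT_PROCESSING_ERROR"

def make_error_message(event_id: str, exc: Exception) -> str:
    """Build the serialized error message for a failed event."""
    return json.dumps({
        "type": "error",
        "event_id": event_id,
        "error": str(exc),
        "code": error_code_for(exc),
    })
```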
Streaming Implementation
Python Example
```python
import asyncio
import json

async def handle_event_and_stream(websocket, event):
    event_id = event["event_id"]
    payload = event["payload"]
    try:
        # Call your AI model with streaming
        full_text = []
        async for token in call_ai_model_streaming(payload):
            # Send token immediately
            await websocket.send(json.dumps({
                "type": "token",
                "event_id": event_id,
                "token": token
            }))
            full_text.append(token)
            # Optional: yield occasionally to avoid overwhelming Relay
            if len(full_text) % 10 == 0:
                await asyncio.sleep(0.01)
        # All tokens streamed, now send final reply
        complete_response = "".join(full_text)
        await websocket.send(json.dumps({
            "type": "reply",
            "event_id": event_id,
            "content": complete_response,
            "done": True,
            "metadata": {
                "tokens_used": len(full_text),
                "model": "claude-sonnet",
                "latency_ms": 2300
            }
        }))
    except Exception as e:
        # Send error instead
        await websocket.send(json.dumps({
            "type": "error",
            "event_id": event_id,
            "error": str(e),
            "code": "AGENT_PROCESSING_ERROR"
        }))
```
Node.js Example
```javascript
async function handleEventAndStream(ws, event) {
  const eventId = event.event_id;
  const payload = event.payload;
  try {
    const fullText = [];
    // Call your AI model
    const stream = await callAIModelStreaming(payload);
    for await (const token of stream) {
      // Send token immediately
      ws.send(JSON.stringify({
        type: 'token',
        event_id: eventId,
        token: token
      }));
      fullText.push(token);
    }
    // Send final reply
    const completeResponse = fullText.join('');
    ws.send(JSON.stringify({
      type: 'reply',
      event_id: eventId,
      content: completeResponse,
      done: true,
      metadata: {
        tokens_used: fullText.length,
        model: 'claude-sonnet',
        latency_ms: 2300
      }
    }));
  } catch (error) {
    ws.send(JSON.stringify({
      type: 'error',
      event_id: eventId,
      error: error.message,
      code: 'AGENT_PROCESSING_ERROR'
    }));
  }
}
```
Streaming Best Practices
Flush Frequently
Don't buffer tokens — send them as soon as they're available:
```python
# Good: send each token immediately
for token in response:
    await websocket.send(json.dumps({"type": "token", ...}))

# Bad: buffer tokens before sending
tokens = []
for token in response:
    tokens.append(token)
    if len(tokens) > 100:  # Only send every 100 tokens
        send_batch(tokens)  # Users see long delay!
```
Handle Backpressure
If Relay's buffer gets full, slow down:
```python
import asyncio
import json

async def stream_with_backpressure(websocket, event_id, tokens):
    for token in tokens:
        message = json.dumps({"type": "token", "event_id": event_id, "token": token})
        try:
            await websocket.send(message)
        except Exception as e:
            # WebSocket buffer full, wait before retrying
            if "buffer" in str(e).lower() or "full" in str(e).lower():
                await asyncio.sleep(0.01)
                await websocket.send(message)
            else:
                raise
```
Measuring Latency
Record timing from event receipt to final reply:
```python
import json
import time

async def handle_event_timed(websocket, event):
    start_time = time.time()
    event_id = event["event_id"]
    try:
        full_text = []
        async for token in call_ai_model_streaming(event["payload"]):
            await websocket.send(json.dumps({
                "type": "token",
                "event_id": event_id,
                "token": token
            }))
            full_text.append(token)
        latency_ms = int((time.time() - start_time) * 1000)
        await websocket.send(json.dumps({
            "type": "reply",
            "event_id": event_id,
            "content": "".join(full_text),
            "done": True,
            "metadata": {
                "tokens_used": len(full_text),
                "model": "claude-sonnet",
                "latency_ms": latency_ms
            }
        }))
    except Exception:
        pass  # ...error handling (see Error Messages above)
```
Streaming from Claude API
Here's how to stream from Anthropic's Claude API:
```python
import anthropic

async def stream_claude_response(event):
    client = anthropic.Anthropic(api_key="your-key")
    system_prompt = "You are a helpful assistant for Portal tasks."
    user_message = event["payload"]["message"]
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        system=system_prompt,
        messages=[{"role": "user", "content": user_message}]
    ) as stream:
        for text in stream.text_stream:
            # Yield each fragment for streaming; the caller accumulates
            # and joins them for the final reply (async generators
            # cannot return a value)
            yield text
```
Complete Example: Claude Echo Agent
```python
import asyncio
import websockets
import json
import time
import anthropic

class ClaudeAgent:
    def __init__(self, agent_token: str):
        self.agent_token = agent_token
        self.client = anthropic.Anthropic()

    async def process_event(self, websocket, event: dict):
        """Process one event and stream response"""
        event_id = event["event_id"]
        payload = event["payload"]
        app_id = event["app_id"]
        start_time = time.time()
        try:
            # Build system prompt based on app
            system_prompt = self.get_system_prompt(app_id)
            # Get user message from payload
            user_message = payload.get("message", json.dumps(payload))
            full_text = []
            token_count = 0
            # Stream from Claude
            with self.client.messages.stream(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                system=system_prompt,
                messages=[{"role": "user", "content": user_message}]
            ) as stream:
                for text in stream.text_stream:
                    full_text.append(text)
                    token_count += 1
                    # Send token to Relay
                    await websocket.send(json.dumps({
                        "type": "token",
                        "event_id": event_id,
                        "token": text
                    }))
            # Send final reply
            latency_ms = int((time.time() - start_time) * 1000)
            await websocket.send(json.dumps({
                "type": "reply",
                "event_id": event_id,
                "content": "".join(full_text),
                "done": True,
                "metadata": {
                    "tokens_used": token_count,
                    "model": "claude-3-5-sonnet-20241022",
                    "latency_ms": latency_ms
                }
            }))
        except Exception as e:
            print(f"Error processing event: {e}")
            await websocket.send(json.dumps({
                "type": "error",
                "event_id": event_id,
                "error": str(e),
                "code": "AGENT_PROCESSING_ERROR"
            }))

    def get_system_prompt(self, app_id: str) -> str:
        prompts = {
            "portal": "You are a helpful assistant for Portal task management. Provide concise, actionable summaries.",
            "academy": "You are a helpful tutor for Academy. Explain concepts clearly and encourage learning.",
            "flow": "You are a technical workflow assistant for Flow. Provide detailed technical guidance."
        }
        return prompts.get(app_id, "You are a helpful assistant.")

    async def run(self):
        uri = "wss://api.relay.ckgworks.com/v1/ws/agent"
        async with websockets.connect(
            uri,
            extra_headers={"Authorization": f"Bearer {self.agent_token}"}
        ) as ws:
            print("Claude agent connected to Relay")

            # Heartbeat task
            async def heartbeat():
                while True:
                    await asyncio.sleep(30)
                    await ws.send(json.dumps({"type": "ping"}))

            heartbeat_task = asyncio.create_task(heartbeat())
            try:
                async for message in ws:
                    data = json.loads(message)
                    if data["type"] == "ping":
                        await ws.send(json.dumps({"type": "pong"}))
                    elif data["type"] == "event":
                        await self.process_event(ws, data)
            finally:
                heartbeat_task.cancel()

# Run
agent = ClaudeAgent("rla_athena_...")
asyncio.run(agent.run())
```
Best Practices
Do
- Stream tokens as they arrive (no buffering)
- Always send final reply with metadata
- Echo event_id accurately
- Handle errors gracefully
- Measure and report latency
Don't
- Buffer tokens (causes delay)
- Send empty tokens
- Skip the final reply
- Make up event_ids
- Ignore errors
Troubleshooting
App shows "Athena is typing..." but nothing appears?
- Check tokens are being sent
- Verify event_id matches
- Check for connection issues
Long delays before tokens appear?
- Remove any token buffering logic
- Check network latency
- Reduce token size if it's large
"Pong" errors in Relay logs?
- You may be processing events too slowly
- Send heartbeats more frequently
- Increase processing capacity