Streaming Replies

After receiving an event, your agent processes it and streams back a response. Relay forwards tokens in real time, creating a "typing..." effect in the app.

Token Stream Format

As your agent generates text, send individual tokens:

{ "type": "token", "event_id": "evt_k9p2m", "token": "Here" }
{ "type": "token", "event_id": "evt_k9p2m", "token": "'s the" }
{ "type": "token", "event_id": "evt_k9p2m", "token": " summary" }

Each token message contains:

Field      Type     Notes
type       string   Always "token"
event_id   string   Echo the event_id from the incoming event
token      string   A fragment of text (typically 1-5 words)
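
A small builder keeps the three fields consistent across every send. This is a sketch; `make_token_message` is a hypothetical helper, not part of any Relay SDK:

```python
import json

def make_token_message(event_id: str, token: str) -> str:
    """Serialize one token message for the Relay WebSocket."""
    return json.dumps({
        "type": "token",       # always "token" for stream fragments
        "event_id": event_id,  # echoed from the incoming event
        "token": token,        # a short fragment of text
    })
```

Then each send becomes `await websocket.send(make_token_message("evt_k9p2m", "Here"))`.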

Token Size Recommendations

  • Too small (1 character): High overhead, many messages
  • Too large (entire paragraph): Feels less like streaming
  • Sweet spot (1-5 words): Balanced, feels responsive

# Good token sizes
tokens = [
    "Here",
    "'s the",
    " summary",
    " of the",
    " Q2 Roadmap"
]
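
If your model yields single characters or whole paragraphs, you can regroup its output into word-sized fragments before sending. A sketch, using the 1-5 word guidance above (`chunk_tokens` is a hypothetical helper):

```python
import re

def chunk_tokens(text: str, words_per_token: int = 3):
    """Split text into fragments of a few words each, preserving spacing."""
    # Grab each word together with the whitespace that follows it,
    # so joining the chunks reproduces the original text exactly.
    pieces = re.findall(r"\S+\s*", text)
    for i in range(0, len(pieces), words_per_token):
        yield "".join(pieces[i:i + words_per_token])
```

For example, `list(chunk_tokens("Here's the summary of the Q2 Roadmap", 3))` yields three fragments that join back to the original string.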

Final Reply Format

After all tokens are streamed, send a final reply:

{
  "type": "reply",
  "event_id": "evt_k9p2m",
  "content": "Here's the summary of the Q2 Roadmap Review: The team has identified three key deliverables for Q2. First, we need to finalize the timeline. Second, allocate resources across projects. Third, define success metrics.",
  "done": true,
  "metadata": {
    "tokens_used": 1500,
    "model": "claude-sonnet",
    "latency_ms": 2300
  }
}

Field      Type      Notes
type       string    Always "reply"
event_id   string    Echo the event_id
content    string    The complete response text
done       boolean   Always true on final reply
metadata   object    Performance metrics (optional)

Metadata Fields

Field         Type     Example           Notes
tokens_used   number   1500              Total tokens in response
model         string   "claude-sonnet"   Which model generated this
latency_ms    number   2300              End-to-end processing time

Error Messages

If something goes wrong during processing, send an error:

{
  "type": "error",
  "event_id": "evt_k9p2m",
  "error": "Session timed out while processing",
  "code": "AGENT_TIMEOUT"
}

Field      Type     Notes
type       string   Always "error"
event_id   string   Echo the event_id
error      string   Human-readable description
code       string   Error code (see Error Codes)

Common error codes:

  • AGENT_TIMEOUT: Processing took too long
  • AGENT_PROCESSING_ERROR: Exception during processing
  • INVALID_PAYLOAD: Payload format was not what the agent expected
  • RATE_LIMITED: Too many responses in quick succession
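
One way to choose among these codes when catching exceptions. The mapping from Python exception types to codes is an assumption; only the codes themselves come from the list above:

```python
import asyncio
import json

def error_message(event_id: str, exc: Exception) -> str:
    """Pick an error code for a caught exception and serialize the message."""
    if isinstance(exc, asyncio.TimeoutError):
        code = "AGENT_TIMEOUT"
    elif isinstance(exc, (KeyError, ValueError)):
        code = "INVALID_PAYLOAD"  # the incoming payload was malformed
    else:
        code = "AGENT_PROCESSING_ERROR"
    return json.dumps({
        "type": "error",
        "event_id": event_id,
        # Fall back to the class name when the exception has no message
        "error": str(exc) or exc.__class__.__name__,
        "code": code,
    })
```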

Streaming Implementation

Python Example

import asyncio
import json

async def handle_event_and_stream(websocket, event):
    event_id = event["event_id"]
    payload = event["payload"]

    try:
        # Call your AI model with streaming
        full_text = []

        async for token in call_ai_model_streaming(payload):
            # Send token immediately
            await websocket.send(json.dumps({
                "type": "token",
                "event_id": event_id,
                "token": token
            }))

            full_text.append(token)

            # Optional: yield occasionally to avoid overwhelming Relay
            if len(full_text) % 10 == 0:
                await asyncio.sleep(0.01)

        # All tokens streamed, now send final reply
        complete_response = "".join(full_text)

        await websocket.send(json.dumps({
            "type": "reply",
            "event_id": event_id,
            "content": complete_response,
            "done": True,
            "metadata": {
                "tokens_used": len(full_text),
                "model": "claude-sonnet",
                "latency_ms": 2300  # replace with a measured value (see Measuring Latency)
            }
        }))

    except Exception as e:
        # Send error instead
        await websocket.send(json.dumps({
            "type": "error",
            "event_id": event_id,
            "error": str(e),
            "code": "AGENT_PROCESSING_ERROR"
        }))

Node.js Example

async function handleEventAndStream(ws, event) {
  const eventId = event.event_id;
  const payload = event.payload;

  try {
    const fullText = [];

    // Call your AI model
    const stream = await callAIModelStreaming(payload);

    for await (const token of stream) {
      // Send token immediately
      ws.send(JSON.stringify({
        type: 'token',
        event_id: eventId,
        token: token
      }));

      fullText.push(token);
    }

    // Send final reply
    const completeResponse = fullText.join('');

    ws.send(JSON.stringify({
      type: 'reply',
      event_id: eventId,
      content: completeResponse,
      done: true,
      metadata: {
        tokens_used: fullText.length,
        model: 'claude-sonnet',
        latency_ms: 2300
      }
    }));

  } catch (error) {
    ws.send(JSON.stringify({
      type: 'error',
      event_id: eventId,
      error: error.message,
      code: 'AGENT_PROCESSING_ERROR'
    }));
  }
}

Streaming Best Practices

Flush Frequently

Don't buffer tokens — send them as soon as they're available:

# Good: send each token immediately
for token in response:
    await websocket.send(json.dumps({"type": "token", ...}))

# Bad: buffer tokens before sending
tokens = []
for token in response:
    tokens.append(token)
    if len(tokens) > 100:  # Only send every 100 tokens
        send_batch(tokens)  # Users see long delay!

Handle Backpressure

If Relay's buffer gets full, slow down:

import asyncio
import json

async def stream_with_backpressure(websocket, event_id, tokens):
    for token in tokens:
        message = json.dumps({"type": "token", "event_id": event_id, "token": token})

        try:
            await websocket.send(message)
        except Exception as e:
            # WebSocket buffer full, wait before retrying
            if "buffer" in str(e).lower() or "full" in str(e).lower():
                await asyncio.sleep(0.01)
                await websocket.send(message)
            else:
                raise
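
A single retry gives up quickly if the buffer stays full. A small exponential backoff loop is more robust. This is a sketch; the string check on the exception mirrors the example above and is an assumption about the WebSocket library's error text:

```python
import asyncio

async def send_with_backoff(send, message: str, retries: int = 5):
    """Retry a send with exponentially growing pauses when the buffer is full."""
    delay = 0.01
    for attempt in range(retries):
        try:
            await send(message)
            return
        except Exception as e:
            if "buffer" not in str(e).lower() and "full" not in str(e).lower():
                raise  # not a backpressure error; propagate it
            await asyncio.sleep(delay)
            delay *= 2  # back off: 10 ms, 20 ms, 40 ms, ...
    raise RuntimeError(f"send failed after {retries} attempts")
```

Pass `websocket.send` as the `send` callable; the final `RuntimeError` surfaces persistent congestion instead of looping forever.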

Measuring Latency

Record timing from event receipt to final reply:

import json
import time

async def handle_event_timed(websocket, event):
    start_time = time.time()
    event_id = event["event_id"]

    try:
        full_text = []

        async for token in call_ai_model_streaming(event["payload"]):
            await websocket.send(json.dumps({
                "type": "token",
                "event_id": event_id,
                "token": token
            }))
            full_text.append(token)

        latency_ms = int((time.time() - start_time) * 1000)

        await websocket.send(json.dumps({
            "type": "reply",
            "event_id": event_id,
            "content": "".join(full_text),
            "done": True,
            "metadata": {
                "tokens_used": len(full_text),
                "model": "claude-sonnet",
                "latency_ms": latency_ms
            }
        }))

    except Exception as e:
        # Send an error message, as shown in Error Messages above
        await websocket.send(json.dumps({
            "type": "error",
            "event_id": event_id,
            "error": str(e),
            "code": "AGENT_PROCESSING_ERROR"
        }))

Streaming from Claude API

Here's how to stream from Anthropic's Claude API:

import anthropic

def stream_claude_response(event):
    client = anthropic.Anthropic(api_key="your-key")

    system_prompt = "You are a helpful assistant for Portal tasks."
    user_message = event["payload"]["message"]

    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        system=system_prompt,
        messages=[{"role": "user", "content": user_message}]
    ) as stream:
        for text in stream.text_stream:
            yield text  # Yield each fragment for streaming

# Callers can join the yielded fragments to reconstruct the full response

Complete Example: Claude Echo Agent

import asyncio
import websockets
import json
import time
import anthropic

class ClaudeAgent:
    def __init__(self, agent_token: str):
        self.agent_token = agent_token
        self.client = anthropic.Anthropic()

    async def process_event(self, websocket, event: dict):
        """Process one event and stream the response"""
        event_id = event["event_id"]
        payload = event["payload"]
        app_id = event["app_id"]

        start_time = time.time()

        try:
            # Build system prompt based on app
            system_prompt = self.get_system_prompt(app_id)

            # Get user message from payload
            user_message = payload.get("message", json.dumps(payload))

            full_text = []
            token_count = 0

            # Stream from Claude
            with self.client.messages.stream(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                system=system_prompt,
                messages=[{"role": "user", "content": user_message}]
            ) as stream:
                for text in stream.text_stream:
                    full_text.append(text)
                    token_count += 1  # counts streamed text chunks, not model tokens

                    # Send token to Relay
                    await websocket.send(json.dumps({
                        "type": "token",
                        "event_id": event_id,
                        "token": text
                    }))

            # Send final reply
            latency_ms = int((time.time() - start_time) * 1000)

            await websocket.send(json.dumps({
                "type": "reply",
                "event_id": event_id,
                "content": "".join(full_text),
                "done": True,
                "metadata": {
                    "tokens_used": token_count,
                    "model": "claude-3-5-sonnet-20241022",
                    "latency_ms": latency_ms
                }
            }))

        except Exception as e:
            print(f"Error processing event: {e}")
            await websocket.send(json.dumps({
                "type": "error",
                "event_id": event_id,
                "error": str(e),
                "code": "AGENT_PROCESSING_ERROR"
            }))

    def get_system_prompt(self, app_id: str) -> str:
        prompts = {
            "portal": "You are a helpful assistant for Portal task management. Provide concise, actionable summaries.",
            "academy": "You are a helpful tutor for Academy. Explain concepts clearly and encourage learning.",
            "flow": "You are a technical workflow assistant for Flow. Provide detailed technical guidance."
        }
        return prompts.get(app_id, "You are a helpful assistant.")

    async def run(self):
        uri = "wss://api.relay.ckgworks.com/v1/ws/agent"

        async with websockets.connect(
            uri,
            extra_headers={"Authorization": f"Bearer {self.agent_token}"}
        ) as ws:
            print("Claude agent connected to Relay")

            # Heartbeat task
            async def heartbeat():
                while True:
                    await asyncio.sleep(30)
                    await ws.send(json.dumps({"type": "ping"}))

            heartbeat_task = asyncio.create_task(heartbeat())

            try:
                async for message in ws:
                    data = json.loads(message)

                    if data["type"] == "ping":
                        await ws.send(json.dumps({"type": "pong"}))

                    elif data["type"] == "event":
                        await self.process_event(ws, data)

            finally:
                heartbeat_task.cancel()

# Run the agent
agent = ClaudeAgent("rla_athena_...")
asyncio.run(agent.run())

Best Practices

Do

  • Stream tokens as they arrive (no buffering)
  • Always send final reply with metadata
  • Echo event_id accurately
  • Handle errors gracefully
  • Measure and report latency

Don't

  • Buffer tokens (causes delay)
  • Send empty tokens
  • Skip the final reply
  • Make up event_ids
  • Ignore errors

Troubleshooting

App shows "Athena is typing..." but nothing appears?

  • Check tokens are being sent
  • Verify event_id matches
  • Check for connection issues

Long delays before tokens appear?

  • Remove any token buffering logic
  • Check network latency
  • Reduce token size if fragments are large

"Pong" errors in Relay logs?

  • You may be processing events too slowly
  • Send heartbeats more frequently
  • Increase processing capacity