Processing Events
When an app sends an event to your agent, Relay delivers it over your WebSocket connection. Your agent then processes the event and streams back a response.
Event Message Format
Events arrive in this format:
{
  "type": "event",
  "event_id": "evt_k9p2m",
  "app_id": "portal",
  "thread_id": "task-123",
  "session_key": "relay:portal:task-123",
  "payload": {
    "event": "comment.mention",
    "message": "@athena summarize this task",
    "sender": { "id": "user-456", "name": "Christian Garcia" },
    "task_id": "task-123",
    "task_title": "Q2 Roadmap Review",
    "project": "Smiling Group Platform",
    "comments": [...]
  }
}
| Field | Type | Notes |
|---|---|---|
| type | string | Always "event" |
| event_id | string | Relay's unique ID for this event. Echo in token/reply. |
| app_id | string | Which app sent this (e.g., "portal", "academy") |
| thread_id | string | The thread within that app (e.g., "task-123") |
| session_key | string | relay:{app_id}:{thread_id}; use for session continuity |
| payload | object | The app-specific context. Relay never validates it. |
Important: The session_key does NOT include the agent_id, because you already know who you are (the token identified you on connection).
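Because Relay never validates the payload, it can help to check the envelope fields listed above before doing any work. A minimal sketch, assuming messages arrive as JSON text; the `validate_event` name and the choice to raise `ValueError` are illustrative and not part of Relay's API:

```python
import json

REQUIRED_FIELDS = ("type", "event_id", "app_id", "thread_id", "session_key", "payload")

def validate_event(raw: str) -> dict:
    """Parse a raw message and check the documented event fields (hypothetical helper)."""
    event = json.loads(raw)
    if not isinstance(event, dict):
        raise ValueError("event must be a JSON object")
    missing = [field for field in REQUIRED_FIELDS if field not in event]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    if event["type"] != "event":
        raise ValueError(f"unexpected type: {event['type']!r}")
    # The session key should follow relay:{app_id}:{thread_id}.
    expected = f"relay:{event['app_id']}:{event['thread_id']}"
    if event["session_key"] != expected:
        raise ValueError("session_key does not match app_id and thread_id")
    if not isinstance(event["payload"], dict):
        raise ValueError("payload must be an object")
    return event
```

Comparing the session key against the other two fields avoids splitting on ":" at all, so the check does not depend on what characters a thread ID may contain.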
Session Key Interpretation
Extract context from the session key:
def parse_session_key(session_key):
    """Parse relay:{app_id}:{thread_id} into its components."""
    parts = session_key.split(":")
    # Expect exactly three parts, starting with the "relay" prefix
    if len(parts) == 3 and parts[0] == "relay":
        _, app_id, thread_id = parts
        return {
            "app": app_id,
            "thread": thread_id
        }
    return None

# Example
session_key = "relay:portal:task-123"
context = parse_session_key(session_key)
# context = {"app": "portal", "thread": "task-123"}
Use app_id to apply app-specific context (Portal uses tasks, Academy uses lessons, etc.).
Creating and Resuming Sessions
The session key identifies a persistent conversation. When you receive an event with a session_key, create a new OpenClaw session or resume an existing one:
With OpenClaw Plugin
The plugin handles session management automatically:
# OpenClaw plugin config (openclaw.plugin.json)
{
  "agents": [
    {
      "agent_id": "athena",
      "token": "rla_athena_...",
      "ttl_days": 14  # Sessions expire after 14 days
    }
  ]
}

# The plugin:
# 1. Receives session_key: "relay:portal:task-123"
# 2. Creates/resumes OpenClaw session with that key
# 3. Injects the payload as the user message
# 4. Your agent logic processes it
# 5. Streams tokens back to Relay
Custom Agent Server (without OpenClaw)
Manage sessions manually in your agent server:
import asyncio
import json
import time
from typing import Dict

class AgentSessionManager:
    def __init__(self):
        # In-memory store keyed by session_key (lost on restart)
        self.sessions: Dict[str, Dict] = {}

    async def process_event(self, event: Dict):
        """Receive and process an event"""
        event_id = event["event_id"]
        session_key = event["session_key"]
        payload = event["payload"]
        app_id = event["app_id"]

        # Get or create session
        if session_key not in self.sessions:
            self.sessions[session_key] = {
                "created_at": time.time(),
                "app": app_id,
                "messages": [],
                "context": {}
            }
        session = self.sessions[session_key]

        # Extract context from app-specific payload
        context = self.extract_context(app_id, payload)
        session["context"].update(context)

        # Add user message to conversation
        # (fall back to the raw payload if there is no "message" field)
        session["messages"].append({
            "role": "user",
            "content": payload.get("message") or json.dumps(payload)
        })
        return session

    def extract_context(self, app_id: str, payload: Dict):
        """Extract app-specific context"""
        if app_id == "portal":
            return {
                "task_id": payload.get("task_id"),
                "task_title": payload.get("task_title"),
                "project": payload.get("project")
            }
        elif app_id == "academy":
            return {
                "lesson_id": payload.get("lesson_id"),
                "lesson_title": payload.get("lesson_title")
            }
        else:
            return {}

    def build_system_prompt(self, context: Dict) -> str:
        """Minimal example: list the known context fields for the model"""
        lines = [f"{key}: {value}" for key, value in context.items() if value is not None]
        return "You are assisting with:\n" + "\n".join(lines)

    async def generate_response(self, session_key: str, event_id: str):
        """Generate response for a session"""
        session = self.sessions[session_key]

        # Build system prompt with context
        system_prompt = self.build_system_prompt(session["context"])

        # Call your AI model (Claude, GPT, etc.)
        # call_ai_model is a placeholder for your own model client
        response_tokens = await call_ai_model(
            messages=session["messages"],
            system=system_prompt
        )
        return response_tokens
Handling Different App Types
Different apps send different payload structures. Extract what you need:
Portal (Task-Based)
def handle_portal_event(payload):
    """Portal sends task context"""
    return {
        "type": "task",
        "task_id": payload["task_id"],
        "task_title": payload["task_title"],
        "project": payload["project"],
        "message": payload["message"],
        "sender_name": payload["sender"]["name"]
    }
Academy (Lesson-Based)
def handle_academy_event(payload):
    """Academy sends lesson context"""
    return {
        "type": "lesson",
        "lesson_id": payload["lesson_id"],
        "lesson_title": payload["lesson_title"],
        "subject": payload["subject"],
        "question": payload["question"],
        "student_name": payload["student_name"]
    }
Flow (Job-Based)
def handle_flow_event(payload):
    """Flow sends job context"""
    return {
        "type": "job",
        "job_id": payload["job_id"],
        "job_type": payload["type"],
        "inputs": payload["inputs"],
        "context": payload.get("context", {})
    }
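The handlers above index the payload directly, so a missing key raises `KeyError`. Since Relay never validates the payload, a more defensive variant may be worth considering. A minimal sketch; `extract_fields` and `PayloadError` are hypothetical names, and which fields count as required is an assumption for illustration:

```python
from typing import Any, Dict, Iterable, Optional

class PayloadError(ValueError):
    """Raised when a payload lacks a field the handler needs (hypothetical)."""

def extract_fields(
    payload: Dict[str, Any],
    required: Iterable[str],
    optional: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Copy required keys (failing loudly if absent) and optional keys with defaults."""
    required = list(required)
    missing = [key for key in required if key not in payload]
    if missing:
        raise PayloadError(f"payload missing: {', '.join(missing)}")
    result = {key: payload[key] for key in required}
    for key, default in (optional or {}).items():
        result[key] = payload.get(key, default)
    return result

# Example: treat task_id and message as required, project as optional.
portal = extract_fields(
    {"task_id": "task-123", "message": "@athena summarize this task"},
    required=["task_id", "message"],
    optional={"project": None},
)
```

Raising a dedicated error type lets the caller decide whether to reply with an error message or fall back to a generic handler.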
Session Continuity
Sessions persist for a configurable TTL (time-to-live). Within the TTL window, the session keeps its conversation history, so each new event with the same session_key builds on earlier messages.
Session Lifecycle
Event 1: relay:portal:task-123 arrives
└─ Create OpenClaw session
└─ Agent generates response
└─ Session context saved
[6 hours later]
Event 2: relay:portal:task-123 arrives (same thread_id)
└─ Resume existing session
└─ Agent sees previous conversation
└─ Builds on prior context
[15 days later]
Event 3: relay:portal:task-123 arrives
└─ Session expired (TTL = 14 days)
└─ Create new session
└─ Previous context lost
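For a custom agent server, the lifecycle above could be approximated with an in-memory store that replaces sessions once they are older than the TTL. A minimal sketch, assuming expiry is measured from the last event on the session (this page does not say whether OpenClaw measures from creation or from last activity); `SessionStore` and its injectable clock are illustrative:

```python
import time
from typing import Callable, Dict, Tuple

DAY_SECONDS = 24 * 60 * 60

class SessionStore:
    """In-memory sessions keyed by session_key, expiring after ttl_days of inactivity."""

    def __init__(self, ttl_days: float = 14, clock: Callable[[], float] = time.time):
        self.ttl_seconds = ttl_days * DAY_SECONDS
        self.clock = clock  # injectable so expiry can be tested without waiting
        self.sessions: Dict[str, dict] = {}

    def get_or_create(self, session_key: str) -> Tuple[dict, bool]:
        """Return (session, is_new). Expired sessions are replaced with fresh ones."""
        now = self.clock()
        session = self.sessions.get(session_key)
        if session is not None and now - session["last_seen"] <= self.ttl_seconds:
            session["last_seen"] = now
            return session, False
        # Unknown or expired key: start over with empty history.
        session = {"created_at": now, "last_seen": now, "messages": []}
        self.sessions[session_key] = session
        return session, True
```

The returned flag tells the caller whether prior context exists, which is the distinction the lifecycle diagram draws between Event 2 and Event 3.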
Configuring TTL
In the OpenClaw plugin config:
{
  "agents": [
    {
      "agent_id": "athena",
      "ttl_days": 14  // 14-day memory
    },
    {
      "agent_id": "klyve",
      "ttl_days": 30  // 30-day memory for technical workflows
    }
  ]
}
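A custom agent server that reuses this config shape would have to read the TTLs itself. A minimal sketch, assuming the config has already been parsed into a dict (standard JSON parsers reject the `//` comments shown above, so they would need to be removed first); `ttl_by_agent` and the 14-day fallback are assumptions for illustration:

```python
from datetime import timedelta
from typing import Dict

DEFAULT_TTL_DAYS = 14  # assumed fallback; the real default may differ per agent

def ttl_by_agent(config: dict) -> Dict[str, timedelta]:
    """Map each agent_id in the plugin config to its session TTL."""
    return {
        agent["agent_id"]: timedelta(days=agent.get("ttl_days", DEFAULT_TTL_DAYS))
        for agent in config.get("agents", [])
    }

config = {
    "agents": [
        {"agent_id": "athena", "ttl_days": 14},
        {"agent_id": "klyve", "ttl_days": 30},
    ]
}
ttls = ttl_by_agent(config)
```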
Handling Expired Sessions
When a session expires and a new event arrives, you start fresh. The app can re-inject context:
async def process_event(event):
    session_key = event["session_key"]
    payload = event["payload"]

    # If session is new, payload includes full context
    # e.g., payload includes task history, comments, etc.

    # If session is resumed, payload might only have
    # the new message (app optimizes to send less)

    # is_new_session is a placeholder for your own session lookup
    if is_new_session(session_key):
        print("New session, full context provided in payload")
    else:
        print("Resumed session, building on prior context")
Extracting the App ID
Use the app_id from the event to apply app-specific logic:
import json

async def process_event(websocket, event):
    event_id = event["event_id"]
    session_key = event["session_key"]
    app_id = event["app_id"]
    payload = event["payload"]

    # Route to app-specific handler
    # (the handle_* functions are placeholders that return an iterable of tokens)
    if app_id == "portal":
        response = await handle_portal_task(payload)
    elif app_id == "academy":
        response = await handle_academy_question(payload)
    elif app_id == "flow":
        response = await handle_flow_job(payload)
    else:
        response = await handle_generic(payload)

    # Stream tokens back, collecting them for the final reply
    full_response = ""
    for token in response:
        full_response += token
        await websocket.send(json.dumps({
            "type": "token",
            "event_id": event_id,
            "token": token
        }))

    # Send final reply
    await websocket.send(json.dumps({
        "type": "reply",
        "event_id": event_id,
        "content": full_response,
        "done": True
    }))
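The if/elif chain above can also be written as a lookup table, which keeps the fallback explicit as more apps are added. A minimal sketch; the handler bodies are stand-ins that return fixed tokens, not real Portal logic, and `route` is an illustrative name:

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[dict], Awaitable[List[str]]]

async def handle_portal_task(payload: dict) -> List[str]:
    # Stand-in: a real handler would call your model and return its tokens.
    return ["Portal", " task: ", str(payload.get("task_title", "unknown"))]

async def handle_generic(payload: dict) -> List[str]:
    # Fallback for apps without a dedicated handler.
    return ["Received: ", str(payload.get("message", ""))]

# Register one entry per app; unknown apps use the generic handler.
HANDLERS: Dict[str, Handler] = {
    "portal": handle_portal_task,
}

async def route(event: dict) -> List[str]:
    """Pick the handler for event['app_id'], falling back to the generic one."""
    handler = HANDLERS.get(event["app_id"], handle_generic)
    return await handler(event["payload"])
```

Supporting a new app then means adding one dictionary entry rather than another branch.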
Custom Agent Server Example
import asyncio
import json

import websockets

class SimpleEchoAgent:
    """Minimal agent that echoes the message back"""

    async def handle_event(self, websocket, event):
        event_id = event["event_id"]
        payload = event["payload"]

        # Get the user's message
        message = payload.get("message", "No message")

        # Echo back token by token
        tokens = message.split()
        for token in tokens:
            await websocket.send(json.dumps({
                "type": "token",
                "event_id": event_id,
                "token": token + " "
            }))
            # Simulate processing delay
            await asyncio.sleep(0.1)

        # Send final reply
        await websocket.send(json.dumps({
            "type": "reply",
            "event_id": event_id,
            "content": f"Echo: {message}",
            "done": True,
            "metadata": {
                "tokens_used": len(tokens),
                "model": "echo",
                "latency_ms": 100
            }
        }))

    async def run(self, agent_token):
        uri = "wss://api.relay.ckgworks.com/v1/ws/agent"
        # Depending on your websockets version, the header argument is named
        # extra_headers (legacy client) or additional_headers (newer asyncio client)
        async with websockets.connect(
            uri,
            extra_headers={"Authorization": f"Bearer {agent_token}"}
        ) as ws:
            print("Connected to Relay")
            async for message in ws:
                data = json.loads(message)
                if data["type"] == "ping":
                    await ws.send(json.dumps({"type": "pong"}))
                elif data["type"] == "event":
                    await self.handle_event(ws, data)

# Run
agent = SimpleEchoAgent()
asyncio.run(agent.run("rla_athena_..."))
Best Practices
Do
- Always echo event_id in every token and reply message
- Extract app context from payload
- Use session_key for continuity
- Handle expired sessions gracefully
- Log event processing for debugging
Don't
- Hardcode app-specific logic without fallback
- Ignore the session_key
- Assume payload structure
- Process events synchronously (too slow)
- Skip metadata in final reply
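On the point about synchronous processing: awaiting each handler inside the receive loop delays pings and later events until the current one finishes. One way to avoid that is to hand each event to its own task. A minimal sketch using only `asyncio`; `receive_loop` and the `messages` async iterator are illustrative stand-ins for a WebSocket connection:

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Set

async def receive_loop(
    messages: AsyncIterator[dict],
    handle_event: Callable[[dict], Awaitable[None]],
) -> None:
    """Start a task per event so slow handlers do not block the loop."""
    tasks: Set[asyncio.Task] = set()
    async for data in messages:
        if data.get("type") == "event":
            task = asyncio.create_task(handle_event(data))
            tasks.add(task)  # keep a reference so the task is not garbage-collected
            task.add_done_callback(tasks.discard)
    # Connection closed: wait for in-flight handlers to finish.
    if tasks:
        await asyncio.gather(*tasks)
```

If events for the same session_key must be handled in order, a per-session queue or lock would be needed on top of this.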
Troubleshooting
Events not arriving?
- Check if your agent is connected (dashboard shows status)
- Verify your app is allowed to reach your agent (check allowlist)
- Check Relay logs for delivery errors
"Event_id mismatch" errors?
- Always echo the event_id from the incoming event
- Don't generate your own event_id
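One way to rule out mismatches is to build every outgoing message from the incoming event, so the event_id is never typed or generated by hand. A minimal sketch; the helper names are illustrative, and the message shapes follow the token and reply examples on this page:

```python
import json

def token_message(event: dict, token: str) -> str:
    """Serialize a streamed token, echoing the incoming event_id."""
    return json.dumps({
        "type": "token",
        "event_id": event["event_id"],
        "token": token,
    })

def reply_message(event: dict, content: str) -> str:
    """Serialize the final reply, echoing the incoming event_id."""
    return json.dumps({
        "type": "reply",
        "event_id": event["event_id"],
        "content": content,
        "done": True,
    })
```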
Sessions not resuming?
- Check if TTL has expired (default varies by agent)
- Verify session_key is being used correctly
- Check OpenClaw logs for session management issues