Skip to main content

Processing Events

When an app sends an event to your agent, Relay delivers it over your WebSocket connection. Your agent then processes the event and streams back a response.

Event Message Format

Events arrive in this format:

{
"type": "event",
"event_id": "evt_k9p2m",
"app_id": "portal",
"thread_id": "task-123",
"session_key": "relay:portal:task-123",
"payload": {
"event": "comment.mention",
"message": "@athena summarize this task",
"sender": { "id": "user-456", "name": "Christian Garcia" },
"task_id": "task-123",
"task_title": "Q2 Roadmap Review",
"project": "Smiling Group Platform",
"comments": [...]
}
}
FieldTypeNotes
typestringAlways "event"
event_idstringRelay's unique ID for this event. Echo in token/reply.
app_idstringWhich app sent this (e.g., "portal", "academy")
thread_idstringThe thread within that app (e.g., "task-123")
session_keystringrelay:{app_id}:{thread_id} — use for session continuity
payloadobjectThe app-specific context. Relay never validates it.

Important: The session_key does NOT include the agent_id, because you already know who you are (the token identified you on connection).

Session Key Interpretation

Extract context from the session key:

def parse_session_key(session_key):
"""Parse relay:app:thread into components"""
parts = session_key.split(":")
if len(parts) == 3:
prefix, app_id, thread_id = parts
return {
"app": app_id,
"thread": thread_id
}
return None

# Example
session_key = "relay:portal:task-123"
context = parse_session_key(session_key)
# context = {"app": "portal", "thread": "task-123"}

Use app_id to apply app-specific context (Portal uses tasks, Academy uses lessons, etc.).

Creating and Resuming Sessions

The session key identifies a persistent conversation. When you receive an event with a session_key, create a new OpenClaw session or resume an existing one:

With OpenClaw Plugin

The plugin handles session management automatically:

# OpenClaw plugin config (openclaw.plugin.json)
{
"agents": [
{
"agent_id": "athena",
"token": "rla_athena_...",
"ttl_days": 14 # Sessions expire after 14 days
}
]
}

# The plugin:
# 1. Receives session_key: "relay:portal:task-123"
# 2. Creates/resumes OpenClaw session with that key
# 3. Injects the payload as the user message
# 4. Your agent logic processes it
# 5. Streams tokens back to Relay

Custom Agent Server (without OpenClaw)

Manage sessions manually in your agent server:

import asyncio
import json
from typing import Dict, List

class AgentSessionManager:
def __init__(self):
self.sessions: Dict[str, Dict] = {}

async def process_event(self, event: Dict):
"""Receive and process an event"""
event_id = event["event_id"]
session_key = event["session_key"]
payload = event["payload"]
app_id = event["app_id"]

# Get or create session
if session_key not in self.sessions:
self.sessions[session_key] = {
"created_at": time.time(),
"app": app_id,
"messages": [],
"context": {}
}

session = self.sessions[session_key]

# Extract context from app-specific payload
context = self.extract_context(app_id, payload)
session["context"].update(context)

# Add user message to conversation
session["messages"].append({
"role": "user",
"content": payload.get("message") or json.dumps(payload)
})

return session

def extract_context(self, app_id: str, payload: Dict):
"""Extract app-specific context"""
if app_id == "portal":
return {
"task_id": payload.get("task_id"),
"task_title": payload.get("task_title"),
"project": payload.get("project")
}
elif app_id == "academy":
return {
"lesson_id": payload.get("lesson_id"),
"lesson_title": payload.get("lesson_title")
}
else:
return {}

async def generate_response(self, session_key: str, event_id: str):
"""Generate response for a session"""
session = self.sessions[session_key]

# Build system prompt with context
system_prompt = self.build_system_prompt(session["context"])

# Call your AI model (Claude, GPT, etc.)
response_tokens = await call_ai_model(
messages=session["messages"],
system=system_prompt
)

return response_tokens

Handling Different App Types

Different apps send different payload structures. Extract what you need:

Portal (Task-Based)

def handle_portal_event(payload):
"""Portal sends task context"""
return {
"type": "task",
"task_id": payload["task_id"],
"task_title": payload["task_title"],
"project": payload["project"],
"message": payload["message"],
"sender_name": payload["sender"]["name"]
}

Academy (Lesson-Based)

def handle_academy_event(payload):
"""Academy sends lesson context"""
return {
"type": "lesson",
"lesson_id": payload["lesson_id"],
"lesson_title": payload["lesson_title"],
"subject": payload["subject"],
"question": payload["question"],
"student_name": payload["student_name"]
}

Flow (Job-Based)

def handle_flow_event(payload):
"""Flow sends job context"""
return {
"type": "job",
"job_id": payload["job_id"],
"job_type": payload["type"],
"inputs": payload["inputs"],
"context": payload.get("context", {})
}

Session Continuity

Sessions persist for a configurable TTL (time-to-live). Within the TTL window, Relay maintains conversation history.

Session Lifecycle

Event 1: relay:portal:task-123 arrives
└─ Create OpenClaw session
└─ Agent generates response
└─ Session context saved

[6 hours later]

Event 2: relay:portal:task-123 arrives (same thread_id)
└─ Resume existing session
└─ Agent sees previous conversation
└─ Builds on prior context

[15 days later]

Event 3: relay:portal:task-123 arrives
└─ Session expired (TTL = 14 days)
└─ Create new session
└─ Previous context lost

Configuring TTL

In the OpenClaw plugin config:

{
"agents": [
{
"agent_id": "athena",
"ttl_days": 14 // 14-day memory
},
{
"agent_id": "klyve",
"ttl_days": 30 // 30-day memory for technical workflows
}
]
}

Handling Expired Sessions

When a session expires and a new event arrives, you start fresh. The app can re-inject context:

async def process_event(event):
session_key = event["session_key"]
payload = event["payload"]

# If session is new, payload includes full context
# e.g., payload includes task history, comments, etc.

# If session is resumed, payload might only have
# the new message (app optimizes to send less)

if is_new_session(session_key):
print("New session, full context provided in payload")
else:
print("Resumed session, building on prior context")

Extracting the App ID

Use the app_id from the event to apply app-specific logic:

async def process_event(websocket, event):
event_id = event["event_id"]
session_key = event["session_key"]
app_id = event["app_id"]
payload = event["payload"]

# Route to app-specific handler
if app_id == "portal":
response = await handle_portal_task(payload)
elif app_id == "academy":
response = await handle_academy_question(payload)
elif app_id == "flow":
response = await handle_flow_job(payload)
else:
response = await handle_generic(payload)

# Stream tokens back
for token in response:
await websocket.send(json.dumps({
"type": "token",
"event_id": event_id,
"token": token
}))

# Send final reply
await websocket.send(json.dumps({
"type": "reply",
"event_id": event_id,
"content": full_response,
"done": True
}))

Custom Agent Server Example

import asyncio
import websockets
import json
import time

class SimpleEchoAgent:
"""Minimal agent that echoes the message back"""

async def handle_event(self, websocket, event):
event_id = event["event_id"]
payload = event["payload"]

# Get the user's message
message = payload.get("message", "No message")

# Echo back token by token
tokens = message.split()
for token in tokens:
await websocket.send(json.dumps({
"type": "token",
"event_id": event_id,
"token": token + " "
}))

# Simulate processing delay
await asyncio.sleep(0.1)

# Send final reply
await websocket.send(json.dumps({
"type": "reply",
"event_id": event_id,
"content": f"Echo: {message}",
"done": True,
"metadata": {
"tokens_used": len(tokens),
"model": "echo",
"latency_ms": 100
}
}))

async def run(self, agent_token):
uri = "wss://api.relay.ckgworks.com/v1/ws/agent"

async with websockets.connect(
uri,
extra_headers={"Authorization": f"Bearer {agent_token}"}
) as ws:
print("Connected to Relay")

async for message in ws:
data = json.loads(message)

if data["type"] == "ping":
await ws.send(json.dumps({"type": "pong"}))

elif data["type"] == "event":
await self.handle_event(ws, data)

# Run
agent = SimpleEchoAgent()
asyncio.run(agent.run("rla_athena_..."))

Best Practices

Do

  • Always echo event_id in reply
  • Extract app context from payload
  • Use session_key for continuity
  • Handle expired sessions gracefully
  • Log event processing for debugging

Don't

  • Hardcode app-specific logic without fallback
  • Ignore the session_key
  • Assume payload structure
  • Process events synchronously (too slow)
  • Skip metadata in final reply

Troubleshooting

Events not arriving?

  • Check if your agent is connected (dashboard shows status)
  • Verify your app is allowed to reach your agent (check allowlist)
  • Check Relay logs for delivery errors

"Event_id mismatch" errors?

  • Always echo the event_id from the incoming event
  • Don't generate your own event_id

Sessions not resuming?

  • Check if TTL has expired (default varies by agent)
  • Verify session_key is being used correctly
  • Check OpenClaw logs for session management issues