Custom Agent Server

If you don't use OpenClaw, or you need a custom agent integration, you can build your own agent server. This page shows what's required.

Minimal Requirements

A custom agent server needs:

  1. WebSocket client to connect to Relay
  2. Message handler for incoming events
  3. Session management for conversation continuity
  4. Response streaming for tokens and final replies

That's all. OpenClaw is not required: a WebSocket client and a message loop cover all four.
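The messages an agent sends back to Relay are small JSON objects. As a rough sketch, they can be built with helpers like the ones below. The helper names are hypothetical, and the field names are taken from the implementations on this page rather than from a formal Relay schema.

```python
import json
from typing import Any, Dict, Optional


def token_message(event_id: str, token: str) -> Dict[str, Any]:
    """One streamed chunk of the response for a given event."""
    return {"type": "token", "event_id": event_id, "token": token}


def reply_message(event_id: str, content: str,
                  metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """The final reply that closes out an event."""
    return {
        "type": "reply",
        "event_id": event_id,
        "content": content,
        "done": True,
        "metadata": metadata or {},
    }


def error_message(event_id: str, error: str,
                  code: str = "AGENT_PROCESSING_ERROR") -> Dict[str, Any]:
    """An error report tied to the event that failed."""
    return {"type": "error", "event_id": event_id, "error": error, "code": code}


# Each message is serialized to JSON before it goes over the WebSocket.
# "evt_1" is a made-up event ID used only for illustration.
wire = json.dumps(token_message("evt_1", "Hello "))
```

Keeping message construction in one place makes it easier to keep the Node.js and Python agents consistent if the fields change.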

Minimal Architecture

┌─────────────────────────┐
│  Your AI Service        │
│  (Claude, GPT, etc.)    │
└────────────┬────────────┘
             │
┌────────────▼────────────┐
│  Custom Agent Server    │
│  - WebSocket client     │
│  - Session storage      │
│  - Response streaming   │
└────────────┬────────────┘
             │
┌────────────▼────────────┐
│  Relay (WebSocket)      │
│  wss://api.relay...     │
└─────────────────────────┘

Node.js Implementation

Here's a complete minimal agent server in Node.js:

const WebSocket = require('ws');

class CustomAgentServer {
  constructor(agentToken) {
    this.agentToken = agentToken;
    this.uri = 'wss://api.relay.ckgworks.com/v1/ws/agent';
    this.ws = null;
    this.sessions = new Map(); // session_key -> conversation history
    this.retryCount = 0;
    this.maxRetries = 5;
  }

  connect() {
    return new Promise((resolve, reject) => {
      let opened = false;

      this.ws = new WebSocket(this.uri, {
        headers: {
          'Authorization': `Bearer ${this.agentToken}`
        }
      });

      this.ws.on('open', () => {
        opened = true;
        this.retryCount = 0; // reset backoff after a successful connection
        console.log('Connected to Relay');
        resolve();
      });

      this.ws.on('message', (data) => {
        let message;
        try {
          message = JSON.parse(data);
        } catch (error) {
          console.error('Ignoring malformed message:', error.message);
          return;
        }
        this.handleMessage(message);
      });

      // 'close' fires after every disconnect, including failed connection
      // attempts, so this is the only place a reconnect is scheduled
      this.ws.on('close', () => {
        console.log('Disconnected from Relay');
        this.connectWithRetry();
      });

      this.ws.on('error', (error) => {
        console.error('Connection error:', error);
        if (!opened) reject(error);
      });
    });
  }

  handleMessage(message) {
    if (message.type === 'ping') {
      this.send({ type: 'pong' });
    } else if (message.type === 'event') {
      // processEvent catches its own errors, so it is safe not to await it
      this.processEvent(message);
    }
  }

  async processEvent(event) {
    const eventId = event.event_id;
    const sessionKey = event.session_key;
    const payload = event.payload;
    const appId = event.app_id;
    const startedAt = Date.now();

    try {
      // Get or create session
      if (!this.sessions.has(sessionKey)) {
        this.sessions.set(sessionKey, {
          messages: [],
          createdAt: Date.now(),
          app: appId
        });
      }

      const session = this.sessions.get(sessionKey);

      // Extract message from payload
      const userMessage = payload.message || JSON.stringify(payload);

      // Add to conversation history
      session.messages.push({
        role: 'user',
        content: userMessage
      });

      // Generate response (replace with your AI)
      const responseTokens = await this.generateResponse(userMessage, appId);

      // Stream tokens
      for (const token of responseTokens) {
        this.send({
          type: 'token',
          event_id: eventId,
          token: token
        });
      }

      // Send final reply
      const fullResponse = responseTokens.join('');

      this.send({
        type: 'reply',
        event_id: eventId,
        content: fullResponse,
        done: true,
        metadata: {
          tokens_used: responseTokens.length, // number of streamed chunks
          model: 'custom-agent',
          latency_ms: Date.now() - startedAt
        }
      });

      // Add to conversation history
      session.messages.push({
        role: 'assistant',
        content: fullResponse
      });

    } catch (error) {
      console.error('Error processing event:', error);
      this.send({
        type: 'error',
        event_id: eventId,
        error: error.message,
        code: 'AGENT_PROCESSING_ERROR'
      });
    }
  }

  async generateResponse(userMessage, appId) {
    // Replace this with your actual AI logic
    // For demo, just echo the message back in tokens

    const words = userMessage.split(/\s+/);
    return words.map(word => word + ' ');
  }

  send(message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    }
  }

  connectWithRetry() {
    if (this.retryCount >= this.maxRetries) {
      console.error('Max retries exceeded');
      return;
    }

    // Exponential backoff: 1s, 2s, 4s, ... capped at 60s
    const delay = Math.min(1000 * Math.pow(2, this.retryCount), 60000);
    this.retryCount += 1;
    console.log(`Reconnecting in ${delay}ms...`);

    setTimeout(() => {
      // A failed attempt emits 'close', which schedules the next retry
      this.connect().catch(() => {});
    }, delay);
  }

  startHeartbeat() {
    setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.send({ type: 'ping' });
      }
    }, 30 * 1000);
  }

  cleanupOldSessions() {
    const now = Date.now();
    const ttlMs = 14 * 24 * 60 * 60 * 1000; // 14 days

    for (const [key, session] of this.sessions.entries()) {
      if (now - session.createdAt > ttlMs) {
        this.sessions.delete(key);
      }
    }
  }

  async run() {
    // The heartbeat and cleanup timers also keep the process alive
    this.startHeartbeat();
    setInterval(() => this.cleanupOldSessions(), 60 * 60 * 1000);

    try {
      await this.connect();
      console.log('Agent server running');
    } catch (error) {
      // The 'close' handler has already scheduled a retry
      console.error('Initial connection failed:', error.message);
    }
  }
}

// Start the agent (placeholder token; load the real one from an environment variable)
const agent = new CustomAgentServer('rla_athena_...');
agent.run().catch(console.error);

Python Implementation

import asyncio
import json
import time
from typing import AsyncIterator, Dict

import websockets


class CustomAgentServer:
    def __init__(self, agent_token: str):
        self.agent_token = agent_token
        self.uri = "wss://api.relay.ckgworks.com/v1/ws/agent"
        self.websocket = None
        self.sessions: Dict[str, Dict] = {}

    async def connect(self):
        """Connect to Relay"""
        headers = {"Authorization": f"Bearer {self.agent_token}"}

        # websockets 14+ names this argument additional_headers;
        # extra_headers applies to older releases and the legacy client
        self.websocket = await websockets.connect(
            self.uri,
            extra_headers=headers
        )
        print("Connected to Relay")

    async def handle_messages(self):
        """Listen for incoming messages until the connection closes"""
        async for message in self.websocket:
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                print("Ignoring malformed message")
                continue

            if data.get("type") == "ping":
                await self.send({"type": "pong"})

            elif data.get("type") == "event":
                await self.process_event(data)

    async def process_event(self, event: Dict):
        """Process an incoming event"""
        event_id = event["event_id"]
        session_key = event["session_key"]
        payload = event["payload"]
        app_id = event["app_id"]
        started = time.monotonic()

        try:
            # Get or create session
            if session_key not in self.sessions:
                self.sessions[session_key] = {
                    "messages": [],
                    "created_at": time.time(),
                    "app": app_id
                }

            session = self.sessions[session_key]

            # Extract user message
            user_message = payload.get("message") or json.dumps(payload)

            # Add to history
            session["messages"].append({
                "role": "user",
                "content": user_message
            })

            # Generate the response and stream each token as it arrives.
            # generate_response is an async generator, so it is iterated
            # with "async for" rather than awaited.
            response_tokens = []
            async for token in self.generate_response(user_message, app_id):
                response_tokens.append(token)
                await self.send({
                    "type": "token",
                    "event_id": event_id,
                    "token": token
                })

            # Send final reply
            full_response = "".join(response_tokens)

            await self.send({
                "type": "reply",
                "event_id": event_id,
                "content": full_response,
                "done": True,
                "metadata": {
                    "tokens_used": len(response_tokens),  # number of streamed chunks
                    "model": "custom-agent",
                    "latency_ms": int((time.monotonic() - started) * 1000)
                }
            })

            # Add to history
            session["messages"].append({
                "role": "assistant",
                "content": full_response
            })

        except Exception as e:
            print(f"Error processing event: {e}")
            await self.send({
                "type": "error",
                "event_id": event_id,
                "error": str(e),
                "code": "AGENT_PROCESSING_ERROR"
            })

    async def generate_response(self, user_message: str, app_id: str) -> AsyncIterator[str]:
        """
        Generate response tokens.
        Replace this with your actual AI logic.
        """
        # Demo: echo the message back in tokens
        words = user_message.split()
        for word in words:
            yield word + " "

    async def send(self, message: Dict):
        """Send a message to Relay"""
        if self.websocket:
            await self.websocket.send(json.dumps(message))

    async def heartbeat(self):
        """Send periodic heartbeats"""
        while True:
            await asyncio.sleep(30)
            try:
                await self.send({"type": "ping"})
            except Exception as e:
                # Keep the task alive; run() handles the reconnect
                print(f"Heartbeat error: {e}")

    async def cleanup_sessions(self):
        """Remove expired sessions"""
        while True:
            try:
                await asyncio.sleep(3600)  # Check hourly
                now = time.time()
                ttl_seconds = 14 * 24 * 60 * 60  # 14 days

                to_delete = [
                    key for key, session in self.sessions.items()
                    if now - session["created_at"] > ttl_seconds
                ]

                for key in to_delete:
                    del self.sessions[key]

                if to_delete:
                    print(f"Cleaned up {len(to_delete)} expired sessions")

            except Exception as e:
                print(f"Cleanup error: {e}")

    async def run(self):
        """Main run loop: connect, handle messages, reconnect with exponential backoff"""
        # Start background tasks
        heartbeat_task = asyncio.create_task(self.heartbeat())
        cleanup_task = asyncio.create_task(self.cleanup_sessions())

        retry_count = 0
        max_retries = 5

        try:
            while retry_count < max_retries:
                try:
                    await self.connect()
                    retry_count = 0  # reset backoff after a successful connection
                    await self.handle_messages()
                    print("Disconnected from Relay")

                except Exception as e:
                    print(f"Connection error: {e}")

                delay = min(2 ** retry_count, 60)
                retry_count += 1
                print(f"Reconnecting in {delay}s...")
                await asyncio.sleep(delay)

            print("Max retries exceeded")

        finally:
            heartbeat_task.cancel()
            cleanup_task.cancel()


# Run the agent (placeholder token; load the real one from an environment variable)
if __name__ == "__main__":
    agent = CustomAgentServer("rla_athena_...")
    asyncio.run(agent.run())

Connecting to Claude API

Replace the generate_response method with actual Claude API calls:

import anthropic

# The async client keeps the event loop free while streaming.
# With no api_key argument it reads the ANTHROPIC_API_KEY environment variable.
client = anthropic.AsyncAnthropic()

async def generate_response(self, user_message: str, app_id: str):
    """Generate response using Claude API"""
    system_prompt = self.get_system_prompt(app_id)

    # Only the latest message is sent here; pass the stored session
    # messages instead if the model needs earlier turns
    async with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        system=system_prompt,
        messages=[{"role": "user", "content": user_message}]
    ) as stream:
        async for text in stream.text_stream:
            yield text

def get_system_prompt(self, app_id: str) -> str:
    """App-specific system prompts"""
    if app_id == "portal":
        return "You are a helpful assistant for Portal task management."
    elif app_id == "academy":
        return "You are a helpful tutor for Academy."
    else:
        return "You are a helpful assistant."
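The method above sends only the latest message, so the model has no memory of earlier turns. One way to add conversation continuity is to pass recent session history instead. This is a minimal sketch that assumes the session dict shape used in the Python implementation; `build_messages` and `max_messages` are hypothetical names.

```python
from typing import Dict, List


def build_messages(session: Dict, max_messages: int = 20) -> List[Dict[str, str]]:
    """Return the most recent turns as role/content dicts."""
    recent = session["messages"][-max_messages:]
    # Drop leading assistant turns so the list starts with a user message.
    while recent and recent[0]["role"] != "user":
        recent = recent[1:]
    return [{"role": m["role"], "content": m["content"]} for m in recent]


session = {
    "messages": [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello!"},
        {"role": "user", "content": "What can you do?"},
    ]
}
messages = build_messages(session, max_messages=2)
```

The result can be passed as the `messages` argument in place of the single-message list, provided the current user message has already been appended to the session, as `process_event` does.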

Key Design Decisions

Session Storage

The examples above keep sessions in memory, so history is lost on restart and cannot be shared between processes. For production, use a database:

# Example: PostgreSQL
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Session(Base):
    __tablename__ = "agent_sessions"
    session_key = Column(String, primary_key=True)
    app_id = Column(String)
    messages = Column(Text)  # JSON serialized
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

# In process_event (db is an open SQLAlchemy session):
session = db.query(Session).filter_by(session_key=session_key).first()
if not session:
    session = Session(session_key=session_key, app_id=app_id, messages="[]")
    db.add(session)
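For trying out persistence without a database server, the same idea can be sketched with the standard library's `sqlite3` module. This swaps SQLite in for PostgreSQL purely for illustration; the `SessionStore` class, its method names, and the `portal:task-1` key format are assumptions, not part of Relay.

```python
import json
import sqlite3
import time
from typing import Dict, List


class SessionStore:
    """Persists conversation history keyed by session_key."""

    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS agent_sessions ("
            "session_key TEXT PRIMARY KEY, app_id TEXT, "
            "messages TEXT NOT NULL, created_at REAL NOT NULL)"
        )

    def load(self, session_key: str) -> List[Dict[str, str]]:
        """Return stored messages, or an empty list for a new session."""
        row = self.db.execute(
            "SELECT messages FROM agent_sessions WHERE session_key = ?",
            (session_key,),
        ).fetchone()
        return json.loads(row[0]) if row else []

    def save(self, session_key: str, app_id: str,
             messages: List[Dict[str, str]]) -> None:
        """Insert the session, or overwrite its message list if it exists."""
        self.db.execute(
            "INSERT INTO agent_sessions VALUES (?, ?, ?, ?) "
            "ON CONFLICT(session_key) DO UPDATE SET messages = excluded.messages",
            (session_key, app_id, json.dumps(messages), time.time()),
        )
        self.db.commit()


store = SessionStore()  # in-memory database for the demo
history = store.load("portal:task-1")
history.append({"role": "user", "content": "Hello"})
store.save("portal:task-1", "portal", history)
```

Loading at the start of an event and saving after the reply keeps the handler stateless, which is what allows more than one agent process to share sessions.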

Error Handling

Report every failure back to Relay with an error message that carries the event_id, and distinguish timeouts from other failures:

try:
    ...  # Process the event
except asyncio.TimeoutError:
    await self.send({
        "type": "error",
        "event_id": event_id,
        "error": "Processing timeout",
        "code": "AGENT_TIMEOUT"
    })
except Exception as e:
    await self.send({
        "type": "error",
        "event_id": event_id,
        "error": str(e),
        "code": "AGENT_PROCESSING_ERROR"
    })
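A timeout error is only raised if something enforces a deadline. One possible approach, sketched here under assumptions, is to wrap the processing coroutine in `asyncio.wait_for`. The `run_with_timeout` helper, its parameters, and the 30-second default are hypothetical; the error codes are the ones used above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def run_with_timeout(
    event_id: str,
    handler: Callable[[], Awaitable[Any]],
    send: Callable[[Dict[str, Any]], Awaitable[None]],
    timeout_s: float = 30.0,
) -> bool:
    """Run handler(); on timeout or failure, send an error for event_id."""
    try:
        await asyncio.wait_for(handler(), timeout=timeout_s)
        return True
    except asyncio.TimeoutError:
        await send({"type": "error", "event_id": event_id,
                    "error": "Processing timeout", "code": "AGENT_TIMEOUT"})
    except Exception as e:
        await send({"type": "error", "event_id": event_id,
                    "error": str(e), "code": "AGENT_PROCESSING_ERROR"})
    return False


async def demo() -> list:
    """Exercise the timeout path with a handler that is too slow."""
    sent = []

    async def send(message):
        sent.append(message)

    async def slow_handler():
        await asyncio.sleep(1)

    await run_with_timeout("evt_1", slow_handler, send, timeout_s=0.01)
    return sent


sent_messages = asyncio.run(demo())
```

`asyncio.wait_for` cancels the handler when the deadline passes, so a stalled model call does not hold up later events indefinitely.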

Logging

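Log events, sessions, and errors so that failures can be traced:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Inside process_event:
logger.info("Event %s from %s", event_id, app_id)
logger.debug("Session: %s", session_key)

# Inside an except block, logger.exception also records the traceback:
logger.exception("Error processing event %s", event_id)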

Best Practices

Do

  • Implement proper reconnection logic
  • Send heartbeats every 30 seconds
  • Handle all message types
  • Log events and errors
  • Clean up expired sessions
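As one illustration of reconnection logic, the delays between attempts can be generated separately from the connection code. The sketch below adds random jitter on top of the exponential backoff used in the implementations above, so that many agents do not reconnect at the same moment; jitter is an addition here, and `backoff_delays` and its parameters are hypothetical names.

```python
import random
from typing import Iterator, Optional


def backoff_delays(max_retries: int = 5, base_s: float = 1.0, cap_s: float = 60.0,
                   rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield one delay per retry: exponential growth, capped, with jitter."""
    rng = rng or random.Random()
    for attempt in range(max_retries):
        ceiling = min(base_s * 2 ** attempt, cap_s)
        # "Full jitter": pick a random delay between 0 and the ceiling.
        yield rng.uniform(0, ceiling)


# A seeded generator makes the sequence repeatable in tests.
delays = list(backoff_delays(max_retries=5, rng=random.Random(0)))
```

A reconnect loop would sleep for each yielded delay before the next attempt and give up once the generator is exhausted.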

Don't

  • Hardcode agent tokens
  • Block on I/O (use async/await)
  • Skip error handling
  • Ignore disconnections
  • Keep sessions forever
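To avoid hardcoding the agent token, it can be read from the environment at startup. A minimal sketch, assuming a variable named `RELAY_AGENT_TOKEN`; both that name and the `load_agent_token` helper are hypothetical.

```python
import os
from typing import Mapping


def load_agent_token(env: Mapping[str, str] = os.environ,
                     var_name: str = "RELAY_AGENT_TOKEN") -> str:
    """Return the agent token from the environment, or fail fast at startup."""
    token = env.get(var_name, "").strip()
    if not token:
        raise RuntimeError(f"{var_name} is not set")
    return token


# Usage with the server class from this page:
# agent = CustomAgentServer(load_agent_token())
```

Failing at startup when the variable is missing is usually easier to diagnose than an authentication error from Relay later on.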

Production Checklist

  • Token stored in environment variable
  • Logging configured (file, console, remote)
  • Session storage (database, cache)
  • Error handling comprehensive
  • Health check endpoint
  • Monitoring/alerting
  • Graceful shutdown
  • Rate limiting implemented
  • Tests written
  • Documentation updated

For most use cases, the OpenClaw plugin is simpler and recommended. Use a custom server only if you have specific requirements.