Custom Agent Server
If you don't use OpenClaw, or you want a custom agent integration, you can build your own agent server. This page shows what's required.
Minimal Requirements
A custom agent server needs:
- WebSocket client to connect to Relay
- Message handler for incoming events
- Session management for conversation continuity
- Response streaming for tokens and final replies
That's it. You don't need OpenClaw, just a WebSocket client and a message loop.
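The message shapes an agent sends can be collected into a few small helpers. This is a minimal sketch that uses only the fields appearing in the examples on this page (`type`, `event_id`, `token`, `content`, `done`, `error`, `code`). The helper names are hypothetical, and Relay may define fields that are not shown here.

```python
import json
from typing import Any, Dict


def token_message(event_id: str, token: str) -> Dict[str, Any]:
    """One streamed chunk of the response for a given event."""
    return {"type": "token", "event_id": event_id, "token": token}


def reply_message(event_id: str, content: str) -> Dict[str, Any]:
    """The final, complete reply for an event."""
    return {"type": "reply", "event_id": event_id, "content": content, "done": True}


def error_message(event_id: str, error: str,
                  code: str = "AGENT_PROCESSING_ERROR") -> Dict[str, Any]:
    """A failure report for an event, with a machine-readable code."""
    return {"type": "error", "event_id": event_id, "error": error, "code": code}


def encode(message: Dict[str, Any]) -> str:
    """Serialize a message to the JSON text sent over the WebSocket."""
    return json.dumps(message)
```

Keeping the shapes in one place makes it harder for the streaming path and the error path to drift apart.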
Minimal Architecture
┌─────────────────────────┐
│     Your AI Service     │
│   (Claude, GPT, etc.)   │
└────────────┬────────────┘
             │
┌────────────▼────────────┐
│   Custom Agent Server   │
│   - WebSocket client    │
│   - Session storage     │
│   - Response streaming  │
└────────────┬────────────┘
             │
┌────────────▼────────────┐
│    Relay (WebSocket)    │
│   wss://api.relay...    │
└─────────────────────────┘
Node.js Implementation
Here's a complete minimal agent server in Node.js:
const WebSocket = require('ws');

class CustomAgentServer {
  constructor(agentToken) {
    this.agentToken = agentToken;
    this.uri = 'wss://api.relay.ckgworks.com/v1/ws/agent';
    this.ws = null;
    this.sessions = new Map(); // session_key -> conversation history
    this.retryCount = 0;
    this.maxRetries = 5;
  }

  connect() {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.uri, {
        headers: {
          'Authorization': `Bearer ${this.agentToken}`
        }
      });

      this.ws.on('open', () => {
        console.log('Connected to Relay');
        this.retryCount = 0; // a successful connection resets the backoff
        resolve();
      });

      this.ws.on('message', (data) => {
        try {
          this.handleMessage(JSON.parse(data));
        } catch (error) {
          console.error('Ignoring malformed message:', error);
        }
      });

      // 'close' fires after every disconnect, including failed connection
      // attempts, so it is the only place that schedules a reconnect.
      this.ws.on('close', () => {
        console.log('Disconnected from Relay');
        this.connectWithRetry();
      });

      this.ws.on('error', (error) => {
        console.error('Connection error:', error);
        reject(error); // no effect once the promise has resolved
      });
    });
  }

  handleMessage(message) {
    if (message.type === 'ping') {
      this.send({ type: 'pong' });
    } else if (message.type === 'event') {
      this.processEvent(message);
    }
  }

  async processEvent(event) {
    const eventId = event.event_id;
    const sessionKey = event.session_key;
    const payload = event.payload;
    const appId = event.app_id;
    const startedAt = Date.now();

    try {
      // Get or create session
      if (!this.sessions.has(sessionKey)) {
        this.sessions.set(sessionKey, {
          messages: [],
          createdAt: Date.now(),
          app: appId
        });
      }
      const session = this.sessions.get(sessionKey);

      // Extract message from payload
      const userMessage = payload.message || JSON.stringify(payload);

      // Add to conversation history
      session.messages.push({
        role: 'user',
        content: userMessage
      });

      // Generate response (replace with your AI)
      const responseTokens = await this.generateResponse(userMessage, appId);

      // Stream tokens
      for (const token of responseTokens) {
        this.send({
          type: 'token',
          event_id: eventId,
          token: token
        });
      }

      // Send final reply
      const fullResponse = responseTokens.join('');
      this.send({
        type: 'reply',
        event_id: eventId,
        content: fullResponse,
        done: true,
        metadata: {
          tokens_used: responseTokens.length,
          model: 'custom-agent',
          latency_ms: Date.now() - startedAt // measured, not hardcoded
        }
      });

      // Add to conversation history
      session.messages.push({
        role: 'assistant',
        content: fullResponse
      });
    } catch (error) {
      console.error('Error processing event:', error);
      this.send({
        type: 'error',
        event_id: eventId,
        error: error.message,
        code: 'AGENT_PROCESSING_ERROR'
      });
    }
  }

  async generateResponse(userMessage, appId) {
    // Replace this with your actual AI logic
    // For demo, just echo the message back in tokens
    const words = userMessage.split(/\s+/);
    return words.map(word => word + ' ');
  }

  send(message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    }
  }

  // Reconnect with exponential backoff: 1s, 2s, 4s, ... capped at 60s
  connectWithRetry() {
    if (this.retryCount >= this.maxRetries) {
      console.error('Max retries exceeded');
      return;
    }
    const delay = Math.min(1000 * Math.pow(2, this.retryCount), 60000);
    this.retryCount += 1;
    console.log(`Reconnecting in ${delay}ms...`);
    setTimeout(() => {
      // A failed attempt emits 'close', which schedules the next retry
      this.connect().catch(() => {});
    }, delay);
  }

  startHeartbeat() {
    setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.send({ type: 'ping' });
      }
    }, 30 * 1000);
  }

  cleanupOldSessions() {
    const now = Date.now();
    const ttlMs = 14 * 24 * 60 * 60 * 1000; // 14 days
    for (const [key, session] of this.sessions.entries()) {
      if (now - session.createdAt > ttlMs) {
        this.sessions.delete(key);
      }
    }
  }

  async run() {
    this.startHeartbeat();
    // Cleanup old sessions periodically
    setInterval(() => this.cleanupOldSessions(), 60 * 60 * 1000);
    try {
      await this.connect();
    } catch (error) {
      // The 'close' handler has already scheduled a retry
    }
    // The timers and the socket keep the process alive
    console.log('Agent server running');
  }
}

// Start the agent. The token (rla_athena_...) is read from the environment;
// the variable name RELAY_AGENT_TOKEN is only an example.
const agent = new CustomAgentServer(process.env.RELAY_AGENT_TOKEN);
agent.run().catch(console.error);
Python Implementation
import asyncio
import json
import os
import time
from typing import AsyncIterator, Dict

import websockets


class CustomAgentServer:
    def __init__(self, agent_token: str):
        self.agent_token = agent_token
        self.uri = "wss://api.relay.ckgworks.com/v1/ws/agent"
        self.websocket = None
        self.sessions: Dict[str, Dict] = {}

    async def connect(self):
        """Connect to Relay"""
        headers = {"Authorization": f"Bearer {self.agent_token}"}
        # Recent websockets releases name this keyword `additional_headers`;
        # check the version you have installed.
        self.websocket = await websockets.connect(
            self.uri,
            extra_headers=headers
        )
        print("Connected to Relay")

    async def handle_messages(self):
        """Listen for incoming messages until the connection closes"""
        async for message in self.websocket:
            data = json.loads(message)
            if data.get("type") == "ping":
                await self.send({"type": "pong"})
            elif data.get("type") == "event":
                # Events are handled one at a time; wrap this call in
                # asyncio.create_task to process them concurrently.
                await self.process_event(data)

    async def process_event(self, event: Dict):
        """Process an incoming event"""
        event_id = event["event_id"]
        session_key = event["session_key"]
        payload = event["payload"]
        app_id = event["app_id"]
        started_at = time.monotonic()

        try:
            # Get or create session
            if session_key not in self.sessions:
                self.sessions[session_key] = {
                    "messages": [],
                    "created_at": time.time(),
                    "app": app_id
                }
            session = self.sessions[session_key]

            # Extract user message
            user_message = payload.get("message") or json.dumps(payload)

            # Add to history
            session["messages"].append({
                "role": "user",
                "content": user_message
            })

            # Stream tokens as they are generated. generate_response is an
            # async generator, so it is consumed with `async for`, not `await`.
            response_tokens = []
            async for token in self.generate_response(user_message, app_id):
                response_tokens.append(token)
                await self.send({
                    "type": "token",
                    "event_id": event_id,
                    "token": token
                })
                # Yield to event loop
                await asyncio.sleep(0)

            # Send final reply
            full_response = "".join(response_tokens)
            await self.send({
                "type": "reply",
                "event_id": event_id,
                "content": full_response,
                "done": True,
                "metadata": {
                    "tokens_used": len(response_tokens),
                    "model": "custom-agent",
                    "latency_ms": int((time.monotonic() - started_at) * 1000)
                }
            })

            # Add to history
            session["messages"].append({
                "role": "assistant",
                "content": full_response
            })
        except Exception as e:
            print(f"Error processing event: {e}")
            await self.send({
                "type": "error",
                "event_id": event_id,
                "error": str(e),
                "code": "AGENT_PROCESSING_ERROR"
            })

    async def generate_response(self, user_message: str, app_id: str) -> AsyncIterator[str]:
        """
        Generate response tokens.
        Replace this with your actual AI logic.
        """
        # Demo: echo the message back in tokens
        words = user_message.split()
        for word in words:
            yield word + " "

    async def send(self, message: Dict):
        """Send a message to Relay"""
        if self.websocket:
            await self.websocket.send(json.dumps(message))

    async def heartbeat(self):
        """Send periodic heartbeats"""
        while True:
            await asyncio.sleep(30)
            try:
                await self.send({"type": "ping"})
            except Exception as e:
                # Keep the loop alive; pings resume after reconnecting
                print(f"Heartbeat error: {e}")

    async def cleanup_sessions(self):
        """Remove expired sessions"""
        while True:
            try:
                await asyncio.sleep(3600)  # Check hourly
                now = time.time()
                ttl_seconds = 14 * 24 * 60 * 60  # 14 days
                to_delete = [
                    key for key, session in self.sessions.items()
                    if now - session["created_at"] > ttl_seconds
                ]
                for key in to_delete:
                    del self.sessions[key]
                if to_delete:
                    print(f"Cleaned up {len(to_delete)} expired sessions")
            except Exception as e:
                print(f"Cleanup error: {e}")

    async def run(self):
        """Main run loop: connect, handle messages, reconnect with backoff"""
        # Start background tasks
        heartbeat_task = asyncio.create_task(self.heartbeat())
        cleanup_task = asyncio.create_task(self.cleanup_sessions())
        retry_count = 0
        max_retries = 5
        try:
            while True:
                try:
                    await self.connect()
                    retry_count = 0  # a successful connection resets the backoff
                    await self.handle_messages()
                    print("Disconnected from Relay")
                except Exception as e:
                    print(f"Connection error: {e}")
                if retry_count >= max_retries:
                    print("Max retries exceeded")
                    break
                delay = min(2 ** retry_count, 60)
                retry_count += 1
                print(f"Reconnecting in {delay}s...")
                await asyncio.sleep(delay)
        finally:
            heartbeat_task.cancel()
            cleanup_task.cancel()


# Run the agent. The token (rla_athena_...) is read from the environment;
# the variable name RELAY_AGENT_TOKEN is only an example.
if __name__ == "__main__":
    agent = CustomAgentServer(os.environ["RELAY_AGENT_TOKEN"])
    asyncio.run(agent.run())
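The reconnect logic in both implementations uses exponential backoff: the delay doubles on each attempt and is capped at 60 seconds. The schedule can be sketched as a pure function, which makes it easy to check before wiring it into a connection loop. The function name and the optional `jitter` parameter are additions for illustration and are not part of the examples above.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int = 5, base: float = 1.0, cap: float = 60.0,
                   jitter: float = 0.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return the delay in seconds before each retry attempt.

    With jitter > 0, each delay is increased by a random fraction (up to
    `jitter`) so that many agents do not reconnect at the same instant.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(base * (2 ** attempt), cap)
        if jitter:
            delay += rng.uniform(0, jitter * delay)
        delays.append(delay)
    return delays


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

With the defaults used on this page (five retries), the cap is never reached; it only matters if `max_retries` is raised to seven or more.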
Connecting to Claude API
Replace the generate_response method with actual Claude API calls:
import os

import anthropic

# Create one async client and reuse it. The API key is read from the
# environment rather than hardcoded.
client = anthropic.AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])


# Methods of CustomAgentServer:

async def generate_response(self, user_message: str, app_id: str):
    """Generate response using Claude API"""
    system_prompt = self.get_system_prompt(app_id)
    # The async client streams without blocking the event loop
    async with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        system=system_prompt,
        messages=[{"role": "user", "content": user_message}]
    ) as stream:
        async for text in stream.text_stream:
            yield text


def get_system_prompt(self, app_id: str) -> str:
    """App-specific system prompts"""
    if app_id == "portal":
        return "You are a helpful assistant for Portal task management."
    elif app_id == "academy":
        return "You are a helpful tutor for Academy."
    else:
        return "You are a helpful assistant."
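The `generate_response` example above sends only the latest user message, so the model sees no earlier turns. One way to provide conversation continuity is to pass recent entries from `session["messages"]` instead. The sketch below is an assumption about how that could look: `build_messages` and `max_messages` are hypothetical names, and the trimming step assumes the API expects the conversation to begin with a user turn.

```python
from typing import Dict, List


def build_messages(history: List[Dict[str, str]],
                   max_messages: int = 20) -> List[Dict[str, str]]:
    """Return the most recent turns of a session, starting with a user turn."""
    # Keep only the role and content of the last `max_messages` entries
    recent = [
        {"role": m["role"], "content": m["content"]}
        for m in history[-max_messages:]
    ]
    # Truncation may leave an assistant turn first; drop leading ones
    while recent and recent[0]["role"] != "user":
        recent.pop(0)
    return recent
```

Because `process_event` appends the user message to the session before generating, the result already ends with the current message and could be passed as the `messages` argument in place of the single-item list.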
Key Design Decisions
Session Storage
The examples above keep sessions in memory, so they are lost when the process restarts. For production, use a database:
# Example: PostgreSQL
from datetime import datetime

from sqlalchemy import Column, DateTime, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Session(Base):
    __tablename__ = "agent_sessions"

    session_key = Column(String, primary_key=True)
    app_id = Column(String)
    messages = Column(Text)  # JSON serialized
    created_at = Column(DateTime, default=datetime.utcnow)


# In process_event (db is a SQLAlchemy session bound to your database):
session = db.query(Session).filter_by(session_key=session_key).first()
if not session:
    session = Session(session_key=session_key, app_id=app_id, messages="[]")
    db.add(session)
    db.commit()
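For trying out persistent sessions without a database server, the same table layout can be sketched with the standard library's `sqlite3` module. This swaps SQLite in for PostgreSQL and is not the setup described above; the class name `SqliteSessionStore` and its methods are hypothetical.

```python
import json
import sqlite3
import time
from typing import Dict, List, Optional


class SqliteSessionStore:
    """Stores each session's messages as a JSON string, keyed by session_key."""

    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS agent_sessions ("
            "session_key TEXT PRIMARY KEY, app_id TEXT, "
            "messages TEXT NOT NULL, created_at REAL NOT NULL)"
        )

    def load(self, session_key: str) -> List[Dict[str, str]]:
        row = self.db.execute(
            "SELECT messages FROM agent_sessions WHERE session_key = ?",
            (session_key,),
        ).fetchone()
        return json.loads(row[0]) if row else []

    def append(self, session_key: str, app_id: str, role: str, content: str,
               now: Optional[float] = None) -> None:
        messages = self.load(session_key)
        messages.append({"role": role, "content": content})
        created = time.time() if now is None else now
        # Create the row on first use, then store the updated history
        self.db.execute(
            "INSERT OR IGNORE INTO agent_sessions "
            "(session_key, app_id, messages, created_at) VALUES (?, ?, '[]', ?)",
            (session_key, app_id, created),
        )
        self.db.execute(
            "UPDATE agent_sessions SET messages = ? WHERE session_key = ?",
            (json.dumps(messages), session_key),
        )
        self.db.commit()

    def delete_older_than(self, ttl_seconds: float,
                          now: Optional[float] = None) -> int:
        """Delete sessions created more than ttl_seconds ago; return the count."""
        cutoff = (time.time() if now is None else now) - ttl_seconds
        cursor = self.db.execute(
            "DELETE FROM agent_sessions WHERE created_at < ?", (cutoff,)
        )
        self.db.commit()
        return cursor.rowcount
```

`sqlite3` calls block, so in an asyncio server they would need to run off the event loop, for example via `asyncio.to_thread`.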
Error Handling
Catch failures and report them to Relay with an error message and a code:
try:
    ...  # Process the event
except asyncio.TimeoutError:  # same class as TimeoutError on Python 3.11+
    await self.send({
        "type": "error",
        "event_id": event_id,
        "error": "Processing timeout",
        "code": "AGENT_TIMEOUT"
    })
except Exception as e:
    await self.send({
        "type": "error",
        "event_id": event_id,
        "error": str(e),
        "code": "AGENT_PROCESSING_ERROR"
    })
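A timeout error is only raised if something enforces a time limit. One way to do that is `asyncio.wait_for`, sketched below. The function `run_with_timeout`, its `work` parameter, and the 30-second default are assumptions for illustration; the error codes are the ones used on this page.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def run_with_timeout(event_id: str,
                           work: Callable[[], Awaitable[str]],
                           timeout_seconds: float = 30.0) -> Dict[str, Any]:
    """Run `work` under a time limit and return the message to send to Relay."""
    try:
        content = await asyncio.wait_for(work(), timeout=timeout_seconds)
        return {"type": "reply", "event_id": event_id,
                "content": content, "done": True}
    except asyncio.TimeoutError:
        # wait_for cancels the work before raising
        return {"type": "error", "event_id": event_id,
                "error": "Processing timeout", "code": "AGENT_TIMEOUT"}
    except Exception as e:
        return {"type": "error", "event_id": event_id,
                "error": str(e), "code": "AGENT_PROCESSING_ERROR"}
```

Before Python 3.11, `asyncio.TimeoutError` is a different class from the built-in `TimeoutError`, so catching the asyncio name works across versions.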
Logging
Log each event and error with identifying context (event ID, app, session key):
import logging

logger = logging.getLogger(__name__)

logger.info("Event %s from %s", event_id, app_id)
logger.debug("Session: %s", session_key)
logger.error("Error processing event: %s", error)
Best Practices
Do
- Implement proper reconnection logic
- Send heartbeats every 30 seconds
- Handle all message types
- Log events and errors
- Clean up expired sessions
Don't
- Hardcode agent tokens
- Block on I/O (use async/await)
- Skip error handling
- Ignore disconnections
- Keep sessions forever
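To avoid hardcoding the agent token, it can be read from an environment variable at startup. A minimal sketch follows; the variable name `RELAY_AGENT_TOKEN` and the function `load_agent_token` are hypothetical, so use whatever naming your deployment already follows.

```python
import os
from typing import Mapping


def load_agent_token(env: Mapping[str, str] = os.environ,
                     name: str = "RELAY_AGENT_TOKEN") -> str:
    """Return the agent token from the environment, or fail fast if missing."""
    token = env.get(name, "").strip()
    if not token:
        raise RuntimeError(f"{name} is not set; export it before starting the agent")
    return token
```

Failing at startup with a clear message is easier to diagnose than a rejected WebSocket handshake later.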
Production Checklist
- Token stored in environment variable
- Logging configured (file, console, remote)
- Session storage (database, cache)
- Error handling comprehensive
- Health check endpoint
- Monitoring/alerting
- Graceful shutdown
- Rate limiting implemented
- Tests written
- Documentation updated
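As one illustration of the graceful shutdown item, the sketch below cancels background tasks, waits for them to finish, and then closes the connection. The `shutdown` function is hypothetical, and `websocket` is assumed to be any object with an awaitable `close()` method.

```python
import asyncio
from typing import Any, Iterable, Optional


async def shutdown(tasks: Iterable[asyncio.Task],
                   websocket: Optional[Any] = None) -> None:
    """Cancel background tasks, wait for them to finish, then close the socket."""
    tasks = list(tasks)
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it
    await asyncio.gather(*tasks, return_exceptions=True)
    if websocket is not None:
        await websocket.close()
```

On Unix, this could be triggered from a handler registered with `loop.add_signal_handler` for `SIGTERM` and `SIGINT`.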
For most use cases, the OpenClaw plugin is simpler and recommended. Use a custom server only if you have specific requirements.