Skip to main content

Code Examples

Here are complete, production-ready examples of integrating with Relay in different languages and frameworks.

Python (asyncio + websockets)

import asyncio
import websockets
import json
import time
from typing import Optional

class RelayClient:
    """Minimal asyncio client for the Relay app WebSocket endpoint.

    The connection has a single reader at any time. ``discover_agents`` and
    ``send_event`` read their own response directly from the socket, so they
    must finish before ``receive_messages`` is started as a background task;
    two coroutines calling ``recv()`` on the same connection is an error in
    the ``websockets`` library.

    Attributes:
        app_token: Bearer token sent in the ``Authorization`` header.
        uri: WebSocket endpoint URL.
        websocket: The open connection, or ``None`` before ``connect()``.
        agents: Agent list from the most recent successful discovery.
        pending_events: Accepted events awaiting a reply, keyed by event_id.
    """

    def __init__(self, app_token: str):
        self.app_token = app_token
        self.uri = "wss://api.relay.ckgworks.com/v1/ws/app"
        self.websocket = None
        self.agents = []
        self.pending_events = {}

    async def connect(self) -> bool:
        """Open the connection, retrying with exponential backoff.

        Returns:
            True once connected, False after five failed attempts.
        """
        max_retries = 5

        for attempt in range(1, max_retries + 1):
            try:
                self.websocket = await websockets.connect(
                    self.uri,
                    extra_headers={"Authorization": f"Bearer {self.app_token}"},
                )
            except Exception as e:
                # Broad on purpose: this is the retry boundary. Cancellation
                # is a BaseException and still propagates.
                if attempt == max_retries:
                    # No point sleeping when there is no further attempt.
                    print(f"Connection failed: {e}. Giving up.")
                    break
                delay = min(2 ** attempt, 60)
                print(f"Connection failed: {e}. Retrying in {delay}s...")
                await asyncio.sleep(delay)
            else:
                print("Connected to Relay")
                return True

        return False

    async def _handle_message(self, data: dict) -> None:
        """Process one decoded server message (ping, token, reply or error)."""
        kind = data.get("type")

        if kind == "ping":
            await self.websocket.send(json.dumps({"type": "pong"}))

        elif kind == "token":
            event_id = data["event_id"]
            token = data["token"]
            if event_id in self.pending_events:
                self.pending_events[event_id]["tokens"].append(token)
                print(token, end="", flush=True)

        elif kind == "reply":
            event_id = data["event_id"]
            if event_id in self.pending_events:
                del self.pending_events[event_id]
                print()
                print("\nReply complete!")
                print(f"Full text: {data['reply']}")
                print(f"Tokens: {data['metadata']['tokens_used']}")
                print(f"Latency: {data['metadata']['latency_ms']}ms")

        elif kind == "error":
            print(f"Error: {data['error']} ({data['code']})")
            # The event (if any) will not complete; stop tracking it.
            self.pending_events.pop(data.get("event_id"), None)

    async def _await_response(self, *expected: str) -> dict:
        """Read until a message whose type is in ``expected`` arrives.

        Anything else received in the meantime (for example a server ping)
        is handled normally instead of being mistaken for the response.
        """
        while True:
            data = json.loads(await self.websocket.recv())
            if data.get("type") in expected:
                return data
            await self._handle_message(data)

    async def discover_agents(self):
        """Request the agent list and return it ([] if discovery fails).

        Must not run while ``receive_messages`` is reading the socket.
        """
        await self.websocket.send(json.dumps({"type": "discover"}))
        data = await self._await_response("agents", "error")

        if data["type"] == "agents":
            self.agents = data["agents"]
            print(f"Discovered {len(self.agents)} agents")
            return self.agents

        print(f"Discovery failed: {data}")
        return []

    async def send_event(
        self,
        agent_id: str,
        thread_id: str,
        payload: dict
    ) -> Optional[str]:
        """Send an event and return the event_id Relay assigns on acceptance.

        Must not run while ``receive_messages`` is reading the socket.

        Returns:
            The event_id from the ``accepted`` response, or None if the
            server answered with an error instead.
        """
        event = {
            "type": "event",
            "agent_id": agent_id,
            "thread_id": thread_id,
            "payload": payload,
        }

        await self.websocket.send(json.dumps(event))
        print(f"Sent event to {agent_id} for {thread_id}")

        data = await self._await_response("accepted", "error")
        if data["type"] != "accepted":
            print(f"Event rejected: {data}")
            return None

        event_id = data["event_id"]
        self.pending_events[event_id] = {
            "agent_id": agent_id,
            "thread_id": thread_id,
            "tokens": [],
            "started_at": time.time(),
        }
        return event_id

    async def receive_messages(self):
        """Listen for incoming messages (tokens, replies, errors) until close."""
        try:
            async for message in self.websocket:
                await self._handle_message(json.loads(message))
        except Exception as e:
            print(f"Receive loop error: {e}")

    async def heartbeat(self):
        """Send a ping every 30 seconds until a send fails."""
        while True:
            try:
                await asyncio.sleep(30)
                await self.websocket.send(json.dumps({"type": "ping"}))
            except Exception as e:
                print(f"Heartbeat error: {e}")
                break

    async def run(self):
        """Connect, discover agents, send one example event and stream the reply."""
        if not await self.connect():
            return

        heartbeat_task = asyncio.create_task(self.heartbeat())
        receive_task = None

        try:
            # Request/response exchanges read the socket themselves, so they
            # run before the background reader is started.
            await self.discover_agents()

            await self.send_event(
                agent_id="athena",
                thread_id="task-123",
                payload={
                    "event": "comment.mention",
                    "message": "@athena summarize this task",
                    "task_id": "task-123",
                    "task_title": "Q2 Roadmap Review",
                },
            )

            receive_task = asyncio.create_task(self.receive_messages())

            # Wait a bit for the reply to complete
            await asyncio.sleep(10)
        finally:
            # Always stop the background tasks and close the socket, even if
            # one of the steps above raised.
            tasks = [t for t in (heartbeat_task, receive_task) if t is not None]
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            await self.websocket.close()

# Usage
async def main():
    """Build a client with the placeholder app token and run the demo session."""
    relay = RelayClient("rlk_yourapp_x8k2m9p...")
    await relay.run()


if __name__ == "__main__":
    # Only start the event loop when executed as a script, not on import.
    asyncio.run(main())

Node.js (ws library)

const WebSocket = require('ws');

/**
 * Minimal Relay app client built on the `ws` library.
 *
 * Tracks accepted events in `pendingEvents` (keyed by the event_id that Relay
 * returns in its `accepted` message) and prints streamed tokens and replies.
 */
class RelayClient {
  /**
   * @param {string} appToken Bearer token sent in the Authorization header.
   */
  constructor(appToken) {
    this.appToken = appToken;
    this.uri = 'wss://api.relay.ckgworks.com/v1/ws/app';
    this.ws = null;
    this.agents = [];
    this.pendingEvents = {};
    this.heartbeatTimer = null;
  }

  /**
   * Open a new WebSocket connection.
   *
   * @returns {Promise<boolean>} Resolves true on open; rejects if the socket
   *   errors before opening.
   */
  async connect() {
    return new Promise((resolve, reject) => {
      // Distinguishes "an established connection dropped" from "this attempt
      // never connected". Only the former schedules a reconnect here; failed
      // attempts are retried by the caller (reconnectWithBackoff), which
      // avoids duplicate retry chains and a retry counter that resets to 0.
      let opened = false;

      try {
        const ws = new WebSocket(this.uri, {
          headers: {
            'Authorization': `Bearer ${this.appToken}`
          }
        });
        this.ws = ws;

        ws.on('open', () => {
          opened = true;
          console.log('Connected to Relay');
          resolve(true);
        });

        ws.on('close', () => {
          console.log('Disconnected from Relay');
          if (opened) {
            this.reconnectWithBackoff();
          }
        });

        ws.on('error', (error) => {
          console.error('WebSocket error:', error);
          // No-op if the promise already settled (error after open).
          reject(error);
        });

        this.setupMessageHandler();
      } catch (error) {
        reject(error);
      }
    });
  }

  /**
   * Decode one incoming frame.
   *
   * @param {*} raw Message as delivered by the socket (string or Buffer).
   * @returns {object|null} Parsed message, or null if it is not valid JSON.
   */
  parseMessage(raw) {
    try {
      return JSON.parse(raw.toString());
    } catch (error) {
      // A throw inside an event listener would surface as an uncaught
      // exception, so malformed frames are logged and skipped instead.
      console.error('Ignoring malformed message:', error.message);
      return null;
    }
  }

  /** Attach the long-lived handler for pings, agent lists, tokens, replies and errors. */
  setupMessageHandler() {
    this.ws.on('message', (message) => {
      const data = this.parseMessage(message);
      if (data === null) {
        return;
      }

      if (data.type === 'ping') {
        this.ws.send(JSON.stringify({ type: 'pong' }));
      } else if (data.type === 'agents') {
        this.agents = data.agents;
        console.log(`Discovered ${data.agents.length} agents`);
      } else if (data.type === 'token') {
        const eventId = data.event_id;
        if (this.pendingEvents[eventId]) {
          this.pendingEvents[eventId].tokens.push(data.token);
          process.stdout.write(data.token);
        }
      } else if (data.type === 'reply') {
        const eventId = data.event_id;
        if (this.pendingEvents[eventId]) {
          delete this.pendingEvents[eventId];
          console.log('\nReply complete!');
          console.log(`Full text: ${data.reply}`);
          console.log(`Tokens: ${data.metadata.tokens_used}`);
          console.log(`Latency: ${data.metadata.latency_ms}ms`);
        }
      } else if (data.type === 'error') {
        console.error(`Error: ${data.error} (${data.code})`);
        const eventId = data.event_id;
        if (eventId && this.pendingEvents[eventId]) {
          delete this.pendingEvents[eventId];
        }
      }
    });
  }

  /**
   * Wait for the first incoming message that `match` maps to a value other
   * than undefined, then detach the temporary listeners.
   *
   * @param {function(object): *} match Returns the result to resolve with,
   *   or undefined to keep waiting.
   * @param {*} fallback Value to resolve with if the socket closes first.
   * @returns {Promise<*>}
   */
  waitForMessage(match, fallback) {
    // Bind to the current socket: this.ws may be replaced by a reconnect.
    const ws = this.ws;

    return new Promise((resolve) => {
      const cleanup = () => {
        // Listeners were added with on(), so they must be removed with off();
        // removeEventListener only detaches addEventListener registrations.
        ws.off('message', onMessage);
        ws.off('close', onClose);
      };
      const onMessage = (raw) => {
        const data = this.parseMessage(raw);
        if (data === null) {
          return;
        }
        const result = match(data);
        if (result !== undefined) {
          cleanup();
          resolve(result);
        }
      };
      const onClose = () => {
        cleanup();
        resolve(fallback);
      };

      ws.on('message', onMessage);
      ws.on('close', onClose);
    });
  }

  /**
   * Ask Relay for the agent list.
   *
   * @returns {Promise<Array>} The agents, or [] if the socket closes first.
   */
  discover() {
    const agents = this.waitForMessage(
      (data) => (data.type === 'agents' ? data.agents : undefined),
      []
    );
    this.ws.send(JSON.stringify({ type: 'discover' }));
    return agents;
  }

  /**
   * Send an event and wait for Relay to accept it.
   *
   * The event_id is assigned by Relay and read from the `accepted` message;
   * nothing is generated client-side. Acceptances carry no correlation key
   * the client chose, so only one sendEvent should be in flight at a time.
   *
   * @param {string} agentId
   * @param {string} threadId
   * @param {object} payload
   * @returns {Promise<string|null>} The event_id, or null if the request was
   *   rejected or the socket closed before acceptance.
   */
  sendEvent(agentId, threadId, payload) {
    const accepted = this.waitForMessage((data) => {
      if (data.type === 'accepted') {
        this.pendingEvents[data.event_id] = {
          agentId: agentId,
          threadId: threadId,
          tokens: []
        };
        return data.event_id;
      }
      // Assumption: an error without an event_id refers to the request that
      // was just sent (it has no id yet) — confirm against the protocol docs.
      if (data.type === 'error' && !data.event_id) {
        return null;
      }
      return undefined;
    }, null);

    this.ws.send(JSON.stringify({
      type: 'event',
      agent_id: agentId,
      thread_id: threadId,
      payload: payload
    }));
    console.log(`Sent event to ${agentId}`);

    return accepted;
  }

  /** Send a ping every 30 seconds while the current socket is open. */
  startHeartbeat() {
    // Replace any earlier timer so repeated calls do not stack intervals.
    if (this.heartbeatTimer !== null) {
      clearInterval(this.heartbeatTimer);
    }
    this.heartbeatTimer = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, 30000);
  }

  /**
   * Try to reconnect, doubling the delay after each failed attempt.
   *
   * @param {number} retryCount Attempts already made.
   * @param {number} maxRetries Give up once retryCount reaches this value.
   */
  reconnectWithBackoff(retryCount = 0, maxRetries = 5) {
    if (retryCount >= maxRetries) {
      console.error('Max retries exceeded');
      return;
    }

    const delay = Math.min(1000 * Math.pow(2, retryCount), 60000);
    console.log(`Reconnecting in ${delay}ms...`);

    setTimeout(async () => {
      try {
        await this.connect();
      } catch (error) {
        this.reconnectWithBackoff(retryCount + 1, maxRetries);
      }
    }, delay);
  }

  /** Demo flow: connect, start the heartbeat, discover agents, send one event. */
  async run() {
    try {
      await this.connect();
      this.startHeartbeat();

      const agents = await this.discover();
      console.log(`Available agents: ${agents.map((a) => a.name).join(', ')}`);

      const eventId = await this.sendEvent(
        'athena',
        'task-123',
        {
          event: 'comment.mention',
          message: '@athena summarize this task',
          task_id: 'task-123',
          task_title: 'Q2 Roadmap Review'
        }
      );

      console.log(`Event ${eventId} sent`);

      // Keep connection open
      await new Promise(() => {});
    } catch (error) {
      console.error('Error:', error);
    }
  }
}

// Usage
// Create the client with the placeholder app token.
const client = new RelayClient('rlk_yourapp_x8k2m9p...');
// run() catches and logs its own errors, so the returned promise is not awaited here.
client.run();

Next.js (Server-Side WebSocket Client)

// lib/relay-client.ts
import WebSocket from 'ws';

/**
 * Server-side Relay client that shares one WebSocket across requests.
 *
 * Messages sent while the socket is still connecting are queued and flushed
 * on open. `sendEvent` calls are serialized because an `accepted` message
 * carries no client-chosen correlation key: with two events in flight there
 * would be no way to tell which acceptance belongs to which caller.
 */
class RelayServerClient {
  private token: string;
  private ws: WebSocket | null = null;
  private pendingEvents: Map<string, any> = new Map();
  private messageQueue: any[] = [];
  // Tail of the sendEvent chain; each call waits for the previous one to settle.
  private sendChain: Promise<unknown> = Promise.resolve();

  constructor(token: string) {
    this.token = token;
  }

  /** Open the connection. Resolves on open, rejects if the socket errors first. */
  async connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      try {
        const ws = new WebSocket('wss://api.relay.ckgworks.com/v1/ws/app', {
          headers: { Authorization: `Bearer ${this.token}` }
        });
        this.ws = ws;

        ws.on('open', () => {
          console.log('[Relay] Connected');
          this.flushMessageQueue();
          resolve();
        });

        ws.on('message', (message) => {
          const data = this.parseMessage(message);
          if (data !== null) {
            this.handleMessage(data);
          }
        });

        ws.on('close', () => {
          console.log('[Relay] Disconnected');
          // Drop the reference so sendEvent fails fast instead of waiting on
          // a dead socket. Guarded in case connect() already replaced it.
          if (this.ws === ws) {
            this.ws = null;
          }
        });

        ws.on('error', (error) => {
          console.error('[Relay] Error:', error);
          reject(error);
        });
      } catch (error) {
        reject(error);
      }
    });
  }

  /** Decode one frame; returns null (after logging) if it is not valid JSON. */
  private parseMessage(message: { toString(): string }): any | null {
    try {
      // Frames may arrive as Buffers, so convert explicitly before parsing.
      return JSON.parse(message.toString());
    } catch (error) {
      console.error('[Relay] Ignoring malformed message:', error);
      return null;
    }
  }

  /** Answer server pings; other message types are ignored here. */
  private handleMessage(data: any) {
    if (data.type === 'ping') {
      this.send({ type: 'pong' });
    }
  }

  /** Send now if the socket is open, otherwise queue until it opens. */
  private send(message: any) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      this.messageQueue.push(message);
    }
  }

  /** Send everything that was queued while the socket was not open. */
  private flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const message = this.messageQueue.shift();
      this.ws!.send(JSON.stringify(message));
    }
  }

  /**
   * Send an event and resolve with the event_id from Relay's `accepted` message.
   *
   * @param agentId   Target agent.
   * @param threadId  Conversation thread the event belongs to.
   * @param payload   Event payload, forwarded as-is.
   * @param timeoutMs How long to wait for acceptance before rejecting.
   * @throws If there is no connection, the socket closes, or the wait times out.
   */
  async sendEvent(
    agentId: string,
    threadId: string,
    payload: any,
    timeoutMs: number = 30000
  ): Promise<any> {
    const attempt = this.sendChain.then(() =>
      this.sendAndAwaitAccepted(agentId, threadId, payload, timeoutMs)
    );
    // Keep the chain alive after a failure so later calls still run.
    this.sendChain = attempt.catch(() => undefined);
    return attempt;
  }

  /** Send one event and wait for its acceptance on the current socket. */
  private sendAndAwaitAccepted(
    agentId: string,
    threadId: string,
    payload: any,
    timeoutMs: number
  ): Promise<any> {
    const ws = this.ws;
    if (!ws) {
      return Promise.reject(new Error('Relay connection is not open'));
    }

    return new Promise((resolve, reject) => {
      const cleanup = () => {
        clearTimeout(timer);
        // off() actually detaches the listeners; calling on() again (as a
        // "removal") would register a second copy on every request.
        ws.off('message', onMessage);
        ws.off('close', onClose);
      };
      const onMessage = (message: { toString(): string }) => {
        const data = this.parseMessage(message);
        if (data !== null && data.type === 'accepted') {
          cleanup();
          resolve(data.event_id);
        }
      };
      const onClose = () => {
        cleanup();
        reject(new Error('Relay connection closed before the event was accepted'));
      };
      const timer = setTimeout(() => {
        cleanup();
        reject(new Error(`Relay did not accept the event within ${timeoutMs}ms`));
      }, timeoutMs);

      ws.on('message', onMessage);
      ws.on('close', onClose);

      this.send({
        type: 'event',
        agent_id: agentId,
        thread_id: threadId,
        payload
      });
    });
  }

  /** Close the connection if one exists. */
  close() {
    this.ws?.close();
  }
}

// pages/api/ai-comment.ts
import { NextApiRequest, NextApiResponse } from 'next';

const relayClient = new RelayServerClient(process.env.RELAY_APP_TOKEN!);

// One shared connection attempt for the whole module. A boolean flag that is
// set only after `await connect()` lets concurrent first requests each open
// their own socket; sharing the promise makes them all wait on the same one.
// NOTE: a connection that drops after it was established is not detected here.
let connectPromise: Promise<void> | null = null;

/** Connect once; a failed attempt is forgotten so the next request retries. */
function ensureConnected(): Promise<void> {
  if (connectPromise === null) {
    connectPromise = relayClient.connect().catch((error) => {
      connectPromise = null;
      throw error;
    });
  }
  return connectPromise;
}

/**
 * POST /api/ai-comment — forwards `{ agentId, threadId, payload }` to Relay
 * and responds with the event_id Relay assigned.
 *
 * Responds 405 for other methods, 400 when agentId or threadId is missing,
 * and 500 when connecting or sending fails.
 */
export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  if (req.method !== 'POST') {
    res.setHeader('Allow', 'POST');
    return res.status(405).json({ error: 'Method not allowed' });
  }

  // The body comes from the client; do not forward malformed routing fields.
  const { agentId, threadId, payload } = req.body ?? {};
  if (
    typeof agentId !== 'string' || agentId === '' ||
    typeof threadId !== 'string' || threadId === ''
  ) {
    return res.status(400).json({ error: 'agentId and threadId are required' });
  }

  try {
    await ensureConnected();

    const eventId = await relayClient.sendEvent(agentId, threadId, payload);

    return res.status(200).json({
      success: true,
      eventId
    });
  } catch (error) {
    console.error('Error:', error);
    return res.status(500).json({ error: 'Internal server error' });
  }
}

Go (gorilla/websocket)

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/url"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

// RelayClient is a minimal Relay app client over one WebSocket connection.
// Create it with NewRelayClient and call Connect once.
type RelayClient struct {
	token  string
	ws     *websocket.Conn
	agents []map[string]interface{}

	// writeMu serializes writes: the heartbeat goroutine, the read loop
	// (pong replies) and SendEvent callers all write to the same connection,
	// and gorilla/websocket allows only one concurrent writer.
	writeMu sync.Mutex
	// done is closed when the read loop exits, i.e. the connection is gone.
	done chan struct{}
}

// Event is the outgoing message that asks an agent to handle a payload.
type Event struct {
	Type     string      `json:"type"`
	AgentID  string      `json:"agent_id"`
	ThreadID string      `json:"thread_id"`
	Payload  interface{} `json:"payload"`
}

// AcceptedMessage is the server's acknowledgement of an Event; EventID is
// the identifier assigned by Relay.
type AcceptedMessage struct {
	Type       string `json:"type"`
	EventID    string `json:"event_id"`
	AgentID    string `json:"agent_id"`
	SessionKey string `json:"session_key"`
	Status     string `json:"status"`
}

// TokenMessage carries one streamed token of an agent's reply.
type TokenMessage struct {
	Type    string `json:"type"`
	EventID string `json:"event_id"`
	AgentID string `json:"agent_id"`
	Token   string `json:"token"`
}

// NewRelayClient returns a client that authenticates with the given app token.
func NewRelayClient(token string) *RelayClient {
	return &RelayClient{token: token, done: make(chan struct{})}
}

// Connect dials Relay and starts the background read loop and heartbeat.
// It returns the dial error, if any.
func (c *RelayClient) Connect() error {
	uri := url.URL{
		Scheme: "wss",
		Host:   "api.relay.ckgworks.com",
		Path:   "/v1/ws/app",
	}

	header := map[string][]string{
		"Authorization": {fmt.Sprintf("Bearer %s", c.token)},
	}

	ws, _, err := websocket.DefaultDialer.Dial(uri.String(), header)
	if err != nil {
		return err
	}

	c.ws = ws
	if c.done == nil {
		// Client was built without NewRelayClient.
		c.done = make(chan struct{})
	}
	log.Println("Connected to Relay")

	// Start receiving messages
	go c.receiveMessages()
	go c.heartbeat()

	return nil
}

// Done returns a channel that is closed once the connection's read loop has
// ended.
func (c *RelayClient) Done() <-chan struct{} {
	return c.done
}

// writeJSON sends v as a JSON message while holding the write lock.
func (c *RelayClient) writeJSON(v interface{}) error {
	c.writeMu.Lock()
	defer c.writeMu.Unlock()
	return c.ws.WriteJSON(v)
}

// SendEvent sends payload to the given agent on the given thread. The
// acceptance and the reply are reported asynchronously by the read loop.
func (c *RelayClient) SendEvent(agentID, threadID string, payload interface{}) error {
	if c.ws == nil {
		return fmt.Errorf("relay: SendEvent called before Connect")
	}

	event := Event{
		Type:     "event",
		AgentID:  agentID,
		ThreadID: threadID,
		Payload:  payload,
	}

	return c.writeJSON(event)
}

// decodeInto unmarshals raw into v and reports success; failures are logged.
func decodeInto(raw []byte, v interface{}) bool {
	if err := json.Unmarshal(raw, v); err != nil {
		log.Printf("Decode error: %v", err)
		return false
	}
	return true
}

// receiveMessages reads until the connection fails, logging each message.
// On exit it closes the socket and then signals Done.
func (c *RelayClient) receiveMessages() {
	defer close(c.done)
	defer c.ws.Close()

	for {
		_, raw, err := c.ws.ReadMessage()
		if err != nil {
			log.Printf("Read error: %v", err)
			return
		}

		// Decode only the discriminator first; a missing or non-string
		// "type" leaves it empty instead of panicking on a type assertion.
		var envelope struct {
			Type string `json:"type"`
		}
		if !decodeInto(raw, &envelope) {
			continue
		}

		switch envelope.Type {
		case "ping":
			if err := c.writeJSON(map[string]string{"type": "pong"}); err != nil {
				log.Printf("Pong error: %v", err)
				return
			}
		case "accepted":
			var m AcceptedMessage
			if decodeInto(raw, &m) {
				log.Printf("Event accepted: %v\n", m.EventID)
			}
		case "token":
			var m TokenMessage
			if decodeInto(raw, &m) {
				fmt.Print(m.Token)
			}
		case "reply":
			var m map[string]interface{}
			if decodeInto(raw, &m) {
				fmt.Println()
				log.Printf("Reply: %v\n", m["reply"])
			}
		case "error":
			var m map[string]interface{}
			if decodeInto(raw, &m) {
				log.Printf("Error: %v (%v)\n", m["error"], m["code"])
			}
		}
	}
}

// heartbeat sends a ping every 30 seconds until a write fails or the read
// loop has exited.
func (c *RelayClient) heartbeat() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-c.done:
			return
		case <-ticker.C:
			if err := c.writeJSON(map[string]string{"type": "ping"}); err != nil {
				log.Printf("Heartbeat error: %v", err)
				return
			}
		}
	}
}

func main() {
	client := NewRelayClient("rlk_yourapp_x8k2m9p...")

	if err := client.Connect(); err != nil {
		log.Fatal(err)
	}

	payload := map[string]interface{}{
		"event":      "comment.mention",
		"message":    "@athena summarize this task",
		"task_id":    "task-123",
		"task_title": "Q2 Roadmap Review",
	}

	if err := client.SendEvent("athena", "task-123", payload); err != nil {
		log.Fatal(err)
	}

	// Run until the connection ends rather than blocking forever.
	<-client.Done()
	log.Println("Connection closed; exiting")
}

Common Patterns

Handling Concurrent Requests

Use a map to track multiple pending events:

pending_events = {}  # event_id -> event metadata


async def send_multiple_events(ws, events):
    """Send each (agent_id, thread_id, payload) triple and track the accepted ones.

    Args:
        ws: Connection passed through to ``send_event``.
        events: Iterable of ``(agent_id, thread_id, payload)`` tuples.
    """
    for agent_id, thread_id, payload in events:
        event_id = await send_event(ws, agent_id, thread_id, payload)
        if event_id is None:
            # Presumably send_event returns None when the event is rejected
            # (verify against its definition); do not track it under None.
            continue
        pending_events[event_id] = {"agent_id": agent_id, "tokens": []}


async def process_all_replies(ws):
    """Consume messages from ``ws`` and update ``pending_events``.

    Tokens are appended to the matching entry. A ``reply`` or an ``error``
    ends the event and removes its entry, so a failed event does not stay
    in ``pending_events`` forever. Messages for unknown events are ignored.
    """
    async for message in ws:
        data = json.loads(message)
        event_id = data.get("event_id")

        if event_id not in pending_events:
            continue

        kind = data.get("type")
        if kind == "token":
            pending_events[event_id]["tokens"].append(data["token"])
        elif kind in ("reply", "error"):
            # Either all tokens were received or the event failed.
            pending_events.pop(event_id)

Rate Limiting

Implement a simple queue-based rate limiter:

from collections import deque
import asyncio

class RateLimiter:
    """Sliding-window limiter: at most ``max_events`` acquisitions per 60 seconds.

    Attributes:
        max_events: Maximum acquisitions allowed inside the window.
        min_interval: Average spacing in seconds (60 / max_events); kept for
            callers that read it, not used for scheduling.
        timestamps: Event-loop clock readings of acquisitions still inside
            the window, oldest first.
    """

    WINDOW_SECONDS = 60.0

    def __init__(self, max_events_per_minute=60):
        if max_events_per_minute <= 0:
            raise ValueError("max_events_per_minute must be positive")
        self.max_events = max_events_per_minute
        self.min_interval = 60 / max_events_per_minute
        self.timestamps = deque()

    async def acquire(self):
        """Wait until one more event fits in the window, then record it."""
        # The loop clock is monotonic, so wall-clock adjustments cannot
        # stretch or shrink the window.
        clock = asyncio.get_running_loop().time

        while True:
            now = clock()

            # Remove timestamps that have left the window
            cutoff = now - self.WINDOW_SECONDS
            while self.timestamps and self.timestamps[0] <= cutoff:
                self.timestamps.popleft()

            if len(self.timestamps) < self.max_events:
                self.timestamps.append(now)
                return

            # Full: sleep exactly until the oldest entry expires, then
            # re-check, since another waiter may have taken the freed slot.
            await asyncio.sleep(self.timestamps[0] + self.WINDOW_SECONDS - now)

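All examples include basic error handling, and the Python and Node.js clients also demonstrate connection retry with backoff. Before deploying, review timeouts, reconnection behavior, and resource cleanup against your own requirements.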


Common Mistakes

Before shipping your integration, avoid these pitfalls:

| Mistake | Why it's wrong | What to do instead |
| --- | --- | --- |
| Storing `rla_` agent tokens in your app's DB | You don't need them — just the `agent_id` string | Store only `agent_id` for routing |
| Building agent registration UI in your app | Agents are managed in the Relay Dashboard | Link to the Dashboard instead |
| Polling `discover` in a loop | Wastes resources, no new info between changes | Send once on connect |
| Generating `event_id` client-side | Relay generates it and returns it in `accepted` | Use the `event_id` from the `accepted` response |
| Storing connection status in your DB | It's ephemeral and changes constantly | Query `discover` on connect for live status |
| Trying to rotate tokens from your app | Token rotation is a Dashboard-only action | Redirect admins to the Dashboard |

See the App Integration Guide for a full breakdown of ownership boundaries.