Back to Blog
WebSocket · Real-time · Node.js · Socket.io

WebSocket Implementation Guide

Build real-time features with WebSockets. From connection handling to scaling to fallback strategies.

B
Bootspring Team
Engineering
May 5, 2023
7 min read

WebSockets enable bidirectional, real-time communication between clients and servers. Here's how to implement them effectively.

Basic WebSocket Server#

import { randomUUID } from 'node:crypto';
import http from 'http';
import { WebSocketServer, WebSocket } from 'ws';

const server = http.createServer();
const wss = new WebSocketServer({ server });

/** Minimal shape a decoded frame must have before it can be dispatched. */
interface ClientMessage {
  type: string;
  channel?: unknown;
}

/** Narrows an arbitrary decoded JSON value to a dispatchable message. */
function isClientMessage(value: unknown): value is ClientMessage {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { type?: unknown }).type === 'string'
  );
}

/** Serializes `payload` and sends it, skipping sockets that are no longer open. */
function sendJson(ws: WebSocket, payload: unknown): void {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify(payload));
  }
}

// Connection handling
wss.on('connection', (ws: WebSocket, req) => {
  // Imported from node:crypto rather than relying on the `crypto` global,
  // which is not defined on every Node.js version.
  const clientId = randomUUID();
  console.log(`Client connected: ${clientId}`);

  // Handle incoming messages
  ws.on('message', (data) => {
    // Decode first, in its own try/catch, so that only genuine parse failures
    // are reported to the client as "Invalid JSON".
    let message: unknown;
    try {
      message = JSON.parse(data.toString());
    } catch {
      sendJson(ws, { type: 'error', message: 'Invalid JSON' });
      return;
    }

    // Handler failures are a server-side problem: log them and tell the
    // client something went wrong without blaming its payload.
    try {
      handleMessage(ws, clientId, message);
    } catch (error) {
      console.error(`Failed to handle message from ${clientId}:`, error);
      sendJson(ws, { type: 'error', message: 'Internal error' });
    }
  });

  // Handle disconnection
  ws.on('close', () => {
    console.log(`Client disconnected: ${clientId}`);
    cleanupClient(clientId);
  });

  // Handle errors
  ws.on('error', (error) => {
    console.error(`WebSocket error for ${clientId}:`, error);
  });

  // Send welcome message
  sendJson(ws, {
    type: 'connected',
    clientId,
    timestamp: new Date().toISOString(),
  });
});

/**
 * Dispatches one decoded client message.
 *
 * @param ws - Socket the message arrived on; replies are sent back on it.
 * @param clientId - Identifier assigned to the connection when it was opened.
 * @param message - Decoded JSON payload. It is untrusted, so it is validated
 *   before any field is read.
 */
function handleMessage(ws: WebSocket, clientId: string, message: unknown): void {
  if (!isClientMessage(message)) {
    sendJson(ws, { type: 'error', message: 'Unknown message type' });
    return;
  }

  switch (message.type) {
    case 'ping':
      sendJson(ws, { type: 'pong' });
      break;
    case 'subscribe':
    case 'unsubscribe': {
      const { channel } = message;
      if (typeof channel !== 'string') {
        sendJson(ws, { type: 'error', message: 'Missing channel' });
        return;
      }
      if (message.type === 'subscribe') {
        subscribeToChannel(clientId, channel);
      } else {
        unsubscribeFromChannel(clientId, channel);
      }
      break;
    }
    default:
      sendJson(ws, { type: 'error', message: 'Unknown message type' });
  }
}

server.listen(3000);

Socket.io Implementation#

import { Server } from 'socket.io';
import http from 'http';

const httpServer = http.createServer();
const io = new Server(httpServer, {
  cors: {
    origin: process.env.ALLOWED_ORIGINS?.split(','),
    credentials: true,
  },
  pingTimeout: 60000,
  pingInterval: 25000,
});

// Authentication middleware: reject the handshake unless the token verifies.
io.use(async (socket, next) => {
  const token = socket.handshake.auth.token;

  try {
    const user = await verifyToken(token);
    socket.data.user = user;
    next();
  } catch {
    next(new Error('Authentication failed'));
  }
});

// Connection handling
io.on('connection', (socket) => {
  const user = socket.data.user;
  console.log(`User connected: ${user.id}`);

  /**
   * True when `room` is a string naming a room this socket has joined.
   * Room names come from the client, so every broadcast is gated on this
   * check; otherwise any authenticated user could publish into rooms that
   * `canJoinRoom` would have refused.
   */
  const isMemberOf = (room: unknown): room is string =>
    typeof room === 'string' && socket.rooms.has(room);

  // Join user's personal room
  socket.join(`user:${user.id}`);

  // Join organization room
  if (user.organizationId) {
    socket.join(`org:${user.organizationId}`);
  }

  // Handle room subscription
  socket.on('join', (room: string) => {
    if (canJoinRoom(user, room)) {
      socket.join(room);
      socket.emit('joined', { room });
    } else {
      socket.emit('error', { message: 'Cannot join room' });
    }
  });

  // Handle messages
  socket.on('message', async (data) => {
    if (!isMemberOf(data?.room)) {
      socket.emit('error', { message: 'Not a room member' });
      return;
    }

    try {
      const message = await saveMessage(user.id, data);
      io.to(data.room).emit('message', message);
    } catch (error) {
      console.error(`Failed to save message from ${user.id}:`, error);
      socket.emit('error', { message: 'Failed to send message' });
    }
  });

  // Handle typing indicators (silently ignored for rooms the sender is not in)
  socket.on('typing:start', (room) => {
    if (isMemberOf(room)) {
      socket.to(room).emit('typing', { userId: user.id, typing: true });
    }
  });

  socket.on('typing:stop', (room) => {
    if (isMemberOf(room)) {
      socket.to(room).emit('typing', { userId: user.id, typing: false });
    }
  });

  // Cleanup on disconnect
  socket.on('disconnect', () => {
    console.log(`User disconnected: ${user.id}`);
  });
});

httpServer.listen(3000);

Client Implementation#

// Socket.io client
import { io, Socket } from 'socket.io-client';

/** Thin wrapper around a Socket.io client connection. */
class WebSocketClient {
  private socket: Socket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;

  /**
   * Opens the connection, authenticating with `token`.
   * Any socket left over from an earlier call is closed first so that
   * calling `connect` twice does not leak a live connection.
   */
  connect(token: string): void {
    this.socket?.disconnect();

    this.socket = io(process.env.WS_URL!, {
      auth: { token },
      reconnection: true,
      reconnectionAttempts: this.maxReconnectAttempts,
      reconnectionDelay: 1000,
      reconnectionDelayMax: 5000,
    });

    this.socket.on('connect', () => {
      console.log('Connected to WebSocket');
      this.reconnectAttempts = 0;
    });

    this.socket.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
    });

    this.socket.on('connect_error', (error) => {
      console.error('Connection error:', error);
      this.reconnectAttempts++;
    });
  }

  /** Asks the server to add this connection to `room`. */
  join(room: string): void {
    this.socket?.emit('join', room);
  }

  /** Sends a chat message to `room`. No-op when not connected. */
  sendMessage(room: string, content: string): void {
    this.socket?.emit('message', { room, content });
  }

  /** Registers a listener for incoming messages. Call after `connect`. */
  onMessage(callback: (message: any) => void): void {
    this.socket?.on('message', callback);
  }

  /**
   * Registers a listener that receives `true` on connect and `false` on
   * disconnect. Call after `connect`. This exists so that consumers never
   * need to reach into the private `socket` field.
   */
  onConnectionChange(callback: (connected: boolean) => void): void {
    this.socket?.on('connect', () => callback(true));
    this.socket?.on('disconnect', () => callback(false));
  }

  /** Closes the connection and drops the socket reference. */
  disconnect(): void {
    this.socket?.disconnect();
    this.socket = null;
  }
}

// React hook
/**
 * Connects for the lifetime of the component (reconnecting when `token`
 * changes) and exposes connection state, received messages and a sender.
 */
function useWebSocket(token: string) {
  const [connected, setConnected] = useState(false);
  const [messages, setMessages] = useState<Message[]>([]);
  const clientRef = useRef<WebSocketClient | null>(null);

  useEffect(() => {
    const client = new WebSocketClient();
    client.connect(token);
    clientRef.current = client;

    // Go through the public API: `client.socket` is private and cannot be
    // accessed from outside the class.
    client.onConnectionChange(setConnected);

    client.onMessage((message) => {
      setMessages((prev) => [...prev, message]);
    });

    return () => {
      client.disconnect();
      clientRef.current = null;
    };
  }, [token]);

  const sendMessage = useCallback((room: string, content: string) => {
    clientRef.current?.sendMessage(room, content);
  }, []);

  return { connected, messages, sendMessage };
}

Scaling with Redis#

import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

// Create Redis clients: one publishes, the duplicate subscribes.
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();

// Register error listeners before connecting. A node-redis client with no
// 'error' listener rethrows connection errors, which terminates the process.
pubClient.on('error', (error) => {
  console.error('Redis pub client error:', error);
});
subClient.on('error', (error) => {
  console.error('Redis sub client error:', error);
});

await Promise.all([pubClient.connect(), subClient.connect()]);

// Configure Socket.io with Redis adapter
const io = new Server(httpServer);
io.adapter(createAdapter(pubClient, subClient));

// Now events are shared across all instances
io.to('room1').emit('message', { text: 'Hello' }); // Reaches all clients in room1

// Broadcast to all connected clients across instances
io.emit('announcement', { text: 'Server maintenance in 5 minutes' });

// Get connected sockets count across instances
const sockets = await io.fetchSockets();
console.log(`Total connected: ${sockets.length}`);

Presence System#

import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL);

/**
 * Tracks which users are online using one Redis hash per user
 * (`presence:<userId>`) that expires unless it is refreshed by a heartbeat.
 */
class PresenceManager {
  private readonly TTL = 30; // seconds

  /** Marks `userId` online and records the socket that announced it. */
  async setOnline(userId: string, socketId: string): Promise<void> {
    const key = `presence:${userId}`;
    // Write and expiry go in one MULTI so a crash between the two commands
    // cannot leave a presence key that never expires.
    await redis
      .multi()
      .hset(key, 'socketId', socketId, 'lastSeen', Date.now().toString())
      .expire(key, this.TTL)
      .exec();
  }

  /** Removes the presence record for `userId`. */
  async setOffline(userId: string): Promise<void> {
    await redis.del(`presence:${userId}`);
  }

  /** Resolves to true while a presence record exists for `userId`. */
  async isOnline(userId: string): Promise<boolean> {
    return (await redis.exists(`presence:${userId}`)) === 1;
  }

  /** Returns the subset of `userIds` that currently have a presence record. */
  async getOnlineUsers(userIds: string[]): Promise<string[]> {
    const pipeline = redis.pipeline();
    userIds.forEach((id) => pipeline.exists(`presence:${id}`));
    const results = await pipeline.exec();

    // Each pipeline result is an [error, value] pair; EXISTS yields 1 when present.
    return userIds.filter((_, index) => results?.[index]?.[1] === 1);
  }

  /** Refreshes `lastSeen` and pushes the expiry out by another TTL. */
  async heartbeat(userId: string): Promise<void> {
    const key = `presence:${userId}`;
    await redis
      .multi()
      .hset(key, 'lastSeen', Date.now().toString())
      .expire(key, this.TTL)
      .exec();
  }
}

const presence = new PresenceManager();

// In Socket.io connection handler
io.on('connection', (socket) => {
  const userId = socket.data.user.id;

  // Assumes getFriends resolves to an array of user ids — confirm against its definition.
  let friends: string[] = [];
  let heartbeatInterval: ReturnType<typeof setInterval> | undefined;

  /** Sends a presence change for this user to each friend's personal room. */
  const notifyFriends = (status: 'online' | 'offline'): void => {
    friends.forEach((friendId) => {
      io.to(`user:${friendId}`).emit('presence', { userId, status });
    });
  };

  // Register the disconnect handler synchronously, before any await. If it
  // were attached after the async setup below, a socket that drops during
  // setup would never trigger cleanup: the heartbeat would run forever and
  // the user would appear online indefinitely.
  socket.on('disconnect', async () => {
    clearInterval(heartbeatInterval);
    try {
      await presence.setOffline(userId);
      // `friends` is still empty if setup never finished; in that case nobody
      // was told the user came online, so there is nobody to notify.
      notifyFriends('offline');
    } catch (error) {
      console.error(`Failed to mark ${userId} offline:`, error);
    }
  });

  void (async () => {
    try {
      // Mark as online
      await presence.setOnline(userId, socket.id);
      friends = await getFriends(userId);

      // The socket may have disconnected while we were awaiting. Undo the
      // online marker (it may have been written after the disconnect
      // handler's delete) and do not start the heartbeat.
      if (!socket.connected) {
        await presence.setOffline(userId);
        return;
      }

      // Notify friends
      notifyFriends('online');

      // Heartbeat interval: refresh well inside the presence TTL.
      heartbeatInterval = setInterval(() => {
        presence.heartbeat(userId).catch((error) => {
          console.error(`Presence heartbeat failed for ${userId}:`, error);
        });
      }, 10000);
    } catch (error) {
      console.error(`Failed to set up presence for ${userId}:`, error);
    }
  })();
});

Room-Based Chat#

import { randomUUID } from 'node:crypto';

interface Room {
  id: string;
  name: string;
  members: Set<string>;
}

// In-memory registry: lives in this process only and is lost on restart.
const rooms = new Map<string, Room>();

type Ack = (response: Record<string, unknown>) => void;

/**
 * Returns a callable acknowledgement. The ack argument is supplied by the
 * client and is absent when the client emits without one, so calling it
 * unguarded would throw inside the handler.
 */
function toAck(callback: unknown): Ack {
  return typeof callback === 'function' ? (callback as Ack) : () => {};
}

io.on('connection', (socket) => {
  const userId = socket.data.user.id;

  socket.on('room:create', async (name: string, callback) => {
    const ack = toAck(callback);

    if (typeof name !== 'string' || name.trim() === '') {
      return ack({ success: false, error: 'Invalid room name' });
    }

    const room: Room = {
      id: randomUUID(),
      name,
      members: new Set([userId]),
    };

    rooms.set(room.id, room);
    socket.join(room.id);

    ack({ success: true, room: { id: room.id, name: room.name } });
  });

  socket.on('room:join', async (roomId: string, callback) => {
    const ack = toAck(callback);
    const room = rooms.get(roomId);

    if (!room) {
      return ack({ success: false, error: 'Room not found' });
    }

    room.members.add(userId);
    socket.join(roomId);

    // Notify room members
    socket.to(roomId).emit('room:user_joined', {
      userId,
      roomId,
    });

    ack({ success: true });
  });

  socket.on('room:message', async (data: { roomId: string; content: string }) => {
    // The payload is client-supplied; the annotation above is not enforced at runtime.
    if (typeof data?.roomId !== 'string' || typeof data?.content !== 'string') {
      return socket.emit('error', { message: 'Invalid message' });
    }

    const room = rooms.get(data.roomId);

    if (!room || !room.members.has(userId)) {
      return socket.emit('error', { message: 'Not a room member' });
    }

    const message = {
      id: randomUUID(),
      roomId: data.roomId,
      userId,
      content: data.content,
      timestamp: new Date().toISOString(),
    };

    try {
      // Save to database
      await saveMessage(message);
    } catch (error) {
      // Without this, a failed save is an unhandled rejection and the sender
      // never learns the message was dropped.
      console.error(`Failed to save message in room ${data.roomId}:`, error);
      return socket.emit('error', { message: 'Failed to send message' });
    }

    // Broadcast to room
    io.to(data.roomId).emit('room:message', message);
  });
});

Error Handling and Reconnection#

// Client-side reconnection handling
const socket = io(WS_URL, {
  reconnection: true,
  reconnectionAttempts: 10,
  reconnectionDelay: 1000,
  reconnectionDelayMax: 10000,
  timeout: 20000,
});

// Fires on the initial connection and again after every successful reconnection.
socket.on('connect', () => {
  console.log('Connected');

  // Rejoin rooms after reconnection
  const rooms = getStoredRooms();
  rooms.forEach((room) => socket.emit('join', room));
});

socket.on('disconnect', (reason) => {
  if (reason === 'io server disconnect') {
    // Server initiated disconnect, need to manually reconnect
    socket.connect();
  }
  // Otherwise socket.io will auto-reconnect
});

// Since Socket.IO v3 the reconnection events are emitted by the Manager
// (`socket.io`), not by the Socket. Listeners attached to `socket` for
// these events are never called.
socket.io.on('reconnect_attempt', (attempt) => {
  console.log(`Reconnection attempt ${attempt}`);
});

socket.io.on('reconnect_failed', () => {
  console.error('Failed to reconnect');
  showReconnectionDialog();
});

Best Practices#

Connection:
✓ Implement authentication
✓ Handle reconnection gracefully
✓ Use heartbeats for presence
✓ Clean up on disconnect

Scaling:
✓ Use the Redis adapter to share events across multiple instances
✓ Enable sticky sessions at the load balancer when clients may fall back to HTTP long-polling
✓ Monitor connection counts
✓ Set appropriate timeouts

Performance:
✓ Batch messages when possible
✓ Use binary data for large payloads
✓ Implement message compression
✓ Rate limit messages per client

Conclusion#

WebSockets enable powerful real-time features. Use Socket.io for easier handling, Redis for scaling, and implement proper presence and reconnection logic. Monitor connection health and plan for graceful degradation when WebSockets aren't available.

Share this article

Help spread the word about Bootspring