Back to Blog
Microservices, Architecture, gRPC, Message Queue

Microservices Communication Patterns

Connect microservices effectively. From REST to gRPC to message queues to service mesh patterns.

B
Bootspring Team
Engineering
June 12, 2023
6 min read

Microservices need to communicate reliably. The choice between synchronous and asynchronous patterns affects performance, reliability, and complexity.

Communication Styles

Synchronous:
- Request/response
- Client waits for the response
- REST, gRPC
- Simpler to understand

Asynchronous:
- Fire-and-forget or pub/sub
- Decoupled services
- Message queues, events
- Better resilience

REST API Communication

// Service client with timeouts, automatic retries and auth/tracing headers.
// NOTE: axios-retry only retries failed requests; it is not a circuit breaker.
// A breaker has to be layered on top of this client separately.
import axios, { AxiosInstance } from 'axios';
import axiosRetry from 'axios-retry';

/**
 * HTTP client for the user service.
 *
 * Every request has a 5 second timeout, is retried up to 3 times with
 * exponential backoff, and carries a service bearer token plus an
 * `X-Request-ID` header. `getServiceToken` and `getRequestId` are expected to
 * be defined elsewhere in the application.
 */
class UserServiceClient {
  private readonly client: AxiosInstance;

  /**
   * @param baseURL Root URL of the user service, e.g. `http://user-service:3000`.
   */
  constructor(baseURL: string) {
    this.client = axios.create({
      baseURL,
      timeout: 5000,
      headers: {
        'Content-Type': 'application/json',
      },
    });

    // Retry configuration.
    // The 503 clause applies to every HTTP method, so a POST such as
    // createUser can be sent more than once; the server must tolerate that.
    axiosRetry(this.client, {
      retries: 3,
      retryDelay: axiosRetry.exponentialDelay,
      retryCondition: (error) =>
        axiosRetry.isNetworkOrIdempotentRequestError(error) ||
        error.response?.status === 503,
    });

    // Request interceptor for auth and request tracing.
    this.client.interceptors.request.use((config) => {
      config.headers.Authorization = `Bearer ${getServiceToken()}`;
      config.headers['X-Request-ID'] = getRequestId();
      return config;
    });
  }

  /**
   * Fetches a single user.
   *
   * @param id User identifier; it is URL-encoded before being placed in the path.
   * @returns The response body of `GET /users/:id`.
   * @throws The axios error when the request still fails after all retries.
   */
  async getUser(id: string): Promise<User> {
    const { data } = await this.client.get<User>(
      `/users/${encodeURIComponent(id)}`
    );
    return data;
  }

  /**
   * Creates a user.
   *
   * @param input Body sent to `POST /users`.
   * @returns The response body of `POST /users`.
   * @throws The axios error when the request still fails after all retries.
   */
  async createUser(input: CreateUserInput): Promise<User> {
    const { data } = await this.client.post<User>('/users', input);
    return data;
  }
}

// Fail fast on a missing URL instead of creating a client without a base URL.
const userServiceUrl = process.env.USER_SERVICE_URL;
if (!userServiceUrl) {
  throw new Error('USER_SERVICE_URL environment variable is not set');
}

const userService = new UserServiceClient(userServiceUrl);

gRPC Communication

// user.proto
syntax = "proto3";

package user;

// RPC surface of the user service.
service UserService {
  // Unary: look up one user by id.
  rpc GetUser(GetUserRequest) returns (User);
  // Unary: create a user from an email and a name.
  rpc CreateUser(CreateUserRequest) returns (User);
  // Server streaming: the response is a stream of User messages.
  rpc ListUsers(ListUsersRequest) returns (stream User);
}

message GetUserRequest {
  string id = 1;
}

message CreateUserRequest {
  string email = 1;
  string name = 2;
}

message User {
  string id = 1;
  string email = 2;
  string name = 3;
  // Carried as a plain string; the format is not fixed by this schema.
  string created_at = 4;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}
// gRPC server
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

// keepCase: true preserves the snake_case field names declared in user.proto.
// Without it proto-loader converts them to camelCase, and reads such as
// `call.request.page_size` are always undefined.
const packageDefinition = protoLoader.loadSync('user.proto', { keepCase: true });
// The dynamically loaded package has no static types, hence the cast.
const userProto = grpc.loadPackageDefinition(packageDefinition) as any;

/** Request and response shapes mirroring the messages in user.proto. */
interface GetUserRequest {
  id: string;
}

interface CreateUserRequest {
  email: string;
  name: string;
}

interface ListUsersRequest {
  page_size: number;
  page_token: string;
}

interface UserMessage {
  id: string;
  email: string;
  name: string;
  created_at: string;
}

/** Page size used when the client leaves `page_size` unset (proto3 default 0). */
const DEFAULT_PAGE_SIZE = 50;

/** Converts any thrown value into an INTERNAL gRPC status. */
function toInternalStatus(error: unknown): { code: grpc.status; message: string } {
  return {
    code: grpc.status.INTERNAL,
    message: error instanceof Error ? error.message : String(error),
  };
}

const server = new grpc.Server();

server.addService(userProto.user.UserService.service, {
  /** Looks up one user by id; answers NOT_FOUND when no row matches. */
  getUser: async (
    call: grpc.ServerUnaryCall<GetUserRequest, UserMessage>,
    callback: grpc.sendUnaryData<UserMessage>
  ) => {
    try {
      const user = await db.user.findUnique({
        where: { id: call.request.id },
      });

      if (!user) {
        // A missing user is an error, not a successful empty response.
        callback({
          code: grpc.status.NOT_FOUND,
          message: `User ${call.request.id} not found`,
        });
        return;
      }

      callback(null, user);
    } catch (error) {
      callback(toInternalStatus(error));
    }
  },

  /** Creates a user from the request fields. */
  createUser: async (
    call: grpc.ServerUnaryCall<CreateUserRequest, UserMessage>,
    callback: grpc.sendUnaryData<UserMessage>
  ) => {
    try {
      const user = await db.user.create({
        data: call.request,
      });
      callback(null, user);
    } catch (error) {
      callback(toInternalStatus(error));
    }
  },

  /** Streams one page of users, then ends the stream. */
  listUsers: async (
    call: grpc.ServerWritableStream<ListUsersRequest, UserMessage>
  ) => {
    try {
      const cursor = call.request.page_token || undefined;
      const pageSize =
        call.request.page_size > 0 ? call.request.page_size : DEFAULT_PAGE_SIZE;

      // NOTE(review): if `db` is Prisma, a cursor query includes the cursor row
      // itself; add `skip: 1` if page_token is the last id already seen.
      const users = await db.user.findMany({
        take: pageSize,
        cursor: cursor ? { id: cursor } : undefined,
      });

      for (const user of users) {
        call.write(user);
      }
      call.end();
    } catch (error) {
      // Report the failure to the client instead of leaving an unhandled
      // rejection and a stream that never ends.
      call.emit('error', toInternalStatus(error));
    }
  },
});

// Insecure credentials are for local development only; use TLS in production.
server.bindAsync(
  '0.0.0.0:50051',
  grpc.ServerCredentials.createInsecure(),
  (error, port) => {
    if (error) {
      console.error('Failed to bind gRPC server:', error);
      process.exitCode = 1;
      return;
    }
    // Required by older @grpc/grpc-js releases. Newer releases start serving
    // once bound and treat this call as deprecated.
    server.start();
    console.log(`gRPC server listening on port ${port}`);
  }
);
// gRPC client
// Insecure credentials are for local development only; use TLS in production.
const client = new userProto.user.UserService(
  'localhost:50051',
  grpc.credentials.createInsecure()
);

/** Deadline applied to unary calls so a stalled server cannot hang the caller. */
const UNARY_DEADLINE_MS = 5000;

/**
 * Promisified wrapper around the callback-style `GetUser` RPC.
 *
 * @param id Identifier of the user to fetch.
 * @returns The user returned by the server.
 * @throws The gRPC service error, including DEADLINE_EXCEEDED after
 *   `UNARY_DEADLINE_MS` milliseconds.
 */
function getUser(id: string): Promise<User> {
  return new Promise((resolve, reject) => {
    client.getUser(
      { id },
      { deadline: Date.now() + UNARY_DEADLINE_MS },
      (error: grpc.ServiceError | null, response: User) => {
        if (error) reject(error);
        else resolve(response);
      }
    );
  });
}

/**
 * Streams users from the server-streaming `ListUsers` RPC.
 *
 * The RPC is opened when iteration starts, not when this function is called,
 * so an iterable that is never consumed opens no stream, and each iteration
 * gets its own call.
 *
 * @param pageSize Maximum number of users requested from the server.
 * @returns An async iterable yielding each streamed user.
 */
function listUsers(pageSize: number): AsyncIterable<User> {
  return {
    async *[Symbol.asyncIterator]() {
      // The snake_case key only matches the proto field when the definition
      // was loaded with `keepCase: true`; otherwise the field is `pageSize`.
      const call = client.listUsers({ page_size: pageSize });

      for await (const user of call) {
        yield user as User;
      }
    },
  };
}

Message Queue Communication

import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';

// Fail fast on a missing URL; `new Redis(undefined)` would silently fall back
// to the ioredis default host.
const redisUrl = process.env.REDIS_URL;
if (!redisUrl) {
  throw new Error('REDIS_URL environment variable is not set');
}

// BullMQ workers issue blocking Redis commands and require
// `maxRetriesPerRequest: null` on the connection they are given.
const redis = new Redis(redisUrl, { maxRetriesPerRequest: null });

// Publisher
const orderQueue = new Queue('orders', { connection: redis });

/**
 * Enqueues an order for asynchronous processing.
 *
 * The job is attempted up to 3 times with exponential backoff starting at 1 s.
 * The queue keeps the last 100 completed and 1000 failed jobs.
 *
 * @param order Order payload stored as the job data.
 * @returns The id of the enqueued job.
 * @throws Error when the queue does not assign an id to the job.
 */
async function placeOrder(order: Order): Promise<string> {
  const job = await orderQueue.add('process-order', order, {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
    removeOnComplete: 100,
    removeOnFail: 1000,
  });

  // `job.id` is typed as optional; do not hand `undefined` back as a string.
  if (job.id === undefined) {
    throw new Error('Queue did not assign an id to the order job');
  }

  return job.id;
}

// Consumer
// A retry re-runs this whole function, so each step (in particular
// chargePayment) must be idempotent or the order may be charged twice.
const orderWorker = new Worker(
  'orders',
  async (job) => {
    const order = job.data;

    // Process order
    await validateOrder(order);
    await reserveInventory(order.items);
    await chargePayment(order);
    await sendConfirmation(order);

    return { processed: true };
  },
  {
    connection: redis,
    concurrency: 5,
  }
);

orderWorker.on('completed', (job) => {
  console.log(`Order ${job.id} completed`);
});

orderWorker.on('failed', (job, error) => {
  console.error(`Order ${job?.id} failed:`, error);
});

// Without an 'error' listener, connection-level failures surface as unhandled
// 'error' events on the worker.
orderWorker.on('error', (error) => {
  console.error('Order worker error:', error);
});

Event-Driven Communication

// Event bus with Redis pub/sub
import { randomUUID } from 'node:crypto';
import Redis from 'ioredis';

/** Handler signature kept as in the original API so existing callers compile. */
type EventHandler = (event: any) => Promise<void>;

/**
 * Minimal event bus on top of Redis pub/sub.
 *
 * Two connections are used because a Redis connection in subscriber mode
 * cannot issue regular commands such as PUBLISH. Redis pub/sub does not persist
 * messages: an event published while a service is not subscribed is lost.
 */
class EventBus {
  private readonly publisher: Redis;
  private readonly subscriber: Redis;
  private readonly handlers = new Map<string, EventHandler[]>();

  /**
   * @param redisUrl Connection URL used for both the publisher and subscriber.
   */
  constructor(redisUrl: string) {
    this.publisher = new Redis(redisUrl);
    this.subscriber = new Redis(redisUrl);

    this.subscriber.on('message', (channel: string, message: string) => {
      // dispatch() handles all of its own failures, so nothing can escape here
      // as an unhandled rejection.
      void this.dispatch(channel, message);
    });
  }

  /**
   * Wraps the payload in an event envelope (id, type, payload, timestamp) and
   * publishes it on the channel named after the event type.
   *
   * @param eventType Channel name and `type` field of the event.
   * @param payload JSON-serializable event data.
   */
  async publish(eventType: string, payload: unknown): Promise<void> {
    const event = {
      id: randomUUID(),
      type: eventType,
      payload,
      timestamp: new Date().toISOString(),
    };

    await this.publisher.publish(eventType, JSON.stringify(event));
  }

  /**
   * Registers a handler for an event type. The Redis channel is subscribed the
   * first time a handler is added for that type.
   *
   * @param eventType Channel name to listen on.
   * @param handler Async callback invoked with the parsed event envelope.
   */
  subscribe(eventType: string, handler: EventHandler): void {
    let registered = this.handlers.get(eventType);

    if (!registered) {
      registered = [];
      this.handlers.set(eventType, registered);
      // subscribe() returns a promise; log failures instead of leaving it floating.
      this.subscriber.subscribe(eventType).catch((error: unknown) => {
        console.error(`Failed to subscribe to "${eventType}":`, error);
      });
    }

    registered.push(handler);
  }

  /** Parses one raw message and runs every handler for its channel. */
  private async dispatch(channel: string, message: string): Promise<void> {
    let event: unknown;
    try {
      event = JSON.parse(message);
    } catch (error) {
      console.error(`Discarding malformed event on "${channel}":`, error);
      return;
    }

    const handlers = this.handlers.get(channel) ?? [];
    // allSettled lets every handler run even when one of them fails; the async
    // wrapper also captures handlers that throw synchronously.
    const results = await Promise.allSettled(
      handlers.map(async (handler) => handler(event))
    );

    for (const result of results) {
      if (result.status === 'rejected') {
        console.error(`Handler for "${channel}" failed:`, result.reason);
      }
    }
  }
}

const eventBusRedisUrl = process.env.REDIS_URL;
if (!eventBusRedisUrl) {
  throw new Error('REDIS_URL environment variable is not set');
}

const eventBus = new EventBus(eventBusRedisUrl);

// Publisher (Order Service)
await eventBus.publish('order.placed', {
  orderId: order.id,
  customerId: order.customerId,
  items: order.items,
  total: order.total,
});

// Subscriber (Inventory Service)
eventBus.subscribe('order.placed', async (event) => {
  await reserveInventory(event.payload.items);
});

// Subscriber (Notification Service)
eventBus.subscribe('order.placed', async (event) => {
  await sendOrderConfirmation(event.payload.customerId, event.payload.orderId);
});

Service Discovery

// Consul-based service discovery
import Consul from 'consul';

/** What a service needs to register itself. */
interface ServiceConfig {
  name: string;
  host: string;
  port: number;
}

/** Network location of one discovered instance. */
interface ServiceInstance {
  host: string;
  port: number;
}

const consul = new Consul();

/**
 * Registers this process with the local Consul agent, with an HTTP health
 * check against `/health` every 10 s (5 s timeout).
 *
 * The registration id is `<name>-<INSTANCE_ID>`. When `INSTANCE_ID` is not set
 * it falls back to `<name>-<host>-<port>`, so two instances never share the
 * literal id `<name>-undefined`.
 *
 * @param service Name and address of the service to register.
 */
async function registerService(service: ServiceConfig): Promise<void> {
  const instanceId =
    process.env.INSTANCE_ID ?? `${service.host}-${service.port}`;

  await consul.agent.service.register({
    id: `${service.name}-${instanceId}`,
    name: service.name,
    address: service.host,
    port: service.port,
    check: {
      http: `http://${service.host}:${service.port}/health`,
      interval: '10s',
      timeout: '5s',
    },
  });
}

/**
 * Returns the instances of a service whose health checks are passing.
 *
 * @param serviceName Name the service was registered under.
 * @returns Host and port of every passing instance; may be empty.
 */
async function getServiceInstances(serviceName: string): Promise<ServiceInstance[]> {
  const result = await consul.health.service({
    service: serviceName,
    passing: true,
  });

  return result.map((entry) => ({
    host: entry.Service.Address,
    port: entry.Service.Port,
  }));
}

// Load balancing
/** Client-side round-robin selection over a list of instances. */
class LoadBalancer {
  private index = 0;

  /**
   * Picks the next instance in rotation.
   *
   * @param instances Candidate instances; must not be empty.
   * @returns The selected instance.
   * @throws Error when `instances` is empty.
   */
  roundRobin(instances: ServiceInstance[]): ServiceInstance {
    if (instances.length === 0) {
      // `index % 0` is NaN, so the original lookup returned undefined here.
      throw new Error('No service instances available');
    }

    const instance = instances[this.index % instances.length];
    // Keep the counter bounded instead of letting it grow forever.
    this.index = (this.index + 1) % instances.length;
    return instance;
  }
}

// Usage
const loadBalancer = new LoadBalancer();
const instances = await getServiceInstances('user-service');
const instance = loadBalancer.roundRobin(instances);
const url = `http://${instance.host}:${instance.port}`;

API Gateway Pattern

// Simple API gateway
import { randomUUID } from 'node:crypto';
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';

const app = express();

// Express runs middleware in registration order, so rate limiting and
// authentication must be registered BEFORE the proxy routes. Registered after
// them, proxied requests are answered by the proxy and never reach these.
// `rateLimit` and `authenticate` are expected to be provided elsewhere.

// Rate limiting
app.use(rateLimit({
  windowMs: 60 * 1000,
  max: 100,
}));

// Authentication
app.use(authenticate);

// Route to services
const services = {
  '/api/users': 'http://user-service:3000',
  '/api/orders': 'http://order-service:3000',
  '/api/products': 'http://product-service:3000',
};

for (const [path, target] of Object.entries(services)) {
  app.use(
    path,
    createProxyMiddleware({
      target,
      changeOrigin: true,
      // Strip the gateway prefix so the service sees paths relative to its root.
      pathRewrite: { [`^${path}`]: '' },
      onProxyReq: (proxyReq, req) => {
        // Forward auth headers
        if (req.headers.authorization) {
          proxyReq.setHeader('Authorization', req.headers.authorization);
        }

        // Add request tracing. Use `req.id` when some middleware set it, then
        // an incoming X-Request-ID header, otherwise generate one. setHeader
        // throws when given an undefined value.
        const assignedId = (req as { id?: string }).id;
        const incoming = req.headers['x-request-id'];
        const forwardedId = Array.isArray(incoming) ? incoming[0] : incoming;
        proxyReq.setHeader(
          'X-Request-ID',
          assignedId ?? forwardedId ?? randomUUID()
        );
      },
    })
  );
}

Saga Pattern

// Orchestration-based saga

/** One saga step: a forward action and the action that undoes it. */
interface SagaStep {
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
}

/**
 * Runs a sequence of steps. If one fails, the steps that already completed are
 * compensated in reverse order.
 */
class OrderSaga {
  private readonly steps: SagaStep[] = [];

  /**
   * Appends a step to the saga.
   *
   * @param execute Forward action of the step.
   * @param compensate Action that undoes `execute` once it has completed.
   * @returns This saga, so calls can be chained.
   */
  addStep(
    execute: () => Promise<void>,
    compensate: () => Promise<void>
  ): this {
    this.steps.push({ execute, compensate });
    return this;
  }

  /**
   * Executes the steps in the order they were added.
   *
   * @throws The error of the failing step, after compensation has been
   *   attempted for every step that completed. The failing step itself is not
   *   compensated. A failing compensation is logged and does not stop the
   *   remaining compensations.
   */
  async run(): Promise<void> {
    const executedSteps: SagaStep[] = [];

    try {
      for (const step of this.steps) {
        await step.execute();
        executedSteps.push(step);
      }
    } catch (error) {
      // Compensate in reverse order
      for (let i = executedSteps.length - 1; i >= 0; i--) {
        try {
          await executedSteps[i].compensate();
        } catch (compensateError) {
          console.error('Compensation failed:', compensateError);
        }
      }
      throw error;
    }
  }
}

// Service clients used by the example below; assumed to be provided by the
// surrounding application.
declare const inventoryService: {
  reserve(orderId: string): Promise<void>;
  release(orderId: string): Promise<void>;
};
declare const paymentService: {
  charge(orderId: string): Promise<void>;
  refund(orderId: string): Promise<void>;
};
declare const orderService: {
  confirm(orderId: string): Promise<void>;
  cancel(orderId: string): Promise<void>;
};

// Usage
/** Reserves stock, charges payment and confirms the order as one saga. */
async function runOrderSaga(orderId: string): Promise<void> {
  const saga = new OrderSaga()
    .addStep(
      () => inventoryService.reserve(orderId),
      () => inventoryService.release(orderId)
    )
    .addStep(
      () => paymentService.charge(orderId),
      () => paymentService.refund(orderId)
    )
    .addStep(
      () => orderService.confirm(orderId),
      () => orderService.cancel(orderId)
    );

  await saga.run();
}

Best Practices

Synchronous:
✓ Use for queries and real-time needs
✓ Implement timeouts
✓ Add circuit breakers
✓ Handle partial failures

Asynchronous:
✓ Use for background processing
✓ Ensure idempotency
✓ Implement dead letter queues
✓ Monitor queue depths

General:
✓ Use correlation IDs
✓ Version your APIs
✓ Document contracts
✓ Test failure scenarios

Conclusion

Choose communication patterns based on requirements. Use synchronous for real-time needs, asynchronous for resilience. Implement proper error handling, retries, and monitoring regardless of pattern.

Share this article

Help spread the word about Bootspring