GraphQL subscriptions let a server push data to clients in real time.
Schema Definition#
# Root subscription type: each field is an event stream a client can subscribe to.
# NOTE: User, Notification and the DateTime scalar are referenced below but are
# not declared in this snippet; they must be defined elsewhere in the schema.
type Subscription {
  # Takes the channel to listen on; every event carries a Message.
  messageAdded(channelId: ID!): Message!
  userStatusChanged: UserStatus!
  notificationReceived: Notification!
}

# Payload type of messageAdded.
type Message {
  id: ID!
  content: String!
  author: User!
  createdAt: DateTime!
}

# Payload type of userStatusChanged.
type UserStatus {
  userId: ID!
  status: Status!
}

# Values that UserStatus.status can take.
enum Status {
  ONLINE
  OFFLINE
  AWAY
}
Server Setup with Apollo#
import { createServer } from 'http';

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { makeExecutableSchema } from '@graphql-tools/schema';
import express from 'express';
import { PubSub } from 'graphql-subscriptions';
import { useServer } from 'graphql-ws/lib/use/ws';
import { WebSocketServer } from 'ws';
8
// Event bus used by the sendMessage mutation (publisher) and the
// messageAdded subscription (consumer).
const pubsub = new PubSub();

/**
 * SDL for the chat API: one query to list a channel's messages, one mutation
 * to send a message, and one subscription that streams new messages.
 */
const typeDefs = `#graphql
  type Query {
    messages(channelId: ID!): [Message!]!
  }

  type Mutation {
    sendMessage(channelId: ID!, content: String!): Message!
  }

  type Subscription {
    messageAdded(channelId: ID!): Message!
  }

  type Message {
    id: ID!
    content: String!
    authorId: ID!
    channelId: ID!
  }
`;
31
/**
 * Builds the pub/sub topic for a channel. Publisher and subscriber both go
 * through this helper so the two topic strings cannot drift apart.
 */
const messageAddedTopic = (channelId: string): string => `MESSAGE_ADDED_${channelId}`;

/**
 * Resolver map for the chat schema.
 *
 * `getMessages` and `createMessage` are data-access helpers defined elsewhere;
 * `pubsub` is the module-level event bus.
 */
const resolvers = {
  Query: {
    messages: (_: unknown, { channelId }: { channelId: string }) => getMessages(channelId),
  },

  Mutation: {
    /**
     * Stores a message and notifies subscribers of its channel.
     *
     * @throws Error when the request context carries no `userId`.
     */
    sendMessage: async (
      _: unknown,
      { channelId, content }: { channelId: string; content: string },
      { userId }: { userId?: string },
    ) => {
      // Fail before writing anything: the schema declares `authorId: ID!`,
      // so a message without an author could not be returned anyway.
      if (!userId) {
        throw new Error('Not authenticated');
      }

      const message = await createMessage({ channelId, content, authorId: userId });

      // Publish to subscribers. The message is already stored at this point,
      // so a failing transport is logged instead of failing the mutation, and
      // awaiting keeps a rejection from going unhandled.
      try {
        await pubsub.publish(messageAddedTopic(channelId), {
          messageAdded: message,
        });
      } catch (err: unknown) {
        console.error('Failed to publish messageAdded event', err);
      }

      return message;
    },
  },

  Subscription: {
    messageAdded: {
      // NOTE(review): graphql-subscriptions v3 renames `asyncIterator` to
      // `asyncIterableIterator`; confirm against the installed version.
      subscribe: (_: unknown, { channelId }: { channelId: string }) =>
        pubsub.asyncIterator(messageAddedTopic(channelId)),
    },
  },
};
58
// Create schema
const schema = makeExecutableSchema({ typeDefs, resolvers });

// Create HTTP and WebSocket servers. Both listen on the same port and path:
// the WebSocket server is attached to the HTTP server rather than opening
// its own socket.
const app = express();
const httpServer = createServer(app);

const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

// Keep the disposer so the WebSocket side can be shut down with the server.
const serverCleanup = useServer({ schema }, wsServer);

const server = new ApolloServer({
  schema,
  plugins: [
    // Stop accepting HTTP connections and let in-flight requests finish.
    ApolloServerPluginDrainHttpServer({ httpServer }),
    // Close open subscription sockets when the server is stopped.
    {
      async serverWillStart() {
        return {
          async drainServer() {
            await serverCleanup.dispose();
          },
        };
      },
    },
  ],
});
await server.start();

// expressMiddleware expects an already-parsed JSON body, so the body parser
// has to run first. Browser clients served from another origin additionally
// need CORS headers.
// NOTE(review): the resolvers read `userId` from the context, but no `context`
// function is passed here, so it is undefined for HTTP operations.
app.use('/graphql', express.json(), expressMiddleware(server));

httpServer.listen(4000);
Client Setup#
1import {
2 ApolloClient,
3 InMemoryCache,
4 split,
5 HttpLink
6} from '@apollo/client';
7import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
8import { createClient } from 'graphql-ws';
9import { getMainDefinition } from '@apollo/client/utilities';
10
/** Transport for queries and mutations. */
const httpLink = new HttpLink({ uri: 'http://localhost:4000/graphql' });

/** Transport for subscriptions, backed by a graphql-ws client. */
const wsLink = new GraphQLWsLink(createClient({ url: 'ws://localhost:4000/graphql' }));

// Route subscriptions to WebSocket, others to HTTP
const splitLink = split(
  ({ query }) => {
    const mainDefinition = getMainDefinition(query);
    if (mainDefinition.kind !== 'OperationDefinition') {
      return false;
    }
    return mainDefinition.operation === 'subscription';
  },
  wsLink,
  httpLink,
);

/** Apollo Client instance that sends every operation through `splitLink`. */
const client = new ApolloClient({
  cache: new InMemoryCache(),
  link: splitLink,
});
React Hook Usage#
1import { useSubscription, gql } from '@apollo/client';
2
/** Subscription document: streams id, content and authorId of each new message in a channel. */
const MESSAGE_SUBSCRIPTION = gql`
  subscription OnMessageAdded($channelId: ID!) {
    messageAdded(channelId: $channelId) {
      id
      content
      authorId
    }
  }
`;

/**
 * Subscribes to `messageAdded` for the given channel, logs every event and
 * renders the content of the most recent one.
 */
function ChatRoom({ channelId }: { channelId: string }) {
  const subscription = useSubscription(MESSAGE_SUBSCRIPTION, {
    variables: { channelId },
    onData: (event) => {
      console.log('New message:', event.data.data?.messageAdded);
    },
  });

  if (subscription.loading) return <p>Connecting...</p>;
  if (subscription.error) return <p>Error: {subscription.error.message}</p>;

  const latestMessage = subscription.data?.messageAdded;

  return <div>{latestMessage && <p>New: {latestMessage.content}</p>}</div>;
}
Updating Cache on Subscription#
/**
 * Renders a channel's message list and keeps it current: the initial list
 * comes from GET_MESSAGES, and every `messageAdded` event is appended to the
 * cached list so the query result re-renders.
 */
function ChatRoom({ channelId }: { channelId: string }) {
  const { data: messagesData } = useQuery(GET_MESSAGES, {
    variables: { channelId },
  });

  useSubscription(MESSAGE_SUBSCRIPTION, {
    variables: { channelId },
    onData: ({ client, data }) => {
      const newMessage = data.data?.messageAdded;
      if (!newMessage) return;

      // Serialized arguments of the one cached list this event belongs to.
      const channelArgs = JSON.stringify({ channelId });

      // Update cache with new message
      client.cache.modify({
        fields: {
          messages(existingMessages = [], { storeFieldName, readField }) {
            // The modifier runs for every cached `messages(...)` entry, one
            // per distinct argument set. Leave other channels' lists alone.
            if (!storeFieldName.includes(channelArgs)) {
              return existingMessages;
            }

            // The message may already be in the list (e.g. written by the
            // sender's own mutation result); do not append it twice.
            const alreadyCached = existingMessages.some(
              (ref) => readField('id', ref) === newMessage.id,
            );
            if (alreadyCached) {
              return existingMessages;
            }

            const newMessageRef = client.cache.writeFragment({
              data: newMessage,
              fragment: gql`
                fragment NewMessage on Message {
                  id
                  content
                  authorId
                }
              `,
            });
            return [...existingMessages, newMessageRef];
          },
        },
      });
    },
  });

  return <MessageList messages={messagesData?.messages} />;
}
Scaling with Redis PubSub#
1import { RedisPubSub } from 'graphql-redis-subscriptions';
2import Redis from 'ioredis';
3
/** Port used when REDIS_PORT is unset, empty or not a number. */
const DEFAULT_REDIS_PORT = 6379;

// Explicit radix; a malformed value yields NaN, which is replaced below
// rather than handed to the Redis client.
const parsedRedisPort = Number.parseInt(process.env.REDIS_PORT || '', 10);

/** Connection options shared by the publisher and subscriber connections. */
const options = {
  host: process.env.REDIS_HOST,
  port: Number.isNaN(parsedRedisPort) ? DEFAULT_REDIS_PORT : parsedRedisPort,
  // Grows by 50 per attempt and is capped at 2000.
  retryStrategy: (times: number) => Math.min(times * 50, 2000),
};

// Two separate connections are created: one for publishing, one for subscribing.
const pubsub = new RedisPubSub({
  publisher: new Redis(options),
  subscriber: new Redis(options),
});

// Now works across multiple server instances
// Awaited so a failed publish surfaces here instead of as an unhandled rejection.
await pubsub.publish('MESSAGE_ADDED_123', { messageAdded: message });
Authentication#
const wsServer = new WebSocketServer({ server: httpServer, path: '/graphql' });

useServer(
  {
    schema,
    // Authenticate once, when the connection is opened, instead of on every
    // operation. Returning false rejects the connection before any
    // subscription can be started.
    onConnect: async (ctx) => {
      // connectionParams values are untyped client input; check before use.
      const token = ctx.connectionParams?.authToken;
      if (typeof token !== 'string' || token === '') {
        return false;
      }

      try {
        // verifyToken is defined elsewhere. Both a thrown error and a falsy
        // result are treated as "invalid token" — confirm which one it uses.
        const user = await verifyToken(token);
        if (!user) {
          return false;
        }
        // Stash the user on the connection so `context` can hand it to resolvers.
        (ctx.extra as { user?: unknown }).user = user;
      } catch (err: unknown) {
        console.error('Rejected connection: token verification failed', err);
        return false;
      }

      console.log('Client connected');
      return true;
    },
    // Same context shape as before: resolvers receive `{ user }`.
    context: (ctx) => ({ user: (ctx.extra as { user?: unknown }).user }),
    onDisconnect: async (ctx) => {
      console.log('Client disconnected');
    },
  },
  wsServer
);
Subscriptions enable real-time features while maintaining GraphQL's typed, declarative approach.