Skip to Content
đź‘‹ Hey there! Welcome to NestJS tRPC.

Subscriptions

Learn how to implement real-time features with tRPC subscriptions in NestJS.

Overview

Subscriptions enable real-time communication between your server and clients using WebSocket connections. They’re perfect for features like:

  • Live notifications
  • Real-time chat messages
  • Live data updates
  • Progress tracking
  • Collaborative features

Basic Subscription

Create a subscription using the @Subscription() decorator:

import { Subscription, Input, Context } from '@nexica/nestjs-trpc'
import { z } from 'zod'
 
@Router()
export class NotificationRouter {
    @Subscription({
        input: z.object({
            userId: z.string(),
        }),
        output: z.object({
            id: z.string(),
            message: z.string(),
            timestamp: z.date(),
        }),
    })
    async *onNotification(
        @Input() input: { userId: string },
        @Context() ctx: RequestContext
    ): AsyncIterable<{
        id: string
        message: string
        timestamp: Date
    }> {
        const eventEmitter = this.notificationService.getEventEmitter()
        const eventQueue: any[] = []
        let waitingResolver: ((value: any) => void) | null = null
 
        const handler = (notification: any) => {
            if (notification.userId === input.userId) {
                if (waitingResolver) {
                    waitingResolver({ notification })
                    waitingResolver = null
                } else {
                    eventQueue.push(notification)
                }
            }
        }
 
        eventEmitter.addListener('notification', handler)
 
        try {
            while (true) {
                if (eventQueue.length > 0) {
                    yield eventQueue.shift()!
                    continue
                }
 
                const result = await new Promise((resolve) => {
                    waitingResolver = resolve
                })
 
                if (result.notification) {
                    yield result.notification
                }
            }
        } finally {
            eventEmitter.removeListener('notification', handler)
        }
    }
}

WebSocket Configuration

Enable WebSocket support in your TRPCModule:

import { Module } from '@nestjs/common'
import { TRPCModule } from '@nexica/nestjs-trpc'
 
@Module({
    imports: [
        TRPCModule.forRoot({
            context: RequestContextFactory,
            basePath: '/trpc',
            websocket: {
                enabled: true,
                port: 4001, // WebSocket server port
                path: '/trpc',
            },
        }),
    ],
})
export class AppModule {}

WebSocket Options

OptionTypeDefaultDescription
enabledbooleanfalseEnable WebSocket support
portnumber4001WebSocket server port
pathstring/trpcWebSocket endpoint path
attachToHttpServerbooleanfalseAttach to main HTTP server

Advanced Subscription Patterns

Multi-Room Chat

@Router()
export class ChatRouter {
    @Subscription({
        input: z.object({
            roomId: z.string(),
        }),
        output: z.object({
            id: z.string(),
            userId: z.string(),
            username: z.string(),
            message: z.string(),
            timestamp: z.date(),
        }),
    })
    async *onMessage(@Input() input: { roomId: string }, @Context() ctx: RequestContext) {
        const eventEmitter = this.chatService.getEventEmitter()
        const eventQueue: any[] = []
        let waitingResolver: ((value: any) => void) | null = null
        let isActive = true
 
        const handler = (message: any) => {
            if (!isActive || message.roomId !== input.roomId) return
 
            if (waitingResolver) {
                waitingResolver({ message })
                waitingResolver = null
            } else {
                eventQueue.push(message)
            }
        }
 
        const errorHandler = (error: any) => {
            isActive = false
            if (waitingResolver) {
                waitingResolver({ error })
                waitingResolver = null
            }
        }
 
        eventEmitter.addListener('message', handler)
        eventEmitter.addListener('error', errorHandler)
 
        try {
            // Send existing messages first
            const existingMessages = await this.chatService.getRecentMessages(input.roomId)
            for (const message of existingMessages) {
                yield message
            }
 
            // Then listen for new messages
            while (isActive) {
                if (eventQueue.length > 0) {
                    yield eventQueue.shift()!
                    continue
                }
 
                const result = await new Promise((resolve) => {
                    waitingResolver = resolve
                })
 
                if (result.error) {
                    throw result.error
                }
 
                if (result.message) {
                    yield result.message
                }
            }
        } finally {
            eventEmitter.removeListener('message', handler)
            eventEmitter.removeListener('error', errorHandler)
        }
    }
}

Live Data Updates

@Router()
export class DashboardRouter {
    @Subscription({
        input: z.object({
            dashboardId: z.string(),
            updateInterval: z.number().min(1000).max(60000).default(5000),
        }),
        output: z.object({
            timestamp: z.date(),
            metrics: z.object({
                activeUsers: z.number(),
                revenue: z.number(),
                orders: z.number(),
            }),
        }),
    })
    async *onMetricsUpdate(@Input() input: { dashboardId: string; updateInterval: number }) {
        while (true) {
            const metrics = await this.dashboardService.getMetrics(input.dashboardId)
 
            yield {
                timestamp: new Date(),
                metrics,
            }
 
            // Wait for the specified interval
            await new Promise((resolve) => setTimeout(resolve, input.updateInterval))
        }
    }
}

Progress Tracking

@Router()
export class TaskRouter {
    @Subscription({
        input: z.object({
            taskId: z.string(),
        }),
        output: z.object({
            taskId: z.string(),
            status: z.enum(['pending', 'in_progress', 'completed', 'failed']),
            progress: z.number().min(0).max(100),
            message: z.string().optional(),
        }),
    })
    async *onTaskProgress(@Input() input: { taskId: string }, @Context() ctx: RequestContext) {
        const eventEmitter = this.taskService.getEventEmitter()
        const eventQueue: any[] = []
        let waitingResolver: ((value: any) => void) | null = null
        let isCompleted = false
 
        const handler = (update: any) => {
            if (update.taskId !== input.taskId) return
 
            if (update.status === 'completed' || update.status === 'failed') {
                isCompleted = true
            }
 
            if (waitingResolver) {
                waitingResolver({ update })
                waitingResolver = null
            } else {
                eventQueue.push(update)
            }
        }
 
        eventEmitter.addListener('taskProgress', handler)
 
        try {
            while (!isCompleted) {
                if (eventQueue.length > 0) {
                    const update = eventQueue.shift()!
                    yield update
                    if (update.status === 'completed' || update.status === 'failed') {
                        break
                    }
                    continue
                }
 
                const result = await new Promise((resolve) => {
                    waitingResolver = resolve
                })
 
                if (result.update) {
                    yield result.update
                    if (result.update.status === 'completed' || result.update.status === 'failed') {
                        break
                    }
                }
            }
        } finally {
            eventEmitter.removeListener('taskProgress', handler)
        }
    }
}

Client-Side Usage

React with tRPC React Query

import { trpc } from '@/utils/trpc'
 
function NotificationList({ userId }: { userId: string }) {
    const { data: notifications } = trpc.notification.onNotification.useSubscription(
        { userId },
        {
            onData: (notification) => {
                console.log('New notification:', notification)
                // Handle new notification (e.g., show toast)
            },
            onError: (error) => {
                console.error('Subscription error:', error)
            },
        }
    )
 
    return (
        <div>
            {notifications?.map((notification) => (
                <div key={notification.id}>
                    <p>{notification.message}</p>
                    <small>{notification.timestamp.toLocaleString()}</small>
                </div>
            ))}
        </div>
    )
}

Vanilla JavaScript

import { createTRPCMsw } from '@trpc/client'
import { createWSClient, wsLink } from '@trpc/client'
 
const wsClient = createWSClient({
    url: 'ws://localhost:4001/trpc',
})
 
const trpc = createTRPCMsw({
    links: [
        wsLink({
            client: wsClient,
        }),
    ],
})
 
// Subscribe to notifications
const subscription = trpc.notification.onNotification.subscribe(
    { userId: 'user123' },
    {
        onData: (notification) => {
            console.log('Received:', notification)
        },
        onError: (error) => {
            console.error('Error:', error)
        },
    }
)
 
// Unsubscribe when done
subscription.unsubscribe()

Error Handling

Handle errors gracefully in subscriptions:

@Subscription({
    input: z.object({ userId: z.string() }),
    output: NotificationSchema,
})
async *onNotification(@Input() input: { userId: string }) {
    try {
        const eventEmitter = this.notificationService.getEventEmitter()
        const eventQueue: any[] = []
        let waitingResolver: any = null
        let hasError = false
 
        const handler = (notification: any) => {
            if (notification.userId === input.userId) {
                if (waitingResolver) {
                    waitingResolver({ notification })
                    waitingResolver = null
                } else {
                    eventQueue.push(notification)
                }
            }
        }
 
        const errorHandler = (error: any) => {
            hasError = true
            if (waitingResolver) {
                waitingResolver({ error })
                waitingResolver = null
            }
        }
 
        eventEmitter.addListener('notification', handler)
        eventEmitter.addListener('error', errorHandler)
 
        try {
            while (!hasError) {
                if (eventQueue.length > 0) {
                    yield eventQueue.shift()!
                    continue
                }
 
                const result = await new Promise((resolve) => {
                    waitingResolver = resolve
                })
 
                if (result.error) {
                    throw new TRPCError({
                        code: 'INTERNAL_SERVER_ERROR',
                        message: 'Subscription error',
                        cause: result.error,
                    })
                }
 
                if (result.notification) {
                    yield result.notification
                }
            }
        } finally {
            eventEmitter.removeListener('notification', handler)
            eventEmitter.removeListener('error', errorHandler)
        }
    } catch (error) {
        console.error('Subscription failed:', error)
        throw error
    }
}

Authentication

Secure subscriptions with authentication:

import { Middleware } from '@nexica/nestjs-trpc'
import { AuthMiddleware } from '@/middleware/auth.middleware'
 
@Router()
export class SecureRouter {
    @Subscription({
        input: z.object({ channelId: z.string() }),
        output: MessageSchema,
    })
    @Middleware(AuthMiddleware) // Apply auth middleware
    async *onPrivateMessage(@Input() input: { channelId: string }, @Context() ctx: RequestContext) {
        // ctx.userId is available from auth middleware
        const userId = ctx.userId
 
        // Check if user has access to this channel
        const hasAccess = await this.channelService.userHasAccess(userId, input.channelId)
 
        if (!hasAccess) {
            throw new TRPCError({
                code: 'FORBIDDEN',
                message: 'Access denied to this channel',
            })
        }
 
        // ... subscription logic
    }
}

Subscription Helpers

NestJS tRPC provides helper utilities for common subscription patterns:

import { createEventSubscription } from '@nexica/nestjs-trpc/helpers'
 
@Subscription({
    input: z.object({ topic: z.string() }),
    output: EventSchema,
})
async *onEvent(@Input() input: { topic: string }) {
    const eventEmitter = this.eventService.getEventEmitter()
 
    // Use the helper for simplified event handling
    yield* createEventSubscription(eventEmitter, 'event', {
        filter: (event) => event.topic === input.topic,
        maxQueueSize: 100,
        timeout: 30000,
    })
}

Next Steps

Explore more advanced features:

Last updated on: