Subscriptions
Learn how to implement real-time features with tRPC subscriptions in NestJS.
Overview
Subscriptions enable real-time communication between your server and clients using WebSocket connections. They’re perfect for features like:
- Live notifications
- Real-time chat messages
- Live data updates
- Progress tracking
- Collaborative features
Basic Subscription
Create a subscription using the `@Subscription()` decorator:
import { Router, Subscription, Input, Context } from '@nexica/nestjs-trpc'
import { z } from 'zod'

/** Schema of each item streamed to the client. */
const notificationSchema = z.object({
    id: z.string(),
    message: z.string(),
    timestamp: z.date(),
})

type Notification = z.infer<typeof notificationSchema>

/**
 * Payload of the 'notification' event. The handler reads `userId` to route the
 * event; the remaining fields are assumed to match the output schema — confirm
 * against whatever emits the event.
 */
type NotificationEvent = Notification & { userId: string }

/**
 * Streams notifications addressed to a single user.
 *
 * `notificationService` is used below but is not declared in this snippet;
 * it is presumably injected through the constructor.
 */
@Router()
export class NotificationRouter {
    /**
     * Yields every 'notification' event whose `userId` equals `input.userId`,
     * in arrival order. The listener is removed in `finally` when the generator
     * is closed.
     *
     * @param input - identifies the user whose notifications are streamed
     * @param ctx - request context (not used by this example)
     * @returns an endless async stream of notifications
     */
    @Subscription({
        input: z.object({
            userId: z.string(),
        }),
        output: notificationSchema,
    })
    async *onNotification(
        @Input() input: { userId: string },
        @Context() ctx: RequestContext
    ): AsyncIterable<Notification> {
        const eventEmitter = this.notificationService.getEventEmitter()

        // Events are always buffered; the generator is merely woken up when it
        // is parked waiting for the next one.
        const pending: NotificationEvent[] = []
        let wake: (() => void) | null = null

        const handler = (notification: NotificationEvent) => {
            if (notification.userId !== input.userId) return
            pending.push(notification)
            wake?.()
            wake = null
        }

        eventEmitter.addListener('notification', handler)
        try {
            while (true) {
                const next = pending.shift()
                if (next !== undefined) {
                    yield next
                    continue
                }
                // Nothing buffered: sleep until the handler pushes an event.
                await new Promise<void>((resolve) => {
                    wake = resolve
                })
            }
        } finally {
            eventEmitter.removeListener('notification', handler)
        }
    }
}
WebSocket Configuration
Enable WebSocket support in your TRPCModule:
import { Module } from '@nestjs/common'
import { TRPCModule } from '@nexica/nestjs-trpc'
/** Value passed as the `websocket` option of `TRPCModule.forRoot` below. */
const websocketOptions = {
    enabled: true,
    port: 4001, // WebSocket server port
    path: '/trpc',
}

/** Root module: registers tRPC under `/trpc` with WebSocket support switched on. */
@Module({
    imports: [
        TRPCModule.forRoot({
            context: RequestContextFactory,
            basePath: '/trpc',
            websocket: websocketOptions,
        }),
    ],
})
export class AppModule {}
WebSocket Options
| Option | Type | Default | Description |
|---|---|---|---|
| `enabled` | `boolean` | `false` | Enable WebSocket support |
| `port` | `number` | `4001` | WebSocket server port |
| `path` | `string` | `/trpc` | WebSocket endpoint path |
| `attachToHttpServer` | `boolean` | `false` | Attach to main HTTP server |
Advanced Subscription Patterns
Multi-Room Chat
/** Schema of a chat message as delivered to subscribers. */
const chatMessageSchema = z.object({
    id: z.string(),
    userId: z.string(),
    username: z.string(),
    message: z.string(),
    timestamp: z.date(),
})

type ChatMessage = z.infer<typeof chatMessageSchema>

/** Everything the emitter can hand to the generator, kept in arrival order. */
type ChatEvent =
    | { kind: 'message'; message: ChatMessage }
    | { kind: 'error'; error: unknown }

/**
 * Streams the messages of one chat room.
 *
 * `chatService` is used below but is not declared in this snippet; it is
 * presumably injected through the constructor.
 */
@Router()
export class ChatRouter {
    /**
     * Yields the room's recent messages first, then every new 'message' event
     * for `input.roomId`. If the emitter fires 'error', messages received
     * before it are still delivered and the error is then thrown.
     *
     * @param input - the room to subscribe to
     * @param ctx - request context (not used by this example)
     * @throws whatever value the emitter passed with its 'error' event
     */
    @Subscription({
        input: z.object({
            roomId: z.string(),
        }),
        output: chatMessageSchema,
    })
    async *onMessage(@Input() input: { roomId: string }, @Context() ctx: RequestContext) {
        const eventEmitter = this.chatService.getEventEmitter()
        const pending: ChatEvent[] = []
        let wake: (() => void) | null = null
        let failed = false

        // Buffer the event and wake the generator if it is parked.
        const enqueue = (event: ChatEvent) => {
            pending.push(event)
            wake?.()
            wake = null
        }

        const handler = (message: ChatMessage & { roomId: string }) => {
            if (failed || message.roomId !== input.roomId) return
            enqueue({ kind: 'message', message })
        }

        // The error is queued rather than only flagged, so it is reported even
        // when it arrives while the generator is busy (loading history or
        // suspended at a yield) instead of being silently dropped.
        const errorHandler = (error: unknown) => {
            if (failed) return
            failed = true
            enqueue({ kind: 'error', error })
        }

        // Listen before loading history so no message falls into the gap.
        // NOTE(review): a message emitted while the history loads may be
        // delivered twice if getRecentMessages already includes it — confirm,
        // and de-duplicate by id if that matters.
        eventEmitter.addListener('message', handler)
        eventEmitter.addListener('error', errorHandler)

        try {
            // Send existing messages first
            const existingMessages = await this.chatService.getRecentMessages(input.roomId)
            for (const message of existingMessages) {
                yield message
            }

            // Then listen for new messages
            while (true) {
                const event = pending.shift()
                if (event === undefined) {
                    await new Promise<void>((resolve) => {
                        wake = resolve
                    })
                    continue
                }
                if (event.kind === 'error') {
                    throw event.error
                }
                yield event.message
            }
        } finally {
            eventEmitter.removeListener('message', handler)
            eventEmitter.removeListener('error', errorHandler)
        }
    }
}
Live Data Updates
/**
 * Pushes dashboard metrics to the client on a fixed polling interval.
 *
 * `dashboardService` is used below but is not declared in this snippet.
 */
@Router()
export class DashboardRouter {
    /**
     * Repeatedly loads the metrics of `input.dashboardId`, yields them with the
     * current time, then waits `input.updateInterval` milliseconds.
     */
    @Subscription({
        input: z.object({
            dashboardId: z.string(),
            updateInterval: z.number().min(1000).max(60000).default(5000),
        }),
        output: z.object({
            timestamp: z.date(),
            metrics: z.object({
                activeUsers: z.number(),
                revenue: z.number(),
                orders: z.number(),
            }),
        }),
    })
    async *onMetricsUpdate(@Input() input: { dashboardId: string; updateInterval: number }) {
        // Resolves after the interval requested by the client.
        const pause = () => new Promise((done) => setTimeout(done, input.updateInterval))

        for (;;) {
            const snapshot = await this.dashboardService.getMetrics(input.dashboardId)
            yield { timestamp: new Date(), metrics: snapshot }
            await pause()
        }
    }
}
Progress Tracking
/** Schema of a single progress update. */
const taskProgressSchema = z.object({
    taskId: z.string(),
    status: z.enum(['pending', 'in_progress', 'completed', 'failed']),
    progress: z.number().min(0).max(100),
    message: z.string().optional(),
})

type TaskProgress = z.infer<typeof taskProgressSchema>

/** True for the statuses after which no further update is expected. */
const isTerminal = (status: TaskProgress['status']): boolean =>
    status === 'completed' || status === 'failed'

/**
 * Streams progress updates for one task.
 *
 * `taskService` is used below but is not declared in this snippet; it is
 * presumably injected through the constructor.
 */
@Router()
export class TaskRouter {
    /**
     * Yields every 'taskProgress' event for `input.taskId` in arrival order and
     * finishes right after yielding a 'completed' or 'failed' update.
     *
     * @param input - the task to follow
     * @param ctx - request context (not used by this example)
     */
    @Subscription({
        input: z.object({
            taskId: z.string(),
        }),
        output: taskProgressSchema,
    })
    async *onTaskProgress(@Input() input: { taskId: string }, @Context() ctx: RequestContext) {
        const eventEmitter = this.taskService.getEventEmitter()
        const pending: TaskProgress[] = []
        let wake: (() => void) | null = null

        const handler = (update: TaskProgress) => {
            if (update.taskId !== input.taskId) return
            pending.push(update)
            wake?.()
            wake = null
        }

        eventEmitter.addListener('taskProgress', handler)
        try {
            // Completion is decided only AFTER an update has been yielded.
            // Flagging it when the event arrives would end the loop while the
            // terminal update is still sitting in the buffer, so the client
            // would never see 'completed' / 'failed'.
            while (true) {
                const update = pending.shift()
                if (update === undefined) {
                    await new Promise<void>((resolve) => {
                        wake = resolve
                    })
                    continue
                }
                yield update
                if (isTerminal(update.status)) {
                    return
                }
            }
        } finally {
            eventEmitter.removeListener('taskProgress', handler)
        }
    }
}
Client-Side Usage
React with tRPC React Query
import { useState } from 'react'
import { trpc } from '@/utils/trpc'

/** A notification as rendered by this component. */
interface NotificationItem {
    id: string
    message: string
    // Presumably a Date only when the client is configured with a transformer;
    // otherwise the value arrives as a string. Both are handled below.
    timestamp: Date | string
}

/**
 * Renders every notification received for `userId` since the component mounted.
 *
 * The subscription hook reports items one at a time, so the list is accumulated
 * in component state instead of being read from the hook's return value.
 */
function NotificationList({ userId }: { userId: string }) {
    const [notifications, setNotifications] = useState<NotificationItem[]>([])

    trpc.notification.onNotification.useSubscription(
        { userId },
        {
            onData: (notification) => {
                console.log('New notification:', notification)
                // Handle new notification (e.g., show toast)
                setNotifications((previous) => [...previous, notification])
            },
            onError: (error) => {
                console.error('Subscription error:', error)
            },
        }
    )

    return (
        <div>
            {notifications.map((notification) => (
                <div key={notification.id}>
                    <p>{notification.message}</p>
                    <small>{new Date(notification.timestamp).toLocaleString()}</small>
                </div>
            ))}
        </div>
    )
}
Vanilla JavaScript
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client'

// One WebSocket connection shared by every subscription made through the client
const wsClient = createWSClient({
    url: 'ws://localhost:4001/trpc',
})

// For end-to-end typing, pass your server's router type as the type argument,
// e.g. createTRPCClient<AppRouter>({ ... })
const trpc = createTRPCClient({
    links: [
        wsLink({
            client: wsClient,
        }),
    ],
})

// Subscribe to notifications
const subscription = trpc.notification.onNotification.subscribe(
    { userId: 'user123' },
    {
        onData: (notification) => {
            console.log('Received:', notification)
        },
        onError: (error) => {
            console.error('Error:', error)
        },
    }
)

// Unsubscribe when done
subscription.unsubscribe()

// Close the socket once no subscriptions are needed any more
wsClient.close()
Error Handling
Handle errors gracefully in subscriptions:
/**
 * Streams notifications for `input.userId` and converts an emitter 'error'
 * event into a TRPCError.
 *
 * Notifications received before the error are still delivered; the error is
 * then thrown, logged by the outer catch and re-thrown to the caller.
 *
 * @throws TRPCError with code INTERNAL_SERVER_ERROR when the emitter fires 'error'
 */
@Subscription({
    input: z.object({ userId: z.string() }),
    output: NotificationSchema,
})
async *onNotification(@Input() input: { userId: string }) {
    // The event payload is assumed to match the output schema plus the
    // `userId` used for routing — confirm against whatever emits the event.
    type NotificationEvent = z.infer<typeof NotificationSchema> & { userId: string }
    type StreamEvent =
        | { kind: 'notification'; notification: NotificationEvent }
        | { kind: 'error'; error: unknown }

    try {
        const eventEmitter = this.notificationService.getEventEmitter()
        const pending: StreamEvent[] = []
        let wake: (() => void) | null = null
        let hasError = false

        // Buffer the event and wake the generator if it is parked.
        const enqueue = (event: StreamEvent) => {
            pending.push(event)
            wake?.()
            wake = null
        }

        const handler = (notification: NotificationEvent) => {
            if (hasError || notification.userId !== input.userId) return
            enqueue({ kind: 'notification', notification })
        }

        // Queue the error instead of only setting a flag: a flag alone ends
        // the loop silently whenever the error arrives while the generator is
        // suspended at a yield, and the client never learns about the failure.
        const errorHandler = (error: unknown) => {
            if (hasError) return
            hasError = true
            enqueue({ kind: 'error', error })
        }

        eventEmitter.addListener('notification', handler)
        eventEmitter.addListener('error', errorHandler)

        try {
            while (true) {
                const event = pending.shift()
                if (event === undefined) {
                    await new Promise<void>((resolve) => {
                        wake = resolve
                    })
                    continue
                }
                if (event.kind === 'error') {
                    throw new TRPCError({
                        code: 'INTERNAL_SERVER_ERROR',
                        message: 'Subscription error',
                        cause: event.error,
                    })
                }
                yield event.notification
            }
        } finally {
            eventEmitter.removeListener('notification', handler)
            eventEmitter.removeListener('error', errorHandler)
        }
    } catch (error) {
        console.error('Subscription failed:', error)
        throw error
    }
}
Authentication
Secure subscriptions with authentication:
import { Middleware } from '@nexica/nestjs-trpc'
import { AuthMiddleware } from '@/middleware/auth.middleware'
/**
 * Example of a subscription guarded by middleware plus a per-channel access check.
 *
 * `channelService` is used below but is not declared in this snippet.
 */
@Router()
export class SecureRouter {
    /**
     * Rejects the subscription with FORBIDDEN unless the calling user may read
     * `input.channelId`.
     *
     * @throws TRPCError with code FORBIDDEN when the access check fails
     */
    @Subscription({
        input: z.object({ channelId: z.string() }),
        output: MessageSchema,
    })
    @Middleware(AuthMiddleware) // Apply auth middleware
    async *onPrivateMessage(@Input() input: { channelId: string }, @Context() ctx: RequestContext) {
        // ctx.userId is available from auth middleware
        const requesterId = ctx.userId

        // Check if user has access to this channel
        const allowed = await this.channelService.userHasAccess(requesterId, input.channelId)
        if (!allowed) {
            throw new TRPCError({
                code: 'FORBIDDEN',
                message: 'Access denied to this channel',
            })
        }

        // ... subscription logic
    }
}
Performance Considerations
Subscriptions maintain persistent connections. Monitor memory usage and implement proper cleanup. Consider rate limiting and authentication for production deployments.
Subscription Helpers
NestJS tRPC provides helper utilities for common subscription patterns:
import { createEventSubscription } from '@nexica/nestjs-trpc/helpers'
/**
 * Streams 'event' emissions whose `topic` equals `input.topic`, delegating the
 * event handling to `createEventSubscription`.
 */
@Subscription({
    input: z.object({ topic: z.string() }),
    output: EventSchema,
})
async *onEvent(@Input() input: { topic: string }) {
    // Use the helper for simplified event handling
    yield* createEventSubscription(this.eventService.getEventEmitter(), 'event', {
        filter: (candidate) => candidate.topic === input.topic,
        maxQueueSize: 100,
        timeout: 30000,
    })
}
Next Steps
Explore more advanced features:
- Middleware - Add authentication and validation
- Testing - Test your subscriptions
- Examples - See complete implementations
Last updated on: