
Subscription Helpers

Utilities for building event-driven tRPC subscriptions on top of EventEmitter, with queue management, error handling, and performance monitoring built in.

Overview

The subscription helpers provide a high-level API for creating tRPC subscriptions from EventEmitter events. They handle common challenges like:

  • Queue Management: Prevent memory leaks with configurable event queues
  • Error Recovery: Resilient mode for handling errors gracefully
  • Performance Monitoring: Built-in statistics and performance tracking
  • Type Safety: Type guards and transforms for runtime safety
  • Resource Management: Proper cleanup and cancellation support
  • Testing Utilities: Tools for testing subscriptions easily

Quick Start

Basic Event Subscription

import { Injectable } from '@nestjs/common'
import { EventEmitter } from 'events'
import { Router, Subscription, Input } from '@nexica/nestjs-trpc'
import { createEventSubscription } from '@nexica/nestjs-trpc'
import { z } from 'zod'
 
// Service for managing chat events
@Injectable()
export class ChatService {
    private readonly eventEmitter = new EventEmitter()
 
    getEventEmitter() {
        return this.eventEmitter
    }
 
    sendMessage(roomId: string, message: any) {
        this.eventEmitter.emit(`room:${roomId}:message`, message)
    }
}
 
// Router with subscription endpoint
@Router()
export class ChatRouter {
    constructor(private readonly chatService: ChatService) {}
 
    @Subscription({
        input: z.object({
            roomId: z.string(),
        }),
        output: z.object({
            id: z.string(),
            message: z.string(),
            userId: z.string(),
            timestamp: z.date(),
        }),
    })
    async *onMessage(@Input() input: { roomId: string }) {
        const { subscription, controller } = createEventSubscription({
            eventEmitter: this.chatService.getEventEmitter(),
            eventName: `room:${input.roomId}:message`,
            filter: (event: any) => event.roomId === input.roomId,
        })
 
        try {
            yield* subscription
        } finally {
            controller.close()
        }
    }
}

API Reference

createEventSubscription

The main function for creating event-based subscriptions.

function createEventSubscription<T>(options: SubscriptionHelperOptions<T>): {
    subscription: AsyncIterable<T>
    controller: SubscriptionController
}

Options

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| eventEmitter | EventEmitter | - | Required. The EventEmitter instance to listen to |
| eventName | string | - | Required. The event name to listen for |
| errorEventName | string | ${eventName}Error | Event name for error handling |
| maxQueueSize | number | 1000 | Maximum events to queue (prevents memory leaks) |
| dropNewOnFull | boolean | false | Drop new events when queue is full (vs. dropping oldest) |
| timeout | number | - | Timeout for waiting for events (milliseconds) |
| throwOnTimeout | boolean | false | Whether to throw on timeout or continue waiting |
| resilientMode | boolean | false | Continue after errors instead of throwing |
| filter | (event: unknown) => boolean | - | Custom filter function for events |
| transform | (event: unknown) => T | - | Custom transform function for events |
| typeGuard | (event: unknown) => event is T | - | Type guard for runtime type validation |
| signal | AbortSignal | - | AbortSignal to cancel the subscription |
| onError | (error: Error, context) => void | - | Custom error handler |

Return Value

  • subscription: AsyncIterable that yields filtered and transformed events
  • controller: Object for managing the subscription lifecycle

SubscriptionController

Interface for managing subscription lifecycle and monitoring.

interface SubscriptionController {
    close(): void
    isClosed(): boolean
    getStats(): SubscriptionStats
}

SubscriptionStats

Performance and monitoring statistics.

interface SubscriptionStats {
    eventsProcessed: number
    errorsEncountered: number
    currentQueueSize: number
    startTime: number
    lastEventTime: number | null
}

Advanced Features

Queue Management

Cap the number of buffered events so that a slow consumer cannot grow memory without bound:

const { subscription, controller } = createEventSubscription({
    eventEmitter,
    eventName: 'high-frequency-event',
    maxQueueSize: 500, // Limit queue to 500 events
    dropNewOnFull: false, // Drop oldest events when full (default)
    // dropNewOnFull: true, // Drop new events when full
})
 
// Monitor queue size
setInterval(() => {
    const stats = controller.getStats()
    if (stats.currentQueueSize > 400) {
        console.warn('Queue getting full:', stats.currentQueueSize)
    }
}, 1000)
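
To make the two overflow policies concrete, here is a minimal sketch of a bounded queue. It is not the library's implementation: `BoundedQueue` and its members are hypothetical names, and only the meaning of `maxQueueSize` and `dropNewOnFull` is taken from the options table.

```typescript
// Hypothetical illustration of the two overflow policies; not library code.
class BoundedQueue<T> {
    private readonly items: T[] = []
    private readonly maxQueueSize: number
    private readonly dropNewOnFull: boolean
    dropped = 0 // how many events were discarded because the queue was full

    constructor(maxQueueSize: number, dropNewOnFull: boolean) {
        this.maxQueueSize = maxQueueSize
        this.dropNewOnFull = dropNewOnFull
    }

    push(item: T): void {
        if (this.items.length < this.maxQueueSize) {
            this.items.push(item)
            return
        }
        this.dropped++
        if (this.dropNewOnFull) {
            return // queue is full: discard the incoming event
        }
        this.items.shift() // queue is full: discard the oldest event
        this.items.push(item)
    }

    toArray(): T[] {
        return [...this.items]
    }
}

const dropOldest = new BoundedQueue<number>(3, false)
const dropNewest = new BoundedQueue<number>(3, true)
for (const n of [1, 2, 3, 4, 5]) {
    dropOldest.push(n)
    dropNewest.push(n)
}

console.log(dropOldest.toArray()) // [3, 4, 5]
console.log(dropNewest.toArray()) // [1, 2, 3]
```

With `dropNewOnFull: false` a consumer that falls behind sees the most recent events; with `dropNewOnFull: true` it sees the earliest ones and misses whatever arrived while the queue was full.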

Error Handling & Resilient Mode

Handle errors gracefully without stopping the subscription:

const { subscription } = createEventSubscription({
    eventEmitter,
    eventName: 'unreliable-event',
    resilientMode: true, // Continue after errors
    onError: (error, context) => {
        console.error(`Error in ${context.eventName}:`, error)
        // Log to monitoring service
        monitoringService.logError(error, context.stats)
    },
})
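
The difference between the default behaviour and resilient mode can be sketched without the library. The generator below is a hypothetical illustration (`processEvents` and `parseNumber` are made-up names), assuming that resilient mode means "report the error, skip the event, keep going":

```typescript
// Hypothetical sketch of fail-fast versus resilient processing; not library code.
function* processEvents<T, R>(
    events: Iterable<T>,
    handle: (event: T) => R,
    resilientMode: boolean,
    onError: (error: Error) => void
): Generator<R> {
    for (const event of events) {
        let result: R
        try {
            result = handle(event)
        } catch (err) {
            const error = err instanceof Error ? err : new Error(String(err))
            onError(error) // the handler is told about every failure
            if (!resilientMode) throw error // fail-fast: the stream ends here
            continue // resilient: skip the bad event and keep going
        }
        yield result
    }
}

// Example handler that rejects anything that is not numeric.
const parseNumber = (raw: string): number => {
    const n = Number(raw)
    if (Number.isNaN(n)) throw new Error(`not a number: ${raw}`)
    return n
}

const seenErrors: string[] = []
const resilientValues = [
    ...processEvents(['1', 'x', '3'], parseNumber, true, (e) => {
        seenErrors.push(e.message)
    }),
]

console.log(resilientValues) // [1, 3]
console.log(seenErrors) // ['not a number: x']
```

Passing `false` for `resilientMode` makes the same input throw on `'x'`, after `onError` has been called once.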

Type Safety with Guards and Transforms

Ensure runtime type safety and data transformation:

interface ChatMessage {
    id: string
    content: string
    userId: string
    timestamp: Date
}
 
const isValidMessage = (event: unknown): event is ChatMessage => {
    return typeof event === 'object' && event !== null && typeof (event as any).id === 'string' && typeof (event as any).content === 'string'
}
 
const { subscription } = createEventSubscription<ChatMessage>({
    eventEmitter,
    eventName: 'chat-message',
    typeGuard: isValidMessage,
    transform: (event: any) => ({
        ...event,
        timestamp: new Date(event.timestamp), // Convert to Date object
    }),
    filter: (event: any) => event.content.trim().length > 0, // Filter empty messages
})
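
The guard in the example above checks only `id` and `content`. A stricter variant might validate every field of the raw payload and keep the raw and converted shapes as separate types. This is a sketch under assumptions: `RawChatMessage`, `isRawChatMessage`, `toChatMessage`, and `parseChatMessage` are hypothetical names, and running the guard before the transform is a choice made here, not documented library behaviour.

```typescript
// Shape of the payload as emitted (timestamp not yet a Date); assumed for illustration.
interface RawChatMessage {
    id: string
    content: string
    userId: string
    timestamp: string | number
}

// Shape delivered to subscribers.
interface ChatMessage {
    id: string
    content: string
    userId: string
    timestamp: Date
}

const isRecord = (value: unknown): value is Record<string, unknown> =>
    typeof value === 'object' && value !== null

// Validates every field, so the transform below can rely on the types.
const isRawChatMessage = (event: unknown): event is RawChatMessage =>
    isRecord(event) &&
    typeof event.id === 'string' &&
    typeof event.content === 'string' &&
    typeof event.userId === 'string' &&
    (typeof event.timestamp === 'string' || typeof event.timestamp === 'number')

const toChatMessage = (raw: RawChatMessage): ChatMessage => ({
    ...raw,
    timestamp: new Date(raw.timestamp),
})

// Guard first, then transform; invalid payloads become null instead of throwing.
function parseChatMessage(event: unknown): ChatMessage | null {
    return isRawChatMessage(event) ? toChatMessage(event) : null
}

const parsed = parseChatMessage({
    id: '1',
    content: 'Hello',
    userId: 'u1',
    timestamp: '2024-01-01T00:00:00.000Z',
})
console.log(parsed?.timestamp instanceof Date) // true
console.log(parseChatMessage({ id: 1, content: 'Hello' })) // null
```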

Timeout Handling

Handle slow or missing events with timeouts:

const { subscription } = createEventSubscription({
    eventEmitter,
    eventName: 'periodic-update',
    timeout: 5000, // 5 second timeout
    throwOnTimeout: false, // Continue waiting instead of throwing
    onError: (error, context) => {
        if (error.message.includes('timeout')) {
            console.log('No updates received in 5 seconds, continuing...')
        }
    },
})
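
For readers unfamiliar with the underlying technique, waiting for an event with a deadline is commonly built by racing the wait against a timer. The helper below is a generic sketch, not the library's code; `withTimeout` is a hypothetical name and the error message format is an assumption.

```typescript
// Hypothetical helper illustrating the general "race against a timer" technique.
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined
    const timeout = new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(new Error(`timeout after ${ms}ms`)), ms)
    })
    // Whichever settles first wins; the timer is always cleared afterwards.
    return Promise.race([promise, timeout]).finally(() => {
        if (timer !== undefined) clearTimeout(timer)
    })
}

withTimeout(Promise.resolve('update'), 5000).then((value) => console.log(value)) // logs "update"
```

Clearing the timer in `finally` matters: without it, every successful wait would leave a pending timer behind until the deadline passed.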

Cancellation with AbortSignal

Cancel subscriptions using AbortController:

const abortController = new AbortController()
 
const { subscription } = createEventSubscription({
    eventEmitter,
    eventName: 'long-running-process',
    signal: abortController.signal,
})
 
// Cancel after 30 seconds
setTimeout(() => {
    abortController.abort()
}, 30000)
 
try {
    for await (const event of subscription) {
        console.log('Event:', event)
    }
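} catch (error) {
    // Caught values are typed as unknown in strict TypeScript, so narrow before reading message
    if (error instanceof Error && error.message === 'Subscription cancelled') {
        console.log('Subscription was cancelled')
    } else {
        throw error // Don't swallow unrelated failures
    }
}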

Performance Monitoring

Real-time Statistics

Monitor subscription performance in real-time:

const { subscription, controller } = createEventSubscription({
    eventEmitter,
    eventName: 'monitored-event',
})
 
// Log statistics every 10 seconds
const statsInterval = setInterval(() => {
    const stats = controller.getStats()
    const runtime = Date.now() - stats.startTime
    const eventsPerSecond = stats.eventsProcessed / (runtime / 1000)
 
    console.log('Subscription Stats:', {
        runtime: `${runtime}ms`,
        eventsProcessed: stats.eventsProcessed,
        eventsPerSecond: eventsPerSecond.toFixed(2),
        currentQueueSize: stats.currentQueueSize,
        errorsEncountered: stats.errorsEncountered,
        lastEventTime: stats.lastEventTime ? new Date(stats.lastEventTime) : null,
    })
}, 10000)
 
// Cleanup (this part runs inside the async generator that exposes the subscription)
try {
    yield* subscription
} finally {
    clearInterval(statsInterval)
    controller.close()
}

Performance Alerts

Set up automatic alerts for performance issues:

const { subscription, controller } = createEventSubscription({
    eventEmitter,
    eventName: 'critical-event',
    maxQueueSize: 1000,
    onError: (error, context) => {
        // Alert on high error rate
        if (context.stats.errorsEncountered > 10) {
            alertService.send('High error rate in subscription', context.stats)
        }
    },
})
 
// Monitor for performance issues
const healthCheck = setInterval(() => {
    const stats = controller.getStats()
 
    // Alert on high queue size
    if (stats.currentQueueSize > 800) {
        alertService.send('Subscription queue near capacity', stats)
    }
 
    // Alert on no recent events (possible producer issue)
    if (stats.lastEventTime && Date.now() - stats.lastEventTime > 60000) {
        alertService.send('No events received in 60 seconds', stats)
    }
}, 5000)
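
Keeping the alert rules in a pure function makes them easy to unit test without timers or a live subscription. The sketch below reuses the `SubscriptionStats` shape from the API reference; `HealthLimits`, `checkHealth`, and the alert strings are hypothetical.

```typescript
// Same shape as the SubscriptionStats interface in the API reference.
interface SubscriptionStats {
    eventsProcessed: number
    errorsEncountered: number
    currentQueueSize: number
    startTime: number
    lastEventTime: number | null
}

// Hypothetical thresholds; tune them for your workload.
interface HealthLimits {
    maxQueueSize: number
    queueWarnRatio: number // e.g. 0.8 warns at 80% of capacity
    maxIdleMs: number
    maxErrors: number
}

// Returns one message per violated rule; an empty array means healthy.
function checkHealth(stats: SubscriptionStats, now: number, limits: HealthLimits): string[] {
    const alerts: string[] = []
    if (stats.currentQueueSize > limits.maxQueueSize * limits.queueWarnRatio) {
        alerts.push('queue near capacity')
    }
    if (stats.lastEventTime !== null && now - stats.lastEventTime > limits.maxIdleMs) {
        alerts.push('no recent events')
    }
    if (stats.errorsEncountered > limits.maxErrors) {
        alerts.push('high error count')
    }
    return alerts
}

const limits: HealthLimits = { maxQueueSize: 1000, queueWarnRatio: 0.8, maxIdleMs: 60000, maxErrors: 10 }
const stalled: SubscriptionStats = {
    eventsProcessed: 50,
    errorsEncountered: 0,
    currentQueueSize: 900,
    startTime: 1000,
    lastEventTime: 1000,
}

console.log(checkHealth(stalled, 121000, limits)) // ['queue near capacity', 'no recent events']
```

Inside the interval callback you would then call something like `checkHealth(controller.getStats(), Date.now(), limits)` and forward each message to your alerting service.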

Testing Utilities

createMockEventEmitter

Create a mock EventEmitter for testing:

import { createEventSubscription, createMockEventEmitter, createSubscriptionTester } from '@nexica/nestjs-trpc'
 
describe('ChatService', () => {
    it('should handle chat messages', async () => {
        const { emitter, emit } = createMockEventEmitter()
 
        const { subscription } = createEventSubscription({
            eventEmitter: emitter,
            eventName: 'chat-message',
        })
 
        const tester = createSubscriptionTester(subscription)
 
        // Emit test events
        emit('chat-message', { id: '1', content: 'Hello' })
        emit('chat-message', { id: '2', content: 'World' })
 
        // Collect results
        const messages = await tester.collect(2)
 
        expect(messages).toHaveLength(2)
        expect(messages[0]).toEqual({ id: '1', content: 'Hello' })
        expect(messages[1]).toEqual({ id: '2', content: 'World' })
 
        tester.close()
    })
})

createSubscriptionTester

Utility for testing async iterables:

const tester = createSubscriptionTester(subscription)
 
// Get next event
const nextEvent = await tester.next()
 
// Collect multiple events
const events = await tester.collect(5)
 
// Cleanup
tester.close()
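
If you want to see what a "collect N events" helper involves, the following sketch works on any AsyncIterable. It is a hypothetical stand-in, not the implementation behind `createSubscriptionTester`; `collect`, `counter`, and `cleanedUp` are names invented for the example.

```typescript
// Hypothetical stand-in for a "collect n items" test helper; not the library's implementation.
async function collect<T>(source: AsyncIterable<T>, count: number): Promise<T[]> {
    const items: T[] = []
    if (count <= 0) return items
    for await (const item of source) {
        items.push(item)
        // Leaving the loop calls the iterator's return(), which runs the source's finally block.
        if (items.length >= count) break
    }
    return items
}

let cleanedUp = false

// Endless source used to show that collect() stops it and triggers cleanup.
async function* counter(): AsyncGenerator<number> {
    let n = 0
    try {
        while (true) yield n++
    } finally {
        cleanedUp = true
    }
}

collect(counter(), 3).then((items) => console.log(items, cleanedUp)) // [0, 1, 2] true
```

Because `break` inside `for await` closes the source, a helper written this way also exercises the same `finally` cleanup path that the subscription handlers on this page rely on.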

Real-world Examples

Notification System

// Service for managing notifications
@Injectable()
export class NotificationService {
    private readonly eventEmitter = new EventEmitter()
 
    getEventEmitter() {
        return this.eventEmitter
    }
 
    sendNotification(userId: string, notification: any) {
        this.eventEmitter.emit(`user:${userId}:notification`, notification)
    }
}
 
// Router with notification subscription
@Router()
export class NotificationRouter {
    constructor(
        private readonly notificationService: NotificationService,
        private readonly logger: Logger
    ) {}
 
    @Subscription({
        input: z.object({
            userId: z.string(),
            types: z.array(z.string()).optional(),
        }),
        output: z.object({
            id: z.string(),
            type: z.string(),
            title: z.string(),
            message: z.string(),
            timestamp: z.date(),
            priority: z.enum(['low', 'medium', 'high']),
        }),
    })
    async *onNotification(@Input() input: { userId: string; types?: string[] }, @Context() ctx: RequestContext) {
        const { subscription, controller } = createEventSubscription({
            eventEmitter: this.notificationService.getEventEmitter(),
            eventName: `user:${input.userId}:notification`,
            maxQueueSize: 100, // Limit notifications per user
            resilientMode: true, // Don't fail on individual notification errors
            filter: (event: any) => {
                // Filter by notification types if specified
                return !input.types || input.types.includes(event.type)
            },
            transform: (event: any) => ({
                ...event,
                timestamp: new Date(event.timestamp),
            }),
            typeGuard: (event: unknown): event is NotificationEvent => {
                return isValidNotification(event)
            },
            onError: (error, context) => {
                this.logger.error(`Notification error for user ${input.userId}:`, error)
            },
        })
 
        try {
            yield* subscription
        } finally {
            controller.close()
        }
    }
}

Live Data Dashboard

// Service for managing dashboard metrics
@Injectable()
export class DashboardService {
    private readonly metricsEmitter = new EventEmitter()
 
    getMetricsEmitter() {
        return this.metricsEmitter
    }
 
    updateDashboard(dashboardId: string, metrics: any) {
        this.metricsEmitter.emit(`dashboard:${dashboardId}:update`, metrics)
    }
}
 
// Router with dashboard subscription
@Router()
export class DashboardRouter {
    constructor(
        private readonly dashboardService: DashboardService,
        private readonly monitoringService: MonitoringService
    ) {}
 
    @Subscription({
        input: z.object({
            dashboardId: z.string(),
            refreshRate: z.number().min(1000).default(5000),
        }),
        output: z.object({
            metrics: z.record(z.number()),
            timestamp: z.date(),
            status: z.enum(['healthy', 'warning', 'error']),
        }),
    })
    async *onDashboardUpdate(@Input() input: { dashboardId: string; refreshRate: number }, @Context() ctx: RequestContext) {
        const abortController = new AbortController()
 
        // Auto-cleanup after 1 hour
        const autoCloseTimer = setTimeout(() => abortController.abort(), 3600000)

        const { subscription, controller } = createEventSubscription({
            eventEmitter: this.dashboardService.getMetricsEmitter(),
            eventName: `dashboard:${input.dashboardId}:update`,
            timeout: input.refreshRate * 2, // Timeout if no updates
            throwOnTimeout: false, // Continue waiting
            signal: abortController.signal,
            maxQueueSize: 10, // Only keep recent updates
            dropNewOnFull: false, // Drop the oldest updates so clients get fresh data
            transform: (event: any) => ({
                ...event,
                timestamp: new Date(),
            }),
            onError: (error, context) => {
                this.monitoringService.logDashboardError(input.dashboardId, error, context.stats)
            },
        })

        try {
            yield* subscription
        } finally {
            clearTimeout(autoCloseTimer) // Don't leave the timer pending after the client disconnects
            controller.close()
        }
    }
}

Best Practices

Resource Management

Always clean up subscriptions properly:

async function* mySubscription() {
    const { subscription, controller } = createEventSubscription({
        eventEmitter,
        eventName: 'my-event',
    })
 
    try {
        yield* subscription
    } finally {
        // Always clean up
        controller.close()
    }
}

Error Handling

Use resilient mode for production services:

const { subscription } = createEventSubscription({
    eventEmitter,
    eventName: 'production-event',
    resilientMode: true, // Don't crash on errors
    onError: (error, context) => {
        // Log to monitoring service
        logger.error('Subscription error:', { error, stats: context.stats })
 
        // Send alert if error rate is high
        if (context.stats.errorsEncountered > 10) {
            alertService.sendAlert('High error rate in subscription')
        }
    },
})

Performance Optimization

Configure queue sizes based on your use case:

// High-frequency events: smaller queue, drop old events
const { subscription: liveUpdates } = createEventSubscription({
    eventEmitter,
    eventName: 'high-frequency-event',
    maxQueueSize: 50,
    dropNewOnFull: false, // Drop oldest events
})

// Critical events: larger queue so that overflow is unlikely
const { subscription: criticalEvents } = createEventSubscription({
    eventEmitter,
    eventName: 'critical-event',
    maxQueueSize: 10000,
    dropNewOnFull: true, // If the queue does fill, keep queued events and drop new ones
})

Type Safety

Always use type guards for runtime safety:

const isValidEvent = (event: unknown): event is MyEventType => {
    return typeof event === 'object' && event !== null && 'id' in event && 'data' in event
}
 
const { subscription } = createEventSubscription<MyEventType>({
    eventEmitter,
    eventName: 'typed-event',
    typeGuard: isValidEvent,
    onError: (error, context) => {
        if (error.message.includes('Type validation failed')) {
            logger.warn('Received invalid event data')
        }
    },
})

Migration from Legacy

From createEventSubscriptionLegacy

The legacy function is deprecated but still available for backward compatibility:

// Old way (deprecated)
async function* oldSubscription() {
    yield* createEventSubscriptionLegacy({
        eventEmitter,
        eventName: 'my-event',
    })
}
 
// New way (recommended)
async function* newSubscription() {
    const { subscription, controller } = createEventSubscription({
        eventEmitter,
        eventName: 'my-event',
    })
 
    try {
        yield* subscription
    } finally {
        controller.close()
    }
}

Troubleshooting

Common Issues

Memory Leaks

  • Ensure you always call controller.close() in a finally block
  • Set appropriate maxQueueSize for your use case
  • Monitor queue size with controller.getStats()

Performance Issues

  • Use dropNewOnFull: false for high-frequency events so that the oldest queued events are dropped first
  • Implement efficient filter functions
  • Monitor with controller.getStats()

Type Errors

  • Use typeGuard functions for runtime type validation
  • Implement proper transform functions for data conversion
  • Handle type validation errors in onError callback

Connection Issues

  • Use resilientMode: true for production
  • Implement proper onError handling
  • Use timeout settings for detecting stale connections