Subscription Helpers
Powerful utilities for building robust event-driven subscriptions with advanced features like queue management, error handling, and performance monitoring.
Overview
The subscription helpers provide a high-level API for creating tRPC subscriptions from EventEmitter events. They handle common challenges like:
- Queue Management: Prevent memory leaks with configurable event queues
- Error Recovery: Resilient mode for handling errors gracefully
- Performance Monitoring: Built-in statistics and performance tracking
- Type Safety: Type guards and transforms for runtime safety
- Resource Management: Proper cleanup and cancellation support
- Testing Utilities: Tools for testing subscriptions easily
Quick Start
Basic Event Subscription
import { Injectable } from '@nestjs/common'
import { EventEmitter } from 'events'
import { Router, Subscription, Input, createEventSubscription } from '@nexica/nestjs-trpc'
import { z } from 'zod'
// Service for managing chat events
@Injectable()
export class ChatService {
private readonly eventEmitter = new EventEmitter()
getEventEmitter() {
return this.eventEmitter
}
sendMessage(roomId: string, message: any) {
this.eventEmitter.emit(`room:${roomId}:message`, message)
}
}
// Router with subscription endpoint
@Router()
export class ChatRouter {
constructor(private readonly chatService: ChatService) {}
@Subscription({
input: z.object({
roomId: z.string(),
}),
output: z.object({
id: z.string(),
message: z.string(),
userId: z.string(),
timestamp: z.date(),
}),
})
async *onMessage(@Input() input: { roomId: string }) {
const { subscription, controller } = createEventSubscription({
eventEmitter: this.chatService.getEventEmitter(),
eventName: `room:${input.roomId}:message`,
filter: (event: any) => event.roomId === input.roomId,
})
try {
yield* subscription
} finally {
controller.close()
}
}
}
API Reference
createEventSubscription
The main function for creating event-based subscriptions.
function createEventSubscription<T>(options: SubscriptionHelperOptions<T>): {
subscription: AsyncIterable<T>
controller: SubscriptionController
}
Options
Property | Type | Default | Description |
---|---|---|---|
eventEmitter | EventEmitter | - | Required. The EventEmitter instance to listen to |
eventName | string | - | Required. The event name to listen for |
errorEventName | string | ${eventName}Error | Event name for error handling |
maxQueueSize | number | 1000 | Maximum events to queue (prevents memory leaks) |
dropNewOnFull | boolean | false | Drop new events when queue is full (vs. dropping oldest) |
timeout | number | - | Timeout for waiting for events (milliseconds) |
throwOnTimeout | boolean | false | Whether to throw on timeout or continue waiting |
resilientMode | boolean | false | Continue after errors instead of throwing |
filter | (event: unknown) => boolean | - | Custom filter function for events |
transform | (event: unknown) => T | - | Custom transform function for events |
typeGuard | (event: unknown) => event is T | - | Type guard for runtime type validation |
signal | AbortSignal | - | AbortSignal to cancel the subscription |
onError | (error: Error, context) => void | - | Custom error handler |
Return Value
- subscription: AsyncIterable that yields filtered and transformed events
- controller: Object for managing the subscription lifecycle
SubscriptionController
Interface for managing subscription lifecycle and monitoring.
interface SubscriptionController {
close(): void
isClosed(): boolean
getStats(): SubscriptionStats
}
SubscriptionStats
Performance and monitoring statistics.
interface SubscriptionStats {
eventsProcessed: number
errorsEncountered: number
currentQueueSize: number
startTime: number
lastEventTime: number | null
}
Advanced Features
Queue Management
Prevent memory leaks with automatic queue management:
const { subscription, controller } = createEventSubscription({
eventEmitter,
eventName: 'high-frequency-event',
maxQueueSize: 500, // Limit queue to 500 events
dropNewOnFull: false, // Drop oldest events when full (default)
// dropNewOnFull: true, // Drop new events when full
})
// Monitor queue size
setInterval(() => {
const stats = controller.getStats()
if (stats.currentQueueSize > 400) {
console.warn('Queue getting full:', stats.currentQueueSize)
}
}, 1000)
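The two drop policies can be illustrated with a small standalone queue. This is a minimal sketch, assuming the behavior described in the options table (`dropNewOnFull: false` discards the oldest queued event, `dropNewOnFull: true` discards the incoming one); `BoundedQueue` is a hypothetical class for illustration, not the library's internal implementation.

```typescript
// Hypothetical illustration only; not the library's internal queue.
class BoundedQueue<T> {
    private readonly items: T[] = []
    private readonly maxSize: number
    private readonly dropNewOnFull: boolean
    public dropped = 0

    constructor(maxSize: number, dropNewOnFull: boolean) {
        this.maxSize = maxSize
        this.dropNewOnFull = dropNewOnFull
    }

    push(item: T): void {
        if (this.items.length >= this.maxSize) {
            this.dropped++
            if (this.dropNewOnFull) {
                return // keep what is already queued, discard the incoming item
            }
            this.items.shift() // discard the oldest item to make room
        }
        this.items.push(item)
    }

    toArray(): T[] {
        return [...this.items]
    }
}

const keepNewest = new BoundedQueue<number>(3, false)
const keepOldest = new BoundedQueue<number>(3, true)
for (const n of [1, 2, 3, 4, 5]) {
    keepNewest.push(n)
    keepOldest.push(n)
}
console.log(keepNewest.toArray()) // [3, 4, 5]
console.log(keepOldest.toArray()) // [1, 2, 3]
```

Either way the queue never grows past `maxSize`; the policy only decides which events a slow consumer misses.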
Error Handling & Resilient Mode
Handle errors gracefully without stopping the subscription:
const { subscription } = createEventSubscription({
eventEmitter,
eventName: 'unreliable-event',
resilientMode: true, // Continue after errors
onError: (error, context) => {
console.error(`Error in ${context.eventName}:`, error)
// Log to monitoring service
monitoringService.logError(error, context.stats)
},
})
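To make the difference between the two modes concrete, the sketch below mimics it on a plain array of events. It assumes that resilient mode reports each error to `onError` and keeps going, while the default mode rethrows; `consume` is a hypothetical helper, and the library's actual control flow may differ.

```typescript
// Hypothetical helper: mimics the documented behavior on a plain array of events.
function consume<T>(
    events: T[],
    handle: (event: T) => void,
    options: { resilientMode: boolean; onError?: (error: Error) => void }
): { eventsProcessed: number; errorsEncountered: number } {
    const stats = { eventsProcessed: 0, errorsEncountered: 0 }
    for (const event of events) {
        try {
            handle(event)
            stats.eventsProcessed++
        } catch (cause) {
            const error = cause instanceof Error ? cause : new Error(String(cause))
            stats.errorsEncountered++
            options.onError?.(error)
            if (!options.resilientMode) {
                throw error // default mode: the first error ends consumption
            }
        }
    }
    return stats
}

const handle = (n: number): void => {
    if (n === 2) throw new Error('bad event')
}

const result = consume([1, 2, 3], handle, { resilientMode: true })
console.log(result) // { eventsProcessed: 2, errorsEncountered: 1 }
```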
Type Safety with Guards and Transforms
Ensure runtime type safety and data transformation:
interface ChatMessage {
id: string
content: string
userId: string
timestamp: Date
}
const isValidMessage = (event: unknown): event is ChatMessage => {
return typeof event === 'object' && event !== null && typeof (event as any).id === 'string' && typeof (event as any).content === 'string'
}
const { subscription } = createEventSubscription<ChatMessage>({
eventEmitter,
eventName: 'chat-message',
typeGuard: isValidMessage,
transform: (event: any) => ({
...event,
timestamp: new Date(event.timestamp), // Convert to Date object
}),
filter: (event: any) => event.content.trim().length > 0, // Filter empty messages
})
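The same three steps can be written as one plain function, which is handy for unit-testing the logic on its own. This is a sketch under an assumption: the order used here (guard, then filter, then transform) is chosen for illustration, and the order in which the library applies `typeGuard`, `filter`, and `transform` is not stated above. `RawChatMessage`, `isRawChatMessage`, and `toChatMessage` are hypothetical names.

```typescript
interface ChatMessage {
    id: string
    content: string
    userId: string
    timestamp: Date
}

// Shape of the payload before the timestamp is converted (assumed for illustration).
type RawChatMessage = Omit<ChatMessage, 'timestamp'> & { timestamp: string | number }

const isRawChatMessage = (event: unknown): event is RawChatMessage => {
    if (typeof event !== 'object' || event === null) return false
    const e = event as Record<string, unknown>
    return (
        typeof e.id === 'string' &&
        typeof e.content === 'string' &&
        typeof e.userId === 'string' &&
        (typeof e.timestamp === 'string' || typeof e.timestamp === 'number')
    )
}

// Returns null for payloads that should not reach the subscriber.
function toChatMessage(event: unknown): ChatMessage | null {
    if (!isRawChatMessage(event)) return null // guard: reject malformed payloads
    if (event.content.trim().length === 0) return null // filter: skip empty messages
    return { ...event, timestamp: new Date(event.timestamp) } // transform: convert to Date
}

const ok = toChatMessage({ id: '1', content: 'Hello', userId: 'u1', timestamp: '2024-01-01T00:00:00.000Z' })
const empty = toChatMessage({ id: '2', content: '   ', userId: 'u1', timestamp: 0 })
const invalid = toChatMessage({ id: 3 })
console.log(ok, empty, invalid)
```

Checking every field the transform touches (here `timestamp`) in the guard avoids producing an `Invalid Date` from a missing value.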
Timeout Handling
Handle slow or missing events with timeouts:
const { subscription } = createEventSubscription({
eventEmitter,
eventName: 'periodic-update',
timeout: 5000, // 5 second timeout
throwOnTimeout: false, // Continue waiting instead of throwing
onError: (error, context) => {
if (error.message.includes('timeout')) {
console.log('No updates received in 5 seconds, continuing...')
}
},
})
Cancellation with AbortSignal
Cancel subscriptions using AbortController:
const abortController = new AbortController()
const { subscription } = createEventSubscription({
eventEmitter,
eventName: 'long-running-process',
signal: abortController.signal,
})
// Cancel after 30 seconds
setTimeout(() => {
abortController.abort()
}, 30000)
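try {
for await (const event of subscription) {
console.log('Event:', event)
}
} catch (error) {
// `error` is `unknown` under strict TypeScript, so narrow it before reading `message`
if (error instanceof Error && error.message === 'Subscription cancelled') {
console.log('Subscription was cancelled')
} else {
throw error
}
}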
Performance Monitoring
Real-time Statistics
Monitor subscription performance in real-time:
const { subscription, controller } = createEventSubscription({
eventEmitter,
eventName: 'monitored-event',
})
// Log statistics every 10 seconds
const statsInterval = setInterval(() => {
const stats = controller.getStats()
const runtime = Date.now() - stats.startTime
const eventsPerSecond = stats.eventsProcessed / (runtime / 1000)
console.log('Subscription Stats:', {
runtime: `${runtime}ms`,
eventsProcessed: stats.eventsProcessed,
eventsPerSecond: eventsPerSecond.toFixed(2),
currentQueueSize: stats.currentQueueSize,
errorsEncountered: stats.errorsEncountered,
lastEventTime: stats.lastEventTime ? new Date(stats.lastEventTime) : null,
})
}, 10000)
// Inside the subscription's async generator: stop the timer and close on exit
try {
yield* subscription
} finally {
clearInterval(statsInterval)
controller.close()
}
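The throughput calculation can be pulled into a pure function so it is easy to test and safe when no time has elapsed yet. A minimal sketch: `summarizeStats` is a hypothetical helper, and the `SubscriptionStats` interface is copied from the API reference so the block stands alone.

```typescript
interface SubscriptionStats {
    eventsProcessed: number
    errorsEncountered: number
    currentQueueSize: number
    startTime: number
    lastEventTime: number | null
}

// Hypothetical helper: derives rates from a stats snapshot taken at time `now` (ms).
function summarizeStats(stats: SubscriptionStats, now: number) {
    const runtimeMs = Math.max(0, now - stats.startTime)
    // Avoid dividing by zero when the snapshot is taken immediately after start.
    const eventsPerSecond = runtimeMs === 0 ? 0 : stats.eventsProcessed / (runtimeMs / 1000)
    const idleMs = stats.lastEventTime === null ? null : now - stats.lastEventTime
    return { runtimeMs, eventsPerSecond, idleMs }
}

const summary = summarizeStats(
    { eventsProcessed: 50, errorsEncountered: 0, currentQueueSize: 3, startTime: 1000, lastEventTime: 10500 },
    11000
)
console.log(summary) // { runtimeMs: 10000, eventsPerSecond: 5, idleMs: 500 }
```

In a live subscription the first argument would be `controller.getStats()` and the second `Date.now()`.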
Performance Alerts
Set up automatic alerts for performance issues:
const { subscription, controller } = createEventSubscription({
eventEmitter,
eventName: 'critical-event',
maxQueueSize: 1000,
onError: (error, context) => {
// Alert on high error rate
if (context.stats.errorsEncountered > 10) {
alertService.send('High error rate in subscription', context.stats)
}
},
})
// Monitor for performance issues
const healthCheck = setInterval(() => {
const stats = controller.getStats()
// Alert on high queue size
if (stats.currentQueueSize > 800) {
alertService.send('Subscription queue near capacity', stats)
}
// Alert on no recent events (possible producer issue)
if (stats.lastEventTime && Date.now() - stats.lastEventTime > 60000) {
alertService.send('No events received in 60 seconds', stats)
}
}, 5000)
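The alert conditions above can also be expressed as a pure function that returns the problems it finds, leaving the decision of how to alert to the caller. This is a sketch only: `HealthLimits` and `checkHealth` are hypothetical names, and the thresholds mirror the example values used above.

```typescript
interface SubscriptionStats {
    eventsProcessed: number
    errorsEncountered: number
    currentQueueSize: number
    startTime: number
    lastEventTime: number | null
}

// Hypothetical thresholds; these names are not part of the library.
interface HealthLimits {
    maxQueueSize: number
    queueWarnRatio: number // 0.8 warns once the queue passes 80% of capacity
    maxIdleMs: number
    maxErrors: number
}

function checkHealth(stats: SubscriptionStats, now: number, limits: HealthLimits): string[] {
    const problems: string[] = []
    if (stats.currentQueueSize > limits.maxQueueSize * limits.queueWarnRatio) {
        problems.push('Subscription queue near capacity')
    }
    if (stats.lastEventTime !== null && now - stats.lastEventTime > limits.maxIdleMs) {
        problems.push('No events received recently')
    }
    if (stats.errorsEncountered > limits.maxErrors) {
        problems.push('High error rate in subscription')
    }
    return problems
}

const limits: HealthLimits = { maxQueueSize: 1000, queueWarnRatio: 0.8, maxIdleMs: 60000, maxErrors: 10 }
const problems = checkHealth(
    { eventsProcessed: 120, errorsEncountered: 11, currentQueueSize: 850, startTime: 0, lastEventTime: 10000 },
    80000,
    limits
)
console.log(problems)
```

Passing `now` as a parameter instead of calling `Date.now()` inside keeps the check deterministic in tests.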
Testing Utilities
createMockEventEmitter
Create a mock EventEmitter for testing:
import { createEventSubscription, createMockEventEmitter, createSubscriptionTester } from '@nexica/nestjs-trpc'
describe('ChatService', () => {
it('should handle chat messages', async () => {
const { emitter, emit } = createMockEventEmitter()
const { subscription } = createEventSubscription({
eventEmitter: emitter,
eventName: 'chat-message',
})
const tester = createSubscriptionTester(subscription)
// Emit test events
emit('chat-message', { id: '1', content: 'Hello' })
emit('chat-message', { id: '2', content: 'World' })
// Collect results
const messages = await tester.collect(2)
expect(messages).toHaveLength(2)
expect(messages[0]).toEqual({ id: '1', content: 'Hello' })
expect(messages[1]).toEqual({ id: '2', content: 'World' })
tester.close()
})
})
createSubscriptionTester
Utility for testing async iterables:
const tester = createSubscriptionTester(subscription)
// Get next event
const nextEvent = await tester.next()
// Collect multiple events
const events = await tester.collect(5)
// Cleanup
tester.close()
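For intuition, collecting a fixed number of items from any async iterable takes only a few lines. The sketch below is a hypothetical standalone `collect` function, not the implementation behind `tester.collect`; it relies on the standard behavior that leaving a `for await` loop early calls the iterator's `return()`, which lets the source run its `finally` cleanup.

```typescript
// Hypothetical standalone helper; not the library's createSubscriptionTester.
async function collect<T>(source: AsyncIterable<T>, count: number): Promise<T[]> {
    const results: T[] = []
    if (count <= 0) return results
    for await (const item of source) {
        results.push(item)
        if (results.length >= count) break // triggers the source's cleanup
    }
    return results
}

let cleanedUp = false

// An endless source that records when its cleanup has run.
async function* numbers(): AsyncGenerator<number> {
    let n = 0
    try {
        while (true) yield n++
    } finally {
        cleanedUp = true
    }
}

const collected = collect(numbers(), 3)
collected.then((items) => console.log(items, cleanedUp)) // [0, 1, 2] true
```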
Real-world Examples
Notification System
// Service for managing notifications
@Injectable()
export class NotificationService {
private readonly eventEmitter = new EventEmitter()
getEventEmitter() {
return this.eventEmitter
}
sendNotification(userId: string, notification: any) {
this.eventEmitter.emit(`user:${userId}:notification`, notification)
}
}
// Router with notification subscription
@Router()
export class NotificationRouter {
constructor(
private readonly notificationService: NotificationService,
private readonly logger: Logger
) {}
@Subscription({
input: z.object({
userId: z.string(),
types: z.array(z.string()).optional(),
}),
output: z.object({
id: z.string(),
type: z.string(),
title: z.string(),
message: z.string(),
timestamp: z.date(),
priority: z.enum(['low', 'medium', 'high']),
}),
})
async *onNotification(@Input() input: { userId: string; types?: string[] }, @Context() ctx: RequestContext) {
const { subscription, controller } = createEventSubscription({
eventEmitter: this.notificationService.getEventEmitter(),
eventName: `user:${input.userId}:notification`,
maxQueueSize: 100, // Limit notifications per user
resilientMode: true, // Don't fail on individual notification errors
filter: (event: any) => {
// Filter by notification types if specified
return !input.types || input.types.includes(event.type)
},
transform: (event: any) => ({
...event,
timestamp: new Date(event.timestamp),
}),
typeGuard: (event: unknown): event is NotificationEvent => {
return isValidNotification(event)
},
onError: (error, context) => {
this.logger.error(`Notification error for user ${input.userId}:`, error)
},
})
try {
yield* subscription
} finally {
controller.close()
}
}
}
Live Data Dashboard
// Service for managing dashboard metrics
@Injectable()
export class DashboardService {
private readonly metricsEmitter = new EventEmitter()
getMetricsEmitter() {
return this.metricsEmitter
}
updateDashboard(dashboardId: string, metrics: any) {
this.metricsEmitter.emit(`dashboard:${dashboardId}:update`, metrics)
}
}
// Router with dashboard subscription
@Router()
export class DashboardRouter {
constructor(
private readonly dashboardService: DashboardService,
private readonly monitoringService: MonitoringService
) {}
@Subscription({
input: z.object({
dashboardId: z.string(),
refreshRate: z.number().min(1000).default(5000),
}),
output: z.object({
metrics: z.record(z.number()),
timestamp: z.date(),
status: z.enum(['healthy', 'warning', 'error']),
}),
})
async *onDashboardUpdate(@Input() input: { dashboardId: string; refreshRate: number }, @Context() ctx: RequestContext) {
const abortController = new AbortController()
// Auto-cleanup after 1 hour
const autoCloseTimer = setTimeout(() => abortController.abort(), 3600000)
const { subscription, controller } = createEventSubscription({
eventEmitter: this.dashboardService.getMetricsEmitter(),
eventName: `dashboard:${input.dashboardId}:update`,
timeout: input.refreshRate * 2, // Timeout if no updates
throwOnTimeout: false, // Continue waiting
signal: abortController.signal,
maxQueueSize: 10, // Only keep recent updates
dropNewOnFull: false, // Drop oldest updates so the freshest data is kept
transform: (event: any) => ({
...event,
timestamp: new Date(),
}),
onError: (error, context) => {
this.monitoringService.logDashboardError(input.dashboardId, error, context.stats)
},
})
try {
yield* subscription
} finally {
clearTimeout(autoCloseTimer) // Don't leave the 1-hour timer pending after the client disconnects
controller.close()
}
}
}
Best Practices
Resource Management
Always clean up subscriptions properly:
async function* mySubscription() {
const { subscription, controller } = createEventSubscription({
eventEmitter,
eventName: 'my-event',
})
try {
yield* subscription
} finally {
// Always clean up
controller.close()
}
}
Error Handling
Use resilient mode for production services:
const { subscription } = createEventSubscription({
eventEmitter,
eventName: 'production-event',
resilientMode: true, // Don't crash on errors
onError: (error, context) => {
// Log to monitoring service
logger.error('Subscription error:', { error, stats: context.stats })
// Send alert if error rate is high
if (context.stats.errorsEncountered > 10) {
alertService.sendAlert('High error rate in subscription')
}
},
})
Performance Optimization
Configure queue sizes based on your use case:
// High-frequency events: smaller queue, drop old events
const { subscription: highFrequencySubscription } = createEventSubscription({
eventEmitter,
eventName: 'high-frequency-event',
maxQueueSize: 50,
dropNewOnFull: false, // Drop oldest events
})
// Critical events: larger queue so that drops are unlikely
const { subscription: criticalSubscription } = createEventSubscription({
eventEmitter,
eventName: 'critical-event',
maxQueueSize: 10000,
dropNewOnFull: true, // If the queue does fill, keep queued events and drop incoming ones
})
Type Safety
Always use type guards for runtime safety:
const isValidEvent = (event: unknown): event is MyEventType => {
return typeof event === 'object' && event !== null && 'id' in event && 'data' in event
}
const { subscription } = createEventSubscription<MyEventType>({
eventEmitter,
eventName: 'typed-event',
typeGuard: isValidEvent,
onError: (error, context) => {
if (error.message.includes('Type validation failed')) {
logger.warn('Received invalid event data')
}
},
})
Migration from Legacy
From createEventSubscriptionLegacy
The legacy function is deprecated but still available for backward compatibility:
// Old way (deprecated)
async function* oldSubscription() {
yield* createEventSubscriptionLegacy({
eventEmitter,
eventName: 'my-event',
})
}
// New way (recommended)
async function* newSubscription() {
const { subscription, controller } = createEventSubscription({
eventEmitter,
eventName: 'my-event',
})
try {
yield* subscription
} finally {
controller.close()
}
}
Migration Note
The new createEventSubscription function provides better resource management and additional features. Update your code to use the new API for improved performance and reliability.
Troubleshooting
Common Issues
Memory Leaks
- Ensure you always call controller.close() in a finally block
- Set an appropriate maxQueueSize for your use case
- Monitor queue size with controller.getStats()
Performance Issues
- Use dropNewOnFull: false for high-frequency events
- Implement efficient filter functions
- Monitor with controller.getStats()
Type Errors
- Use typeGuard functions for runtime type validation
- Implement proper transform functions for data conversion
- Handle type validation errors in the onError callback
Connection Issues
- Use resilientMode: true for production
- Implement proper onError handling
- Use timeout settings for detecting stale connections