Subscriptions
Introduction
Subscriptions are a type of real-time event stream between the client and the server. Use them when you need to push real-time updates to the client.
With tRPC subscriptions, the client establishes and maintains a persistent connection to the server. If the connection drops, the client attempts to reconnect and recover automatically with the help of `tracked()` events.
WebSockets or Server-Sent Events?
You can use either WebSockets or Server-Sent Events (SSE) to set up real-time subscriptions in tRPC.
- For WebSockets, see the WebSockets page
- For SSE, see the `httpSubscriptionLink`
If you are unsure which one to use, we recommend SSE for subscriptions because it is easier to set up and does not require a WebSocket server.
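As a rough illustration of the SSE route, the client can send subscriptions through `httpSubscriptionLink` and everything else through a regular HTTP link. This is a configuration sketch, not a complete setup: the URL and the `./server` import path are assumptions, and `AppRouter` is assumed to be exported from your server module as `typeof appRouter`.

```ts
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
// Assumption: the server module exports `type AppRouter = typeof appRouter`
import type { AppRouter } from './server';

// Hypothetical endpoint; point this at your own tRPC handler
const url = 'http://localhost:3000/api/trpc';

export const client = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      // Subscriptions go over SSE, queries and mutations over batched HTTP
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({ url }),
      false: httpBatchLink({ url }),
    }),
  ],
});
```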
Reference projects
| Type | Example Type | Link |
|---|---|---|
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
Basic example
For a complete example, see our full-stack SSE example.
server.ts
```ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
const router = t.router;
const publicProcedure = t.procedure;

const ee = new EventEmitter();

export const appRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});
```
Automatic ID tracking using tracked() (recommended)
If you `yield` an event using our `tracked()` helper and include an `id`, the client will automatically reconnect when it gets disconnected and send the last known ID.
You can send an initial `lastEventId` when starting the subscription. It is updated automatically as the browser receives data.
- For SSE, this is part of the `EventSource` specification, and the value is propagated through `lastEventId` in your `.input()`.
- For WebSockets, our `wsLink` automatically sends the last known ID and updates it as the browser receives data.
If you fetch data based on `lastEventId` and capturing every event is critical, set up the event listener before fetching from your database, as our full-stack SSE example does. This prevents newly emitted events from being missed while the initial batch based on `lastEventId` is being yielded.
```ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching.
      // `on()` attaches its listener immediately and buffers events until they are consumed.
      const iterable = on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      // The input is optional, so guard the access
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events, including any that were buffered while fetching
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
```
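The "listen first, fetch second" ordering can be checked without tRPC or a database. The sketch below uses only Node's `events` module; `fetchBacklog` and `collect` are hypothetical names standing in for a slow database query and a subscription body. Because `on()` attaches its listener as soon as it is called, an event emitted while the fetch is still pending is buffered rather than lost.

```ts
import { EventEmitter, on } from 'node:events';

// Hypothetical stand-in for a slow database query
async function fetchBacklog(): Promise<string[]> {
  await new Promise((resolve) => setTimeout(resolve, 20));
  return ['backlog-1'];
}

// Collects the backlog followed by live events until `limit` items were seen
async function collect(ee: EventEmitter, limit: number): Promise<string[]> {
  // 1. Attach the listener first; events are buffered from this point on
  const live = on(ee, 'add');

  // 2. Only then run the slow fetch
  const seen: string[] = [...(await fetchBacklog())];

  // 3. Drain buffered and live events; `break` detaches the listener
  for await (const [value] of live) {
    seen.push(value as string);
    if (seen.length >= limit) break;
  }
  return seen;
}
```

If the `on()` call were moved below `fetchBacklog()`, an event emitted during the fetch would never reach the loop.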
Polling for data periodically
This technique is useful when you need to check a source such as a database for new data at regular intervals and push it to the client.
server.ts
```ts
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
// Assumption: `db` is your Prisma client instance; this import path is hypothetical
import { db } from '../db';

// Small helper used to pause between polls
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts: Post[] = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
```
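The polling loop only checks `opts.signal` between iterations, so with a plain timer it can linger for up to one interval after the client has gone. One possible refinement, offered as a sketch rather than anything tRPC provides, is a `sleep` helper that also resolves when the signal aborts. The `signal` parameter here is an assumption of this sketch.

```ts
// Resolves after `ms` milliseconds, or as soon as `signal` aborts
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    const finish = () => {
      // Clean up both the timer and the abort listener
      clearTimeout(timer);
      signal?.removeEventListener('abort', finish);
      resolve();
    };
    const timer = setTimeout(finish, ms);
    signal?.addEventListener('abort', finish, { once: true });
  });
}
```

Inside the loop this would be called as `await sleep(1_000, opts.signal)`. The `while` condition then sees the aborted signal right away and the generator finishes.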
Stopping a subscription from the server
If you need to stop a subscription from the server, simply `return` from the generator function.
```ts
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (!opts.signal!.aborted) {
        const idx = index++;
        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }
        // Emit the current index so the client has something to receive
        yield idx;
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
```
On the client, call `.unsubscribe()` on the subscription.
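For context, a client-side sketch with the vanilla tRPC client might look like the following. It targets the untracked `onPostAdd` procedure from the basic example. The URL and the `./server` import path are assumptions, as is the `AppRouter` type export.

```ts
import { createTRPCClient, httpSubscriptionLink } from '@trpc/client';
// Assumption: the server module exports `type AppRouter = typeof appRouter`
import type { AppRouter } from './server';

const client = createTRPCClient<AppRouter>({
  links: [
    // Hypothetical URL; point this at your own tRPC endpoint
    httpSubscriptionLink({ url: 'http://localhost:3000/api/trpc' }),
  ],
});

// `subscribe` returns a handle that can be used to stop the subscription
const subscription = client.onPostAdd.subscribe(undefined, {
  onData(post) {
    console.log('new post', post);
  },
  onError(err) {
    console.error('subscription error', err);
  },
});

// Later, for example when the view is torn down:
subscription.unsubscribe();
```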
Cleaning up side effects
If you need to clean up side effects of your subscription, use the `try...finally` pattern. tRPC invokes `.return()` on the generator instance whenever the subscription stops, for any reason.
```ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    let timeout: ReturnType<typeof setTimeout> | undefined;
    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      // Runs when the subscription stops, whatever the reason
      if (timeout) clearTimeout(timeout);
    }
  }),
});
```
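The reason `try...finally` works is a property of generators in general: calling `.return()` on a suspended generator resumes it at the paused `yield` and runs any enclosing `finally` block. The sketch below shows this without tRPC. The names `ticker`, `consumeTwo`, and `log` are illustrative only, and leaving the `for await` loop with `break` is what triggers `.return()` here.

```ts
const log: string[] = [];

async function* ticker(): AsyncGenerator<number, void, unknown> {
  // Stand-in for a side effect that must be released
  const timer = setInterval(() => {}, 1_000);
  try {
    let i = 0;
    while (true) {
      yield i++;
    }
  } finally {
    // Reached when the consumer stops iterating
    clearInterval(timer);
    log.push('cleaned up');
  }
}

async function consumeTwo(): Promise<number[]> {
  const received: number[] = [];
  for await (const value of ticker()) {
    received.push(value);
    // Leaving the loop early calls `.return()` on the generator
    if (received.length === 2) break;
  }
  return received;
}
```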
Error handling
Throwing an error in a generator function propagates it to tRPC's `onError()` on the backend.
If the error is a 5xx error, the client automatically attempts to reconnect using the last event ID tracked with `tracked()`. For any other error, the subscription is cancelled and the error is propagated to the `onError()` callback.
Output validation
Since subscriptions are async iterators, you have to iterate through them to validate the output.
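The idea of validating while iterating can be sketched without any schema library: wrap the source iterable in another async generator that checks each item before passing it on. The names `validateEach`, `isPost`, and the `Post` shape below are hypothetical and only serve the illustration. The zod-based helpers in this section apply the same pattern with real schemas.

```ts
type Post = { id: string; title: string };

// Hand-written type guard standing in for a schema
function isPost(value: unknown): value is Post {
  if (typeof value !== 'object' || value === null) return false;
  const record = value as Record<string, unknown>;
  return typeof record.id === 'string' && typeof record.title === 'string';
}

// Re-yields every item from `source`, throwing on the first invalid one
async function* validateEach<T>(
  source: AsyncIterable<unknown>,
  guard: (value: unknown) => value is T,
): AsyncGenerator<T, void, unknown> {
  for await (const item of source) {
    if (!guard(item)) {
      throw new TypeError(`Invalid item: ${JSON.stringify(item)}`);
    }
    yield item;
  }
}
```

Validation happens lazily, one item at a time, so an invalid item only surfaces as an error once the consumer reaches it.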
Example with zod v4
zAsyncIterable.ts
```ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
```
Example with zod v3
zAsyncIterable.ts
```ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, any, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, any, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    any,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
```
You can now use this helper to validate the output of your subscription procedures:
_app.ts
```ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        index++;
        // `tracked()` expects a string id, so convert the counter
        yield tracked(String(index), {
          count: index,
        });
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});
```