Version: 11.x

Subscriptions

Introduction

Subscriptions are a real-time event stream between the client and the server. Use them when you need to push real-time updates to the client.

With tRPC's subscriptions, the client establishes and maintains a persistent connection to the server. If the connection drops, the client automatically attempts to reconnect and recovers gracefully with the help of tracked() events.

WebSockets or Server-sent Events?

You can use either WebSockets or Server-sent Events (SSE) to set up real-time subscriptions in tRPC.

If you are unsure which one to use, we recommend SSE for subscriptions: it is easier to set up and does not require a WebSocket server.

Example projects

Type | Example type | Link
WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server
SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat
WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter

Basic example

tip

For a full example, see our full-stack SSE example.

server.ts
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
const router = t.router;
const publicProcedure = t.procedure;

const ee = new EventEmitter();

export const appRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});

Automatic tracking of the ID using tracked() (recommended)

If you yield an event using our tracked() helper and include an id, the client will automatically reconnect when it gets disconnected and send the last known ID.

You can send an initial lastEventId when initializing the subscription; it will be updated automatically as the browser receives data.

  • For SSE, this is part of the EventSource specification and will be propagated through lastEventId in your .input().

  • For WebSockets, our wsLink will automatically send the last known ID and update it as data arrives.
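
To make the reconnection flow concrete, here is a minimal, dependency-free sketch of the idea behind tracked(): every event carries an id, the consumer remembers the last one it saw, and a new subscription started with that id only replays later events. The names eventLog, subscribeFrom and consumeWithResume are hypothetical and not part of tRPC; the real client does this bookkeeping for you.

```typescript
// Hypothetical in-memory event log; in a real app this would be your database.
type LoggedEvent = { id: number; data: string };

const eventLog: LoggedEvent[] = [
  { id: 1, data: 'first' },
  { id: 2, data: 'second' },
  { id: 3, data: 'third' },
  { id: 4, data: 'fourth' },
];

// Stand-in for a subscription procedure: yields every event newer than lastEventId.
// `failAfter` simulates a dropped connection after that many events.
async function* subscribeFrom(
  lastEventId: number | null,
  failAfter = Infinity,
): AsyncGenerator<LoggedEvent> {
  let sent = 0;
  for (const event of eventLog) {
    if (lastEventId !== null && event.id <= lastEventId) continue;
    if (sent >= failAfter) throw new Error('connection lost');
    sent++;
    yield event;
  }
}

// Stand-in for the client: remembers the last id and resubscribes with it.
async function consumeWithResume(): Promise<string[]> {
  const received: string[] = [];
  let lastEventId: number | null = null;
  let failAfter = 2; // the first connection drops after two events

  for (;;) {
    try {
      for await (const event of subscribeFrom(lastEventId, failAfter)) {
        received.push(event.data);
        lastEventId = event.id; // updated as data arrives
      }
      return received; // the stream completed normally
    } catch {
      failAfter = Infinity; // the reconnect attempt stays up
    }
  }
}
```

In this sketch, consumeWithResume() resolves to all four payloads in order, without duplicates, even though the first connection fails halfway through.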

tip

If you fetch data based on the lastEventId and capturing every event is critical, make sure you set up the event listener before fetching the events from your database, as is done in our full-stack SSE example. This prevents newly emitted events from being missed while the initial batch based on lastEventId is being yielded.

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching
      const iterable = on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      // The input is optional, so it may be undefined
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events, reusing the iterable created before the fetch
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
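
The advice to subscribe before querying works because on() from Node's events module attaches its listener as soon as it is called and queues events until they are read. The following sketch uses only Node built-ins; fetchBacklog() and collect() are hypothetical names, with fetchBacklog() standing in for a slow database query. It emits one event while the backlog is still loading and compares both orderings.

```typescript
import { EventEmitter, on } from 'node:events';
import { setTimeout as sleep } from 'node:timers/promises';

// Hypothetical stand-in for a slow database query.
async function fetchBacklog(): Promise<string[]> {
  await sleep(50);
  return ['old-1', 'old-2'];
}

async function collect(listenFirst: boolean): Promise<string[]> {
  const ee = new EventEmitter();
  const seen: string[] = [];

  // This event arrives while the backlog query is still in flight.
  setTimeout(() => ee.emit('add', 'new-during-fetch'), 10);
  // A later event lets the loop below finish in both variants.
  setTimeout(() => ee.emit('add', 'new-after-fetch'), 150);

  // When listening first, the listener is attached before the query starts.
  const early = listenFirst ? on(ee, 'add') : null;

  seen.push(...(await fetchBacklog()));

  // Otherwise the listener is only attached now, after the query finished.
  const iterable = early ?? on(ee, 'add');
  for await (const [data] of iterable) {
    seen.push(data as string);
    if (data === 'new-after-fetch') break; // `break` also removes the listener
  }
  return seen;
}
```

With collect(true) the event emitted during the fetch is delivered after the backlog; with collect(false) it is silently lost, which is the gap the tip warns about.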

Pulling data in a loop

This approach is useful when you want to periodically check a source (such as a database) for new data and push it to the client.

server.ts
ts
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

// `db` is assumed to be your Prisma client instance, imported from wherever your app creates it

// Small helper that resolves after `ms` milliseconds
const sleep = (ms: number) =>
  new Promise((resolve) => setTimeout(resolve, ms));

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
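
One possible refinement: a plain timer keeps the loop waiting until it fires, even if the client has already gone. Node's timers/promises version of setTimeout accepts an AbortSignal, so the wait can end as soon as the signal aborts. The sketch below is independent of tRPC; pollSource, fetchSince and Item are hypothetical names, and fetchSince stands in for the database query.

```typescript
import { setTimeout as sleep } from 'node:timers/promises';

// Hypothetical data source: returns items whose id is greater than `cursor`.
type Item = { id: number };
type FetchSince = (cursor: number) => Promise<Item[]>;

async function* pollSource(
  fetchSince: FetchSince,
  signal: AbortSignal,
  intervalMs = 1_000,
): AsyncGenerator<Item> {
  let cursor = 0;
  while (!signal.aborted) {
    for (const item of await fetchSince(cursor)) {
      yield item;
      cursor = item.id; // remember the newest id that was sent
    }
    try {
      // Rejects with an AbortError as soon as `signal` aborts.
      await sleep(intervalMs, undefined, { signal });
    } catch {
      return; // the consumer went away while we were waiting
    }
  }
}
```

Inside a subscription you would pass opts.signal as the signal argument, assuming it is defined for your adapter.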

Stopping a subscription from the server

To stop a subscription from the server, simply return from the generator function.

ts
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (!opts.signal!.aborted) {
        const idx = index++;
        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }
        // Emit the current index so that the client receives something before the stop
        yield idx;
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});

On the client, call .unsubscribe() on the subscription.

Cleaning up side effects

To clean up any side effects of your subscription, use the try...finally pattern: trpc calls .return() on the generator instance when the subscription stops for any reason, which runs the finally block.

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    let timeout: ReturnType<typeof setTimeout> | undefined;
    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      // Runs whenever the subscription stops, whatever the reason
      if (timeout) clearTimeout(timeout);
    }
  }),
});
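
The mechanism can be observed without tRPC. In this plain TypeScript sketch (ticker, takeTwo and log are hypothetical names), the consumer stops early with break, which calls .return() on the generator and therefore runs the finally block.

```typescript
const log: string[] = [];

async function* ticker(): AsyncGenerator<number> {
  // A side effect that must not outlive the subscription.
  const interval = setInterval(() => {}, 1_000);
  log.push('started');
  try {
    let n = 0;
    while (true) {
      yield n++;
    }
  } finally {
    // Runs on `return`, on a thrown error, and when the consumer calls .return().
    clearInterval(interval);
    log.push('cleaned up');
  }
}

async function takeTwo(): Promise<number[]> {
  const values: number[] = [];
  for await (const value of ticker()) {
    values.push(value);
    if (values.length === 2) break; // `break` calls .return() on the generator
  }
  return values;
}
```

After takeTwo() resolves, log contains 'started' followed by 'cleaned up', and the interval no longer keeps the process alive.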

Error handling

Throwing an error in a generator function propagates it to trpc's onError() on the backend.

For 5xx errors, the client automatically attempts to reconnect using the last event ID tracked with tracked(). For other errors, the subscription is cancelled and the error is propagated to the onError() callback.

Output validation

Since subscriptions are async iterators, you have to go through the iterator to validate the output.
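
Stripped of schema-library details, the idea is a generator that wraps the original iterable and checks each value as it passes through. Below is a minimal sketch with no dependencies; validateEach and parseCount are hypothetical names, and parseCount is a hand-written validator standing in for a schema.

```typescript
// Wraps any async iterable and runs `parse` on each value before passing it on.
// `parse` is a hypothetical callback that returns the value or throws.
async function* validateEach<T>(
  source: AsyncIterable<unknown>,
  parse: (value: unknown) => T,
): AsyncGenerator<T> {
  for await (const value of source) {
    yield parse(value);
  }
}

// Example validator: accepts only objects of the shape { count: number }.
function parseCount(value: unknown): { count: number } {
  if (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { count?: unknown }).count === 'number'
  ) {
    return { count: (value as { count: number }).count };
  }
  throw new Error('Invalid output: expected { count: number }');
}
```

Valid items flow through unchanged; the first invalid item makes the wrapped iterable throw, which ends the stream at that point.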

Example with zod v4

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            // Unwrap the envelope, validate the payload, then wrap it again
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        // Make sure the source iterator is closed when validation stops early
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}

Example with zod v3

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, any, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, any, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            // Unwrap the envelope, validate the payload, then wrap it again
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        // Make sure the source iterator is closed when validation stops early
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    any,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}

You can now use this helper to validate the output of your subscription procedures:

_app.ts
ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        index++;
        // tracked() expects a string id, so the counter is converted explicitly
        yield tracked(String(index), {
          count: index,
        });
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});