Subscriptions
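Introduction
Subscriptions are a real-time event stream between the client and the server. Use subscriptions when you need to push real-time updates to the client.
With tRPC's subscriptions, the client establishes and maintains a persistent connection to the server. If the connection is interrupted, the client automatically attempts to reconnect and recover gracefully with the help of `tracked()` events.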
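WebSockets or Server-sent Events?
You can use either WebSockets or Server-sent Events (SSE) to set up real-time subscriptions in tRPC.
- For WebSockets, see the WebSockets page.
- For SSE, see the `httpSubscriptionLink` page.
If you are unsure which one to use, we recommend SSE for subscriptions, as it is easier to set up and does not require a WebSocket server.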
Reference projects
| Type | Example Type | Link |
|---|---|---|
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
Basic example
For a full example, see our full-stack SSE example.
server.ts
```ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
const router = t.router;
const publicProcedure = t.procedure;

const ee = new EventEmitter();

export const appRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});
```
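To see what `on()` and the `AbortSignal` contribute independently of tRPC, the sketch below uses only Node.js built-ins. `collectUntilAbort` is a hypothetical helper written for illustration, not part of tRPC. It buffers events until the signal fires and treats the resulting `AbortError` as a normal stop. In a real subscription tRPC owns the signal, so this only illustrates the underlying mechanics.

```typescript
import { EventEmitter, on } from 'node:events';

// Hypothetical helper: collects every 'add' event until the signal is aborted.
export async function collectUntilAbort(
  ee: EventEmitter,
  signal: AbortSignal,
): Promise<string[]> {
  const received: string[] = [];
  try {
    // on() attaches its listener immediately and buffers events between iterations
    for await (const [data] of on(ee, 'add', { signal })) {
      received.push(data as string);
    }
  } catch (err) {
    // When the signal fires, the iterator rejects with an AbortError;
    // here that is treated as a regular end of the stream.
    if ((err as Error).name !== 'AbortError') throw err;
  }
  return received;
}
```

Aborting the signal also removes the listener that `on()` attached to the emitter, which is why passing `opts.signal` in the example above is enough to avoid leaking listeners.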
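Automatic tracking of IDs using `tracked()` (recommended)
If you `yield` an event using the `tracked()` helper and include an `id`, the client automatically reconnects when the connection drops and sends the last known ID.
You can send an initial `lastEventId` when initializing the subscription, and it is updated automatically as the browser receives data.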
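- For SSE, this is part of the `EventSource` spec and is propagated through `lastEventId` in your `.input()`.
- For WebSockets, our `wsLink` automatically sends the last known ID and keeps it updated.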
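The resume behaviour described above can be illustrated without any network code. The sketch below is a simplified simulation, not tRPC's implementation: `source` and `consumeWithResume` are hypothetical names, the event IDs are assumed to be consecutive integers encoded as strings, and the "connection loss" is a thrown error.

```typescript
type TrackedEvent<T> = { id: string; data: T };

// Hypothetical event source: emits events 0..4 and resumes after `lastEventId`.
// `failAt` simulates a dropped connection at a given event.
export async function* source(
  lastEventId: string | null,
  failAt?: number,
): AsyncGenerator<TrackedEvent<number>> {
  const start = lastEventId === null ? 0 : Number(lastEventId) + 1;
  for (let i = start; i < 5; i++) {
    if (i === failAt) throw new Error('connection lost');
    yield { id: String(i), data: i };
  }
}

// Hypothetical client loop: remembers the last seen id and passes it on reconnect.
export async function consumeWithResume(): Promise<number[]> {
  const received: number[] = [];
  let lastEventId: string | null = null;
  let failAt: number | undefined = 3;
  for (;;) {
    try {
      for await (const event of source(lastEventId, failAt)) {
        received.push(event.data);
        lastEventId = event.id; // updated as each event arrives
      }
      return received;
    } catch {
      failAt = undefined; // fail only once, then reconnect with lastEventId
    }
  }
}
```

Because the client passes the last ID it saw, the second connection starts at the event that was lost rather than at the beginning, so no event is delivered twice or skipped.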
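If you fetch data based on `lastEventId` and capturing every event is critical, make sure to set up the event listener before fetching events from your database, as done in our full-stack SSE example. This prevents newly emitted events from being missed while the original batch is being fetched and yielded based on `lastEventId`.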
```ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching.
      // on() attaches its listener immediately and buffers events until they are iterated.
      const iterable = on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      // The input object is optional, so guard the access
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events, including any that arrived while fetching
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
```
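The reason for attaching the listener before querying can be demonstrated with Node.js built-ins alone. In the sketch below, `replayThenFollow` and `fetchSince` are hypothetical names, and IDs are assumed to be monotonically increasing numbers so that duplicates can be filtered with a simple comparison. An event emitted while the backlog query is still in flight is buffered by `on()` and delivered after the backlog.

```typescript
import { EventEmitter, on } from 'node:events';

type Item = { id: number };

// Hypothetical helper: replays the backlog after `lastEventId`, then follows live events.
export async function* replayThenFollow(
  ee: EventEmitter,
  fetchSince: (id: number) => Promise<Item[]>,
  lastEventId: number,
  signal?: AbortSignal,
): AsyncGenerator<Item> {
  // 1. Attach the listener first; on() buffers events from this point on.
  const live = on(ee, 'add', { signal });

  // 2. Replay the backlog. Events emitted during this await are not lost.
  let lastSeen = lastEventId;
  for (const item of await fetchSince(lastEventId)) {
    lastSeen = item.id;
    yield item;
  }

  // 3. Drain buffered and future live events, skipping anything already replayed.
  for await (const [data] of live) {
    const item = data as Item;
    if (item.id <= lastSeen) continue;
    lastSeen = item.id;
    yield item;
  }
}
```

If the order of steps 1 and 2 were swapped, an event emitted while `fetchSince` was pending would appear neither in the backlog snapshot nor in the live stream.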
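Pulling data in a loop
This recipe is useful when you want to periodically check a data source, such as a database, for new data and push it to the client.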
server.ts
```ts
import { setTimeout as sleep } from 'node:timers/promises';
import { PrismaClient } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

// Assumption: a Prisma client with a `post` model; swap in your own data source
const db = new PrismaClient();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
```
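One detail of the loop above is that the `aborted` check only runs between polls, so a disconnect during the wait is noticed only after the full interval. A possible variation, sketched here with Node.js built-ins, passes the signal to the sleep so that the wait itself is cancelled. `poll` and `fetchNew` are hypothetical names used for illustration and are not part of tRPC.

```typescript
import { setTimeout as sleep } from 'node:timers/promises';

// Hypothetical helper: polls `fetchNew` until the signal is aborted.
export async function* poll<T>(
  fetchNew: () => Promise<T[]>,
  intervalMs: number,
  signal: AbortSignal,
): AsyncGenerator<T> {
  while (!signal.aborted) {
    for (const item of await fetchNew()) {
      yield item;
    }
    try {
      // The promise-based setTimeout rejects with an AbortError when the signal fires,
      // so the generator does not have to wait out the rest of the interval.
      await sleep(intervalMs, undefined, { signal });
    } catch (err) {
      if ((err as Error).name === 'AbortError') return;
      throw err;
    }
  }
}
```

Whether this is worth the extra `try`/`catch` depends on the polling interval; for short intervals the plain loop is usually sufficient.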
Stopping a subscription from the server
If you need to stop a subscription from the server, simply `return` in the generator function.
```ts
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (!opts.signal!.aborted) {
        const idx = index++;
        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }
        // Emit the current index so the client receives data until the stop condition
        yield idx;
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
```
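On the client, you just call `.unsubscribe()` on the subscription.
Cleanup of side effects
If you need to clean up any side effects of your subscription, you can use the try...finally pattern, as tRPC invokes `.return()` on the generator instance when the subscription stops for any reason.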
```ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    let timeout: ReturnType<typeof setTimeout> | undefined;
    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      // Runs when the subscription stops for any reason
      if (timeout) clearTimeout(timeout);
    }
  }),
});
```
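The `finally` guarantee comes from JavaScript's generator semantics rather than from tRPC itself. The sketch below, in which `createTicker` is a hypothetical name, shows that calling `.return()` on an async generator suspended at a `yield` runs its `finally` block.

```typescript
// Hypothetical generator factory: holds a timer as a stand-in for any side effect.
export function createTicker(log: string[]): AsyncGenerator<number> {
  return (async function* () {
    const timer = setInterval(() => {}, 1000);
    try {
      let i = 0;
      while (true) {
        yield i++;
      }
    } finally {
      // Reached when the consumer calls .return() while the generator is suspended at `yield`
      clearInterval(timer);
      log.push('cleaned up');
    }
  })();
}
```

Note that `.return()` on a generator that has not been started completes it without running any of its body, so nothing inside `try` has been set up and the `finally` block is not entered.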
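Error handling
Throwing an error in a generator function propagates to tRPC's `onError()` on the backend.
If the error thrown is a 5xx error, the client automatically attempts to reconnect based on the last event ID tracked using `tracked()`. For other errors, the subscription is cancelled and the error propagates to the `onError()` callback.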
Output validation
Since subscriptions are async iterators, you have to go through the iterator to validate the output.
Example with zod v4
zAsyncIterable.ts
```ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
```
Example with zod v3
zAsyncIterable.ts
```ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, any, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, any, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    any,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
```
Now you can use this helper to validate the output of your subscription procedures:
_app.ts
```ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        index++;
        // tracked() expects a string id, so the numeric index is converted
        yield tracked(String(index), {
          count: index,
        });
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});
```