跳至主内容
版本:11.x

订阅

非官方测试版翻译

本页面由 PageTurner AI 翻译(测试版)。未经项目官方认可。 发现错误? 报告问题 →

简介

订阅是客户端与服务器之间的实时事件流机制。当需要向客户端推送实时更新时,请使用订阅功能。

通过 tRPC 的订阅功能,客户端将与服务器建立并维护持久连接,并在连接中断时借助 tracked() 事件自动尝试优雅重连和恢复。

WebSockets 还是 Server-sent Events?

在 tRPC 中设置实时订阅时,您可以选择使用 WebSockets 或 Server-sent Events(SSE)。

若不确定选择哪种方案,我们推荐使用 SSE 实现订阅,因为它更易于设置且无需搭建 WebSocket 服务器。

参考项目

TypeExample TypeLink
WebSocketsBare-minimum Node.js WebSockets example/examples/standalone-server
SSEFull-stack SSE implementationgithub.com/trpc/examples-next-sse-chat
WebSocketsFull-stack WebSockets implementationgithub.com/trpc/examples-next-prisma-websockets-starter

基础示例

技巧

完整示例请参考 我们的全栈 SSE 示例

server.ts
ts
import { initTRPC } from '@trpc/server';
const t = initTRPC.create();
const ee = new EventEmitter();
export const appRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
})) {
const post = data as Post;
yield post;
}
}),
});
server.ts
ts
import { initTRPC } from '@trpc/server';
const t = initTRPC.create();
const ee = new EventEmitter();
export const appRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
})) {
const post = data as Post;
yield post;
}
}),
});

使用 tracked() 自动追踪 ID(推荐)

当您使用 tracked() 辅助函数 yield 包含 id 的事件时,客户端在连接中断时会自动重连并发送最后已知的 ID。

初始化订阅时可发送初始 lastEventId,浏览器在接收数据时会自动更新该值。

  • 对于 SSE,此机制遵循 EventSource 规范,并通过 .input() 中的 lastEventId 传递

  • 对于 WebSockets,我们的 wsLink 会自动发送并更新最后已知 ID

技巧

若需基于 lastEventId 获取数据且事件完整性至关重要,请务必先设置事件监听器再获取数据库事件(如全栈 SSE 示例所示),避免在基于 lastEventId 生成原始批次时忽略新产生的事件。

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
// We start by subscribing to the ee so that we don't miss any new events while fetching
const iterable = ee.toIterable('add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
});
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
// const items = await db.post.findMany({ ... })
// for (const item of items) {
// yield tracked(item.id, item);
// }
}
// listen for new events
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
// We start by subscribing to the ee so that we don't miss any new events while fetching
const iterable = ee.toIterable('add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
});
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
// const items = await db.post.findMany({ ... })
// for (const item of items) {
// yield tracked(item.id, item);
// }
}
// listen for new events
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});

循环拉取数据

此方案适用于需要定期检查数据库等数据源,并将新数据推送给客户端的场景。

server.ts
ts
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
// The id is the createdAt of the post
lastEventId: z.coerce.date().nullish(),
}),
)
.subscription(async function* (opts) {
// `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
let lastEventId = opts.input?.lastEventId ?? null;
// We use a `while` loop that checks `!opts.signal.aborted`
while (!opts.signal!.aborted) {
const posts = await db.post.findMany({
// If we have a `lastEventId`, we only fetch posts created after it.
where: lastEventId
? {
createdAt: {
gt: lastEventId,
},
}
: undefined,
orderBy: {
createdAt: 'asc',
},
});
for (const post of posts) {
// `tracked` is a helper that sends an `id` with each event.
// This allows the client to resume from the last received event upon reconnection.
yield tracked(post.createdAt.toJSON(), post);
lastEventId = post.createdAt;
}
// Wait for a bit before polling again to avoid hammering the database.
await sleep(1_000);
}
}),
});
server.ts
ts
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
// The id is the createdAt of the post
lastEventId: z.coerce.date().nullish(),
}),
)
.subscription(async function* (opts) {
// `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
let lastEventId = opts.input?.lastEventId ?? null;
// We use a `while` loop that checks `!opts.signal.aborted`
while (!opts.signal!.aborted) {
const posts = await db.post.findMany({
// If we have a `lastEventId`, we only fetch posts created after it.
where: lastEventId
? {
createdAt: {
gt: lastEventId,
},
}
: undefined,
orderBy: {
createdAt: 'asc',
},
});
for (const post of posts) {
// `tracked` is a helper that sends an `id` with each event.
// This allows the client to resume from the last received event upon reconnection.
yield tracked(post.createdAt.toJSON(), post);
lastEventId = post.createdAt;
}
// Wait for a bit before polling again to avoid hammering the database.
await sleep(1_000);
}
}),
});

从服务器停止订阅

若需从服务器端停止订阅,只需在生成器函数中执行 return

ts
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
lastEventId: z.string().coerce.number().min(0).optional(),
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (!opts.signal!.aborted) {
const idx = index++;
if (idx > 100) {
// With this, the subscription will stop and the client will disconnect
return;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
}
}),
});
ts
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
lastEventId: z.string().coerce.number().min(0).optional(),
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (!opts.signal!.aborted) {
const idx = index++;
if (idx > 100) {
// With this, the subscription will stop and the client will disconnect
return;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
}
}),
});

在客户端,直接对订阅执行 .unsubscribe() 即可。

副作用清理

如需清理订阅产生的副作用,可采用 try...finally 模式。当订阅因任何原因终止时,trpc 会调用生成器实例的 .return()

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
let timeout;
try {
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
timeout = setTimeout(() => console.log('Pretend like this is useful'));
const post = data as Post;
yield post;
}
} finally {
if (timeout) clearTimeout(timeout);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
let timeout;
try {
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
timeout = setTimeout(() => console.log('Pretend like this is useful'));
const post = data as Post;
yield post;
}
} finally {
if (timeout) clearTimeout(timeout);
}
}),
});

错误处理

在生成器函数中抛出错误会传递至后端 trpconError()

若抛出 5xx 错误,客户端将基于通过 tracked() 追踪的最后事件 ID 自动重连。其他错误将导致订阅取消并触发 onError() 回调。

输出验证

由于订阅是异步迭代器,必须通过迭代器遍历才能验证输出结果。

zod v4 示例

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}
zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}

zod v3 示例

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, any, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, any, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
any,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}
zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, any, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, any, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
any,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}

现在你可以使用这个辅助函数来验证订阅过程的输出:

_app.ts
ts
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';
export const appRouter = router({
mySubscription: publicProcedure
.input(
z.object({
lastEventId: z.coerce.number().min(0).optional(),
}),
)
.output(
zAsyncIterable({
yield: z.object({
count: z.number(),
}),
tracked: true,
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (true) {
index++;
yield tracked(index, {
count: index,
});
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}),
});
_app.ts
ts
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';
export const appRouter = router({
mySubscription: publicProcedure
.input(
z.object({
lastEventId: z.coerce.number().min(0).optional(),
}),
)
.output(
zAsyncIterable({
yield: z.object({
count: z.number(),
}),
tracked: true,
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (true) {
index++;
yield tracked(index, {
count: index,
});
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}),
});