WebSockets
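You can use WebSockets for all or part of the communication with your server. See wsLink for how to set it up on the client.
This document covers the details specific to WebSockets. For general usage of subscriptions, see the subscriptions guide.
Creating a WebSocket server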
```bash
yarn add ws
```
```ts title="server/wsServer.ts"
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';

const wss = new ws.Server({
  port: 3001,
});
const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext,
  // Enable heartbeat messages to keep connection open (disabled by default)
  keepAlive: {
    enabled: true,
    // server ping message interval in milliseconds
    pingMs: 30000,
    // connection is terminated if pong message is not received in this many milliseconds
    pongWaitMs: 5000,
  },
});

wss.on('connection', (ws) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  ws.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');

process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});
```
Setting TRPCClient to use WebSockets
You can use links to route queries and/or mutations over the HTTP transport while subscriptions go over WebSockets.
```tsx title="client.ts"
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});
```
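The client above sends every operation over the WebSocket. A minimal sketch of the split described earlier, where only subscriptions use WebSockets, might look like the following. It assumes an HTTP handler is also running; the `http://localhost:3000/trpc` URL is a placeholder for illustration, not something defined on this page.

```typescript
import {
  createTRPCClient,
  createWSClient,
  httpBatchLink,
  splitLink,
  wsLink,
} from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

const client = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      // Subscriptions take the `true` branch and go over the WebSocket
      condition: (op) => op.type === 'subscription',
      true: wsLink({ client: wsClient }),
      // Queries and mutations take the `false` branch and go over HTTP.
      // Assumption: the HTTP endpoint URL below is a placeholder.
      false: httpBatchLink({ url: 'http://localhost:3000/trpc' }),
    }),
  ],
});
```

With this setup, queries and mutations keep the usual HTTP request semantics and only long-lived subscriptions hold the socket open.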
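Authentication / connection params
If you are building a web application, you can ignore this section, because cookies are sent as part of the request.
To authenticate over WebSockets, you can pass connectionParams to createWSClient. They are sent as the first message when the client establishes a WebSocket connection.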
```ts title="server/context.ts"
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';

export const createContext = async (opts: CreateWSSContextFnOptions) => {
  const token = opts.info.connectionParams?.token;

  // [... authenticate]

  return {};
};

export type Context = Awaited<ReturnType<typeof createContext>>;
```
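```ts title="client/trpc.ts"
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
// superjson is used as the transformer below, so it has to be imported
import superjson from 'superjson';
import type { AppRouter } from '~/server/routers/_app';

const wsClient = createWSClient({
  url: `ws://localhost:3000`,

  // sent as the first message once the connection is established
  connectionParams: async () => {
    return {
      token: 'supersecret',
    };
  },
});

export const trpc = createTRPCClient<AppRouter>({
  links: [wsLink({ client: wsClient, transformer: superjson })],
});
```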
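Automatic tracking of id using tracked() (recommended)
If you yield an event using the tracked() helper and include an id, the client automatically reconnects when it gets disconnected and sends the last known ID as part of the lastEventId input when reconnecting.
You can send an initial lastEventId when initializing the subscription, and it is updated automatically as the browser receives data.
If you fetch data based on lastEventId and capturing every event is critical, consider using a ReadableStream or a similar pattern as an intermediary (as the full-stack SSE example does), so that events emitted while the initial batch based on lastEventId is being yielded are not ignored.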
```ts
import EventEmitter, { on } from 'events';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

// Assumption: `Post` is your app's own post type (with a string `id`),
// defined or imported elsewhere.

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // the input is optional, so guard against it being undefined
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
      }
      // listen for new events
      for await (const [data] of on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
        signal: opts.signal,
      })) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
```
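One way to avoid losing events during the initial batch is to start buffering live events before querying for the backlog, then merge the two. The sketch below shows only the merge step, under the assumption that event ids are unique strings; `mergeWithoutDuplicates` is a hypothetical helper, not part of tRPC, and it does not replace the ReadableStream approach from the full-stack SSE example.

```typescript
// Hypothetical helper (not a tRPC API): combines the backlog fetched via
// lastEventId with live events that were buffered while the backlog loaded.
function mergeWithoutDuplicates<T extends { id: string }>(
  backlog: T[],
  buffered: T[],
): T[] {
  // Ids already covered by the backlog
  const seen = new Set<string>(backlog.map((event) => event.id));
  const merged: T[] = backlog.slice();

  for (const event of buffered) {
    // An event can appear in both lists if it was emitted while the
    // backlog query was running; keep only the first copy.
    if (!seen.has(event.id)) {
      seen.add(event.id);
      merged.push(event);
    }
  }
  return merged;
}

const mergedIds = mergeWithoutDuplicates(
  [{ id: '1' }, { id: '2' }],
  [{ id: '2' }, { id: '3' }],
).map((event) => event.id);
```

Because the listener is attached before the backlog query in this pattern, an event can only be seen twice, never missed, which is why a deduplicating merge is enough.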
WebSockets RPC Specification
More details are available in the TypeScript definitions.
query / mutation
Request
```ts
{
  id: number | string;
  jsonrpc?: '2.0'; // optional
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
```
Response
The shape below, or an error.
```ts
{
  id: number | string;
  jsonrpc?: '2.0'; // only defined if included in request
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  }
}
```
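As an illustration of the request and response envelopes, here is a dependency-free sketch that builds a request and matches a response to it by id. The helper names and the `post.byId` path are hypothetical, and the sketch assumes no transformer is configured, so inputs and outputs are plain JSON values.

```typescript
type RequestId = number | string;

interface ProcedureRequest {
  id: RequestId;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: { path: string; input?: unknown };
}

interface ProcedureResponse<TOutput> {
  id: RequestId;
  jsonrpc?: '2.0';
  result: { type: 'data'; data: TOutput };
}

let nextRequestId = 0;

// Hypothetical helper: builds the envelope for a single query or mutation.
function buildRequest(
  method: 'query' | 'mutation',
  path: string,
  input?: unknown,
): ProcedureRequest {
  nextRequestId += 1;
  return {
    id: nextRequestId,
    method,
    // Leave `input` out entirely when the procedure takes none
    params: input === undefined ? { path } : { path, input },
  };
}

// Hypothetical helper: checks that a response belongs to the given request
// and returns the procedure output.
function unwrapResponse<TOutput>(
  request: ProcedureRequest,
  response: ProcedureResponse<TOutput>,
): TOutput {
  if (response.id !== request.id) {
    throw new Error('Response id does not match request id');
  }
  return response.result.data;
}

const exampleRequest = buildRequest('query', 'post.byId', { id: '1' });
const exampleOutput = unwrapResponse(exampleRequest, {
  id: exampleRequest.id,
  result: { type: 'data', data: 'hello' },
});
```

Several requests can be in flight on one socket at the same time, so the `id` is what ties each response back to its request.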
subscription / subscription.stop
Start a subscription
```ts
{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
```
To cancel a subscription, call subscription.stop
```ts
{
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}
```
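The start and stop messages share the same id, which a short sketch can make concrete. The helper names here are hypothetical, and the `onPostAdd` path is reused from the router example earlier on this page.

```typescript
type SubscriptionId = number | string;

interface SubscriptionStartMessage {
  id: SubscriptionId;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: { path: string; input?: unknown };
}

interface SubscriptionStopMessage {
  id: SubscriptionId;
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}

// Hypothetical helper: envelope that starts a subscription.
function startSubscription(
  id: SubscriptionId,
  path: string,
  input?: unknown,
): SubscriptionStartMessage {
  return {
    id,
    method: 'subscription',
    params: input === undefined ? { path } : { path, input },
  };
}

// Hypothetical helper: envelope that stops a previously started
// subscription by reusing the id it was created with.
function stopSubscription(
  started: SubscriptionStartMessage,
): SubscriptionStopMessage {
  return { id: started.id, method: 'subscription.stop' };
}

const startMessage = startSubscription(1, 'onPostAdd', { lastEventId: null });
const stopMessageJson = JSON.stringify(stopSubscription(startMessage));
```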
Subscription response shape
The shape below, or an error.
```ts
{
  id: number | string;
  jsonrpc?: '2.0';
  result: (
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      }
  )
}
```
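Since `result.type` discriminates the three variants, a consumer can process them with an exhaustive switch. The following is a sketch under the assumption that a client tracks a simple status plus the list of received items; the state shape and `applyResponse` are made up for illustration.

```typescript
type SubscriptionResult<TData> =
  | { type: 'data'; data: TData }
  | { type: 'started' }
  | { type: 'stopped' };

interface SubscriptionResponse<TData> {
  id: number | string;
  jsonrpc?: '2.0';
  result: SubscriptionResult<TData>;
}

// Hypothetical client-side state for one subscription
interface SubscriptionState<TData> {
  status: 'idle' | 'active' | 'stopped';
  received: TData[];
}

// Applies one incoming response and returns the next state.
function applyResponse<TData>(
  state: SubscriptionState<TData>,
  response: SubscriptionResponse<TData>,
): SubscriptionState<TData> {
  const result = response.result;
  switch (result.type) {
    case 'started':
      return { status: 'active', received: state.received };
    case 'data':
      return { status: state.status, received: [...state.received, result.data] };
    case 'stopped':
      return { status: 'stopped', received: state.received };
  }
}

const incoming: SubscriptionResponse<string>[] = [
  { id: 1, result: { type: 'started' } },
  { id: 1, result: { type: 'data', data: 'first post' } },
  { id: 1, result: { type: 'stopped' } },
];

let subscriptionState: SubscriptionState<string> = { status: 'idle', received: [] };
for (const response of incoming) {
  subscriptionState = applyResponse(subscriptionState, response);
}
```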
Connection params
If the connection is initialized with ?connectionParams=1, the first message must be the connection params.
```ts
{
  data: Record<string, string> | null;
  method: 'connectionParams';
}
```
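For a hand-written client, the handshake could be prepared roughly as follows. This is a sketch with hypothetical helper names: one appends the `connectionParams=1` flag to the URL, the other serializes the first message. It assumes plain JSON encoding on the wire.

```typescript
interface ConnectionParamsMessage {
  method: 'connectionParams';
  data: Record<string, string> | null;
}

// Hypothetical helper: marks the URL so the server expects connection
// params as the first message.
function withConnectionParamsFlag(url: string): string {
  const separator = url.includes('?') ? '&' : '?';
  return url + separator + 'connectionParams=1';
}

// Hypothetical helper: serializes the first message to send after the
// socket opens.
function buildFirstMessage(params: Record<string, string> | null): string {
  const message: ConnectionParamsMessage = {
    method: 'connectionParams',
    data: params,
  };
  return JSON.stringify(message);
}

const flaggedUrl = withConnectionParamsFlag('ws://localhost:3001');
const firstMessage = buildFirstMessage({ token: 'supersecret' });
```

When using createWSClient with connectionParams, as shown earlier on this page, the client takes care of this step itself.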
Errors
See https://www.jsonrpc.org/specification#error_object or Error Formatting.
Notifications from Server to Client
{ id: null, type: 'reconnect' }
Tells clients to reconnect before the server shuts down. Invoked by wssHandler.broadcastReconnectNotification().
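A custom client needs to recognize this notification among regular responses. The type guard below is a minimal sketch based only on the message shape shown above; `isReconnectNotification` is a hypothetical name, and what the client does after detecting the message (closing and reopening the socket, resubscribing) is left out.

```typescript
interface ReconnectNotification {
  id: null;
  type: 'reconnect';
}

// Hypothetical type guard: true only for the reconnect notification shape.
function isReconnectNotification(
  message: unknown,
): message is ReconnectNotification {
  if (typeof message !== 'object' || message === null) {
    return false;
  }
  const candidate = message as { id?: unknown; type?: unknown };
  return candidate.id === null && candidate.type === 'reconnect';
}

const sawReconnect = isReconnectNotification(
  JSON.parse('{"id":null,"type":"reconnect"}'),
);
const sawRegularResponse = isReconnectNotification(
  JSON.parse('{"id":1,"result":{"type":"started"}}'),
);
```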