Version: 10.x

Subscriptions / WebSockets

Using Subscriptions

Adding a subscription procedure

server/router.ts
tsx
import { EventEmitter } from 'events';
import { initTRPC } from '@trpc/server';
import { observable } from '@trpc/server/observable';
import { z } from 'zod';

// shape of the posts emitted to subscribers (mirrors the `add` input below)
type Post = { id?: string; text: string };

// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();
const t = initTRPC.create();

export const appRouter = t.router({
  onAdd: t.procedure.subscription(() => {
    // return an `observable` with a callback which is triggered immediately
    return observable<Post>((emit) => {
      const onAdd = (data: Post) => {
        // emit data to client
        emit.next(data);
      };
      // trigger `onAdd()` when `add` is triggered in our event emitter
      ee.on('add', onAdd);
      // unsubscribe function when client disconnects or stops subscribing
      return () => {
        ee.off('add', onAdd);
      };
    });
  }),
  add: t.procedure
    .input(
      z.object({
        id: z.string().uuid().optional(),
        text: z.string().min(1),
      }),
    )
    .mutation(async (opts) => {
      const post = { ...opts.input }; /* [..] add to db */
      // notify every active `onAdd` subscription
      ee.emit('add', post);
      return post;
    }),
});

Creating a WebSocket server

bash
yarn add ws
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';

const wss = new ws.Server({
  port: 3001,
});
const handler = applyWSSHandler({ wss, router: appRouter, createContext });

wss.on('connection', (ws) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  ws.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');

process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});

Setting TRPCClient to use WebSockets

Tip

You can use links to route queries and/or mutations to the HTTP transport and subscriptions over WebSockets.

client.ts
tsx
import { createTRPCProxyClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCProxyClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});
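The tip above mentions routing by operation type. A minimal sketch of that setup, using `splitLink` and `httpBatchLink` from `@trpc/client`, might look like the following. The HTTP endpoint `http://localhost:3000/trpc` is an assumption made for illustration and is not defined anywhere on this page; the `onAdd` procedure is the one from server/router.ts.

```typescript
import {
  createTRPCProxyClient,
  createWSClient,
  httpBatchLink,
  splitLink,
  wsLink,
} from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

// persistent WebSocket connection, used for subscriptions only
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

const client = createTRPCProxyClient<AppRouter>({
  links: [
    splitLink({
      // subscriptions take the `true` branch, everything else the `false` branch
      condition(op) {
        return op.type === 'subscription';
      },
      true: wsLink({ client: wsClient }),
      // assumed HTTP endpoint; replace with wherever your HTTP handler is served
      false: httpBatchLink({ url: 'http://localhost:3000/trpc' }),
    }),
  ],
});

// subscribe to the `onAdd` procedure; the first argument is the (absent) input
const subscription = client.onAdd.subscribe(undefined, {
  onData(post) {
    console.log('received', post);
  },
  onError(err) {
    console.error('subscription error', err);
  },
});

// call this when the data is no longer needed
// subscription.unsubscribe();
```

With this split, queries and mutations keep the batching behaviour of the HTTP link, while only subscriptions hold the WebSocket open.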

Using React

See /examples/next-prisma-starter-websockets.

WebSockets RPC Specification

You can find more details by drilling into the TypeScript definitions.

query / mutation

Request

ts
{
  id: number | string;
  jsonrpc?: '2.0'; // optional
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
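To make the request shape concrete, here is a small sketch that builds such an envelope by hand. `RPCRequest` and `buildRequest` are hypothetical names introduced for illustration, not part of tRPC, and the `input` is assumed to be already serialized by whatever transformer is in use.

```typescript
// Hypothetical types mirroring the request shape documented above.
type RequestMethod = 'query' | 'mutation';

interface RPCRequest {
  id: number | string;
  jsonrpc?: '2.0';
  method: RequestMethod;
  params: { path: string; input?: unknown };
}

let nextId = 0;

// Builds a request envelope with a fresh numeric id.
// `input` is omitted from `params` when the procedure takes no input.
function buildRequest(
  method: RequestMethod,
  path: string,
  input?: unknown,
): RPCRequest {
  const params: RPCRequest['params'] =
    input === undefined ? { path } : { path, input };
  return { id: ++nextId, jsonrpc: '2.0', method, params };
}

// The string that would be sent over the socket for the `add` mutation.
const message = JSON.stringify(buildRequest('mutation', 'add', { text: 'hello' }));
console.log(message);
```

The `id` is what lets the client match a later response to this request, so each in-flight request needs a distinct value.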

Response

... below, or an error.

ts
{
  id: number | string;
  jsonrpc?: '2.0'; // only defined if included in request
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  }
}
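A client has to tell the two outcomes apart. The sketch below assumes that a failed call arrives as an envelope with an `error` member shaped like the JSON-RPC 2.0 error object (`code`, `message`, optional `data`); the type names and the `unwrap` helper are hypothetical and only illustrate the branching.

```typescript
// Successful response, as documented above.
interface RPCResultResponse<TOutput> {
  id: number | string;
  jsonrpc?: '2.0';
  result: { type: 'data'; data: TOutput };
}

// Assumed error envelope, following the JSON-RPC 2.0 error object.
interface RPCErrorResponse {
  id: number | string | null;
  jsonrpc?: '2.0';
  error: { code: number; message: string; data?: unknown };
}

type RPCResponse<TOutput> = RPCResultResponse<TOutput> | RPCErrorResponse;

// Returns the procedure output, or throws if the server reported an error.
function unwrap<TOutput>(response: RPCResponse<TOutput>): TOutput {
  if ('error' in response) {
    throw new Error(`RPC error ${response.error.code}: ${response.error.message}`);
  }
  return response.result.data;
}

const post = unwrap<{ text: string }>({
  id: 1,
  result: { type: 'data', data: { text: 'hello' } },
});
console.log(post.text);
```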

subscription / subscription.stop

Start a subscription

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

To cancel a subscription, call subscription.stop

ts
{
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}
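The start and stop messages are tied together only by their `id`. As a rough illustration, the following sketch builds both for the `onAdd` procedure; `startSubscription` and `stopSubscription` are hypothetical helpers, not tRPC functions.

```typescript
interface SubscriptionStart {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: { path: string; input?: unknown };
}

interface SubscriptionStop {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}

// Message that asks the server to start a subscription under the given id.
function startSubscription(
  id: number | string,
  path: string,
  input?: unknown,
): SubscriptionStart {
  const params = input === undefined ? { path } : { path, input };
  return { id, jsonrpc: '2.0', method: 'subscription', params };
}

// Message that cancels a subscription; it must reuse the id from the start message.
function stopSubscription(id: number | string): SubscriptionStop {
  return { id, jsonrpc: '2.0', method: 'subscription.stop' };
}

const start = startSubscription(7, 'onAdd');
const stop = stopSubscription(start.id);
console.log(JSON.stringify(start));
console.log(JSON.stringify(stop));
```

Note that the stop message carries no `params`: the `id` alone identifies which subscription to end.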

Subscription response shape

... below, or an error.

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  result: (
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      }
  )
}
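Because `result.type` is a discriminant, a client can handle the three cases with a plain `switch`. The sketch below folds a sequence of messages into a small state object; `SubscriptionState` and `applyMessage` are hypothetical names, and this is not how the tRPC client is implemented internally.

```typescript
type SubscriptionResult<TData> =
  | { type: 'data'; data: TData }
  | { type: 'started' }
  | { type: 'stopped' };

interface SubscriptionMessage<TData> {
  id: number | string;
  jsonrpc?: '2.0';
  result: SubscriptionResult<TData>;
}

// Minimal client-side bookkeeping for one subscription.
interface SubscriptionState<TData> {
  active: boolean;
  received: TData[];
}

// Returns the next state after one server message.
function applyMessage<TData>(
  state: SubscriptionState<TData>,
  message: SubscriptionMessage<TData>,
): SubscriptionState<TData> {
  const { result } = message;
  switch (result.type) {
    case 'started':
      return { ...state, active: true };
    case 'data':
      return { ...state, received: [...state.received, result.data] };
    case 'stopped':
      return { ...state, active: false };
  }
}

const messages: SubscriptionMessage<string>[] = [
  { id: 1, result: { type: 'started' } },
  { id: 1, result: { type: 'data', data: 'first post' } },
  { id: 1, result: { type: 'stopped' } },
];

const initial: SubscriptionState<string> = { active: false, received: [] };
const finalState = messages.reduce(
  (state, message) => applyMessage(state, message),
  initial,
);
console.log(finalState);
```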

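Errors

See https://www.jsonrpc.org/specification#error_object or Error Formatting.

Notifications from Server to Client

{ id: null, type: 'reconnect' }

Tells clients to reconnect before the server shuts down. Invoked by wssHandler.broadcastReconnectNotification(), where wssHandler is the value returned by applyWSSHandler (named handler in server/wsServer.ts above).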