メインコンテンツへスキップ
バージョン: 11.x

WebSockets

非公式ベータ版翻訳

このページは PageTurner AI で翻訳されました(ベータ版)。プロジェクト公式の承認はありません。 エラーを見つけましたか? 問題を報告 →

サーバーとの通信の全部または一部にWebSocketsを利用できます。クライアント側での設定方法についてはwsLinkを参照してください。

ヒント

このドキュメントではWebSockets利用に特化した詳細を説明しています。サブスクリプションの一般的な使用方法についてはサブスクリプションガイドをご覧ください。

WebSocketサーバーの作成

bash
yarn add ws
bash
yarn add ws
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
const wss = new ws.Server({
port: 3001,
});
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext,
// Enable heartbeat messages to keep connection open (disabled by default)
keepAlive: {
enabled: true,
// server ping message interval in milliseconds
pingMs: 30000,
// connection is terminated if pong message is not received in this many milliseconds
pongWaitMs: 5000,
},
});
wss.on('connection', (ws) => {
console.log(`➕➕ Connection (${wss.clients.size})`);
ws.once('close', () => {
console.log(`➖➖ Connection (${wss.clients.size})`);
});
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');
process.on('SIGTERM', () => {
console.log('SIGTERM');
handler.broadcastReconnectNotification();
wss.close();
});
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
const wss = new ws.Server({
port: 3001,
});
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext,
// Enable heartbeat messages to keep connection open (disabled by default)
keepAlive: {
enabled: true,
// server ping message interval in milliseconds
pingMs: 30000,
// connection is terminated if pong message is not received in this many milliseconds
pongWaitMs: 5000,
},
});
wss.on('connection', (ws) => {
console.log(`➕➕ Connection (${wss.clients.size})`);
ws.once('close', () => {
console.log(`➖➖ Connection (${wss.clients.size})`);
});
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');
process.on('SIGTERM', () => {
console.log('SIGTERM');
handler.broadcastReconnectNotification();
wss.close();
});

TRPCClientでWebSocketsを使用する設定

ヒント

Linksを使用して、クエリやミューテーションをHTTP転送、サブスクリプションをWebSocket経由でルーティングできます。

client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';
// create persistent WebSocket connection
const wsClient = createWSClient({
url: `ws://localhost:3001`,
});
// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
links: [
wsLink({
client: wsClient,
}),
],
});
client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';
// create persistent WebSocket connection
const wsClient = createWSClient({
url: `ws://localhost:3001`,
});
// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
links: [
wsLink({
client: wsClient,
}),
],
});

認証 / 接続パラメータ

ヒント

Webアプリケーションを開発している場合、リクエストの一部としてクッキーが送信されるため、このセクションは無視して構いません。

WebSocketsでの認証を行うには、createWSClientconnectionParamsを定義します。これはクライアントがWebSocket接続を確立する際の最初のメッセージとして送信されます。

server/context.ts
ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';
 
export const createContext = async (opts: CreateWSSContextFnOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
server/context.ts
ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';
 
export const createContext = async (opts: CreateWSSContextFnOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
ts
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '~/server/routers/_app';
const wsClient = createWSClient({
url: `ws://localhost:3000`,
connectionParams: async () => {
return {
token: 'supersecret',
};
},
});
export const trpc = createTRPCClient<AppRouter>({
links: [wsLink({ client: wsClient, transformer: superjson })],
});
client/trpc.ts
ts
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '~/server/routers/_app';
const wsClient = createWSClient({
url: `ws://localhost:3000`,
connectionParams: async () => {
return {
token: 'supersecret',
};
},
});
export const trpc = createTRPCClient<AppRouter>({
links: [wsLink({ client: wsClient, transformer: superjson })],
});

tracked()を使用したIDの自動追跡(推奨)

tracked()ヘルパーを使用してイベントをyieldし、idを含めることで、クライアントは切断時に自動的に再接続し、再接続時に最後に認識されたIDをlastEventId入力の一部として送信します。

サブスクリプションの初期化時にlastEventIdを送信すると、ブラウザがデータを受信するたびに自動的に更新されます。

情報

lastEventIdに基づいてデータを取得する場合、すべてのイベントの捕捉が重要であるときは、当社のフルスタックSSE例のようにReadableStreamや類似パターンを中間層として使用し、lastEventIdに基づく元のバッチをyieldしている間に新しく発行されるイベントが無視されるのを防ぐことを検討してください。

ts
import EventEmitter, { on } from 'events';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
}
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
}
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});

WebSockets RPC仕様

詳細は以下のTypeScript定義を参照してください:

query / mutation

リクエスト

ts
{
id: number | string;
jsonrpc?: '2.0'; // optional
method: 'query' | 'mutation';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}
ts
{
id: number | string;
jsonrpc?: '2.0'; // optional
method: 'query' | 'mutation';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}

レスポンス

...下記、またはエラー

ts
{
id: number | string;
jsonrpc?: '2.0'; // only defined if included in request
result: {
type: 'data'; // always 'data' for mutation / queries
data: TOutput; // output from procedure
}
}
ts
{
id: number | string;
jsonrpc?: '2.0'; // only defined if included in request
result: {
type: 'data'; // always 'data' for mutation / queries
data: TOutput; // output from procedure
}
}

subscription / subscription.stop

サブスクリプションの開始

ts
{
id: number | string;
jsonrpc?: '2.0';
method: 'subscription';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}
ts
{
id: number | string;
jsonrpc?: '2.0';
method: 'subscription';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}

サブスクリプションのキャンセルにはsubscription.stopを呼び出します

ts
{
id: number | string; // <-- id of your created subscription
jsonrpc?: '2.0';
method: 'subscription.stop';
}
ts
{
id: number | string; // <-- id of your created subscription
jsonrpc?: '2.0';
method: 'subscription.stop';
}

サブスクリプションのレスポンス形式

...下記、またはエラー

ts
{
id: number | string;
jsonrpc?: '2.0';
result: (
| {
type: 'data';
data: TData; // subscription emitted data
}
| {
type: 'started'; // subscription started
}
| {
type: 'stopped'; // subscription stopped
}
)
}
ts
{
id: number | string;
jsonrpc?: '2.0';
result: (
| {
type: 'data';
data: TData; // subscription emitted data
}
| {
type: 'started'; // subscription started
}
| {
type: 'stopped'; // subscription stopped
}
)
}

接続パラメータ

接続が?connectionParams=1で初期化される場合、最初のメッセージは接続パラメータでなければなりません。

ts
{
data: Record<string, string> | null;
method: 'connectionParams';
}
ts
{
data: Record<string, string> | null;
method: 'connectionParams';
}

エラー

https://www.jsonrpc.org/specification#error_objectまたはエラーフォーマットを参照してください。

サーバーからクライアントへの通知

{ id: null, type: 'reconnect' }

サーバーシャットダウン前にクライアントに再接続を指示します。wssHandler.broadcastReconnectNotification()によって呼び出されます。