メインコンテンツへスキップ
バージョン: 11.x

サブスクリプション

非公式ベータ版翻訳

このページは PageTurner AI で翻訳されました(ベータ版)。プロジェクト公式の承認はありません。 エラーを見つけましたか? 問題を報告 →

はじめに

サブスクリプションはクライアントとサーバー間のリアルタイムイベントストリームです。クライアントにリアルタイム更新をプッシュする必要がある場合に使用します。

tRPCのサブスクリプションでは、クライアントはサーバーへの永続的な接続を確立・維持し、tracked()イベントの助けを借りて切断時に自動的に再接続を試み、正常に復旧します。

WebSocketとServer-sent Eventsの選択

tRPCでリアルタイムサブスクリプションを設定するには、WebSocketまたはServer-sent Events(SSE)のいずれかを使用できます。

どちらを選択すべきか迷った場合、サブスクリプションにはSSEの使用をお勧めします。設定が容易でWebSocketサーバーの構築が不要なためです。

参考プロジェクト

TypeExample TypeLink
WebSocketsBare-minimum Node.js WebSockets example/examples/standalone-server
SSEFull-stack SSE implementationgithub.com/trpc/examples-next-sse-chat
WebSocketsFull-stack WebSockets implementationgithub.com/trpc/examples-next-prisma-websockets-starter

基本例

ヒント

完全な例は当社のフルスタックSSEサンプルをご覧ください。

server.ts
ts
import { initTRPC } from '@trpc/server';
const t = initTRPC.create();
const ee = new EventEmitter();
export const appRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
})) {
const post = data as Post;
yield post;
}
}),
});
server.ts
ts
import { initTRPC } from '@trpc/server';
const t = initTRPC.create();
const ee = new EventEmitter();
export const appRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
})) {
const post = data as Post;
yield post;
}
}),
});

tracked()を使用したIDの自動追跡(推奨)

tracked()ヘルパーを使用してidを含むイベントをyieldすると、クライアントは切断時に自動的に再接続し、最後に確認されたIDを送信します。

サブスクリプションの初期化時にlastEventIdを送信すると、ブラウザがデータを受信するたびに自動的に更新されます。

  • SSEの場合、これはEventSource仕様の一部であり、.input()lastEventId経由で伝播されます。

  • WebSocketの場合、当社のwsLinkが最後に確認されたIDを自動送信し、ブラウザがデータを受信すると更新します。

ヒント

lastEventIdに基づいてデータを取得する場合、すべてのイベントの捕捉が重要であれば、当社のフルスタックSSEサンプルで行っているように、データベースからイベントを取得する前にイベントリスナーを設定してください。これにより、lastEventIdに基づく元のバッチをyield中に新しく発生したイベントが無視されるのを防げます。

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
// We start by subscribing to the ee so that we don't miss any new events while fetching
const iterable = ee.toIterable('add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
});
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
// const items = await db.post.findMany({ ... })
// for (const item of items) {
// yield tracked(item.id, item);
// }
}
// listen for new events
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
// We start by subscribing to the ee so that we don't miss any new events while fetching
const iterable = ee.toIterable('add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
});
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
// const items = await db.post.findMany({ ... })
// for (const item of items) {
// yield tracked(item.id, item);
// }
}
// listen for new events
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});

ループでのデータ取得

この方法は、データベースなどのソースから定期的に新しいデータをチェックし、クライアントにプッシュしたい場合に有用です。

server.ts
ts
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
// The id is the createdAt of the post
lastEventId: z.coerce.date().nullish(),
}),
)
.subscription(async function* (opts) {
// `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
let lastEventId = opts.input?.lastEventId ?? null;
// We use a `while` loop that checks `!opts.signal.aborted`
while (!opts.signal!.aborted) {
const posts = await db.post.findMany({
// If we have a `lastEventId`, we only fetch posts created after it.
where: lastEventId
? {
createdAt: {
gt: lastEventId,
},
}
: undefined,
orderBy: {
createdAt: 'asc',
},
});
for (const post of posts) {
// `tracked` is a helper that sends an `id` with each event.
// This allows the client to resume from the last received event upon reconnection.
yield tracked(post.createdAt.toJSON(), post);
lastEventId = post.createdAt;
}
// Wait for a bit before polling again to avoid hammering the database.
await sleep(1_000);
}
}),
});
server.ts
ts
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
// The id is the createdAt of the post
lastEventId: z.coerce.date().nullish(),
}),
)
.subscription(async function* (opts) {
// `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
let lastEventId = opts.input?.lastEventId ?? null;
// We use a `while` loop that checks `!opts.signal.aborted`
while (!opts.signal!.aborted) {
const posts = await db.post.findMany({
// If we have a `lastEventId`, we only fetch posts created after it.
where: lastEventId
? {
createdAt: {
gt: lastEventId,
},
}
: undefined,
orderBy: {
createdAt: 'asc',
},
});
for (const post of posts) {
// `tracked` is a helper that sends an `id` with each event.
// This allows the client to resume from the last received event upon reconnection.
yield tracked(post.createdAt.toJSON(), post);
lastEventId = post.createdAt;
}
// Wait for a bit before polling again to avoid hammering the database.
await sleep(1_000);
}
}),
});

サーバー側からのサブスクリプション停止

サーバーからサブスクリプションを停止する必要がある場合は、ジェネレーター関数内で単にreturnします。

ts
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
lastEventId: z.string().coerce.number().min(0).optional(),
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (!opts.signal!.aborted) {
const idx = index++;
if (idx > 100) {
// With this, the subscription will stop and the client will disconnect
return;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
}
}),
});
ts
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
lastEventId: z.string().coerce.number().min(0).optional(),
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (!opts.signal!.aborted) {
const idx = index++;
if (idx > 100) {
// With this, the subscription will stop and the client will disconnect
return;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
}
}),
});

クライアント側では、サブスクリプションを.unsubscribe()するだけです。

副作用のクリーンアップ

サブスクリプションの副作用をクリーンアップする必要がある場合は、try...finallyパターンを使用できます。trpcはサブスクリプションが何らかの理由で停止するとジェネレーターインスタンスの.return()を呼び出すためです。

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
let timeout;
try {
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
timeout = setTimeout(() => console.log('Pretend like this is useful'));
const post = data as Post;
yield post;
}
} finally {
if (timeout) clearTimeout(timeout);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
let timeout;
try {
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
timeout = setTimeout(() => console.log('Pretend like this is useful'));
const post = data as Post;
yield post;
}
} finally {
if (timeout) clearTimeout(timeout);
}
}),
});

エラー処理

ジェネレーター関数内でエラーをスローすると、バックエンドのtrpconError()に伝播します。

スローされたエラーが5xxエラーの場合、クライアントはtracked()で追跡された最後のイベントIDに基づいて自動的に再接続を試みます。それ以外のエラーの場合、サブスクリプションはキャンセルされonError()コールバックに伝播します。

出力検証

サブスクリプションは非同期イテレータであるため、出力を検証するにはイテレータを処理する必要があります。

zod v4の例

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}
zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}

zod v3の例

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, any, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, any, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
any,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}
zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, any, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, any, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
any,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}

このヘルパーを使用して、サブスクリプションプロシージャの出力を検証できるようになります:

_app.ts
ts
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';
export const appRouter = router({
mySubscription: publicProcedure
.input(
z.object({
lastEventId: z.coerce.number().min(0).optional(),
}),
)
.output(
zAsyncIterable({
yield: z.object({
count: z.number(),
}),
tracked: true,
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (true) {
index++;
yield tracked(index, {
count: index,
});
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}),
});
_app.ts
ts
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';
export const appRouter = router({
mySubscription: publicProcedure
.input(
z.object({
lastEventId: z.coerce.number().min(0).optional(),
}),
)
.output(
zAsyncIterable({
yield: z.object({
count: z.number(),
}),
tracked: true,
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (true) {
index++;
yield tracked(index, {
count: index,
});
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}),
});