Subscriptions
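Introduction
Subscriptions are a real-time event stream between the client and the server. Use subscriptions when you need to push real-time updates to the client.
With tRPC's subscriptions, the client establishes and maintains a persistent connection to the server. If the connection drops, the client automatically attempts to reconnect and recovers gracefully with the help of tracked() events.
WebSockets or Server-sent Events?
You can use either WebSockets or Server-sent Events (SSE) to set up real-time subscriptions in tRPC.
- For WebSockets, see the WebSockets page.
- For SSE, see httpSubscriptionLink.
If you are unsure which one to choose, we recommend SSE for subscriptions: it is easier to set up and does not require running a WebSocket server.
Reference projects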
| Type | Example Type | Link |
|---|---|---|
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
Basic example
For a complete example, see our full-stack SSE example.
```ts title="server.ts"
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
const router = t.router;
const publicProcedure = t.procedure;

const ee = new EventEmitter();

export const appRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});
```
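Automatic tracking of id using tracked() (recommended)
If you yield an event using the tracked() helper and include an id, the client automatically reconnects when it gets disconnected and sends the last known ID.
You can send an initial lastEventId when initializing the subscription, and it is automatically updated as the browser receives data.
- For SSE, this is part of the EventSource spec and is propagated through lastEventId in your .input().
- For WebSockets, our wsLink automatically sends the last known ID and updates it as the browser receives data.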
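To make the SSE case concrete: in the SSE wire format each event may carry an `id:` line, and the browser's EventSource sends the last id it saw back as the `Last-Event-ID` header when it reconnects. The sketch below only illustrates that format. `formatSseEvent` and `lastEventIdOf` are hypothetical helpers written for this page, not tRPC's encoder.

```ts
// Hypothetical helper: serialize one event in the SSE wire format.
function formatSseEvent(id: string, data: unknown): string {
  // JSON.stringify never emits raw newlines, so a single `data:` line suffices.
  return `id: ${id}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Hypothetical helper: the id a client would remember after reading a chunk.
function lastEventIdOf(chunk: string): string | null {
  let last: string | null = null;
  for (const line of chunk.split('\n')) {
    if (line.startsWith('id: ')) last = line.slice(4);
  }
  return last;
}

const wire =
  formatSseEvent('1', { title: 'first' }) +
  formatSseEvent('2', { title: 'second' });

// After a disconnect, this is the value the client would send back.
const resumeFrom = lastEventIdOf(wire); // '2'
console.log(resumeFrom);
```

On the server, that value is what arrives as lastEventId in the procedure input.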
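If you fetch data based on lastEventId and capturing every event is critical, set up the event listener before fetching events from your database, as is done in our full-stack SSE example. This prevents events that are emitted while the original batch (based on lastEventId) is being yielded from being missed.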
```ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching.
      // `on()` attaches its listener immediately and buffers events until they are consumed.
      const iterable = on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      // The input is optional, so guard the access
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events on the iterable created above
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
```
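The ordering matters because of a race: an event emitted while the backlog query is in flight is in neither the query result nor a listener that has not been attached yet. The following self-contained sketch reproduces that situation with an in-memory array standing in for the database. `stored`, `addPost`, `fetchSince`, and `postsSince` are hypothetical names, and numeric ids are an assumption made to keep the de-duplication simple.

```ts
import { EventEmitter, on } from 'node:events';

type Post = { id: number; title: string };

// Hypothetical stand-ins for a database table and its change feed.
const stored: Post[] = [{ id: 1, title: 'old' }];
const ee = new EventEmitter();

function addPost(post: Post): void {
  stored.push(post);
  ee.emit('add', post);
}

// Simulated slow query: the snapshot is taken first, the result arrives later.
async function fetchSince(lastId: number): Promise<Post[]> {
  const snapshot = stored.filter((p) => p.id > lastId);
  await new Promise((resolve) => setTimeout(resolve, 10));
  return snapshot;
}

async function* postsSince(lastId: number, signal: AbortSignal) {
  // 1. Attach the listener first: `on()` starts buffering immediately.
  const live = on(ee, 'add', { signal });
  // 2. Replay the backlog.
  let cursor = lastId;
  for (const post of await fetchSince(lastId)) {
    cursor = post.id;
    yield post;
  }
  // 3. Drain live events, skipping any the backlog already covered.
  for await (const [data] of live) {
    const post = data as Post;
    if (post.id <= cursor) continue;
    cursor = post.id;
    yield post;
  }
}

async function demo(): Promise<number[]> {
  const ac = new AbortController();
  const seen: number[] = [];
  // This post is added while the backlog query is still pending.
  setTimeout(() => addPost({ id: 2, title: 'new' }), 5);
  for await (const post of postsSince(0, ac.signal)) {
    seen.push(post.id);
    if (seen.length === 2) break;
  }
  ac.abort();
  return seen;
}

const result = demo();
result.then((ids) => console.log(ids)); // [ 1, 2 ]
```

If the listener were attached after `fetchSince` resolved, post 2 would be lost: it is missing from the snapshot and was emitted before anyone was listening.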
Pulling data in a loop
This recipe is useful when you want to periodically check a source such as a database for new data and push it to the client.
```ts title="server.ts"
import { setTimeout as sleep } from 'timers/promises';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { db } from '../db'; // assumed path to your Prisma client instance
import { publicProcedure, router } from '../trpc';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts: Post[] = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
```
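One detail of the loop above: a plain sleep keeps the generator parked for the rest of the interval even after the client has gone away. A possible refinement, sketched here under the assumption that you write your own helper, is a sleep that also wakes up when the AbortSignal fires. `sleep` and `poll` below are hypothetical helpers, not part of tRPC.

```ts
// Hypothetical helper: resolves after `ms`, or as soon as `signal` aborts.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    const done = () => {
      clearTimeout(timer);
      signal?.removeEventListener('abort', done);
      resolve();
    };
    const timer = setTimeout(done, ms);
    signal?.addEventListener('abort', done, { once: true });
  });
}

// Hypothetical polling generator built on the abortable sleep.
async function* poll<T>(
  fetchNew: () => Promise<T[]>,
  signal: AbortSignal,
  intervalMs = 1_000,
) {
  while (!signal.aborted) {
    for (const item of await fetchNew()) yield item;
    await sleep(intervalMs, signal);
  }
}

async function demo(): Promise<number[]> {
  const ac = new AbortController();
  const batches = [[1, 2], [3]];
  const seen: number[] = [];
  // Abort while the generator is sleeping between polls.
  setTimeout(() => ac.abort(), 5);
  for await (const n of poll(async () => batches.shift() ?? [], ac.signal, 60_000)) {
    seen.push(n);
  }
  return seen;
}

const result = demo();
result.then((seen) => console.log(seen)); // [ 1, 2 ]
```

The 60-second interval never elapses: the abort ends the sleep, the `while` condition fails, and the generator finishes after the first batch.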
Stopping a subscription from the server
If you need to stop a subscription from the server, simply return in the generator function.
```ts
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // Coerce the incoming id to a non-negative number
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (!opts.signal!.aborted) {
        const idx = index++;
        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
```
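On the client, you just .unsubscribe() the subscription.
Cleanup of side effects
If you need to clean up any side effects of your subscription, you can use the try...finally pattern, because tRPC invokes .return() on the generator instance when the subscription stops for any reason.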
```ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    let timeout: ReturnType<typeof setTimeout> | undefined;
    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      // Runs when the subscription stops for any reason
      if (timeout) clearTimeout(timeout);
    }
  }),
});
```
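The mechanism is plain JavaScript and can be observed without tRPC: calling .return() on an async generator that is suspended at a `yield` runs its `finally` block. A minimal sketch, where `ticker` and `log` are hypothetical names and the interval stands in for a real side effect:

```ts
const log: string[] = [];

async function* ticker(): AsyncGenerator<number, void, undefined> {
  const timer = setInterval(() => {}, 1_000); // stand-in for a real side effect
  log.push('started');
  try {
    let n = 0;
    while (true) yield n++;
  } finally {
    clearInterval(timer);
    log.push('cleaned up');
  }
}

async function demo(): Promise<string[]> {
  const it = ticker();
  await it.next(); // runs the body up to the first `yield`
  await it.return(undefined); // what a consumer does when it stops early
  return log;
}

const result = demo();
result.then((entries) => console.log(entries)); // [ 'started', 'cleaned up' ]
```

Note that .return() on a generator that has never been started does not enter the body at all, so there is nothing to clean up in that case.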
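Error handling
Throwing an error in a generator function propagates to tRPC's onError() on the backend.
If the thrown error is a 5xx error, the client automatically attempts to reconnect based on the last event ID tracked using tracked(). For any other error, the subscription is cancelled and the error propagates to the onError() callback.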
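The rule above can be pictured with a small synchronous simulation. This is only a model of the described behavior, not tRPC's client code: `StatusError`, `consume`, the fake `flaky` and `denied` servers, and the reconnect cap are all hypothetical.

```ts
// Hypothetical error type carrying an HTTP status; not a tRPC class.
class StatusError extends Error {
  status: number;
  constructor(status: number, message: string) {
    super(message);
    this.status = status;
  }
}

type Tracked = [id: string, data: string];
type Connect = (lastEventId: string | null) => Iterable<Tracked>;

function consume(connect: Connect, onError: (err: unknown) => void): string[] {
  const received: string[] = [];
  let lastEventId: string | null = null;
  // The cap of 5 attempts is an arbitrary choice for this sketch.
  for (let attempt = 0; attempt < 5; attempt++) {
    try {
      for (const [id, data] of connect(lastEventId)) {
        lastEventId = id; // remember the last tracked id
        received.push(data);
      }
      return received; // stream ended normally
    } catch (err) {
      const retryable =
        err instanceof StatusError && err.status >= 500 && err.status <= 599;
      if (retryable) continue; // 5xx: reconnect, resuming after lastEventId
      onError(err); // anything else: cancel and report
      return received;
    }
  }
  return received;
}

const events: Tracked[] = [['1', 'a'], ['2', 'b'], ['3', 'c']];
let connections = 0;

// Fake server: the first connection fails with a 503 after one event.
function* flaky(lastEventId: string | null): Generator<Tracked> {
  connections++;
  const start =
    lastEventId === null
      ? 0
      : events.findIndex(([id]) => id === lastEventId) + 1;
  for (const event of events.slice(start)) {
    yield event;
    if (connections === 1) throw new StatusError(503, 'unavailable');
  }
}

// Fake server: always rejects with a 401.
function* denied(): Generator<Tracked> {
  throw new StatusError(401, 'unauthorized');
}

const errors: unknown[] = [];
const resumed = consume(flaky, (err) => errors.push(err));
console.log(resumed, errors.length); // [ 'a', 'b', 'c' ] 0

const rejected = consume(denied, (err) => errors.push(err));
console.log(rejected, errors.length); // [] 1
```

The 503 case resumes after id 1 and delivers every event exactly once. The 401 case stops immediately and hands the error to the callback.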
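Output validation
Since subscriptions are async iterators, you have to go through the iterator and validate each yielded value in order to validate the output.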
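Stripped of any schema library, the idea is to wrap the source iterable in another async generator that checks each item before passing it on. The sketch below uses a hand-written `parseCount` check in place of a schema. `validateEach`, `parseCount`, and `source` are hypothetical names.

```ts
// Hypothetical helper: re-yield every item of `source` after `parse` accepts it.
async function* validateEach<TIn, TOut>(
  source: AsyncIterable<TIn>,
  parse: (value: TIn) => TOut,
): AsyncGenerator<TOut, void, undefined> {
  for await (const value of source) {
    yield parse(value); // throws on the first invalid item, ending the stream
  }
}

// Hand-written validator standing in for a schema.
function parseCount(value: unknown): { count: number } {
  if (typeof value === 'object' && value !== null) {
    const count = (value as { count?: unknown }).count;
    if (typeof count === 'number') return { count };
  }
  throw new TypeError('expected { count: number }');
}

async function* source(): AsyncGenerator<unknown, void, undefined> {
  yield { count: 1 };
  yield { count: 'two' }; // invalid on purpose
}

async function demo(): Promise<{ ok: number[]; error: string | null }> {
  const ok: number[] = [];
  try {
    for await (const item of validateEach(source(), parseCount)) {
      ok.push(item.count);
    }
    return { ok, error: null };
  } catch (err) {
    return { ok, error: (err as Error).message };
  }
}

const outcome = demo();
outcome.then((r) => console.log(r)); // { ok: [ 1 ], error: 'expected { count: number }' }
```

The first item passes through and the second one aborts the stream with a validation error.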
Example with zod v4
```ts title="zAsyncIterable.ts"
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
```
Example with zod v3
```ts title="zAsyncIterable.ts"
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, any, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, any, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    any,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
```
You can now use this helper to validate the output of your subscription procedures:
```ts title="_app.ts"
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        index++;
        // tracked() takes a string id, so convert the counter
        yield tracked(String(index), {
          count: index,
        });
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});
```