Version: 11.x

Subscriptions

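Introduction

Subscriptions are a real-time event stream between the client and the server. Use them when you need to push real-time updates to the client.

With tRPC's subscriptions, the client establishes and maintains a persistent connection to the server. With the help of tracked() events, it automatically attempts to reconnect and recovers gracefully when the connection drops.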

WebSockets vs Server-sent Events?

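You can use either WebSockets or Server-sent Events (SSE) to set up real-time subscriptions in tRPC.

If you are unsure which one to choose, we recommend SSE for subscriptions: it is easier to set up and does not require a separate WebSocket server.

Reference projects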

| Type | Example Type | Link |
| --- | --- | --- |
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |

Basic example

For a complete example, see the full-stack SSE example project.

server.ts
ts
import EventEmitter, { on } from 'events';
import { initTRPC } from '@trpc/server';

// Minimal shape of the event payload, assumed here for illustration
type Post = { id: string; title: string };

const t = initTRPC.create();
const router = t.router;
const publicProcedure = t.procedure;

const ee = new EventEmitter();

export const appRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});
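
To see the AbortSignal behaviour in isolation, the following minimal sketch runs the same `on(ee, 'add', { signal })` loop in plain Node.js, outside tRPC. The names `listenForPosts` and `demo` and the `Post` shape are hypothetical and used only for illustration; the `AbortController` stands in for the request being aborted.

```typescript
import { EventEmitter, on } from 'node:events';

// Hypothetical payload shape, for illustration only
type Post = { id: string; title: string };

// Plain async generator standing in for a subscription resolver
export async function* listenForPosts(
  ee: EventEmitter,
  signal: AbortSignal,
): AsyncGenerator<Post> {
  try {
    for await (const [data] of on(ee, 'add', { signal })) {
      yield data as Post;
    }
  } catch (err) {
    // `on()` rejects with an AbortError once the signal fires;
    // this sketch treats that as a normal shutdown
    if ((err as Error).name !== 'AbortError') throw err;
  }
}

export async function demo(): Promise<string[]> {
  const ee = new EventEmitter();
  const ac = new AbortController();
  const received: string[] = [];

  const consumer = (async () => {
    for await (const post of listenForPosts(ee, ac.signal)) {
      received.push(post.title);
    }
  })();

  // Give the listener time to attach, then emit two events
  await new Promise((resolve) => setTimeout(resolve, 5));
  ee.emit('add', { id: '1', title: 'first' });
  ee.emit('add', { id: '2', title: 'second' });

  // Let the consumer drain, then simulate the request being aborted
  await new Promise((resolve) => setTimeout(resolve, 10));
  ac.abort();
  await consumer;
  return received;
}
```

Aborting the signal ends the loop without any explicit listener bookkeeping, which is why the resolver above only needs to forward `opts.signal`.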

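Automatic tracking of IDs using tracked() (recommended)

If you yield an event using the tracked() helper and include an id, the client automatically reconnects when it gets disconnected and sends the last known ID.

You can send an initial lastEventId when initializing the subscription; it is updated automatically as the browser receives data.

  • For SSE, this is part of the EventSource specification and is propagated through lastEventId in your .input().

  • For WebSockets, wsLink automatically sends the last known ID and updates it as the browser receives data.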
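
The resume-by-ID idea can be sketched without any transport at all. The simulation below is not tRPC's wire protocol: `serve`, `consumeWithResume`, the `Envelope` type, and the in-memory `eventLog` are hypothetical names, and the "dropped connection" is just a thrown error. It only illustrates how remembering the last received id lets a client pick up where it left off.

```typescript
// Hypothetical envelope: every event carries an id alongside its data
type Envelope<T> = { id: string; data: T };

// In-memory event log; the event at index i has id String(i + 1)
const eventLog = ['a', 'b', 'c', 'd', 'e'];

// Simulated server: resumes after `lastEventId` and can optionally
// "drop the connection" after a number of events
async function* serve(
  lastEventId: string | null,
  failAfter?: number,
): AsyncGenerator<Envelope<string>> {
  const start = lastEventId ? Number(lastEventId) : 0;
  let sent = 0;
  for (let i = start; i < eventLog.length; i++) {
    if (failAfter !== undefined && sent === failAfter) {
      throw new Error('connection lost');
    }
    yield { id: String(i + 1), data: eventLog[i] };
    sent++;
  }
}

// Simulated client: remembers the last id it saw and reconnects with it
export async function consumeWithResume(): Promise<string[]> {
  const received: string[] = [];
  let lastEventId: string | null = null;
  let attempt = 0;
  for (;;) {
    try {
      // The first connection drops after two events; later ones run to completion
      const stream = serve(lastEventId, attempt === 0 ? 2 : undefined);
      for await (const event of stream) {
        received.push(event.data);
        lastEventId = event.id;
      }
      return received;
    } catch {
      attempt++; // reconnect, passing along the last id we saw
    }
  }
}
```

In this sketch the client ends up with every event exactly once, even though the first connection failed midway.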

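If you fetch data based on lastEventId and capturing every event is critical, set up the event listener before fetching events from your database, as the full-stack SSE example does. This prevents events that are emitted while the initial batch based on lastEventId is being yielded from being missed.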

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching.
      // `on()` attaches its listener immediately and buffers events until they are consumed.
      const iterable = on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      // The input object itself is optional, hence the `?.`
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events, starting with any that were buffered during the fetch
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
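
Why the listener must be attached before the fetch can be shown with plain Node.js. The sketch below is an illustration under stated assumptions rather than the example project's code: `fetchPostsSince` is a hypothetical stand-in for a slow database query, and `replayThenListen` and `demo` are made-up names. An event is emitted while the "query" is still pending, and it still reaches the consumer because `on()` has been buffering since it was called.

```typescript
import { EventEmitter, on } from 'node:events';
import { setTimeout as sleep } from 'node:timers/promises';

type Post = { id: string };

// Hypothetical stand-in for a database query that takes a while to resolve
async function fetchPostsSince(lastEventId: string): Promise<Post[]> {
  await sleep(20);
  return [{ id: String(Number(lastEventId) + 1) }];
}

export async function* replayThenListen(
  ee: EventEmitter,
  lastEventId: string | null,
  signal?: AbortSignal,
): AsyncGenerator<Post> {
  // 1. Attach the listener first: `on()` starts buffering events right away
  const live = on(ee, 'add', { signal });

  // 2. Replay what the client missed while it was disconnected
  if (lastEventId) {
    for (const post of await fetchPostsSince(lastEventId)) yield post;
  }

  // 3. Drain anything buffered during the fetch, then keep listening
  for await (const [data] of live) yield data as Post;
}

export async function demo(): Promise<string[]> {
  const ee = new EventEmitter();
  const ids: string[] = [];

  const consumer = (async () => {
    for await (const post of replayThenListen(ee, '1')) {
      ids.push(post.id);
      if (ids.length === 2) break; // one replayed post plus one live post
    }
  })();

  await sleep(5); // the "database" fetch is still pending at this point
  ee.emit('add', { id: '3' });
  await consumer;
  return ids;
}
```

Had the listener been attached only after the fetch resolved, the event emitted at the 5 ms mark would have been lost.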

Pulling data in a loop

This pattern is useful when you want to periodically check a source such as a database for new data and push it to the client.

server.ts
ts
import { setTimeout as sleep } from 'timers/promises';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
// Assumption: `db` is a Prisma client instance exported by your app (the path is hypothetical)
import { db } from '../db';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
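
The cursor logic of this loop can be tried out without a database. In the sketch below, an in-memory array plays the role of the table; `pollPosts`, `readAll`, and `demo` are hypothetical names, and the polling interval is shortened so the example finishes quickly.

```typescript
import { setTimeout as sleep } from 'node:timers/promises';

type Post = { title: string; createdAt: Date };

export async function* pollPosts(
  readAll: () => Post[], // hypothetical data source standing in for a DB query
  signal: AbortSignal,
  since: Date | null = null,
  intervalMs = 10,
): AsyncGenerator<Post> {
  let cursor = since;
  while (!signal.aborted) {
    const after = cursor;
    // Only keep posts created after the cursor, oldest first
    const fresh = readAll()
      .filter((p) => after === null || p.createdAt.getTime() > after.getTime())
      .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());

    for (const post of fresh) {
      yield post;
      cursor = post.createdAt; // advance the cursor past what was delivered
    }

    // Wait before polling again
    await sleep(intervalMs);
  }
}

export async function demo(): Promise<string[]> {
  const table: Post[] = [
    { title: 'a', createdAt: new Date(1000) },
    { title: 'b', createdAt: new Date(2000) },
  ];
  const ac = new AbortController();
  const titles: string[] = [];

  const consumer = (async () => {
    for await (const post of pollPosts(() => table, ac.signal)) {
      titles.push(post.title);
    }
  })();

  await sleep(25);
  table.push({ title: 'c', createdAt: new Date(3000) }); // arrives between polls
  await sleep(40);
  ac.abort(); // simulates the client disconnecting
  await consumer;
  return titles;
}
```

Because the cursor only moves forward after a post has been yielded, each post is delivered once even though the source is read on every poll.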

Stopping a subscription from the server

If you need to stop a subscription from the server, simply return from the generator function.

ts
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (!opts.signal!.aborted) {
        const idx = index++;
        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }
        // Emit the current index to the client
        yield idx;
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
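
What a `return` looks like from the consuming side can be checked with a plain async generator. This is a small sketch of the language behaviour only; `countUntil` and `collect` are hypothetical names.

```typescript
export async function* countUntil(limit: number): AsyncGenerator<number> {
  let index = 0;
  while (true) {
    const idx = index++;
    if (idx > limit) {
      // Ends the stream: the consumer's `for await` loop completes normally
      return;
    }
    yield idx;
  }
}

export async function collect(limit: number): Promise<number[]> {
  const seen: number[] = [];
  for await (const n of countUntil(limit)) seen.push(n);
  // Execution reaches this line only because the generator returned
  return seen;
}
```

Calling `collect(3)` resolves with `[0, 1, 2, 3]`; no error is raised when the generator decides to stop.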

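On the client, you just call .unsubscribe() on the subscription.

Cleanup of side effects

If you need to clean up any side effects of your subscription, use the try...finally pattern: tRPC invokes .return() on the generator instance when the subscription stops.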

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    let timeout: ReturnType<typeof setTimeout> | undefined;
    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      // Runs when the subscription stops, whether it completed, threw, or was cancelled
      if (timeout) clearTimeout(timeout);
    }
  }),
});
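
The guarantee that `finally` runs on `.return()` is standard async-generator behaviour and can be observed without tRPC. In this minimal sketch, `ticker` and `demo` are hypothetical names and the interval is a placeholder side effect; leaving the `for await` loop early is what triggers `.return()` here.

```typescript
export async function* ticker(log: string[]): AsyncGenerator<number> {
  // Placeholder side effect that must not outlive the stream
  const interval = setInterval(() => {}, 1_000);
  log.push('started');
  try {
    let n = 0;
    while (true) yield n++;
  } finally {
    // Reached when the consumer stops iterating early
    clearInterval(interval);
    log.push('cleaned up');
  }
}

export async function demo(): Promise<string[]> {
  const log: string[] = [];
  for await (const n of ticker(log)) {
    if (n === 2) break; // leaving the loop calls .return() on the generator
  }
  return log;
}
```

The `finally` block runs even though the generator never reaches the end of its own loop.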

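Error handling

Throwing an error in the generator function propagates it to tRPC's onError() on the backend.

If the thrown error is a 5xx error, the client automatically attempts to reconnect based on the last event ID tracked with tracked(). For other errors, the subscription is cancelled and the error is propagated to the onError() callback.

Output validation

Because subscriptions are async iterators, you have to iterate through the iterator to validate the outputs.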
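
Put differently, the yielded values do not exist until the iterator is consumed, so validation has to happen item by item inside a wrapping generator. The sketch below shows that idea in plain TypeScript, with a hand-written parser standing in for a schema library; `parseCount`, `validateEach`, `source`, and `demo` are hypothetical names.

```typescript
type Count = { count: number };

// Hand-written parser standing in for a schema's parse function
function parseCount(value: unknown): Count {
  const candidate = value as { count?: unknown } | null;
  if (
    typeof candidate === 'object' &&
    candidate !== null &&
    typeof candidate.count === 'number'
  ) {
    return { count: candidate.count };
  }
  throw new TypeError('expected { count: number }');
}

// Wraps an async iterable and validates every item as it passes through
export async function* validateEach<T>(
  source: AsyncIterable<unknown>,
  parse: (value: unknown) => T,
): AsyncGenerator<T> {
  for await (const item of source) yield parse(item);
}

// Example source whose second item is invalid
async function* source(): AsyncGenerator<unknown> {
  yield { count: 1 };
  yield { count: 'two' };
}

export async function demo(): Promise<string[]> {
  const out: string[] = [];
  try {
    for await (const item of validateEach(source(), parseCount)) {
      out.push(`ok:${item.count}`);
    }
  } catch (err) {
    out.push(`error:${(err as Error).message}`);
  }
  return out;
}
```

The first item passes through unchanged and the second one surfaces as an error at the moment it is yielded, not when the stream is created.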

Example with zod v4

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            // Unwrap the tracked envelope, validate the payload, and re-wrap it
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        // Make sure the underlying iterator is closed when we stop early
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}

Example with zod v3

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, any, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, any, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            // Unwrap the tracked envelope, validate the payload, and re-wrap it
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        // Make sure the underlying iterator is closed when we stop early
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    any,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}

We can now use this helper to validate the output of our subscription procedures:

_app.ts
ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        // Each yielded payload must match this shape
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        index++;
        // tracked() expects a string id, so the numeric index is converted
        yield tracked(String(index), {
          count: index,
        });
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});