HTTP 订阅链接
本页面由 PageTurner AI 翻译(测试版)。未经项目官方认可。 发现错误? 报告问题 →
httpSubscriptionLink 是一个使用服务器发送事件(SSE)实现订阅功能的终止链接。
SSE 是实现实时通信的不错选择,它比建立 WebSocket 服务器要简单一些。
设置
如果客户端环境不支持 EventSource,您需要安装EventSource polyfill。React Native 的具体配置请参考兼容性章节。
使用 httpSubscriptionLink 时,需要通过splitLink明确指定使用 SSE 处理订阅请求。
client/index.tstsimport type { TRPCLink } from '@trpc/client';import {httpBatchLink,httpSubscriptionLink,loggerLink,splitLink,} from '@trpc/client';const trpcClient = createTRPCClient<AppRouter>({/*** @see https://trpc.io/docs/v11/client/links*/links: [// adds pretty logs to your console in development and logs errors in productionloggerLink(),splitLink({// uses the httpSubscriptionLink for subscriptionscondition: (op) => op.type === 'subscription',true: httpSubscriptionLink({url: `/api/trpc`,}),false: httpBatchLink({url: `/api/trpc`,}),}),],});
client/index.tstsimport type { TRPCLink } from '@trpc/client';import {httpBatchLink,httpSubscriptionLink,loggerLink,splitLink,} from '@trpc/client';const trpcClient = createTRPCClient<AppRouter>({/*** @see https://trpc.io/docs/v11/client/links*/links: [// adds pretty logs to your console in development and logs errors in productionloggerLink(),splitLink({// uses the httpSubscriptionLink for subscriptionscondition: (op) => op.type === 'subscription',true: httpSubscriptionLink({url: `/api/trpc`,}),false: httpBatchLink({url: `/api/trpc`,}),}),],});
本文档重点介绍 httpSubscriptionLink 的具体用法。关于订阅功能的通用指南,请参阅订阅功能文档。
请求头与认证鉴权
Web 应用场景
同域名场景
在 Web 应用中,只要客户端与服务器处于同一域名下,Cookie 将自动随请求发送。
跨域名场景
当客户端与服务器处于不同域名时,可设置 withCredentials: true(详见 MDN 文档)。
示例:
tsx// [...]httpSubscriptionLink({url: 'https://example.com/api/trpc',eventSourceOptions() {return {withCredentials: true, // <---};},});
tsx// [...]httpSubscriptionLink({url: 'https://example.com/api/trpc',eventSourceOptions() {return {withCredentials: true, // <---};},});
通过 Ponyfill 添加自定义请求头
推荐用于非 Web 环境
您可以通过 ponyfill 引入 EventSource,并使用 eventSourceOptions 回调函数配置请求头。
tsximport {createTRPCClient,httpBatchLink,httpSubscriptionLink,splitLink,} from '@trpc/client';import { EventSourcePolyfill } from 'event-source-polyfill';import type { AppRouter } from '../server/index.js';// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',true: httpSubscriptionLink({url: 'http://localhost:3000',// ponyfill EventSourceEventSource: EventSourcePolyfill,// options to pass to the EventSourcePolyfill constructoreventSourceOptions: async ({ op }) => {// ^ Includes the operation that's being executed// you can use this to generate a signature for the operationconst signature = await getSignature(op);return {headers: {authorization: 'Bearer supersecret','x-signature': signature,},};},}),false: httpBatchLink({url: 'http://localhost:3000',}),}),],});
tsximport {createTRPCClient,httpBatchLink,httpSubscriptionLink,splitLink,} from '@trpc/client';import { EventSourcePolyfill } from 'event-source-polyfill';import type { AppRouter } from '../server/index.js';// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',true: httpSubscriptionLink({url: 'http://localhost:3000',// ponyfill EventSourceEventSource: EventSourcePolyfill,// options to pass to the EventSourcePolyfill constructoreventSourceOptions: async ({ op }) => {// ^ Includes the operation that's being executed// you can use this to generate a signature for the operationconst signature = await getSignature(op);return {headers: {authorization: 'Bearer supersecret','x-signature': signature,},};},}),false: httpBatchLink({url: 'http://localhost:3000',}),}),],});
更新活动连接的配置
httpSubscriptionLink 基于 EventSource 实现 SSE,确保网络故障或错误响应码等连接问题会自动重试。但 EventSource 不允许重新执行 eventSourceOptions() 或 url() 来更新配置,这在认证信息过期时尤为重要。
为解决此限制,可将 retryLink 与 httpSubscriptionLink 配合使用。此方案能确保使用最新配置(包括更新的认证信息)重新建立连接。
请注意:重启连接会导致 EventSource 完全重建,所有已跟踪的事件状态将丢失。
tsximport {createTRPCClient,httpBatchLink,httpSubscriptionLink,retryLink,splitLink,} from '@trpc/client';import {EventSourcePolyfill,EventSourcePolyfillInit,} from 'event-source-polyfill';import type { AppRouter } from '../server/index.js';// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',false: httpBatchLink({url: 'http://localhost:3000',}),true: [retryLink({retry: (opts) => {opts.op.type;// ^? will always be 'subscription' since we're in a splitLinkconst code = opts.error.data?.code;if (!code) {// This shouldn't happen as our httpSubscriptionLink will automatically retry within when there's a non-parsable responseconsole.error('No error code found, retrying', opts);return true;}if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {console.log('Retrying due to 401/403 error');return true;}return false;},}),httpSubscriptionLink({url: async () => {// calculate the latest URL if needed...return getAuthenticatedUri();},// ponyfill EventSourceEventSource: EventSourcePolyfill,eventSourceOptions: async () => {// ...or maybe renew an access tokenconst token = await auth.getOrRenewToken();return {headers: {authorization: `Bearer ${token}`,},};},}),],}),],});
tsximport {createTRPCClient,httpBatchLink,httpSubscriptionLink,retryLink,splitLink,} from '@trpc/client';import {EventSourcePolyfill,EventSourcePolyfillInit,} from 'event-source-polyfill';import type { AppRouter } from '../server/index.js';// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',false: httpBatchLink({url: 'http://localhost:3000',}),true: [retryLink({retry: (opts) => {opts.op.type;// ^? will always be 'subscription' since we're in a splitLinkconst code = opts.error.data?.code;if (!code) {// This shouldn't happen as our httpSubscriptionLink will automatically retry within when there's a non-parsable responseconsole.error('No error code found, retrying', opts);return true;}if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {console.log('Retrying due to 401/403 error');return true;}return false;},}),httpSubscriptionLink({url: async () => {// calculate the latest URL if needed...return getAuthenticatedUri();},// ponyfill EventSourceEventSource: EventSourcePolyfill,eventSourceOptions: async () => {// ...or maybe renew an access tokenconst token = await auth.getOrRenewToken();return {headers: {authorization: `Bearer ${token}`,},};},}),],}),],});
连接参数
为向 EventSource 提供认证信息,可在 httpSubscriptionLink 中定义 connectionParams(该参数会作为 URL 的一部分发送,因此更推荐其他认证方式)。
server/context.tstsimport type {CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';export constcreateContext = async (opts :CreateHTTPContextOptions ) => {consttoken =opts .info .connectionParams ?.token ;// [... authenticate]return {};};export typeContext =Awaited <ReturnType <typeofcreateContext >>;
server/context.tstsimport type {CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';export constcreateContext = async (opts :CreateHTTPContextOptions ) => {consttoken =opts .info .connectionParams ?.token ;// [... authenticate]return {};};export typeContext =Awaited <ReturnType <typeofcreateContext >>;
client/trpc.tstsimport {createTRPCClient,httpBatchLink,httpSubscriptionLink,splitLink,} from '@trpc/client';import type { AppRouter } from '../server/index.js';// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',true: httpSubscriptionLink({url: 'http://localhost:3000',connectionParams: async () => {// Will be serialized as part of the URLreturn {token: 'supersecret',};},}),false: httpBatchLink({url: 'http://localhost:3000',}),}),],});
client/trpc.tstsimport {createTRPCClient,httpBatchLink,httpSubscriptionLink,splitLink,} from '@trpc/client';import type { AppRouter } from '../server/index.js';// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',true: httpSubscriptionLink({url: 'http://localhost:3000',connectionParams: async () => {// Will be serialized as part of the URLreturn {token: 'supersecret',};},}),false: httpBatchLink({url: 'http://localhost:3000',}),}),],});
超时配置
httpSubscriptionLink 支持通过 reconnectAfterInactivityMs 选项配置空闲超时。如果在指定时间内未收到任何消息(包括心跳消息),连接将标记为"连接中"状态并自动尝试重连。
超时配置在服务端初始化 tRPC 时设置:
server/trpc.tstsimport { initTRPC } from '@trpc/server';export const t = initTRPC.create({sse: {client: {reconnectAfterInactivityMs: 3_000,},},});
server/trpc.tstsimport { initTRPC } from '@trpc/server';export const t = initTRPC.create({sse: {client: {reconnectAfterInactivityMs: 3_000,},},});
服务端心跳配置
可配置服务端定期发送心跳消息以保持连接活跃,防止超时断开。此功能与 reconnectAfterInactivityMs 选项配合使用效果尤佳。
server/trpc.tstsimport { initTRPC } from '@trpc/server';export const t = initTRPC.create({sse: {// Maximum duration of a single SSE connection in milliseconds// maxDurationMs: 60_00,ping: {// Enable periodic ping messages to keep connection aliveenabled: true,// Send ping message every 2sintervalMs: 2_000,},// client: {// reconnectAfterInactivityMs: 3_000// }},});
server/trpc.tstsimport { initTRPC } from '@trpc/server';export const t = initTRPC.create({sse: {// Maximum duration of a single SSE connection in milliseconds// maxDurationMs: 60_00,ping: {// Enable periodic ping messages to keep connection aliveenabled: true,// Send ping message every 2sintervalMs: 2_000,},// client: {// reconnectAfterInactivityMs: 3_000// }},});
兼容性 (React Native)
httpSubscriptionLink 使用了 EventSource API、Streams API 和 AsyncIterator,这些 API 在 React Native 中不被原生支持,需要通过 ponyfill 实现。
对于 EventSource 的 ponyfill,我们推荐使用基于 React Native 网络库的 polyfill 方案,而非基于 XMLHttpRequest API 的方案。使用 XMLHttpRequest 实现的 EventSource polyfill 在应用从后台唤醒后无法正常重连。建议采用 rn-eventsource-reborn 包。
Streams API 可通过 web-streams-polyfill 包实现 ponyfill。
AsyncIterator 可通过 @azure/core-asynciterator-polyfill 包实现 polyfill。
安装步骤
安装所需 polyfill:
- npm
- yarn
- pnpm
- bun
- deno
npm install rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
yarn add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
pnpm add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
bun add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
deno add npm:rn-eventsource-reborn npm:web-streams-polyfill npm:@azure/core-asynciterator-polyfill
在链接被使用前(例如在添加 TRPCReact.Provider 的位置)将 polyfill 引入项目:
utils/api.tsxtsimport '@azure/core-asynciterator-polyfill';import { RNEventSource } from 'rn-eventsource-reborn';import { ReadableStream, TransformStream } from 'web-streams-polyfill';globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;globalThis.TransformStream = globalThis.TransformStream || TransformStream;
utils/api.tsxtsimport '@azure/core-asynciterator-polyfill';import { RNEventSource } from 'rn-eventsource-reborn';import { ReadableStream, TransformStream } from 'web-streams-polyfill';globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;globalThis.TransformStream = globalThis.TransformStream || TransformStream;
完成 ponyfill 引入后,即可按照设置章节的说明继续配置 httpSubscriptionLink。
httpSubscriptionLink 配置选项
tstype HTTPSubscriptionLinkOptions<TRoot extends AnyClientTypes,TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,> = {/*** EventSource ponyfill*/EventSource?: TEventSource;/*** EventSource options or a callback that returns them*/eventSourceOptions?:| EventSourceLike.InitDictOf<TEventSource>| ((opts: {op: Operation;}) =>| EventSourceLike.InitDictOf<TEventSource>| Promise<EventSourceLike.InitDictOf<TEventSource>>);};
tstype HTTPSubscriptionLinkOptions<TRoot extends AnyClientTypes,TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,> = {/*** EventSource ponyfill*/EventSource?: TEventSource;/*** EventSource options or a callback that returns them*/eventSourceOptions?:| EventSourceLike.InitDictOf<TEventSource>| ((opts: {op: Operation;}) =>| EventSourceLike.InitDictOf<TEventSource>| Promise<EventSourceLike.InitDictOf<TEventSource>>);};
服务端 SSE 配置选项
tsexport interface SSEStreamProducerOptions<TValue = unknown> {ping?: {/*** Enable ping comments sent from the server* @default false*/enabled: boolean;/*** Interval in milliseconds* @default 1000*/intervalMs?: number;};/*** Maximum duration in milliseconds for the request before ending the stream* @default undefined*/maxDurationMs?: number;/*** End the request immediately after data is sent* Only useful for serverless runtimes that do not support streaming responses* @default false*/emitAndEndImmediately?: boolean;/*** Client-specific options - these will be sent to the client as part of the first message* @default {}*/client?: {/*** Timeout and reconnect after inactivity in milliseconds* @default undefined*/reconnectAfterInactivityMs?: number;};}
tsexport interface SSEStreamProducerOptions<TValue = unknown> {ping?: {/*** Enable ping comments sent from the server* @default false*/enabled: boolean;/*** Interval in milliseconds* @default 1000*/intervalMs?: number;};/*** Maximum duration in milliseconds for the request before ending the stream* @default undefined*/maxDurationMs?: number;/*** End the request immediately after data is sent* Only useful for serverless runtimes that do not support streaming responses* @default false*/emitAndEndImmediately?: boolean;/*** Client-specific options - these will be sent to the client as part of the first message* @default {}*/client?: {/*** Timeout and reconnect after inactivity in milliseconds* @default undefined*/reconnectAfterInactivityMs?: number;};}