メインコンテンツへスキップ
バージョン: 11.x

HTTPサブスクリプションリンク

非公式ベータ版翻訳

このページは PageTurner AI で翻訳されました(ベータ版)。プロジェクト公式の承認はありません。 エラーを見つけましたか? 問題を報告 →

httpSubscriptionLinkは、サブスクリプションにServer-sent Events(SSE)を使用する終端リンクです。

SSEはリアルタイム通信に適した選択肢であり、WebSocketサーバーのセットアップよりも若干簡単です。

セットアップ

情報

クライアント環境がEventSourceをサポートしていない場合、EventSourceポリフィルが必要です。React Native固有の手順については互換性セクションを参照してください。

httpSubscriptionLinkを使用するには、splitLinkを併用してSSEを明示的に指定する必要があります。

client/index.ts
ts
import type { TRPCLink } from '@trpc/client';
import {
httpBatchLink,
httpSubscriptionLink,
loggerLink,
splitLink,
} from '@trpc/client';
const trpcClient = createTRPCClient<AppRouter>({
/**
* @see https://trpc.io/docs/v11/client/links
*/
links: [
// adds pretty logs to your console in development and logs errors in production
loggerLink(),
splitLink({
// uses the httpSubscriptionLink for subscriptions
condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({
url: `/api/trpc`,
}),
false: httpBatchLink({
url: `/api/trpc`,
}),
}),
],
});
client/index.ts
ts
import type { TRPCLink } from '@trpc/client';
import {
httpBatchLink,
httpSubscriptionLink,
loggerLink,
splitLink,
} from '@trpc/client';
const trpcClient = createTRPCClient<AppRouter>({
/**
* @see https://trpc.io/docs/v11/client/links
*/
links: [
// adds pretty logs to your console in development and logs errors in production
loggerLink(),
splitLink({
// uses the httpSubscriptionLink for subscriptions
condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({
url: `/api/trpc`,
}),
false: httpBatchLink({
url: `/api/trpc`,
}),
}),
],
});
ヒント

このドキュメントではhttpSubscriptionLinkの具体的な使用方法を説明しています。サブスクリプション全般の利用方法についてはサブスクリプションガイドを参照してください。

ヘッダーと認証/認可

Webアプリケーション

同一ドメイン

Webアプリケーションの場合、クライアントとサーバーが同じドメイン上にある限り、リクエストの一部としてクッキーが送信されます。

クロスドメイン

クライアントとサーバーが異なるドメインにある場合、withCredentials: trueを使用できます(MDNでの詳細)。

例:

tsx
// [...]
httpSubscriptionLink({
url: 'https://example.com/api/trpc',
eventSourceOptions() {
return {
withCredentials: true, // <---
};
},
});
tsx
// [...]
httpSubscriptionLink({
url: 'https://example.com/api/trpc',
eventSourceOptions() {
return {
withCredentials: true, // <---
};
},
});

ポリフィルによるカスタムヘッダー

非Web環境向けに推奨

EventSourceをポリフィルし、eventSourceOptionsコールバックを使用してヘッダーを設定できます。

tsx
import {
createTRPCClient,
httpBatchLink,
httpSubscriptionLink,
splitLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({
url: 'http://localhost:3000',
// ponyfill EventSource
EventSource: EventSourcePolyfill,
// options to pass to the EventSourcePolyfill constructor
eventSourceOptions: async ({ op }) => {
// ^ Includes the operation that's being executed
// you can use this to generate a signature for the operation
const signature = await getSignature(op);
return {
headers: {
authorization: 'Bearer supersecret',
'x-signature': signature,
},
};
},
}),
false: httpBatchLink({
url: 'http://localhost:3000',
}),
}),
],
});
tsx
import {
createTRPCClient,
httpBatchLink,
httpSubscriptionLink,
splitLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({
url: 'http://localhost:3000',
// ponyfill EventSource
EventSource: EventSourcePolyfill,
// options to pass to the EventSourcePolyfill constructor
eventSourceOptions: async ({ op }) => {
// ^ Includes the operation that's being executed
// you can use this to generate a signature for the operation
const signature = await getSignature(op);
return {
headers: {
authorization: 'Bearer supersecret',
'x-signature': signature,
},
};
},
}),
false: httpBatchLink({
url: 'http://localhost:3000',
}),
}),
],
});

アクティブ接続の設定更新

httpSubscriptionLinkはSSEをEventSource経由で活用し、ネットワーク障害や不正な応答コードなどのエラー発生時に接続を自動再試行します。ただしEventSourceは、eventSourceOptions()url()オプションを再実行して設定を更新する機能を提供せず、特に認証が期限切れになった場合に問題となります。

この制限に対処するには、httpSubscriptionLinkと併せてretryLinkを使用します。これにより、更新された認証情報を含む最新の設定で接続を再確立できます。

注意

接続を再開するとEventSourceが最初から再作成されるため、以前に追跡されていたイベントはすべて失われることに注意してください。

tsx
import {
createTRPCClient,
httpBatchLink,
httpSubscriptionLink,
retryLink,
splitLink,
} from '@trpc/client';
import {
EventSourcePolyfill,
EventSourcePolyfillInit,
} from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
false: httpBatchLink({
url: 'http://localhost:3000',
}),
true: [
retryLink({
retry: (opts) => {
opts.op.type;
// ^? will always be 'subscription' since we're in a splitLink
const code = opts.error.data?.code;
if (!code) {
// This shouldn't happen as our httpSubscriptionLink will automatically retry within when there's a non-parsable response
console.error('No error code found, retrying', opts);
return true;
}
if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {
console.log('Retrying due to 401/403 error');
return true;
}
return false;
},
}),
httpSubscriptionLink({
url: async () => {
// calculate the latest URL if needed...
return getAuthenticatedUri();
},
// ponyfill EventSource
EventSource: EventSourcePolyfill,
eventSourceOptions: async () => {
// ...or maybe renew an access token
const token = await auth.getOrRenewToken();
return {
headers: {
authorization: `Bearer ${token}`,
},
};
},
}),
],
}),
],
});
tsx
import {
createTRPCClient,
httpBatchLink,
httpSubscriptionLink,
retryLink,
splitLink,
} from '@trpc/client';
import {
EventSourcePolyfill,
EventSourcePolyfillInit,
} from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
false: httpBatchLink({
url: 'http://localhost:3000',
}),
true: [
retryLink({
retry: (opts) => {
opts.op.type;
// ^? will always be 'subscription' since we're in a splitLink
const code = opts.error.data?.code;
if (!code) {
// This shouldn't happen as our httpSubscriptionLink will automatically retry within when there's a non-parsable response
console.error('No error code found, retrying', opts);
return true;
}
if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {
console.log('Retrying due to 401/403 error');
return true;
}
return false;
},
}),
httpSubscriptionLink({
url: async () => {
// calculate the latest URL if needed...
return getAuthenticatedUri();
},
// ponyfill EventSource
EventSource: EventSourcePolyfill,
eventSourceOptions: async () => {
// ...or maybe renew an access token
const token = await auth.getOrRenewToken();
return {
headers: {
authorization: `Bearer ${token}`,
},
};
},
}),
],
}),
],
});

接続パラメータ

EventSourceで認証するには、httpSubscriptionLink内でconnectionParamsを定義できます(URLの一部として送信されるため、他の方法が推奨されます)。

server/context.ts
ts
import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';
 
export const createContext = async (opts: CreateHTTPContextOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
server/context.ts
ts
import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';
 
export const createContext = async (opts: CreateHTTPContextOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
ts
import {
createTRPCClient,
httpBatchLink,
httpSubscriptionLink,
splitLink,
} from '@trpc/client';
import type { AppRouter } from '../server/index.js';
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({
url: 'http://localhost:3000',
connectionParams: async () => {
// Will be serialized as part of the URL
return {
token: 'supersecret',
};
},
}),
false: httpBatchLink({
url: 'http://localhost:3000',
}),
}),
],
});
client/trpc.ts
ts
import {
createTRPCClient,
httpBatchLink,
httpSubscriptionLink,
splitLink,
} from '@trpc/client';
import type { AppRouter } from '../server/index.js';
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({
url: 'http://localhost:3000',
connectionParams: async () => {
// Will be serialized as part of the URL
return {
token: 'supersecret',
};
},
}),
false: httpBatchLink({
url: 'http://localhost:3000',
}),
}),
],
});

タイムアウト設定

httpSubscriptionLinkreconnectAfterInactivityMsオプションによる非アクティブタイムアウトの設定をサポートしています。指定期間内に(pingメッセージを含む)メッセージが受信されない場合、接続は「接続中」とマークされ自動的に再接続を試みます。

タイムアウト設定はサーバーサイドでtRPCを初期化する際に設定します:

server/trpc.ts
ts
import { initTRPC } from '@trpc/server';
export const t = initTRPC.create({
sse: {
client: {
reconnectAfterInactivityMs: 3_000,
},
},
});
server/trpc.ts
ts
import { initTRPC } from '@trpc/server';
export const t = initTRPC.create({
sse: {
client: {
reconnectAfterInactivityMs: 3_000,
},
},
});

サーバーping設定

サーバーは定期的なpingメッセージを送信するように設定でき、接続を維持しタイムアウトによる切断を防ぎます。これはreconnectAfterInactivityMsオプションと組み合わせると特に有用です。

server/trpc.ts
ts
import { initTRPC } from '@trpc/server';
export const t = initTRPC.create({
sse: {
// Maximum duration of a single SSE connection in milliseconds
// maxDurationMs: 60_00,
ping: {
// Enable periodic ping messages to keep connection alive
enabled: true,
// Send ping message every 2s
intervalMs: 2_000,
},
// client: {
// reconnectAfterInactivityMs: 3_000
// }
},
});
server/trpc.ts
ts
import { initTRPC } from '@trpc/server';
export const t = initTRPC.create({
sse: {
// Maximum duration of a single SSE connection in milliseconds
// maxDurationMs: 60_00,
ping: {
// Enable periodic ping messages to keep connection alive
enabled: true,
// Send ping message every 2s
intervalMs: 2_000,
},
// client: {
// reconnectAfterInactivityMs: 3_000
// }
},
});

互換性(React Native)

httpSubscriptionLinkEventSource API、Streams API、およびAsyncIteratorを使用しますが、これらはReact Nativeでネイティブサポートされておらず、ポリフィルが必要です。

EventSourceをポリフィルする場合、XMLHttpRequest APIを使用するポリフィルよりも、React Nativeが公開するネットワークライブラリを利用するポリフィルを使用することを推奨します。XMLHttpRequestを使用してEventSourceをポリフィルするライブラリは、アプリがバックグラウンドから復帰した後に再接続に失敗する可能性があります。rn-eventsource-rebornパッケージの使用を検討してください。

Streams APIはweb-streams-polyfillパッケージでポリフィルできます。

AsyncIterator@azure/core-asynciterator-polyfillパッケージでポリフィルできます。

インストール

必要なポリフィルをインストールします:

npm install rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill

リンクが使用される前に(例: TRPCReact.Providerを追加する場所で)プロジェクトにポリフィルを追加します:

utils/api.tsx
ts
import '@azure/core-asynciterator-polyfill';
import { RNEventSource } from 'rn-eventsource-reborn';
import { ReadableStream, TransformStream } from 'web-streams-polyfill';
globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
globalThis.TransformStream = globalThis.TransformStream || TransformStream;
utils/api.tsx
ts
import '@azure/core-asynciterator-polyfill';
import { RNEventSource } from 'rn-eventsource-reborn';
import { ReadableStream, TransformStream } from 'web-streams-polyfill';
globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
globalThis.TransformStream = globalThis.TransformStream || TransformStream;

ポリフィルが追加されたら、セットアップセクションで説明されている通りにhttpSubscriptionLinkの設定を続行できます。

httpSubscriptionLinkのオプション

ts
type HTTPSubscriptionLinkOptions<
TRoot extends AnyClientTypes,
TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,
> = {
/**
* EventSource ponyfill
*/
EventSource?: TEventSource;
/**
* EventSource options or a callback that returns them
*/
eventSourceOptions?:
| EventSourceLike.InitDictOf<TEventSource>
| ((opts: {
op: Operation;
}) =>
| EventSourceLike.InitDictOf<TEventSource>
| Promise<EventSourceLike.InitDictOf<TEventSource>>);
};
ts
type HTTPSubscriptionLinkOptions<
TRoot extends AnyClientTypes,
TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,
> = {
/**
* EventSource ponyfill
*/
EventSource?: TEventSource;
/**
* EventSource options or a callback that returns them
*/
eventSourceOptions?:
| EventSourceLike.InitDictOf<TEventSource>
| ((opts: {
op: Operation;
}) =>
| EventSourceLike.InitDictOf<TEventSource>
| Promise<EventSourceLike.InitDictOf<TEventSource>>);
};

サーバー側のSSEオプション

ts
export interface SSEStreamProducerOptions<TValue = unknown> {
ping?: {
/**
* Enable ping comments sent from the server
* @default false
*/
enabled: boolean;
/**
* Interval in milliseconds
* @default 1000
*/
intervalMs?: number;
};
/**
* Maximum duration in milliseconds for the request before ending the stream
* @default undefined
*/
maxDurationMs?: number;
/**
* End the request immediately after data is sent
* Only useful for serverless runtimes that do not support streaming responses
* @default false
*/
emitAndEndImmediately?: boolean;
/**
* Client-specific options - these will be sent to the client as part of the first message
* @default {}
*/
client?: {
/**
* Timeout and reconnect after inactivity in milliseconds
* @default undefined
*/
reconnectAfterInactivityMs?: number;
};
}
ts
export interface SSEStreamProducerOptions<TValue = unknown> {
ping?: {
/**
* Enable ping comments sent from the server
* @default false
*/
enabled: boolean;
/**
* Interval in milliseconds
* @default 1000
*/
intervalMs?: number;
};
/**
* Maximum duration in milliseconds for the request before ending the stream
* @default undefined
*/
maxDurationMs?: number;
/**
* End the request immediately after data is sent
* Only useful for serverless runtimes that do not support streaming responses
* @default false
*/
emitAndEndImmediately?: boolean;
/**
* Client-specific options - these will be sent to the client as part of the first message
* @default {}
*/
client?: {
/**
* Timeout and reconnect after inactivity in milliseconds
* @default undefined
*/
reconnectAfterInactivityMs?: number;
};
}