Skip to content
This repository was archived by the owner on Aug 5, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
6 changes: 3 additions & 3 deletions example/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Elysia, ws, t } from 'elysia'
import { Elysia, t } from 'elysia'
import { trpc, compile as c } from '../src'

import { initTRPC } from '@trpc/server'
Expand Down Expand Up @@ -34,11 +34,11 @@ const router = p.router({
export type Router = typeof router

new Elysia()
.use(ws())
.get('/', () => 'tRPC')
.use(
trpc(router, {
createContext
createContext,
useSubscription: true
})
)
.listen(8080, ({ hostname, port }) => {
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@elysiajs/trpc",
"version": "0.5.2",
"version": "0.7",
"description": "A plugin for Elysia that add support for using tRPC",
"author": {
"name": "saltyAom",
Expand Down Expand Up @@ -40,13 +40,13 @@
"@types/node": "^20.1.4",
"@types/ws": "^8.5.4",
"bun-types": "^0.5.8",
"elysia": "0.5.12",
"elysia": "^0.7.15",
"eslint": "^8.40.0",
"rimraf": "4.4.1",
"typescript": "^5.0.4"
},
"peerDependencies": {
"@trpc/server": ">= 10.0.0",
"elysia": ">= 0.5.12"
"elysia": ">= 0.7.15"
}
}
223 changes: 138 additions & 85 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import { Elysia, getSchemaValidator } from 'elysia'

import { callProcedure, TRPCError, type Router } from '@trpc/server'
import {
DefinitionBase,
Elysia,
InputSchema,
MergeSchema,
RouteSchema,
UnwrapSchema,
getSchemaValidator
} from 'elysia'

import { TRPCError, callProcedure, type Router } from '@trpc/server'
import { fetchRequestHandler } from '@trpc/server/adapters/fetch'
import { isObservable, Unsubscribable } from '@trpc/server/observable'
import { Unsubscribable, isObservable } from '@trpc/server/observable'

import { transformTRPCResponse, getTRPCErrorFromUnknown } from './utils'
import { getTRPCErrorFromUnknown, transformTRPCResponse } from './utils'

import type { TSchema } from '@sinclair/typebox'
import type { TRPCClientIncomingRequest, TRPCOptions } from './types'
import { getErrorShape } from '@trpc/server/shared'

export function compile<T extends TSchema>(schema: T) {
const check = getSchemaValidator(schema, {})
Expand All @@ -34,15 +43,17 @@ const getPath = (url: string) => {
return url.slice(start, end)
}

type ClientSubscripted = Map<string, Unsubscribable>

export const trpc =
(
router: Router<any>,
{ endpoint = '/trpc', ...options }: TRPCOptions = {
endpoint: '/trpc'
}
) =>
(eri: Elysia) => {
let app = eri
(eri: Elysia): Elysia => {
const app = eri
.onParse(async ({ request: { url } }) => {
if (getPath(url).startsWith(endpoint)) return true
})
Expand All @@ -63,28 +74,47 @@ export const trpc =
})
})

const observers: Map<string, Unsubscribable> = new Map()
const observers: Map<string, ClientSubscripted> = new Map()

// @ts-ignore
if (app.wsRouter)
app.ws<any, any>(endpoint, {
if (options.useSubscription)
app.ws(endpoint, {
open(ws) {
const id =
ws.data.headers['sec-websocket-key'] ??
crypto.randomUUID()

// @ts-ignore
ws.data.id = id
},
async message(ws, message) {
// @ts-ignore
const id = ws.data.id

if (!observers.get(id)) {
observers.set(id, new Map())
}

const msg =
typeof message === 'string'
? JSON.parse(message)
: message

const messages: TRPCClientIncomingRequest[] = Array.isArray(
message
msg
)
? message
: [message]
? msg
: [msg]

let observer: Unsubscribable | undefined
await Promise.allSettled(messages.map((incoming) => {}))

for (const incoming of messages) {
if(!incoming.method || !incoming.params) {
continue
}

if (incoming.method === 'subscription.stop') {
const clientObservers = observers.get(id)
const observer = clientObservers?.get(
incoming.id.toString()
)
observer?.unsubscribe()
observers.delete(ws.data.id.toString())
clientObservers?.delete(incoming.id.toString())

return void ws.send(
JSON.stringify({
Expand All @@ -97,99 +127,122 @@ export const trpc =
)
}

const result = await callProcedure({
procedures: router._def.procedures,
path: incoming.params.path,
rawInput: incoming.params.input?.json,
type: incoming.method,
ctx: {}
})
if (!incoming.method || !incoming.params) {
continue
}

if (incoming.method !== 'subscription')
return void ws.send(
const sendErrorMessage = (err: unknown) => {
ws.send(
JSON.stringify(
transformTRPCResponse(router, {
id: incoming.id,
jsonrpc: incoming.jsonrpc,
result: {
type: 'data',
data: result
}
error: getErrorShape({
error: getTRPCErrorFromUnknown(err),
type: incoming.method as 'subscription',
path: incoming.params.path,
input: incoming.params.input,
ctx: {},
config: router._def._config
})
})
)
)
}

ws.send(
JSON.stringify({
id: incoming.id,
jsonrpc: incoming.jsonrpc,
result: {
type: 'started'
}
})
)

if (!isObservable(result))
throw new TRPCError({
message: `Subscription ${incoming.params.path} did not return an observable`,
code: 'INTERNAL_SERVER_ERROR'
try {
const result = await callProcedure({
procedures: router._def.procedures,
path: incoming.params.path,
rawInput: incoming.params.input?.json,
type: incoming.method,
ctx: {}
})

observer = result.subscribe({
next(data) {
ws.send(
if (incoming.method !== 'subscription') {
return void ws.send(
JSON.stringify(
transformTRPCResponse(router, {
id: incoming.id,
jsonrpc: incoming.jsonrpc,
result: {
type: 'data',
data
data: result
}
})
)
)
},
error(err) {
ws.send(
JSON.stringify(
transformTRPCResponse(router, {
id: incoming.id,
jsonrpc: incoming.jsonrpc,
error: router.getErrorShape({
error: getTRPCErrorFromUnknown(
err
),
type: incoming.method as 'subscription',
path: incoming.params.path,
input: incoming.params.input,
ctx: {}
}

ws.send(
JSON.stringify({
id: incoming.id,
jsonrpc: incoming.jsonrpc,
result: {
type: 'started'
}
})
)

if (!isObservable(result)) {
throw new TRPCError({
message: `Subscription ${incoming.params.path} did not return an observable`,
code: 'INTERNAL_SERVER_ERROR'
})
}

const observer = result.subscribe({
next(data) {
ws.send(
JSON.stringify(
transformTRPCResponse(router, {
id: incoming.id,
jsonrpc: incoming.jsonrpc,
result: {
type: 'data',
data
}
})
})
)
)
)
},
complete() {
ws.send(
JSON.stringify(
transformTRPCResponse(router, {
id: incoming.id,
jsonrpc: incoming.jsonrpc,
result: {
type: 'stopped'
}
})
},
error(err) {
sendErrorMessage(err)
},
complete() {
ws.send(
JSON.stringify(
transformTRPCResponse(router, {
id: incoming.id,
jsonrpc: incoming.jsonrpc,
result: {
type: 'stopped'
}
})
)
)
)
}
})
}
})

observers.set(ws.data.id.toString(), observer)
observers
.get(id)
?.set(incoming.id.toString(), observer)
} catch (err) {
sendErrorMessage(err)
}
}
},
close(ws) {
observers.get(ws.data.id.toString())?.unsubscribe()
observers.delete(ws.data.id.toString())
// @ts-ignore
const id = ws.data.id

const clientObservers = observers.get(id)

clientObservers?.forEach((val, key) => {
val.unsubscribe()
})

observers.delete(id)
}
})

Expand Down
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { Router } from '@trpc/server'
import type { FetchHandlerRequestOptions } from '@trpc/server/adapters/fetch'

export interface TRPCClientIncomingRequest {
Expand All @@ -24,4 +23,5 @@ export interface TRPCOptions
* @default '/trpc'
*/
endpoint?: string
useSubscription?: boolean;
}