From 9d53a0dcd1858a8ce572779e4cbc985b0f7dc15f Mon Sep 17 00:00:00 2001 From: Mattia Lau Date: Fri, 29 Sep 2023 15:33:53 +0800 Subject: [PATCH 1/2] chore: support elysia 0.7 --- bun.lockb | Bin 36467 -> 42397 bytes example/index.ts | 6 +- package.json | 4 +- src/index.ts | 223 +++++++++++++++++++++++++++++------------------ src/types.ts | 2 +- 5 files changed, 144 insertions(+), 91 deletions(-) diff --git a/bun.lockb b/bun.lockb index 3fc99d05daed6693b2c667c6ecc98ec5c4a912e5..4edd0d1323a3f5aa8db54a5dd83084bc37bcedbe 100755 GIT binary patch delta 1959 zcmeHIeN0nV6u+OYv?9#SMTacs+?zbXySw|%`8%vrbulh% z5}Ak2!tBDXf-Jghkc|)vAzeRa2Ih9mj+h&r5aMBOwdge#f0|mIsZ=0j;M<&`dp{WW z$(&l#;N%!p-hSii3C`(5j})FM-fuVe`}N8TyTahsm-of7k8EMqK1gofc*L^5u}#uu zq9YVfvp6hNTScyqwzESK9~P*Ie@KR8a@3Wuhb}iiNM6PuaXwKvzt^B^nUr+K*@j^Y zW?3@E7Hn+91_9ZxM}d|Z!sx`FNbIqNY^IbnS#-$4!C@SPLa>BD%#|)xVT%f+_I1Np z3SCZKaGWHGPEZQg(H?T(tU%6K>5PyJhjCzzB_-m($8#M~XHa@J(-~Bmk5bb?;2Qv& zeMt(2&A#RTCan-{7euHNoCjlr3R zGbK%~`ezFz4&$@U7jCz=59C!;gw}LF&fW(-vBrp!6CFOv13QM?`0HE}KJ}>hkv~in zA3qvd3(Ifwn!DUNZ_LYg2GC~&yr-EPOq$@0zx`b1Rde0jZ$UEOv!=-Ip5XNnK zH=D}eGHiHC;u~50&U4|8edD3-?5zinxZm8>-#xXMd*|@g`Uhiv#rjJp;Yi)r5!;5s z7>Nm;{`T{Z6P!O|qGKP*<0t6syEkHRa1L$kKOY&6lOfRJl0|$WG}g>8+oT-( zHrD6)Nu!&0Q~Hsj=_jriN<8{P{l4Iu^ck7{+?8i_6887>XP;NyG5+EC08Uf4Ep&dS543UZgqZvX|kp7Oj_yQ znBauq3uwGdgg^eXqiYad! z1`Uc6T0xiKZQyTjBH=wz0CX}*&@uUtERy|?|9A~*NhUL)(EMKS7eZh@2gXwT!6vnw z>A2VIp5F%zsYyfw%%y$>$I}*={7=k6RS_0elg}-~FuZdOEr`0zIT+Lzr^ZlW?7^Ud zvM}dk6kuE~!Vf%(szIJ^vQG*WNkw9bL7MRsOBE~@iKW)IY;CwS0>?;1fe2X}x3wi> zMJx&ui4n54c5Bnd3Z6kJ)}Mg2y}y`)XCc%N*?ByP^{269s`;L{Iys-x@sgro6o->! z;?CGA(E0lvP^5AQ`C2};YG5M_pbYj6d1Oj(WDv^{0$%Wq-7MV(Bf zP2U@#QL55*rzmp*w1ruURCR&*j6#hvU6rEE%~2@w6e_J^k5a2uqqGuda{7UUdm^5umQf-N`Kc9f6At X>vitXShdRb#S4LvQa8gym7e$u%KeW4 delta 1620 zcmb7EYfw{H5I#2n^3XsCi98Al86F{pyecFjLR#CZt*voH0Y@2LX-MUfKp2utHBoDa zT8QGQIAAIoky52#Olg%uK~!SZwpwg`R0RuC1!V{prs7QKxso`J^@rVi_kOeI+ugHg z@4frQdBQ_8VJDAW@fq21#ALtUyHEO?KUgXfO1~rw*OtY05UX@6_{Vw&Dyw937$h(w zMi(B>vT9XC$uA55XaE>{5K<6+i;#}6!G-ty00>0(4rKcyT*;jAw}pB%5;^La?W62l zjLUbfMsMg#toEZXE^oSU%EGKV$BLhh$?k}d+3!YvKfV!;kwi&OA14NY-nBmt9&cZ< z`?PqIZWj+<&@TvY4|?VLrijxGP0_P#TavK;i;6jqExMr_3A%)@S+-2_;Ic62peI9>kHnXNjlGIB3541WiE@NgtALJZQld|L|F__D6yyp<8s|Ix%_IpRdVtnIf^1=07!Ysc|F@wyruWomZVjUVoxFZ2e@vP8QALnp*d2I?vR^*(<;Z*TC1jmE2&^ zswd9n1_!h)j8a!Ddh|`IbLjW$<8yEC?y3FQ6LzEoEdMIx$fF1LAACJ)_bd@jPp>+u zn3I0Ba(|n2|EN+m%X@kJF2~P~%z@K}I;XW~$=11l)(noOi6{>)ii_p)Q3B#3YgnQ4z2y1&S~o%JJa*pJJu=snedp+UCStjz7rc&Tf^D< z@Rp}BBrU-k?-Gxef~LufjM3;8ubYR?syawrUMH1T zl%GtrRxUco9$dFj(oVgXX&wvgy*4n`?YNVAquj42th1)8AjbOX=|r=_2}9NdNb^E& zmNp6m>n7Fg{@#C>-?i7uic`26XVdFFrJeViFL{O@)%^79;|)r4zW3m<#aozKGq_Wl z_}4!94UXQCiP=imbYifaOovD5WT++2#t3k8S{*c|acrUKHDpTmOuZG^FjyyrYjxYH z$6ZV2TsW%BfHHj}wg*n=r>SRiXME=?5WyxNc8QJiQ6fQ9j5rn2BBX^#3y|UtYJ_-S z%8+W2<|)tvNn{zU-ZV}U@Zxwv5wPtr4ABfC2~Q|+PXhNpPQWteBPTHc?wR4928a>z z&;@{ds<`KcD@Fi-d+vn$5?i@7g$oFn0ENs)1-K`ZPv71Fp((SBfI)AQSBL=fG4;Ig{Hs&_lR;29|Fe34+P*+Q#@6LpA*T^GFWahQu|%z zAC)%74l_fBAC{+4=%ppf!kqjpWiel)Q^~W7v{D>P`1wV-StWVAocy9fd9GZp`XH-B z6RR$j7wbwi#mYj(^E~|fKy%ddi~QLCv7_&bHh>QEc8z83ozuwW_b diff --git a/example/index.ts b/example/index.ts index f1f9fd6..515cf5c 100644 --- a/example/index.ts +++ b/example/index.ts @@ -1,4 +1,4 @@ -import { Elysia, ws, t } from 'elysia' +import { Elysia, t } from 'elysia' import { trpc, compile as c } from '../src' import { initTRPC } from '@trpc/server' @@ -34,11 +34,11 @@ const router = p.router({ export type Router = typeof router new Elysia() - .use(ws()) .get('/', () => 'tRPC') .use( trpc(router, { - createContext + createContext, + useSubscription: true }) ) .listen(8080, ({ hostname, port }) => { diff --git a/package.json b/package.json index 609ab86..059cde0 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@elysiajs/trpc", - "version": "0.5.2", + "version": "0.7", "description": "A plugin for Elysia that add support for using tRPC", "author": { "name": "saltyAom", @@ -40,7 +40,7 @@ "@types/node": "^20.1.4", "@types/ws": "^8.5.4", "bun-types": "^0.5.8", - "elysia": "0.5.12", + "elysia": "^0.7.15", "eslint": "^8.40.0", "rimraf": "4.4.1", "typescript": "^5.0.4" diff --git a/src/index.ts b/src/index.ts index e313d3c..80d4c50 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,13 +1,22 @@ -import { Elysia, getSchemaValidator } from 'elysia' - -import { callProcedure, TRPCError, type Router } from '@trpc/server' +import { + DefinitionBase, + Elysia, + InputSchema, + MergeSchema, + RouteSchema, + UnwrapSchema, + getSchemaValidator +} from 'elysia' + +import { TRPCError, callProcedure, type Router } from '@trpc/server' import { fetchRequestHandler } from '@trpc/server/adapters/fetch' -import { isObservable, Unsubscribable } from '@trpc/server/observable' +import { Unsubscribable, isObservable } from '@trpc/server/observable' -import { transformTRPCResponse, getTRPCErrorFromUnknown } from './utils' +import { getTRPCErrorFromUnknown, transformTRPCResponse } from './utils' import type { TSchema } from '@sinclair/typebox' import type { TRPCClientIncomingRequest, TRPCOptions } from './types' +import { getErrorShape } from '@trpc/server/shared' export function compile(schema: T) { const check = getSchemaValidator(schema, {}) @@ -34,6 +43,8 @@ const getPath = (url: string) => { return url.slice(start, end) } +type ClientSubscripted = Map + export const trpc = ( router: Router, @@ -41,8 +52,8 @@ export const trpc = endpoint: '/trpc' } ) => - (eri: Elysia) => { - let app = eri + (eri: Elysia): Elysia => { + const app = eri .onParse(async ({ request: { url } }) => { if (getPath(url).startsWith(endpoint)) return true }) @@ -63,28 +74,47 @@ export const trpc = }) }) - const observers: Map = new Map() + const observers: Map = new Map() - // @ts-ignore - if (app.wsRouter) - app.ws(endpoint, { + if (options.useSubscription) + app.ws(endpoint, { + open(ws) { + const id = + ws.data.headers['sec-websocket-key'] ?? + crypto.randomUUID() + + // @ts-ignore + ws.data.id = id + }, async message(ws, message) { + // @ts-ignore + const id = ws.data.id + + if (!observers.get(id)) { + observers.set(id, new Map()) + } + + const msg = + typeof message === 'string' + ? JSON.parse(message) + : message + const messages: TRPCClientIncomingRequest[] = Array.isArray( - message + msg ) - ? message - : [message] + ? msg + : [msg] - let observer: Unsubscribable | undefined + await Promise.allSettled(messages.map((incoming) => {})) for (const incoming of messages) { - if(!incoming.method || !incoming.params) { - continue - } - if (incoming.method === 'subscription.stop') { + const clientObservers = observers.get(id) + const observer = clientObservers?.get( + incoming.id.toString() + ) observer?.unsubscribe() - observers.delete(ws.data.id.toString()) + clientObservers?.delete(incoming.id.toString()) return void ws.send( JSON.stringify({ @@ -97,99 +127,122 @@ export const trpc = ) } - const result = await callProcedure({ - procedures: router._def.procedures, - path: incoming.params.path, - rawInput: incoming.params.input?.json, - type: incoming.method, - ctx: {} - }) + if (!incoming.method || !incoming.params) { + continue + } - if (incoming.method !== 'subscription') - return void ws.send( + const sendErrorMessage = (err: unknown) => { + ws.send( JSON.stringify( transformTRPCResponse(router, { id: incoming.id, jsonrpc: incoming.jsonrpc, - result: { - type: 'data', - data: result - } + error: getErrorShape({ + error: getTRPCErrorFromUnknown(err), + type: incoming.method as 'subscription', + path: incoming.params.path, + input: incoming.params.input, + ctx: {}, + config: router._def._config + }) }) ) ) + } - ws.send( - JSON.stringify({ - id: incoming.id, - jsonrpc: incoming.jsonrpc, - result: { - type: 'started' - } - }) - ) - - if (!isObservable(result)) - throw new TRPCError({ - message: `Subscription ${incoming.params.path} did not return an observable`, - code: 'INTERNAL_SERVER_ERROR' + try { + const result = await callProcedure({ + procedures: router._def.procedures, + path: incoming.params.path, + rawInput: incoming.params.input?.json, + type: incoming.method, + ctx: {} }) - observer = result.subscribe({ - next(data) { - ws.send( + if (incoming.method !== 'subscription') { + return void ws.send( JSON.stringify( transformTRPCResponse(router, { id: incoming.id, jsonrpc: incoming.jsonrpc, result: { type: 'data', - data + data: result } }) ) ) - }, - error(err) { - ws.send( - JSON.stringify( - transformTRPCResponse(router, { - id: incoming.id, - jsonrpc: incoming.jsonrpc, - error: router.getErrorShape({ - error: getTRPCErrorFromUnknown( - err - ), - type: incoming.method as 'subscription', - path: incoming.params.path, - input: incoming.params.input, - ctx: {} + } + + ws.send( + JSON.stringify({ + id: incoming.id, + jsonrpc: incoming.jsonrpc, + result: { + type: 'started' + } + }) + ) + + if (!isObservable(result)) { + throw new TRPCError({ + message: `Subscription ${incoming.params.path} did not return an observable`, + code: 'INTERNAL_SERVER_ERROR' + }) + } + + const observer = result.subscribe({ + next(data) { + ws.send( + JSON.stringify( + transformTRPCResponse(router, { + id: incoming.id, + jsonrpc: incoming.jsonrpc, + result: { + type: 'data', + data + } }) - }) + ) ) - ) - }, - complete() { - ws.send( - JSON.stringify( - transformTRPCResponse(router, { - id: incoming.id, - jsonrpc: incoming.jsonrpc, - result: { - type: 'stopped' - } - }) + }, + error(err) { + sendErrorMessage(err) + }, + complete() { + ws.send( + JSON.stringify( + transformTRPCResponse(router, { + id: incoming.id, + jsonrpc: incoming.jsonrpc, + result: { + type: 'stopped' + } + }) + ) ) - ) - } - }) + } + }) - observers.set(ws.data.id.toString(), observer) + observers + .get(id) + ?.set(incoming.id.toString(), observer) + } catch (err) { + sendErrorMessage(err) + } } }, close(ws) { - observers.get(ws.data.id.toString())?.unsubscribe() - observers.delete(ws.data.id.toString()) + // @ts-ignore + const id = ws.data.id + + const clientObservers = observers.get(id) + + clientObservers?.forEach((val, key) => { + val.unsubscribe() + }) + + observers.delete(id) } }) diff --git a/src/types.ts b/src/types.ts index f0abb4f..98130fa 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,4 +1,3 @@ -import type { Router } from '@trpc/server' import type { FetchHandlerRequestOptions } from '@trpc/server/adapters/fetch' export interface TRPCClientIncomingRequest { @@ -24,4 +23,5 @@ export interface TRPCOptions * @default '/trpc' */ endpoint?: string + useSubscription?: boolean; } From 7d2f0bf4bb17d262826c3393ed5317417899fa51 Mon Sep 17 00:00:00 2001 From: Mattia Lau Date: Fri, 29 Sep 2023 15:57:10 +0800 Subject: [PATCH 2/2] chore: upgrade peer dependencies --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 059cde0..adaab34 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,6 @@ }, "peerDependencies": { "@trpc/server": ">= 10.0.0", - "elysia": ">= 0.5.12" + "elysia": ">= 0.7.15" } }