Skip to content

Commit f7a74f6

Browse files
committed
Live Event App
* Added .gitignore * Added a new directory for @pubnub/tomato
1 parent 30bc7ce commit f7a74f6

File tree

6 files changed

+257
-0
lines changed

6 files changed

+257
-0
lines changed

live-event-app/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
tomato/node_modules
2+
tomato/package-lock.json
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { rand, randUuid, randDatabaseType, randFirstName } from '@ngneat/falso'
2+
import { envelope, successfulResponse, timetokenData } from '../utils/subscribe'
3+
4+
export const name = 'loadTest'
5+
6+
const SUB_KEY = 'demo'
7+
8+
const MSGS_PER_SECOND = 100000
9+
const CHANNEL = 'demo'
10+
const CONCURRENT_USERS = 4
11+
const CHUNKS_PER_SECOND = 100
12+
const MSGS_PER_CHUNK = MSGS_PER_SECOND / CHUNKS_PER_SECOND
13+
const CHUNK_DELAY_MS = (1 / CHUNKS_PER_SECOND) * 1000
14+
15+
const users = Array.from(Array(CONCURRENT_USERS), () => ({
16+
uuid: randUuid(),
17+
}))
18+
19+
function generatePayload(currentItemIndex: number) {
20+
return {
21+
id: randUuid(),
22+
text: "Lorem ipsum dolor sit amet",
23+
createdAt: (new Date().toISOString())
24+
}
25+
}
26+
27+
function generateEnvelopes(startTimetoken: string, amount: number) {
28+
const result = []
29+
let start = BigInt(startTimetoken)
30+
31+
for (let i = 0; i < amount; i++) {
32+
const messageTimetoken = (start++).toString()
33+
const user = rand(users)
34+
35+
result.push(
36+
envelope({
37+
channel: CHANNEL,
38+
sender: "user_0",
39+
subKey: SUB_KEY,
40+
publishingTimetoken: {
41+
t: messageTimetoken,
42+
r: 0,
43+
},
44+
payload: generatePayload(i),
45+
})
46+
)
47+
}
48+
49+
return result
50+
}
51+
52+
export default async function () {
53+
let currentTimetoken = timetoken.now()
54+
55+
const request = await expect({
56+
description: 'subscribe with timetoken zero',
57+
validations: [],
58+
})
59+
60+
await request.respond({
61+
status: 200,
62+
body: successfulResponse(timetokenData(currentTimetoken)),
63+
})
64+
65+
while (true) {
66+
await Promise.race([
67+
sleep(CHUNK_DELAY_MS),
68+
expect({
69+
description: 'subscribe next',
70+
validations: [],
71+
}).then(async (request) => {
72+
const envelopes = generateEnvelopes(currentTimetoken, MSGS_PER_CHUNK)
73+
74+
const nextTimetoken = timetoken.now()
75+
76+
await request.respond({
77+
status: 200,
78+
body: successfulResponse(timetokenData(nextTimetoken), envelopes),
79+
})
80+
81+
currentTimetoken = nextTimetoken
82+
}),
83+
])
84+
}
85+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { router } from '../utils/router'
2+
3+
export const name = 'loadTestRouter'
4+
5+
export default router(expect, {
6+
'GET /time/:param': (req, params) => {
7+
console.log(req.url.path)
8+
9+
return {
10+
status: 200,
11+
body: {
12+
result: params.param,
13+
},
14+
}
15+
},
16+
})

live-event-app/tomato/package.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "tomato-chat",
3+
"version": "1.0.0",
4+
"description": "Description goes here",
5+
"dependencies": {
6+
"@ngneat/falso": "^6.2.0",
7+
"@pubnub/tomato": "^1.6.0",
8+
"find-my-way": "^7.3.1",
9+
"pubnub": "^7.2.1",
10+
"typescript": "^4.9.3"
11+
}
12+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import * as Router from 'find-my-way'
2+
3+
type RouteHandler = (
4+
req: {
5+
method: string
6+
body?: any
7+
headers: Record<string, any>
8+
url: { path: string; query: Record<string, string> }
9+
},
10+
params?: Record<string, any>
11+
) =>
12+
| Promise<{
13+
status: number
14+
headers?: Record<string, any>
15+
body?: any
16+
}>
17+
| {
18+
status: number
19+
headers?: Record<string, any>
20+
body?: any
21+
}
22+
23+
type Routes = {
24+
[k: string]: RouteHandler
25+
}
26+
27+
export function router(expect: any, routes: Routes) {
28+
const router = Router()
29+
30+
for (const [key, handler] of Object.entries(routes)) {
31+
const [method, path] = key.split(' ')
32+
33+
router.on(method.toUpperCase() as Router.HTTPMethod, path, handler as any)
34+
}
35+
36+
return async function () {
37+
while (true) {
38+
const request = await expect({
39+
description: 'any request',
40+
validations: [],
41+
})
42+
43+
const route = router.find(
44+
request.method.toUpperCase() as Router.HTTPMethod,
45+
request.url.path
46+
)
47+
48+
if (!route) {
49+
await request.respond({ status: 404 })
50+
} else {
51+
const response = await (route.handler as RouteHandler)(
52+
request,
53+
route.params
54+
)
55+
56+
await request.respond({
57+
status: response?.status,
58+
headers: response?.headers,
59+
body: response?.body,
60+
})
61+
}
62+
}
63+
}
64+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
export type TimetokenData = {
2+
t: string
3+
r: number
4+
}
5+
6+
export type Envelope = {
7+
a: string
8+
b?: string
9+
c: string
10+
d: any
11+
e?: number
12+
f: number
13+
i: string
14+
k: string
15+
o?: TimetokenData
16+
p: TimetokenData
17+
u?: any
18+
}
19+
20+
export type SubscribeResponse = {
21+
t: TimetokenData
22+
m: Envelope[]
23+
}
24+
25+
export type SubscribeErrorResponse = {
26+
error: true
27+
status: number
28+
message: string
29+
service: 'Access Manager'
30+
payload?: {
31+
channels?: string[]
32+
'channel-groups'?: string[]
33+
}
34+
}
35+
36+
export type PublishResponse = [number, string, string]
37+
38+
export const timetokenData = (
39+
timetoken: string,
40+
region?: number
41+
): TimetokenData => ({
42+
t: timetoken,
43+
r: region ?? 1,
44+
})
45+
46+
export const envelope = (input: {
47+
shard?: string
48+
subscriptionMatch?: string
49+
channel: string
50+
payload: any
51+
messageType?: number
52+
flags?: number
53+
sender: string
54+
subKey: string
55+
metadata?: any
56+
originatingTimetoken?: TimetokenData
57+
publishingTimetoken: TimetokenData
58+
}): Envelope => ({
59+
a: input.shard ?? '1',
60+
b: input.subscriptionMatch ?? input.channel,
61+
c: input.channel,
62+
d: input.payload,
63+
e: input.messageType ?? 0,
64+
f: input.flags ?? 0,
65+
i: input.sender,
66+
k: input.subKey,
67+
o: input.originatingTimetoken,
68+
p: input.publishingTimetoken,
69+
u: input.metadata,
70+
})
71+
72+
export const successfulResponse = (
73+
timetoken: TimetokenData,
74+
envelopes: Envelope[] = []
75+
): SubscribeResponse => ({
76+
t: timetoken,
77+
m: envelopes,
78+
})

0 commit comments

Comments
 (0)