Skip to content

Commit 6b719c8

Browse files
authored
feat: add subscribeBatchSize option to split subscribe packets for AWS IoT Core (#1995)
1 parent 65def96 commit 6b719c8

File tree

4 files changed

+125
-21
lines changed

4 files changed

+125
-21
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,8 @@ The arguments are:
475475
For ws/wss protocols only. Can be used to implement a custom websocket subprotocol or implementation.
476476
- `resubscribe` : if connection is broken and reconnects,
477477
subscribed topics are automatically subscribed again (default `true`)
478+
- `subscribeBatchSize` : optional `number`
479+
Maximum number of topics per SUBSCRIBE packet. When the number of topics to subscribe exceeds this value, the client will automatically split them into multiple SUBSCRIBE packets of this size.
478480
- `messageIdProvider`: custom messageId provider. when `new UniqueMessageIdProvider()` is set, then non conflict messageId is provided.
479481
- `log`: custom log function. Default uses [debug](https://www.npmjs.com/package/debug) package.
480482
- `manualConnect`: prevents the constructor to call `connect`. In this case after the `mqtt.connect` is called you should call `client.connect` manually.

src/lib/client.ts

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { type ClientRequestArgs } from 'http'
2929
import {
3030
type DoneCallback,
3131
type ErrorWithReasonCode,
32+
ErrorWithSubackPacket,
3233
type GenericCallback,
3334
type IStream,
3435
MQTTJS_VERSION,
@@ -61,6 +62,7 @@ const defaultConnectOptions: IClientOptions = {
6162
connectTimeout: 30 * 1000,
6263
clean: true,
6364
resubscribe: true,
65+
subscribeBatchSize: null,
6466
writeCache: true,
6567
timerVariant: 'auto',
6668
}
@@ -220,6 +222,14 @@ export interface IClientOptions extends ISecureClientOptions {
220222
*/
221223
resubscribe?: boolean
222224

225+
/**
226+
* Maximum number of topics to include in a single SUBSCRIBE packet.
227+
* When subscribing to more topics than this, the client will automatically
228+
* split them into batches of this size.
229+
* This is useful on AWS IoT Core, which limits each SUBSCRIBE packet to 8 topics.
230+
*/
231+
subscribeBatchSize?: number
232+
223233
/** when defined this function will be called to transform the url string generated by MqttClient from provided options */
224234
transformWsUrl?: (
225235
url: string,
@@ -1209,16 +1219,13 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
12091219
return this
12101220
}
12111221

1212-
const subscribeProc = () => {
1213-
const messageId = this._nextId()
1214-
if (messageId === null) {
1215-
this.log('No messageId left')
1216-
return false
1217-
}
1218-
1222+
const subscribeChunkedSubs = (
1223+
chunkedSubs: ISubscriptionRequest[],
1224+
messageId: number,
1225+
): Promise<ISubackPacket> => {
12191226
const packet: ISubscribePacket = {
12201227
cmd: 'subscribe',
1221-
subscriptions: subs,
1228+
subscriptions: chunkedSubs,
12221229
// qos: 1,
12231230
// retain: false,
12241231
// dup: false,
@@ -1233,7 +1240,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
12331240
if (this.options.resubscribe) {
12341241
this.log('subscribe :: resubscribe true')
12351242
const topics = []
1236-
subs.forEach((sub) => {
1243+
chunkedSubs.forEach((sub) => {
12371244
if (this.options.reconnectPeriod > 0) {
12381245
const topic: IClientSubscribeOptions = { qos: sub.qos }
12391246
if (version === 5) {
@@ -1249,21 +1256,61 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
12491256
this.messageIdToTopic[packet.messageId] = topics
12501257
}
12511258

1252-
this.outgoing[packet.messageId] = {
1253-
volatile: true,
1254-
cb(err, packet2: ISubackPacket) {
1255-
if (!err) {
1256-
const { granted } = packet2
1257-
for (let i = 0; i < granted.length; i += 1) {
1258-
subs[i].qos = granted[i] as QoS
1259+
const promise = new Promise<ISubackPacket>((resolve, reject) => {
1260+
this.outgoing[packet.messageId] = {
1261+
volatile: true,
1262+
cb(err, packet2: ISubackPacket) {
1263+
if (!err) {
1264+
const { granted } = packet2
1265+
for (
1266+
let grantedI = 0;
1267+
grantedI < granted.length;
1268+
grantedI += 1
1269+
) {
1270+
chunkedSubs[grantedI].qos = granted[
1271+
grantedI
1272+
] as QoS
1273+
}
12591274
}
1260-
}
12611275

1262-
callback(err, subs, packet2)
1263-
},
1264-
}
1276+
if (!err) {
1277+
resolve(packet2)
1278+
} else {
1279+
reject(
1280+
new ErrorWithSubackPacket(err.message, packet2),
1281+
)
1282+
}
1283+
},
1284+
}
1285+
})
12651286
this.log('subscribe :: call _sendPacket')
12661287
this._sendPacket(packet)
1288+
return promise
1289+
}
1290+
1291+
const subscribeProc = () => {
1292+
const batchSize = this.options.subscribeBatchSize ?? subs.length
1293+
const subscribePromises: Promise<ISubackPacket>[] = []
1294+
1295+
for (let i = 0; i < subs.length; i += batchSize) {
1296+
const chunkedSubs = subs.slice(i, i + batchSize)
1297+
const messageId = this._nextId()
1298+
if (messageId === null) {
1299+
this.log('No messageId left')
1300+
return false
1301+
}
1302+
subscribePromises.push(
1303+
subscribeChunkedSubs(chunkedSubs, messageId),
1304+
)
1305+
}
1306+
Promise.all(subscribePromises)
1307+
.then((packets) => {
1308+
callback(null, subs, packets.at(-1))
1309+
})
1310+
.catch((err: ErrorWithSubackPacket) => {
1311+
callback(err, subs, err.packet)
1312+
})
1313+
12671314
return true
12681315
}
12691316

src/lib/shared.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Packet } from 'mqtt-packet'
1+
import type { Packet, ISubackPacket } from 'mqtt-packet'
22
import type { Duplex } from 'stream'
33
import type MqttClient from './client'
44
import type { IClientOptions } from './client'
@@ -42,6 +42,19 @@ export class ErrorWithReasonCode extends Error {
4242
}
4343
}
4444

45+
export class ErrorWithSubackPacket extends Error {
46+
public packet: ISubackPacket
47+
48+
public constructor(message: string, packet: ISubackPacket) {
49+
super(message)
50+
this.packet = packet
51+
52+
// We need to set the prototype explicitly
53+
Object.setPrototypeOf(this, ErrorWithSubackPacket.prototype)
54+
Object.getPrototypeOf(this).name = 'ErrorWithSubackPacket'
55+
}
56+
}
57+
4558
// eslint-disable-next-line @typescript-eslint/ban-types
4659
export type Constructor<T = {}> = new (...args: any[]) => T
4760

test/node/abstract_client.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2619,6 +2619,48 @@ export default function abstractTest(server, config, ports) {
26192619
})
26202620
})
26212621
})
2622+
2623+
it('should send multiple subscribe packets when topic count exceeds batchSize', function _test(t, done) {
2624+
const client = connect({ subscribeBatchSize: 2 })
2625+
const subs = ['test1', 'test2', 'test3']
2626+
client.once('connect', () => {
2627+
client.subscribe(subs)
2628+
})
2629+
2630+
const spy = sinon.spy()
2631+
server.once('client', (serverClient) => {
2632+
serverClient.on('subscribe', spy)
2633+
})
2634+
client.on('end', () => {
2635+
assert.strictEqual(spy.callCount, 2)
2636+
for (let i = 0; i < 2; i++) {
2637+
// i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}]
2638+
const expected = subs
2639+
.slice(i * 2, i * 2 + 2)
2640+
.map((topic) => {
2641+
const result: ISubscriptionRequest = {
2642+
topic,
2643+
qos: 0,
2644+
}
2645+
if (version === 5) {
2646+
result.nl = false
2647+
result.rap = false
2648+
result.rh = 0
2649+
}
2650+
return result
2651+
})
2652+
2653+
assert.deepStrictEqual(
2654+
spy.getCall(i).args[0].subscriptions,
2655+
expected,
2656+
)
2657+
}
2658+
done()
2659+
})
2660+
setTimeout(() => {
2661+
client.end()
2662+
}, 300)
2663+
})
26222664
})
26232665

26242666
describe('receiving messages', () => {

0 commit comments

Comments
 (0)