Skip to content

Commit 86cc5cc

Browse files
committed
Refactor TaskMessageQueue to not store response closures
1 parent 702d077 commit 86cc5cc

File tree

4 files changed

+1175
-128
lines changed

4 files changed

+1175
-128
lines changed

src/examples/shared/inMemoryTaskStore.test.ts

Lines changed: 271 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
2-
import { InMemoryTaskStore } from './inMemoryTaskStore.js';
2+
import { InMemoryTaskStore, InMemoryTaskMessageQueue } from './inMemoryTaskStore.js';
33
import { TaskCreationParams, Request } from '../../types.js';
4+
import { QueuedMessage } from '../../shared/task.js';
45

56
describe('InMemoryTaskStore', () => {
67
let store: InMemoryTaskStore;
@@ -664,3 +665,272 @@ describe('InMemoryTaskStore', () => {
664665
});
665666
});
666667
});
668+
669+
describe('InMemoryTaskMessageQueue', () => {
670+
let queue: InMemoryTaskMessageQueue;
671+
672+
beforeEach(() => {
673+
queue = new InMemoryTaskMessageQueue();
674+
});
675+
676+
describe('enqueue and dequeue', () => {
677+
it('should enqueue and dequeue request messages', async () => {
678+
const requestMessage: QueuedMessage = {
679+
type: 'request',
680+
message: {
681+
jsonrpc: '2.0',
682+
id: 1,
683+
method: 'tools/call',
684+
params: { name: 'test-tool', arguments: {} }
685+
},
686+
timestamp: Date.now()
687+
};
688+
689+
await queue.enqueue('task-1', requestMessage);
690+
const dequeued = await queue.dequeue('task-1');
691+
692+
expect(dequeued).toEqual(requestMessage);
693+
});
694+
695+
it('should enqueue and dequeue notification messages', async () => {
696+
const notificationMessage: QueuedMessage = {
697+
type: 'notification',
698+
message: {
699+
jsonrpc: '2.0',
700+
method: 'notifications/progress',
701+
params: { progress: 50, total: 100 }
702+
},
703+
timestamp: Date.now()
704+
};
705+
706+
await queue.enqueue('task-2', notificationMessage);
707+
const dequeued = await queue.dequeue('task-2');
708+
709+
expect(dequeued).toEqual(notificationMessage);
710+
});
711+
712+
it('should enqueue and dequeue response messages', async () => {
713+
const responseMessage: QueuedMessage = {
714+
type: 'response',
715+
message: {
716+
jsonrpc: '2.0',
717+
id: 42,
718+
result: { content: [{ type: 'text', text: 'Success' }] }
719+
},
720+
timestamp: Date.now()
721+
};
722+
723+
await queue.enqueue('task-3', responseMessage);
724+
const dequeued = await queue.dequeue('task-3');
725+
726+
expect(dequeued).toEqual(responseMessage);
727+
});
728+
729+
it('should return undefined when dequeuing from empty queue', async () => {
730+
const dequeued = await queue.dequeue('task-empty');
731+
expect(dequeued).toBeUndefined();
732+
});
733+
734+
it('should maintain FIFO order for mixed message types', async () => {
735+
const request: QueuedMessage = {
736+
type: 'request',
737+
message: {
738+
jsonrpc: '2.0',
739+
id: 1,
740+
method: 'tools/call',
741+
params: {}
742+
},
743+
timestamp: 1000
744+
};
745+
746+
const notification: QueuedMessage = {
747+
type: 'notification',
748+
message: {
749+
jsonrpc: '2.0',
750+
method: 'notifications/progress',
751+
params: {}
752+
},
753+
timestamp: 2000
754+
};
755+
756+
const response: QueuedMessage = {
757+
type: 'response',
758+
message: {
759+
jsonrpc: '2.0',
760+
id: 1,
761+
result: {}
762+
},
763+
timestamp: 3000
764+
};
765+
766+
await queue.enqueue('task-fifo', request);
767+
await queue.enqueue('task-fifo', notification);
768+
await queue.enqueue('task-fifo', response);
769+
770+
expect(await queue.dequeue('task-fifo')).toEqual(request);
771+
expect(await queue.dequeue('task-fifo')).toEqual(notification);
772+
expect(await queue.dequeue('task-fifo')).toEqual(response);
773+
expect(await queue.dequeue('task-fifo')).toBeUndefined();
774+
});
775+
});
776+
777+
describe('dequeueAll', () => {
778+
it('should dequeue all messages including responses', async () => {
779+
const request: QueuedMessage = {
780+
type: 'request',
781+
message: {
782+
jsonrpc: '2.0',
783+
id: 1,
784+
method: 'tools/call',
785+
params: {}
786+
},
787+
timestamp: 1000
788+
};
789+
790+
const response: QueuedMessage = {
791+
type: 'response',
792+
message: {
793+
jsonrpc: '2.0',
794+
id: 1,
795+
result: {}
796+
},
797+
timestamp: 2000
798+
};
799+
800+
const notification: QueuedMessage = {
801+
type: 'notification',
802+
message: {
803+
jsonrpc: '2.0',
804+
method: 'notifications/progress',
805+
params: {}
806+
},
807+
timestamp: 3000
808+
};
809+
810+
await queue.enqueue('task-all', request);
811+
await queue.enqueue('task-all', response);
812+
await queue.enqueue('task-all', notification);
813+
814+
const all = await queue.dequeueAll('task-all');
815+
816+
expect(all).toHaveLength(3);
817+
expect(all[0]).toEqual(request);
818+
expect(all[1]).toEqual(response);
819+
expect(all[2]).toEqual(notification);
820+
});
821+
822+
it('should return empty array for non-existent task', async () => {
823+
const all = await queue.dequeueAll('non-existent');
824+
expect(all).toEqual([]);
825+
});
826+
827+
it('should clear the queue after dequeueAll', async () => {
828+
const message: QueuedMessage = {
829+
type: 'request',
830+
message: {
831+
jsonrpc: '2.0',
832+
id: 1,
833+
method: 'test',
834+
params: {}
835+
},
836+
timestamp: Date.now()
837+
};
838+
839+
await queue.enqueue('task-clear', message);
840+
await queue.dequeueAll('task-clear');
841+
842+
const dequeued = await queue.dequeue('task-clear');
843+
expect(dequeued).toBeUndefined();
844+
});
845+
});
846+
847+
describe('queue size limits', () => {
848+
it('should throw when maxSize is exceeded', async () => {
849+
const message: QueuedMessage = {
850+
type: 'request',
851+
message: {
852+
jsonrpc: '2.0',
853+
id: 1,
854+
method: 'test',
855+
params: {}
856+
},
857+
timestamp: Date.now()
858+
};
859+
860+
await queue.enqueue('task-limit', message, undefined, 2);
861+
await queue.enqueue('task-limit', message, undefined, 2);
862+
863+
await expect(queue.enqueue('task-limit', message, undefined, 2)).rejects.toThrow('Task message queue overflow');
864+
});
865+
866+
it('should allow enqueue when under maxSize', async () => {
867+
const message: QueuedMessage = {
868+
type: 'response',
869+
message: {
870+
jsonrpc: '2.0',
871+
id: 1,
872+
result: {}
873+
},
874+
timestamp: Date.now()
875+
};
876+
877+
await expect(queue.enqueue('task-ok', message, undefined, 5)).resolves.toBeUndefined();
878+
});
879+
});
880+
881+
describe('task isolation', () => {
882+
it('should isolate messages between different tasks', async () => {
883+
const message1: QueuedMessage = {
884+
type: 'request',
885+
message: {
886+
jsonrpc: '2.0',
887+
id: 1,
888+
method: 'test1',
889+
params: {}
890+
},
891+
timestamp: 1000
892+
};
893+
894+
const message2: QueuedMessage = {
895+
type: 'response',
896+
message: {
897+
jsonrpc: '2.0',
898+
id: 2,
899+
result: {}
900+
},
901+
timestamp: 2000
902+
};
903+
904+
await queue.enqueue('task-a', message1);
905+
await queue.enqueue('task-b', message2);
906+
907+
expect(await queue.dequeue('task-a')).toEqual(message1);
908+
expect(await queue.dequeue('task-b')).toEqual(message2);
909+
expect(await queue.dequeue('task-a')).toBeUndefined();
910+
expect(await queue.dequeue('task-b')).toBeUndefined();
911+
});
912+
});
913+
914+
describe('response message error handling', () => {
915+
it('should handle response messages with errors', async () => {
916+
const errorResponse: QueuedMessage = {
917+
type: 'error',
918+
message: {
919+
jsonrpc: '2.0',
920+
id: 1,
921+
error: {
922+
code: -32600,
923+
message: 'Invalid Request'
924+
}
925+
},
926+
timestamp: Date.now()
927+
};
928+
929+
await queue.enqueue('task-error', errorResponse);
930+
const dequeued = await queue.dequeue('task-error');
931+
932+
expect(dequeued).toEqual(errorResponse);
933+
expect(dequeued?.type).toBe('error');
934+
});
935+
});
936+
});

0 commit comments

Comments
 (0)