|
9 | 9 | } from "./utils/marqs.js"; |
10 | 10 | import { trace } from "@opentelemetry/api"; |
11 | 11 | import { EnvQueues } from "~/v3/marqs/types.js"; |
| 12 | +import { MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET } from "~/v3/marqs/constants.server.js"; |
12 | 13 |
|
13 | 14 | const tracer = trace.getTracer("test"); |
14 | 15 |
|
@@ -870,6 +871,135 @@ describe("FairDequeuingStrategy", () => { |
870 | 871 | expect(selectionPercentages["env-4"] || 0).toBeLessThan(20); |
871 | 872 | } |
872 | 873 | ); |
| 874 | + |
| 875 | + redisTest( |
| 876 | + "should not overly bias picking environments when queue have priority offset ages", |
| 877 | + async ({ redis }) => { |
| 878 | + const keyProducer = createKeyProducer("test"); |
| 879 | + const strategy = new FairDequeuingStrategy({ |
| 880 | + tracer, |
| 881 | + redis, |
| 882 | + keys: keyProducer, |
| 883 | + defaultEnvConcurrency: 5, |
| 884 | + parentQueueLimit: 100, |
| 885 | + seed: "test-seed-max-orgs", |
| 886 | + maximumEnvCount: 2, // Only select top 2 orgs |
| 887 | + }); |
| 888 | + |
| 889 | + const now = Date.now(); |
| 890 | + |
| 891 | + // Setup 4 envs with different queue age profiles |
| 892 | + const envSetups = [ |
| 893 | + { |
| 894 | + envId: "env-1", |
| 895 | + queues: [ |
| 896 | + { age: 1000 }, // Average age: 1000 |
| 897 | + ], |
| 898 | + }, |
| 899 | + { |
| 900 | + envId: "env-2", |
| 901 | + queues: [ |
| 902 | + { age: 5000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET }, // Average age: 5000 + 1 year |
| 903 | + { age: 5000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET }, |
| 904 | + ], |
| 905 | + }, |
| 906 | + { |
| 907 | + envId: "env-3", |
| 908 | + queues: [ |
| 909 | + { age: 2000 }, // Average age: 2000 |
| 910 | + { age: 2000 }, |
| 911 | + ], |
| 912 | + }, |
| 913 | + { |
| 914 | + envId: "env-4", |
| 915 | + queues: [ |
| 916 | + { age: 500 }, // Average age: 500 |
| 917 | + { age: 500 }, |
| 918 | + ], |
| 919 | + }, |
| 920 | + ]; |
| 921 | + |
| 922 | + // Setup queues and concurrency for each org |
| 923 | + for (const setup of envSetups) { |
| 924 | + await setupConcurrency({ |
| 925 | + redis, |
| 926 | + keyProducer, |
| 927 | + env: { id: setup.envId, currentConcurrency: 0, limit: 5 }, |
| 928 | + }); |
| 929 | + |
| 930 | + for (let i = 0; i < setup.queues.length; i++) { |
| 931 | + await setupQueue({ |
| 932 | + redis, |
| 933 | + keyProducer, |
| 934 | + parentQueue: "parent-queue", |
| 935 | + score: now - setup.queues[i].age, |
| 936 | + queueId: `queue-${setup.envId}-${i}`, |
| 937 | + orgId: `org-${setup.envId}`, |
| 938 | + envId: setup.envId, |
| 939 | + }); |
| 940 | + } |
| 941 | + } |
| 942 | + |
| 943 | + // Run multiple iterations to verify consistent behavior |
| 944 | + const iterations = 100; |
| 945 | + const selectedEnvCounts: Record<string, number> = {}; |
| 946 | + |
| 947 | + for (let i = 0; i < iterations; i++) { |
| 948 | + const envResult = await strategy.distributeFairQueuesFromParentQueue( |
| 949 | + "parent-queue", |
| 950 | + `consumer-${i}` |
| 951 | + ); |
| 952 | + const result = flattenResults(envResult); |
| 953 | + |
| 954 | + // Track which orgs were included in the result |
| 955 | + const selectedEnvs = new Set(result.map((queueId) => keyProducer.envIdFromQueue(queueId))); |
| 956 | + |
| 957 | + // Verify we never get more than maximumOrgCount orgs |
| 958 | + expect(selectedEnvs.size).toBeLessThanOrEqual(2); |
| 959 | + |
| 960 | + for (const envId of selectedEnvs) { |
| 961 | + selectedEnvCounts[envId] = (selectedEnvCounts[envId] || 0) + 1; |
| 962 | + } |
| 963 | + } |
| 964 | + |
| 965 | + console.log("Environment selection counts:", selectedEnvCounts); |
| 966 | + |
| 967 | + // org-2 should be selected most often (highest average age) |
| 968 | + expect(selectedEnvCounts["env-2"]).toBeGreaterThan(selectedEnvCounts["env-4"] || 0); |
| 969 | + |
| 970 | + // org-4 should be selected least often (lowest average age) |
| 971 | + const env4Count = selectedEnvCounts["env-4"] || 0; |
| 972 | + expect(env4Count).toBeLessThan(selectedEnvCounts["env-2"]); |
| 973 | + |
| 974 | + // Verify that envs with higher average queue age are selected more frequently |
| 975 | + const sortedEnvs = Object.entries(selectedEnvCounts).sort((a, b) => b[1] - a[1]); |
| 976 | + console.log("Sorted environment frequencies:", sortedEnvs); |
| 977 | + |
| 978 | + // The top 2 most frequently selected orgs should be env-2 and env-3 |
| 979 | + // as they have the highest average queue ages |
| 980 | + const topTwoEnvs = new Set([sortedEnvs[0][0], sortedEnvs[1][0]]); |
| 981 | + expect(topTwoEnvs).toContain("env-2"); // Highest average age |
| 982 | + expect(topTwoEnvs).toContain("env-3"); // Second highest average age |
| 983 | + |
| 984 | + // Calculate selection percentages |
| 985 | + const totalSelections = Object.values(selectedEnvCounts).reduce((a, b) => a + b, 0); |
| 986 | + const selectionPercentages = Object.entries(selectedEnvCounts).reduce( |
| 987 | + (acc, [orgId, count]) => { |
| 988 | + acc[orgId] = (count / totalSelections) * 100; |
| 989 | + return acc; |
| 990 | + }, |
| 991 | + {} as Record<string, number> |
| 992 | + ); |
| 993 | + |
| 994 | + console.log("Environment selection percentages:", selectionPercentages); |
| 995 | + |
| 996 | + // Verify that env-2 (highest average age) gets selected in at least 40% of iterations |
| 997 | + expect(selectionPercentages["env-2"]).toBeGreaterThan(40); |
| 998 | + |
| 999 | + // Verify that env-4 (lowest average age) gets selected in less than 20% of iterations |
| 1000 | + expect(selectionPercentages["env-4"] || 0).toBeLessThan(20); |
| 1001 | + } |
| 1002 | + ); |
873 | 1003 | }); |
874 | 1004 |
|
875 | 1005 | // Helper function to flatten results for counting |
|
0 commit comments