From b8e75fa29e36ba72683b8dbc81f66d62c9aa8cb5 Mon Sep 17 00:00:00 2001 From: jhjang Date: Wed, 13 Aug 2025 06:00:39 +0900 Subject: [PATCH 1/5] feat: add round-robin retries via Retry Rounds/Retry Delay --- runner/options.go | 8 +++ runner/runner.go | 110 +++++++++++++++++++++++++++++++++++++----- runner/runner_test.go | 78 ++++++++++++++++++++++++++++-- runner/types.go | 11 +++++ 4 files changed, 192 insertions(+), 15 deletions(-) diff --git a/runner/options.go b/runner/options.go index 2fa8d601..8edfccff 100644 --- a/runner/options.go +++ b/runner/options.go @@ -274,6 +274,8 @@ type Options struct { RateLimitMinute int Probe bool Resume bool + RetryRounds int + RetryDelay int resumeCfg *ResumeCfg Exclude goflags.StringSlice HostMaxErrors int @@ -530,6 +532,8 @@ func ParseOptions() *Options { flagSet.DurationVar(&options.Delay, "delay", -1, "duration between each http request (eg: 200ms, 1s)"), flagSet.IntVarP(&options.MaxResponseBodySizeToSave, "response-size-to-save", "rsts", math.MaxInt32, "max response size to save in bytes"), flagSet.IntVarP(&options.MaxResponseBodySizeToRead, "response-size-to-read", "rstr", math.MaxInt32, "max response size to read in bytes"), + flagSet.IntVarP(&options.RetryRounds, "retry-rounds", "rr", 0, "number of retry rounds for HTTP 429 responses (Too Many Requests)"), + flagSet.IntVarP(&options.RetryDelay, "retry-delay", "rd", 500, "delay between retry rounds for HTTP 429 responses (e.g. 5ms, 30ms)"), ) flagSet.CreateGroup("cloud", "Cloud", @@ -757,6 +761,10 @@ func (options *Options) ValidateOptions() error { options.Threads = defaultThreads } + if options.RetryRounds > 0 && options.RetryDelay <= 0 { + return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) + } + return nil } diff --git a/runner/runner.go b/runner/runner.go index f6ec504d..aaf848a1 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -1257,6 +1257,9 @@ func (r *Runner) RunEnumeration() { }(nextStep) wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) + retryCh := make(chan retryJob) + + retryCancel, retryWait := r.retryLoop(context.Background(), retryCh, output, r.analyze) processItem := func(k string) error { if r.options.resumeCfg != nil { @@ -1279,10 +1282,10 @@ func (r *Runner) RunEnumeration() { for _, p := range r.options.requestURIs { scanopts := r.scanopts.Clone() scanopts.RequestURI = p - r.process(k, wg, r.hp, protocol, scanopts, output) + r.process(k, wg, r.hp, protocol, scanopts, output, retryCh) } } else { - r.process(k, wg, r.hp, protocol, &r.scanopts, output) + r.process(k, wg, r.hp, protocol, &r.scanopts, output, retryCh) } return nil @@ -1300,8 +1303,11 @@ func (r *Runner) RunEnumeration() { wg.Wait() - close(output) + retryWait() + retryCancel() + close(retryCh) + close(output) wgoutput.Wait() if r.scanopts.StoreVisionReconClusters { @@ -1323,6 +1329,62 @@ func (r *Runner) RunEnumeration() { } } +type analyzeFunc func(*httpx.HTTPX, string, httpx.Target, string, string, *ScanOptions) Result + +func (r *Runner) retryLoop( + parent context.Context, + ch chan retryJob, + output chan<- Result, + analyze analyzeFunc, +) (func(), func()) { + ctx, cancel := context.WithCancel(parent) + var jobWG sync.WaitGroup + + go func() { + for { + select { + case <-ctx.Done(): + return + case job := <-ch: + jobWG.Add(1) + + go func(j retryJob) { + defer jobWG.Done() + + if wait := time.Until(j.when); wait > 0 { + timer := time.NewTimer(wait) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + } + } + + res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts) + output <- res + + if res.StatusCode == http.StatusTooManyRequests && + j.attempt < r.options.RetryRounds { + + j.attempt++ + j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond) + + select { + case <-ctx.Done(): + case ch <- j: + } + } + }(job) + } + } + }() + + stop := func() { cancel() } + wait := func() { jobWG.Wait() } + return stop, wait +} + func logFilteredErrorPage(fileName, url string) { dir := filepath.Dir(fileName) if !fileutil.FolderExists(dir) { @@ -1380,11 +1442,11 @@ func (r *Runner) GetScanOpts() ScanOptions { return r.scanopts } -func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result) { - r.process(t, wg, r.hp, protocol, scanopts, output) +func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) { + r.process(t, wg, r.hp, protocol, scanopts, output, retryCh) } -func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result) { +func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) { // attempts to set the workpool size to the number of threads if r.options.Threads > 0 && wg.Size != r.options.Threads { if err := wg.Resize(context.Background(), r.options.Threads); err != nil { @@ -1409,15 +1471,28 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT defer wg.Done() result := r.analyze(hp, protocol, target, method, t, scanopts) output <- result + if result.StatusCode == http.StatusTooManyRequests && + r.options.RetryRounds > 0 { + retryCh <- retryJob{ + hp: hp, + protocol: protocol, + target: target, + method: method, + origInput: t, + scanopts: scanopts.Clone(), + attempt: 1, + when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond), + } + } if scanopts.TLSProbe && result.TLSData != nil { for _, tt := range result.TLSData.SubjectAN { if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output) + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) } if r.testAndSet(result.TLSData.SubjectCN) { - r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output) + r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) } } if scanopts.CSPProbe && result.CSPData != nil { @@ -1428,7 +1503,7 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output) + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) } } }(target, method, prot) @@ -1463,15 +1538,28 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT } result := r.analyze(hp, protocol, target, method, t, scanopts) output <- result + if result.StatusCode == http.StatusTooManyRequests && + r.options.RetryRounds > 0 { + retryCh <- retryJob{ + hp: hp, + protocol: protocol, + target: target, + method: method, + origInput: t, + scanopts: scanopts.Clone(), + attempt: 1, + when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond), + } + } if scanopts.TLSProbe && result.TLSData != nil { for _, tt := range result.TLSData.SubjectAN { if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output) + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) } if r.testAndSet(result.TLSData.SubjectCN) { - r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output) + r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) } } }(port, target, method, wantedProtocol) diff --git a/runner/runner_test.go b/runner/runner_test.go index 850566b8..3395a61f 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -1,7 +1,9 @@ package runner import ( + "context" "fmt" + "net/http" "os" "strings" "testing" @@ -227,10 +229,10 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { runner := &Runner{} tests := []struct { - name string - allow []string - deny []string - testCases []struct { + name string + allow []string + deny []string + testCases []struct { ip string expected bool reason string @@ -312,3 +314,71 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { }) } } + +func TestRunner_RetryLoop(t *testing.T) { + retryCh := make(chan retryJob) + out := make(chan Result) + + r, err := New(&Options{ + RetryDelay: 500, + RetryRounds: 3, + }) + require.Nil(t, err, "could not create httpx runner") + + var calls = map[string]int{} + analyze := func(hp *httpx.HTTPX, + protocol string, + target httpx.Target, + method, origInput string, + scanopts *ScanOptions) Result { + calls[method]++ + if strings.HasPrefix(method, "retry-") && calls[method] == 1 { + return Result{StatusCode: http.StatusTooManyRequests} + } + return Result{StatusCode: http.StatusOK} + } + + cancel, wait := r.retryLoop(context.Background(), retryCh, out, analyze) + + seed := []retryJob{ + {method: "ok-a", when: time.Now().Add(-time.Millisecond), attempt: 1}, + {method: "retry-a", when: time.Now().Add(-time.Millisecond), attempt: 1}, + {method: "ok-b", when: time.Now().Add(-time.Millisecond), attempt: 1}, + {method: "retry-b", when: time.Now().Add(-time.Millisecond), attempt: 1}, + } + for _, j := range seed { + retryCh <- j + } + + want := 6 + got := make([]Result, 0, want) + deadline := time.After(2 * time.Second) + + for len(got) < want { + select { + case r := <-out: + got = append(got, r) + case <-deadline: + t.Errorf("timed out waiting results: got=%d want=%d", len(got), want) + } + } + + wait() + cancel() + close(retryCh) + + close(out) + + var n429, n200 int + for _, r := range got { + switch r.StatusCode { + case http.StatusTooManyRequests: + n429++ + case http.StatusOK: + n200++ + } + } + + require.GreaterOrEqual(t, n429, 2) + require.Equal(t, 4, n200) +} diff --git a/runner/types.go b/runner/types.go index 724e8697..5f4367c3 100644 --- a/runner/types.go +++ b/runner/types.go @@ -120,6 +120,17 @@ type Trace struct { WroteRequest time.Time `json:"wrote_request,omitempty"` } +type retryJob struct { + hp *httpx.HTTPX + protocol string + target httpx.Target + method string + origInput string + scanopts *ScanOptions + attempt int + when time.Time +} + // function to get dsl variables from result struct func dslVariables() ([]string, error) { fakeResult := Result{} From 8c2f4ffef97b2b9488eb5e93b92ad6a44f30ba36 Mon Sep 17 00:00:00 2001 From: jhjang Date: Wed, 13 Aug 2025 18:42:06 +0900 Subject: [PATCH 2/5] fix: remove short aliases (-rr, -rd) to avoid redefinition --- runner/options.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/options.go b/runner/options.go index 8edfccff..44bda3b0 100644 --- a/runner/options.go +++ b/runner/options.go @@ -532,8 +532,8 @@ func ParseOptions() *Options { flagSet.DurationVar(&options.Delay, "delay", -1, "duration between each http request (eg: 200ms, 1s)"), flagSet.IntVarP(&options.MaxResponseBodySizeToSave, "response-size-to-save", "rsts", math.MaxInt32, "max response size to save in bytes"), flagSet.IntVarP(&options.MaxResponseBodySizeToRead, "response-size-to-read", "rstr", math.MaxInt32, "max response size to read in bytes"), - flagSet.IntVarP(&options.RetryRounds, "retry-rounds", "rr", 0, "number of retry rounds for HTTP 429 responses (Too Many Requests)"), - flagSet.IntVarP(&options.RetryDelay, "retry-delay", "rd", 500, "delay between retry rounds for HTTP 429 responses (e.g. 5ms, 30ms)"), + flagSet.IntVar(&options.RetryRounds, "retry-rounds", 0, "number of retry rounds for HTTP 429 responses (Too Many Requests)"), + flagSet.IntVar(&options.RetryDelay, "retry-delay", 500, "delay between retry rounds for HTTP 429 responses (e.g. 5ms, 30ms)"), ) flagSet.CreateGroup("cloud", "Cloud", From 6147653d9821a2807fe2f5c68d849729ad145960 Mon Sep 17 00:00:00 2001 From: jhjang Date: Wed, 13 Aug 2025 18:42:34 +0900 Subject: [PATCH 3/5] fix: add max retries reached --- out.txt | 1 + runner/runner.go | 7 ++- runner/runner_test.go | 129 ++++++++++++++++++++++++++---------------- 3 files changed, 85 insertions(+), 52 deletions(-) create mode 100644 out.txt diff --git a/out.txt b/out.txt new file mode 100644 index 00000000..df20805c --- /dev/null +++ b/out.txt @@ -0,0 +1 @@ +http://localhost:8080/once-429 diff --git a/runner/runner.go b/runner/runner.go index aaf848a1..a54d292d 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -1364,14 +1364,17 @@ func (r *Runner) retryLoop( res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts) output <- res - if res.StatusCode == http.StatusTooManyRequests && - j.attempt < r.options.RetryRounds { + if res.StatusCode == http.StatusTooManyRequests { + if j.attempt >= r.options.RetryRounds { + return + } j.attempt++ j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond) select { case <-ctx.Done(): + return case ch <- j: } } diff --git a/runner/runner_test.go b/runner/runner_test.go index 3395a61f..79bb1cbf 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -3,9 +3,13 @@ package runner import ( "context" "fmt" + "log" "net/http" + "net/http/httptest" "os" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -13,6 +17,7 @@ import ( "github.com/projectdiscovery/httpx/common/httpx" "github.com/projectdiscovery/mapcidr/asn" stringsutil "github.com/projectdiscovery/utils/strings" + syncutil "github.com/projectdiscovery/utils/sync" "github.com/stretchr/testify/require" ) @@ -315,70 +320,94 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { } } -func TestRunner_RetryLoop(t *testing.T) { - retryCh := make(chan retryJob) - out := make(chan Result) +func TestRunner_Process_And_RetryLoop(t *testing.T) { + var hits1, hits2 int32 + srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&hits1, 1) != 4 { + log.Println("serv1 429") + w.WriteHeader(http.StatusTooManyRequests) + return + } + log.Println("serv1 200") + w.WriteHeader(http.StatusOK) + })) + defer srv1.Close() + + srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&hits2, 1) != 3 { + log.Println("serv2 429") + w.WriteHeader(http.StatusTooManyRequests) + return + } + log.Println("serv2 200") + w.WriteHeader(http.StatusOK) + })) + defer srv2.Close() r, err := New(&Options{ - RetryDelay: 500, + Threads: 1, + Delay: 0, RetryRounds: 3, + RetryDelay: 200, // Duration 권장 + Timeout: 2, }) - require.Nil(t, err, "could not create httpx runner") + require.NoError(t, err) - var calls = map[string]int{} - analyze := func(hp *httpx.HTTPX, - protocol string, - target httpx.Target, - method, origInput string, - scanopts *ScanOptions) Result { - calls[method]++ - if strings.HasPrefix(method, "retry-") && calls[method] == 1 { - return Result{StatusCode: http.StatusTooManyRequests} - } - return Result{StatusCode: http.StatusOK} - } + output := make(chan Result) + retryCh := make(chan retryJob) - cancel, wait := r.retryLoop(context.Background(), retryCh, out, analyze) + // ctx, timeout := context.WithTimeout(context.Background(), time.Duration(r.options.Timeout)) + // defer timeout() + cancel, wait := r.retryLoop(context.Background(), retryCh, output, r.analyze) - seed := []retryJob{ - {method: "ok-a", when: time.Now().Add(-time.Millisecond), attempt: 1}, - {method: "retry-a", when: time.Now().Add(-time.Millisecond), attempt: 1}, - {method: "ok-b", when: time.Now().Add(-time.Millisecond), attempt: 1}, - {method: "retry-b", when: time.Now().Add(-time.Millisecond), attempt: 1}, - } - for _, j := range seed { - retryCh <- j - } + wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) + so := r.scanopts.Clone() + so.Methods = []string{"GET"} + so.TLSProbe = false + so.CSPProbe = false - want := 6 - got := make([]Result, 0, want) - deadline := time.After(2 * time.Second) + seed := map[string]string{ + "srv1": srv1.URL, + "srv2": srv2.URL, + } - for len(got) < want { - select { - case r := <-out: - got = append(got, r) - case <-deadline: - t.Errorf("timed out waiting results: got=%d want=%d", len(got), want) + var drainWG sync.WaitGroup + drainWG.Add(1) + var s1n429, s1n200, s2n429, s2n200 int + go func(output chan Result) { + defer drainWG.Done() + for res := range output { + switch res.StatusCode { + case http.StatusTooManyRequests: + if res.URL == srv1.URL { + s1n429++ + } else { + s2n429++ + } + case http.StatusOK: + if res.URL == srv1.URL { + s1n200++ + } else { + s2n200++ + } + } } + }(output) + + for _, url := range seed { + r.process(url, wg, r.hp, httpx.HTTP, so, output, retryCh) } + wg.Wait() wait() cancel() - close(retryCh) - close(out) - - var n429, n200 int - for _, r := range got { - switch r.StatusCode { - case http.StatusTooManyRequests: - n429++ - case http.StatusOK: - n200++ - } - } + close(retryCh) + close(output) + drainWG.Wait() - require.GreaterOrEqual(t, n429, 2) - require.Equal(t, 4, n200) + require.Equal(t, 3, s1n429) + require.Equal(t, 1, s1n200) + require.Equal(t, 2, s2n429) + require.Equal(t, 1, s2n200) } From ae4b2e900d23efd6a52714a6369aad0b7441cb84 Mon Sep 17 00:00:00 2001 From: jhjang Date: Wed, 13 Aug 2025 06:00:39 +0900 Subject: [PATCH 4/5] feat: add round-robin retries via Retry Rounds/Retry Delay --- runner/options.go | 4 ++++ runner/runner.go | 48 +++++++++++++++++++++++-------------------- runner/runner_test.go | 23 ++++++--------------- 3 files changed, 36 insertions(+), 39 deletions(-) diff --git a/runner/options.go b/runner/options.go index 44bda3b0..e1cb3041 100644 --- a/runner/options.go +++ b/runner/options.go @@ -765,6 +765,10 @@ func (options *Options) ValidateOptions() error { return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) } + if options.RetryRounds > 0 && options.RetryDelay <= 0 { + return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) + } + return nil } diff --git a/runner/runner.go b/runner/runner.go index a54d292d..9f003544 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "golang.org/x/exp/maps" @@ -1259,7 +1260,7 @@ func (r *Runner) RunEnumeration() { wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) retryCh := make(chan retryJob) - retryCancel, retryWait := r.retryLoop(context.Background(), retryCh, output, r.analyze) + _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze) processItem := func(k string) error { if r.options.resumeCfg != nil { @@ -1302,11 +1303,9 @@ func (r *Runner) RunEnumeration() { } wg.Wait() - - retryWait() - retryCancel() - close(retryCh) - + if r.options.RetryRounds > 0 { + <-drainedCh + } close(output) wgoutput.Wait() @@ -1333,24 +1332,30 @@ type analyzeFunc func(*httpx.HTTPX, string, httpx.Target, string, string, *ScanO func (r *Runner) retryLoop( parent context.Context, - ch chan retryJob, + retryCh chan retryJob, output chan<- Result, analyze analyzeFunc, -) (func(), func()) { +) (stop func(), drained <-chan struct{}) { + var remaining atomic.Int64 ctx, cancel := context.WithCancel(parent) - var jobWG sync.WaitGroup + drainedCh := make(chan struct{}) go func() { + defer close(retryCh) + for { select { case <-ctx.Done(): return - case job := <-ch: - jobWG.Add(1) + case job, ok := <-retryCh: + if !ok { + return + } + if job.attempt == 1 { + remaining.Add(1) + } go func(j retryJob) { - defer jobWG.Done() - if wait := time.Until(j.when); wait > 0 { timer := time.NewTimer(wait) select { @@ -1364,28 +1369,27 @@ func (r *Runner) retryLoop( res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts) output <- res - if res.StatusCode == http.StatusTooManyRequests { - if j.attempt >= r.options.RetryRounds { - return - } - + if res.StatusCode == http.StatusTooManyRequests && j.attempt < r.options.RetryRounds { j.attempt++ j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond) select { case <-ctx.Done(): return - case ch <- j: + case retryCh <- j: + return } } + + if remaining.Add(-1) == 0 { + close(drainedCh) + } }(job) } } }() - stop := func() { cancel() } - wait := func() { jobWG.Wait() } - return stop, wait + return func() { cancel() }, drainedCh } func logFilteredErrorPage(fileName, url string) { diff --git a/runner/runner_test.go b/runner/runner_test.go index 79bb1cbf..950b97d7 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -3,7 +3,6 @@ package runner import ( "context" "fmt" - "log" "net/http" "net/http/httptest" "os" @@ -324,41 +323,34 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { var hits1, hits2 int32 srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if atomic.AddInt32(&hits1, 1) != 4 { - log.Println("serv1 429") w.WriteHeader(http.StatusTooManyRequests) return } - log.Println("serv1 200") w.WriteHeader(http.StatusOK) })) defer srv1.Close() srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if atomic.AddInt32(&hits2, 1) != 3 { - log.Println("serv2 429") w.WriteHeader(http.StatusTooManyRequests) return } - log.Println("serv2 200") w.WriteHeader(http.StatusOK) })) defer srv2.Close() r, err := New(&Options{ Threads: 1, - Delay: 0, - RetryRounds: 3, - RetryDelay: 200, // Duration 권장 - Timeout: 2, + RetryRounds: 2, + RetryDelay: 5, + Timeout: 3, }) require.NoError(t, err) output := make(chan Result) retryCh := make(chan retryJob) - // ctx, timeout := context.WithTimeout(context.Background(), time.Duration(r.options.Timeout)) - // defer timeout() - cancel, wait := r.retryLoop(context.Background(), retryCh, output, r.analyze) + _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze) wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) so := r.scanopts.Clone() @@ -399,15 +391,12 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { } wg.Wait() - wait() - cancel() - - close(retryCh) + <-drainedCh close(output) drainWG.Wait() require.Equal(t, 3, s1n429) - require.Equal(t, 1, s1n200) + require.Equal(t, 0, s1n200) require.Equal(t, 2, s2n429) require.Equal(t, 1, s2n200) } From 7fe6a48eebf7148130038d8de27d044d5477fbf6 Mon Sep 17 00:00:00 2001 From: jhjang Date: Thu, 21 Aug 2025 11:32:30 +0900 Subject: [PATCH 5/5] refactor: retryLoop with atomic counter and drained channel --- out.txt | 1 - runner/options.go | 4 ---- runner/runner_test.go | 11 +++++++++-- 3 files changed, 9 insertions(+), 7 deletions(-) delete mode 100644 out.txt diff --git a/out.txt b/out.txt deleted file mode 100644 index df20805c..00000000 --- a/out.txt +++ /dev/null @@ -1 +0,0 @@ -http://localhost:8080/once-429 diff --git a/runner/options.go b/runner/options.go index e1cb3041..44bda3b0 100644 --- a/runner/options.go +++ b/runner/options.go @@ -765,10 +765,6 @@ func (options *Options) ValidateOptions() error { return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) } - if options.RetryRounds > 0 && options.RetryDelay <= 0 { - return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) - } - return nil } diff --git a/runner/runner_test.go b/runner/runner_test.go index 950b97d7..a566356d 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -321,6 +321,8 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { func TestRunner_Process_And_RetryLoop(t *testing.T) { var hits1, hits2 int32 + + // srv1: returns 429 for the first 3 requests, and 200 on the 4th request srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if atomic.AddInt32(&hits1, 1) != 4 { w.WriteHeader(http.StatusTooManyRequests) @@ -330,6 +332,7 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { })) defer srv1.Close() + // srv2: returns 429 for the first 2 requests, and 200 on the 3rd request srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if atomic.AddInt32(&hits2, 1) != 3 { w.WriteHeader(http.StatusTooManyRequests) @@ -366,7 +369,7 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { var drainWG sync.WaitGroup drainWG.Add(1) var s1n429, s1n200, s2n429, s2n200 int - go func(output chan Result) { + go func() { defer drainWG.Done() for res := range output { switch res.StatusCode { @@ -384,7 +387,7 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { } } } - }(output) + }() for _, url := range seed { r.process(url, wg, r.hp, httpx.HTTP, so, output, retryCh) @@ -395,8 +398,12 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { close(output) drainWG.Wait() + // Verify expected results + // srv1: should have 3x 429 responses and no 200 (never succeeds within retries) require.Equal(t, 3, s1n429) require.Equal(t, 0, s1n200) + + // srv2: should have 2x 429 responses and 1x 200 (succeeds on 3rd attempt) require.Equal(t, 2, s2n429) require.Equal(t, 1, s2n200) }