Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type session struct {
stateTimer *internal.EventTimer
peerTimer *internal.EventTimer
sentReset bool
stopCh chan struct{}
stopOnce sync.Once

targetDefaultApplVerID string
Expand Down Expand Up @@ -98,6 +99,9 @@ type stopReq struct{}
func (s *session) stop() {
// Stop once.
s.stopOnce.Do(func() {
if s.stopCh != nil {
close(s.stopCh)
}
s.admin <- stopReq{}
})
}
Expand Down Expand Up @@ -596,7 +600,13 @@ func (s *session) initiateLogoutInReplyTo(reason string, inReplyTo *Message) (er
return
}
s.log.OnEvent("Inititated logout request")
time.AfterFunc(s.LogoutTimeout, func() { s.sessionEvent <- internal.LogoutTimeout })
time.AfterFunc(s.LogoutTimeout, func() {
select {
case <-s.stopCh:
return
case s.sessionEvent <- internal.LogoutTimeout:
}
})
return
}

Expand Down
1 change: 1 addition & 0 deletions session_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (f sessionFactory) newSession(
s = &session{
sessionID: sessionID,
stopOnce: sync.Once{},
stopCh: make(chan struct{}),
}

var validatorSettings = defaultValidatorSettings
Expand Down
87 changes: 87 additions & 0 deletions session_leak_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package quickfix

import (
"bytes"
"runtime/pprof"
"strings"
"testing"
"time"

"github.com/quickfixgo/quickfix/internal"
)

type testLog struct{}

func (testLog) OnIncoming([]byte) {}
func (testLog) OnOutgoing([]byte) {}
func (testLog) OnEvent(string) {}
func (testLog) OnEventf(string, ...interface{}) {}

// testApp is a no-op Application for tests.
type testApp struct{}

func (testApp) OnCreate(SessionID) {}
func (testApp) OnLogon(SessionID) {}
func (testApp) OnLogout(SessionID) {}
func (testApp) ToAdmin(*Message, SessionID) {}
func (testApp) FromAdmin(*Message, SessionID) MessageRejectError { return nil }
func (testApp) ToApp(*Message, SessionID) error { return nil }
func (testApp) FromApp(*Message, SessionID) MessageRejectError { return nil }

func newTimerOnlySession() *session {
tr, _ := internal.NewUTCTimeRange(internal.NewTimeOfDay(0, 0, 0), internal.NewTimeOfDay(23, 59, 59), nil)
s := &session{
store: &memoryStore{},
log: testLog{},
sessionID: SessionID{BeginString: BeginStringFIX44, SenderCompID: "S", TargetCompID: "T"},
messageOut: make(chan []byte, 2),
messageIn: make(chan fixIn),
sessionEvent: make(chan internal.Event),
messageEvent: make(chan bool),
application: testApp{},
stopCh: make(chan struct{}),
SessionSettings: internal.SessionSettings{
SessionTime: tr,
},
}
return s
}

func countGoroutinesContaining(substr string) int {
var buf bytes.Buffer
_ = pprof.Lookup("goroutine").WriteTo(&buf, 2)
return strings.Count(buf.String(), substr)
}

func TestLogonTimeoutDoesNotLeakGoroutine(t *testing.T) {
s := newTimerOnlySession()
s.InitiateLogon = true
s.LogonTimeout = 10 * time.Millisecond

baseline := countGoroutinesContaining("stateMachine).Connect.func1")

s.stateMachine.Connect(s)
close(s.stopCh)
time.Sleep(4 * s.LogonTimeout)

if got := countGoroutinesContaining("stateMachine).Connect.func1"); got > baseline {
t.Fatalf("logon timeout goroutine leaked: baseline=%d current=%d", baseline, got)
}
}

func TestLogoutTimeoutDoesNotLeakGoroutine(t *testing.T) {
s := newTimerOnlySession()
s.LogoutTimeout = 10 * time.Millisecond
baseline := countGoroutinesContaining("initiateLogoutInReplyTo")
s.stateMachine.Connect(s)

if err := s.initiateLogout("bye"); err != nil {
t.Fatalf("initiateLogout returned error: %v", err)
}
close(s.stopCh)
time.Sleep(4 * s.LogoutTimeout)

if got := countGoroutinesContaining("initiateLogoutInReplyTo"); got > baseline {
t.Fatalf("logout timeout goroutine leaked: baseline=%d current=%d", baseline, got)
}
}
8 changes: 7 additions & 1 deletion session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ func (sm *stateMachine) Connect(session *session) {

sm.setState(session, logonState{})
// Fire logon timeout event after the pre-configured delay period.
time.AfterFunc(session.LogonTimeout, func() { session.sessionEvent <- internal.LogonTimeout })
time.AfterFunc(session.LogonTimeout, func() {
select {
case <-session.stopCh:
return
case session.sessionEvent <- internal.LogonTimeout:
}
})
}

func (sm *stateMachine) Stop(session *session) {
Expand Down
Loading