Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
}

a.sessionAddr.Store(sessID, netConn.RemoteAddr())
msgIn := make(chan fixIn)
msgIn := make(chan fixIn, session.InChanCapacity)
msgOut := make(chan []byte)

if err := session.connect(msgIn, msgOut); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,16 @@ const (
// Valid Values:
// - Any positive integer
MaxLatency string = "MaxLatency"

// InChanCapacity sets the maximum number of messages that can be buffered in the channel for incoming FIX messages.
//
// Required: No
//
// Default: 1
//
// Valid Values:
// - A positive integer, or zero for an unbuffered channel
InChanCapacity string = "InChanCapacity"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
netConn = tlsConn
}

msgIn = make(chan fixIn)
msgIn = make(chan fixIn, session.InChanCapacity)
msgOut = make(chan []byte)
if err := session.connect(msgIn, msgOut); err != nil {
session.log.OnEventf("Failed to initiate: %v", err)
Expand Down
1 change: 1 addition & 0 deletions internal/session_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type SessionSettings struct {
TimeZone *time.Location
ResetSeqTime time.Time
EnableResetSeqTime bool
InChanCapacity int

// Required on logon for FIX.T.1 messages.
DefaultApplVerID string
Expand Down
18 changes: 18 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,21 @@ func (s *session) checkBeginString(msg *Message) MessageRejectError {
return nil
}

func (s *session) drainMessageIn() {
s.log.OnEventf("Draining %d messages from inbound channel...", len(s.messageIn))
for {
select {
case fixInc, ok := <-s.messageIn:
if !ok {
return
}
s.Incoming(s, fixInc)
default:
return
}
}
}

func (s *session) doReject(msg *Message, rej MessageRejectError) error {
reply := msg.reverseRoute()

Expand Down Expand Up @@ -824,6 +839,9 @@ func (s *session) onDisconnect() {
s.messageOut = nil
}

// s.messageIn is buffered so we need to drain it before disconnection
s.drainMessageIn()

s.messageIn = nil
}

Expand Down
12 changes: 12 additions & 0 deletions session_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,18 @@ func (f sessionFactory) newSession(
s.DisableMessagePersist = !persistMessages
}

if settings.HasSetting(config.InChanCapacity) {
if s.InChanCapacity, err = settings.IntSetting(config.InChanCapacity); err != nil {
return
} else if s.InChanCapacity < 0 {
err = IncorrectFormatForSetting{Setting: config.InChanCapacity, Value: []byte(strconv.Itoa(s.InChanCapacity))}
return
}
} else {
// Default to 1 buffered message per channel
s.InChanCapacity = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the existing behavior to buffered channel from unbuffered. For those who reset the session on disconnect, there is edge case where they lose the last message in case the application crashes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add a warning when the new release is published

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

if f.BuildInitiators {
if err = f.buildInitiatorSettings(s, settings); err != nil {
return
Expand Down
Loading