From 8ce187fac6b38ee0e82dc8002a9d611984b9c746 Mon Sep 17 00:00:00 2001 From: Alexandros Kyriakakis Date: Tue, 1 Jul 2025 15:50:41 +0300 Subject: [PATCH] Make Initiator`s inbound channel buffered (#22) We have observed slow consumption of FIX messages in the Initiator. Since the outbound channel is buffered, the event loop is sending too many messages at once but reads incoming messages one by one. This creates an inconsistency between incoming consumption messages and outgoing production messages. In this PR we resolve this issue by making the inbound channel buffered. So the Initiator can now read and write in a truly asynchronous way. This change is feature flagged. Can be used by setting `InitiatorInChanCapacity` with an integer representing the capacity of the inbound channel to the session settings config. --- acceptor.go | 2 +- config/configuration.go | 10 ++++++++++ initiator.go | 2 +- internal/session_settings.go | 1 + session.go | 18 ++++++++++++++++++ session_factory.go | 12 ++++++++++++ 6 files changed, 43 insertions(+), 2 deletions(-) diff --git a/acceptor.go b/acceptor.go index f58ef01f7..83384c5b8 100644 --- a/acceptor.go +++ b/acceptor.go @@ -359,7 +359,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) { } a.sessionAddr.Store(sessID, netConn.RemoteAddr()) - msgIn := make(chan fixIn) + msgIn := make(chan fixIn, session.InChanCapacity) msgOut := make(chan []byte) if err := session.connect(msgIn, msgOut); err != nil { diff --git a/config/configuration.go b/config/configuration.go index 4b9d322d4..6282bab1e 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -489,6 +489,16 @@ const ( // Valid Values: // - Any positive integer MaxLatency string = "MaxLatency" + + // InChanCapacity sets the maximum number of messages that can be buffered in the channel for incoming FIX messages. + // + // Required: No + // + // Default: 1 + // + // Valid Values: + // - A positive integer, or zero for an unbuffered channel + InChanCapacity string = "InChanCapacity" ) const ( diff --git a/initiator.go b/initiator.go index 18451477e..9af90cb2b 100644 --- a/initiator.go +++ b/initiator.go @@ -205,7 +205,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di netConn = tlsConn } - msgIn = make(chan fixIn) + msgIn = make(chan fixIn, session.InChanCapacity) msgOut = make(chan []byte) if err := session.connect(msgIn, msgOut); err != nil { session.log.OnEventf("Failed to initiate: %v", err) diff --git a/internal/session_settings.go b/internal/session_settings.go index ef7413ebd..c45134822 100644 --- a/internal/session_settings.go +++ b/internal/session_settings.go @@ -21,6 +21,7 @@ type SessionSettings struct { TimeZone *time.Location ResetSeqTime time.Time EnableResetSeqTime bool + InChanCapacity int // Required on logon for FIX.T.1 messages. DefaultApplVerID string diff --git a/session.go b/session.go index a6d296999..342988f64 100644 --- a/session.go +++ b/session.go @@ -757,6 +757,21 @@ func (s *session) checkBeginString(msg *Message) MessageRejectError { return nil } +func (s *session) drainMessageIn() { + s.log.OnEventf("Draining %d messages from inbound channel...", len(s.messageIn)) + for { + select { + case fixInc, ok := <-s.messageIn: + if !ok { + return + } + s.Incoming(s, fixInc) + default: + return + } + } +} + func (s *session) doReject(msg *Message, rej MessageRejectError) error { reply := msg.reverseRoute() @@ -824,6 +839,9 @@ func (s *session) onDisconnect() { s.messageOut = nil } + // s.messageIn is buffered so we need to drain it before disconnection + s.drainMessageIn() + s.messageIn = nil } diff --git a/session_factory.go b/session_factory.go index 613bb2069..1b79f5ee1 100644 --- a/session_factory.go +++ b/session_factory.go @@ -431,6 +431,18 @@ func (f sessionFactory) newSession( s.DisableMessagePersist = !persistMessages } + if settings.HasSetting(config.InChanCapacity) { + if s.InChanCapacity, err = settings.IntSetting(config.InChanCapacity); err != nil { + return + } else if s.InChanCapacity < 0 { + err = IncorrectFormatForSetting{Setting: config.InChanCapacity, Value: []byte(strconv.Itoa(s.InChanCapacity))} + return + } + } else { + // Default to 1 buffered message per channel + s.InChanCapacity = 1 + } + if f.BuildInitiators { if err = f.buildInitiatorSettings(s, settings); err != nil { return