From 1169fde24bf272dac7817d98ad3c09e8006d81f5 Mon Sep 17 00:00:00 2001 From: arthurkehrwald <50906979+arthurkehrwald@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:02:12 +0100 Subject: [PATCH 1/2] Fix shutdown --- VisualPinball.Engine.Mpf/MpfClient.cs | 40 ++++++++++++++++++--------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/VisualPinball.Engine.Mpf/MpfClient.cs b/VisualPinball.Engine.Mpf/MpfClient.cs index 824523e8..0cee1c03 100644 --- a/VisualPinball.Engine.Mpf/MpfClient.cs +++ b/VisualPinball.Engine.Mpf/MpfClient.cs @@ -36,9 +36,10 @@ public class MpfClient private MpfHardwareService.MpfHardwareServiceClient _client; private static readonly Logger Logger = LogManager.GetCurrentClassLogger(); - private Thread _commandsThread; - private AsyncServerStreamingCall _commandStream; + private Task _receiveCommandsTask; + private Task _writeSwitchChangeTask; private AsyncClientStreamingCall _switchStream; + private CancellationTokenSource _cts; public void Connect(string serverIpPort = "127.0.0.1:50051") { @@ -55,11 +56,12 @@ public void StartGame(Dictionary initialSwitches, bool handleStrea } Logger.Info("Starting player with machine state: " + ms); - _commandStream = _client.Start(ms); + + _cts = new CancellationTokenSource(); if (handleStream) { - _commandsThread = new Thread(ReceiveCommands) { IsBackground = true }; - _commandsThread.Start(); + AsyncServerStreamingCall commandsStream = _client.Start(ms); + _receiveCommandsTask = ReceiveCommands(commandsStream, _cts.Token); } _switchStream = _client.SendSwitchChanges(); @@ -67,17 +69,19 @@ public void StartGame(Dictionary initialSwitches, bool handleStrea public async Task Switch(string swName, bool swValue) { - await _switchStream.RequestStream.WriteAsync(new SwitchChanges + _writeSwitchChangeTask = _switchStream.RequestStream.WriteAsync(new SwitchChanges {SwitchNumber = swName, SwitchState = swValue}); + await _writeSwitchChangeTask; + _writeSwitchChangeTask = null; } - private async void ReceiveCommands() + private async Task ReceiveCommands(AsyncServerStreamingCall commandsStream, CancellationToken ct) { Logger.Info("Client started, retrieving commands..."); try { - while (await _commandStream.ResponseStream.MoveNext()) { - var commands = _commandStream.ResponseStream.Current; + while (await commandsStream.ResponseStream.MoveNext(ct)) { + var commands = commandsStream.ResponseStream.Current; switch (commands.CommandCase) { case Commands.CommandOneofCase.None: break; @@ -107,9 +111,11 @@ private async void ReceiveCommands() } } } - catch(RpcException e) { - Logger.Error($"Unable to retrieve commands: Status={e.Status}"); + if (!ct.IsCancellationRequested) + Logger.Error($"Unable to retrieve commands: Status={e.Status}"); + } finally { + commandsStream.Dispose(); } } @@ -122,8 +128,16 @@ public MachineDescription GetMachineDescription() public void Shutdown() { Logger.Info("Shutting down..."); - _client.Quit(new QuitRequest()); - _commandStream?.Dispose(); + if (_channel.State == ChannelState.Ready) + _client.Quit(new QuitRequest()); + _cts?.Cancel(); + _cts?.Dispose(); + _cts = null; + _receiveCommandsTask.Wait(); + _receiveCommandsTask = null; + _writeSwitchChangeTask?.Wait(); + _writeSwitchChangeTask = null; + _switchStream?.Dispose(); _channel.ShutdownAsync().Wait(); Logger.Info("All down."); } From 60149b334b96b09d5540a6353bc15275b66edfe1 Mon Sep 17 00:00:00 2001 From: arthurkehrwald <50906979+arthurkehrwald@users.noreply.github.com> Date: Tue, 12 Nov 2024 23:43:59 +0100 Subject: [PATCH 2/2] Add timeouts and receive on thread pool --- VisualPinball.Engine.Mpf/MpfClient.cs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/VisualPinball.Engine.Mpf/MpfClient.cs b/VisualPinball.Engine.Mpf/MpfClient.cs index 0cee1c03..524b9503 100644 --- a/VisualPinball.Engine.Mpf/MpfClient.cs +++ b/VisualPinball.Engine.Mpf/MpfClient.cs @@ -61,7 +61,7 @@ public void StartGame(Dictionary initialSwitches, bool handleStrea if (handleStream) { AsyncServerStreamingCall commandsStream = _client.Start(ms); - _receiveCommandsTask = ReceiveCommands(commandsStream, _cts.Token); + _receiveCommandsTask = Task.Run(() => ReceiveCommands(commandsStream, _cts.Token)); } _switchStream = _client.SendSwitchChanges(); @@ -133,13 +133,16 @@ public void Shutdown() _cts?.Cancel(); _cts?.Dispose(); _cts = null; - _receiveCommandsTask.Wait(); + if (!_receiveCommandsTask.Wait(500)) + Logger.Warn("Receive commands task shutdown timed out after 500ms"); _receiveCommandsTask = null; - _writeSwitchChangeTask?.Wait(); - _writeSwitchChangeTask = null; + if (_writeSwitchChangeTask != null && !_writeSwitchChangeTask.Wait(500)) + Logger.Warn("Write switch change task shutdown timed out after 500ms"); + _writeSwitchChangeTask = null; _switchStream?.Dispose(); - _channel.ShutdownAsync().Wait(); - Logger.Info("All down."); + if (!_channel.ShutdownAsync().Wait(500)) + Logger.Warn("GRPC channel shutdown timed out after 500ms"); + Logger.Info("All down."); } } }