diff --git a/VisualPinball.Engine.Mpf/MpfClient.cs b/VisualPinball.Engine.Mpf/MpfClient.cs index 824523e8..524b9503 100644 --- a/VisualPinball.Engine.Mpf/MpfClient.cs +++ b/VisualPinball.Engine.Mpf/MpfClient.cs @@ -36,9 +36,10 @@ public class MpfClient private MpfHardwareService.MpfHardwareServiceClient _client; private static readonly Logger Logger = LogManager.GetCurrentClassLogger(); - private Thread _commandsThread; - private AsyncServerStreamingCall _commandStream; + private Task _receiveCommandsTask; + private Task _writeSwitchChangeTask; private AsyncClientStreamingCall _switchStream; + private CancellationTokenSource _cts; public void Connect(string serverIpPort = "127.0.0.1:50051") { @@ -55,11 +56,12 @@ public void StartGame(Dictionary initialSwitches, bool handleStrea } Logger.Info("Starting player with machine state: " + ms); - _commandStream = _client.Start(ms); + + _cts = new CancellationTokenSource(); if (handleStream) { - _commandsThread = new Thread(ReceiveCommands) { IsBackground = true }; - _commandsThread.Start(); + AsyncServerStreamingCall commandsStream = _client.Start(ms); + _receiveCommandsTask = Task.Run(() => ReceiveCommands(commandsStream, _cts.Token)); } _switchStream = _client.SendSwitchChanges(); @@ -67,17 +69,19 @@ public void StartGame(Dictionary initialSwitches, bool handleStrea public async Task Switch(string swName, bool swValue) { - await _switchStream.RequestStream.WriteAsync(new SwitchChanges + _writeSwitchChangeTask = _switchStream.RequestStream.WriteAsync(new SwitchChanges {SwitchNumber = swName, SwitchState = swValue}); + await _writeSwitchChangeTask; + _writeSwitchChangeTask = null; } - private async void ReceiveCommands() + private async Task ReceiveCommands(AsyncServerStreamingCall commandsStream, CancellationToken ct) { Logger.Info("Client started, retrieving commands..."); try { - while (await _commandStream.ResponseStream.MoveNext()) { - var commands = _commandStream.ResponseStream.Current; + while (await commandsStream.ResponseStream.MoveNext(ct)) { + var commands = commandsStream.ResponseStream.Current; switch (commands.CommandCase) { case Commands.CommandOneofCase.None: break; @@ -107,9 +111,11 @@ private async void ReceiveCommands() } } } - catch(RpcException e) { - Logger.Error($"Unable to retrieve commands: Status={e.Status}"); + if (!ct.IsCancellationRequested) + Logger.Error($"Unable to retrieve commands: Status={e.Status}"); + } finally { + commandsStream.Dispose(); } } @@ -122,10 +128,21 @@ public MachineDescription GetMachineDescription() public void Shutdown() { Logger.Info("Shutting down..."); - _client.Quit(new QuitRequest()); - _commandStream?.Dispose(); - _channel.ShutdownAsync().Wait(); - Logger.Info("All down."); + if (_channel.State == ChannelState.Ready) + _client.Quit(new QuitRequest()); + _cts?.Cancel(); + _cts?.Dispose(); + _cts = null; + if (!_receiveCommandsTask.Wait(500)) + Logger.Warn("Receive commands task shutdown timed out after 500ms"); + _receiveCommandsTask = null; + if (_writeSwitchChangeTask != null && !_writeSwitchChangeTask.Wait(500)) + Logger.Warn("Write switch change task shutdown timed out after 500ms"); + _writeSwitchChangeTask = null; + _switchStream?.Dispose(); + if (!_channel.ShutdownAsync().Wait(500)) + Logger.Warn("GRPC channel shutdown timed out after 500ms"); + Logger.Info("All down."); } } }