diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 168540025..4fde69c6c 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -1085,6 +1085,11 @@ def setSimpleConfig(modem_preset): print(f"Waiting {args.wait_to_disconnect} seconds before disconnecting") time.sleep(int(args.wait_to_disconnect)) + if args.sensor_config: + closeNow = True + waitForAckNak = True + interface.getNode(args.dest, False, **getNode_kwargs).sensorConfig(args.sensor_config) + # if the user didn't ask for serial debugging output, we might want to exit after we've done our operation if (not args.seriallog) and closeNow: interface.close() # after running command then exit @@ -1983,6 +1988,14 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars metavar="TIMESTAMP", ) + group.add_argument( + "--sensor-config", + help="Send a sensor admin command to configure sensor parameters.", + action="store", + nargs='+', + default=None + ) + return parser def initParser(): diff --git a/meshtastic/node.py b/meshtastic/node.py index afb5611ac..0836a6e01 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -977,6 +977,102 @@ def onAckNak(self, p): print(f"Received an ACK.") self.iface._acknowledgment.receivedAck = True + def sensorConfig(self, commands: List = None): + """Send a sensor configuration command""" + self.ensureSessionKey() + + p = admin_pb2.AdminMessage() + if any(['scd4x_config' in command for command in commands]): + cleanup_commands = [command.replace('scd4x_config.', '') for command in commands] + + if 'factory_reset' in cleanup_commands: + print ("Performing factory reset on SCD4X") + p.sensor_config.scd4x_config.factory_reset = True + else: + if 'set_asc' in cleanup_commands: + if cleanup_commands[cleanup_commands.index('set_asc')+1] == "true": + p.sensor_config.scd4x_config.set_asc = True + print ("Setting SCD4X ASC mode") + elif cleanup_commands[cleanup_commands.index('set_asc')+1] == "false": + p.sensor_config.scd4x_config.set_asc = False + print ("Setting SCD4X FRC mode") + else: + print( + f'Not valid argument for sensor_config.scd4x_config.set_asc' + ) + if 'set_target_co2_conc' in cleanup_commands: + try: + target_co2_conc = int(cleanup_commands[cleanup_commands.index('set_target_co2_conc')+1]) + except ValueError: + print( + f'Invalid value for target CO2 conc' + ) + return + else: + print (f"Setting SCD4X target CO2 conc to {target_co2_conc}") + p.sensor_config.scd4x_config.set_target_co2_conc = target_co2_conc + send_command = True + if 'set_temperature' in cleanup_commands: + try: + temperature = float(cleanup_commands[cleanup_commands.index('set_temperature')+1]) + except ValueError: + print( + f'Invalid value for reference temperature' + ) + return + else: + print (f"Setting SCD4X Reference temperature to {temperature}") + p.sensor_config.scd4x_config.set_temperature = temperature + send_command = True + if 'set_altitude' in cleanup_commands: + try: + altitude = int(cleanup_commands[cleanup_commands.index('set_altitude')+1]) + except ValueError: + print( + f'Invalid value for reference altitude' + ) + return + else: + print (f"Setting SCD4X Reference altitude to {altitude}") + p.sensor_config.scd4x_config.set_altitude = altitude + if 'set_ambient_pressure' in cleanup_commands: + try: + ambient_pressure = int(cleanup_commands[cleanup_commands.index('set_ambient_pressure')+1]) + except ValueError: + print( + f'Invalid value for reference ambient pressure' + ) + return + else: + print (f"Setting SCD4X Reference ambient pressure to {ambient_pressure}") + p.sensor_config.scd4x_config.set_ambient_pressure = ambient_pressure + + if any(['sen5x_config' in command for command in commands]): + cleanup_commands = [command.replace('sen5x_config.', '') for command in commands] + if 'set_one_shot_mode' in cleanup_commands: + if cleanup_commands[cleanup_commands.index('set_one_shot_mode')+1] == "true": + p.sensor_config.sen5x_config.set_one_shot_mode = True + print ("Setting SEN5X one shot mode") + elif cleanup_commands[cleanup_commands.index('set_one_shot_mode')+1] == "false": + p.sensor_config.sen5x_config.set_one_shot_mode = False + print ("Setting SEN5X continuous mode") + else: + print( + f'Not valid argument for sensor_config.sen5x_config.set_one_shot_mode' + ) + + # How to represent a HANDLED event? + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + + if p.ByteSize(): + # TODO - Should this require a response? + return self._sendAdmin(p, onResponse=onResponse) + else: + print ('Nothing to request') + def _requestChannel(self, channelNum: int): """Done with initial config messages, now send regular MeshPackets to ask for settings"""