diff --git a/freedata_gui/src/components/dynamic_components.vue b/freedata_gui/src/components/dynamic_components.vue index 092d058de..c966023e7 100644 --- a/freedata_gui/src/components/dynamic_components.vue +++ b/freedata_gui/src/components/dynamic_components.vue @@ -28,6 +28,7 @@ import grid_tune from "./grid/grid_tune.vue"; import grid_CQ_btn from "./grid/grid_CQ.vue"; import grid_ping from "./grid/grid_ping.vue"; import grid_freq from "./grid/grid_frequency.vue"; +import grid_audio from "./grid/grid_audio.vue"; import grid_beacon from "./grid/grid_beacon.vue"; import grid_mycall_small from "./grid/grid_mycall small.vue"; import grid_scatter from "./grid/grid_scatter.vue"; @@ -325,8 +326,19 @@ const gridWidgets = [ 18, false, { x: 16, y: 8, w: 2, h: 8 } + ), + new gridWidget( + grid_audio, + { x: 16, y: 8, w: 4, h: 24 }, + "Audio Stream", + false, + true, + "Audio", + 24, + false, + { x: 16, y: 8, w: 4, h: 24 } ) - //Next new widget ID should be 23 + //Next new widget ID should be 24 ]; function updateFrequencyAndApply(frequency) { diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue new file mode 100644 index 000000000..0ec7599b7 --- /dev/null +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -0,0 +1,92 @@ + + + + + + + + {{ $t('grid.components.audiostream') }} + + + + + {{isPlaying ? 'Stop' : 'Start'}} + + + + + + + diff --git a/freedata_gui/src/js/audioStreamHandler.js b/freedata_gui/src/js/audioStreamHandler.js new file mode 100644 index 000000000..dea67dcad --- /dev/null +++ b/freedata_gui/src/js/audioStreamHandler.js @@ -0,0 +1,25 @@ +import { setActivePinia } from "pinia"; +import pinia from "../store/index"; +setActivePinia(pinia); + +import { useAudioStore } from "../store/audioStore.js"; +const audio = useAudioStore(pinia); + +const MAX_BLOCKS = 10; + +export function addDataToAudio(data) { + const int16 = new Int16Array(data); + const copy = new Int16Array(int16); // Kopie für Sicherheit +/* + const stream = audio.rxStream; + + if (stream.length >= MAX_BLOCKS) { + stream.shift(); + } + + stream.push(copy); + */ + audio.addBlock(copy); + + +} diff --git a/freedata_gui/src/js/event_sock.js b/freedata_gui/src/js/event_sock.js index ef03ec746..87d800408 100644 --- a/freedata_gui/src/js/event_sock.js +++ b/freedata_gui/src/js/event_sock.js @@ -5,6 +5,7 @@ import { loadAllData, } from "../js/eventHandler.js"; import { addDataToWaterfall } from "../js/waterfallHandler.js"; +import { addDataToAudio } from "../js/audioStreamHandler.js"; // ----------------- init pinia stores ------------- import { setActivePinia } from "pinia"; @@ -22,6 +23,10 @@ function connect(endpoint, dispatcher) { `${wsProtocol}//${hostname}:${adjustedPort}/${endpoint}`, ); + if (endpoint.includes("audio")){ + socket.binaryType = "arraybuffer"; + } + // handle opening socket.addEventListener("open", function () { console.log(`Connected to the WebSocket server: ${endpoint}`); @@ -56,4 +61,5 @@ export function initConnections() { connect("states", stateDispatcher); connect("events", eventDispatcher); connect("fft", addDataToWaterfall); + connect("audio_rx", addDataToAudio); } diff --git a/freedata_gui/src/js/waterfallHandler.js b/freedata_gui/src/js/waterfallHandler.js index 6ea51afaf..8636b7f40 100644 --- a/freedata_gui/src/js/waterfallHandler.js +++ b/freedata_gui/src/js/waterfallHandler.js @@ -28,6 +28,7 @@ export function addDataToWaterfall(data) { }); //window.dispatchEvent(new CustomEvent("wf-data-avail", {bubbles:true, detail: data })); } + /** * Setwaterfall colormap array by index * @param {number} index colormap index to use diff --git a/freedata_gui/src/locales/de_Deutsch.json b/freedata_gui/src/locales/de_Deutsch.json index 6fa80e020..3859ee294 100644 --- a/freedata_gui/src/locales/de_Deutsch.json +++ b/freedata_gui/src/locales/de_Deutsch.json @@ -86,6 +86,7 @@ "downloadpreset": "Einstell. Herunterladen", "downloadpreset_help": "Lade die GUI-Einstellungen herunter um sie zu speichern und zu teilen", "components": { + "audiostream": "Audio Stream", "tune": "Tune", "stop_help": "Sitzung abbrechen und Aussendung beenden", "transmissioncharts": "Übertragungs-Diagramme", diff --git a/freedata_gui/src/locales/en_English.json b/freedata_gui/src/locales/en_English.json index 7ac7e7704..5d888986c 100644 --- a/freedata_gui/src/locales/en_English.json +++ b/freedata_gui/src/locales/en_English.json @@ -86,6 +86,7 @@ "downloadpreset": "Download Preset", "downloadpreset_help": "Download preset file for sharing or saving", "components": { + "audiostream": "Audio Stream", "tune": "Tune", "stop_help": "Abort session and stop transmissions", "transmissioncharts": "Transmission charts", diff --git a/freedata_gui/src/store/audioStore.js b/freedata_gui/src/store/audioStore.js index cc62888bc..6a94f80e8 100644 --- a/freedata_gui/src/store/audioStore.js +++ b/freedata_gui/src/store/audioStore.js @@ -15,6 +15,48 @@ const skel = [ export const useAudioStore = defineStore("audioStore", () => { const audioInputs = ref([]); const audioOutputs = ref([]); + const rxStream = ref([]); + + const BUFFER_SIZE = 1024; + const rxStreamBuffer = new Array(BUFFER_SIZE).fill(null); + + let writePtr = 0; + let readPtr = 0; + let readyBlocks = 0; + + function addBlock(block) { + rxStreamBuffer[writePtr] = block; + writePtr = (writePtr + 1) % BUFFER_SIZE; + + if (readyBlocks < BUFFER_SIZE) { + readyBlocks++; + } else { + readPtr = (readPtr + 1) % BUFFER_SIZE; + } + } + + function getNextBlock() { + if (readyBlocks === 0) return null; + + const block = rxStreamBuffer[readPtr]; + readPtr = (readPtr + 1) % BUFFER_SIZE; + readyBlocks--; + return block; + } + + function resetBuffer() { + writePtr = 0; + readPtr = 0; + readyBlocks = 0; + for (let i = 0; i < BUFFER_SIZE; i++) { + rxStreamBuffer[i] = null; + } + } + + + + + const loadAudioDevices = async () => { try { @@ -35,5 +77,13 @@ export const useAudioStore = defineStore("audioStore", () => { audioInputs, audioOutputs, loadAudioDevices, + rxStream, + addBlock, + getNextBlock, + resetBuffer, + get bufferedBlockCount() { + return readyBlocks; + }, + }; }); diff --git a/freedata_server/api/websocket.py b/freedata_server/api/websocket.py index 2bd41e372..36b4d3cfc 100644 --- a/freedata_server/api/websocket.py +++ b/freedata_server/api/websocket.py @@ -35,3 +35,20 @@ async def websocket_states(websocket: WebSocket, ctx: AppContext = Depends(get_c await ctx.websocket_manager.handle_connection( websocket, ctx.websocket_manager.states_client_list, ctx.state_queue ) + +@router.websocket("/audio_rx") +async def websocket_audio_rx( + websocket: WebSocket, + ctx: AppContext = Depends(get_ctx) +): + """ + WebSocket endpoint for state updates. + """ + await websocket.accept() + await ctx.websocket_manager.handle_connection( + websocket, + ctx.websocket_manager.audio_rx_client_list, + ctx.state_queue + ) + #while True: + # await websocket.send_bytes(b"\x00" * 1024) diff --git a/freedata_server/context.py b/freedata_server/context.py index 971aaca8f..001658169 100644 --- a/freedata_server/context.py +++ b/freedata_server/context.py @@ -21,6 +21,8 @@ def __init__(self, config_file: str): self.modem_events = Queue() self.modem_fft = Queue() self.modem_service = Queue() + self.audio_rx_queue = Queue(maxsize=10) + self.event_manager = EventManager(self, [self.modem_events]) self.state_manager = StateManager(self.state_queue) self.schedule_manager = ScheduleManager(self) diff --git a/freedata_server/modem.py b/freedata_server/modem.py index 8e0acd4f0..6889154e9 100644 --- a/freedata_server/modem.py +++ b/freedata_server/modem.py @@ -67,6 +67,7 @@ def __init__(self, ctx) -> None: self.MODE = 0 self.rms_counter = 0 + self.AUDIO_STREAMING_CHUNK_SIZE = 2400 self.audio_out_queue = queue.Queue() # Make sure our resampler will work @@ -349,6 +350,24 @@ def enqueue_audio_out(self, audio_48k) -> None: return + def enqueue_streaming_audio_chunks(self, audio_block, queue): + #total_samples = len(audio_block) + #for start in range(0, total_samples, self.AUDIO_STREAMING_CHUNK_SIZE): + # end = start + self.AUDIO_STREAMING_CHUNK_SIZE + # chunk = audio_block[start:end] + # queue.put(chunk.tobytes()) + + block_size = self.AUDIO_STREAMING_CHUNK_SIZE + + pad_length = -len(audio_block) % block_size + padded_data = np.pad(audio_block, (0, pad_length), mode='constant') + sliced_audio_data = padded_data.reshape(-1, block_size) + # add each block to audio out queue + for block in sliced_audio_data: + queue.put(block) + + + def sd_output_audio_callback(self, outdata: np.ndarray, frames: int, time, status) -> None: """Callback function for the audio output stream. @@ -372,6 +391,8 @@ def sd_output_audio_callback(self, outdata: np.ndarray, frames: int, time, statu audio.calculate_fft(audio_8k, self.ctx.modem_fft, self.ctx.state_manager) outdata[:] = chunk.reshape(outdata.shape) + + else: # reset transmitting state only, if we are not actively processing audio # for avoiding a ptt toggle state bug @@ -407,7 +428,10 @@ def sd_input_audio_callback(self, indata: np.ndarray, frames: int, time, status) try: audio_48k = np.frombuffer(indata, dtype=np.int16) audio_8k = self.resampler.resample48_to_8(audio_48k) - if self.ctx.config_manager.config["AUDIO"].get("rx_auto_audio_level"): + + self.enqueue_streaming_audio_chunks(audio_8k, self.ctx.audio_rx_queue) + + if self.ctx.config_manager.config['AUDIO'].get('rx_auto_audio_level'): audio_8k = audio.normalize_audio(audio_8k) audio_8k_level_adjusted = audio.set_audio_volume(audio_8k, self.rx_audio_level) diff --git a/freedata_server/websocket_manager.py b/freedata_server/websocket_manager.py index 328486ffb..ff15935ed 100644 --- a/freedata_server/websocket_manager.py +++ b/freedata_server/websocket_manager.py @@ -1,6 +1,9 @@ import threading import json import asyncio +from asyncio import run_coroutine_threadsafe + +import numpy as np import structlog @@ -28,11 +31,13 @@ def __init__(self, ctx): self.events_client_list = set() self.fft_client_list = set() self.states_client_list = set() + self.audio_rx_client_list = set() self.events_thread = None self.states_thread = None self.fft_thread = None - + self.audio_rx_thread = None + async def handle_connection(self, websocket, client_list, event_queue): """Handles a WebSocket connection. @@ -46,6 +51,7 @@ async def handle_connection(self, websocket, client_list, event_queue): event_queue (queue.Queue): The event queue. Currently unused. """ client_list.add(websocket) + self.log.info(f"Client websocket connection established", ws=websocket) while not self.shutdown_flag.is_set(): try: await websocket.receive_text() @@ -71,7 +77,6 @@ def transmit_sock_data_worker(self, client_list, event_queue): while not self.shutdown_flag.is_set(): try: event = event_queue.get(timeout=1) - if event: json_event = json.dumps(event) clients = client_list.copy() @@ -83,6 +88,34 @@ def transmit_sock_data_worker(self, client_list, event_queue): except Exception: continue + def transmit_sock_audio_worker(self, client_list, audio_queue): + """Worker thread function for transmitting data to WebSocket clients. + + This method continuously retrieves events from the provided queue and + sends them as JSON strings to all connected clients in the specified + list. It handles client disconnections gracefully. + + Args: + client_list (set): The set of connected WebSocket clients. + event_queue (queue.Queue): The queue containing events to be transmitted. + """ + while not self.shutdown_flag.is_set(): + #loop = asyncio.get_event_loop() + try: + audio = audio_queue.get(timeout=1) + if isinstance(audio, np.ndarray): + audio = audio.tobytes() + clients = client_list.copy() + for client in clients: + try: + asyncio.run(client.send_bytes(audio)) + + except Exception: + client_list.remove(client) + except Exception: + continue + + def startWorkerThreads(self, app): """Starts worker threads for handling WebSocket data transmission. @@ -115,6 +148,9 @@ def startWorkerThreads(self, app): ) self.fft_thread.start() + self.audio_rx_thread = threading.Thread(target=self.transmit_sock_audio_worker, daemon=True, args=(self.audio_rx_client_list, self.ctx.audio_rx_queue)) + self.audio_rx_thread.start() + def shutdown(self): """Shuts down the WebSocket manager. @@ -128,6 +164,8 @@ def shutdown(self): self.events_thread.join(0.5) if self.states_thread: self.states_thread.join(0.5) - if self.states_thread: + if self.fft_thread: self.fft_thread.join(0.5) + if self.audio_rx_thread: + self.audio_rx_thread.join(0.5) self.log.warning("[SHUTDOWN] websockets closed")