Skip to content

Commit cbb56b4

Browse files
committed
Backport SDL audio features from ESDL project.
Not making these public yet.
1 parent 4385a4d commit cbb56b4

File tree

4 files changed

+289
-0
lines changed

4 files changed

+289
-0
lines changed

tcod/sdl/__init__.py

Whitespace-only changes.

tcod/sdl/audio.py

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
from __future__ import annotations
2+
3+
import sys
4+
import threading
5+
import time
6+
import weakref
7+
from typing import Any, Iterator, List, Optional
8+
9+
import numpy as np
10+
from numpy.typing import ArrayLike, DTypeLike, NDArray
11+
12+
import tcod.sdl.sys
13+
from tcod.loader import ffi, lib
14+
15+
16+
def _get_format(format: DTypeLike) -> int:
17+
"""Return a SDL_AudioFormat bitfield from a NumPy dtype."""
18+
dt: Any = np.dtype(format)
19+
assert dt.fields is None
20+
bitsize = dt.itemsize * 8
21+
assert 0 < bitsize <= lib.SDL_AUDIO_MASK_BITSIZE
22+
assert dt.str[1] in "uif"
23+
is_signed = dt.str[1] != "u"
24+
is_float = dt.str[1] == "f"
25+
byteorder = dt.byteorder
26+
if byteorder == "=":
27+
byteorder = "<" if sys.byteorder == "little" else ">"
28+
29+
return ( # type: ignore
30+
bitsize
31+
| (lib.SDL_AUDIO_MASK_DATATYPE * is_float)
32+
| (lib.SDL_AUDIO_MASK_ENDIAN * (byteorder == ">"))
33+
| (lib.SDL_AUDIO_MASK_SIGNED * is_signed)
34+
)
35+
36+
37+
def _dtype_from_format(format: int) -> np.dtype[Any]:
38+
"""Return a dtype from a SDL_AudioFormat."""
39+
bitsize = format & lib.SDL_AUDIO_MASK_BITSIZE
40+
assert bitsize % 8 == 0
41+
bytesize = bitsize // 8
42+
byteorder = ">" if format & lib.SDL_AUDIO_MASK_ENDIAN else "<"
43+
if format & lib.SDL_AUDIO_MASK_DATATYPE:
44+
kind = "f"
45+
elif format & lib.SDL_AUDIO_MASK_SIGNED:
46+
kind = "i"
47+
else:
48+
kind = "u"
49+
return np.dtype(f"{byteorder}{kind}{bytesize}")
50+
51+
52+
class AudioDevice:
53+
def __init__(
54+
self,
55+
device: Optional[str] = None,
56+
capture: bool = False,
57+
*,
58+
frequency: int = 44100,
59+
format: DTypeLike = np.float32,
60+
channels: int = 2,
61+
samples: int = 0,
62+
allowed_changes: int = 0,
63+
):
64+
self.__sdl_subsystems = tcod.sdl.sys._ScopeInit(tcod.sdl.sys.Subsystem.AUDIO)
65+
self.__handle = ffi.new_handle(weakref.ref(self))
66+
desired = ffi.new(
67+
"SDL_AudioSpec*",
68+
{
69+
"freq": frequency,
70+
"format": _get_format(format),
71+
"channels": channels,
72+
"samples": samples,
73+
"callback": ffi.NULL,
74+
"userdata": self.__handle,
75+
},
76+
)
77+
obtained = ffi.new("SDL_AudioSpec*")
78+
self.device_id = lib.SDL_OpenAudioDevice(
79+
ffi.NULL if device is None else device.encode("utf-8"),
80+
capture,
81+
desired,
82+
obtained,
83+
allowed_changes,
84+
)
85+
assert self.device_id != 0, tcod.sdl.sys._get_error()
86+
self.frequency = obtained.freq
87+
self.is_capture = capture
88+
self.format = _dtype_from_format(obtained.format)
89+
self.channels = int(obtained.channels)
90+
self.silence = int(obtained.silence)
91+
self.samples = int(obtained.samples)
92+
self.buffer_size = int(obtained.size)
93+
self.unpause()
94+
95+
@property
96+
def _sample_size(self) -> int:
97+
return self.format.itemsize * self.channels
98+
99+
def pause(self) -> None:
100+
lib.SDL_PauseAudioDevice(self.device_id, True)
101+
102+
def unpause(self) -> None:
103+
lib.SDL_PauseAudioDevice(self.device_id, False)
104+
105+
def _verify_array_format(self, samples: NDArray[Any]) -> NDArray[Any]:
106+
if samples.dtype != self.format:
107+
raise TypeError(f"Expected an array of dtype {self.format}, got {samples.dtype} instead.")
108+
return samples
109+
110+
def _convert_array(self, samples_: ArrayLike) -> NDArray[Any]:
111+
if isinstance(samples_, np.ndarray):
112+
samples_ = self._verify_array_format(samples_)
113+
samples: NDArray[Any] = np.asarray(samples_, dtype=self.format)
114+
if len(samples.shape) < 2:
115+
samples = samples[:, np.newaxis]
116+
return np.ascontiguousarray(np.broadcast_to(samples, (samples.shape[0], self.channels)), dtype=self.format)
117+
118+
@property
119+
def queued_audio_bytes(self) -> int:
120+
return int(lib.SDL_GetQueuedAudioSize(self.device_id))
121+
122+
def queue_audio(self, samples: ArrayLike) -> None:
123+
assert not self.is_capture
124+
samples = self._convert_array(samples)
125+
buffer = ffi.from_buffer(samples)
126+
lib.SDL_QueueAudio(self.device_id, buffer, len(buffer))
127+
128+
def dequeue_audio(self) -> NDArray[Any]:
129+
assert self.is_capture
130+
out_samples = self.queued_audio_bytes // self._sample_size
131+
out = np.empty((out_samples, self.channels), self.format)
132+
buffer = ffi.from_buffer(out)
133+
bytes_returned = lib.SDL_DequeueAudio(self.device_id, buffer, len(buffer))
134+
samples_returned = bytes_returned // self._sample_size
135+
assert samples_returned == out_samples
136+
return out
137+
138+
def __del__(self) -> None:
139+
self.close()
140+
141+
def close(self) -> None:
142+
if not self.device_id:
143+
return
144+
lib.SDL_CloseAudioDevice(self.device_id)
145+
self.device_id = 0
146+
147+
@staticmethod
148+
def __default_callback(stream: NDArray[Any], silence: int) -> None:
149+
stream[...] = silence
150+
151+
152+
class Mixer(threading.Thread):
153+
def __init__(self, device: AudioDevice):
154+
super().__init__(daemon=True)
155+
self.device = device
156+
self.device.unpause()
157+
self.start()
158+
159+
def run(self) -> None:
160+
buffer = np.full((self.device.samples, self.device.channels), self.device.silence, dtype=self.device.format)
161+
while True:
162+
time.sleep(0.001)
163+
if self.device.queued_audio_bytes == 0:
164+
self.on_stream(buffer)
165+
self.device.queue_audio(buffer)
166+
buffer[:] = self.device.silence
167+
168+
def on_stream(self, stream: NDArray[Any]) -> None:
169+
pass
170+
171+
172+
class BasicMixer(Mixer):
173+
def __init__(self, device: AudioDevice):
174+
super().__init__(device)
175+
self.play_buffers: List[List[NDArray[Any]]] = []
176+
177+
def play(self, sound: ArrayLike) -> None:
178+
array = np.asarray(sound, dtype=self.device.format)
179+
assert array.size
180+
if len(array.shape) == 1:
181+
array = array[:, np.newaxis]
182+
chunks: List[NDArray[Any]] = np.split(array, range(0, len(array), self.device.samples)[1:])[::-1]
183+
self.play_buffers.append(chunks)
184+
185+
def on_stream(self, stream: NDArray[Any]) -> None:
186+
super().on_stream(stream)
187+
for chunks in self.play_buffers:
188+
chunk = chunks.pop()
189+
stream[: len(chunk)] += chunk
190+
191+
self.play_buffers = [chunks for chunks in self.play_buffers if chunks]
192+
193+
194+
@ffi.def_extern() # type: ignore
195+
def _sdl_audio_callback(userdata: Any, stream: Any, length: int) -> None:
196+
"""Handle audio device callbacks."""
197+
device: Optional[AudioDevice] = ffi.from_handle(userdata)()
198+
assert device is not None
199+
_ = np.frombuffer(ffi.buffer(stream, length), dtype=device.format).reshape(-1, device.channels)
200+
201+
202+
def _get_devices(capture: bool) -> Iterator[str]:
203+
"""Get audio devices from SDL_GetAudioDeviceName."""
204+
with tcod.sdl.sys._ScopeInit(tcod.sdl.sys.Subsystem.AUDIO):
205+
device_count = lib.SDL_GetNumAudioDevices(capture)
206+
for i in range(device_count):
207+
yield str(ffi.string(lib.SDL_GetAudioDeviceName(i, capture)), encoding="utf-8")
208+
209+
210+
def get_devices() -> Iterator[str]:
211+
"""Iterate over the available audio output devices."""
212+
yield from _get_devices(capture=False)
213+
214+
215+
def get_capture_devices() -> Iterator[str]:
216+
"""Iterate over the available audio capture devices."""
217+
yield from _get_devices(capture=True)

tcod/sdl/sys.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from __future__ import annotations
2+
3+
import enum
4+
from typing import Any, Tuple
5+
6+
from tcod.loader import ffi, lib
7+
8+
9+
class Subsystem(enum.IntFlag):
10+
TIMER = lib.SDL_INIT_TIMER
11+
AUDIO = lib.SDL_INIT_AUDIO
12+
VIDEO = lib.SDL_INIT_VIDEO
13+
JOYSTICK = lib.SDL_INIT_JOYSTICK
14+
HAPTIC = lib.SDL_INIT_HAPTIC
15+
GAMECONTROLLER = lib.SDL_INIT_GAMECONTROLLER
16+
EVENTS = lib.SDL_INIT_EVENTS
17+
SENSOR = getattr(lib, "SDL_INIT_SENSOR", 0)
18+
EVERYTHING = lib.SDL_INIT_EVERYTHING
19+
20+
21+
def _check(result: int) -> int:
22+
if result < 0:
23+
raise RuntimeError(_get_error())
24+
return result
25+
26+
27+
def init(flags: int = Subsystem.EVERYTHING) -> None:
28+
_check(lib.SDL_InitSubSystem(flags))
29+
30+
31+
def quit(flags: int = Subsystem.EVERYTHING) -> None:
32+
lib.SDL_QuitSubSystem(flags)
33+
34+
35+
class _ScopeInit:
36+
def __init__(self, flags: int) -> None:
37+
init(flags)
38+
self.flags = flags
39+
40+
def close(self) -> None:
41+
if self.flags:
42+
quit(self.flags)
43+
self.flags = 0
44+
45+
def __del__(self) -> None:
46+
self.close()
47+
48+
def __enter__(self) -> _ScopeInit:
49+
return self
50+
51+
def __exit__(self, *args: Any) -> None:
52+
self.close()
53+
54+
55+
def _get_error() -> str:
56+
return str(ffi.string(lib.SDL_GetError()), encoding="utf-8")
57+
58+
59+
class _PowerState(enum.IntEnum):
60+
UNKNOWN = lib.SDL_POWERSTATE_UNKNOWN
61+
ON_BATTERY = lib.SDL_POWERSTATE_ON_BATTERY
62+
NO_BATTERY = lib.SDL_POWERSTATE_NO_BATTERY
63+
CHARGING = lib.SDL_POWERSTATE_CHARGING
64+
CHARGED = lib.SDL_POWERSTATE_CHARGED
65+
66+
67+
def _get_power_info() -> Tuple[_PowerState, int, int]:
68+
buffer = ffi.new("int[2]")
69+
power_state = _PowerState(lib.SDL_GetPowerInfo(buffer, buffer + 1))
70+
seconds_of_power = buffer[0]
71+
percenage = buffer[1]
72+
return power_state, seconds_of_power, percenage
File renamed without changes.

0 commit comments

Comments
 (0)