Skip to content

Commit bb2bbff

Browse files
committed
[save-images] Make all threads exception-safe
Ensure errors are re-raised safely from worker threads by using non-blocking puts and monitoring a common error queue.
1 parent 86b31aa commit bb2bbff

File tree

2 files changed

+104
-82
lines changed

2 files changed

+104
-82
lines changed

scenedetect/__init__.py

Lines changed: 104 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import math
1919
import queue
20+
import sys
2021
import threading
2122
import typing as ty
2223
from dataclasses import dataclass
@@ -183,46 +184,8 @@ def detect(
183184
scene_manager.stats_manager.save_to_csv(csv_file=stats_file_path)
184185
return scene_manager.get_scene_list(start_in_scene=start_in_scene)
185186

186-
187-
# TODO: Just merge these variables into the extractor.
188-
@dataclass
189-
class ImageExtractorConfig:
190-
num_images: int = 3
191-
"""Number of images to generate for each scene. Minimum is 1."""
192-
frame_margin: int = 1
193-
"""Number of frames to pad each scene around the beginning
194-
and end (e.g. moves the first/last image into the scene by N frames).
195-
Can set to 0, but will result in some video files failing to extract
196-
the very last frame."""
197-
image_extension: str = "jpg"
198-
"""Type of image to save (must be one of 'jpg', 'png', or 'webp')."""
199-
encoder_param: int = 95
200-
"""Quality/compression efficiency, based on type of image:
201-
'jpg' / 'webp': Quality 0-100, higher is better quality. 100 is lossless for webp.
202-
'png': Compression from 1-9, where 9 achieves best filesize but is slower to encode."""
203-
image_name_template: str = "$VIDEO_NAME-Scene-$SCENE_NUMBER-$IMAGE_NUMBER"
204-
"""Template to use for naming image files. Can use the template variables
205-
$VIDEO_NAME, $SCENE_NUMBER, $IMAGE_NUMBER, $TIMECODE, $FRAME_NUMBER, $TIMESTAMP_MS.
206-
Should not include an extension."""
207-
scale: ty.Optional[float] = None
208-
"""Optional factor by which to rescale saved images. A scaling factor of 1 would
209-
not result in rescaling. A value < 1 results in a smaller saved image, while a
210-
value > 1 results in an image larger than the original. This value is ignored if
211-
either the height or width values are specified."""
212-
height: ty.Optional[int] = None
213-
"""Optional value for the height of the saved images. Specifying both the height
214-
and width will resize images to an exact size, regardless of aspect ratio.
215-
Specifying only height will rescale the image to that number of pixels in height
216-
while preserving the aspect ratio."""
217-
width: ty.Optional[int] = None
218-
"""Optional value for the width of the saved images. Specifying both the width
219-
and height will resize images to an exact size, regardless of aspect ratio.
220-
Specifying only width will rescale the image to that number of pixels wide
221-
while preserving the aspect ratio."""
222-
interpolation: Interpolation = Interpolation.CUBIC
223-
"""Type of interpolation to use when resizing images."""
224-
225-
187+
# TODO: Move this into scenedetect._postprocessing. (can make that public module once stable)
188+
# Also move some common types to scenedetect.common out of scene_manager to facilitate code reuse.
226189
class ImageExtractor:
227190
def __init__(
228191
self,
@@ -236,7 +199,10 @@ def __init__(
236199
width: ty.Optional[int] = None,
237200
interpolation: Interpolation = Interpolation.CUBIC,
238201
):
239-
"""Helper type to handle saving images for a set of scenes. This object is *not* thread-safe.
202+
"""Multi-threaded implementation of save-images functionality. Uses background threads to
203+
handle image encoding and saving images to disk to improve parallelism.
204+
205+
This object is thread-safe.
240206
241207
Arguments:
242208
num_images: Number of images to generate for each scene. Minimum is 1.
@@ -275,13 +241,22 @@ def __init__(
275241
self._width = width
276242
self._interpolation = interpolation
277243

244+
278245
def run(
279246
self,
280247
video: VideoStream,
281248
scene_list: SceneList,
282249
output_dir: ty.Optional[str] = None,
283250
show_progress=False,
284251
) -> ty.Dict[int, ty.List[str]]:
252+
"""Run image extraction on `video` using the current parameters. Thread-safe.
253+
254+
Arguments:
255+
video: The video to process.
256+
scene_list: The scenes detected in the video.
257+
output_dir: Directory to write files to.
258+
show_progress: If `true` and tqdm is available, shows a progress bar.
259+
"""
285260
if not scene_list:
286261
return {}
287262
if self._num_images <= 0 or self._frame_margin < 0:
@@ -300,70 +275,116 @@ def run(
300275
total=len(scene_list) * self._num_images, unit="images", dynamic_ncols=True
301276
)
302277

278+
timecode_list = self.generate_timecode_list(scene_list)
279+
image_filenames = {i: [] for i in range(len(timecode_list))}
280+
303281
filename_template = Template(self._image_name_template)
282+
logger.debug("Writing images with template %s", filename_template.template)
304283
scene_num_format = "%0"
305284
scene_num_format += str(max(3, math.floor(math.log(len(scene_list), 10)) + 1)) + "d"
306285
image_num_format = "%0"
307286
image_num_format += str(math.floor(math.log(self._num_images, 10)) + 2) + "d"
308287

309-
timecode_list = self.generate_timecode_list(scene_list)
310-
image_filenames = {i: [] for i in range(len(timecode_list))}
311-
logger.debug("Writing images with template %s", filename_template.template)
288+
def format_filename(scene_number: int, image_number: int, image_timecode: FrameTimecode):
289+
return "%s.%s" % (
290+
filename_template.safe_substitute(
291+
VIDEO_NAME=video.name,
292+
SCENE_NUMBER=scene_num_format % (scene_number + 1),
293+
IMAGE_NUMBER=image_num_format % (image_number + 1),
294+
FRAME_NUMBER=image_timecode.get_frames(),
295+
TIMESTAMP_MS=int(image_timecode.get_seconds() * 1000),
296+
TIMECODE=image_timecode.get_timecode().replace(":", ";"),
297+
),
298+
self._image_extension,
299+
)
312300

313301
MAX_QUEUED_ENCODE_FRAMES = 4
314302
MAX_QUEUED_SAVE_IMAGES = 4
315303
encode_queue = queue.Queue(MAX_QUEUED_ENCODE_FRAMES)
316304
save_queue = queue.Queue(MAX_QUEUED_SAVE_IMAGES)
317-
encode_thread = threading.Thread(
318-
target=self._image_encode_thread,
319-
args=(video, encode_queue, save_queue, self._image_extension),
320-
daemon=True,
321-
)
322-
save_thread = threading.Thread(
323-
target=self._save_files_thread,
324-
args=(save_queue, progress_bar),
325-
daemon=True,
305+
error_queue = queue.Queue(2) # Queue size must be the same as the # of worker threads!
306+
307+
def check_error_queue():
308+
try:
309+
return error_queue.get(block=False)
310+
except queue.Empty:
311+
pass
312+
return None
313+
314+
def launch_thread(callable, *args, **kwargs):
315+
def capture_errors(callable, *args, **kwargs):
316+
try:
317+
return callable(*args, **kwargs)
318+
# Errors we capture in `error_queue` will be re-raised by this thread.
319+
except: # noqa: E722
320+
error_queue.put(sys.exc_info())
321+
return None
322+
323+
thread = threading.Thread(
324+
target=capture_errors,
325+
args=(
326+
callable,
327+
*args,
328+
),
329+
kwargs=kwargs,
330+
daemon=True,
331+
)
332+
thread.start()
333+
return thread
334+
335+
def checked_put(work_queue: queue.Queue, item: ty.Any):
336+
error = None
337+
while True:
338+
try:
339+
work_queue.put(item, timeout=0.1)
340+
break
341+
except queue.Full:
342+
error = check_error_queue()
343+
if error is None:
344+
continue
345+
if error is not None:
346+
raise error[1].with_traceback(error[2])
347+
348+
encode_thread = launch_thread(
349+
self._encode_images,
350+
video,
351+
encode_queue,
352+
save_queue,
353+
self._image_extension,
326354
)
327-
encode_thread.start()
328-
save_thread.start()
355+
save_thread = launch_thread(self._save_images, save_queue, progress_bar)
329356

330357
for i, scene_timecodes in enumerate(timecode_list):
331-
for j, image_timecode in enumerate(scene_timecodes):
332-
video.seek(image_timecode)
358+
for j, timecode in enumerate(scene_timecodes):
359+
video.seek(timecode)
333360
frame_im = video.read()
334361
if frame_im is not None and frame_im is not False:
335-
# TODO: Add extension to template.
336-
# TODO: Allow NUM to be a valid suffix in addition to NUMBER.
337-
file_path = "%s.%s" % (
338-
filename_template.safe_substitute(
339-
VIDEO_NAME=video.name,
340-
SCENE_NUMBER=scene_num_format % (i + 1),
341-
IMAGE_NUMBER=image_num_format % (j + 1),
342-
FRAME_NUMBER=image_timecode.get_frames(),
343-
TIMESTAMP_MS=int(image_timecode.get_seconds() * 1000),
344-
TIMECODE=image_timecode.get_timecode().replace(":", ";"),
345-
),
346-
self._image_extension,
347-
)
362+
file_path = format_filename(i, j, timecode)
348363
image_filenames[i].append(file_path)
349-
encode_queue.put((frame_im, get_and_create_path(file_path, output_dir)))
364+
checked_put(
365+
encode_queue, (frame_im, get_and_create_path(file_path, output_dir))
366+
)
350367
else:
351368
completed = False
352369
break
353370

354-
# *WARNING*: We do not handle errors or exceptions yet, and this can deadlock on errors!
355-
encode_queue.put((None, None))
356-
save_queue.put((None, None))
371+
checked_put(encode_queue, (None, None))
372+
checked_put(save_queue, (None, None))
357373
encode_thread.join()
358374
save_thread.join()
375+
376+
error = check_error_queue()
377+
if error is not None:
378+
raise error[1].with_traceback(error[2])
379+
359380
if progress_bar is not None:
360381
progress_bar.close()
361382
if not completed:
362383
logger.error("Could not generate all output images.")
363384

364385
return image_filenames
365386

366-
def _image_encode_thread(
387+
def _encode_images(
367388
self,
368389
video: VideoStream,
369390
encode_queue: queue.Queue,
@@ -393,7 +414,7 @@ def _image_encode_thread(
393414
continue
394415
save_queue.put((encoded, dest_path))
395416

396-
def _save_files_thread(self, save_queue: queue.Queue, progress_bar: tqdm):
417+
def _save_images(self, save_queue: queue.Queue, progress_bar: tqdm):
397418
while True:
398419
encoded, dest_path = save_queue.get()
399420
if encoded is None:
@@ -457,14 +478,16 @@ def resize_image(
457478
image = cv2.resize(
458479
image, (0, 0), fx=aspect_ratio, fy=1.0, interpolation=self._interpolation.value
459480
)
481+
# Figure out what kind of resizing needs to be done
482+
width = self._width
483+
height = self._height
460484
image_height = image.shape[0]
461485
image_width = image.shape[1]
462-
# Figure out what kind of resizing needs to be done
463-
if self._height or self._width:
464-
if self._height and not self._width:
465-
factor = self._height / float(image_height)
486+
if width or height:
487+
if height and not width:
488+
factor = height / float(image_height)
466489
width = int(factor * image_width)
467-
if self._width and not self._height:
490+
if width and not height:
468491
factor = width / float(image_width)
469492
height = int(factor * image_height)
470493
assert height > 0 and width > 0

scenedetect/scene_manager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ def on_new_scene(frame_img: numpy.ndarray, frame_num: int):
8686
import sys
8787
import threading
8888
from enum import Enum
89-
from pathlib import Path
9089
from typing import Callable, Dict, List, Optional, TextIO, Tuple, Union
9190

9291
import cv2

0 commit comments

Comments
 (0)