Skip to content

Commit ce1f6a4

Browse files
author
arch
committed
add watchdog to ffmpeg stream
1 parent 632adec commit ce1f6a4

File tree

3 files changed

+127
-54
lines changed

3 files changed

+127
-54
lines changed

funscript_editor/algorithms/funscriptgenerator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,9 @@ def tracking(self) -> str:
950950
self.logger.critical("The program crashed due to a fatal error", exc_info=ex)
951951
return "program crashed"
952952

953+
if video.isTimeout():
954+
status = "Reach a corrupt video frame"
955+
953956
self.__show_loading_screen(first_frame.shape)
954957
self.logger.info("Raw tracking data: %d Tracking points for %d seconds of the video", len(bboxes["Woman"]), int(len(bboxes["Woman"])*(self.params.skip_frames + 1)/self.video_info.fps))
955958
bboxes = self.correct_bboxes(bboxes, delete_last_predictions)

funscript_editor/data/ffmpegstream.py

Lines changed: 99 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import subprocess as sp
1414
import numpy as np
1515

16+
from funscript_editor.utils.watchdog import Watchdog
1617

1718
@dataclass
1819
class VideoInfo:
@@ -31,34 +32,55 @@ class FFmpegStream:
3132
config (dict): conversion parameter
3233
start_frame (int): start frame number
3334
queue_size (int): size of frame buffer
35+
watchdog_timeout (int): watchdog timeout in seconds
3436
"""
3537

3638
def __init__(self,
3739
video_path :str,
3840
config :dict,
3941
start_frame :int = 0,
40-
queue_size :int = 256):
42+
queue_size :int = 256,
43+
watchdog_timeout :int = 4):
4144

4245
self.video_path = video_path
4346
self.config = config
4447
self.start_frame = start_frame
4548
self.queue_size = queue_size
4649

4750
self.stopped = False
51+
self.timeout = False
4852
self.current_frame = 0
4953
self.sleep_time = 0.001
5054

5155
self.video_info = self.get_video_info(video_path)
5256
self.frame_buffer = Queue(maxsize=queue_size)
5357

58+
self.watchdog = Watchdog(watchdog_timeout, self.watchdog_timeout)
5459
self.thread = Thread(target=self.run, args=())
5560
self.thread.daemon = True
5661
self.thread.start()
5762

5863

64+
def __del__(self):
65+
self.watchdog.stop()
66+
67+
5968
logger = logging.getLogger(__name__)
6069

6170

71+
def watchdog_timeout(self):
72+
""" Watchdog timeout for ffmpeg stream """
73+
self.logger.error("FFmpegStream Timeout")
74+
self.timeout = True
75+
self.stopped = True
76+
try: self.pipe.terminate()
77+
except: pass
78+
try: self.pipe.stdout.close()
79+
except: pass
80+
try: self.pipe.stderr.close()
81+
except: pass
82+
83+
6284
@staticmethod
6385
def get_video_info(
6486
video_path: str) -> VideoInfo:
@@ -261,6 +283,15 @@ def isOpen(self) -> bool:
261283
return self.more() or not self.stopped
262284

263285

286+
def isTimeout(self) -> bool:
287+
""" Check if FFmpeg video stream has an timeout
288+
289+
Returns:
290+
bool: True if ffmpeg video stream has an timeout else False
291+
"""
292+
return self.timeout
293+
294+
264295
def more(self) -> bool:
265296
""" Check if frames in the frame bufer are available
266297
@@ -278,59 +309,73 @@ def more(self) -> bool:
278309
def run(self) -> None:
279310
""" Function to read transformed frames from ffmpeg video stream into a queue """
280311

281-
video_filter = self.config['video_filter']
282-
for k, v in self.config['parameter'].items():
283-
video_filter = video_filter.replace('${' + k + '}', str(v))
284-
285-
seek = FFmpegStream.frame_to_timestamp(self.start_frame, self.video_info.fps)
286-
287-
command = [
288-
FFmpegStream.get_ffmpeg_command(),
289-
'-hide_banner',
290-
'-loglevel', 'warning',
291-
'-ss', str(seek),
292-
'-hwaccel', 'auto',
293-
'-i', self.video_path,
294-
'-f', 'image2pipe',
295-
'-pix_fmt', 'bgr24',
296-
'-vsync', '0',
297-
'-vcodec', 'rawvideo',
298-
'-an',
299-
'-sn',
300-
'-vf', video_filter,
301-
'-'
302-
]
303-
304-
self.logger.info("Open FFmpeg Stream")
305-
pipe = sp.Popen(
306-
command,
307-
stdout = sp.PIPE,
308-
stderr = sp.PIPE,
309-
bufsize= 3 * self.config['parameter']['height'] * self.config['parameter']['width']
310-
)
311-
312-
while not self.stopped:
313-
data = pipe.stdout.read(self.config['parameter']['width'] * self.config['parameter']['height'] * 3)
314-
if not data:
315-
break
316-
317-
frame = np.frombuffer(data, dtype='uint8').reshape(
318-
(self.config['parameter']['height'], self.config['parameter']['width'], 3)
312+
try:
313+
video_filter = self.config['video_filter']
314+
for k, v in self.config['parameter'].items():
315+
video_filter = video_filter.replace('${' + k + '}', str(v))
316+
317+
seek = FFmpegStream.frame_to_timestamp(self.start_frame, self.video_info.fps)
318+
319+
command = [
320+
FFmpegStream.get_ffmpeg_command(),
321+
'-hide_banner',
322+
'-loglevel', 'warning',
323+
'-ss', str(seek),
324+
'-hwaccel', 'auto',
325+
'-i', self.video_path,
326+
'-f', 'image2pipe',
327+
'-pix_fmt', 'bgr24',
328+
'-vsync', '0',
329+
'-vcodec', 'rawvideo',
330+
'-an',
331+
'-sn',
332+
'-vf', video_filter,
333+
'-'
334+
]
335+
336+
self.watchdog.start()
337+
self.logger.info("FFmpeg Stream Watchdog started")
338+
self.logger.info("Open FFmpeg Stream")
339+
self.pipe = sp.Popen(
340+
command,
341+
stdout = sp.PIPE,
342+
stderr = sp.PIPE,
343+
bufsize= 3 * self.config['parameter']['height'] * self.config['parameter']['width']
319344
)
320-
if frame is None:
321-
break
322-
323-
while self.frame_buffer.full() and not self.stopped:
324-
time.sleep(self.sleep_time)
325-
326-
self.frame_buffer.put(frame)
327-
self.current_frame += 1
328345

329-
self.stopped = True
330-
self.logger.info('Close FFmpeg Stream')
331-
pipe.terminate()
332-
try: pipe.stdout.close()
333-
except: pass
334-
try: pipe.stderr.close()
335-
except: pass
346+
while not self.stopped:
347+
self.watchdog.trigger()
348+
data = self.pipe.stdout.read(self.config['parameter']['width'] * self.config['parameter']['height'] * 3)
349+
if not data:
350+
break
351+
352+
frame = np.frombuffer(data, dtype='uint8').reshape(
353+
(self.config['parameter']['height'], self.config['parameter']['width'], 3)
354+
)
355+
if frame is None:
356+
break
357+
358+
wait_counter = 0
359+
while self.frame_buffer.full() and not self.stopped:
360+
self.watchdog.trigger()
361+
time.sleep(self.sleep_time)
362+
wait_counter += 1
363+
if wait_counter == 2000:
364+
self.logger.error("FFmpeg Frame Buffer overrun!!!")
365+
366+
self.frame_buffer.put(frame)
367+
self.current_frame += 1
368+
369+
self.stopped = True
370+
self.logger.info('Close FFmpeg Stream')
371+
self.watchdog.stop()
372+
self.logger.info("FFmpeg Stream Watchdog stoped")
373+
self.pipe.terminate()
374+
try: self.pipe.stdout.close()
375+
except: pass
376+
try: self.pipe.stderr.close()
377+
except: pass
378+
except Exception as ex:
379+
self.stopped = True
380+
self.logger.critical("FFmpegStream crashed due to a fatal error", exc_info=ex)
336381

funscript_editor/utils/watchdog.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from threading import Timer
2+
3+
class Watchdog(Exception):
4+
def __init__(self, timeout_in_seconds, userHandler=None):
5+
self.timeout = timeout_in_seconds
6+
self.handler = userHandler if userHandler is not None else self.defaultHandler
7+
self.timer = Timer(self.timeout, self.handler)
8+
self.started = False
9+
10+
def start(self):
11+
if not self.started:
12+
self.started = True
13+
self.timer.start()
14+
15+
def trigger(self):
16+
self.timer.cancel()
17+
self.timer = Timer(self.timeout, self.handler)
18+
self.timer.start()
19+
20+
def stop(self):
21+
try: self.timer.cancel()
22+
except: pass
23+
24+
def defaultHandler(self):
25+
raise self

0 commit comments

Comments
 (0)