Skip to content

Commit

Permalink
Fix RTSP source adapter freezing on connection loss (#860)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh authored Sep 9, 2024
1 parent bc1438f commit 2e6bc98
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 10 deletions.
40 changes: 36 additions & 4 deletions adapters/gst/gst_plugins/python/ffmpeg_src.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from savant.gstreamer.codecs import Codec
from savant.gstreamer.utils import (
gst_post_library_settings_error,
gst_post_stream_demux_error,
gst_post_stream_failed_error,
required_property,
)
from savant.utils.logging import LoggerMixin

DEFAULT_QUEUE_LEN = 100
DEFAULT_TIMEOUT_MS = 10_000
DEFAULT_FFMPEG_LOG_LEVEL = 'info'
STR_TO_FFMPEG_LOG_LEVEL = {
name.lower(): getattr(FFmpegLogLevel, name)
Expand Down Expand Up @@ -62,6 +63,15 @@
False,
GObject.ParamFlags.READWRITE,
),
'timeout-ms': (
int,
'Timeout on RTSP connection (milliseconds).',
'Timeout on RTSP connection (milliseconds).',
0,
GObject.G_MAXINT,
DEFAULT_TIMEOUT_MS,
GObject.ParamFlags.READWRITE,
),
'loglevel': (
str,
'FFmpeg log level',
Expand Down Expand Up @@ -110,6 +120,7 @@ def __init__(self):
self._params: Dict[str, str] = {}
self._queue_len: int = DEFAULT_QUEUE_LEN
self._decode: bool = False
self._timeout_ms: int = DEFAULT_TIMEOUT_MS
self._loglevel: str = DEFAULT_FFMPEG_LOG_LEVEL

self._frame_params: Optional[FrameParams] = None
Expand All @@ -132,6 +143,8 @@ def do_get_property(self, prop: GObject.GParamSpec):
return self._queue_len
if prop.name == 'decode':
return self._decode
if prop.name == 'timeout-ms':
return self._timeout_ms
if prop.name == 'loglevel':
return self._loglevel
raise AttributeError(f'Unknown property {prop.name}')
Expand All @@ -152,6 +165,8 @@ def do_set_property(self, prop: GObject.GParamSpec, value):
self._queue_len = value
elif prop.name == 'decode':
self._decode = value
elif prop.name == 'timeout-ms':
self._timeout_ms = value
elif prop.name == 'loglevel':
self._loglevel = value
else:
Expand All @@ -165,12 +180,13 @@ def do_start(self):
self.logger.info('Creating FFMpegSource.')
self._ffmpeg_source = FFMpegSource(
self._uri,
params=self._params,
params=list(self._params.items()),
queue_len=self._queue_len,
decode=self._decode,
ffmpeg_log_level=STR_TO_FFMPEG_LOG_LEVEL[self._loglevel.lower()],
autoconvert_raw_formats_to_rgb24=True,
)
assert self._ffmpeg_source.is_running, 'Failed to start FFMpegSource.'

except Exception as exc:
self.logger.exception('Failed to start element: %s.', exc, exc_info=True)
Expand All @@ -185,7 +201,23 @@ def do_create(self, offset: int, size: int, buffer: Gst.Buffer = None):

self.logger.debug('Receiving next frame')

frame: VideoFrameEnvelope = self._ffmpeg_source.video_frame()
try:
assert self._ffmpeg_source.is_running, 'FFMpegSource is not running.'
frame: VideoFrameEnvelope = self._ffmpeg_source.video_frame(
self._timeout_ms
)
except Exception as exc:
self.logger.exception('Failed to receive frame: %s', exc, exc_info=True)
code_frame = inspect.currentframe()
gst_post_stream_failed_error(
self,
code_frame,
__file__,
text='Failed to receive frame.',
debug=str(exc),
)
return Gst.FlowReturn.ERROR, None

self.logger.debug(
'Received frame with codec %s, PTS %s and DTS %s.',
frame.codec,
Expand Down Expand Up @@ -243,7 +275,7 @@ def on_frame_params_change(self, frame_params: FrameParams):
error = f'Unsupported codec {frame_params.codec_name!r}.'
self.logger.error(error)
frame = inspect.currentframe()
gst_post_stream_demux_error(
gst_post_stream_failed_error(
gst_element=self,
frame=frame,
file_path=__file__,
Expand Down
3 changes: 3 additions & 0 deletions adapters/gst/sources/ffmpeg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ FFMPEG_SRC=(ffmpeg_src uri="${URI}" queue-len="${BUFFER_LEN}" loglevel="${FFMPEG
if [[ -n "${FFMPEG_PARAMS}" ]]; then
FFMPEG_SRC+=("params=${FFMPEG_PARAMS}")
fi
if [[ -n "${FFMPEG_TIMEOUT_MS}" ]]; then
FFMPEG_SRC+=("timeout-ms=${FFMPEG_TIMEOUT_MS}")
fi
PIPELINE=(
"${FFMPEG_SRC[@]}" !
savant_parse_bin !
Expand Down
13 changes: 11 additions & 2 deletions adapters/gst/sources/rtsp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,19 @@ SINK_PROPERTIES=(
sync="${SYNC_OUTPUT}"
ts-offset="${SYNC_DELAY}"
)
FFMPEG_SRC=(
ffmpeg_src
uri="${RTSP_URI}"
params="rtsp_transport=${RTSP_TRANSPORT}"
queue-len="${BUFFER_LEN}"
loglevel="${FFMPEG_LOGLEVEL}"
)
if [[ -n "${FFMPEG_TIMEOUT_MS}" ]]; then
FFMPEG_SRC+=("timeout-ms=${FFMPEG_TIMEOUT_MS}")
fi

PIPELINE=(
ffmpeg_src uri="${RTSP_URI}" params="rtsp_transport=${RTSP_TRANSPORT}"
queue-len="${BUFFER_LEN}" loglevel="${FFMPEG_LOGLEVEL}" !
"${FFMPEG_SRC[@]}" !
savant_parse_bin !
)
if [[ "${USE_ABSOLUTE_TIMESTAMPS,,}" == "true" ]]; then
Expand Down
2 changes: 1 addition & 1 deletion adapters/requirements-gst.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ffmpeg-input==0.1.23
ffmpeg-input==0.2.0
splitstream==1.2.6
boto3~=1.24.89
fastapi==0.110.0
Expand Down
4 changes: 3 additions & 1 deletion docs/source/savant_101/10_adapters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ The adapter delivers video stream using FFmpeg library. It can be used to read v
* ``FFMPEG_LOGLEVEL``: a log level for FFmpeg; default is ``info``;
* ``BUFFER_LEN``: a maximum amount of frames in FFmpeg buffer; default is ``50``;
* ``SYNC_OUTPUT``: a flag indicating the need to send frames from source synchronously (i.e. at the source file rate); default is ``False``;
* ``SYNC_DELAY``: a delay in seconds before sending frames; default is ``0``.
* ``SYNC_DELAY``: a delay in seconds before sending frames; default is ``0``;
* ``FFMPEG_TIMEOUT_MS``: a timeout in milliseconds for FFmpeg to wait for a frame; default is ``10000``.

Running the adapter with Docker:

Expand Down Expand Up @@ -441,6 +442,7 @@ The RTSP Source Adapter delivers RTSP stream to a module.
- ``SYNC_DELAY``: a delay in seconds before sending frames; when the source has ``B``-frames the flag allows avoiding sending frames in batches; default is ``0``;
- ``RTSP_TRANSPORT``: a transport protocol to use; default is ``tcp``;
- ``BUFFER_LEN``: a maximum amount of frames in the buffer; default is ``50``;
- ``FFMPEG_TIMEOUT_MS``: a timeout in milliseconds for FFmpeg to wait for a frame; default is ``10000``.

Running the adapter with Docker:

Expand Down
4 changes: 2 additions & 2 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pyds @ https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/releases/download
pyds @ https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/releases/download/v1.1.11/pyds-1.1.11-py3-none-linux_aarch64.whl ; platform_machine=='aarch64'

# ffmpeg-input
ffmpeg-input @ https://github.com/insight-platform/FFmpeg-Input/releases/download/0.1.23/ffmpeg_input-0.1.23-cp310-cp310-manylinux_2_28_x86_64.whl ; platform_machine=='x86_64'
ffmpeg-input @ https://github.com/insight-platform/FFmpeg-Input/releases/download/0.1.23/ffmpeg_input-0.1.23-cp310-cp310-manylinux_2_28_aarch64.whl ; platform_machine=='aarch64'
ffmpeg-input @ https://github.com/insight-platform/FFmpeg-Input/releases/download/0.2.0/ffmpeg_input-0.2.0-cp310-cp310-manylinux_2_28_x86_64.whl ; platform_machine=='x86_64'
ffmpeg-input @ https://github.com/insight-platform/FFmpeg-Input/releases/download/0.2.0/ffmpeg_input-0.2.0-cp310-cp310-manylinux_2_28_aarch64.whl ; platform_machine=='aarch64'

splitstream==1.2.6
16 changes: 16 additions & 0 deletions scripts/run_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,12 @@ def images_source(
help='Maximum amount of frames in the buffer.',
show_default=True,
)
@click.option(
'--ffmpeg-timeout-ms',
default=10000,
help='Timeout in milliseconds for FFmpeg to wait for a frame.',
show_default=True,
)
@click.option(
'--ffmpeg-loglevel',
default='info',
Expand All @@ -535,6 +541,7 @@ def rtsp_source(
sync: bool,
sync_delay: Optional[int],
buffer_len: int,
ffmpeg_timeout_ms: int,
ffmpeg_loglevel: str,
rtsp_transport: str,
use_absolute_timestamps: Optional[bool],
Expand All @@ -559,6 +566,7 @@ def rtsp_source(
f'RTSP_URI={rtsp_uri}',
f'RTSP_TRANSPORT={rtsp_transport}',
f'BUFFER_LEN={buffer_len}',
f'FFMPEG_TIMEOUT_MS={ffmpeg_timeout_ms}',
f'FFMPEG_LOGLEVEL={ffmpeg_loglevel}',
]
if sync and sync_delay is not None:
Expand Down Expand Up @@ -754,6 +762,12 @@ def gige_cam_source(
help='Maximum amount of frames in the buffer.',
show_default=True,
)
@click.option(
'--ffmpeg-timeout-ms',
default=10000,
help='Timeout in milliseconds for FFmpeg to wait for a frame.',
show_default=True,
)
@click.option(
'--ffmpeg-loglevel',
default='info',
Expand All @@ -776,6 +790,7 @@ def ffmpeg_source(
sync_delay: Optional[int],
ffmpeg_params: Optional[str],
buffer_len: int,
ffmpeg_timeout_ms: int,
ffmpeg_loglevel: str,
device: Optional[str],
use_absolute_timestamps: Optional[bool],
Expand All @@ -799,6 +814,7 @@ def ffmpeg_source(
) + [
f'URI={uri}',
f'BUFFER_LEN={buffer_len}',
f'FFMPEG_TIMEOUT_MS={ffmpeg_timeout_ms}',
f'FFMPEG_LOGLEVEL={ffmpeg_loglevel}',
]
if sync and sync_delay is not None:
Expand Down

0 comments on commit 2e6bc98

Please sign in to comment.