rtspsrc: Use a flow combiner at the source pads instead of custom logic

Most importantly, this ensures that UDP streams still continue to run even if
they are not linked for a while. With decodebin3 the pads will all be unlinked
unless selected, and selecting a stream at a later time would otherwise switch
to a stream with a stopped udpsrc.

Apart from that this also ensures that actual errors from handling RTP packets
between udpsrc and the source pads are not silently ignored but considered
errors like they would be for TCP/interleaved.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7946>
This commit is contained in:
Sebastian Dröge 2024-11-22 19:37:27 +02:00 committed by GStreamer Marge Bot
parent f880abba46
commit 835e232e8c
2 changed files with 77 additions and 104 deletions

View file

@ -509,8 +509,6 @@ static GstFlowReturn gst_rtspsrc_push_backchannel_buffer (GstRTSPSrc * src,
static GstFlowReturn gst_rtspsrc_push_backchannel_sample (GstRTSPSrc * src,
guint id, GstSample * sample);
static void gst_rtspsrc_reset_flows (GstRTSPSrc * src);
static GstCaps *signal_get_srtcp_params (GstRTSPSrc * src,
GstRTSPStream * stream);
@ -1811,6 +1809,9 @@ gst_rtspsrc_init (GstRTSPSrc * src)
g_mutex_init (&src->group_lock);
src->flow_combiner = gst_flow_combiner_new ();
g_mutex_init (&src->flow_combiner_lock);
GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE);
gst_bin_set_suppressed_flags (GST_BIN (src),
GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
@ -1866,6 +1867,9 @@ gst_rtspsrc_finalize (GObject * object)
rtspsrc->prop_extra_http_request_headers = NULL;
}
gst_flow_combiner_unref (rtspsrc->flow_combiner);
g_mutex_clear (&rtspsrc->flow_combiner_lock);
/* free locks */
g_rec_mutex_clear (&rtspsrc->stream_rec_lock);
g_rec_mutex_clear (&rtspsrc->state_rec_lock);
@ -2367,19 +2371,6 @@ find_stream_by_channel (GstRTSPStream * stream, gint * channel)
return -1;
}
static gint
find_stream_by_udpsrc (GstRTSPStream * stream, gconstpointer a)
{
GstElement *src = (GstElement *) a;
if (stream->udpsrc[0] == src)
return 0;
if (stream->udpsrc[1] == src)
return 0;
return -1;
}
static gint
find_stream_by_setup (GstRTSPStream * stream, gconstpointer a)
{
@ -2805,9 +2796,13 @@ gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream)
}
if (stream->srcpad) {
gst_pad_set_active (stream->srcpad, FALSE);
if (stream->added)
if (stream->added) {
g_mutex_lock (&src->flow_combiner_lock);
gst_flow_combiner_remove_pad (src->flow_combiner, stream->srcpad);
g_mutex_unlock (&src->flow_combiner_lock);
gst_element_remove_pad (GST_ELEMENT_CAST (src), stream->srcpad);
}
}
if (stream->srtpenc)
gst_object_unref (stream->srtpenc);
if (stream->srtpdec)
@ -2844,6 +2839,9 @@ gst_rtspsrc_cleanup (GstRTSPSrc * src)
}
g_list_free (src->streams);
src->streams = NULL;
g_mutex_lock (&src->flow_combiner_lock);
gst_flow_combiner_reset (src->flow_combiner);
g_mutex_unlock (&src->flow_combiner_lock);
if (src->manager) {
if (src->manager_sig_id) {
g_signal_handler_disconnect (src->manager, src->manager_sig_id);
@ -3110,7 +3108,9 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
state = GST_STATE_PAUSED;
}
gst_rtspsrc_push_event (src, event);
gst_rtspsrc_reset_flows (src);
g_mutex_lock (&src->flow_combiner_lock);
gst_flow_combiner_reset (src->flow_combiner);
g_mutex_unlock (&src->flow_combiner_lock);
gst_rtspsrc_loop_send_cmd (src, cmd, CMD_LOOP);
gst_rtspsrc_set_state (src, state);
}
@ -3465,6 +3465,44 @@ gst_rtspsrc_handle_src_sink_event (GstPad * pad, GstObject * parent,
return gst_pad_push_event (stream->srcpad, event);
}
static GstFlowReturn
gst_rtspsrc_handle_src_sink_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
{
GstFlowReturn ret;
GstRTSPStream *stream;
stream = gst_pad_get_element_private (pad);
ret = gst_pad_push (stream->srcpad, buffer);
g_mutex_lock (&stream->parent->flow_combiner_lock);
ret =
gst_flow_combiner_update_pad_flow (stream->parent->flow_combiner,
stream->srcpad, ret);
g_mutex_unlock (&stream->parent->flow_combiner_lock);
return ret;
}
static GstFlowReturn
gst_rtspsrc_handle_src_sink_chain_list (GstPad * pad, GstObject * parent,
GstBufferList * list)
{
GstFlowReturn ret;
GstRTSPStream *stream;
stream = gst_pad_get_element_private (pad);
ret = gst_pad_push_list (stream->srcpad, list);
g_mutex_lock (&stream->parent->flow_combiner_lock);
ret =
gst_flow_combiner_update_pad_flow (stream->parent->flow_combiner,
stream->srcpad, ret);
g_mutex_unlock (&stream->parent->flow_combiner_lock);
return ret;
}
/* this is the final event function we receive on the internal source pad when
* we deal with TCP connections */
static gboolean
@ -3889,6 +3927,9 @@ new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src)
GST_PAD (gst_proxy_pad_get_internal (GST_PROXY_PAD (stream->srcpad)));
gst_pad_set_element_private (internal_src, stream);
gst_pad_set_event_function (internal_src, gst_rtspsrc_handle_src_sink_event);
gst_pad_set_chain_function (internal_src, gst_rtspsrc_handle_src_sink_chain);
gst_pad_set_chain_list_function (internal_src,
gst_rtspsrc_handle_src_sink_chain_list);
gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event);
gst_pad_set_query_function (stream->srcpad, gst_rtspsrc_handle_src_query);
@ -3902,10 +3943,14 @@ new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src)
gst_object_unref (internal_src);
/* don't add the srcpad if this is a sendonly stream */
if (stream->is_backchannel)
if (stream->is_backchannel) {
add_backchannel_fakesink (src, stream, stream->srcpad);
else
} else {
g_mutex_lock (&src->flow_combiner_lock);
gst_flow_combiner_add_pad (src->flow_combiner, stream->srcpad);
g_mutex_unlock (&src->flow_combiner_lock);
gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad);
}
if (all_added) {
GST_DEBUG_OBJECT (src, "We added all streams");
@ -5373,6 +5418,10 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream,
gst_pad_set_element_private (internal_src, stream);
gst_pad_set_event_function (internal_src,
gst_rtspsrc_handle_src_sink_event);
gst_pad_set_chain_function (internal_src,
gst_rtspsrc_handle_src_sink_chain);
gst_pad_set_chain_list_function (internal_src,
gst_rtspsrc_handle_src_sink_chain_list);
gst_object_unref (internal_src);
gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event);
@ -5470,10 +5519,14 @@ gst_rtspsrc_activate_streams (GstRTSPSrc * src)
/* add the pad */
if (!stream->added) {
GST_DEBUG_OBJECT (src, "adding stream pad %p", stream);
if (stream->is_backchannel)
if (stream->is_backchannel) {
add_backchannel_fakesink (src, stream, stream->srcpad);
else
} else {
g_mutex_lock (&src->flow_combiner_lock);
gst_flow_combiner_add_pad (src->flow_combiner, stream->srcpad);
g_mutex_unlock (&src->flow_combiner_lock);
gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad);
}
stream->added = TRUE;
}
}
@ -5556,49 +5609,6 @@ gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment,
}
}
static GstFlowReturn
gst_rtspsrc_combine_flows (GstRTSPSrc * src, GstRTSPStream * stream,
GstFlowReturn ret)
{
GList *streams;
/* store the value */
stream->last_ret = ret;
/* if it's success we can return the value right away */
if (ret == GST_FLOW_OK)
goto done;
/* any other error that is not-linked can be returned right
* away */
if (ret != GST_FLOW_NOT_LINKED)
goto done;
/* only return NOT_LINKED if all other pads returned NOT_LINKED */
for (streams = src->streams; streams; streams = g_list_next (streams)) {
GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
ret = ostream->last_ret;
/* some other return value (must be SUCCESS but we can return
* other values as well) */
if (ret != GST_FLOW_NOT_LINKED)
goto done;
}
/* if we get here, all other pads were unlinked and we return
* NOT_LINKED then */
done:
return ret;
}
static void
gst_rtspsrc_reset_flows (GstRTSPSrc * src)
{
for (GList * streams = src->streams; streams; streams = g_list_next (streams)) {
GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
ostream->last_ret = GST_FLOW_OK;
}
}
static gboolean
gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
GstEvent * event)
@ -6237,10 +6247,6 @@ gst_rtspsrc_handle_data (GstRTSPSrc * src, GstRTSPMessage * message)
else
ret = gst_pad_push (outpad, buf);
if (!is_rtcp) {
/* combine all stream flows for the data transport */
ret = gst_rtspsrc_combine_flows (src, stream, ret);
}
return ret;
/* ERRORS */
@ -9986,42 +9992,6 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message)
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
break;
}
case GST_MESSAGE_ERROR:
{
GstObject *udpsrc;
GstRTSPStream *stream;
GstFlowReturn ret;
udpsrc = GST_MESSAGE_SRC (message);
GST_DEBUG_OBJECT (rtspsrc, "got error from %s",
GST_ELEMENT_NAME (udpsrc));
stream = find_stream (rtspsrc, udpsrc, (gpointer) find_stream_by_udpsrc);
if (!stream)
goto forward;
/* we ignore the RTCP udpsrc */
if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
goto done;
/* if we get error messages from the udp sources, that's not a problem as
* long as not all of them error out. We also don't really know what the
* problem is, the message does not give enough detail... */
ret = gst_rtspsrc_combine_flows (rtspsrc, stream, GST_FLOW_NOT_LINKED);
GST_DEBUG_OBJECT (rtspsrc, "combined flows: %s", gst_flow_get_name (ret));
if (ret != GST_FLOW_OK)
goto forward;
done:
gst_message_unref (message);
break;
forward:
/* fatal but not our message, forward */
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
break;
}
default:
{
GST_BIN_CLASS (parent_class)->handle_message (bin, message);

View file

@ -45,6 +45,7 @@
#define __GST_RTSPSRC_H__
#include <gst/gst.h>
#include <gst/base/base.h>
G_BEGIN_DECLS
@ -227,6 +228,8 @@ struct _GstRTSPSrc {
GstSDPMessage *sdp;
gboolean from_sdp;
GList *streams;
GMutex flow_combiner_lock;
GstFlowCombiner *flow_combiner;
GstStructure *props;
gboolean need_activate;