From 835e232e8ca798064cf3526045cbcde94df6820d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 22 Nov 2024 19:37:27 +0200 Subject: [PATCH] rtspsrc: Use a flow combiner at the source pads instead of custom logic Most importantly, this ensures that UDP streams still continue to run even if they are not linked for a while. With decodebin3 the pads will all be unlinked unless selected, and selecting a stream at a later time would otherwise switch to a stream with a stopped udpsrc. Apart from that this also ensures that actual errors from handling RTP packets between udpsrc and the source pads are not silently ignored but considered errors like they would be for TCP/interleaved. Part-of: --- .../gst-plugins-good/gst/rtsp/gstrtspsrc.c | 178 ++++++++---------- .../gst-plugins-good/gst/rtsp/gstrtspsrc.h | 3 + 2 files changed, 77 insertions(+), 104 deletions(-) diff --git a/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.c b/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.c index 7979b6d7a9..1c646d2b36 100644 --- a/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.c +++ b/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.c @@ -509,8 +509,6 @@ static GstFlowReturn gst_rtspsrc_push_backchannel_buffer (GstRTSPSrc * src, static GstFlowReturn gst_rtspsrc_push_backchannel_sample (GstRTSPSrc * src, guint id, GstSample * sample); -static void gst_rtspsrc_reset_flows (GstRTSPSrc * src); - static GstCaps *signal_get_srtcp_params (GstRTSPSrc * src, GstRTSPStream * stream); @@ -1811,6 +1809,9 @@ gst_rtspsrc_init (GstRTSPSrc * src) g_mutex_init (&src->group_lock); + src->flow_combiner = gst_flow_combiner_new (); + g_mutex_init (&src->flow_combiner_lock); + GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE); gst_bin_set_suppressed_flags (GST_BIN (src), GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK); @@ -1866,6 +1867,9 @@ gst_rtspsrc_finalize (GObject * object) rtspsrc->prop_extra_http_request_headers = NULL; } + gst_flow_combiner_unref (rtspsrc->flow_combiner); + g_mutex_clear (&rtspsrc->flow_combiner_lock); + /* free locks */ g_rec_mutex_clear (&rtspsrc->stream_rec_lock); g_rec_mutex_clear (&rtspsrc->state_rec_lock); @@ -2367,19 +2371,6 @@ find_stream_by_channel (GstRTSPStream * stream, gint * channel) return -1; } -static gint -find_stream_by_udpsrc (GstRTSPStream * stream, gconstpointer a) -{ - GstElement *src = (GstElement *) a; - - if (stream->udpsrc[0] == src) - return 0; - if (stream->udpsrc[1] == src) - return 0; - - return -1; -} - static gint find_stream_by_setup (GstRTSPStream * stream, gconstpointer a) { @@ -2805,8 +2796,12 @@ gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream) } if (stream->srcpad) { gst_pad_set_active (stream->srcpad, FALSE); - if (stream->added) + if (stream->added) { + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_remove_pad (src->flow_combiner, stream->srcpad); + g_mutex_unlock (&src->flow_combiner_lock); gst_element_remove_pad (GST_ELEMENT_CAST (src), stream->srcpad); + } } if (stream->srtpenc) gst_object_unref (stream->srtpenc); @@ -2844,6 +2839,9 @@ gst_rtspsrc_cleanup (GstRTSPSrc * src) } g_list_free (src->streams); src->streams = NULL; + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_reset (src->flow_combiner); + g_mutex_unlock (&src->flow_combiner_lock); if (src->manager) { if (src->manager_sig_id) { g_signal_handler_disconnect (src->manager, src->manager_sig_id); @@ -3110,7 +3108,9 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing) state = GST_STATE_PAUSED; } gst_rtspsrc_push_event (src, event); - gst_rtspsrc_reset_flows (src); + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_reset (src->flow_combiner); + g_mutex_unlock (&src->flow_combiner_lock); gst_rtspsrc_loop_send_cmd (src, cmd, CMD_LOOP); gst_rtspsrc_set_state (src, state); } @@ -3465,6 +3465,44 @@ gst_rtspsrc_handle_src_sink_event (GstPad * pad, GstObject * parent, return gst_pad_push_event (stream->srcpad, event); } +static GstFlowReturn +gst_rtspsrc_handle_src_sink_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstFlowReturn ret; + GstRTSPStream *stream; + + stream = gst_pad_get_element_private (pad); + + ret = gst_pad_push (stream->srcpad, buffer); + g_mutex_lock (&stream->parent->flow_combiner_lock); + ret = + gst_flow_combiner_update_pad_flow (stream->parent->flow_combiner, + stream->srcpad, ret); + g_mutex_unlock (&stream->parent->flow_combiner_lock); + + return ret; +} + +static GstFlowReturn +gst_rtspsrc_handle_src_sink_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * list) +{ + GstFlowReturn ret; + GstRTSPStream *stream; + + stream = gst_pad_get_element_private (pad); + + ret = gst_pad_push_list (stream->srcpad, list); + g_mutex_lock (&stream->parent->flow_combiner_lock); + ret = + gst_flow_combiner_update_pad_flow (stream->parent->flow_combiner, + stream->srcpad, ret); + g_mutex_unlock (&stream->parent->flow_combiner_lock); + + return ret; +} + /* this is the final event function we receive on the internal source pad when * we deal with TCP connections */ static gboolean @@ -3889,6 +3927,9 @@ new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src) GST_PAD (gst_proxy_pad_get_internal (GST_PROXY_PAD (stream->srcpad))); gst_pad_set_element_private (internal_src, stream); gst_pad_set_event_function (internal_src, gst_rtspsrc_handle_src_sink_event); + gst_pad_set_chain_function (internal_src, gst_rtspsrc_handle_src_sink_chain); + gst_pad_set_chain_list_function (internal_src, + gst_rtspsrc_handle_src_sink_chain_list); gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event); gst_pad_set_query_function (stream->srcpad, gst_rtspsrc_handle_src_query); @@ -3902,10 +3943,14 @@ new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src) gst_object_unref (internal_src); /* don't add the srcpad if this is a sendonly stream */ - if (stream->is_backchannel) + if (stream->is_backchannel) { add_backchannel_fakesink (src, stream, stream->srcpad); - else + } else { + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_add_pad (src->flow_combiner, stream->srcpad); + g_mutex_unlock (&src->flow_combiner_lock); gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad); + } if (all_added) { GST_DEBUG_OBJECT (src, "We added all streams"); @@ -5373,6 +5418,10 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream, gst_pad_set_element_private (internal_src, stream); gst_pad_set_event_function (internal_src, gst_rtspsrc_handle_src_sink_event); + gst_pad_set_chain_function (internal_src, + gst_rtspsrc_handle_src_sink_chain); + gst_pad_set_chain_list_function (internal_src, + gst_rtspsrc_handle_src_sink_chain_list); gst_object_unref (internal_src); gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event); @@ -5470,10 +5519,14 @@ gst_rtspsrc_activate_streams (GstRTSPSrc * src) /* add the pad */ if (!stream->added) { GST_DEBUG_OBJECT (src, "adding stream pad %p", stream); - if (stream->is_backchannel) + if (stream->is_backchannel) { add_backchannel_fakesink (src, stream, stream->srcpad); - else + } else { + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_add_pad (src->flow_combiner, stream->srcpad); + g_mutex_unlock (&src->flow_combiner_lock); gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad); + } stream->added = TRUE; } } @@ -5556,49 +5609,6 @@ gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment, } } -static GstFlowReturn -gst_rtspsrc_combine_flows (GstRTSPSrc * src, GstRTSPStream * stream, - GstFlowReturn ret) -{ - GList *streams; - - /* store the value */ - stream->last_ret = ret; - - /* if it's success we can return the value right away */ - if (ret == GST_FLOW_OK) - goto done; - - /* any other error that is not-linked can be returned right - * away */ - if (ret != GST_FLOW_NOT_LINKED) - goto done; - - /* only return NOT_LINKED if all other pads returned NOT_LINKED */ - for (streams = src->streams; streams; streams = g_list_next (streams)) { - GstRTSPStream *ostream = (GstRTSPStream *) streams->data; - - ret = ostream->last_ret; - /* some other return value (must be SUCCESS but we can return - * other values as well) */ - if (ret != GST_FLOW_NOT_LINKED) - goto done; - } - /* if we get here, all other pads were unlinked and we return - * NOT_LINKED then */ -done: - return ret; -} - -static void -gst_rtspsrc_reset_flows (GstRTSPSrc * src) -{ - for (GList * streams = src->streams; streams; streams = g_list_next (streams)) { - GstRTSPStream *ostream = (GstRTSPStream *) streams->data; - ostream->last_ret = GST_FLOW_OK; - } -} - static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream, GstEvent * event) @@ -6237,10 +6247,6 @@ gst_rtspsrc_handle_data (GstRTSPSrc * src, GstRTSPMessage * message) else ret = gst_pad_push (outpad, buf); - if (!is_rtcp) { - /* combine all stream flows for the data transport */ - ret = gst_rtspsrc_combine_flows (src, stream, ret); - } return ret; /* ERRORS */ @@ -9986,42 +9992,6 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message) GST_BIN_CLASS (parent_class)->handle_message (bin, message); break; } - case GST_MESSAGE_ERROR: - { - GstObject *udpsrc; - GstRTSPStream *stream; - GstFlowReturn ret; - - udpsrc = GST_MESSAGE_SRC (message); - - GST_DEBUG_OBJECT (rtspsrc, "got error from %s", - GST_ELEMENT_NAME (udpsrc)); - - stream = find_stream (rtspsrc, udpsrc, (gpointer) find_stream_by_udpsrc); - if (!stream) - goto forward; - - /* we ignore the RTCP udpsrc */ - if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc)) - goto done; - - /* if we get error messages from the udp sources, that's not a problem as - * long as not all of them error out. We also don't really know what the - * problem is, the message does not give enough detail... */ - ret = gst_rtspsrc_combine_flows (rtspsrc, stream, GST_FLOW_NOT_LINKED); - GST_DEBUG_OBJECT (rtspsrc, "combined flows: %s", gst_flow_get_name (ret)); - if (ret != GST_FLOW_OK) - goto forward; - - done: - gst_message_unref (message); - break; - - forward: - /* fatal but not our message, forward */ - GST_BIN_CLASS (parent_class)->handle_message (bin, message); - break; - } default: { GST_BIN_CLASS (parent_class)->handle_message (bin, message); diff --git a/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.h b/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.h index 93b53c58c7..cbe1a1e693 100644 --- a/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.h +++ b/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.h @@ -45,6 +45,7 @@ #define __GST_RTSPSRC_H__ #include +#include G_BEGIN_DECLS @@ -227,6 +228,8 @@ struct _GstRTSPSrc { GstSDPMessage *sdp; gboolean from_sdp; GList *streams; + GMutex flow_combiner_lock; + GstFlowCombiner *flow_combiner; GstStructure *props; gboolean need_activate;