rtsp-media: wait for all GstRTSPStreamBlocking messages

Make sure rtsp-media have received a GstRTSPStreamBlocking message from
each active stream when checking if all streams are blocked.

Without this change there will be a race condition when using two or
more streams and rtsp-media receives a GstRTSPStreamBlocking message
from one of the streams. This is because rtsp-media then checks if all
streams are blocked by calling gst_rtsp_stream_is_blocking() for each
stream. This function call returns TRUE if the stream has sent a
GstRTSPStreamBlocking message, however, rtsp-media may have yet to
receive this message. This would then result in that rtsp-media
erroneously thinks it is blocking all streams which could result in
rtsp-media changing state, from PREPARING to PREPARED. In the case of a
preroll, this could result in that rtsp-media thinks that the pipeline
is prerolled even though that might not be the case.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/-/merge_requests/124>
This commit is contained in:
Ludvig Rappe 2020-05-26 15:31:22 +02:00 committed by GStreamer Merge Bot
parent 0526a5c9bb
commit ae58f7d771

View file

@ -114,6 +114,7 @@ struct _GstRTSPMediaPrivate
gboolean blocked; gboolean blocked;
GstRTSPTransportMode transport_mode; GstRTSPTransportMode transport_mode;
gboolean stop_on_disconnect; gboolean stop_on_disconnect;
guint blocking_msg_received;
GstElement *element; GstElement *element;
GRecMutex state_lock; /* locking order: state lock, lock */ GRecMutex state_lock; /* locking order: state lock, lock */
@ -493,6 +494,7 @@ gst_rtsp_media_init (GstRTSPMedia * media)
priv->do_rate_control = DEFAULT_DO_RATE_CONTROL; priv->do_rate_control = DEFAULT_DO_RATE_CONTROL;
priv->dscp_qos = DEFAULT_DSCP_QOS; priv->dscp_qos = DEFAULT_DSCP_QOS;
priv->expected_async_done = FALSE; priv->expected_async_done = FALSE;
priv->blocking_msg_received = 0;
} }
static void static void
@ -2739,6 +2741,9 @@ media_streams_set_blocked (GstRTSPMedia * media, gboolean blocked)
GST_DEBUG ("media %p set blocked %d", media, blocked); GST_DEBUG ("media %p set blocked %d", media, blocked);
priv->blocked = blocked; priv->blocked = blocked;
g_ptr_array_foreach (priv->streams, (GFunc) stream_update_blocked, media); g_ptr_array_foreach (priv->streams, (GFunc) stream_update_blocked, media);
if (!blocked)
priv->blocking_msg_received = 0;
} }
static void static void
@ -2757,6 +2762,7 @@ media_unblock (GstRTSPMedia * media)
* streams that are complete */ * streams that are complete */
priv->blocked = FALSE; priv->blocked = FALSE;
g_ptr_array_foreach (priv->streams, (GFunc) stream_unblock, media); g_ptr_array_foreach (priv->streams, (GFunc) stream_unblock, media);
priv->blocking_msg_received = 0;
} }
static void static void
@ -3140,6 +3146,25 @@ set_target_state (GstRTSPMedia * media, GstState state, gboolean do_state)
return ret; return ret;
} }
static void
stream_collect_active (GstRTSPStream * stream, guint * active_streams)
{
if (gst_rtsp_stream_is_complete (stream))
(*active_streams)++;
}
static guint
nbr_active_streams (GstRTSPMedia * media)
{
guint ret = 0;
g_ptr_array_foreach (media->priv->streams, (GFunc) stream_collect_active,
&ret);
return ret;
}
/* called with state-lock */
/* called with state-lock */ /* called with state-lock */
static gboolean static gboolean
default_handle_message (GstRTSPMedia * media, GstMessage * message) default_handle_message (GstRTSPMedia * media, GstMessage * message)
@ -3247,8 +3272,11 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
s = gst_message_get_structure (message); s = gst_message_get_structure (message);
if (gst_structure_has_name (s, "GstRTSPStreamBlocking")) { if (gst_structure_has_name (s, "GstRTSPStreamBlocking")) {
GST_DEBUG ("media received blocking message"); GST_DEBUG ("media received blocking message");
priv->blocking_msg_received++;
if (priv->blocked && media_streams_blocking (media) && if (priv->blocked && media_streams_blocking (media) &&
priv->no_more_pads_pending == 0) { priv->no_more_pads_pending == 0 &&
(priv->blocking_msg_received == nbr_active_streams (media) ||
priv->blocking_msg_received == priv->streams->len)) {
GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message), "media is blocking"); GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message), "media is blocking");
g_mutex_lock (&priv->lock); g_mutex_lock (&priv->lock);
collect_media_stats (media); collect_media_stats (media);
@ -3256,6 +3284,8 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING) if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING)
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED); gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
priv->blocking_msg_received = 0;
} }
} }
break; break;