urisourcebin: Only expose pads once activation has completed

The following problem could happen:
* Thread 1 : urisourcebin gets activated from READY->PAUSED
* Thread 2 : some element causes a pad to be added to urisourcebin , which gets
linked downstream, which decides to activate upstream to pull-based.
  * That requires "activating" the pads from PUSH to NONE, and then from NONE to PULL
* Thread 1 : the base class state change handlers checks if all pads are
activated

The issue is that since going form PUSH to PULL requires going through NONE,
there is a window during which:
* Thread 1 : The pad was set to NONE (before being set to PULL)
* Thread 2 : The base class activates that pad (to PUSH)
* Thread 1 : The attempt to "activate" to PULL fails (silently or not)

This is very racy, so in order to avoid that, we make sure that we only add pads
once the transition from READY->PAUSED in the parent classes is done.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>
This commit is contained in:
Edward Hervey 2022-10-27 11:47:56 +02:00 committed by GStreamer Marge Bot
parent a347846698
commit b6584defd0

View file

@ -149,6 +149,10 @@ struct _GstURISourceBin
gchar *uri; gchar *uri;
guint64 connection_speed; guint64 connection_speed;
gboolean activated; /* TRUE if the switch to PAUSED has been completed */
gboolean flushing; /* TRUE if switching from PAUSED to READY */
GCond activation_cond; /* Uses the urisourcebin lock */
gboolean is_stream; gboolean is_stream;
gboolean is_adaptive; gboolean is_adaptive;
gboolean demuxer_handles_buffering; /* If TRUE: Don't use buffering elements */ gboolean demuxer_handles_buffering; /* If TRUE: Don't use buffering elements */
@ -491,6 +495,8 @@ gst_uri_source_bin_init (GstURISourceBin * urisrc)
g_mutex_init (&urisrc->buffering_lock); g_mutex_init (&urisrc->buffering_lock);
g_mutex_init (&urisrc->buffering_post_lock); g_mutex_init (&urisrc->buffering_post_lock);
g_cond_init (&urisrc->activation_cond);
urisrc->uri = g_strdup (DEFAULT_PROP_URI); urisrc->uri = g_strdup (DEFAULT_PROP_URI);
urisrc->connection_speed = DEFAULT_CONNECTION_SPEED; urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
@ -681,8 +687,8 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
GST_DEBUG_OBJECT (element, GST_DEBUG_OBJECT (element,
"New streams-aware demuxer pad %s:%s , exposing directly", "New streams-aware demuxer pad %s:%s , exposing directly",
GST_DEBUG_PAD_NAME (pad)); GST_DEBUG_PAD_NAME (pad));
expose_output_pad (urisrc, info->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc); GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, info->output_pad);
} else { } else {
GST_DEBUG_OBJECT (element, "new demuxer pad, name: <%s>. " GST_DEBUG_OBJECT (element, "new demuxer pad, name: <%s>. "
"Added as pending pad with caps %" GST_PTR_FORMAT, "Added as pending pad with caps %" GST_PTR_FORMAT,
@ -1315,6 +1321,32 @@ create_output_pad (GstURISourceBin * urisrc, GstPad * pad)
return newpad; return newpad;
} }
static GstPadProbeReturn
expose_block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
GstURISourceBin *urisrc = (GstURISourceBin *) user_data;
gboolean expose = FALSE;
GST_DEBUG_OBJECT (pad, "blocking");
GST_URI_SOURCE_BIN_LOCK (urisrc);
while (!urisrc->activated && !urisrc->flushing) {
GST_DEBUG_OBJECT (urisrc, "activated:%d flushing:%d", urisrc->activated,
urisrc->flushing);
g_cond_wait (&urisrc->activation_cond, &urisrc->lock);
}
GST_DEBUG_OBJECT (urisrc, "activated:%d flushing:%d", urisrc->activated,
urisrc->flushing);
if (!urisrc->flushing)
expose = TRUE;
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
if (expose)
gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
GST_DEBUG_OBJECT (pad, "Done blocking, removing probe");
return GST_PAD_PROBE_REMOVE;
}
static void static void
expose_output_pad (GstURISourceBin * urisrc, GstPad * pad) expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
{ {
@ -1328,10 +1360,20 @@ expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
gst_pad_sticky_events_foreach (target, copy_sticky_events, pad); gst_pad_sticky_events_foreach (target, copy_sticky_events, pad);
gst_object_unref (target); gst_object_unref (target);
GST_DEBUG_OBJECT (urisrc, "Exposing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
gst_pad_set_active (pad, TRUE); gst_pad_set_active (pad, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad); GST_URI_SOURCE_BIN_LOCK (urisrc);
if (!urisrc->activated) {
GST_DEBUG_OBJECT (urisrc, "Not fully activated, adding pad once PAUSED !");
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
expose_block_probe, urisrc, NULL);
pad = NULL;
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
if (pad) {
GST_DEBUG_OBJECT (urisrc, "Exposing pad %" GST_PTR_FORMAT, pad);
gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
}
} }
static void static void
@ -1984,8 +2026,8 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
/* We don't need slot here, expose immediately */ /* We don't need slot here, expose immediately */
GST_URI_SOURCE_BIN_LOCK (urisrc); GST_URI_SOURCE_BIN_LOCK (urisrc);
output_pad = create_output_pad (urisrc, srcpad); output_pad = create_output_pad (urisrc, srcpad);
expose_raw_output_pad (urisrc, srcpad, output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc); GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_raw_output_pad (urisrc, srcpad, output_pad);
} else { } else {
OutputSlotInfo *slot; OutputSlotInfo *slot;
GstPad *output_pad; GstPad *output_pad;
@ -2896,10 +2938,19 @@ gst_uri_source_bin_change_state (GstElement * element,
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_URI_SOURCE_BIN_LOCK (element);
urisrc->flushing = FALSE;
urisrc->activated = FALSE;
GST_URI_SOURCE_BIN_UNLOCK (element);
GST_DEBUG ("ready to paused"); GST_DEBUG ("ready to paused");
if (!setup_source (urisrc)) if (!setup_source (urisrc))
goto source_failed; goto source_failed;
break; break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
GST_URI_SOURCE_BIN_LOCK (element);
urisrc->flushing = TRUE;
g_cond_broadcast (&urisrc->activation_cond);
GST_URI_SOURCE_BIN_UNLOCK (element);
default: default:
break; break;
} }
@ -2910,6 +2961,13 @@ gst_uri_source_bin_change_state (GstElement * element,
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
{
GST_URI_SOURCE_BIN_LOCK (element);
GST_DEBUG_OBJECT (urisrc, "Potentially exposing pads");
urisrc->activated = TRUE;
g_cond_broadcast (&urisrc->activation_cond);
GST_URI_SOURCE_BIN_UNLOCK (element);
}
break; break;
case GST_STATE_CHANGE_PAUSED_TO_READY: case GST_STATE_CHANGE_PAUSED_TO_READY:
GST_DEBUG ("paused to ready"); GST_DEBUG ("paused to ready");