urisourcebin: Remove pending pad handling

This was needed to support the legacy handling of changing streams (add new
pads, send EOS and remove old pads).

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>
This commit is contained in:
Edward Hervey 2022-10-27 14:30:30 +02:00 committed by GStreamer Marge Bot
parent b6584defd0
commit 64c81b6972
2 changed files with 7 additions and 231 deletions

View file

@ -109,15 +109,6 @@ struct _ChildSrcPadInfo
/* ADAPTIVE DEMUXER SPECIFIC */
guint blocking_probe_id;
guint event_probe_id;
/* Current caps of the adaptive demuxer source pad. Used to link new pending
* pads to a compatible (old) output slot.
*
* NOTE : This will eventually go away once only streams-aware elements are
* allowed within urisourcebin
*/
GstCaps *cur_caps;
};
/* Output Slot:
@ -177,9 +168,6 @@ struct _GstURISourceBin
guint64 ring_buffer_max_size; /* 0 means disabled */
GList *pending_pads; /* Pads we have blocked pending assignment
to an output source pad */
GList *buffering_status; /* element currently buffering messages */
gint last_buffering_pct; /* Avoid sending buffering over and over */
GMutex buffering_lock;
@ -301,7 +289,7 @@ static gboolean setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad);
static void remove_demuxer (GstURISourceBin * bin);
static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
static OutputSlotInfo *get_output_slot (GstURISourceBin * urisrc,
gboolean do_download, gboolean is_adaptive, GstCaps * caps);
gboolean do_download, gboolean is_adaptive);
static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
static void free_output_slot_async (GstURISourceBin * urisrc,
OutputSlotInfo * slot);
@ -647,17 +635,12 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
return TRUE;
}
static GstPadProbeReturn
pending_pad_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
static GstPadProbeReturn
demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
static void
free_child_src_pad_info (ChildSrcPadInfo * info)
{
if (info->cur_caps)
gst_caps_unref (info->cur_caps);
if (info->output_pad)
gst_object_unref (info->output_pad);
g_free (info);
@ -672,9 +655,6 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
info = g_new0 (ChildSrcPadInfo, 1);
info->src_pad = pad;
info->cur_caps = gst_pad_get_current_caps (pad);
if (info->cur_caps == NULL)
info->cur_caps = gst_pad_query_caps (pad, NULL);
g_object_set_data_full (G_OBJECT (pad), "urisourcebin.srcpadinfo",
info, (GDestroyNotify) free_child_src_pad_info);
@ -690,172 +670,13 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, info->output_pad);
} else {
GST_DEBUG_OBJECT (element, "new demuxer pad, name: <%s>. "
"Added as pending pad with caps %" GST_PTR_FORMAT,
GST_PAD_NAME (pad), info->cur_caps);
urisrc->pending_pads = g_list_prepend (urisrc->pending_pads, pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
/* Block the pad. On the first data on that pad if it hasn't
* been linked to an output slot, we'll create one */
info->blocking_probe_id =
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
pending_pad_blocked, urisrc, NULL);
g_return_if_reached ();
}
info->event_probe_id =
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
GST_PAD_PROBE_TYPE_EVENT_FLUSH, demux_pad_events, urisrc, NULL);
}
static GstPadProbeReturn
pending_pad_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
ChildSrcPadInfo *child_info;
OutputSlotInfo *slot;
GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
GstCaps *caps;
GstPad *output_pad;
if (!(child_info =
g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
goto done;
GST_LOG_OBJECT (urisrc, "Removing pad %" GST_PTR_FORMAT " from pending list",
pad);
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* Once blocked, this pad is no longer pending, one way or another */
urisrc->pending_pads = g_list_remove (urisrc->pending_pads, pad);
/* If already linked to a slot, nothing more to do */
if (child_info->output_slot) {
GST_LOG_OBJECT (urisrc, "Pad %" GST_PTR_FORMAT " is linked to queue %"
GST_PTR_FORMAT " on slot %p", pad, child_info->output_slot->queue,
child_info->output_slot);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
goto done;
}
/* If the demuxer handles buffering, we can expose it as-is */
if (urisrc->demuxer_handles_buffering) {
g_assert (child_info->output_pad == NULL);
child_info->output_pad = gst_object_ref (create_output_pad (urisrc, pad));
GST_DEBUG_OBJECT (pad, "Demuxer handles buffering, exposing as-is");
expose_output_pad (urisrc, child_info->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
goto done;
}
caps = gst_pad_get_current_caps (pad);
if (caps == NULL)
caps = gst_pad_query_caps (pad, NULL);
slot = get_output_slot (urisrc, FALSE, TRUE, caps);
gst_caps_unref (caps);
if (slot == NULL) {
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
goto done;
}
GST_LOG_OBJECT (urisrc, "Pad %" GST_PTR_FORMAT " linked to slot %p", pad,
slot);
child_info->output_slot = slot;
slot->linked_info = child_info;
gst_pad_link (pad, slot->queue_sinkpad);
output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, output_pad);
gst_object_unref (output_pad);
done:
return GST_PAD_PROBE_REMOVE;
}
/* Called with LOCK held */
/* Looks for a suitable pending pad to connect onto this
* finishing output slot that's about to EOS */
static gboolean
link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
{
GList *cur;
ChildSrcPadInfo *in_info = slot->linked_info;
ChildSrcPadInfo *out_info = NULL;
gboolean res = FALSE;
GstCaps *cur_caps;
/* Look for a suitable pending pad */
cur_caps = gst_pad_get_current_caps (slot->queue_sinkpad);
GST_DEBUG_OBJECT (urisrc,
"Looking for a pending pad with caps %" GST_PTR_FORMAT, cur_caps);
for (cur = urisrc->pending_pads; cur != NULL; cur = g_list_next (cur)) {
GstPad *pending = (GstPad *) (cur->data);
ChildSrcPadInfo *cur_info = NULL;
if ((cur_info =
g_object_get_data (G_OBJECT (pending),
"urisourcebin.srcpadinfo"))) {
/* Don't re-link to the same pad in case of EOS while still pending */
if (in_info == cur_info)
continue;
if (cur_caps == NULL || gst_caps_is_equal (cur_caps, cur_info->cur_caps)) {
GST_DEBUG_OBJECT (urisrc, "Found suitable pending pad %" GST_PTR_FORMAT
" with caps %" GST_PTR_FORMAT " to link to this output slot",
cur_info->src_pad, cur_info->cur_caps);
out_info = cur_info;
break;
}
}
}
if (cur_caps)
gst_caps_unref (cur_caps);
if (out_info) {
/* Block any upstream stuff while we switch out the pad */
guint block_id = gst_pad_add_probe (slot->queue_sinkpad,
GST_PAD_PROBE_TYPE_BLOCK_UPSTREAM,
NULL, NULL, NULL);
GST_DEBUG_OBJECT (urisrc, "Linking pending pad %" GST_PTR_FORMAT
" to existing output slot %p", out_info->src_pad, slot);
if (in_info) {
gst_pad_unlink (in_info->src_pad, slot->queue_sinkpad);
in_info->output_slot = NULL;
slot->linked_info = NULL;
}
if (gst_pad_link (out_info->src_pad,
slot->queue_sinkpad) == GST_PAD_LINK_OK) {
out_info->output_slot = slot;
slot->linked_info = out_info;
BUFFERING_LOCK (urisrc);
/* A re-linked slot is no longer EOS */
slot->is_eos = FALSE;
BUFFERING_UNLOCK (urisrc);
res = TRUE;
slot->is_eos = FALSE;
urisrc->pending_pads =
g_list_remove (urisrc->pending_pads, out_info->src_pad);
} else {
GST_ERROR_OBJECT (urisrc,
"Failed to link new demuxer pad to the output slot we tried");
}
gst_pad_remove_probe (slot->queue_sinkpad, block_id);
}
return res;
}
/* Called with lock held */
static gboolean
all_slots_are_eos (GstURISourceBin * urisrc)
@ -896,14 +717,6 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
if ((urisrc->pending_pads &&
link_pending_pad_to_output (urisrc, child_info->output_slot))) {
/* Found a new source pad to give this slot data - no need to send EOS */
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
ret = GST_PAD_PROBE_DROP;
goto done;
}
BUFFERING_LOCK (urisrc);
/* Mark that we fed an EOS to this slot */
child_info->output_slot->is_eos = TRUE;
@ -928,13 +741,6 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
}
}
break;
case GST_EVENT_CAPS:
{
GstCaps *caps;
gst_event_parse_caps (ev, &caps);
gst_caps_replace (&child_info->cur_caps, caps);
}
break;
case GST_EVENT_STREAM_START:
case GST_EVENT_FLUSH_STOP:
BUFFERING_LOCK (urisrc);
@ -1101,34 +907,13 @@ on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec,
/* Called with lock held */
static OutputSlotInfo *
get_output_slot (GstURISourceBin * urisrc, gboolean do_download,
gboolean is_adaptive, GstCaps * caps)
gboolean is_adaptive)
{
OutputSlotInfo *slot;
GstPad *srcpad;
GstElement *queue;
const gchar *elem_name;
/* If we have caps, iterate the existing slots and look for an
* unlinked one that can be used */
if (caps && gst_caps_is_fixed (caps)) {
GSList *cur;
GstCaps *cur_caps;
for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
slot = (OutputSlotInfo *) (cur->data);
if (slot->linked_info == NULL) {
cur_caps = gst_pad_get_current_caps (slot->queue_sinkpad);
if (cur_caps == NULL || gst_caps_is_equal (caps, cur_caps)) {
GST_LOG_OBJECT (urisrc, "Found existing slot %p to link to", slot);
gst_caps_unref (cur_caps);
slot->is_eos = FALSE;
return slot;
}
gst_caps_unref (cur_caps);
}
}
}
/* Otherwise create the new slot */
if (do_download)
elem_name = "downloadbuffer";
@ -1421,8 +1206,6 @@ pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
goto no_info;
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* Make sure this isn't in the pending pads list */
urisrc->pending_pads = g_list_remove (urisrc->pending_pads, pad);
/* Send EOS to the output slot if the demuxer didn't already */
if (info->output_slot) {
@ -1432,13 +1215,6 @@ pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
slot = info->output_slot;
if (!slot->is_eos && urisrc->pending_pads &&
link_pending_pad_to_output (urisrc, slot)) {
/* Found a new source pad to give this slot data - no need to send EOS */
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
}
BUFFERING_LOCK (urisrc);
/* Unlink this pad from its output slot and send a fake EOS event
* to drain the queue */
@ -1800,7 +1576,7 @@ analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc,
nb_raw++;
GST_URI_SOURCE_BIN_LOCK (urisrc);
if (use_queue) {
OutputSlotInfo *slot = get_output_slot (urisrc, FALSE, FALSE, NULL);
OutputSlotInfo *slot = get_output_slot (urisrc, FALSE, FALSE);
if (!slot) {
gst_caps_unref (padcaps);
gst_object_unref (pad);
@ -2047,7 +1823,7 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
do_download);
GST_URI_SOURCE_BIN_LOCK (urisrc);
slot = get_output_slot (urisrc, do_download, FALSE, NULL);
slot = get_output_slot (urisrc, do_download, FALSE);
if (slot == NULL
|| gst_pad_link (srcpad, slot->queue_sinkpad) != GST_PAD_LINK_OK)

View file

@ -4969,11 +4969,11 @@ flushing:
}
wrong_mode:
{
g_critical ("getrange on pad %s:%s but it was not activated in pull mode",
GST_DEBUG_PAD_NAME (pad));
pad->ABI.abi.last_flowret = GST_FLOW_ERROR;
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
g_critical ("getrange on pad %s:%s but it was not activated in pull mode",
GST_DEBUG_PAD_NAME (pad));
return GST_FLOW_ERROR;
}
events_error: