urisourcebin: Fix buffering message aggregation.

Add locking, and handle EOS properly now that urisourcebin
uses custom events in place of real EOS events, so we
need to manually remove buffering messages and potentially
post 100% in that situation
This commit is contained in:
Jan Schmidt 2017-03-18 00:54:55 +11:00
parent 8cb57a4fe8
commit 94da76d301

View file

@ -73,6 +73,23 @@ typedef struct _OutputSlotInfo OutputSlotInfo;
#define GST_URI_SOURCE_BIN_LOCK(dec) (g_mutex_lock(&((GstURISourceBin*)(dec))->lock))
#define GST_URI_SOURCE_BIN_UNLOCK(dec) (g_mutex_unlock(&((GstURISourceBin*)(dec))->lock))
#define BUFFERING_LOCK(ubin) G_STMT_START { \
GST_LOG_OBJECT (ubin, \
"buffering locking from thread %p", \
g_thread_self ()); \
g_mutex_lock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \
GST_LOG_OBJECT (ubin, \
"buffering lock from thread %p", \
g_thread_self ()); \
} G_STMT_END
#define BUFFERING_UNLOCK(ubin) G_STMT_START { \
GST_LOG_OBJECT (ubin, \
"buffering unlocking from thread %p", \
g_thread_self ()); \
g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \
} G_STMT_END
/* Track a source pad from a child that
* is linked or needs linking to an output
* slot */
@ -144,6 +161,8 @@ struct _GstURISourceBin
GList *buffering_status; /* element currently buffering messages */
gint last_buffering_pct; /* Avoid sending buffering over and over */
GMutex buffering_lock;
GMutex buffering_post_lock;
};
struct _GstURISourceBinClass
@ -725,6 +744,9 @@ gst_uri_source_bin_init (GstURISourceBin * urisrc)
g_mutex_init (&urisrc->lock);
g_mutex_init (&urisrc->buffering_lock);
g_mutex_init (&urisrc->buffering_post_lock);
urisrc->uri = g_strdup (DEFAULT_PROP_URI);
urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
@ -1002,6 +1024,9 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
/* Look for a suitable pending pad */
cur_caps = gst_pad_get_current_caps (slot->sinkpad);
GST_DEBUG_OBJECT (urisrc,
"Looking for a pending pad with caps %" GST_PTR_FORMAT, cur_caps);
for (cur = urisrc->pending_pads; cur != NULL; cur = g_list_next (cur)) {
GstPad *pending = (GstPad *) (cur->data);
ChildSrcPadInfo *cur_info = NULL;
@ -1029,8 +1054,8 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
guint block_id =
gst_pad_add_probe (slot->sinkpad, GST_PAD_PROBE_TYPE_BLOCK_UPSTREAM,
NULL, NULL, NULL);
GST_DEBUG_OBJECT (urisrc, "Linking pending pad to existing output slot %p",
slot);
GST_DEBUG_OBJECT (urisrc, "Linking pending pad %" GST_PTR_FORMAT
" to existing output slot %p", out_info->demux_src_pad, slot);
if (in_info) {
gst_pad_unlink (in_info->demux_src_pad, slot->sinkpad);
@ -1042,6 +1067,11 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
slot->sinkpad) == GST_PAD_LINK_OK) {
out_info->output_slot = slot;
slot->linked_info = out_info;
BUFFERING_LOCK (urisrc);
/* A re-linked slot is no longer EOS */
slot->is_eos = FALSE;
BUFFERING_UNLOCK (urisrc);
res = TRUE;
urisrc->pending_pads =
g_list_remove (urisrc->pending_pads, out_info->demux_src_pad);
@ -1093,8 +1123,14 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
goto done;
}
BUFFERING_LOCK (urisrc);
/* Mark that we fed an EOS to this slot */
child_info->output_slot->is_eos = TRUE;
BUFFERING_UNLOCK (urisrc);
/* EOS means this element is no longer buffering */
remove_buffering_msgs (urisrc,
GST_OBJECT_CAST (child_info->output_slot->queue));
/* Actually feed a custom EOS event to avoid marking pads as EOSed */
s = gst_structure_new_empty ("urisourcebin-custom-eos");
@ -1112,7 +1148,9 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
break;
case GST_EVENT_STREAM_START:
case GST_EVENT_FLUSH_STOP:
BUFFERING_LOCK (urisrc);
child_info->output_slot->is_eos = FALSE;
BUFFERING_UNLOCK (urisrc);
break;
default:
break;
@ -1370,17 +1408,23 @@ pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
GstEvent *event;
OutputSlotInfo *slot;
if (!info->output_slot->is_eos && urisrc->pending_pads &&
link_pending_pad_to_output (urisrc, info->output_slot)) {
slot = info->output_slot;
if (!slot->is_eos && urisrc->pending_pads &&
link_pending_pad_to_output (urisrc, slot)) {
/* Found a new source pad to give this slot data - no need to send EOS */
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
}
/* Unlink this pad from its output slot and send a fake EOS event to drain the
* queue */
slot = info->output_slot;
BUFFERING_LOCK (urisrc);
/* Unlink this pad from its output slot and send a fake EOS event
* to drain the queue */
slot->is_eos = TRUE;
BUFFERING_UNLOCK (urisrc);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
slot->linked_info = NULL;
info->output_slot = NULL;
@ -2378,7 +2422,7 @@ handle_redirect_message (GstURISourceBin * dec, GstMessage * msg)
return new_msg;
}
static GstMessage *
static void
handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
{
gint perc, msg_perc;
@ -2386,6 +2430,7 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
GstMessage *smaller = NULL;
GList *found = NULL;
GList *iter;
OutputSlotInfo *slot;
/* buffering messages must be aggregated as there might be multiple
* multiqueue in the pipeline and their independent buffering messages
@ -2405,7 +2450,22 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
" with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
GST_OBJECT_LOCK (urisrc);
slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)),
"urisourcebin.slotinfo");
BUFFERING_LOCK (urisrc);
if (slot && slot->is_eos) {
/* Ignore buffering messages from queues we marked as EOS,
* we already removed those from the list of buffering
* objects */
BUFFERING_UNLOCK (urisrc);
gst_message_replace (&msg, NULL);
return;
}
g_mutex_lock (&urisrc->buffering_post_lock);
/*
* Single loop for 2 things:
* 1) Look for a message with the same source
@ -2414,11 +2474,10 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
*/
for (iter = urisrc->buffering_status; iter;) {
GstMessage *bufstats = iter->data;
OutputSlotInfo *slot =
g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
"urisourcebin.slotinfo");
gboolean is_eos = FALSE;
slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
"urisourcebin.slotinfo");
if (slot)
is_eos = slot->is_eos;
@ -2477,16 +2536,17 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
gst_message_replace (&msg, smaller);
}
}
GST_OBJECT_UNLOCK (urisrc);
BUFFERING_UNLOCK (urisrc);
if (msg) {
GST_LOG_OBJECT (urisrc, "Sending buffering msg from %" GST_PTR_FORMAT
" with %d%%", GST_MESSAGE_SRC (msg), smaller_perc);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN (urisrc), msg);
} else {
GST_LOG_OBJECT (urisrc, "Dropped buffering msg as a repeat of %d%%",
smaller_perc);
}
return msg;
g_mutex_unlock (&urisrc->buffering_post_lock);
}
/* Remove any buffering message from the given source */
@ -2494,8 +2554,14 @@ static void
remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
{
GList *iter;
gboolean removed = FALSE, post;
BUFFERING_LOCK (urisrc);
g_mutex_lock (&urisrc->buffering_post_lock);
GST_DEBUG_OBJECT (urisrc, "Removing %" GST_PTR_FORMAT
" buffering messages", src);
GST_OBJECT_LOCK (urisrc);
for (iter = urisrc->buffering_status; iter;) {
GstMessage *bufstats = iter->data;
if (GST_MESSAGE_SRC (bufstats) == src) {
@ -2506,7 +2572,19 @@ remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
}
iter = g_list_next (iter);
}
GST_OBJECT_UNLOCK (urisrc);
post = (removed && urisrc->buffering_status == NULL);
BUFFERING_UNLOCK (urisrc);
if (post) {
GST_DEBUG_OBJECT (urisrc, "Last buffering element done - posting 100%%");
/* removed the last buffering element, post 100% */
gst_element_post_message (GST_ELEMENT_CAST (urisrc),
gst_message_new_buffering (GST_OBJECT_CAST (urisrc), 100));
}
g_mutex_unlock (&urisrc->buffering_post_lock);
}
static void
@ -2526,7 +2604,8 @@ handle_message (GstBin * bin, GstMessage * msg)
break;
}
case GST_MESSAGE_BUFFERING:
msg = handle_buffering_message (urisrc, msg);
handle_buffering_message (urisrc, msg);
msg = NULL;
break;
default:
break;