multiqueue: Hold weak references to pads/multiqueue in SingleQueue

Without holding a ref we have no guarantees that the SingleQueue
doesn't have dangling pointers on those objects during its destruction.
This commit is contained in:
Thibault Saunier 2019-06-27 15:51:47 -04:00
parent 5f89225bc2
commit 27fbaf9d44

View file

@ -118,10 +118,9 @@ struct _GstSingleQueue
guint groupid; guint groupid;
GstClockTimeDiff group_high_time; GstClockTimeDiff group_high_time;
GstMultiQueue *mqueue; GWeakRef mqueue;
GWeakRef sinkpad;
GstPad *sinkpad; GWeakRef srcpad;
GstPad *srcpad;
/* flowreturn of previous srcpad push */ /* flowreturn of previous srcpad push */
GstFlowReturn srcresult; GstFlowReturn srcresult;
@ -343,10 +342,18 @@ gst_multiqueue_pad_get_property (GObject * object, guint prop_id,
switch (prop_id) { switch (prop_id) {
case PROP_PAD_GROUP_ID: case PROP_PAD_GROUP_ID:
GST_OBJECT_LOCK (pad->sq->mqueue); if (pad->sq) {
if (pad->sq) GstMultiQueue *mq = g_weak_ref_get (&pad->sq->mqueue);
if (mq)
GST_OBJECT_LOCK (mq);
g_value_set_uint (value, pad->sq->groupid); g_value_set_uint (value, pad->sq->groupid);
GST_OBJECT_UNLOCK (pad->sq->mqueue); if (mq) {
GST_OBJECT_UNLOCK (mq);
gst_object_unref (mq);
}
}
break; break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -362,10 +369,19 @@ gst_multiqueue_pad_set_property (GObject * object, guint prop_id,
switch (prop_id) { switch (prop_id) {
case PROP_PAD_GROUP_ID: case PROP_PAD_GROUP_ID:
GST_OBJECT_LOCK (pad->sq->mqueue); if (pad->sq) {
if (pad->sq) GstMultiQueue *mqueue = g_weak_ref_get (&pad->sq->mqueue);
if (mqueue)
GST_OBJECT_LOCK (mqueue);
pad->sq->groupid = g_value_get_uint (value); pad->sq->groupid = g_value_get_uint (value);
GST_OBJECT_UNLOCK (pad->sq->mqueue);
if (mqueue) {
GST_OBJECT_UNLOCK (mqueue);
gst_object_unref (mqueue);
}
}
break; break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -900,7 +916,7 @@ static GstIterator *
gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent) gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
{ {
GstIterator *it = NULL; GstIterator *it = NULL;
GstPad *opad; GstPad *opad, *sinkpad, *srcpad;
GstSingleQueue *squeue; GstSingleQueue *squeue;
GstMultiQueue *mq = GST_MULTI_QUEUE (parent); GstMultiQueue *mq = GST_MULTI_QUEUE (parent);
GValue val = { 0, }; GValue val = { 0, };
@ -910,12 +926,21 @@ gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
if (!squeue) if (!squeue)
goto out; goto out;
if (squeue->sinkpad == pad) srcpad = g_weak_ref_get (&squeue->srcpad);
opad = gst_object_ref (squeue->srcpad); sinkpad = g_weak_ref_get (&squeue->sinkpad);
else if (squeue->srcpad == pad) if (sinkpad == pad) {
opad = gst_object_ref (squeue->sinkpad); opad = srcpad;
else gst_clear_object (&sinkpad);
} else if (srcpad == pad) {
opad = sinkpad;
gst_clear_object (&srcpad);
} else {
gst_clear_object (&srcpad);
gst_clear_object (&sinkpad);
goto out; goto out;
}
g_value_init (&val, GST_TYPE_PAD); g_value_init (&val, GST_TYPE_PAD);
g_value_set_object (&val, opad); g_value_set_object (&val, opad);
@ -952,7 +977,10 @@ gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
/* Create a new single queue, add the sink and source pad and return the sink pad */ /* Create a new single queue, add the sink and source pad and return the sink pad */
squeue = gst_single_queue_new (mqueue, temp_id); squeue = gst_single_queue_new (mqueue, temp_id);
new_pad = squeue ? squeue->sinkpad : NULL; new_pad = squeue ? g_weak_ref_get (&squeue->sinkpad) : NULL;
/* request pad assumes the element is owning the ref of the pad it returns */
if (new_pad)
gst_object_unref (new_pad);
GST_DEBUG_OBJECT (mqueue, "Returning pad %" GST_PTR_FORMAT, new_pad); GST_DEBUG_OBJECT (mqueue, "Returning pad %" GST_PTR_FORMAT, new_pad);
@ -962,6 +990,7 @@ gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
static void static void
gst_multi_queue_release_pad (GstElement * element, GstPad * pad) gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
{ {
GstPad *sinkpad = NULL, *srcpad = NULL;
GstMultiQueue *mqueue = GST_MULTI_QUEUE (element); GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
GstSingleQueue *sq = NULL; GstSingleQueue *sq = NULL;
GList *tmp; GList *tmp;
@ -972,12 +1001,19 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
/* Find which single queue it belongs to, knowing that it should be a sinkpad */ /* Find which single queue it belongs to, knowing that it should be a sinkpad */
for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) { for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
sq = (GstSingleQueue *) tmp->data; sq = (GstSingleQueue *) tmp->data;
sinkpad = g_weak_ref_get (&sq->sinkpad);
if (sq->sinkpad == pad) if (sinkpad == pad) {
srcpad = g_weak_ref_get (&sq->srcpad);
break; break;
}
gst_object_unref (sinkpad);
} }
if (!tmp) { if (!tmp) {
gst_clear_object (&sinkpad);
gst_clear_object (&srcpad);
GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???"); GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
return; return;
@ -996,10 +1032,12 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
/* delete SingleQueue */ /* delete SingleQueue */
gst_data_queue_set_flushing (sq->queue, TRUE); gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_set_active (sq->srcpad, FALSE); gst_pad_set_active (srcpad, FALSE);
gst_pad_set_active (sq->sinkpad, FALSE); gst_pad_set_active (sinkpad, FALSE);
gst_element_remove_pad (element, sq->srcpad); gst_element_remove_pad (element, srcpad);
gst_element_remove_pad (element, sq->sinkpad); gst_element_remove_pad (element, sinkpad);
gst_object_unref (srcpad);
gst_object_unref (sinkpad);
} }
static GstStateChangeReturn static GstStateChangeReturn
@ -1061,18 +1099,32 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
static gboolean static gboolean
gst_single_queue_start (GstMultiQueue * mq, GstSingleQueue * sq) gst_single_queue_start (GstMultiQueue * mq, GstSingleQueue * sq)
{ {
gboolean res = FALSE;
GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
return gst_pad_start_task (sq->srcpad,
(GstTaskFunction) gst_multi_queue_loop, sq->srcpad, NULL); if (srcpad) {
res = gst_pad_start_task (srcpad,
(GstTaskFunction) gst_multi_queue_loop, srcpad, NULL);
gst_object_unref (srcpad);
}
return res;
} }
static gboolean static gboolean
gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq) gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq)
{ {
gboolean result; gboolean result = FALSE;
GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id); GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
result = gst_pad_pause_task (sq->srcpad); if (srcpad) {
result = gst_pad_pause_task (srcpad);
gst_object_unref (srcpad);
}
sq->sink_tainted = sq->src_tainted = TRUE; sq->sink_tainted = sq->src_tainted = TRUE;
return result; return result;
} }
@ -1080,10 +1132,14 @@ gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq)
static gboolean static gboolean
gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq) gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq)
{ {
gboolean result; gboolean result = FALSE;
GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
GST_LOG_OBJECT (mq, "SingleQueue %d : stopping task", sq->id); GST_LOG_OBJECT (mq, "SingleQueue %d : stopping task", sq->id);
result = gst_pad_stop_task (sq->srcpad); if (srcpad) {
result = gst_pad_stop_task (srcpad);
gst_object_unref (srcpad);
}
sq->sink_tainted = sq->src_tainted = TRUE; sq->sink_tainted = sq->src_tainted = TRUE;
return result; return result;
} }
@ -1145,14 +1201,14 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
/* WITH LOCK TAKEN */ /* WITH LOCK TAKEN */
static gint static gint
get_buffering_level (GstSingleQueue * sq) get_buffering_level (GstMultiQueue * mq, GstSingleQueue * sq)
{ {
GstDataQueueSize size; GstDataQueueSize size;
gint buffering_level, tmp; gint buffering_level, tmp;
gst_data_queue_get_level (sq->queue, &size); gst_data_queue_get_level (sq->queue, &size);
GST_DEBUG_OBJECT (sq->mqueue, GST_DEBUG_OBJECT (mq,
"queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible, G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
@ -1190,7 +1246,7 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
if (!mq->use_buffering) if (!mq->use_buffering)
return; return;
buffering_level = get_buffering_level (sq); buffering_level = get_buffering_level (mq, sq);
/* scale so that if buffering_level equals the high watermark, /* scale so that if buffering_level equals the high watermark,
* the percentage is 100% */ * the percentage is 100% */
@ -1214,7 +1270,7 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
for (iter = mq->queues; iter; iter = g_list_next (iter)) { for (iter = mq->queues; iter; iter = g_list_next (iter)) {
GstSingleQueue *oq = (GstSingleQueue *) iter->data; GstSingleQueue *oq = (GstSingleQueue *) iter->data;
if (get_buffering_level (oq) >= mq->high_watermark) { if (get_buffering_level (mq, oq) >= mq->high_watermark) {
is_buffering = FALSE; is_buffering = FALSE;
break; break;
@ -1624,6 +1680,13 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
GstMiniObject * object, gboolean * allow_drop) GstMiniObject * object, gboolean * allow_drop)
{ {
GstFlowReturn result = sq->srcresult; GstFlowReturn result = sq->srcresult;
GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
if (!srcpad) {
GST_INFO_OBJECT (mq,
"Pushing while corresponding sourcepad has been cleared");
return GST_FLOW_FLUSHING;
}
if (GST_IS_BUFFER (object)) { if (GST_IS_BUFFER (object)) {
GstBuffer *buffer; GstBuffer *buffer;
@ -1647,7 +1710,7 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT, "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
sq->id, buffer, GST_TIME_ARGS (timestamp)); sq->id, buffer, GST_TIME_ARGS (timestamp));
result = gst_pad_push (sq->srcpad, buffer); result = gst_pad_push (srcpad, buffer);
} }
} else if (GST_IS_EVENT (object)) { } else if (GST_IS_EVENT (object)) {
GstEvent *event; GstEvent *event;
@ -1696,7 +1759,7 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
"SingleQueue %d : Pushing event %p of type %s", "SingleQueue %d : Pushing event %p of type %s",
sq->id, event, GST_EVENT_TYPE_NAME (event)); sq->id, event, GST_EVENT_TYPE_NAME (event));
gst_pad_push_event (sq->srcpad, event); gst_pad_push_event (srcpad, event);
} }
} else if (GST_IS_QUERY (object)) { } else if (GST_IS_QUERY (object)) {
GstQuery *query; GstQuery *query;
@ -1710,7 +1773,7 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
gst_query_unref (query); gst_query_unref (query);
res = FALSE; res = FALSE;
} else { } else {
res = gst_pad_peer_query (sq->srcpad, query); res = gst_pad_peer_query (srcpad, query);
} }
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
@ -1722,6 +1785,8 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
g_warning ("Unexpected object in singlequeue %u (refcounting problem?)", g_warning ("Unexpected object in singlequeue %u (refcounting problem?)",
sq->id); sq->id);
} }
gst_object_unref (srcpad);
return result; return result;
/* ERRORS */ /* ERRORS */
@ -1801,9 +1866,14 @@ gst_multi_queue_loop (GstPad * pad)
gboolean is_buffer; gboolean is_buffer;
gboolean do_update_buffering = FALSE; gboolean do_update_buffering = FALSE;
gboolean dropping = FALSE; gboolean dropping = FALSE;
GstPad *srcpad = NULL;
sq = GST_MULTIQUEUE_PAD (pad)->sq; sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = sq->mqueue; mq = g_weak_ref_get (&sq->mqueue);
srcpad = g_weak_ref_get (&sq->srcpad);
if (!mq || !srcpad)
goto out_flushing;
next: next:
GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
@ -2037,11 +2107,11 @@ next:
gst_multi_queue_post_buffering (mq); gst_multi_queue_post_buffering (mq);
GST_LOG_OBJECT (mq, "sq:%d AFTER PUSHING sq->srcresult: %s (is_eos:%d)", GST_LOG_OBJECT (mq, "sq:%d AFTER PUSHING sq->srcresult: %s (is_eos:%d)",
sq->id, gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (sq->srcpad)); sq->id, gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (srcpad));
/* Need to make sure wake up any sleeping pads when we exit */ /* Need to make sure wake up any sleeping pads when we exit */
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (mq->numwaiting > 0 && (GST_PAD_IS_EOS (sq->srcpad) if (mq->numwaiting > 0 && (GST_PAD_IS_EOS (srcpad)
|| sq->srcresult == GST_FLOW_EOS)) { || sq->srcresult == GST_FLOW_EOS)) {
compute_high_time (mq, sq->groupid); compute_high_time (mq, sq->groupid);
compute_high_id (mq); compute_high_id (mq);
@ -2056,6 +2126,10 @@ next:
&& result != GST_FLOW_EOS) && result != GST_FLOW_EOS)
goto out_flushing; goto out_flushing;
done:
gst_clear_object (&mq);
gst_clear_object (&srcpad);
return; return;
out_flushing: out_flushing:
@ -2086,11 +2160,11 @@ out_flushing:
gst_single_queue_flush_queue (sq, FALSE); gst_single_queue_flush_queue (sq, FALSE);
single_queue_underrun_cb (sq->queue, sq); single_queue_underrun_cb (sq->queue, sq);
gst_data_queue_set_flushing (sq->queue, TRUE); gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_pause_task (sq->srcpad); gst_pad_pause_task (srcpad);
GST_CAT_LOG_OBJECT (multi_queue_debug, mq, GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
"SingleQueue[%d] task paused, reason:%s", "SingleQueue[%d] task paused, reason:%s",
sq->id, gst_flow_get_name (sq->srcresult)); sq->id, gst_flow_get_name (sq->srcresult));
return; goto done;
} }
} }
@ -2106,12 +2180,15 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{ {
GstSingleQueue *sq; GstSingleQueue *sq;
GstMultiQueue *mq; GstMultiQueue *mq;
GstMultiQueueItem *item; GstMultiQueueItem *item = NULL;
guint32 curid; guint32 curid;
GstClockTime timestamp, duration; GstClockTime timestamp, duration;
sq = GST_MULTIQUEUE_PAD (pad)->sq; sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = sq->mqueue; mq = g_weak_ref_get (&sq->mqueue);
if (!mq)
goto flushing;
/* if eos, we are always full, so avoid hanging incoming indefinitely */ /* if eos, we are always full, so avoid hanging incoming indefinitely */
if (sq->is_eos) if (sq->is_eos)
@ -2164,6 +2241,7 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment); apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
done: done:
gst_clear_object (&mq);
return sq->srcresult; return sq->srcresult;
/* ERRORS */ /* ERRORS */
@ -2171,13 +2249,15 @@ flushing:
{ {
GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
sq->id, gst_flow_get_name (sq->srcresult)); sq->id, gst_flow_get_name (sq->srcresult));
gst_multi_queue_item_destroy (item); if (item)
gst_multi_queue_item_destroy (item);
goto done; goto done;
} }
was_eos: was_eos:
{ {
GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return EOS"); GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return EOS");
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
gst_object_unref (mq);
return GST_FLOW_EOS; return GST_FLOW_EOS;
} }
} }
@ -2250,9 +2330,19 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
GstFlowReturn flowret = GST_FLOW_OK; GstFlowReturn flowret = GST_FLOW_OK;
GstEventType type; GstEventType type;
GstEvent *sref = NULL; GstEvent *sref = NULL;
GstPad *srcpad;
sq = GST_MULTIQUEUE_PAD (pad)->sq; sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = (GstMultiQueue *) parent; mq = (GstMultiQueue *) parent;
srcpad = g_weak_ref_get (&sq->srcpad);
if (!srcpad) {
GST_INFO_OBJECT (pad, "Pushing while corresponding sourcepad has been"
" removed already");
return GST_FLOW_FLUSHING;
}
type = GST_EVENT_TYPE (event); type = GST_EVENT_TYPE (event);
@ -2278,7 +2368,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event", GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
sq->id); sq->id);
res = gst_pad_push_event (sq->srcpad, event); res = gst_pad_push_event (srcpad, event);
gst_single_queue_flush (mq, sq, TRUE, FALSE); gst_single_queue_flush (mq, sq, TRUE, FALSE);
gst_single_queue_pause (mq, sq); gst_single_queue_pause (mq, sq);
@ -2288,7 +2378,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event", GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
sq->id); sq->id);
res = gst_pad_push_event (sq->srcpad, event); res = gst_pad_push_event (srcpad, event);
gst_single_queue_flush (mq, sq, FALSE, FALSE); gst_single_queue_flush (mq, sq, FALSE, FALSE);
gst_single_queue_start (mq, sq); gst_single_queue_start (mq, sq);
@ -2322,7 +2412,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
default: default:
if (!(GST_EVENT_IS_SERIALIZED (event))) { if (!(GST_EVENT_IS_SERIALIZED (event))) {
res = gst_pad_push_event (sq->srcpad, event); res = gst_pad_push_event (srcpad, event);
goto done; goto done;
} }
break; break;
@ -2396,6 +2486,8 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
} }
done: done:
gst_object_unref (srcpad);
if (res == FALSE) if (res == FALSE)
flowret = GST_FLOW_ERROR; flowret = GST_FLOW_ERROR;
GST_DEBUG_OBJECT (mq, "SingleQueue %d : returning %s", sq->id, GST_DEBUG_OBJECT (mq, "SingleQueue %d : returning %s", sq->id,
@ -2404,6 +2496,7 @@ done:
flushing: flushing:
{ {
gst_object_unref (srcpad);
GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
sq->id, gst_flow_get_name (sq->srcresult)); sq->id, gst_flow_get_name (sq->srcresult));
if (sref) if (sref)
@ -2413,6 +2506,7 @@ flushing:
} }
was_eos: was_eos:
{ {
gst_object_unref (srcpad);
GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return GST_FLOW_EOS"); GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return GST_FLOW_EOS");
gst_event_unref (event); gst_event_unref (event);
return GST_FLOW_EOS; return GST_FLOW_EOS;
@ -2502,7 +2596,13 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
gboolean result; gboolean result;
sq = GST_MULTIQUEUE_PAD (pad)->sq; sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = sq->mqueue; mq = g_weak_ref_get (&sq->mqueue);
if (!mq) {
GST_ERROR_OBJECT (pad, "No multique set anymore, can't activate pad");
return FALSE;
}
GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id); GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
@ -2520,6 +2620,7 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
result = FALSE; result = FALSE;
break; break;
} }
gst_object_unref (mq);
return result; return result;
} }
@ -2527,8 +2628,17 @@ static gboolean
gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{ {
GstSingleQueue *sq = GST_MULTIQUEUE_PAD (pad)->sq; GstSingleQueue *sq = GST_MULTIQUEUE_PAD (pad)->sq;
GstMultiQueue *mq = sq->mqueue; GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
gboolean ret; gboolean ret;
GstPad *sinkpad = g_weak_ref_get (&sq->sinkpad);
if (!mq || !sinkpad) {
gst_clear_object (&sinkpad);
gst_clear_object (&mq);
GST_INFO_OBJECT (pad, "No multique/sinkpad set anymore, flushing");
return FALSE;
}
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_RECONFIGURE: case GST_EVENT_RECONFIGURE:
@ -2539,13 +2649,16 @@ gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
ret = gst_pad_push_event (sq->sinkpad, event); ret = gst_pad_push_event (sinkpad, event);
break; break;
default: default:
ret = gst_pad_push_event (sq->sinkpad, event); ret = gst_pad_push_event (sinkpad, event);
break; break;
} }
gst_object_unref (sinkpad);
gst_object_unref (mq);
return ret; return ret;
} }
@ -2623,6 +2736,14 @@ compute_high_id (GstMultiQueue * mq)
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GstSingleQueue *sq = (GstSingleQueue *) tmp->data; GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
if (!srcpad) {
GST_INFO_OBJECT (mq,
"srcpad has been removed already... ignoring single queue");
continue;
}
GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s", GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult)); sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
@ -2631,18 +2752,20 @@ compute_high_id (GstMultiQueue * mq)
/* No need to consider queues which are not waiting */ /* No need to consider queues which are not waiting */
if (sq->nextid == 0) { if (sq->nextid == 0) {
GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
gst_object_unref (srcpad);
continue; continue;
} }
if (sq->nextid < lowest) if (sq->nextid < lowest)
lowest = sq->nextid; lowest = sq->nextid;
} else if (!GST_PAD_IS_EOS (sq->srcpad) && sq->srcresult != GST_FLOW_EOS) { } else if (!GST_PAD_IS_EOS (srcpad) && sq->srcresult != GST_FLOW_EOS) {
/* If we don't have a global highid, or the global highid is lower than /* If we don't have a global highid, or the global highid is lower than
* this single queue's last outputted id, store the queue's one, * this single queue's last outputted id, store the queue's one,
* unless the singlequeue output is at EOS */ * unless the singlequeue output is at EOS */
if ((highid == G_MAXUINT32) || (sq->oldid > highid)) if ((highid == G_MAXUINT32) || (sq->oldid > highid))
highid = sq->oldid; highid = sq->oldid;
} }
gst_object_unref (srcpad);
} }
if (highid == G_MAXUINT32 || lowest < highid) if (highid == G_MAXUINT32 || lowest < highid)
@ -2676,6 +2799,14 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
for (tmp = mq->queues; tmp; tmp = tmp->next) { for (tmp = mq->queues; tmp; tmp = tmp->next) {
GstSingleQueue *sq = (GstSingleQueue *) tmp->data; GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
if (!srcpad) {
GST_INFO_OBJECT (mq,
"srcpad has been removed already... ignoring single queue");
continue;
}
GST_LOG_OBJECT (mq, GST_LOG_OBJECT (mq,
"inspecting sq:%d (group:%d) , next_time:%" GST_STIME_FORMAT "inspecting sq:%d (group:%d) , next_time:%" GST_STIME_FORMAT
@ -2690,6 +2821,7 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
/* No need to consider queues which are not waiting */ /* No need to consider queues which are not waiting */
if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) { if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) {
GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
gst_object_unref (srcpad);
continue; continue;
} }
@ -2698,7 +2830,7 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
if (sq->groupid == groupid && (group_low == GST_CLOCK_STIME_NONE if (sq->groupid == groupid && (group_low == GST_CLOCK_STIME_NONE
|| sq->next_time < group_low)) || sq->next_time < group_low))
group_low = sq->next_time; group_low = sq->next_time;
} else if (!GST_PAD_IS_EOS (sq->srcpad) && sq->srcresult != GST_FLOW_EOS) { } else if (!GST_PAD_IS_EOS (srcpad) && sq->srcresult != GST_FLOW_EOS) {
/* If we don't have a global high time, or the global high time /* If we don't have a global high time, or the global high time
* is lower than this single queue's last outputted time, store * is lower than this single queue's last outputted time, store
* the queue's one, unless the singlequeue output is at EOS. */ * the queue's one, unless the singlequeue output is at EOS. */
@ -2717,6 +2849,8 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
GST_LOG_OBJECT (mq, GST_LOG_OBJECT (mq,
"grouphigh %" GST_STIME_FORMAT " grouplow %" GST_STIME_FORMAT, "grouphigh %" GST_STIME_FORMAT " grouplow %" GST_STIME_FORMAT,
GST_STIME_ARGS (group_high), GST_STIME_ARGS (group_low)); GST_STIME_ARGS (group_high), GST_STIME_ARGS (group_low));
gst_object_unref (srcpad);
} }
if (highest == GST_CLOCK_STIME_NONE) if (highest == GST_CLOCK_STIME_NONE)
@ -2755,11 +2889,17 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
static void static void
single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
{ {
GstMultiQueue *mq = sq->mqueue;
GList *tmp; GList *tmp;
GstDataQueueSize size; GstDataQueueSize size;
gboolean filled = TRUE; gboolean filled = TRUE;
gboolean empty_found = FALSE; gboolean empty_found = FALSE;
GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
if (!mq) {
GST_ERROR ("No multique set anymore, not doing anything");
return;
}
gst_data_queue_get_level (sq->queue, &size); gst_data_queue_get_level (sq->queue, &size);
@ -2812,6 +2952,7 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
done: done:
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_object_unref (mq);
/* Overrun is always forwarded, since this is blocking the upstream element */ /* Overrun is always forwarded, since this is blocking the upstream element */
if (filled) { if (filled) {
@ -2824,11 +2965,18 @@ static void
single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
{ {
gboolean empty = TRUE; gboolean empty = TRUE;
GstMultiQueue *mq = sq->mqueue; GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
GList *tmp; GList *tmp;
if (!mq) {
GST_ERROR ("No multique set anymore, not doing anything");
return;
}
if (sq->srcresult == GST_FLOW_NOT_LINKED) { if (sq->srcresult == GST_FLOW_NOT_LINKED) {
GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id); GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id);
gst_object_unref (mq);
return; return;
} else { } else {
GST_LOG_OBJECT (mq, GST_LOG_OBJECT (mq,
@ -2855,6 +3003,7 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
empty = FALSE; empty = FALSE;
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_object_unref (mq);
if (empty) { if (empty) {
GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it"); GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
@ -2867,7 +3016,13 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
guint64 time, GstSingleQueue * sq) guint64 time, GstSingleQueue * sq)
{ {
gboolean res; gboolean res;
GstMultiQueue *mq = sq->mqueue; GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
if (!mq) {
GST_ERROR ("No multique set anymore, let's say we are full");
return TRUE;
}
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
@ -2875,12 +3030,16 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
sq->max_size.bytes, sq->cur_time, sq->max_size.time); sq->max_size.bytes, sq->cur_time, sq->max_size.time);
/* we are always filled on EOS */ /* we are always filled on EOS */
if (sq->is_eos || sq->is_segment_done) if (sq->is_eos || sq->is_segment_done) {
return TRUE; res = TRUE;
goto done;
}
/* we never go past the max visible items unless we are in buffering mode */ /* we never go past the max visible items unless we are in buffering mode */
if (!mq->use_buffering && IS_FILLED (sq, visible, visible)) if (!mq->use_buffering && IS_FILLED (sq, visible, visible)) {
return TRUE; res = TRUE;
goto done;
}
/* check time or bytes */ /* check time or bytes */
res = IS_FILLED (sq, bytes, bytes); res = IS_FILLED (sq, bytes, bytes);
@ -2896,6 +3055,8 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
} else } else
res |= IS_FILLED (sq, time, sq->cur_time); res |= IS_FILLED (sq, time, sq->cur_time);
} }
done:
gst_object_unref (mq);
return res; return res;
} }
@ -2906,6 +3067,8 @@ gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
GstDataQueueItem *sitem; GstDataQueueItem *sitem;
GstMultiQueueItem *mitem; GstMultiQueueItem *mitem;
gboolean was_flushing = FALSE; gboolean was_flushing = FALSE;
GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
while (!gst_data_queue_is_empty (sq->queue)) { while (!gst_data_queue_is_empty (sq->queue)) {
GstMiniObject *data; GstMiniObject *data;
@ -2925,23 +3088,27 @@ gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
data = sitem->object; data = sitem->object;
if (!full && !mitem->is_query && GST_IS_EVENT (data) if (!full && !mitem->is_query && GST_IS_EVENT (data)
&& GST_EVENT_IS_STICKY (data) && srcpad && GST_EVENT_IS_STICKY (data)
&& GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT && GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
&& GST_EVENT_TYPE (data) != GST_EVENT_EOS) { && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data)); gst_pad_store_sticky_event (srcpad, GST_EVENT_CAST (data));
} }
sitem->destroy (sitem); sitem->destroy (sitem);
} }
gst_clear_object (&srcpad);
gst_data_queue_flush (sq->queue); gst_data_queue_flush (sq->queue);
if (was_flushing) if (was_flushing)
gst_data_queue_set_flushing (sq->queue, TRUE); gst_data_queue_set_flushing (sq->queue, TRUE);
GST_MULTI_QUEUE_MUTEX_LOCK (sq->mqueue); if (mq) {
update_buffering (sq->mqueue, sq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
GST_MULTI_QUEUE_MUTEX_UNLOCK (sq->mqueue); update_buffering (mq, sq);
gst_multi_queue_post_buffering (sq->mqueue); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_multi_queue_post_buffering (mq);
gst_object_unref (mq);
}
} }
static void static void
@ -2953,6 +3120,9 @@ gst_single_queue_unref (GstSingleQueue * sq)
g_object_unref (sq->queue); g_object_unref (sq->queue);
g_cond_clear (&sq->turn); g_cond_clear (&sq->turn);
g_cond_clear (&sq->query_handled); g_cond_clear (&sq->query_handled);
g_weak_ref_clear (&sq->sinkpad);
g_weak_ref_clear (&sq->srcpad);
g_weak_ref_clear (&sq->mqueue);
g_free (sq); g_free (sq);
} }
} }
@ -2969,8 +3139,8 @@ gst_single_queue_ref (GstSingleQueue * squeue)
static GstSingleQueue * static GstSingleQueue *
gst_single_queue_new (GstMultiQueue * mqueue, guint id) gst_single_queue_new (GstMultiQueue * mqueue, guint id)
{ {
GstPad *srcpad, *sinkpad;
GstSingleQueue *sq; GstSingleQueue *sq;
GstMultiQueuePad *mqpad;
GstPadTemplate *templ; GstPadTemplate *templ;
gchar *name; gchar *name;
GList *tmp; GList *tmp;
@ -3019,7 +3189,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id); GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
sq->mqueue = mqueue; g_weak_ref_init (&sq->mqueue, mqueue);
sq->srcresult = GST_FLOW_FLUSHING; sq->srcresult = GST_FLOW_FLUSHING;
sq->pushed = FALSE; sq->pushed = FALSE;
sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
@ -3047,45 +3217,45 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
name = g_strdup_printf ("sink_%u", sq->id); name = g_strdup_printf ("sink_%u", sq->id);
templ = gst_static_pad_template_get (&sinktemplate); templ = gst_static_pad_template_get (&sinktemplate);
sq->sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name, sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
"direction", templ->direction, "template", templ, NULL); "direction", templ->direction, "template", templ, NULL);
g_weak_ref_init (&sq->sinkpad, sinkpad);
gst_object_unref (templ); gst_object_unref (templ);
g_free (name); g_free (name);
mqpad = (GstMultiQueuePad *) sq->sinkpad; GST_MULTIQUEUE_PAD (sinkpad)->sq = sq;
mqpad->sq = sq;
gst_pad_set_chain_function (sq->sinkpad, gst_pad_set_chain_function (sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_chain)); GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
gst_pad_set_activatemode_function (sq->sinkpad, gst_pad_set_activatemode_function (sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_mode)); GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_mode));
gst_pad_set_event_full_function (sq->sinkpad, gst_pad_set_event_full_function (sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event)); GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
gst_pad_set_query_function (sq->sinkpad, gst_pad_set_query_function (sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_sink_query)); GST_DEBUG_FUNCPTR (gst_multi_queue_sink_query));
gst_pad_set_iterate_internal_links_function (sq->sinkpad, gst_pad_set_iterate_internal_links_function (sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links)); GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
GST_OBJECT_FLAG_SET (sq->sinkpad, GST_PAD_FLAG_PROXY_CAPS); GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS);
name = g_strdup_printf ("src_%u", sq->id); name = g_strdup_printf ("src_%u", sq->id);
templ = gst_static_pad_template_get (&srctemplate); templ = gst_static_pad_template_get (&srctemplate);
sq->srcpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name, srcpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
"direction", templ->direction, "template", templ, NULL); "direction", templ->direction, "template", templ, NULL);
g_weak_ref_init (&sq->srcpad, srcpad);
gst_object_unref (templ); gst_object_unref (templ);
g_free (name); g_free (name);
mqpad = (GstMultiQueuePad *) sq->srcpad; GST_MULTIQUEUE_PAD (srcpad)->sq = gst_single_queue_ref (sq);
mqpad->sq = gst_single_queue_ref (sq);
gst_pad_set_activatemode_function (sq->srcpad, gst_pad_set_activatemode_function (srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_mode)); GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_mode));
gst_pad_set_event_function (sq->srcpad, gst_pad_set_event_function (srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_event)); GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
gst_pad_set_query_function (sq->srcpad, gst_pad_set_query_function (srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_query)); GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
gst_pad_set_iterate_internal_links_function (sq->srcpad, gst_pad_set_iterate_internal_links_function (srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links)); GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
GST_OBJECT_FLAG_SET (sq->srcpad, GST_PAD_FLAG_PROXY_CAPS); GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
@ -3094,11 +3264,11 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
* between activating and adding */ * between activating and adding */
g_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue)); g_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) { if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
gst_pad_set_active (sq->srcpad, TRUE); gst_pad_set_active (srcpad, TRUE);
gst_pad_set_active (sq->sinkpad, TRUE); gst_pad_set_active (sinkpad, TRUE);
} }
gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); gst_element_add_pad (GST_ELEMENT (mqueue), srcpad);
gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); gst_element_add_pad (GST_ELEMENT (mqueue), sinkpad);
if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) { if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
gst_single_queue_start (mqueue, sq); gst_single_queue_start (mqueue, sq);
} }