diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 5f2d687b38..b1cd84e052 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -118,10 +118,9 @@ struct _GstSingleQueue guint groupid; GstClockTimeDiff group_high_time; - GstMultiQueue *mqueue; - - GstPad *sinkpad; - GstPad *srcpad; + GWeakRef mqueue; + GWeakRef sinkpad; + GWeakRef srcpad; /* flowreturn of previous srcpad push */ GstFlowReturn srcresult; @@ -343,10 +342,18 @@ gst_multiqueue_pad_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_PAD_GROUP_ID: - GST_OBJECT_LOCK (pad->sq->mqueue); - if (pad->sq) + if (pad->sq) { + GstMultiQueue *mq = g_weak_ref_get (&pad->sq->mqueue); + + if (mq) + GST_OBJECT_LOCK (mq); + g_value_set_uint (value, pad->sq->groupid); - GST_OBJECT_UNLOCK (pad->sq->mqueue); + if (mq) { + GST_OBJECT_UNLOCK (mq); + gst_object_unref (mq); + } + } break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -362,10 +369,19 @@ gst_multiqueue_pad_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_PAD_GROUP_ID: - GST_OBJECT_LOCK (pad->sq->mqueue); - if (pad->sq) + if (pad->sq) { + GstMultiQueue *mqueue = g_weak_ref_get (&pad->sq->mqueue); + + if (mqueue) + GST_OBJECT_LOCK (mqueue); + pad->sq->groupid = g_value_get_uint (value); - GST_OBJECT_UNLOCK (pad->sq->mqueue); + + if (mqueue) { + GST_OBJECT_UNLOCK (mqueue); + gst_object_unref (mqueue); + } + } break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -900,7 +916,7 @@ static GstIterator * gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent) { GstIterator *it = NULL; - GstPad *opad; + GstPad *opad, *sinkpad, *srcpad; GstSingleQueue *squeue; GstMultiQueue *mq = GST_MULTI_QUEUE (parent); GValue val = { 0, }; @@ -910,12 +926,21 @@ gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent) if (!squeue) goto out; - if (squeue->sinkpad == pad) - opad = gst_object_ref (squeue->srcpad); - else if (squeue->srcpad == pad) - opad = gst_object_ref (squeue->sinkpad); - else + srcpad = g_weak_ref_get (&squeue->srcpad); + sinkpad = g_weak_ref_get (&squeue->sinkpad); + if (sinkpad == pad) { + opad = srcpad; + gst_clear_object (&sinkpad); + + } else if (srcpad == pad) { + opad = sinkpad; + gst_clear_object (&srcpad); + + } else { + gst_clear_object (&srcpad); + gst_clear_object (&sinkpad); goto out; + } g_value_init (&val, GST_TYPE_PAD); g_value_set_object (&val, opad); @@ -952,7 +977,10 @@ gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp, /* Create a new single queue, add the sink and source pad and return the sink pad */ squeue = gst_single_queue_new (mqueue, temp_id); - new_pad = squeue ? squeue->sinkpad : NULL; + new_pad = squeue ? g_weak_ref_get (&squeue->sinkpad) : NULL; + /* request pad assumes the element is owning the ref of the pad it returns */ + if (new_pad) + gst_object_unref (new_pad); GST_DEBUG_OBJECT (mqueue, "Returning pad %" GST_PTR_FORMAT, new_pad); @@ -962,6 +990,7 @@ gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp, static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad) { + GstPad *sinkpad = NULL, *srcpad = NULL; GstMultiQueue *mqueue = GST_MULTI_QUEUE (element); GstSingleQueue *sq = NULL; GList *tmp; @@ -972,12 +1001,19 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad) /* Find which single queue it belongs to, knowing that it should be a sinkpad */ for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) { sq = (GstSingleQueue *) tmp->data; + sinkpad = g_weak_ref_get (&sq->sinkpad); - if (sq->sinkpad == pad) + if (sinkpad == pad) { + srcpad = g_weak_ref_get (&sq->srcpad); break; + } + + gst_object_unref (sinkpad); } if (!tmp) { + gst_clear_object (&sinkpad); + gst_clear_object (&srcpad); GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???"); GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); return; @@ -996,10 +1032,12 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad) /* delete SingleQueue */ gst_data_queue_set_flushing (sq->queue, TRUE); - gst_pad_set_active (sq->srcpad, FALSE); - gst_pad_set_active (sq->sinkpad, FALSE); - gst_element_remove_pad (element, sq->srcpad); - gst_element_remove_pad (element, sq->sinkpad); + gst_pad_set_active (srcpad, FALSE); + gst_pad_set_active (sinkpad, FALSE); + gst_element_remove_pad (element, srcpad); + gst_element_remove_pad (element, sinkpad); + gst_object_unref (srcpad); + gst_object_unref (sinkpad); } static GstStateChangeReturn @@ -1061,18 +1099,32 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition) static gboolean gst_single_queue_start (GstMultiQueue * mq, GstSingleQueue * sq) { + gboolean res = FALSE; + GstPad *srcpad = g_weak_ref_get (&sq->srcpad); + GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); - return gst_pad_start_task (sq->srcpad, - (GstTaskFunction) gst_multi_queue_loop, sq->srcpad, NULL); + + if (srcpad) { + res = gst_pad_start_task (srcpad, + (GstTaskFunction) gst_multi_queue_loop, srcpad, NULL); + gst_object_unref (srcpad); + } + + return res; } static gboolean gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq) { - gboolean result; + gboolean result = FALSE; + GstPad *srcpad = g_weak_ref_get (&sq->srcpad); GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id); - result = gst_pad_pause_task (sq->srcpad); + if (srcpad) { + result = gst_pad_pause_task (srcpad); + gst_object_unref (srcpad); + } + sq->sink_tainted = sq->src_tainted = TRUE; return result; } @@ -1080,10 +1132,14 @@ gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq) static gboolean gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq) { - gboolean result; + gboolean result = FALSE; + GstPad *srcpad = g_weak_ref_get (&sq->srcpad); GST_LOG_OBJECT (mq, "SingleQueue %d : stopping task", sq->id); - result = gst_pad_stop_task (sq->srcpad); + if (srcpad) { + result = gst_pad_stop_task (srcpad); + gst_object_unref (srcpad); + } sq->sink_tainted = sq->src_tainted = TRUE; return result; } @@ -1145,14 +1201,14 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, /* WITH LOCK TAKEN */ static gint -get_buffering_level (GstSingleQueue * sq) +get_buffering_level (GstMultiQueue * mq, GstSingleQueue * sq) { GstDataQueueSize size; gint buffering_level, tmp; gst_data_queue_get_level (sq->queue, &size); - GST_DEBUG_OBJECT (sq->mqueue, + GST_DEBUG_OBJECT (mq, "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible, size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); @@ -1190,7 +1246,7 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq) if (!mq->use_buffering) return; - buffering_level = get_buffering_level (sq); + buffering_level = get_buffering_level (mq, sq); /* scale so that if buffering_level equals the high watermark, * the percentage is 100% */ @@ -1214,7 +1270,7 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq) for (iter = mq->queues; iter; iter = g_list_next (iter)) { GstSingleQueue *oq = (GstSingleQueue *) iter->data; - if (get_buffering_level (oq) >= mq->high_watermark) { + if (get_buffering_level (mq, oq) >= mq->high_watermark) { is_buffering = FALSE; break; @@ -1624,6 +1680,13 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, GstMiniObject * object, gboolean * allow_drop) { GstFlowReturn result = sq->srcresult; + GstPad *srcpad = g_weak_ref_get (&sq->srcpad); + + if (!srcpad) { + GST_INFO_OBJECT (mq, + "Pushing while corresponding sourcepad has been cleared"); + return GST_FLOW_FLUSHING; + } if (GST_IS_BUFFER (object)) { GstBuffer *buffer; @@ -1647,7 +1710,7 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, GST_DEBUG_OBJECT (mq, "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT, sq->id, buffer, GST_TIME_ARGS (timestamp)); - result = gst_pad_push (sq->srcpad, buffer); + result = gst_pad_push (srcpad, buffer); } } else if (GST_IS_EVENT (object)) { GstEvent *event; @@ -1696,7 +1759,7 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, "SingleQueue %d : Pushing event %p of type %s", sq->id, event, GST_EVENT_TYPE_NAME (event)); - gst_pad_push_event (sq->srcpad, event); + gst_pad_push_event (srcpad, event); } } else if (GST_IS_QUERY (object)) { GstQuery *query; @@ -1710,7 +1773,7 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, gst_query_unref (query); res = FALSE; } else { - res = gst_pad_peer_query (sq->srcpad, query); + res = gst_pad_peer_query (srcpad, query); } GST_MULTI_QUEUE_MUTEX_LOCK (mq); @@ -1722,6 +1785,8 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, g_warning ("Unexpected object in singlequeue %u (refcounting problem?)", sq->id); } + + gst_object_unref (srcpad); return result; /* ERRORS */ @@ -1801,9 +1866,14 @@ gst_multi_queue_loop (GstPad * pad) gboolean is_buffer; gboolean do_update_buffering = FALSE; gboolean dropping = FALSE; + GstPad *srcpad = NULL; sq = GST_MULTIQUEUE_PAD (pad)->sq; - mq = sq->mqueue; + mq = g_weak_ref_get (&sq->mqueue); + srcpad = g_weak_ref_get (&sq->srcpad); + + if (!mq || !srcpad) + goto out_flushing; next: GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); @@ -2037,11 +2107,11 @@ next: gst_multi_queue_post_buffering (mq); GST_LOG_OBJECT (mq, "sq:%d AFTER PUSHING sq->srcresult: %s (is_eos:%d)", - sq->id, gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (sq->srcpad)); + sq->id, gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (srcpad)); /* Need to make sure wake up any sleeping pads when we exit */ GST_MULTI_QUEUE_MUTEX_LOCK (mq); - if (mq->numwaiting > 0 && (GST_PAD_IS_EOS (sq->srcpad) + if (mq->numwaiting > 0 && (GST_PAD_IS_EOS (srcpad) || sq->srcresult == GST_FLOW_EOS)) { compute_high_time (mq, sq->groupid); compute_high_id (mq); @@ -2056,6 +2126,10 @@ next: && result != GST_FLOW_EOS) goto out_flushing; +done: + gst_clear_object (&mq); + gst_clear_object (&srcpad); + return; out_flushing: @@ -2086,11 +2160,11 @@ out_flushing: gst_single_queue_flush_queue (sq, FALSE); single_queue_underrun_cb (sq->queue, sq); gst_data_queue_set_flushing (sq->queue, TRUE); - gst_pad_pause_task (sq->srcpad); + gst_pad_pause_task (srcpad); GST_CAT_LOG_OBJECT (multi_queue_debug, mq, "SingleQueue[%d] task paused, reason:%s", sq->id, gst_flow_get_name (sq->srcresult)); - return; + goto done; } } @@ -2106,12 +2180,15 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstSingleQueue *sq; GstMultiQueue *mq; - GstMultiQueueItem *item; + GstMultiQueueItem *item = NULL; guint32 curid; GstClockTime timestamp, duration; sq = GST_MULTIQUEUE_PAD (pad)->sq; - mq = sq->mqueue; + mq = g_weak_ref_get (&sq->mqueue); + + if (!mq) + goto flushing; /* if eos, we are always full, so avoid hanging incoming indefinitely */ if (sq->is_eos) @@ -2164,6 +2241,7 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment); done: + gst_clear_object (&mq); return sq->srcresult; /* ERRORS */ @@ -2171,13 +2249,15 @@ flushing: { GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", sq->id, gst_flow_get_name (sq->srcresult)); - gst_multi_queue_item_destroy (item); + if (item) + gst_multi_queue_item_destroy (item); goto done; } was_eos: { GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return EOS"); gst_buffer_unref (buffer); + gst_object_unref (mq); return GST_FLOW_EOS; } } @@ -2250,9 +2330,19 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) GstFlowReturn flowret = GST_FLOW_OK; GstEventType type; GstEvent *sref = NULL; + GstPad *srcpad; + sq = GST_MULTIQUEUE_PAD (pad)->sq; mq = (GstMultiQueue *) parent; + srcpad = g_weak_ref_get (&sq->srcpad); + + if (!srcpad) { + GST_INFO_OBJECT (pad, "Pushing while corresponding sourcepad has been" + " removed already"); + + return GST_FLOW_FLUSHING; + } type = GST_EVENT_TYPE (event); @@ -2278,7 +2368,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event", sq->id); - res = gst_pad_push_event (sq->srcpad, event); + res = gst_pad_push_event (srcpad, event); gst_single_queue_flush (mq, sq, TRUE, FALSE); gst_single_queue_pause (mq, sq); @@ -2288,7 +2378,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event", sq->id); - res = gst_pad_push_event (sq->srcpad, event); + res = gst_pad_push_event (srcpad, event); gst_single_queue_flush (mq, sq, FALSE, FALSE); gst_single_queue_start (mq, sq); @@ -2322,7 +2412,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) default: if (!(GST_EVENT_IS_SERIALIZED (event))) { - res = gst_pad_push_event (sq->srcpad, event); + res = gst_pad_push_event (srcpad, event); goto done; } break; @@ -2396,6 +2486,8 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) } done: + + gst_object_unref (srcpad); if (res == FALSE) flowret = GST_FLOW_ERROR; GST_DEBUG_OBJECT (mq, "SingleQueue %d : returning %s", sq->id, @@ -2404,6 +2496,7 @@ done: flushing: { + gst_object_unref (srcpad); GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", sq->id, gst_flow_get_name (sq->srcresult)); if (sref) @@ -2413,6 +2506,7 @@ flushing: } was_eos: { + gst_object_unref (srcpad); GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return GST_FLOW_EOS"); gst_event_unref (event); return GST_FLOW_EOS; @@ -2502,7 +2596,13 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent, gboolean result; sq = GST_MULTIQUEUE_PAD (pad)->sq; - mq = sq->mqueue; + mq = g_weak_ref_get (&sq->mqueue); + + if (!mq) { + GST_ERROR_OBJECT (pad, "No multique set anymore, can't activate pad"); + + return FALSE; + } GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id); @@ -2520,6 +2620,7 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent, result = FALSE; break; } + gst_object_unref (mq); return result; } @@ -2527,8 +2628,17 @@ static gboolean gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event) { GstSingleQueue *sq = GST_MULTIQUEUE_PAD (pad)->sq; - GstMultiQueue *mq = sq->mqueue; + GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue); gboolean ret; + GstPad *sinkpad = g_weak_ref_get (&sq->sinkpad); + + if (!mq || !sinkpad) { + gst_clear_object (&sinkpad); + gst_clear_object (&mq); + GST_INFO_OBJECT (pad, "No multique/sinkpad set anymore, flushing"); + + return FALSE; + } switch (GST_EVENT_TYPE (event)) { case GST_EVENT_RECONFIGURE: @@ -2539,13 +2649,16 @@ gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event) } GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - ret = gst_pad_push_event (sq->sinkpad, event); + ret = gst_pad_push_event (sinkpad, event); break; default: - ret = gst_pad_push_event (sq->sinkpad, event); + ret = gst_pad_push_event (sinkpad, event); break; } + gst_object_unref (sinkpad); + gst_object_unref (mq); + return ret; } @@ -2623,6 +2736,14 @@ compute_high_id (GstMultiQueue * mq) for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + GstPad *srcpad = g_weak_ref_get (&sq->srcpad); + + if (!srcpad) { + GST_INFO_OBJECT (mq, + "srcpad has been removed already... ignoring single queue"); + + continue; + } GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s", sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult)); @@ -2631,18 +2752,20 @@ compute_high_id (GstMultiQueue * mq) /* No need to consider queues which are not waiting */ if (sq->nextid == 0) { GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); + gst_object_unref (srcpad); continue; } if (sq->nextid < lowest) lowest = sq->nextid; - } else if (!GST_PAD_IS_EOS (sq->srcpad) && sq->srcresult != GST_FLOW_EOS) { + } else if (!GST_PAD_IS_EOS (srcpad) && sq->srcresult != GST_FLOW_EOS) { /* If we don't have a global highid, or the global highid is lower than * this single queue's last outputted id, store the queue's one, * unless the singlequeue output is at EOS */ if ((highid == G_MAXUINT32) || (sq->oldid > highid)) highid = sq->oldid; } + gst_object_unref (srcpad); } if (highid == G_MAXUINT32 || lowest < highid) @@ -2676,6 +2799,14 @@ compute_high_time (GstMultiQueue * mq, guint groupid) for (tmp = mq->queues; tmp; tmp = tmp->next) { GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + GstPad *srcpad = g_weak_ref_get (&sq->srcpad); + + if (!srcpad) { + GST_INFO_OBJECT (mq, + "srcpad has been removed already... ignoring single queue"); + + continue; + } GST_LOG_OBJECT (mq, "inspecting sq:%d (group:%d) , next_time:%" GST_STIME_FORMAT @@ -2690,6 +2821,7 @@ compute_high_time (GstMultiQueue * mq, guint groupid) /* No need to consider queues which are not waiting */ if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) { GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); + gst_object_unref (srcpad); continue; } @@ -2698,7 +2830,7 @@ compute_high_time (GstMultiQueue * mq, guint groupid) if (sq->groupid == groupid && (group_low == GST_CLOCK_STIME_NONE || sq->next_time < group_low)) group_low = sq->next_time; - } else if (!GST_PAD_IS_EOS (sq->srcpad) && sq->srcresult != GST_FLOW_EOS) { + } else if (!GST_PAD_IS_EOS (srcpad) && sq->srcresult != GST_FLOW_EOS) { /* If we don't have a global high time, or the global high time * is lower than this single queue's last outputted time, store * the queue's one, unless the singlequeue output is at EOS. */ @@ -2717,6 +2849,8 @@ compute_high_time (GstMultiQueue * mq, guint groupid) GST_LOG_OBJECT (mq, "grouphigh %" GST_STIME_FORMAT " grouplow %" GST_STIME_FORMAT, GST_STIME_ARGS (group_high), GST_STIME_ARGS (group_low)); + + gst_object_unref (srcpad); } if (highest == GST_CLOCK_STIME_NONE) @@ -2755,11 +2889,17 @@ compute_high_time (GstMultiQueue * mq, guint groupid) static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) { - GstMultiQueue *mq = sq->mqueue; GList *tmp; GstDataQueueSize size; gboolean filled = TRUE; gboolean empty_found = FALSE; + GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue); + + if (!mq) { + GST_ERROR ("No multique set anymore, not doing anything"); + + return; + } gst_data_queue_get_level (sq->queue, &size); @@ -2812,6 +2952,7 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) done: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + gst_object_unref (mq); /* Overrun is always forwarded, since this is blocking the upstream element */ if (filled) { @@ -2824,11 +2965,18 @@ static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) { gboolean empty = TRUE; - GstMultiQueue *mq = sq->mqueue; + GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue); GList *tmp; + if (!mq) { + GST_ERROR ("No multique set anymore, not doing anything"); + + return; + } + if (sq->srcresult == GST_FLOW_NOT_LINKED) { GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id); + gst_object_unref (mq); return; } else { GST_LOG_OBJECT (mq, @@ -2855,6 +3003,7 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) empty = FALSE; } GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + gst_object_unref (mq); if (empty) { GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it"); @@ -2867,7 +3016,13 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, guint64 time, GstSingleQueue * sq) { gboolean res; - GstMultiQueue *mq = sq->mqueue; + GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue); + + if (!mq) { + GST_ERROR ("No multique set anymore, let's say we are full"); + + return TRUE; + } GST_DEBUG_OBJECT (mq, "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" @@ -2875,12 +3030,16 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); /* we are always filled on EOS */ - if (sq->is_eos || sq->is_segment_done) - return TRUE; + if (sq->is_eos || sq->is_segment_done) { + res = TRUE; + goto done; + } /* we never go past the max visible items unless we are in buffering mode */ - if (!mq->use_buffering && IS_FILLED (sq, visible, visible)) - return TRUE; + if (!mq->use_buffering && IS_FILLED (sq, visible, visible)) { + res = TRUE; + goto done; + } /* check time or bytes */ res = IS_FILLED (sq, bytes, bytes); @@ -2896,6 +3055,8 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, } else res |= IS_FILLED (sq, time, sq->cur_time); } +done: + gst_object_unref (mq); return res; } @@ -2906,6 +3067,8 @@ gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full) GstDataQueueItem *sitem; GstMultiQueueItem *mitem; gboolean was_flushing = FALSE; + GstPad *srcpad = g_weak_ref_get (&sq->srcpad); + GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue); while (!gst_data_queue_is_empty (sq->queue)) { GstMiniObject *data; @@ -2925,23 +3088,27 @@ gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full) data = sitem->object; if (!full && !mitem->is_query && GST_IS_EVENT (data) - && GST_EVENT_IS_STICKY (data) + && srcpad && GST_EVENT_IS_STICKY (data) && GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT && GST_EVENT_TYPE (data) != GST_EVENT_EOS) { - gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data)); + gst_pad_store_sticky_event (srcpad, GST_EVENT_CAST (data)); } sitem->destroy (sitem); } + gst_clear_object (&srcpad); gst_data_queue_flush (sq->queue); if (was_flushing) gst_data_queue_set_flushing (sq->queue, TRUE); - GST_MULTI_QUEUE_MUTEX_LOCK (sq->mqueue); - update_buffering (sq->mqueue, sq); - GST_MULTI_QUEUE_MUTEX_UNLOCK (sq->mqueue); - gst_multi_queue_post_buffering (sq->mqueue); + if (mq) { + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + update_buffering (mq, sq); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + gst_multi_queue_post_buffering (mq); + gst_object_unref (mq); + } } static void @@ -2953,6 +3120,9 @@ gst_single_queue_unref (GstSingleQueue * sq) g_object_unref (sq->queue); g_cond_clear (&sq->turn); g_cond_clear (&sq->query_handled); + g_weak_ref_clear (&sq->sinkpad); + g_weak_ref_clear (&sq->srcpad); + g_weak_ref_clear (&sq->mqueue); g_free (sq); } } @@ -2969,8 +3139,8 @@ gst_single_queue_ref (GstSingleQueue * squeue) static GstSingleQueue * gst_single_queue_new (GstMultiQueue * mqueue, guint id) { + GstPad *srcpad, *sinkpad; GstSingleQueue *sq; - GstMultiQueuePad *mqpad; GstPadTemplate *templ; gchar *name; GList *tmp; @@ -3019,7 +3189,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id); - sq->mqueue = mqueue; + g_weak_ref_init (&sq->mqueue, mqueue); sq->srcresult = GST_FLOW_FLUSHING; sq->pushed = FALSE; sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) @@ -3047,45 +3217,45 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) name = g_strdup_printf ("sink_%u", sq->id); templ = gst_static_pad_template_get (&sinktemplate); - sq->sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name, + sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name, "direction", templ->direction, "template", templ, NULL); + g_weak_ref_init (&sq->sinkpad, sinkpad); gst_object_unref (templ); g_free (name); - mqpad = (GstMultiQueuePad *) sq->sinkpad; - mqpad->sq = sq; + GST_MULTIQUEUE_PAD (sinkpad)->sq = sq; - gst_pad_set_chain_function (sq->sinkpad, + gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_multi_queue_chain)); - gst_pad_set_activatemode_function (sq->sinkpad, + gst_pad_set_activatemode_function (sinkpad, GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_mode)); - gst_pad_set_event_full_function (sq->sinkpad, + gst_pad_set_event_full_function (sinkpad, GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event)); - gst_pad_set_query_function (sq->sinkpad, + gst_pad_set_query_function (sinkpad, GST_DEBUG_FUNCPTR (gst_multi_queue_sink_query)); - gst_pad_set_iterate_internal_links_function (sq->sinkpad, + gst_pad_set_iterate_internal_links_function (sinkpad, GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links)); - GST_OBJECT_FLAG_SET (sq->sinkpad, GST_PAD_FLAG_PROXY_CAPS); + GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS); name = g_strdup_printf ("src_%u", sq->id); templ = gst_static_pad_template_get (&srctemplate); - sq->srcpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name, + srcpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name, "direction", templ->direction, "template", templ, NULL); + g_weak_ref_init (&sq->srcpad, srcpad); gst_object_unref (templ); g_free (name); - mqpad = (GstMultiQueuePad *) sq->srcpad; - mqpad->sq = gst_single_queue_ref (sq); + GST_MULTIQUEUE_PAD (srcpad)->sq = gst_single_queue_ref (sq); - gst_pad_set_activatemode_function (sq->srcpad, + gst_pad_set_activatemode_function (srcpad, GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_mode)); - gst_pad_set_event_function (sq->srcpad, + gst_pad_set_event_function (srcpad, GST_DEBUG_FUNCPTR (gst_multi_queue_src_event)); - gst_pad_set_query_function (sq->srcpad, + gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_multi_queue_src_query)); - gst_pad_set_iterate_internal_links_function (sq->srcpad, + gst_pad_set_iterate_internal_links_function (srcpad, GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links)); - GST_OBJECT_FLAG_SET (sq->srcpad, GST_PAD_FLAG_PROXY_CAPS); + GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS); GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); @@ -3094,11 +3264,11 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) * between activating and adding */ g_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue)); if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) { - gst_pad_set_active (sq->srcpad, TRUE); - gst_pad_set_active (sq->sinkpad, TRUE); + gst_pad_set_active (srcpad, TRUE); + gst_pad_set_active (sinkpad, TRUE); } - gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); - gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); + gst_element_add_pad (GST_ELEMENT (mqueue), srcpad); + gst_element_add_pad (GST_ELEMENT (mqueue), sinkpad); if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) { gst_single_queue_start (mqueue, sq); }