From 2d77f5785391596aefd1cb8bb75f81cd1ebd2c74 Mon Sep 17 00:00:00 2001 From: Thiago Sousa Santos Date: Wed, 6 Jun 2007 13:36:26 +0000 Subject: [PATCH] gst/playback/gstqueue2.c: Add pull based scheduling and fix some deadlocks. Fixes #444523. Original commit message from CVS: Patch by: Thiago Sousa Santos * gst/playback/gstqueue2.c: (gst_queue_init), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_get_range), (gst_queue_src_checkgetrange_function), (gst_queue_sink_activate_push), (gst_queue_src_activate_push), (gst_queue_src_activate_pull): Add pull based scheduling and fix some deadlocks. Fixes #444523. Does not yet completely work because duration queries upstream won't block yet. --- gst/playback/gstqueue2.c | 116 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 113 insertions(+), 3 deletions(-) diff --git a/gst/playback/gstqueue2.c b/gst/playback/gstqueue2.c index 10f4b29491..bd0da0acc2 100644 --- a/gst/playback/gstqueue2.c +++ b/gst/playback/gstqueue2.c @@ -2,7 +2,7 @@ * Copyright (C) 1999,2000 Erik Walthinsen * 2003 Colin Walters * 2000,2005,2007 Wim Taymans - * 2007 Thiago Sousa Santos + * 2007 Thiago Sousa Santos * * gstqueue2.c: * @@ -40,7 +40,7 @@ * * The default queue size limits are 100 buffers, 2MB of data, or * two seconds worth of data, whichever is reached first. - * + * * If you set temp-location, the element will buffer data on the file * specified by it. By using this, it will buffer the entire * stream data on the file independently of the queue size limits, they @@ -52,7 +52,6 @@ #include "config.h" #endif -#include #include #include @@ -121,6 +120,8 @@ enum (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_QUEUE)) #define GST_IS_QUEUE_CLASS(klass) \ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE)) +#define GST_QUEUE_CAST(obj) \ + ((GstQueue *)(obj)) typedef struct _GstQueue GstQueue; typedef struct _GstQueueSize GstQueueSize; @@ -195,6 +196,7 @@ struct _GstQueue * because we can't save it on the file */ gboolean segment_event_received; GstEvent *starting_segment; + }; struct _GstQueueClass @@ -325,6 +327,11 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query); static GstCaps *gst_queue_getcaps (GstPad * pad); +static GstFlowReturn gst_queue_get_range (GstPad * pad, guint64 offset, + guint length, GstBuffer ** buffer); +static gboolean gst_queue_src_checkgetrange_function (GstPad * pad); + +static gboolean gst_queue_src_activate_pull (GstPad * pad, gboolean active); static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active); static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active); static GstStateChangeReturn gst_queue_change_state (GstElement * element, @@ -427,8 +434,14 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class) queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); + gst_pad_set_activatepull_function (queue->srcpad, + GST_DEBUG_FUNCPTR (gst_queue_src_activate_pull)); gst_pad_set_activatepush_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_src_activate_push)); + gst_pad_set_getrange_function (queue->srcpad, + GST_DEBUG_FUNCPTR (gst_queue_get_range)); + gst_pad_set_checkgetrange_function (queue->srcpad, + GST_DEBUG_FUNCPTR (gst_queue_src_checkgetrange_function)); gst_pad_set_acceptcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_acceptcaps)); gst_pad_set_getcaps_function (queue->srcpad, @@ -857,6 +870,7 @@ eos: } } +/* should be called with QUEUE_LOCK */ static GstMiniObject * gst_queue_read_item_from_file (GstQueue * queue) { @@ -1173,6 +1187,9 @@ done: /* ERRORS */ out_flushing: { + GST_DEBUG_OBJECT (queue, "we are flushing"); + GST_QUEUE_MUTEX_UNLOCK (queue); + gst_buffer_unref (event); return FALSE; } @@ -1262,6 +1279,7 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %s", gst_flow_get_name (ret)); GST_QUEUE_MUTEX_UNLOCK (queue); + gst_buffer_unref (buffer); return ret; @@ -1318,6 +1336,8 @@ out_flushing: } } +/* called repeadedly with @pad as the source pad. This function should push out + * data to the peer element. */ static void gst_queue_loop (GstPad * pad) { @@ -1419,6 +1439,51 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) return TRUE; } +static GstFlowReturn +gst_queue_get_range (GstPad * pad, guint64 offset, guint length, + GstBuffer ** buffer) +{ + GstQueue *queue; + GstFlowReturn ret; + + queue = GST_QUEUE_CAST (gst_pad_get_parent (pad)); + + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + length = (length == -1) ? DEFAULT_BUFFER_SIZE : length; + offset = (offset == -1) ? queue->reading_pos : offset; + + /* function will block when the range is not yet available */ + ret = gst_queue_create_read (queue, offset, length, buffer); + GST_QUEUE_MUTEX_UNLOCK (queue); + + gst_object_unref (queue); + + return ret; + + /* ERRORS */ +out_flushing: + { + GST_DEBUG_OBJECT (queue, "we are flushing"); + GST_QUEUE_MUTEX_UNLOCK (queue); + return GST_FLOW_WRONG_STATE; + } +} + +static gboolean +gst_queue_src_checkgetrange_function (GstPad * pad) +{ + GstQueue *queue; + gboolean ret; + + queue = GST_QUEUE (gst_pad_get_parent (pad)); + /* we can operate in pull mode when we are using a tempfile */ + ret = QUEUE_IS_USING_TEMP_FILE (queue); + gst_object_unref (GST_OBJECT (queue)); + + return ret; +} + +/* sink currently only operates in push mode */ static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active) { @@ -1429,12 +1494,14 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) if (active) { GST_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "activating push mode"); queue->srcresult = GST_FLOW_OK; reset_rate_timer (queue); GST_QUEUE_MUTEX_UNLOCK (queue); } else { /* unblock chain function */ GST_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "deactivating push mode"); queue->srcresult = GST_FLOW_WRONG_STATE; gst_queue_locked_flush (queue); GST_QUEUE_MUTEX_UNLOCK (queue); @@ -1445,6 +1512,8 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) return result; } +/* src operating in push mode, we start a task on the source pad that pushes out + * buffers from the queue */ static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active) { @@ -1455,12 +1524,14 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) if (active) { GST_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "activating push mode"); queue->srcresult = GST_FLOW_OK; result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); GST_QUEUE_MUTEX_UNLOCK (queue); } else { /* unblock loop function */ GST_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "deactivating push mode"); queue->srcresult = GST_FLOW_WRONG_STATE; /* the item add signal will unblock */ g_cond_signal (queue->item_add); @@ -1475,6 +1546,45 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) return result; } +/* pull mode, downstream will call our getrange function */ +static gboolean +gst_queue_src_activate_pull (GstPad * pad, gboolean active) +{ + gboolean result; + GstQueue *queue; + + queue = GST_QUEUE (gst_pad_get_parent (pad)); + + if (active) { + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + GST_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "activating pull mode"); + queue->srcresult = GST_FLOW_OK; + result = TRUE; + GST_QUEUE_MUTEX_UNLOCK (queue); + } else { + GST_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode"); + /* this is not allowed, we cannot operate in pull mode without a temp + * file. */ + queue->srcresult = GST_FLOW_WRONG_STATE; + result = FALSE; + GST_QUEUE_MUTEX_UNLOCK (queue); + } + } else { + GST_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "deactivating pull mode"); + queue->srcresult = GST_FLOW_WRONG_STATE; + /* this will unlock getrange */ + g_cond_signal (queue->item_add); + result = TRUE; + GST_QUEUE_MUTEX_UNLOCK (queue); + } + gst_object_unref (queue); + + return result; +} + static GstStateChangeReturn gst_queue_change_state (GstElement * element, GstStateChange transition) {