gst/playback/gstqueue2.c: Add pull based scheduling and fix some deadlocks. Fixes #444523.

Original commit message from CVS:
Patch by: Thiago Sousa Santos <thiagossantos at gmail dot com>
* gst/playback/gstqueue2.c: (gst_queue_init),
(gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_get_range), (gst_queue_src_checkgetrange_function),
(gst_queue_sink_activate_push), (gst_queue_src_activate_push),
(gst_queue_src_activate_pull):
Add pull based scheduling and fix some deadlocks. Fixes #444523.
Does not yet completely work because duration queries upstream won't
block yet.
This commit is contained in:
Thiago Sousa Santos 2007-06-06 13:36:26 +00:00 committed by Sebastian Dröge
parent 28dca2d062
commit 2d77f57853

View file

@ -2,7 +2,7 @@
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2003 Colin Walters <cwalters@gnome.org>
* 2000,2005,2007 Wim Taymans <wim@fluendo.com>
* 2007 Thiago Sousa Santos <thiagossantos at gmail dot com>
* 2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
*
* gstqueue2.c:
*
@ -52,7 +52,6 @@
#include "config.h"
#endif
#include <stdio.h>
#include <glib/gstdio.h>
#include <gst/gst.h>
@ -121,6 +120,8 @@ enum
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_QUEUE))
#define GST_IS_QUEUE_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE))
#define GST_QUEUE_CAST(obj) \
((GstQueue *)(obj))
typedef struct _GstQueue GstQueue;
typedef struct _GstQueueSize GstQueueSize;
@ -195,6 +196,7 @@ struct _GstQueue
* because we can't save it on the file */
gboolean segment_event_received;
GstEvent *starting_segment;
};
struct _GstQueueClass
@ -325,6 +327,11 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query);
static GstCaps *gst_queue_getcaps (GstPad * pad);
static GstFlowReturn gst_queue_get_range (GstPad * pad, guint64 offset,
guint length, GstBuffer ** buffer);
static gboolean gst_queue_src_checkgetrange_function (GstPad * pad);
static gboolean gst_queue_src_activate_pull (GstPad * pad, gboolean active);
static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
static GstStateChangeReturn gst_queue_change_state (GstElement * element,
@ -427,8 +434,14 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
gst_pad_set_activatepull_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_src_activate_pull));
gst_pad_set_activatepush_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_src_activate_push));
gst_pad_set_getrange_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_get_range));
gst_pad_set_checkgetrange_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_src_checkgetrange_function));
gst_pad_set_acceptcaps_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
gst_pad_set_getcaps_function (queue->srcpad,
@ -857,6 +870,7 @@ eos:
}
}
/* should be called with QUEUE_LOCK */
static GstMiniObject *
gst_queue_read_item_from_file (GstQueue * queue)
{
@ -1173,6 +1187,9 @@ done:
/* ERRORS */
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
GST_QUEUE_MUTEX_UNLOCK (queue);
gst_buffer_unref (event);
return FALSE;
}
@ -1262,6 +1279,7 @@ out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because task paused, reason: %s", gst_flow_get_name (ret));
GST_QUEUE_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
return ret;
@ -1318,6 +1336,8 @@ out_flushing:
}
}
/* called repeadedly with @pad as the source pad. This function should push out
* data to the peer element. */
static void
gst_queue_loop (GstPad * pad)
{
@ -1419,6 +1439,51 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
return TRUE;
}
static GstFlowReturn
gst_queue_get_range (GstPad * pad, guint64 offset, guint length,
GstBuffer ** buffer)
{
GstQueue *queue;
GstFlowReturn ret;
queue = GST_QUEUE_CAST (gst_pad_get_parent (pad));
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
offset = (offset == -1) ? queue->reading_pos : offset;
/* function will block when the range is not yet available */
ret = gst_queue_create_read (queue, offset, length, buffer);
GST_QUEUE_MUTEX_UNLOCK (queue);
gst_object_unref (queue);
return ret;
/* ERRORS */
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
GST_QUEUE_MUTEX_UNLOCK (queue);
return GST_FLOW_WRONG_STATE;
}
}
static gboolean
gst_queue_src_checkgetrange_function (GstPad * pad)
{
GstQueue *queue;
gboolean ret;
queue = GST_QUEUE (gst_pad_get_parent (pad));
/* we can operate in pull mode when we are using a tempfile */
ret = QUEUE_IS_USING_TEMP_FILE (queue);
gst_object_unref (GST_OBJECT (queue));
return ret;
}
/* sink currently only operates in push mode */
static gboolean
gst_queue_sink_activate_push (GstPad * pad, gboolean active)
{
@ -1429,12 +1494,14 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
if (active) {
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating push mode");
queue->srcresult = GST_FLOW_OK;
reset_rate_timer (queue);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* unblock chain function */
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating push mode");
queue->srcresult = GST_FLOW_WRONG_STATE;
gst_queue_locked_flush (queue);
GST_QUEUE_MUTEX_UNLOCK (queue);
@ -1445,6 +1512,8 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
return result;
}
/* src operating in push mode, we start a task on the source pad that pushes out
* buffers from the queue */
static gboolean
gst_queue_src_activate_push (GstPad * pad, gboolean active)
{
@ -1455,12 +1524,14 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
if (active) {
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating push mode");
queue->srcresult = GST_FLOW_OK;
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* unblock loop function */
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating push mode");
queue->srcresult = GST_FLOW_WRONG_STATE;
/* the item add signal will unblock */
g_cond_signal (queue->item_add);
@ -1475,6 +1546,45 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
return result;
}
/* pull mode, downstream will call our getrange function */
static gboolean
gst_queue_src_activate_pull (GstPad * pad, gboolean active)
{
gboolean result;
GstQueue *queue;
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating pull mode");
queue->srcresult = GST_FLOW_OK;
result = TRUE;
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
/* this is not allowed, we cannot operate in pull mode without a temp
* file. */
queue->srcresult = GST_FLOW_WRONG_STATE;
result = FALSE;
GST_QUEUE_MUTEX_UNLOCK (queue);
}
} else {
GST_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating pull mode");
queue->srcresult = GST_FLOW_WRONG_STATE;
/* this will unlock getrange */
g_cond_signal (queue->item_add);
result = TRUE;
GST_QUEUE_MUTEX_UNLOCK (queue);
}
gst_object_unref (queue);
return result;
}
static GstStateChangeReturn
gst_queue_change_state (GstElement * element, GstStateChange transition)
{