diff --git a/gst/gstqueue.c b/gst/gstqueue.c index fc646bf371..efff3c940f 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -39,54 +39,76 @@ static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ( /* Queue signals and args */ enum { - FULL, + SIGNAL_UNDERRUN, + SIGNAL_RUNNING, + SIGNAL_OVERRUN, LAST_SIGNAL }; enum { ARG_0, - ARG_LEVEL_BUFFERS, - ARG_LEVEL_BYTES, - ARG_LEVEL_TIME, - ARG_SIZE_BUFFERS, - ARG_SIZE_BYTES, - ARG_SIZE_TIME, + /* FIXME: don't we have another way of doing this + * "Gstreamer format" (frame/byte/time) queries? */ + ARG_CUR_LEVEL_BUFFERS, + ARG_CUR_LEVEL_BYTES, + ARG_CUR_LEVEL_TIME, + ARG_MAX_SIZE_BUFFERS, + ARG_MAX_SIZE_BYTES, + ARG_MAX_SIZE_TIME, + ARG_MIN_TRESHOLD_BUFFERS, + ARG_MIN_TRESHOLD_BYTES, + ARG_MIN_TRESHOLD_TIME, ARG_LEAKY, - ARG_LEVEL, - ARG_MAX_LEVEL, - ARG_MIN_THRESHOLD_BYTES, ARG_MAY_DEADLOCK, - ARG_BLOCK_TIMEOUT, + ARG_BLOCK_TIMEOUT + /* FILL ME */ }; -static void gst_queue_base_init (gpointer g_class); -static void gst_queue_class_init (gpointer g_class, - gpointer class_data); -static void gst_queue_init (GTypeInstance *instance, - gpointer g_class); -static void gst_queue_dispose (GObject *object); +typedef struct _GstQueueEventResponse { + GstEvent *event; + gboolean ret, handled; +} GstQueueEventResponse; -static void gst_queue_set_property (GObject *object, guint prop_id, - const GValue *value, GParamSpec *pspec); -static void gst_queue_get_property (GObject *object, guint prop_id, - GValue *value, GParamSpec *pspec); +static void gst_queue_base_init (GstQueueClass *klass); +static void gst_queue_class_init (GstQueueClass *klass); +static void gst_queue_init (GstQueue *queue); +static void gst_queue_dispose (GObject *object); -static void gst_queue_chain (GstPad *pad, GstData *data); -static GstData * gst_queue_get (GstPad *pad); -static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); +static void gst_queue_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec); +static void gst_queue_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec); + +static GstCaps *gst_queue_getcaps (GstPad *pad, + GstCaps *caps); +static GstPadLinkReturn + gst_queue_link (GstPad *pad, + GstCaps *caps); +static void gst_queue_chain (GstPad *pad, + GstData *data); +static GstData *gst_queue_get (GstPad *pad); +static GstBufferPool * + gst_queue_get_bufferpool (GstPad *pad); -static gboolean gst_queue_handle_src_event (GstPad *pad, GstEvent *event); +static gboolean gst_queue_handle_src_event (GstPad *pad, + GstEvent *event); +static void gst_queue_locked_flush (GstQueue *queue); -static void gst_queue_locked_flush (GstQueue *queue); - -static GstElementStateReturn gst_queue_change_state (GstElement *element); -static gboolean gst_queue_release_locks (GstElement *element); +static GstElementStateReturn + gst_queue_change_state (GstElement *element); +static gboolean gst_queue_release_locks (GstElement *element); -#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type()) +#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ()) + static GType -queue_leaky_get_type(void) { +queue_leaky_get_type (void) +{ static GType queue_leaky_type = 0; static GEnumValue queue_leaky[] = { { GST_QUEUE_NO_LEAK, "0", "Not Leaky" }, @@ -104,115 +126,127 @@ static GstElementClass *parent_class = NULL; static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; GType -gst_queue_get_type(void) +gst_queue_get_type (void) { static GType queue_type = 0; if (!queue_type) { static const GTypeInfo queue_info = { - sizeof(GstQueueClass), - gst_queue_base_init, + sizeof (GstQueueClass), + (GBaseInitFunc) gst_queue_base_init, NULL, - gst_queue_class_init, + (GClassInitFunc) gst_queue_class_init, NULL, NULL, - sizeof(GstQueue), + sizeof (GstQueue), 4, - gst_queue_init, + (GInstanceInitFunc) gst_queue_init, NULL }; - queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0); + + queue_type = g_type_register_static (GST_TYPE_ELEMENT, + "GstQueue", &queue_info, 0); } + return queue_type; } static void -gst_queue_base_init (gpointer g_class) +gst_queue_base_init (GstQueueClass *klass) { - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); gst_element_class_set_details (gstelement_class, &gst_queue_details); } static void -gst_queue_class_init (gpointer g_class, gpointer class_data) +gst_queue_class_init (GstQueueClass *klass) { - GObjectClass *gobject_class = G_OBJECT_CLASS (g_class); - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); - GstQueueClass *gstqueue_class = GST_QUEUE_CLASS (g_class); + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); - parent_class = g_type_class_peek_parent (g_class); + parent_class = g_type_class_peek_parent (klass); - gst_queue_signals[FULL] = - g_signal_new ("full", G_TYPE_FROM_CLASS (gstqueue_class), G_SIGNAL_RUN_FIRST, - G_STRUCT_OFFSET (GstQueueClass, full), NULL, NULL, + /* signals */ + gst_queue_signals[SIGNAL_UNDERRUN] = + g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + gst_queue_signals[SIGNAL_RUNNING] = + g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + gst_queue_signals[SIGNAL_OVERRUN] = + g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_LEAKY, - g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.", - GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_LEVEL, - g_param_spec_int ("level", "Level", "How many buffers are in the queue.", - 0, G_MAXINT, 0, G_PARAM_READABLE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MAX_LEVEL, - g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.", - 0, G_MAXINT, 100, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MIN_THRESHOLD_BYTES, - g_param_spec_int ("min_threshold_bytes", "Minimum Threshold", - "Minimum bytes required before signalling not_empty to reader.", - 0, G_MAXINT, 0, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MAY_DEADLOCK, - g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", - TRUE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_BLOCK_TIMEOUT, - g_param_spec_int ("block_timeout", "Timeout for Block", - "Microseconds until blocked queue times out and returns filler event. " - "Value of -1 disables timeout", - -1, G_MAXINT, -1, G_PARAM_READWRITE)); - gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); - gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); + /* properties */ + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, + g_param_spec_uint ("current-level-bytes", "Current level (kB)", + "Current amount of data in the queue (bytes)", + 0, G_MAXUINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS, + g_param_spec_uint ("current-level-buffers", "Current level (buffers)", + "Current number of buffers in the queue", + 0, G_MAXUINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME, + g_param_spec_uint64 ("current-level-time", "Current level (ns)", + "Current amount of data in the queue (in ns)", + 0, G_MAXUINT64, 0, G_PARAM_READABLE)); - gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state); - gstelement_class->release_locks = GST_DEBUG_FUNCPTR(gst_queue_release_locks); -} + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, + g_param_spec_uint ("max-size-bytes", "Max. size (kB)", + "Max. amount of data in the queue (bytes, 0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, + g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", + "Max. number of buffers in the queue (0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, + g_param_spec_uint64 ("max-size-time", "Max. size (ns)", + "Max. amount of data in the queue (in ns, 0=disable)", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); -static GstPadLinkReturn -gst_queue_link (GstPad *pad, GstCaps *caps) -{ - GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); - GstPad *otherpad; + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BYTES, + g_param_spec_uint ("min-treshold-bytes", "Min. treshold (kB)", + "Min. amount of data in the queue to allow reading (bytes, 0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BUFFERS, + g_param_spec_uint ("min-treshold-buffers", "Min. treshold (buffers)", + "Min. number of buffers in the queue to allow reading (0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_TIME, + g_param_spec_uint64 ("min-treshold-time", "Min. treshold (ns)", + "Min. amount of data in the queue to allow reading (in ns, 0=disable)", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); - if (pad == queue->srcpad) - otherpad = queue->sinkpad; - else - otherpad = queue->srcpad; + g_object_class_install_property (gobject_class, ARG_LEAKY, + g_param_spec_enum ("leaky", "Leaky", + "Where the queue leaks, if at all", + GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK, + g_param_spec_boolean ("may_deadlock", "May Deadlock", + "The queue may deadlock if it's full and not PLAYING", + TRUE, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT, + g_param_spec_uint64 ("block_timeout", "Timeout for Block", + "Nanoseconds until blocked queue times out and returns filler event. " + "Value of -1 disables timeout", + 0, G_MAXUINT64, -1, G_PARAM_READWRITE)); - return gst_pad_proxy_link (otherpad, caps); -} + /* set several parent class virtual functions */ + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); -static GstCaps* -gst_queue_getcaps (GstPad *pad, GstCaps *caps) -{ - GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); - GstPad *otherpad; - - if (pad == queue->srcpad) - otherpad = GST_PAD_PEER (queue->sinkpad); - else - otherpad = GST_PAD_PEER (queue->srcpad); - - if (otherpad) - return gst_pad_get_caps (otherpad); - - return NULL; + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state); + gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks); } static void -gst_queue_init (GTypeInstance *instance, gpointer g_class) +gst_queue_init (GstQueue *queue) { - GstQueue *queue = GST_QUEUE (instance); - /* scheduling on this kind of element is, well, interesting */ GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED); GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE); @@ -233,26 +267,31 @@ gst_queue_init (GTypeInstance *instance, gpointer g_class) gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_handle_src_event)); gst_pad_set_active (queue->srcpad, TRUE); + queue->cur_level.buffers = 0; /* no content */ + queue->cur_level.bytes = 0; /* no content */ + queue->cur_level.time = 0; /* no content */ + queue->max_size.buffers = 100; /* max. 100 buffers */ + queue->max_size.bytes = 1024 * 1024; /* max. 1 MB */ + queue->max_size.time = GST_SECOND; /* max. 1 sec. */ + queue->min_treshold.buffers = 0; /* no treshold */ + queue->min_treshold.bytes = 0; /* no treshold */ + queue->min_treshold.time = 0; /* no treshold */ + queue->leaky = GST_QUEUE_NO_LEAK; - queue->queue = NULL; - queue->level_buffers = 0; - queue->level_bytes = 0; - queue->level_time = G_GINT64_CONSTANT (0); - queue->size_buffers = 100; /* 100 buffers */ - queue->size_bytes = 100 * 1024; /* 100KB */ - queue->size_time = GST_SECOND; /* 1sec */ - queue->min_threshold_bytes = 0; queue->may_deadlock = TRUE; - queue->block_timeout = -1; + queue->block_timeout = GST_CLOCK_TIME_NONE; queue->interrupt = FALSE; queue->flush = FALSE; queue->qlock = g_mutex_new (); - queue->not_empty = g_cond_new (); - queue->not_full = g_cond_new (); - queue->events = g_async_queue_new(); + queue->item_add = g_cond_new (); + queue->item_del = g_cond_new (); + queue->event_done = g_cond_new (); + queue->events = g_queue_new (); queue->queue = g_queue_new (); - GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions"); + + GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, + "initialized queue's not_empty & not_full conditions"); } static void @@ -262,54 +301,102 @@ gst_queue_dispose (GObject *object) gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL); - g_mutex_free (queue->qlock); - g_cond_free (queue->not_empty); - g_cond_free (queue->not_full); + while (!g_queue_is_empty (queue->queue)) { + GstData *data = g_queue_pop_head (queue->queue); + gst_data_unref (data); + } g_queue_free (queue->queue); + g_mutex_free (queue->qlock); + g_cond_free (queue->item_add); + g_cond_free (queue->item_del); + g_cond_free (queue->event_done); + while (!g_queue_is_empty (queue->events)) { + GstEvent *event = g_queue_pop_head (queue->events); + gst_event_unref (event); + } - g_async_queue_unref(queue->events); - - G_OBJECT_CLASS (parent_class)->dispose (object); + if (G_OBJECT_CLASS (parent_class)->dispose) + G_OBJECT_CLASS (parent_class)->dispose (object); } -static GstBufferPool* +static GstPad * +gst_queue_otherpad (GstPad *pad) +{ + GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); + GstPad *otherpad; + + if (pad == queue->srcpad) + otherpad = queue->sinkpad; + else + otherpad = queue->srcpad; + + return otherpad; +} + +static GstPadLinkReturn +gst_queue_link (GstPad *pad, + GstCaps *caps) +{ + return gst_pad_proxy_link (gst_queue_otherpad (pad), caps); +} + +static GstCaps * +gst_queue_getcaps (GstPad *pad, + GstCaps *caps) +{ + GstPad *otherpad = GST_PAD_PEER (gst_queue_otherpad (pad)); + + if (otherpad) + return gst_pad_get_caps (otherpad); + + return NULL; +} + +static GstBufferPool * gst_queue_get_bufferpool (GstPad *pad) { - GstQueue *queue; - - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - - return gst_pad_get_bufferpool (queue->srcpad); -} - -static void -gst_queue_cleanup_data (gpointer data, const gpointer user_data) -{ - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p", data); - - gst_data_unref (GST_DATA (data)); + return gst_pad_get_bufferpool (gst_queue_otherpad (pad)); } static void gst_queue_locked_flush (GstQueue *queue) { - gpointer data; - - while ((data = g_queue_pop_head (queue->queue))) { - gst_queue_cleanup_data (data, (gpointer) queue); + while (!g_queue_is_empty (queue->queue)) { + GstData *data = g_queue_pop_head (queue->queue); + gst_data_unref (data); } queue->timeval = NULL; - queue->level_buffers = 0; - queue->level_bytes = 0; - queue->level_time = G_GINT64_CONSTANT (0); + queue->cur_level.buffers = 0; + queue->cur_level.bytes = 0; + queue->cur_level.time = 0; + /* make sure any pending buffers to be added are flushed too */ queue->flush = TRUE; - /* signal not_full, since we apparently aren't full anymore */ - g_cond_signal (queue->not_full); + + /* we deleted something... */ + g_cond_signal (queue->item_del); } +#define STATUS(queue, msg) \ + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \ + "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ + "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ + "-%" G_GUINT64_FORMAT " ns, %u elements", \ + GST_DEBUG_PAD_NAME (pad), \ + queue->cur_level.buffers, \ + queue->min_treshold.buffers, \ + queue->max_size.buffers, \ + queue->cur_level.bytes, \ + queue->min_treshold.bytes, \ + queue->max_size.bytes, \ + queue->cur_level.time, \ + queue->min_treshold.time, \ + queue->max_size.time, \ + queue->queue->length) + static void -gst_queue_chain (GstPad *pad, GstData *data) +gst_queue_chain (GstPad *pad, + GstData *data) { GstQueue *queue; @@ -319,147 +406,201 @@ gst_queue_chain (GstPad *pad, GstData *data) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - /* check for events to send upstream */ - g_async_queue_lock(queue->events); - while (g_async_queue_length_unlocked(queue->events) > 0){ - GstEvent *event = (GstEvent*)g_async_queue_pop_unlocked(queue->events); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream\n"); - gst_pad_event_default (pad, event); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent\n"); - } - g_async_queue_unlock(queue->events); - restart: /* we have to lock the queue since we span threads */ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); g_mutex_lock (queue->qlock); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ()); - + + /* check for events to send upstream */ + while (!g_queue_is_empty (queue->events)){ + GstQueueEventResponse *er = g_queue_pop_head (queue->events); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream"); + er->ret = gst_pad_event_default (GST_PAD_PEER (pad), er->event); + er->handled = TRUE; + g_cond_signal (queue->event_done); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent"); + } + /* assume don't need to flush this buffer when the queue is filled */ queue->flush = FALSE; if (GST_IS_EVENT (data)) { switch (GST_EVENT_TYPE (data)) { case GST_EVENT_FLUSH: - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); + STATUS (queue, "received flush event"); gst_queue_locked_flush (queue); break; case GST_EVENT_EOS: - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", - GST_ELEMENT_NAME (queue), queue->level_buffers); + STATUS (queue, "received EOS"); break; default: /* we put the event in the queue, we don't have to act ourselves */ + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "adding event %p of type %d", + data, GST_EVENT_TYPE (data)); break; } } if (GST_IS_BUFFER (data)) GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, - "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data)); + "adding buffer %p of size %d", + data, GST_BUFFER_SIZE (data)); - if (queue->level_buffers == queue->size_buffers) { + /* We make space available if we're "full" according to whatever + * the user defined as "full". Note that this only applies to buffers. + * We always handle events and they don't count in our statistics. */ + if (GST_IS_BUFFER (data) && + ((queue->max_size.buffers > 0 && + queue->cur_level.buffers >= queue->max_size.buffers) || + (queue->max_size.bytes > 0 && + queue->cur_level.bytes >= queue->max_size.bytes) || + (queue->max_size.time > 0 && + queue->cur_level.time >= queue->max_size.time))) { g_mutex_unlock (queue->qlock); - g_signal_emit (G_OBJECT (queue), gst_queue_signals[FULL], 0); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0); g_mutex_lock (queue->qlock); - /* if this is a leaky queue... */ - if (queue->leaky) { - /* FIXME don't want to leak events! */ - /* if we leak on the upstream side, drop the current buffer */ - if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end"); - if (GST_IS_EVENT (data)) - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_EVENT_TYPE(GST_EVENT(data))); - /* now we have to clean up and exit right away */ + /* how are we going to make space for this buffer? */ + switch (queue->leaky) { + /* leak current buffer */ + case GST_QUEUE_LEAK_UPSTREAM: + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue is full, leaking buffer on upstream end"); + /* now we can clean up and exit right away */ g_mutex_unlock (queue->qlock); goto out_unref; - } - /* otherwise we have to push a buffer off the other end */ - else { - gpointer front; - GstData *leak; - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end"); + /* leak first buffer in the queue */ + case GST_QUEUE_LEAK_DOWNSTREAM: { + /* this is a bit hacky. We'll manually iterate the list + * and find the first buffer from the head on. We'll + * unref that and "fix up" the GQueue object... */ + GList *item; + GstData *leak = NULL; - front = g_queue_pop_head (queue->queue); - leak = GST_DATA (front); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue is full, leaking buffer on downstream end"); - queue->level_buffers--; - if (GST_IS_EVENT (leak)) { - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_EVENT_TYPE(GST_EVENT(leak))); - } else { - queue->level_bytes -= GST_BUFFER_SIZE(leak); + for (item = queue->queue->head; item != NULL; item = item->next) { + if (GST_IS_BUFFER (item->data)) { + leak = item->data; + break; + } } - gst_data_unref (leak); + /* if we didn't find anything, it means we have no buffers + * in here. That cannot happen, since we had >= 1 bufs */ + g_assert (leak); + + /* Now remove it from the list, fixing up the GQueue + * CHECKME: is a queue->head the first or the last item? */ + item = g_list_delete_link (queue->queue->head, item); + queue->queue->head = g_list_first (item); + queue->queue->tail = g_list_last (item); + queue->queue->length--; + + /* and unref the data at the end. Twice, because we keep a ref + * to make things read-only. Also keep our list uptodate. */ + queue->cur_level.bytes -= GST_BUFFER_SIZE (data); + queue->cur_level.buffers --; + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time -= GST_BUFFER_DURATION (data); + + gst_data_unref (data); + gst_data_unref (data); + break; } - } - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); + default: + g_warning ("Unknown leaky type, using default"); + /* fall-through */ - while (queue->level_buffers == queue->size_buffers) { - /* if there's a pending state change for this queue or its manager, switch */ - /* back to iterator so bottom half of state change executes */ - if (queue->interrupt) { - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!"); + /* don't leak. Instead, wait for space to be available */ + case GST_QUEUE_NO_LEAK: + STATUS (queue, "pre-full wait"); + + while ((queue->max_size.buffers > 0 && + queue->cur_level.buffers >= queue->max_size.buffers) || + (queue->max_size.bytes > 0 && + queue->cur_level.bytes >= queue->max_size.bytes) || + (queue->max_size.time > 0 && + queue->cur_level.time >= queue->max_size.time)) { + /* if there's a pending state change for this queue + * or its manager, switch back to iterator so bottom + * half of state change executes */ + if (queue->interrupt) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted"); + g_mutex_unlock (queue->qlock); + if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), + GST_ELEMENT (queue))) { + goto out_unref; + } + /* if we got here because we were unlocked after a + * flush, we don't need to add the buffer to the + * queue again */ + if (queue->flush) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "not adding pending buffer after flush"); + goto out_unref; + } + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "adding pending buffer after interrupt"); + goto restart; + } + + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down. Try to + * signal to resolve the error */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_data_unref (data); + gst_element_error (GST_ELEMENT (queue), + "deadlock found, source pad elements are shut down"); + /* we don't go to out_unref here, since we want to + * unref the buffer *before* calling gst_element_error */ + return; + } else { + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "%s: waiting for the app to restart " + "source pad elements", + GST_ELEMENT_NAME (queue)); + } + } + + STATUS (queue, "waiting for item_del signal"); + g_cond_wait (queue->item_del, queue->qlock); + STATUS (queue, "received item_del signal"); + } + + STATUS (queue, "post-full wait"); g_mutex_unlock (queue->qlock); - if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue))) - goto out_unref; - /* if we got here because we were unlocked after a flush, we don't need - * to add the buffer to the queue again */ - if (queue->flush) { - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush"); - goto out_unref; - } - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "adding pending buffer after interrupt"); - goto restart; - } - if (GST_STATE (queue) != GST_STATE_PLAYING) { - /* this means the other end is shut down */ - /* try to signal to resolve the error */ - if (!queue->may_deadlock) { - g_mutex_unlock (queue->qlock); - gst_data_unref (data); - gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); - /* we don't want to goto out_unref here, since we want to clean up before calling gst_element_error */ - return; - } - else { - g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue)); - } - } - - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); - g_cond_wait (queue->not_full, queue->qlock); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_full signal"); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); + g_mutex_lock (queue->qlock); + break; } - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); } - /* put the buffer on the tail of the list */ + /* put the buffer on the tail of the list. We keep a reference, + * so that the data is read-only while in here. There's a good + * reason to do so: we have a size and time counter, and any + * modification to the content could change any of the two. */ + gst_data_ref (data); g_queue_push_tail (queue->queue, data); - queue->level_buffers++; - if (GST_IS_BUFFER (data)) - queue->level_bytes += GST_BUFFER_SIZE (data); + /* Note that we only add buffers (not events) to the statistics */ + if (GST_IS_BUFFER (data)) { + queue->cur_level.buffers++; + queue->cur_level.bytes += GST_BUFFER_SIZE (data); + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time += GST_BUFFER_DURATION (data); + } - /* this assertion _has_ to hold */ - g_assert (queue->queue->length == queue->level_buffers); + STATUS (queue, "+ level"); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d buffers, %d bytes", - GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers, queue->level_bytes); - - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling not_empty"); - g_cond_signal (queue->not_empty); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_add"); + g_cond_signal (queue->item_add); g_mutex_unlock (queue->qlock); return; @@ -473,96 +614,124 @@ static GstData * gst_queue_get (GstPad *pad) { GstQueue *queue; - GstData *data = NULL; - gpointer front; + GstData *data; - g_assert(pad != NULL); - g_assert(GST_IS_PAD(pad)); g_return_val_if_fail (pad != NULL, NULL); g_return_val_if_fail (GST_IS_PAD (pad), NULL); - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE (gst_pad_get_parent (pad)); restart: /* have to lock for thread-safety */ - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "locking t:%p", g_thread_self ()); g_mutex_lock (queue->qlock); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "locked t:%p", g_thread_self ()); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); - while (queue->level_buffers == 0) { - /* if there's a pending state change for this queue or its manager, switch - * back to iterator so bottom half of state change executes - */ - //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { - if (queue->interrupt) { - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!"); - g_mutex_unlock (queue->qlock); - if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue))) - return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); - goto restart; - } - if (GST_STATE (queue) != GST_STATE_PLAYING) { - /* this means the other end is shut down */ - if (!queue->may_deadlock) { + if (queue->queue->length == 0 || + (queue->min_treshold.buffers > 0 && + queue->cur_level.buffers < queue->min_treshold.buffers) || + (queue->min_treshold.bytes > 0 && + queue->cur_level.bytes < queue->min_treshold.bytes) || + (queue->min_treshold.time > 0 && + queue->cur_level.time < queue->min_treshold.time)) { + g_mutex_unlock (queue->qlock); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); + g_mutex_lock (queue->qlock); + + STATUS (queue, "pre-empty wait"); + while (queue->queue->length == 0 || + (queue->min_treshold.buffers > 0 && + queue->cur_level.buffers < queue->min_treshold.buffers) || + (queue->min_treshold.bytes > 0 && + queue->cur_level.bytes < queue->min_treshold.bytes) || + (queue->min_treshold.time > 0 && + queue->cur_level.time < queue->min_treshold.time)) { + /* if there's a pending state change for this queue or its + * manager, switch back to iterator so bottom half of state + * change executes. */ + if (queue->interrupt) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted"); g_mutex_unlock (queue->qlock); - gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down"); + if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), + GST_ELEMENT (queue))) + return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); goto restart; } - else { - g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue)); + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_element_error (GST_ELEMENT (queue), + "deadlock found, sink pad elements are shut down"); + goto restart; + } else { + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "%s: waiting for the app to restart " + "source pad elements", + GST_ELEMENT_NAME (queue)); + } } - } - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); + STATUS (queue, "waiting for item_add"); - /* if (queue->block_timeout > -1){ */ - if (FALSE) { - GTimeVal timeout; - g_get_current_time(&timeout); - g_time_val_add(&timeout, queue->block_timeout); - if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){ - g_mutex_unlock (queue->qlock); - g_warning ("filler"); - return GST_DATA (gst_event_new_filler()); + if (queue->block_timeout != GST_CLOCK_TIME_NONE) { + GTimeVal timeout; + g_get_current_time (&timeout); + g_time_val_add (&timeout, queue->block_timeout / 1000); + if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)){ + g_mutex_unlock (queue->qlock); + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Sending filler event"); + return GST_DATA (gst_event_new_filler ()); + } + } else { + g_cond_wait (queue->item_add, queue->qlock); } + STATUS (queue, "got item_add signal"); } - else { - g_cond_wait (queue->not_empty, queue->qlock); - } - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_empty signal"); + + STATUS (queue, "post-empty wait"); + g_mutex_unlock (queue->qlock); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); + g_mutex_lock (queue->qlock); } - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); - front = g_queue_pop_head (queue->queue); - data = GST_DATA (front); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "retrieved data %p from queue", data); + /* There's something in the list now, whatever it is */ + data = g_queue_pop_head (queue->queue); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "retrieved data %p from queue", data); - queue->level_buffers--; - if (GST_IS_BUFFER (data)) - queue->level_bytes -= GST_BUFFER_SIZE (data); + if (GST_IS_BUFFER (data)) { + /* Update statistics */ + queue->cur_level.buffers--; + queue->cur_level.bytes -= GST_BUFFER_SIZE (data); + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time -= GST_BUFFER_DURATION (data); + } - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes", - GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers, queue->level_bytes); + /* Now that we're done, we can lose our own reference to + * the item, since we're no longer in danger. */ + gst_data_unref (data); - /* this assertion _has_ to hold */ - g_assert (queue->queue->length == queue->level_buffers); + STATUS (queue, "after _get()"); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling not_full"); - g_cond_signal (queue->not_full); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del"); + g_cond_signal (queue->item_del); g_mutex_unlock (queue->qlock); - /* FIXME where should this be? locked? */ + /* FIXME: I suppose this needs to be locked, since the EOS + * bit affects the pipeline state. However, that bit is + * locked too so it'd cause a deadlock. */ if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT (data); - switch (GST_EVENT_TYPE(event)) { + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue)); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue \"%s\" eos", + GST_ELEMENT_NAME (queue)); gst_element_set_eos (GST_ELEMENT (queue)); break; default: @@ -575,47 +744,43 @@ restart: static gboolean -gst_queue_handle_src_event (GstPad *pad, GstEvent *event) +gst_queue_handle_src_event (GstPad *pad, + GstEvent *event) { - GstQueue *queue; + GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); gboolean res; - gint event_type; - gint flag_flush = 0; - - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); g_mutex_lock (queue->qlock); if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { - /* push the event to the queue for upstream consumption */ - g_async_queue_push(queue->events, event); - g_warning ("FIXME: sending event in a running queue"); - /* FIXME wait for delivery of the event here, then return the result - * instead of FALSE */ - res = FALSE; - goto done; - } + GstQueueEventResponse er; - event_type = GST_EVENT_TYPE (event); - if (event_type == GST_EVENT_SEEK) - flag_flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH; + /* push the event to the queue and wait for upstream consumption */ + er.event = event; + er.handled = FALSE; + g_queue_push_tail (queue->events, &er); + while (!er.handled) { + g_cond_wait (queue->event_done, queue->qlock); + } + res = er.ret; + } else { + res = gst_pad_event_default (pad, event); - res = gst_pad_event_default (pad, event); - - switch (event_type) { - case GST_EVENT_FLUSH: - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); - gst_queue_locked_flush (queue); - break; - case GST_EVENT_SEEK: - if (flag_flush) { + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH: + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "FLUSH event, flushing queue\n"); gst_queue_locked_flush (queue); - } - default: - break; + break; + case GST_EVENT_SEEK: + if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) { + gst_queue_locked_flush (queue); + } + default: + break; + } } -done: g_mutex_unlock (queue->qlock); /* we have to claim success, but we don't really know */ @@ -631,8 +796,8 @@ gst_queue_release_locks (GstElement *element) g_mutex_lock (queue->qlock); queue->interrupt = TRUE; - g_cond_signal (queue->not_full); - g_cond_signal (queue->not_empty); + g_cond_signal (queue->item_add); + g_cond_signal (queue->item_del); g_mutex_unlock (queue->qlock); return TRUE; @@ -642,7 +807,7 @@ static GstElementStateReturn gst_queue_change_state (GstElement *element) { GstQueue *queue; - GstElementStateReturn ret; + GstElementStateReturn ret = GST_STATE_SUCCESS; queue = GST_QUEUE (element); @@ -657,29 +822,29 @@ gst_queue_change_state (GstElement *element) case GST_STATE_NULL_TO_READY: gst_queue_locked_flush (queue); break; - case GST_STATE_READY_TO_PAUSED: - break; case GST_STATE_PAUSED_TO_PLAYING: if (!GST_PAD_IS_LINKED (queue->sinkpad)) { - GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, "queue %s is not linked", GST_ELEMENT_NAME (queue)); + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, + "queue %s is not linked", + GST_ELEMENT_NAME (queue)); /* FIXME can this be? */ - g_cond_signal (queue->not_empty); + g_cond_signal (queue->item_add); ret = GST_STATE_FAILURE; goto error; - } - else { + } else { GstScheduler *src_sched, *sink_sched; src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad)); sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad)); if (src_sched == sink_sched) { - GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, "queue %s does not connect different schedulers", - GST_ELEMENT_NAME (queue)); + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, + "queue %s does not connect different schedulers", + GST_ELEMENT_NAME (queue)); g_warning ("queue %s does not connect different schedulers", - GST_ELEMENT_NAME (queue)); + GST_ELEMENT_NAME (queue)); ret = GST_STATE_FAILURE; goto error; @@ -687,18 +852,19 @@ gst_queue_change_state (GstElement *element) } queue->interrupt = FALSE; break; - case GST_STATE_PLAYING_TO_PAUSED: - break; case GST_STATE_PAUSED_TO_READY: gst_queue_locked_flush (queue); break; - case GST_STATE_READY_TO_NULL: + default: break; } - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); - /* this is an ugly hack to make sure our pads are always active. Reason for this is that - * pad activation for the queue element depends on 2 schedulers (ugh) */ + if (GST_ELEMENT_CLASS (parent_class)->change_state) + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); + + /* this is an ugly hack to make sure our pads are always active. + * Reason for this is that pad activation for the queue element + * depends on 2 schedulers (ugh) */ gst_pad_set_active (queue->sinkpad, TRUE); gst_pad_set_active (queue->srcpad, TRUE); @@ -706,70 +872,103 @@ error: g_mutex_unlock (queue->qlock); GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); + return ret; } static void -gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) +gst_queue_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) { - GstQueue *queue; + GstQueue *queue = GST_QUEUE (object); - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_QUEUE (object)); - - queue = GST_QUEUE (object); + /* someone could change levels here, and since this + * affects the get/put funcs, we need to lock for safety. */ + g_mutex_lock (queue->qlock); switch (prop_id) { + case ARG_MAX_SIZE_BYTES: + queue->max_size.bytes = g_value_get_uint (value); + break; + case ARG_MAX_SIZE_BUFFERS: + queue->max_size.buffers = g_value_get_uint (value); + break; + case ARG_MAX_SIZE_TIME: + queue->max_size.time = g_value_get_uint64 (value); + break; + case ARG_MIN_TRESHOLD_BYTES: + queue->max_size.bytes = g_value_get_uint (value); + break; + case ARG_MIN_TRESHOLD_BUFFERS: + queue->max_size.buffers = g_value_get_uint (value); + break; + case ARG_MIN_TRESHOLD_TIME: + queue->max_size.time = g_value_get_uint64 (value); + break; case ARG_LEAKY: queue->leaky = g_value_get_enum (value); break; - case ARG_MAX_LEVEL: - queue->size_buffers = g_value_get_int (value); - break; - case ARG_MIN_THRESHOLD_BYTES: - queue->min_threshold_bytes = g_value_get_int (value); - break; case ARG_MAY_DEADLOCK: queue->may_deadlock = g_value_get_boolean (value); break; case ARG_BLOCK_TIMEOUT: - queue->block_timeout = g_value_get_int (value); + queue->block_timeout = g_value_get_uint64 (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + g_mutex_unlock (queue->qlock); } static void -gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) +gst_queue_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) { - GstQueue *queue; - - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_QUEUE (object)); - - queue = GST_QUEUE (object); + GstQueue *queue = GST_QUEUE (object); switch (prop_id) { + case ARG_CUR_LEVEL_BYTES: + g_value_set_uint (value, queue->cur_level.bytes); + break; + case ARG_CUR_LEVEL_BUFFERS: + g_value_set_uint (value, queue->cur_level.buffers); + break; + case ARG_CUR_LEVEL_TIME: + g_value_set_uint64 (value, queue->cur_level.time); + break; + case ARG_MAX_SIZE_BYTES: + g_value_set_uint (value, queue->max_size.bytes); + break; + case ARG_MAX_SIZE_BUFFERS: + g_value_set_uint (value, queue->max_size.buffers); + break; + case ARG_MAX_SIZE_TIME: + g_value_set_uint64 (value, queue->max_size.time); + break; + case ARG_MIN_TRESHOLD_BYTES: + g_value_set_uint (value, queue->min_treshold.bytes); + break; + case ARG_MIN_TRESHOLD_BUFFERS: + g_value_set_uint (value, queue->min_treshold.buffers); + break; + case ARG_MIN_TRESHOLD_TIME: + g_value_set_uint64 (value, queue->min_treshold.time); + break; case ARG_LEAKY: g_value_set_enum (value, queue->leaky); break; - case ARG_LEVEL: - g_value_set_int (value, queue->level_buffers); - break; - case ARG_MAX_LEVEL: - g_value_set_int (value, queue->size_buffers); - break; - case ARG_MIN_THRESHOLD_BYTES: - g_value_set_int (value, queue->min_threshold_bytes); - break; case ARG_MAY_DEADLOCK: g_value_set_boolean (value, queue->may_deadlock); break; case ARG_BLOCK_TIMEOUT: - g_value_set_int (value, queue->block_timeout); + g_value_set_uint64 (value, queue->block_timeout); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/gst/gstqueue.h b/gst/gstqueue.h index 428c3e09c5..5e87b899e5 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -56,32 +56,37 @@ struct _GstQueue { GstPad *sinkpad; GstPad *srcpad; - /* the queue of buffers we're keeping our grubby hands on */ + /* the queue of data we're keeping our grubby hands on */ GQueue *queue; - guint level_buffers; /* number of buffers queued here */ - guint level_bytes; /* number of bytes queued here */ - guint64 level_time; /* amount of time queued here */ + struct { + guint buffers; /* no. of buffers */ + guint bytes; /* no. of bytes */ + guint64 time; /* amount of time */ + } cur_level, /* currently in the queue */ + max_size, /* max. amount of data allowed in the queue */ + min_treshold; /* min. amount of data required to wake reader */ - guint size_buffers; /* size of queue in buffers */ - guint size_bytes; /* size of queue in bytes */ - guint64 size_time; /* size of queue in time */ + /* whether we leak data, and at which end */ + gint leaky; + + /* number of nanoseconds until a blocked queue 'times out' + * to receive data and returns a filler event. -1 = disable */ + guint64 block_timeout; + + /* it the queue should fail on possible deadlocks */ + gboolean may_deadlock; - gint leaky; /* whether the queue is leaky, and if so at which end */ - gint block_timeout; /* microseconds until a blocked queue times out and returns GST_EVENT_FILLER. - * A value of -1 will block forever. */ - guint min_threshold_bytes; /* the minimum number of bytes required before - * waking up the reader thread */ - gboolean may_deadlock; /* it the queue should fail on possible deadlocks */ gboolean interrupt; gboolean flush; GMutex *qlock; /* lock for queue (vs object lock) */ - GCond *not_empty; /* signals buffers now available for reading */ - GCond *not_full; /* signals space now available for writing */ + GCond *item_add; /* signals buffers now available for reading */ + GCond *item_del; /* signals space now available for writing */ + GCond *event_done; /* upstream event signaller */ GTimeVal *timeval; /* the timeout for the queue locking */ - GAsyncQueue *events; /* upstream events get decoupled here */ + GQueue *events; /* upstream events get decoupled here */ gpointer _gst_reserved[GST_PADDING]; }; @@ -89,8 +94,11 @@ struct _GstQueue { struct _GstQueueClass { GstElementClass parent_class; - /* signal callbacks */ - void (*full) (GstQueue *queue); + /* signals - 'running' is called from both sides + * which might make it sort of non-useful... */ + void (*underrun) (GstQueue *queue); + void (*running) (GstQueue *queue); + void (*overrun) (GstQueue *queue); gpointer _gst_reserved[GST_PADDING]; }; diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index fc646bf371..efff3c940f 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -39,54 +39,76 @@ static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ( /* Queue signals and args */ enum { - FULL, + SIGNAL_UNDERRUN, + SIGNAL_RUNNING, + SIGNAL_OVERRUN, LAST_SIGNAL }; enum { ARG_0, - ARG_LEVEL_BUFFERS, - ARG_LEVEL_BYTES, - ARG_LEVEL_TIME, - ARG_SIZE_BUFFERS, - ARG_SIZE_BYTES, - ARG_SIZE_TIME, + /* FIXME: don't we have another way of doing this + * "Gstreamer format" (frame/byte/time) queries? */ + ARG_CUR_LEVEL_BUFFERS, + ARG_CUR_LEVEL_BYTES, + ARG_CUR_LEVEL_TIME, + ARG_MAX_SIZE_BUFFERS, + ARG_MAX_SIZE_BYTES, + ARG_MAX_SIZE_TIME, + ARG_MIN_TRESHOLD_BUFFERS, + ARG_MIN_TRESHOLD_BYTES, + ARG_MIN_TRESHOLD_TIME, ARG_LEAKY, - ARG_LEVEL, - ARG_MAX_LEVEL, - ARG_MIN_THRESHOLD_BYTES, ARG_MAY_DEADLOCK, - ARG_BLOCK_TIMEOUT, + ARG_BLOCK_TIMEOUT + /* FILL ME */ }; -static void gst_queue_base_init (gpointer g_class); -static void gst_queue_class_init (gpointer g_class, - gpointer class_data); -static void gst_queue_init (GTypeInstance *instance, - gpointer g_class); -static void gst_queue_dispose (GObject *object); +typedef struct _GstQueueEventResponse { + GstEvent *event; + gboolean ret, handled; +} GstQueueEventResponse; -static void gst_queue_set_property (GObject *object, guint prop_id, - const GValue *value, GParamSpec *pspec); -static void gst_queue_get_property (GObject *object, guint prop_id, - GValue *value, GParamSpec *pspec); +static void gst_queue_base_init (GstQueueClass *klass); +static void gst_queue_class_init (GstQueueClass *klass); +static void gst_queue_init (GstQueue *queue); +static void gst_queue_dispose (GObject *object); -static void gst_queue_chain (GstPad *pad, GstData *data); -static GstData * gst_queue_get (GstPad *pad); -static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); +static void gst_queue_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec); +static void gst_queue_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec); + +static GstCaps *gst_queue_getcaps (GstPad *pad, + GstCaps *caps); +static GstPadLinkReturn + gst_queue_link (GstPad *pad, + GstCaps *caps); +static void gst_queue_chain (GstPad *pad, + GstData *data); +static GstData *gst_queue_get (GstPad *pad); +static GstBufferPool * + gst_queue_get_bufferpool (GstPad *pad); -static gboolean gst_queue_handle_src_event (GstPad *pad, GstEvent *event); +static gboolean gst_queue_handle_src_event (GstPad *pad, + GstEvent *event); +static void gst_queue_locked_flush (GstQueue *queue); -static void gst_queue_locked_flush (GstQueue *queue); - -static GstElementStateReturn gst_queue_change_state (GstElement *element); -static gboolean gst_queue_release_locks (GstElement *element); +static GstElementStateReturn + gst_queue_change_state (GstElement *element); +static gboolean gst_queue_release_locks (GstElement *element); -#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type()) +#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ()) + static GType -queue_leaky_get_type(void) { +queue_leaky_get_type (void) +{ static GType queue_leaky_type = 0; static GEnumValue queue_leaky[] = { { GST_QUEUE_NO_LEAK, "0", "Not Leaky" }, @@ -104,115 +126,127 @@ static GstElementClass *parent_class = NULL; static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; GType -gst_queue_get_type(void) +gst_queue_get_type (void) { static GType queue_type = 0; if (!queue_type) { static const GTypeInfo queue_info = { - sizeof(GstQueueClass), - gst_queue_base_init, + sizeof (GstQueueClass), + (GBaseInitFunc) gst_queue_base_init, NULL, - gst_queue_class_init, + (GClassInitFunc) gst_queue_class_init, NULL, NULL, - sizeof(GstQueue), + sizeof (GstQueue), 4, - gst_queue_init, + (GInstanceInitFunc) gst_queue_init, NULL }; - queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0); + + queue_type = g_type_register_static (GST_TYPE_ELEMENT, + "GstQueue", &queue_info, 0); } + return queue_type; } static void -gst_queue_base_init (gpointer g_class) +gst_queue_base_init (GstQueueClass *klass) { - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); gst_element_class_set_details (gstelement_class, &gst_queue_details); } static void -gst_queue_class_init (gpointer g_class, gpointer class_data) +gst_queue_class_init (GstQueueClass *klass) { - GObjectClass *gobject_class = G_OBJECT_CLASS (g_class); - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); - GstQueueClass *gstqueue_class = GST_QUEUE_CLASS (g_class); + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); - parent_class = g_type_class_peek_parent (g_class); + parent_class = g_type_class_peek_parent (klass); - gst_queue_signals[FULL] = - g_signal_new ("full", G_TYPE_FROM_CLASS (gstqueue_class), G_SIGNAL_RUN_FIRST, - G_STRUCT_OFFSET (GstQueueClass, full), NULL, NULL, + /* signals */ + gst_queue_signals[SIGNAL_UNDERRUN] = + g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + gst_queue_signals[SIGNAL_RUNNING] = + g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + gst_queue_signals[SIGNAL_OVERRUN] = + g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_LEAKY, - g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.", - GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_LEVEL, - g_param_spec_int ("level", "Level", "How many buffers are in the queue.", - 0, G_MAXINT, 0, G_PARAM_READABLE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MAX_LEVEL, - g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.", - 0, G_MAXINT, 100, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MIN_THRESHOLD_BYTES, - g_param_spec_int ("min_threshold_bytes", "Minimum Threshold", - "Minimum bytes required before signalling not_empty to reader.", - 0, G_MAXINT, 0, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MAY_DEADLOCK, - g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", - TRUE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_BLOCK_TIMEOUT, - g_param_spec_int ("block_timeout", "Timeout for Block", - "Microseconds until blocked queue times out and returns filler event. " - "Value of -1 disables timeout", - -1, G_MAXINT, -1, G_PARAM_READWRITE)); - gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); - gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); + /* properties */ + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, + g_param_spec_uint ("current-level-bytes", "Current level (kB)", + "Current amount of data in the queue (bytes)", + 0, G_MAXUINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS, + g_param_spec_uint ("current-level-buffers", "Current level (buffers)", + "Current number of buffers in the queue", + 0, G_MAXUINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME, + g_param_spec_uint64 ("current-level-time", "Current level (ns)", + "Current amount of data in the queue (in ns)", + 0, G_MAXUINT64, 0, G_PARAM_READABLE)); - gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state); - gstelement_class->release_locks = GST_DEBUG_FUNCPTR(gst_queue_release_locks); -} + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, + g_param_spec_uint ("max-size-bytes", "Max. size (kB)", + "Max. amount of data in the queue (bytes, 0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, + g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", + "Max. number of buffers in the queue (0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, + g_param_spec_uint64 ("max-size-time", "Max. size (ns)", + "Max. amount of data in the queue (in ns, 0=disable)", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); -static GstPadLinkReturn -gst_queue_link (GstPad *pad, GstCaps *caps) -{ - GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); - GstPad *otherpad; + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BYTES, + g_param_spec_uint ("min-treshold-bytes", "Min. treshold (kB)", + "Min. amount of data in the queue to allow reading (bytes, 0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BUFFERS, + g_param_spec_uint ("min-treshold-buffers", "Min. treshold (buffers)", + "Min. number of buffers in the queue to allow reading (0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_TIME, + g_param_spec_uint64 ("min-treshold-time", "Min. treshold (ns)", + "Min. amount of data in the queue to allow reading (in ns, 0=disable)", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); - if (pad == queue->srcpad) - otherpad = queue->sinkpad; - else - otherpad = queue->srcpad; + g_object_class_install_property (gobject_class, ARG_LEAKY, + g_param_spec_enum ("leaky", "Leaky", + "Where the queue leaks, if at all", + GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK, + g_param_spec_boolean ("may_deadlock", "May Deadlock", + "The queue may deadlock if it's full and not PLAYING", + TRUE, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT, + g_param_spec_uint64 ("block_timeout", "Timeout for Block", + "Nanoseconds until blocked queue times out and returns filler event. " + "Value of -1 disables timeout", + 0, G_MAXUINT64, -1, G_PARAM_READWRITE)); - return gst_pad_proxy_link (otherpad, caps); -} + /* set several parent class virtual functions */ + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); -static GstCaps* -gst_queue_getcaps (GstPad *pad, GstCaps *caps) -{ - GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); - GstPad *otherpad; - - if (pad == queue->srcpad) - otherpad = GST_PAD_PEER (queue->sinkpad); - else - otherpad = GST_PAD_PEER (queue->srcpad); - - if (otherpad) - return gst_pad_get_caps (otherpad); - - return NULL; + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state); + gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks); } static void -gst_queue_init (GTypeInstance *instance, gpointer g_class) +gst_queue_init (GstQueue *queue) { - GstQueue *queue = GST_QUEUE (instance); - /* scheduling on this kind of element is, well, interesting */ GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED); GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE); @@ -233,26 +267,31 @@ gst_queue_init (GTypeInstance *instance, gpointer g_class) gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_handle_src_event)); gst_pad_set_active (queue->srcpad, TRUE); + queue->cur_level.buffers = 0; /* no content */ + queue->cur_level.bytes = 0; /* no content */ + queue->cur_level.time = 0; /* no content */ + queue->max_size.buffers = 100; /* max. 100 buffers */ + queue->max_size.bytes = 1024 * 1024; /* max. 1 MB */ + queue->max_size.time = GST_SECOND; /* max. 1 sec. */ + queue->min_treshold.buffers = 0; /* no treshold */ + queue->min_treshold.bytes = 0; /* no treshold */ + queue->min_treshold.time = 0; /* no treshold */ + queue->leaky = GST_QUEUE_NO_LEAK; - queue->queue = NULL; - queue->level_buffers = 0; - queue->level_bytes = 0; - queue->level_time = G_GINT64_CONSTANT (0); - queue->size_buffers = 100; /* 100 buffers */ - queue->size_bytes = 100 * 1024; /* 100KB */ - queue->size_time = GST_SECOND; /* 1sec */ - queue->min_threshold_bytes = 0; queue->may_deadlock = TRUE; - queue->block_timeout = -1; + queue->block_timeout = GST_CLOCK_TIME_NONE; queue->interrupt = FALSE; queue->flush = FALSE; queue->qlock = g_mutex_new (); - queue->not_empty = g_cond_new (); - queue->not_full = g_cond_new (); - queue->events = g_async_queue_new(); + queue->item_add = g_cond_new (); + queue->item_del = g_cond_new (); + queue->event_done = g_cond_new (); + queue->events = g_queue_new (); queue->queue = g_queue_new (); - GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions"); + + GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, + "initialized queue's not_empty & not_full conditions"); } static void @@ -262,54 +301,102 @@ gst_queue_dispose (GObject *object) gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL); - g_mutex_free (queue->qlock); - g_cond_free (queue->not_empty); - g_cond_free (queue->not_full); + while (!g_queue_is_empty (queue->queue)) { + GstData *data = g_queue_pop_head (queue->queue); + gst_data_unref (data); + } g_queue_free (queue->queue); + g_mutex_free (queue->qlock); + g_cond_free (queue->item_add); + g_cond_free (queue->item_del); + g_cond_free (queue->event_done); + while (!g_queue_is_empty (queue->events)) { + GstEvent *event = g_queue_pop_head (queue->events); + gst_event_unref (event); + } - g_async_queue_unref(queue->events); - - G_OBJECT_CLASS (parent_class)->dispose (object); + if (G_OBJECT_CLASS (parent_class)->dispose) + G_OBJECT_CLASS (parent_class)->dispose (object); } -static GstBufferPool* +static GstPad * +gst_queue_otherpad (GstPad *pad) +{ + GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); + GstPad *otherpad; + + if (pad == queue->srcpad) + otherpad = queue->sinkpad; + else + otherpad = queue->srcpad; + + return otherpad; +} + +static GstPadLinkReturn +gst_queue_link (GstPad *pad, + GstCaps *caps) +{ + return gst_pad_proxy_link (gst_queue_otherpad (pad), caps); +} + +static GstCaps * +gst_queue_getcaps (GstPad *pad, + GstCaps *caps) +{ + GstPad *otherpad = GST_PAD_PEER (gst_queue_otherpad (pad)); + + if (otherpad) + return gst_pad_get_caps (otherpad); + + return NULL; +} + +static GstBufferPool * gst_queue_get_bufferpool (GstPad *pad) { - GstQueue *queue; - - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - - return gst_pad_get_bufferpool (queue->srcpad); -} - -static void -gst_queue_cleanup_data (gpointer data, const gpointer user_data) -{ - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p", data); - - gst_data_unref (GST_DATA (data)); + return gst_pad_get_bufferpool (gst_queue_otherpad (pad)); } static void gst_queue_locked_flush (GstQueue *queue) { - gpointer data; - - while ((data = g_queue_pop_head (queue->queue))) { - gst_queue_cleanup_data (data, (gpointer) queue); + while (!g_queue_is_empty (queue->queue)) { + GstData *data = g_queue_pop_head (queue->queue); + gst_data_unref (data); } queue->timeval = NULL; - queue->level_buffers = 0; - queue->level_bytes = 0; - queue->level_time = G_GINT64_CONSTANT (0); + queue->cur_level.buffers = 0; + queue->cur_level.bytes = 0; + queue->cur_level.time = 0; + /* make sure any pending buffers to be added are flushed too */ queue->flush = TRUE; - /* signal not_full, since we apparently aren't full anymore */ - g_cond_signal (queue->not_full); + + /* we deleted something... */ + g_cond_signal (queue->item_del); } +#define STATUS(queue, msg) \ + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \ + "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ + "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ + "-%" G_GUINT64_FORMAT " ns, %u elements", \ + GST_DEBUG_PAD_NAME (pad), \ + queue->cur_level.buffers, \ + queue->min_treshold.buffers, \ + queue->max_size.buffers, \ + queue->cur_level.bytes, \ + queue->min_treshold.bytes, \ + queue->max_size.bytes, \ + queue->cur_level.time, \ + queue->min_treshold.time, \ + queue->max_size.time, \ + queue->queue->length) + static void -gst_queue_chain (GstPad *pad, GstData *data) +gst_queue_chain (GstPad *pad, + GstData *data) { GstQueue *queue; @@ -319,147 +406,201 @@ gst_queue_chain (GstPad *pad, GstData *data) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - /* check for events to send upstream */ - g_async_queue_lock(queue->events); - while (g_async_queue_length_unlocked(queue->events) > 0){ - GstEvent *event = (GstEvent*)g_async_queue_pop_unlocked(queue->events); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream\n"); - gst_pad_event_default (pad, event); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent\n"); - } - g_async_queue_unlock(queue->events); - restart: /* we have to lock the queue since we span threads */ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); g_mutex_lock (queue->qlock); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ()); - + + /* check for events to send upstream */ + while (!g_queue_is_empty (queue->events)){ + GstQueueEventResponse *er = g_queue_pop_head (queue->events); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream"); + er->ret = gst_pad_event_default (GST_PAD_PEER (pad), er->event); + er->handled = TRUE; + g_cond_signal (queue->event_done); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent"); + } + /* assume don't need to flush this buffer when the queue is filled */ queue->flush = FALSE; if (GST_IS_EVENT (data)) { switch (GST_EVENT_TYPE (data)) { case GST_EVENT_FLUSH: - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); + STATUS (queue, "received flush event"); gst_queue_locked_flush (queue); break; case GST_EVENT_EOS: - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", - GST_ELEMENT_NAME (queue), queue->level_buffers); + STATUS (queue, "received EOS"); break; default: /* we put the event in the queue, we don't have to act ourselves */ + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "adding event %p of type %d", + data, GST_EVENT_TYPE (data)); break; } } if (GST_IS_BUFFER (data)) GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, - "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data)); + "adding buffer %p of size %d", + data, GST_BUFFER_SIZE (data)); - if (queue->level_buffers == queue->size_buffers) { + /* We make space available if we're "full" according to whatever + * the user defined as "full". Note that this only applies to buffers. + * We always handle events and they don't count in our statistics. */ + if (GST_IS_BUFFER (data) && + ((queue->max_size.buffers > 0 && + queue->cur_level.buffers >= queue->max_size.buffers) || + (queue->max_size.bytes > 0 && + queue->cur_level.bytes >= queue->max_size.bytes) || + (queue->max_size.time > 0 && + queue->cur_level.time >= queue->max_size.time))) { g_mutex_unlock (queue->qlock); - g_signal_emit (G_OBJECT (queue), gst_queue_signals[FULL], 0); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0); g_mutex_lock (queue->qlock); - /* if this is a leaky queue... */ - if (queue->leaky) { - /* FIXME don't want to leak events! */ - /* if we leak on the upstream side, drop the current buffer */ - if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end"); - if (GST_IS_EVENT (data)) - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_EVENT_TYPE(GST_EVENT(data))); - /* now we have to clean up and exit right away */ + /* how are we going to make space for this buffer? */ + switch (queue->leaky) { + /* leak current buffer */ + case GST_QUEUE_LEAK_UPSTREAM: + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue is full, leaking buffer on upstream end"); + /* now we can clean up and exit right away */ g_mutex_unlock (queue->qlock); goto out_unref; - } - /* otherwise we have to push a buffer off the other end */ - else { - gpointer front; - GstData *leak; - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end"); + /* leak first buffer in the queue */ + case GST_QUEUE_LEAK_DOWNSTREAM: { + /* this is a bit hacky. We'll manually iterate the list + * and find the first buffer from the head on. We'll + * unref that and "fix up" the GQueue object... */ + GList *item; + GstData *leak = NULL; - front = g_queue_pop_head (queue->queue); - leak = GST_DATA (front); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue is full, leaking buffer on downstream end"); - queue->level_buffers--; - if (GST_IS_EVENT (leak)) { - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_EVENT_TYPE(GST_EVENT(leak))); - } else { - queue->level_bytes -= GST_BUFFER_SIZE(leak); + for (item = queue->queue->head; item != NULL; item = item->next) { + if (GST_IS_BUFFER (item->data)) { + leak = item->data; + break; + } } - gst_data_unref (leak); + /* if we didn't find anything, it means we have no buffers + * in here. That cannot happen, since we had >= 1 bufs */ + g_assert (leak); + + /* Now remove it from the list, fixing up the GQueue + * CHECKME: is a queue->head the first or the last item? */ + item = g_list_delete_link (queue->queue->head, item); + queue->queue->head = g_list_first (item); + queue->queue->tail = g_list_last (item); + queue->queue->length--; + + /* and unref the data at the end. Twice, because we keep a ref + * to make things read-only. Also keep our list uptodate. */ + queue->cur_level.bytes -= GST_BUFFER_SIZE (data); + queue->cur_level.buffers --; + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time -= GST_BUFFER_DURATION (data); + + gst_data_unref (data); + gst_data_unref (data); + break; } - } - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); + default: + g_warning ("Unknown leaky type, using default"); + /* fall-through */ - while (queue->level_buffers == queue->size_buffers) { - /* if there's a pending state change for this queue or its manager, switch */ - /* back to iterator so bottom half of state change executes */ - if (queue->interrupt) { - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!"); + /* don't leak. Instead, wait for space to be available */ + case GST_QUEUE_NO_LEAK: + STATUS (queue, "pre-full wait"); + + while ((queue->max_size.buffers > 0 && + queue->cur_level.buffers >= queue->max_size.buffers) || + (queue->max_size.bytes > 0 && + queue->cur_level.bytes >= queue->max_size.bytes) || + (queue->max_size.time > 0 && + queue->cur_level.time >= queue->max_size.time)) { + /* if there's a pending state change for this queue + * or its manager, switch back to iterator so bottom + * half of state change executes */ + if (queue->interrupt) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted"); + g_mutex_unlock (queue->qlock); + if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), + GST_ELEMENT (queue))) { + goto out_unref; + } + /* if we got here because we were unlocked after a + * flush, we don't need to add the buffer to the + * queue again */ + if (queue->flush) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "not adding pending buffer after flush"); + goto out_unref; + } + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "adding pending buffer after interrupt"); + goto restart; + } + + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down. Try to + * signal to resolve the error */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_data_unref (data); + gst_element_error (GST_ELEMENT (queue), + "deadlock found, source pad elements are shut down"); + /* we don't go to out_unref here, since we want to + * unref the buffer *before* calling gst_element_error */ + return; + } else { + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "%s: waiting for the app to restart " + "source pad elements", + GST_ELEMENT_NAME (queue)); + } + } + + STATUS (queue, "waiting for item_del signal"); + g_cond_wait (queue->item_del, queue->qlock); + STATUS (queue, "received item_del signal"); + } + + STATUS (queue, "post-full wait"); g_mutex_unlock (queue->qlock); - if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue))) - goto out_unref; - /* if we got here because we were unlocked after a flush, we don't need - * to add the buffer to the queue again */ - if (queue->flush) { - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush"); - goto out_unref; - } - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "adding pending buffer after interrupt"); - goto restart; - } - if (GST_STATE (queue) != GST_STATE_PLAYING) { - /* this means the other end is shut down */ - /* try to signal to resolve the error */ - if (!queue->may_deadlock) { - g_mutex_unlock (queue->qlock); - gst_data_unref (data); - gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); - /* we don't want to goto out_unref here, since we want to clean up before calling gst_element_error */ - return; - } - else { - g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue)); - } - } - - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); - g_cond_wait (queue->not_full, queue->qlock); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_full signal"); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); + g_mutex_lock (queue->qlock); + break; } - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); } - /* put the buffer on the tail of the list */ + /* put the buffer on the tail of the list. We keep a reference, + * so that the data is read-only while in here. There's a good + * reason to do so: we have a size and time counter, and any + * modification to the content could change any of the two. */ + gst_data_ref (data); g_queue_push_tail (queue->queue, data); - queue->level_buffers++; - if (GST_IS_BUFFER (data)) - queue->level_bytes += GST_BUFFER_SIZE (data); + /* Note that we only add buffers (not events) to the statistics */ + if (GST_IS_BUFFER (data)) { + queue->cur_level.buffers++; + queue->cur_level.bytes += GST_BUFFER_SIZE (data); + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time += GST_BUFFER_DURATION (data); + } - /* this assertion _has_ to hold */ - g_assert (queue->queue->length == queue->level_buffers); + STATUS (queue, "+ level"); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d buffers, %d bytes", - GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers, queue->level_bytes); - - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling not_empty"); - g_cond_signal (queue->not_empty); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_add"); + g_cond_signal (queue->item_add); g_mutex_unlock (queue->qlock); return; @@ -473,96 +614,124 @@ static GstData * gst_queue_get (GstPad *pad) { GstQueue *queue; - GstData *data = NULL; - gpointer front; + GstData *data; - g_assert(pad != NULL); - g_assert(GST_IS_PAD(pad)); g_return_val_if_fail (pad != NULL, NULL); g_return_val_if_fail (GST_IS_PAD (pad), NULL); - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE (gst_pad_get_parent (pad)); restart: /* have to lock for thread-safety */ - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "locking t:%p", g_thread_self ()); g_mutex_lock (queue->qlock); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "locked t:%p", g_thread_self ()); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); - while (queue->level_buffers == 0) { - /* if there's a pending state change for this queue or its manager, switch - * back to iterator so bottom half of state change executes - */ - //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { - if (queue->interrupt) { - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!"); - g_mutex_unlock (queue->qlock); - if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue))) - return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); - goto restart; - } - if (GST_STATE (queue) != GST_STATE_PLAYING) { - /* this means the other end is shut down */ - if (!queue->may_deadlock) { + if (queue->queue->length == 0 || + (queue->min_treshold.buffers > 0 && + queue->cur_level.buffers < queue->min_treshold.buffers) || + (queue->min_treshold.bytes > 0 && + queue->cur_level.bytes < queue->min_treshold.bytes) || + (queue->min_treshold.time > 0 && + queue->cur_level.time < queue->min_treshold.time)) { + g_mutex_unlock (queue->qlock); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); + g_mutex_lock (queue->qlock); + + STATUS (queue, "pre-empty wait"); + while (queue->queue->length == 0 || + (queue->min_treshold.buffers > 0 && + queue->cur_level.buffers < queue->min_treshold.buffers) || + (queue->min_treshold.bytes > 0 && + queue->cur_level.bytes < queue->min_treshold.bytes) || + (queue->min_treshold.time > 0 && + queue->cur_level.time < queue->min_treshold.time)) { + /* if there's a pending state change for this queue or its + * manager, switch back to iterator so bottom half of state + * change executes. */ + if (queue->interrupt) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted"); g_mutex_unlock (queue->qlock); - gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down"); + if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), + GST_ELEMENT (queue))) + return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); goto restart; } - else { - g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue)); + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_element_error (GST_ELEMENT (queue), + "deadlock found, sink pad elements are shut down"); + goto restart; + } else { + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "%s: waiting for the app to restart " + "source pad elements", + GST_ELEMENT_NAME (queue)); + } } - } - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); + STATUS (queue, "waiting for item_add"); - /* if (queue->block_timeout > -1){ */ - if (FALSE) { - GTimeVal timeout; - g_get_current_time(&timeout); - g_time_val_add(&timeout, queue->block_timeout); - if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){ - g_mutex_unlock (queue->qlock); - g_warning ("filler"); - return GST_DATA (gst_event_new_filler()); + if (queue->block_timeout != GST_CLOCK_TIME_NONE) { + GTimeVal timeout; + g_get_current_time (&timeout); + g_time_val_add (&timeout, queue->block_timeout / 1000); + if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)){ + g_mutex_unlock (queue->qlock); + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Sending filler event"); + return GST_DATA (gst_event_new_filler ()); + } + } else { + g_cond_wait (queue->item_add, queue->qlock); } + STATUS (queue, "got item_add signal"); } - else { - g_cond_wait (queue->not_empty, queue->qlock); - } - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_empty signal"); + + STATUS (queue, "post-empty wait"); + g_mutex_unlock (queue->qlock); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); + g_mutex_lock (queue->qlock); } - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d buffers, %d bytes", - queue->level_buffers, queue->size_buffers, queue->level_bytes); - front = g_queue_pop_head (queue->queue); - data = GST_DATA (front); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "retrieved data %p from queue", data); + /* There's something in the list now, whatever it is */ + data = g_queue_pop_head (queue->queue); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "retrieved data %p from queue", data); - queue->level_buffers--; - if (GST_IS_BUFFER (data)) - queue->level_bytes -= GST_BUFFER_SIZE (data); + if (GST_IS_BUFFER (data)) { + /* Update statistics */ + queue->cur_level.buffers--; + queue->cur_level.bytes -= GST_BUFFER_SIZE (data); + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time -= GST_BUFFER_DURATION (data); + } - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes", - GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers, queue->level_bytes); + /* Now that we're done, we can lose our own reference to + * the item, since we're no longer in danger. */ + gst_data_unref (data); - /* this assertion _has_ to hold */ - g_assert (queue->queue->length == queue->level_buffers); + STATUS (queue, "after _get()"); - GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling not_full"); - g_cond_signal (queue->not_full); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del"); + g_cond_signal (queue->item_del); g_mutex_unlock (queue->qlock); - /* FIXME where should this be? locked? */ + /* FIXME: I suppose this needs to be locked, since the EOS + * bit affects the pipeline state. However, that bit is + * locked too so it'd cause a deadlock. */ if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT (data); - switch (GST_EVENT_TYPE(event)) { + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue)); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue \"%s\" eos", + GST_ELEMENT_NAME (queue)); gst_element_set_eos (GST_ELEMENT (queue)); break; default: @@ -575,47 +744,43 @@ restart: static gboolean -gst_queue_handle_src_event (GstPad *pad, GstEvent *event) +gst_queue_handle_src_event (GstPad *pad, + GstEvent *event) { - GstQueue *queue; + GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); gboolean res; - gint event_type; - gint flag_flush = 0; - - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); g_mutex_lock (queue->qlock); if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { - /* push the event to the queue for upstream consumption */ - g_async_queue_push(queue->events, event); - g_warning ("FIXME: sending event in a running queue"); - /* FIXME wait for delivery of the event here, then return the result - * instead of FALSE */ - res = FALSE; - goto done; - } + GstQueueEventResponse er; - event_type = GST_EVENT_TYPE (event); - if (event_type == GST_EVENT_SEEK) - flag_flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH; + /* push the event to the queue and wait for upstream consumption */ + er.event = event; + er.handled = FALSE; + g_queue_push_tail (queue->events, &er); + while (!er.handled) { + g_cond_wait (queue->event_done, queue->qlock); + } + res = er.ret; + } else { + res = gst_pad_event_default (pad, event); - res = gst_pad_event_default (pad, event); - - switch (event_type) { - case GST_EVENT_FLUSH: - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); - gst_queue_locked_flush (queue); - break; - case GST_EVENT_SEEK: - if (flag_flush) { + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH: + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "FLUSH event, flushing queue\n"); gst_queue_locked_flush (queue); - } - default: - break; + break; + case GST_EVENT_SEEK: + if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) { + gst_queue_locked_flush (queue); + } + default: + break; + } } -done: g_mutex_unlock (queue->qlock); /* we have to claim success, but we don't really know */ @@ -631,8 +796,8 @@ gst_queue_release_locks (GstElement *element) g_mutex_lock (queue->qlock); queue->interrupt = TRUE; - g_cond_signal (queue->not_full); - g_cond_signal (queue->not_empty); + g_cond_signal (queue->item_add); + g_cond_signal (queue->item_del); g_mutex_unlock (queue->qlock); return TRUE; @@ -642,7 +807,7 @@ static GstElementStateReturn gst_queue_change_state (GstElement *element) { GstQueue *queue; - GstElementStateReturn ret; + GstElementStateReturn ret = GST_STATE_SUCCESS; queue = GST_QUEUE (element); @@ -657,29 +822,29 @@ gst_queue_change_state (GstElement *element) case GST_STATE_NULL_TO_READY: gst_queue_locked_flush (queue); break; - case GST_STATE_READY_TO_PAUSED: - break; case GST_STATE_PAUSED_TO_PLAYING: if (!GST_PAD_IS_LINKED (queue->sinkpad)) { - GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, "queue %s is not linked", GST_ELEMENT_NAME (queue)); + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, + "queue %s is not linked", + GST_ELEMENT_NAME (queue)); /* FIXME can this be? */ - g_cond_signal (queue->not_empty); + g_cond_signal (queue->item_add); ret = GST_STATE_FAILURE; goto error; - } - else { + } else { GstScheduler *src_sched, *sink_sched; src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad)); sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad)); if (src_sched == sink_sched) { - GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, "queue %s does not connect different schedulers", - GST_ELEMENT_NAME (queue)); + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, + "queue %s does not connect different schedulers", + GST_ELEMENT_NAME (queue)); g_warning ("queue %s does not connect different schedulers", - GST_ELEMENT_NAME (queue)); + GST_ELEMENT_NAME (queue)); ret = GST_STATE_FAILURE; goto error; @@ -687,18 +852,19 @@ gst_queue_change_state (GstElement *element) } queue->interrupt = FALSE; break; - case GST_STATE_PLAYING_TO_PAUSED: - break; case GST_STATE_PAUSED_TO_READY: gst_queue_locked_flush (queue); break; - case GST_STATE_READY_TO_NULL: + default: break; } - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); - /* this is an ugly hack to make sure our pads are always active. Reason for this is that - * pad activation for the queue element depends on 2 schedulers (ugh) */ + if (GST_ELEMENT_CLASS (parent_class)->change_state) + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); + + /* this is an ugly hack to make sure our pads are always active. + * Reason for this is that pad activation for the queue element + * depends on 2 schedulers (ugh) */ gst_pad_set_active (queue->sinkpad, TRUE); gst_pad_set_active (queue->srcpad, TRUE); @@ -706,70 +872,103 @@ error: g_mutex_unlock (queue->qlock); GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); + return ret; } static void -gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) +gst_queue_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) { - GstQueue *queue; + GstQueue *queue = GST_QUEUE (object); - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_QUEUE (object)); - - queue = GST_QUEUE (object); + /* someone could change levels here, and since this + * affects the get/put funcs, we need to lock for safety. */ + g_mutex_lock (queue->qlock); switch (prop_id) { + case ARG_MAX_SIZE_BYTES: + queue->max_size.bytes = g_value_get_uint (value); + break; + case ARG_MAX_SIZE_BUFFERS: + queue->max_size.buffers = g_value_get_uint (value); + break; + case ARG_MAX_SIZE_TIME: + queue->max_size.time = g_value_get_uint64 (value); + break; + case ARG_MIN_TRESHOLD_BYTES: + queue->max_size.bytes = g_value_get_uint (value); + break; + case ARG_MIN_TRESHOLD_BUFFERS: + queue->max_size.buffers = g_value_get_uint (value); + break; + case ARG_MIN_TRESHOLD_TIME: + queue->max_size.time = g_value_get_uint64 (value); + break; case ARG_LEAKY: queue->leaky = g_value_get_enum (value); break; - case ARG_MAX_LEVEL: - queue->size_buffers = g_value_get_int (value); - break; - case ARG_MIN_THRESHOLD_BYTES: - queue->min_threshold_bytes = g_value_get_int (value); - break; case ARG_MAY_DEADLOCK: queue->may_deadlock = g_value_get_boolean (value); break; case ARG_BLOCK_TIMEOUT: - queue->block_timeout = g_value_get_int (value); + queue->block_timeout = g_value_get_uint64 (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + g_mutex_unlock (queue->qlock); } static void -gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) +gst_queue_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) { - GstQueue *queue; - - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_QUEUE (object)); - - queue = GST_QUEUE (object); + GstQueue *queue = GST_QUEUE (object); switch (prop_id) { + case ARG_CUR_LEVEL_BYTES: + g_value_set_uint (value, queue->cur_level.bytes); + break; + case ARG_CUR_LEVEL_BUFFERS: + g_value_set_uint (value, queue->cur_level.buffers); + break; + case ARG_CUR_LEVEL_TIME: + g_value_set_uint64 (value, queue->cur_level.time); + break; + case ARG_MAX_SIZE_BYTES: + g_value_set_uint (value, queue->max_size.bytes); + break; + case ARG_MAX_SIZE_BUFFERS: + g_value_set_uint (value, queue->max_size.buffers); + break; + case ARG_MAX_SIZE_TIME: + g_value_set_uint64 (value, queue->max_size.time); + break; + case ARG_MIN_TRESHOLD_BYTES: + g_value_set_uint (value, queue->min_treshold.bytes); + break; + case ARG_MIN_TRESHOLD_BUFFERS: + g_value_set_uint (value, queue->min_treshold.buffers); + break; + case ARG_MIN_TRESHOLD_TIME: + g_value_set_uint64 (value, queue->min_treshold.time); + break; case ARG_LEAKY: g_value_set_enum (value, queue->leaky); break; - case ARG_LEVEL: - g_value_set_int (value, queue->level_buffers); - break; - case ARG_MAX_LEVEL: - g_value_set_int (value, queue->size_buffers); - break; - case ARG_MIN_THRESHOLD_BYTES: - g_value_set_int (value, queue->min_threshold_bytes); - break; case ARG_MAY_DEADLOCK: g_value_set_boolean (value, queue->may_deadlock); break; case ARG_BLOCK_TIMEOUT: - g_value_set_int (value, queue->block_timeout); + g_value_set_uint64 (value, queue->block_timeout); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 428c3e09c5..5e87b899e5 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -56,32 +56,37 @@ struct _GstQueue { GstPad *sinkpad; GstPad *srcpad; - /* the queue of buffers we're keeping our grubby hands on */ + /* the queue of data we're keeping our grubby hands on */ GQueue *queue; - guint level_buffers; /* number of buffers queued here */ - guint level_bytes; /* number of bytes queued here */ - guint64 level_time; /* amount of time queued here */ + struct { + guint buffers; /* no. of buffers */ + guint bytes; /* no. of bytes */ + guint64 time; /* amount of time */ + } cur_level, /* currently in the queue */ + max_size, /* max. amount of data allowed in the queue */ + min_treshold; /* min. amount of data required to wake reader */ - guint size_buffers; /* size of queue in buffers */ - guint size_bytes; /* size of queue in bytes */ - guint64 size_time; /* size of queue in time */ + /* whether we leak data, and at which end */ + gint leaky; + + /* number of nanoseconds until a blocked queue 'times out' + * to receive data and returns a filler event. -1 = disable */ + guint64 block_timeout; + + /* it the queue should fail on possible deadlocks */ + gboolean may_deadlock; - gint leaky; /* whether the queue is leaky, and if so at which end */ - gint block_timeout; /* microseconds until a blocked queue times out and returns GST_EVENT_FILLER. - * A value of -1 will block forever. */ - guint min_threshold_bytes; /* the minimum number of bytes required before - * waking up the reader thread */ - gboolean may_deadlock; /* it the queue should fail on possible deadlocks */ gboolean interrupt; gboolean flush; GMutex *qlock; /* lock for queue (vs object lock) */ - GCond *not_empty; /* signals buffers now available for reading */ - GCond *not_full; /* signals space now available for writing */ + GCond *item_add; /* signals buffers now available for reading */ + GCond *item_del; /* signals space now available for writing */ + GCond *event_done; /* upstream event signaller */ GTimeVal *timeval; /* the timeout for the queue locking */ - GAsyncQueue *events; /* upstream events get decoupled here */ + GQueue *events; /* upstream events get decoupled here */ gpointer _gst_reserved[GST_PADDING]; }; @@ -89,8 +94,11 @@ struct _GstQueue { struct _GstQueueClass { GstElementClass parent_class; - /* signal callbacks */ - void (*full) (GstQueue *queue); + /* signals - 'running' is called from both sides + * which might make it sort of non-useful... */ + void (*underrun) (GstQueue *queue); + void (*running) (GstQueue *queue); + void (*overrun) (GstQueue *queue); gpointer _gst_reserved[GST_PADDING]; };