mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-04-15 12:34:15 +00:00
gst/gstqueue.*: Propagate GstFlowReturn more intelligently upstream and output an ERROR/EOS when streaming stopped du...
Original commit message from CVS: * gst/gstqueue.c: (gst_queue_init), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_event), (gst_queue_handle_src_query), (gst_queue_sink_activate_push), (gst_queue_src_activate_push), (gst_queue_change_state), (gst_queue_get_property): * gst/gstqueue.h: Propagate GstFlowReturn more intelligently upstream and output an ERROR/EOS when streaming stopped due to fatal error.
This commit is contained in:
parent
5c875fadec
commit
f529a0bafc
5 changed files with 169 additions and 68 deletions
11
ChangeLog
11
ChangeLog
|
@ -1,3 +1,14 @@
|
|||
2005-07-19 Wim Taymans <wim@fluendo.com>
|
||||
|
||||
* gst/gstqueue.c: (gst_queue_init), (gst_queue_handle_sink_event),
|
||||
(gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_event),
|
||||
(gst_queue_handle_src_query), (gst_queue_sink_activate_push),
|
||||
(gst_queue_src_activate_push), (gst_queue_change_state),
|
||||
(gst_queue_get_property):
|
||||
* gst/gstqueue.h:
|
||||
Propagate GstFlowReturn more intelligently upstream and output
|
||||
an ERROR/EOS when streaming stopped due to fatal error.
|
||||
|
||||
2005-07-19 Wim Taymans <wim@fluendo.com>
|
||||
|
||||
* tools/gst-launch.c: (check_intr), (event_loop), (main):
|
||||
|
|
109
gst/gstqueue.c
109
gst/gstqueue.c
|
@ -338,7 +338,7 @@ gst_queue_init (GstQueue * queue)
|
|||
queue->leaky = GST_QUEUE_NO_LEAK;
|
||||
queue->may_deadlock = TRUE;
|
||||
queue->block_timeout = GST_CLOCK_TIME_NONE;
|
||||
queue->flushing = FALSE;
|
||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||
|
||||
queue->qlock = g_mutex_new ();
|
||||
queue->item_add = g_cond_new ();
|
||||
|
@ -473,22 +473,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
|||
gst_pad_push_event (queue->srcpad, event);
|
||||
if (GST_EVENT_FLUSH_DONE (event)) {
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = FALSE;
|
||||
gst_queue_locked_flush (queue);
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
||||
queue->srcpad);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
} else {
|
||||
/* now unblock the chain function */
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = TRUE;
|
||||
gst_queue_locked_flush (queue);
|
||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||
/* unblock the loop function */
|
||||
g_cond_signal (queue->item_add);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
||||
STATUS (queue, "after flush");
|
||||
|
||||
/* make sure it stops */
|
||||
/* make sure it pauses */
|
||||
gst_pad_pause_task (queue->srcpad);
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
|
||||
}
|
||||
|
@ -548,6 +548,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
/* we have to lock the queue since we span threads */
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
|
||||
|
||||
|
@ -559,6 +562,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
/* how are we going to make space for this buffer? */
|
||||
switch (queue->leaky) {
|
||||
/* leak current buffer */
|
||||
|
@ -621,7 +627,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
STATUS (queue, "waiting for item_del signal from thread using qlock");
|
||||
g_cond_wait (queue->item_del, queue->qlock);
|
||||
|
||||
if (queue->flushing)
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
/* if there's a pending state change for this queue
|
||||
|
@ -634,6 +640,10 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -654,6 +664,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
|
||||
return GST_FLOW_OK;
|
||||
|
||||
/* special conditions */
|
||||
out_unref:
|
||||
{
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
@ -664,13 +675,15 @@ out_unref:
|
|||
}
|
||||
out_flushing:
|
||||
{
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
|
||||
GstFlowReturn ret = queue->srcresult;
|
||||
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"exit because task paused, reason: %d", ret);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
gst_pad_pause_task (queue->srcpad);
|
||||
|
||||
gst_buffer_unref (buffer);
|
||||
|
||||
return GST_FLOW_WRONG_STATE;
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -687,23 +700,28 @@ gst_queue_loop (GstPad * pad)
|
|||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
restart:
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
while (gst_queue_is_empty (queue)) {
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
STATUS (queue, "pre-empty wait");
|
||||
while (gst_queue_is_empty (queue)) {
|
||||
STATUS (queue, "waiting for item_add");
|
||||
|
||||
if (queue->flushing)
|
||||
goto out_flushing;
|
||||
|
||||
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
|
||||
g_thread_self ());
|
||||
g_cond_wait (queue->item_add, queue->qlock);
|
||||
|
||||
if (queue->flushing)
|
||||
/* we released the lock in the g_cond above so we might be
|
||||
* flushing now */
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
|
||||
|
@ -715,6 +733,9 @@ restart:
|
|||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
}
|
||||
|
||||
/* There's something in the list now, whatever it is */
|
||||
|
@ -734,11 +755,23 @@ restart:
|
|||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
result = gst_pad_push (pad, GST_BUFFER (data));
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
/* can opt to check for srcresult here but the push should
|
||||
* return an error value that is more accurate */
|
||||
if (result != GST_FLOW_OK) {
|
||||
queue->srcresult = result;
|
||||
if (GST_FLOW_IS_FATAL (result)) {
|
||||
GST_ELEMENT_ERROR (queue, STREAM, STOPPED,
|
||||
("streaming stopped, reason %d", result),
|
||||
("streaming stopped, reason %d", result));
|
||||
gst_pad_push_event (queue->srcpad, gst_event_new (GST_EVENT_EOS));
|
||||
}
|
||||
gst_pad_pause_task (queue->srcpad);
|
||||
}
|
||||
} else {
|
||||
if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
|
||||
/* all incomming data is now unexpected */
|
||||
queue->srcresult = GST_FLOW_UNEXPECTED;
|
||||
/* and we don't need to process anymore */
|
||||
gst_pad_pause_task (queue->srcpad);
|
||||
restart = FALSE;
|
||||
}
|
||||
|
@ -754,13 +787,17 @@ restart:
|
|||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
|
||||
g_cond_signal (queue->item_del);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
||||
return;
|
||||
|
||||
out_flushing:
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
|
||||
gst_pad_pause_task (pad);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
return;
|
||||
{
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"exit because task paused, reason: %d", queue->srcresult);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -768,15 +805,14 @@ static gboolean
|
|||
gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
|
||||
{
|
||||
gboolean res = TRUE;
|
||||
|
||||
#ifndef GST_DISABLE_GST_DEBUG
|
||||
GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
|
||||
|
||||
#ifndef GST_DISABLE_GST_DEBUG
|
||||
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
|
||||
event, GST_EVENT_TYPE (event));
|
||||
#endif
|
||||
|
||||
res = gst_pad_event_default (pad, event);
|
||||
res = gst_pad_push_event (queue->sinkpad, event);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -785,10 +821,15 @@ static gboolean
|
|||
gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
|
||||
{
|
||||
GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
|
||||
GstPad *peer;
|
||||
gboolean res;
|
||||
|
||||
if (!GST_PAD_PEER (queue->sinkpad))
|
||||
if (!(peer = gst_pad_get_peer (queue->sinkpad)))
|
||||
return FALSE;
|
||||
if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), query))
|
||||
|
||||
res = gst_pad_query (peer, query);
|
||||
gst_object_unref (peer);
|
||||
if (!res)
|
||||
return FALSE;
|
||||
|
||||
switch (GST_QUERY_TYPE (query)) {
|
||||
|
@ -829,15 +870,15 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
|
|||
gboolean result = FALSE;
|
||||
GstQueue *queue;
|
||||
|
||||
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
|
||||
queue = GST_QUEUE (gst_pad_get_parent (pad));
|
||||
|
||||
if (active) {
|
||||
queue->flushing = FALSE;
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
result = TRUE;
|
||||
} else {
|
||||
/* step 1, unblock chain and loop functions */
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = TRUE;
|
||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||
gst_queue_locked_flush (queue);
|
||||
g_cond_signal (queue->item_del);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
@ -845,6 +886,7 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
|
|||
/* step 2, make sure streaming finishes */
|
||||
result = gst_pad_stop_task (pad);
|
||||
}
|
||||
gst_object_unref (queue);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -855,17 +897,17 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
|
|||
gboolean result = FALSE;
|
||||
GstQueue *queue;
|
||||
|
||||
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
|
||||
queue = GST_QUEUE (gst_pad_get_parent (pad));
|
||||
|
||||
if (active) {
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = FALSE;
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
} else {
|
||||
/* step 1, unblock chain and loop functions */
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = TRUE;
|
||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||
g_cond_signal (queue->item_add);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
||||
|
@ -873,6 +915,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
|
|||
result = gst_pad_stop_task (pad);
|
||||
}
|
||||
|
||||
gst_object_unref (queue);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -886,9 +930,6 @@ gst_queue_change_state (GstElement * element)
|
|||
|
||||
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
|
||||
|
||||
/* lock the queue so another thread (not in sync with this thread's state)
|
||||
* can't call this queue's _loop (or whatever) */
|
||||
|
||||
switch (GST_STATE_TRANSITION (element)) {
|
||||
case GST_STATE_NULL_TO_READY:
|
||||
break;
|
||||
|
@ -913,8 +954,6 @@ gst_queue_change_state (GstElement * element)
|
|||
break;
|
||||
}
|
||||
|
||||
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -970,6 +1009,8 @@ gst_queue_get_property (GObject * object,
|
|||
{
|
||||
GstQueue *queue = GST_QUEUE (object);
|
||||
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
switch (prop_id) {
|
||||
case ARG_CUR_LEVEL_BYTES:
|
||||
g_value_set_uint (value, queue->cur_level.bytes);
|
||||
|
@ -1011,4 +1052,6 @@ gst_queue_get_property (GObject * object,
|
|||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
}
|
||||
|
|
|
@ -62,6 +62,9 @@ struct _GstQueue {
|
|||
GstPad *sinkpad;
|
||||
GstPad *srcpad;
|
||||
|
||||
/* flowreturn when srcpad is paused */
|
||||
GstFlowReturn srcresult;
|
||||
|
||||
/* the queue of data we're keeping our grubby hands on */
|
||||
GQueue *queue;
|
||||
|
||||
|
@ -79,7 +82,6 @@ struct _GstQueue {
|
|||
|
||||
/* it the queue should fail on possible deadlocks */
|
||||
gboolean may_deadlock;
|
||||
gboolean flushing;
|
||||
|
||||
GMutex *qlock; /* lock for queue (vs object lock) */
|
||||
GCond *item_add; /* signals buffers now available for reading */
|
||||
|
|
|
@ -338,7 +338,7 @@ gst_queue_init (GstQueue * queue)
|
|||
queue->leaky = GST_QUEUE_NO_LEAK;
|
||||
queue->may_deadlock = TRUE;
|
||||
queue->block_timeout = GST_CLOCK_TIME_NONE;
|
||||
queue->flushing = FALSE;
|
||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||
|
||||
queue->qlock = g_mutex_new ();
|
||||
queue->item_add = g_cond_new ();
|
||||
|
@ -473,22 +473,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
|||
gst_pad_push_event (queue->srcpad, event);
|
||||
if (GST_EVENT_FLUSH_DONE (event)) {
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = FALSE;
|
||||
gst_queue_locked_flush (queue);
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
||||
queue->srcpad);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
} else {
|
||||
/* now unblock the chain function */
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = TRUE;
|
||||
gst_queue_locked_flush (queue);
|
||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||
/* unblock the loop function */
|
||||
g_cond_signal (queue->item_add);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
||||
STATUS (queue, "after flush");
|
||||
|
||||
/* make sure it stops */
|
||||
/* make sure it pauses */
|
||||
gst_pad_pause_task (queue->srcpad);
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
|
||||
}
|
||||
|
@ -548,6 +548,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
/* we have to lock the queue since we span threads */
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
|
||||
|
||||
|
@ -559,6 +562,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
/* how are we going to make space for this buffer? */
|
||||
switch (queue->leaky) {
|
||||
/* leak current buffer */
|
||||
|
@ -621,7 +627,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
STATUS (queue, "waiting for item_del signal from thread using qlock");
|
||||
g_cond_wait (queue->item_del, queue->qlock);
|
||||
|
||||
if (queue->flushing)
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
/* if there's a pending state change for this queue
|
||||
|
@ -634,6 +640,10 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -654,6 +664,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
|
||||
return GST_FLOW_OK;
|
||||
|
||||
/* special conditions */
|
||||
out_unref:
|
||||
{
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
@ -664,13 +675,15 @@ out_unref:
|
|||
}
|
||||
out_flushing:
|
||||
{
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
|
||||
GstFlowReturn ret = queue->srcresult;
|
||||
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"exit because task paused, reason: %d", ret);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
gst_pad_pause_task (queue->srcpad);
|
||||
|
||||
gst_buffer_unref (buffer);
|
||||
|
||||
return GST_FLOW_WRONG_STATE;
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -687,23 +700,28 @@ gst_queue_loop (GstPad * pad)
|
|||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
restart:
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
while (gst_queue_is_empty (queue)) {
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
STATUS (queue, "pre-empty wait");
|
||||
while (gst_queue_is_empty (queue)) {
|
||||
STATUS (queue, "waiting for item_add");
|
||||
|
||||
if (queue->flushing)
|
||||
goto out_flushing;
|
||||
|
||||
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
|
||||
g_thread_self ());
|
||||
g_cond_wait (queue->item_add, queue->qlock);
|
||||
|
||||
if (queue->flushing)
|
||||
/* we released the lock in the g_cond above so we might be
|
||||
* flushing now */
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
|
||||
GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
|
||||
|
@ -715,6 +733,9 @@ restart:
|
|||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
if (queue->srcresult != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
}
|
||||
|
||||
/* There's something in the list now, whatever it is */
|
||||
|
@ -734,11 +755,23 @@ restart:
|
|||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
result = gst_pad_push (pad, GST_BUFFER (data));
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
/* can opt to check for srcresult here but the push should
|
||||
* return an error value that is more accurate */
|
||||
if (result != GST_FLOW_OK) {
|
||||
queue->srcresult = result;
|
||||
if (GST_FLOW_IS_FATAL (result)) {
|
||||
GST_ELEMENT_ERROR (queue, STREAM, STOPPED,
|
||||
("streaming stopped, reason %d", result),
|
||||
("streaming stopped, reason %d", result));
|
||||
gst_pad_push_event (queue->srcpad, gst_event_new (GST_EVENT_EOS));
|
||||
}
|
||||
gst_pad_pause_task (queue->srcpad);
|
||||
}
|
||||
} else {
|
||||
if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
|
||||
/* all incomming data is now unexpected */
|
||||
queue->srcresult = GST_FLOW_UNEXPECTED;
|
||||
/* and we don't need to process anymore */
|
||||
gst_pad_pause_task (queue->srcpad);
|
||||
restart = FALSE;
|
||||
}
|
||||
|
@ -754,13 +787,17 @@ restart:
|
|||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
|
||||
g_cond_signal (queue->item_del);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
||||
return;
|
||||
|
||||
out_flushing:
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
|
||||
gst_pad_pause_task (pad);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
return;
|
||||
{
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"exit because task paused, reason: %d", queue->srcresult);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -768,15 +805,14 @@ static gboolean
|
|||
gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
|
||||
{
|
||||
gboolean res = TRUE;
|
||||
|
||||
#ifndef GST_DISABLE_GST_DEBUG
|
||||
GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
|
||||
|
||||
#ifndef GST_DISABLE_GST_DEBUG
|
||||
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
|
||||
event, GST_EVENT_TYPE (event));
|
||||
#endif
|
||||
|
||||
res = gst_pad_event_default (pad, event);
|
||||
res = gst_pad_push_event (queue->sinkpad, event);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -785,10 +821,15 @@ static gboolean
|
|||
gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
|
||||
{
|
||||
GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
|
||||
GstPad *peer;
|
||||
gboolean res;
|
||||
|
||||
if (!GST_PAD_PEER (queue->sinkpad))
|
||||
if (!(peer = gst_pad_get_peer (queue->sinkpad)))
|
||||
return FALSE;
|
||||
if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), query))
|
||||
|
||||
res = gst_pad_query (peer, query);
|
||||
gst_object_unref (peer);
|
||||
if (!res)
|
||||
return FALSE;
|
||||
|
||||
switch (GST_QUERY_TYPE (query)) {
|
||||
|
@ -829,15 +870,15 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
|
|||
gboolean result = FALSE;
|
||||
GstQueue *queue;
|
||||
|
||||
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
|
||||
queue = GST_QUEUE (gst_pad_get_parent (pad));
|
||||
|
||||
if (active) {
|
||||
queue->flushing = FALSE;
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
result = TRUE;
|
||||
} else {
|
||||
/* step 1, unblock chain and loop functions */
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = TRUE;
|
||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||
gst_queue_locked_flush (queue);
|
||||
g_cond_signal (queue->item_del);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
@ -845,6 +886,7 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
|
|||
/* step 2, make sure streaming finishes */
|
||||
result = gst_pad_stop_task (pad);
|
||||
}
|
||||
gst_object_unref (queue);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -855,17 +897,17 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
|
|||
gboolean result = FALSE;
|
||||
GstQueue *queue;
|
||||
|
||||
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
|
||||
queue = GST_QUEUE (gst_pad_get_parent (pad));
|
||||
|
||||
if (active) {
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = FALSE;
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
} else {
|
||||
/* step 1, unblock chain and loop functions */
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
queue->flushing = TRUE;
|
||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||
g_cond_signal (queue->item_add);
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
|
||||
|
@ -873,6 +915,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
|
|||
result = gst_pad_stop_task (pad);
|
||||
}
|
||||
|
||||
gst_object_unref (queue);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -886,9 +930,6 @@ gst_queue_change_state (GstElement * element)
|
|||
|
||||
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
|
||||
|
||||
/* lock the queue so another thread (not in sync with this thread's state)
|
||||
* can't call this queue's _loop (or whatever) */
|
||||
|
||||
switch (GST_STATE_TRANSITION (element)) {
|
||||
case GST_STATE_NULL_TO_READY:
|
||||
break;
|
||||
|
@ -913,8 +954,6 @@ gst_queue_change_state (GstElement * element)
|
|||
break;
|
||||
}
|
||||
|
||||
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -970,6 +1009,8 @@ gst_queue_get_property (GObject * object,
|
|||
{
|
||||
GstQueue *queue = GST_QUEUE (object);
|
||||
|
||||
GST_QUEUE_MUTEX_LOCK;
|
||||
|
||||
switch (prop_id) {
|
||||
case ARG_CUR_LEVEL_BYTES:
|
||||
g_value_set_uint (value, queue->cur_level.bytes);
|
||||
|
@ -1011,4 +1052,6 @@ gst_queue_get_property (GObject * object,
|
|||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
GST_QUEUE_MUTEX_UNLOCK;
|
||||
}
|
||||
|
|
|
@ -62,6 +62,9 @@ struct _GstQueue {
|
|||
GstPad *sinkpad;
|
||||
GstPad *srcpad;
|
||||
|
||||
/* flowreturn when srcpad is paused */
|
||||
GstFlowReturn srcresult;
|
||||
|
||||
/* the queue of data we're keeping our grubby hands on */
|
||||
GQueue *queue;
|
||||
|
||||
|
@ -79,7 +82,6 @@ struct _GstQueue {
|
|||
|
||||
/* it the queue should fail on possible deadlocks */
|
||||
gboolean may_deadlock;
|
||||
gboolean flushing;
|
||||
|
||||
GMutex *qlock; /* lock for queue (vs object lock) */
|
||||
GCond *item_add; /* signals buffers now available for reading */
|
||||
|
|
Loading…
Reference in a new issue