mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-06-05 15:08:53 +00:00
queue2: implement flushing in download buffering
Maintain a separate variable to control src and sink flowreturn values so that we can unlock the src part without shutting down the sink part. Add flushing for upstream pull based elements that unblocks our getrange function. This implements seeking when blocking for more data. Add some arbitrary threshold before attempting a seek. Add a FIXME for this because we need to find a sensible threshold based on the input rate.
This commit is contained in:
parent
747b1b3638
commit
11304c1257
2 changed files with 80 additions and 32 deletions
|
@ -152,9 +152,9 @@ enum
|
||||||
g_mutex_lock (q->qlock); \
|
g_mutex_lock (q->qlock); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \
|
#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START { \
|
||||||
GST_QUEUE2_MUTEX_LOCK (q); \
|
GST_QUEUE2_MUTEX_LOCK (q); \
|
||||||
if (q->srcresult != GST_FLOW_OK) \
|
if (res != GST_FLOW_OK) \
|
||||||
goto label; \
|
goto label; \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
|
@ -162,24 +162,24 @@ enum
|
||||||
g_mutex_unlock (q->qlock); \
|
g_mutex_unlock (q->qlock); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
#define GST_QUEUE2_WAIT_DEL_CHECK(q, label) G_STMT_START { \
|
#define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START { \
|
||||||
STATUS (queue, q->sinkpad, "wait for DEL"); \
|
STATUS (queue, q->sinkpad, "wait for DEL"); \
|
||||||
q->waiting_del = TRUE; \
|
q->waiting_del = TRUE; \
|
||||||
g_cond_wait (q->item_del, queue->qlock); \
|
g_cond_wait (q->item_del, queue->qlock); \
|
||||||
q->waiting_del = FALSE; \
|
q->waiting_del = FALSE; \
|
||||||
if (q->srcresult != GST_FLOW_OK) { \
|
if (res != GST_FLOW_OK) { \
|
||||||
STATUS (queue, q->srcpad, "received DEL wakeup"); \
|
STATUS (queue, q->srcpad, "received DEL wakeup"); \
|
||||||
goto label; \
|
goto label; \
|
||||||
} \
|
} \
|
||||||
STATUS (queue, q->sinkpad, "received DEL"); \
|
STATUS (queue, q->sinkpad, "received DEL"); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
#define GST_QUEUE2_WAIT_ADD_CHECK(q, label) G_STMT_START { \
|
#define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START { \
|
||||||
STATUS (queue, q->srcpad, "wait for ADD"); \
|
STATUS (queue, q->srcpad, "wait for ADD"); \
|
||||||
q->waiting_add = TRUE; \
|
q->waiting_add = TRUE; \
|
||||||
g_cond_wait (q->item_add, q->qlock); \
|
g_cond_wait (q->item_add, q->qlock); \
|
||||||
q->waiting_add = FALSE; \
|
q->waiting_add = FALSE; \
|
||||||
if (q->srcresult != GST_FLOW_OK) { \
|
if (res != GST_FLOW_OK) { \
|
||||||
STATUS (queue, q->srcpad, "received ADD wakeup"); \
|
STATUS (queue, q->srcpad, "received ADD wakeup"); \
|
||||||
goto label; \
|
goto label; \
|
||||||
} \
|
} \
|
||||||
|
@ -403,6 +403,7 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
|
||||||
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
|
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
|
||||||
|
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
|
queue->sinkresult = GST_FLOW_WRONG_STATE;
|
||||||
queue->is_eos = FALSE;
|
queue->is_eos = FALSE;
|
||||||
queue->in_timer = g_timer_new ();
|
queue->in_timer = g_timer_new ();
|
||||||
queue->out_timer = g_timer_new ();
|
queue->out_timer = g_timer_new ();
|
||||||
|
@ -987,7 +988,12 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
|
||||||
return TRUE;
|
return TRUE;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
/* we don't have the range, see how far away we are */
|
/* we don't have the range, see how far away we are, FIXME, find a good
|
||||||
|
* threshold based on the incomming rate. */
|
||||||
|
if (queue->current && offset < queue->current->writing_pos + 200000)
|
||||||
|
return FALSE;
|
||||||
|
|
||||||
|
/* too far away, do a seek */
|
||||||
perform_seek_to_offset (queue, offset);
|
perform_seek_to_offset (queue, offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1005,7 +1011,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
|
||||||
/* check if we have enough data at @offset. If there is not enough data, we
|
/* check if we have enough data at @offset. If there is not enough data, we
|
||||||
* block and wait. */
|
* block and wait. */
|
||||||
while (!gst_queue2_have_data (queue, offset, length)) {
|
while (!gst_queue2_have_data (queue, offset, length)) {
|
||||||
GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing);
|
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef HAVE_FSEEKO
|
#ifdef HAVE_FSEEKO
|
||||||
|
@ -1438,9 +1444,10 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
/* now unblock the chain function */
|
/* now unblock the chain function */
|
||||||
GST_QUEUE2_MUTEX_LOCK (queue);
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
|
queue->sinkresult = GST_FLOW_WRONG_STATE;
|
||||||
/* unblock the loop and chain functions */
|
/* unblock the loop and chain functions */
|
||||||
g_cond_signal (queue->item_add);
|
GST_QUEUE2_SIGNAL_ADD (queue);
|
||||||
g_cond_signal (queue->item_del);
|
GST_QUEUE2_SIGNAL_DEL (queue);
|
||||||
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
/* make sure it pauses, this should happen since we sent
|
/* make sure it pauses, this should happen since we sent
|
||||||
|
@ -1461,6 +1468,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
GST_QUEUE2_MUTEX_LOCK (queue);
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
gst_queue2_locked_flush (queue);
|
gst_queue2_locked_flush (queue);
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
queue->sinkresult = GST_FLOW_OK;
|
||||||
queue->is_eos = FALSE;
|
queue->is_eos = FALSE;
|
||||||
queue->unexpected = FALSE;
|
queue->unexpected = FALSE;
|
||||||
/* reset rate counters */
|
/* reset rate counters */
|
||||||
|
@ -1480,7 +1488,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
default:
|
default:
|
||||||
if (GST_EVENT_IS_SERIALIZED (event)) {
|
if (GST_EVENT_IS_SERIALIZED (event)) {
|
||||||
/* serialized events go in the queue */
|
/* serialized events go in the queue */
|
||||||
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
|
||||||
/* refuse more events on EOS */
|
/* refuse more events on EOS */
|
||||||
if (queue->is_eos)
|
if (queue->is_eos)
|
||||||
goto out_eos;
|
goto out_eos;
|
||||||
|
@ -1575,7 +1583,7 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
|
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
|
||||||
|
|
||||||
/* we have to lock the queue since we span threads */
|
/* we have to lock the queue since we span threads */
|
||||||
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
|
||||||
/* when we received EOS, we refuse more data */
|
/* when we received EOS, we refuse more data */
|
||||||
if (queue->is_eos)
|
if (queue->is_eos)
|
||||||
goto out_eos;
|
goto out_eos;
|
||||||
|
@ -1597,7 +1605,7 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
"queue is full, waiting for free space");
|
"queue is full, waiting for free space");
|
||||||
do {
|
do {
|
||||||
/* Wait for space to be available, we could be unlocked because of a flush. */
|
/* Wait for space to be available, we could be unlocked because of a flush. */
|
||||||
GST_QUEUE2_WAIT_DEL_CHECK (queue, out_flushing);
|
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
|
||||||
}
|
}
|
||||||
while (gst_queue2_is_filled (queue));
|
while (gst_queue2_is_filled (queue));
|
||||||
|
|
||||||
|
@ -1615,7 +1623,7 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
/* special conditions */
|
/* special conditions */
|
||||||
out_flushing:
|
out_flushing:
|
||||||
{
|
{
|
||||||
GstFlowReturn ret = queue->srcresult;
|
GstFlowReturn ret = queue->sinkresult;
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"exit because task paused, reason: %s", gst_flow_get_name (ret));
|
"exit because task paused, reason: %s", gst_flow_get_name (ret));
|
||||||
|
@ -1673,7 +1681,7 @@ next:
|
||||||
result = gst_pad_push (queue->srcpad, buffer);
|
result = gst_pad_push (queue->srcpad, buffer);
|
||||||
|
|
||||||
/* need to check for srcresult here as well */
|
/* need to check for srcresult here as well */
|
||||||
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
|
||||||
if (result == GST_FLOW_UNEXPECTED) {
|
if (result == GST_FLOW_UNEXPECTED) {
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"got UNEXPECTED from downstream");
|
"got UNEXPECTED from downstream");
|
||||||
|
@ -1718,7 +1726,7 @@ next:
|
||||||
|
|
||||||
gst_pad_push_event (queue->srcpad, event);
|
gst_pad_push_event (queue->srcpad, event);
|
||||||
|
|
||||||
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
|
||||||
/* if we're EOS, return UNEXPECTED so that the task pauses. */
|
/* if we're EOS, return UNEXPECTED so that the task pauses. */
|
||||||
if (type == GST_EVENT_EOS) {
|
if (type == GST_EVENT_EOS) {
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
|
@ -1753,7 +1761,7 @@ gst_queue2_loop (GstPad * pad)
|
||||||
queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
|
queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
|
||||||
|
|
||||||
/* have to lock for thread-safety */
|
/* have to lock for thread-safety */
|
||||||
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
|
||||||
|
|
||||||
if (gst_queue2_is_empty (queue)) {
|
if (gst_queue2_is_empty (queue)) {
|
||||||
gboolean started;
|
gboolean started;
|
||||||
|
@ -1767,7 +1775,7 @@ gst_queue2_loop (GstPad * pad)
|
||||||
"queue is empty, waiting for new data");
|
"queue is empty, waiting for new data");
|
||||||
do {
|
do {
|
||||||
/* Wait for data to be available, we could be unlocked because of a flush. */
|
/* Wait for data to be available, we could be unlocked because of a flush. */
|
||||||
GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing);
|
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
|
||||||
}
|
}
|
||||||
while (gst_queue2_is_empty (queue));
|
while (gst_queue2_is_empty (queue));
|
||||||
|
|
||||||
|
@ -1777,6 +1785,7 @@ gst_queue2_loop (GstPad * pad)
|
||||||
}
|
}
|
||||||
ret = gst_queue2_push_one (queue);
|
ret = gst_queue2_push_one (queue);
|
||||||
queue->srcresult = ret;
|
queue->srcresult = ret;
|
||||||
|
queue->sinkresult = ret;
|
||||||
if (ret != GST_FLOW_OK)
|
if (ret != GST_FLOW_OK)
|
||||||
goto out_flushing;
|
goto out_flushing;
|
||||||
|
|
||||||
|
@ -1819,14 +1828,43 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
|
||||||
event, GST_EVENT_TYPE_NAME (event));
|
event, GST_EVENT_TYPE_NAME (event));
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
switch (GST_EVENT_TYPE (event)) {
|
||||||
|
case GST_EVENT_FLUSH_START:
|
||||||
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
|
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
|
||||||
/* just forward upstream */
|
/* just forward upstream */
|
||||||
res = gst_pad_push_event (queue->sinkpad, event);
|
res = gst_pad_push_event (queue->sinkpad, event);
|
||||||
} else {
|
} else {
|
||||||
|
/* now unblock the getrange function */
|
||||||
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
|
GST_DEBUG_OBJECT (queue, "flushing");
|
||||||
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
|
GST_QUEUE2_SIGNAL_ADD (queue);
|
||||||
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
/* when using a temp file, we eat the event */
|
/* when using a temp file, we eat the event */
|
||||||
res = TRUE;
|
res = TRUE;
|
||||||
gst_event_unref (event);
|
gst_event_unref (event);
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
case GST_EVENT_FLUSH_STOP:
|
||||||
|
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
|
||||||
|
/* just forward upstream */
|
||||||
|
res = gst_pad_push_event (queue->sinkpad, event);
|
||||||
|
} else {
|
||||||
|
/* now unblock the getrange function */
|
||||||
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
|
/* when using a temp file, we eat the event */
|
||||||
|
res = TRUE;
|
||||||
|
gst_event_unref (event);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
res = gst_pad_push_event (queue->sinkpad, event);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -1972,7 +2010,7 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
|
||||||
|
|
||||||
queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
|
queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
|
||||||
|
|
||||||
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
|
||||||
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
|
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
|
||||||
offset = (offset == -1) ? queue->current->reading_pos : offset;
|
offset = (offset == -1) ? queue->current->reading_pos : offset;
|
||||||
|
|
||||||
|
@ -1987,9 +2025,11 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
|
||||||
/* ERRORS */
|
/* ERRORS */
|
||||||
out_flushing:
|
out_flushing:
|
||||||
{
|
{
|
||||||
|
ret = queue->srcresult;
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (queue, "we are flushing");
|
GST_DEBUG_OBJECT (queue, "we are flushing");
|
||||||
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
return GST_FLOW_WRONG_STATE;
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2022,6 +2062,7 @@ gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE2_MUTEX_LOCK (queue);
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "activating push mode");
|
GST_DEBUG_OBJECT (queue, "activating push mode");
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
queue->sinkresult = GST_FLOW_OK;
|
||||||
queue->is_eos = FALSE;
|
queue->is_eos = FALSE;
|
||||||
queue->unexpected = FALSE;
|
queue->unexpected = FALSE;
|
||||||
reset_rate_timer (queue);
|
reset_rate_timer (queue);
|
||||||
|
@ -2031,6 +2072,7 @@ gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE2_MUTEX_LOCK (queue);
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "deactivating push mode");
|
GST_DEBUG_OBJECT (queue, "deactivating push mode");
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
|
queue->sinkresult = GST_FLOW_WRONG_STATE;
|
||||||
gst_queue2_locked_flush (queue);
|
gst_queue2_locked_flush (queue);
|
||||||
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
}
|
}
|
||||||
|
@ -2054,6 +2096,7 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE2_MUTEX_LOCK (queue);
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "activating push mode");
|
GST_DEBUG_OBJECT (queue, "activating push mode");
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
queue->sinkresult = GST_FLOW_OK;
|
||||||
queue->is_eos = FALSE;
|
queue->is_eos = FALSE;
|
||||||
queue->unexpected = FALSE;
|
queue->unexpected = FALSE;
|
||||||
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
|
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
|
||||||
|
@ -2063,8 +2106,9 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE2_MUTEX_LOCK (queue);
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "deactivating push mode");
|
GST_DEBUG_OBJECT (queue, "deactivating push mode");
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
|
queue->sinkresult = GST_FLOW_WRONG_STATE;
|
||||||
/* the item add signal will unblock */
|
/* the item add signal will unblock */
|
||||||
g_cond_signal (queue->item_add);
|
GST_QUEUE2_SIGNAL_ADD (queue);
|
||||||
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
/* step 2, make sure streaming finishes */
|
/* step 2, make sure streaming finishes */
|
||||||
|
@ -2093,6 +2137,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE2_MUTEX_LOCK (queue);
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "activating pull mode");
|
GST_DEBUG_OBJECT (queue, "activating pull mode");
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
queue->sinkresult = GST_FLOW_OK;
|
||||||
queue->is_eos = FALSE;
|
queue->is_eos = FALSE;
|
||||||
queue->unexpected = FALSE;
|
queue->unexpected = FALSE;
|
||||||
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
|
@ -2102,6 +2147,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
|
||||||
/* this is not allowed, we cannot operate in pull mode without a temp
|
/* this is not allowed, we cannot operate in pull mode without a temp
|
||||||
* file. */
|
* file. */
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
|
queue->sinkresult = GST_FLOW_WRONG_STATE;
|
||||||
result = FALSE;
|
result = FALSE;
|
||||||
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
}
|
}
|
||||||
|
@ -2109,8 +2155,9 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE2_MUTEX_LOCK (queue);
|
GST_QUEUE2_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "deactivating pull mode");
|
GST_DEBUG_OBJECT (queue, "deactivating pull mode");
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
|
queue->sinkresult = GST_FLOW_WRONG_STATE;
|
||||||
/* this will unlock getrange */
|
/* this will unlock getrange */
|
||||||
g_cond_signal (queue->item_add);
|
GST_QUEUE2_SIGNAL_ADD (queue);
|
||||||
result = TRUE;
|
result = TRUE;
|
||||||
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
GST_QUEUE2_MUTEX_UNLOCK (queue);
|
||||||
}
|
}
|
||||||
|
@ -2170,14 +2217,14 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
|
||||||
* the _chain function, it might have more room now
|
* the _chain function, it might have more room now
|
||||||
* to store the buffer/event in the queue */
|
* to store the buffer/event in the queue */
|
||||||
#define QUEUE_CAPACITY_CHANGE(q)\
|
#define QUEUE_CAPACITY_CHANGE(q)\
|
||||||
g_cond_signal (queue->item_del);
|
GST_QUEUE2_SIGNAL_DEL (queue);
|
||||||
|
|
||||||
/* Changing the minimum required fill level must
|
/* Changing the minimum required fill level must
|
||||||
* wake up the _loop function as it might now
|
* wake up the _loop function as it might now
|
||||||
* be able to preceed.
|
* be able to preceed.
|
||||||
*/
|
*/
|
||||||
#define QUEUE_THRESHOLD_CHANGE(q)\
|
#define QUEUE_THRESHOLD_CHANGE(q)\
|
||||||
g_cond_signal (queue->item_add);
|
GST_QUEUE2_SIGNAL_ADD (queue);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
|
gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
|
||||||
|
|
|
@ -80,6 +80,7 @@ struct _GstQueue2
|
||||||
|
|
||||||
/* flowreturn when srcpad is paused */
|
/* flowreturn when srcpad is paused */
|
||||||
GstFlowReturn srcresult;
|
GstFlowReturn srcresult;
|
||||||
|
GstFlowReturn sinkresult;
|
||||||
gboolean is_eos;
|
gboolean is_eos;
|
||||||
gboolean unexpected;
|
gboolean unexpected;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue