gst/playback/gstdecodebin.c: Also catch queue underruns but don't do anything yet.

Original commit message from CVS:
* gst/playback/gstdecodebin.c: (try_to_link_1), (queue_enlarge),
(queue_underrun_cb), (queue_filled_cb):
Also catch queue underruns but don't do anything yet.
Refactor and comment queue enlarging code a bit.
* gst/playback/gstplaybasebin.c: (queue_overrun),
(queue_threshold_reached), (queue_out_of_data),
(gen_preroll_element):
If a queue over/underruns check that we don't create nasty
deadlocks when the min-threshold is not reached but the
max-bytes is. In those cases disable max-bytes when we
know that the queue is fed timed data.
Add more comments.
This commit is contained in:
Wim Taymans 2006-05-11 19:38:22 +00:00
parent 7cae9d887f
commit 03c8d8ae43
3 changed files with 175 additions and 25 deletions

View file

@ -1,3 +1,19 @@
2006-05-11 Wim Taymans <wim@fluendo.com>
* gst/playback/gstdecodebin.c: (try_to_link_1), (queue_enlarge),
(queue_underrun_cb), (queue_filled_cb):
Also catch queue underruns but don't do anything yet.
Refactor and comment queue enlarging code a bit.
* gst/playback/gstplaybasebin.c: (queue_overrun),
(queue_threshold_reached), (queue_out_of_data),
(gen_preroll_element):
If a queue over/underruns check that we don't create nasty
deadlocks when the min-threshold is not reached but the
max-bytes is. In those cases disable max-bytes when we
know that the queue is fed timed data.
Add more comments.
2006-05-11 Tim-Philipp Müller <tim at centricular dot net> 2006-05-11 Tim-Philipp Müller <tim at centricular dot net>
* gst/playback/gstplaybin.c: (gen_audio_element): * gst/playback/gstplaybin.c: (gen_audio_element):

View file

@ -138,6 +138,7 @@ static void new_pad (GstElement * element, GstPad * pad, GstDynamic * dynamic);
static void no_more_pads (GstElement * element, GstDynamic * dynamic); static void no_more_pads (GstElement * element, GstDynamic * dynamic);
static void queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin); static void queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin);
static void queue_underrun_cb (GstElement * queue, GstDecodeBin * decode_bin);
static GstElementClass *parent_class; static GstElementClass *parent_class;
static guint gst_decode_bin_signals[LAST_SIGNAL] = { 0 }; static guint gst_decode_bin_signals[LAST_SIGNAL] = { 0 };
@ -805,6 +806,8 @@ try_to_link_1 (GstDecodeBin * decode_bin, GstElement * srcelement, GstPad * pad,
decode_bin->queues = g_list_append (decode_bin->queues, queue); decode_bin->queues = g_list_append (decode_bin->queues, queue);
g_signal_connect (G_OBJECT (queue), g_signal_connect (G_OBJECT (queue),
"overrun", G_CALLBACK (queue_filled_cb), decode_bin); "overrun", G_CALLBACK (queue_filled_cb), decode_bin);
g_signal_connect (G_OBJECT (queue),
"underrun", G_CALLBACK (queue_underrun_cb), decode_bin);
} }
/* The link worked, now figure out what it was that we connected */ /* The link worked, now figure out what it was that we connected */
@ -990,6 +993,48 @@ remove_element_chain (GstDecodeBin * decode_bin, GstPad * pad)
gst_bin_remove (GST_BIN (decode_bin), elem); gst_bin_remove (GST_BIN (decode_bin), elem);
} }
/* there are @bytes bytes in @queue, enlarge it
*
* Returns: new max number of bytes in @queue
*/
static guint
queue_enlarge (GstElement * queue, guint bytes, GstDecodeBin * decode_bin)
{
/* Increase the queue size by 1Mbyte if it is over 1Mb, else double its current limit
*/
if (bytes > 1024 * 1024)
bytes += 1024 * 1024;
else
bytes *= 2;
GST_DEBUG_OBJECT (decode_bin,
"increasing queue %s max-size-bytes to %d", GST_ELEMENT_NAME (queue),
bytes);
g_object_set (G_OBJECT (queue), "max-size-bytes", bytes, NULL);
return bytes;
}
/* this callback is called when our queues fills up or are empty
* We then check the status of all other queues to make sure we
* never have an empty and full queue at the same time since that
* would block dataflow. In the case of a filled queue, we make
* it larger.
*/
static void
queue_underrun_cb (GstElement * queue, GstDecodeBin * decode_bin)
{
/* FIXME: we don't really do anything here for now. Ideally we should
* see if some of the queues are filled and increase their values
* in that case.
* Note: be very carefull with thread safety here as this underrun
* signal is done from the streaming thread of queue srcpad which
* is different from the pad_added (where we add the queue to the
* list) and the overrun signals that are signalled from the
* demuxer thread.
*/
}
/* Make sure we don't have a full queue and empty queue situation */ /* Make sure we don't have a full queue and empty queue situation */
static void static void
queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin) queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin)
@ -998,43 +1043,48 @@ queue_filled_cb (GstElement * queue, GstDecodeBin * decode_bin)
gboolean increase = FALSE; gboolean increase = FALSE;
guint bytes; guint bytes;
/* get current byte level from the queue that is filled */
g_object_get (G_OBJECT (queue), "current-level-bytes", &bytes, NULL); g_object_get (G_OBJECT (queue), "current-level-bytes", &bytes, NULL);
GST_DEBUG_OBJECT (decode_bin, "One of the queues is full at %d bytes", bytes); GST_DEBUG_OBJECT (decode_bin, "One of the queues is full at %d bytes", bytes);
if (bytes > (20 * 1024 * 1024)) { /* we do not buffer more than 20Mb */
GST_WARNING_OBJECT (decode_bin, if (bytes > (20 * 1024 * 1024))
"Queue is bigger than 20Mbytes, something else is going wrong"); goto too_large;
return;
}
/* check all other queue to see if one is empty, in that case
* we need to enlarge @queue */
for (tmp = decode_bin->queues; tmp; tmp = g_list_next (tmp)) { for (tmp = decode_bin->queues; tmp; tmp = g_list_next (tmp)) {
GstElement *aqueue = GST_ELEMENT (tmp->data); GstElement *aqueue = GST_ELEMENT (tmp->data);
guint levelbytes = -1; guint levelbytes = -1;
if (aqueue != queue) { if (aqueue != queue) {
g_object_get (G_OBJECT (aqueue), g_object_get (G_OBJECT (aqueue), "current-level-bytes", &levelbytes,
"current-level-bytes", &levelbytes, NULL); NULL);
if (levelbytes == 0) { if (levelbytes == 0) {
/* yup, found an empty queue, we can stop the search and
* need to enlarge the queue */
increase = TRUE; increase = TRUE;
break;
} }
} }
} }
if (increase) { if (increase) {
/* /* enlarge @queue */
* Increase the queue size by 1Mbyte if it is over 1Mb, else double its current limit queue_enlarge (queue, bytes, decode_bin);
*/ } else {
if (bytes > 1024 * 1024)
bytes += 1024 * 1024;
else
bytes *= 2;
GST_DEBUG_OBJECT (decode_bin,
"One of the other queues is empty, increasing queue byte limit to %d",
bytes);
g_object_set (G_OBJECT (queue), "max-size-bytes", bytes, NULL);
} else
GST_DEBUG_OBJECT (decode_bin, GST_DEBUG_OBJECT (decode_bin,
"Queue is full but other queues are not empty, not doing anything"); "Queue is full but other queues are not empty, not doing anything");
}
return;
/* errors */
too_large:
{
GST_WARNING_OBJECT (decode_bin,
"Queue is bigger than 20Mbytes, something else is going wrong");
return;
}
} }
/* This function will be called when a dynamic pad is created on an element. /* This function will be called when a dynamic pad is created on an element.

View file

@ -496,21 +496,50 @@ check_queue (GstPad * pad, GstBuffer * data, gpointer user_data)
/* this signal will be fired when one of the queues with raw /* this signal will be fired when one of the queues with raw
* data is filled. This means that the group building stage is over * data is filled. This means that the group building stage is over
* and playback of the new queued group should start */ * and playback of the new queued group should start
*
* If this queue overruns we can potentially create a deadlock when:
*
* 1) the max-bytes is hit and
* 2) the min-time is not hit.
*
* We recover from this situation in the overrun callback by
* setting the max-bytes to unlimited if we see that there is
* a current-time-level (which means some sort of timestamping is
* done).
*/
static void static void
queue_overrun (GstElement * element, GstPlayBaseBin * play_base_bin) queue_overrun (GstElement * queue, GstPlayBaseBin * play_base_bin)
{ {
GST_DEBUG ("queue %s overrun", GST_ELEMENT_NAME (element)); GST_DEBUG ("queue %s overrun", GST_ELEMENT_NAME (queue));
/* if we're streaming, check if we had a deadlock with the
* max-bytes <> min-time thresholds */
if (play_base_bin->is_stream) {
guint64 time, min_time;
g_object_get (G_OBJECT (queue), "current-level-time", &time,
"min-threshold-time", &min_time, NULL);
GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
GST_TIME_FORMAT ", min %" GST_TIME_FORMAT,
GST_ELEMENT_NAME (queue),
GST_TIME_ARGS (time), GST_TIME_ARGS (min_time));
if (time != 0) {
/* queue knows about time, disable bytes checking. */
g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
}
}
group_commit (play_base_bin, FALSE, group_commit (play_base_bin, FALSE,
GST_OBJECT_PARENT (GST_OBJECT_CAST (element)) == GST_OBJECT_PARENT (GST_OBJECT_CAST (queue)) ==
GST_OBJECT (play_base_bin->subtitle)); GST_OBJECT (play_base_bin->subtitle));
g_signal_handlers_disconnect_by_func (element, g_signal_handlers_disconnect_by_func (queue,
G_CALLBACK (queue_overrun), play_base_bin); G_CALLBACK (queue_overrun), play_base_bin);
/* We have disconnected this signal, remove the signal_id from the object /* We have disconnected this signal, remove the signal_id from the object
data */ data */
g_object_set_data (G_OBJECT (element), "signal_id", NULL); g_object_set_data (G_OBJECT (queue), "signal_id", NULL);
} }
/* Used for time-based buffering. */ /* Used for time-based buffering. */
@ -522,6 +551,8 @@ queue_threshold_reached (GstElement * queue, GstPlayBaseBin * play_base_bin)
GST_DEBUG ("Running"); GST_DEBUG ("Running");
/* play */ /* play */
g_signal_handlers_disconnect_by_func (queue,
G_CALLBACK (queue_threshold_reached), play_base_bin);
g_object_set (queue, "min-threshold-time", (guint64) 0, NULL); g_object_set (queue, "min-threshold-time", (guint64) 0, NULL);
data = g_object_get_data (G_OBJECT (queue), "probe"); data = g_object_get_data (G_OBJECT (queue), "probe");
@ -542,14 +573,41 @@ queue_threshold_reached (GstElement * queue, GstPlayBaseBin * play_base_bin)
} }
} }
/* this signal is only added when in streaming mode to catch underruns
*/
static void static void
queue_out_of_data (GstElement * queue, GstPlayBaseBin * play_base_bin) queue_out_of_data (GstElement * queue, GstPlayBaseBin * play_base_bin)
{ {
guint64 time, min_time;
guint bytes, max_bytes;
GST_DEBUG ("Underrun, re-caching"); GST_DEBUG ("Underrun, re-caching");
g_object_get (G_OBJECT (queue), "current-level-time", &time,
"current-level-bytes", &bytes,
"max-size-bytes", &max_bytes, "min-threshold-time", &min_time, NULL);
GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
GST_TIME_FORMAT ", min %" GST_TIME_FORMAT
", bytes %d",
GST_ELEMENT_NAME (queue),
GST_TIME_ARGS (time), GST_TIME_ARGS (min_time), bytes);
/* if the bytes in the queue represent time, we disable bytes
* overrun checking to not cause deadlocks.
*/
if (bytes && time != 0 && time < min_time) {
/* queue knows about time but is filled with bytes that do
* not represent min-threshold time, disable bytes checking so
* the queue can grow some more. */
g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
}
/* On underrun, we want to temoprarily pause playback, set a "min-size" /* On underrun, we want to temoprarily pause playback, set a "min-size"
* threshold and wait for the running signal and then play again. Take * threshold and wait for the running signal and then play again. Take
* care of possible deadlocks and so on, */ * care of possible deadlocks and so on, */
g_signal_connect (G_OBJECT (queue), "running",
G_CALLBACK (queue_threshold_reached), play_base_bin);
g_object_set (queue, "min-threshold-time", g_object_set (queue, "min-threshold-time",
(guint64) play_base_bin->queue_threshold, NULL); (guint64) play_base_bin->queue_threshold, NULL);
@ -611,6 +669,19 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
g_free (name); g_free (name);
g_free (padname); g_free (padname);
/* for buffering of raw data we ideally want to buffer a
* very small amount of buffers since the memory used by
* this raw data can be enormously huge.
*
* FIXME: we abuse this buffer to do network buffering since
* we can then easily do time-based buffering. The better
* solution would be to add a specific network queue right
* after the source that measures the datarate and scales this
* queue of encoded data instead.
*
* We use an upper limit of typically a few seconds here but
* cap in case no timestamps are set on the raw data (bad!).
*/
g_object_set (G_OBJECT (preroll), g_object_set (G_OBJECT (preroll),
"max-size-buffers", 0, "max-size-bytes", "max-size-buffers", 0, "max-size-bytes",
((type == GST_STREAM_TYPE_VIDEO) ? 25 : 1) * 1024 * 1024, ((type == GST_STREAM_TYPE_VIDEO) ? 25 : 1) * 1024 * 1024,
@ -641,9 +712,22 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
g_object_set_data (G_OBJECT (preroll), "pbb", play_base_bin); g_object_set_data (G_OBJECT (preroll), "pbb", play_base_bin);
g_object_set_data (G_OBJECT (preroll), "probe", GINT_TO_POINTER (id)); g_object_set_data (G_OBJECT (preroll), "probe", GINT_TO_POINTER (id));
/*
* When we connect this queue, it will start running and immediatly
* fire an underrun when:
*
* 1) the max-bytes is hit
* 2) the min-time is not hit.
*
* We recover from this situation in the out_of_data callback by
* setting the max-bytes to unlimited if we see that there is
* a current-time-level (which means some sort of timestamping is
* done).
*/
g_signal_connect (G_OBJECT (preroll), "underrun", g_signal_connect (G_OBJECT (preroll), "underrun",
G_CALLBACK (queue_out_of_data), play_base_bin); G_CALLBACK (queue_out_of_data), play_base_bin);
} }
/* keep a ref to the signal id so that we can disconnect the signal callback /* keep a ref to the signal id so that we can disconnect the signal callback
* when we are done with the preroll */ * when we are done with the preroll */
g_object_set_data (G_OBJECT (preroll), "signal_id", GINT_TO_POINTER (sig)); g_object_set_data (G_OBJECT (preroll), "signal_id", GINT_TO_POINTER (sig));