gst/playback/gstplaybasebin.c: Refactor handling of overrun detection.

Original commit message from CVS:
* gst/playback/gstplaybasebin.c: (group_commit),
(queue_deadlock_check), (queue_overrun), (queue_threshold_reached),
(queue_out_of_data), (gen_preroll_element),
(preroll_remove_overrun), (probe_triggered):
Refactor handling of overrun detection.
Separate handling of group completion and deadlock detection when doing
network buffering. This should fix some deadlocks that were not detected
because the group was completed.
Add more comments, improve debugging.
This commit is contained in:
Wim Taymans 2006-09-21 07:01:48 +00:00
parent 09c389ee2a
commit 4c6f5e766a
2 changed files with 149 additions and 102 deletions

View file

@ -1,3 +1,15 @@
2006-09-21 Wim Taymans <wim@fluendo.com>
* gst/playback/gstplaybasebin.c: (group_commit),
(queue_deadlock_check), (queue_overrun), (queue_threshold_reached),
(queue_out_of_data), (gen_preroll_element),
(preroll_remove_overrun), (probe_triggered):
Refactor handling of overrun detection.
Separate handling of group completion and deadlock detection when doing
network buffering. This should fix some deadlocks that were not detected
because the group was completed.
Add more comments, improve debugging.
2006-09-21 Wim Taymans <wim@fluendo.com>
* tests/check/elements/gdpdepay.c: (GST_START_TEST):

View file

@ -31,7 +31,7 @@ GST_DEBUG_CATEGORY_STATIC (gst_play_base_bin_debug);
#define GST_CAT_DEFAULT gst_play_base_bin_debug
#define DEFAULT_QUEUE_SIZE (3 * GST_SECOND)
#define DEFAULT_QUEUE_MIN_THRESHOLD ((DEFAULT_QUEUE_SIZE * 30)/100)
#define DEFAULT_QUEUE_MIN_THRESHOLD ((DEFAULT_QUEUE_SIZE * 30) / 100)
#define DEFAULT_QUEUE_THRESHOLD ((DEFAULT_QUEUE_SIZE * 95) / 100)
#define GROUP_LOCK(pbb) g_mutex_lock (pbb->group_lock)
@ -72,6 +72,8 @@ static GstStateChangeReturn gst_play_base_bin_change_state (GstElement *
const GList *gst_play_base_bin_get_streaminfo (GstPlayBaseBin * play_base_bin);
const GValueArray *gst_play_base_bin_get_streaminfo_value_array (GstPlayBaseBin
* play_base_bin);
static void preroll_remove_overrun (GstElement * element,
GstPlayBaseBin * play_base_bin);
static gboolean prepare_output (GstPlayBaseBin * play_base_bin);
static void set_active_source (GstPlayBaseBin * play_base_bin,
@ -420,20 +422,11 @@ group_commit (GstPlayBaseBin * play_base_bin, gboolean fatal, gboolean subtitle)
* elements at this stage. */
for (n = 0; n < NUM_TYPES; n++) {
GstElement *element = group->type[n].preroll;
guint sig_id;
if (!element)
continue;
sig_id =
GPOINTER_TO_INT (g_object_get_data (G_OBJECT (element),
"signal_id"));
if (sig_id) {
GST_LOG_OBJECT (play_base_bin, "removing preroll signal %s",
GST_ELEMENT_NAME (element));
g_signal_handler_disconnect (G_OBJECT (element), sig_id);
}
preroll_remove_overrun (element, play_base_bin);
}
} else {
/* this is a special subtitle bin, we don't commit the group but
@ -520,68 +513,96 @@ check_queue (GstPad * pad, GstBuffer * data, gpointer user_data)
return TRUE;
}
/* this signal will be fired when one of the queues with raw
* data is filled. This means that the group building stage is over
* and playback of the new queued group should start
*
* If this queue overruns we can potentially create a deadlock when:
/* If a queue overruns and we are buffer in streaming mode (we have a min-time)
* we can potentially create a deadlock when:
*
* 1) the max-bytes is hit and
* 2) the min-time is not hit.
*
* We recover from this situation in the overrun callback by
* We recover from this situation in this callback by
* setting the max-bytes to unlimited if we see that there is
* a current-time-level (which means some sort of timestamping is
* done).
*/
static void
queue_deadlock_check (GstElement * queue, GstPlayBaseBin * play_base_bin)
{
guint64 time, min_time;
guint bytes;
GST_DEBUG_OBJECT (play_base_bin, "overrun signal received from queue %s",
GST_ELEMENT_NAME (queue));
/* figure out where we are */
g_object_get (G_OBJECT (queue), "current-level-time", &time,
"current-level-bytes", &bytes, "min-threshold-time", &min_time, NULL);
GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
GST_TIME_FORMAT ", min %" GST_TIME_FORMAT
", bytes %d", GST_ELEMENT_NAME (queue),
GST_TIME_ARGS (time), GST_TIME_ARGS (min_time), bytes);
/* if the bytes in the queue represent time, we disable bytes
* overrun checking to not cause deadlocks.
*/
if (bytes && time != 0 && time < min_time) {
GST_DEBUG_OBJECT (play_base_bin,
"possible deadlock found, removing byte limit");
/* queue knows about time but is filled with bytes that do
* not represent min-threshold time, disable bytes checking so
* the queue can grow some more. */
g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
/* bytes limit is removed, we cannot deadlock anymore */
g_signal_handlers_disconnect_by_func (queue,
G_CALLBACK (queue_deadlock_check), play_base_bin);
} else {
GST_DEBUG_OBJECT (play_base_bin, "no deadlock");
}
}
/* this signal will be fired when one of the queues with raw
* data is filled. This means that the group building stage is over
* and playback of the new queued group should start. This is a rather unusual
* situation because normally the group is commited when the "no_more_pads"
* signal is fired.
*/
static void
queue_overrun (GstElement * queue, GstPlayBaseBin * play_base_bin)
{
GST_DEBUG ("queue %s overrun", GST_ELEMENT_NAME (queue));
GST_DEBUG_OBJECT (play_base_bin, "queue %s overrun",
GST_ELEMENT_NAME (queue));
/* if we're streaming, check if we had a deadlock with the
* max-bytes <> min-time thresholds */
if (play_base_bin->is_stream) {
guint64 time, min_time;
g_object_get (G_OBJECT (queue), "current-level-time", &time,
"min-threshold-time", &min_time, NULL);
GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
GST_TIME_FORMAT ", min %" GST_TIME_FORMAT,
GST_ELEMENT_NAME (queue),
GST_TIME_ARGS (time), GST_TIME_ARGS (min_time));
if (time != 0) {
/* queue knows about time, disable bytes checking. */
g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
}
}
preroll_remove_overrun (queue, play_base_bin);
group_commit (play_base_bin, FALSE,
GST_OBJECT_PARENT (GST_OBJECT_CAST (queue)) ==
GST_OBJECT (play_base_bin->subtitle));
g_signal_handlers_disconnect_by_func (queue,
G_CALLBACK (queue_overrun), play_base_bin);
/* We have disconnected this signal, remove the signal_id from the object
data */
g_object_set_data (G_OBJECT (queue), "signal_id", NULL);
}
/* Used for time-based buffering. */
/* Used for time-based buffering in streaming mode and is called when a queue
* emits the running signal. This means that the high watermark threshold is
* reached and the buffering is completed. */
static void
queue_threshold_reached (GstElement * queue, GstPlayBaseBin * play_base_bin)
{
gpointer data;
GST_DEBUG ("Running");
GST_DEBUG_OBJECT (play_base_bin, "running signal received from queue %s",
GST_ELEMENT_NAME (queue));
/* play */
/* we disconnect the signal so that we don't get called for every buffer. */
g_signal_handlers_disconnect_by_func (queue,
G_CALLBACK (queue_threshold_reached), play_base_bin);
/* now place the limits at the low threshold. When we hit this limit, the
* underrun signal will be called. The underrun signal is always connected. */
g_object_set (queue, "min-threshold-time", play_base_bin->queue_min_threshold,
NULL);
/* we remove the probe now because we don't need it anymore to give progress
* about the buffering. */
data = g_object_get_data (G_OBJECT (queue), "probe");
if (data) {
GstPad *sinkpad;
@ -596,6 +617,9 @@ queue_threshold_reached (GstElement * queue, GstPlayBaseBin * play_base_bin)
gst_object_unref (sinkpad);
}
/* we post a 100% buffering message to notify the app that buffering is
* completed and playback can start/continue */
fill_buffer (play_base_bin, 100);
}
@ -604,40 +628,22 @@ queue_threshold_reached (GstElement * queue, GstPlayBaseBin * play_base_bin)
static void
queue_out_of_data (GstElement * queue, GstPlayBaseBin * play_base_bin)
{
guint64 time, min_time;
guint bytes, max_bytes;
GST_DEBUG ("Underrun, re-caching");
g_object_get (G_OBJECT (queue), "current-level-time", &time,
"current-level-bytes", &bytes,
"max-size-bytes", &max_bytes, "min-threshold-time", &min_time, NULL);
GST_DEBUG_OBJECT (play_base_bin, "streaming mode, queue %s current %"
GST_TIME_FORMAT ", min %" GST_TIME_FORMAT
", bytes %d",
GST_ELEMENT_NAME (queue),
GST_TIME_ARGS (time), GST_TIME_ARGS (min_time), bytes);
/* if the bytes in the queue represent time, we disable bytes
* overrun checking to not cause deadlocks.
*/
if (bytes && time != 0 && time < min_time) {
/* queue knows about time but is filled with bytes that do
* not represent min-threshold time, disable bytes checking so
* the queue can grow some more. */
g_object_set (G_OBJECT (queue), "max-size-bytes", 0, NULL);
}
GST_DEBUG_OBJECT (play_base_bin, "underrun signal received from queue %s",
GST_ELEMENT_NAME (queue));
/* On underrun, we want to temoprarily pause playback, set a "min-size"
* threshold and wait for the running signal and then play again. Take
* care of possible deadlocks and so on, */
* threshold and wait for the running signal and then play again.
*
* This signal could never be called because the queue max-size limits are set
* too low. We take care of this possible deadlock in the the overrun signal
* handler. */
g_signal_connect (G_OBJECT (queue), "running",
G_CALLBACK (queue_threshold_reached), play_base_bin);
g_object_set (queue, "min-threshold-time",
(guint64) play_base_bin->queue_threshold, NULL);
/* re-connect probe */
/* re-connect probe, this will five feedback about the percentage that we
* buffered and is posted in the BUFFERING message. */
if (!g_object_get_data (G_OBJECT (queue), "probe")) {
GstPad *sinkpad;
guint id;
@ -670,7 +676,7 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
GstElement *selector, *preroll;
gchar *name, *padname;
const gchar *prename;
guint sig;
guint overrun_sig;
GstPad *preroll_pad;
if (type == GST_STREAM_TYPE_VIDEO)
@ -699,23 +705,41 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
* very small amount of buffers since the memory used by
* this raw data can be enormously huge.
*
* We use an upper limit of typically a few seconds here but
* cap in case no timestamps are set on the raw data (bad!).
*
* FIXME: we abuse this buffer to do network buffering since
* we can then easily do time-based buffering. The better
* solution would be to add a specific network queue right
* after the source that measures the datarate and scales this
* queue of encoded data instead.
*
* We use an upper limit of typically a few seconds here but
* cap in case no timestamps are set on the raw data (bad!).
*/
g_object_set (G_OBJECT (preroll),
"max-size-buffers", 0, "max-size-bytes",
((type == GST_STREAM_TYPE_VIDEO) ? 25 : 1) * 1024 * 1024,
"max-size-time", play_base_bin->queue_size, NULL);
sig = g_signal_connect (G_OBJECT (preroll), "overrun",
/* the overrun signal is always attached and serves two pusposes:
*
* 1) when we are building a group and the overrun is called, we commit the
* group. The reason being that if we fill the entire queue without a
* normal group commit (with _no_more_pads()) we can assume the
* audio/video is completely wacked or the element just does not know when
* it is ready with all the pads (mpeg).
* 2) When we are doing network buffering, we keep track of low/high
* watermarks in the queue. It is possible that we set the high watermark
* higher than the max-size limits to trigger an overrun. In this case we
* will never get a running signal but we can use the overrun signal to
* detect this deadlock and correct it.
*/
overrun_sig = g_signal_connect (G_OBJECT (preroll), "overrun",
G_CALLBACK (queue_overrun), play_base_bin);
/* keep a ref to the signal id so that we can disconnect the signal callback
* when we are done with the preroll */
g_object_set_data (G_OBJECT (preroll), "overrun_signal_id",
GINT_TO_POINTER (overrun_sig));
if (play_base_bin->is_stream &&
((type == GST_STREAM_TYPE_VIDEO &&
group->type[GST_STREAM_TYPE_AUDIO - 1].npads == 0) ||
@ -724,10 +748,10 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
GstPad *sinkpad;
guint id;
g_signal_connect (G_OBJECT (preroll), "running",
G_CALLBACK (queue_threshold_reached), play_base_bin);
g_object_set (G_OBJECT (preroll),
"min-threshold-time", (guint64) play_base_bin->queue_threshold, NULL);
/* catch deadlocks when we are network buffering in time but the max-limit
* in bytes is hit. */
g_signal_connect (G_OBJECT (preroll), "overrun",
G_CALLBACK (queue_deadlock_check), play_base_bin);
/* give updates on queue size */
sinkpad = gst_element_get_pad (preroll, "sink");
@ -738,28 +762,14 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
g_object_set_data (G_OBJECT (preroll), "pbb", play_base_bin);
g_object_set_data (G_OBJECT (preroll), "probe", GINT_TO_POINTER (id));
/*
* When we connect this queue, it will start running and immediatly
* fire an underrun when:
*
* 1) the max-bytes is hit
* 2) the min-time is not hit.
*
* We recover from this situation in the out_of_data callback by
* setting the max-bytes to unlimited if we see that there is
* a current-time-level (which means some sort of timestamping is
* done).
*/
/* When we connect this queue, it will start running and immediatly
* fire an underrun. */
g_signal_connect (G_OBJECT (preroll), "underrun",
G_CALLBACK (queue_out_of_data), play_base_bin);
}
/* keep a ref to the signal id so that we can disconnect the signal callback
* when we are done with the preroll */
g_object_set_data (G_OBJECT (preroll), "signal_id", GINT_TO_POINTER (sig));
/* listen for EOS */
preroll_pad = gst_element_get_pad (preroll, "src");
/* listen for EOS */
gst_pad_add_event_probe (preroll_pad, G_CALLBACK (probe_triggered), info);
gst_object_unref (preroll_pad);
@ -792,6 +802,23 @@ gen_preroll_element (GstPlayBaseBin * play_base_bin,
gst_object_unref (selector);
}
static void
preroll_remove_overrun (GstElement * element, GstPlayBaseBin * play_base_bin)
{
guint overrun_sig;
GObject *obj = G_OBJECT (element);
overrun_sig = GPOINTER_TO_INT (g_object_get_data (obj, "overrun_signal_id"));
if (overrun_sig) {
GST_LOG_OBJECT (play_base_bin, "removing preroll signal %s",
GST_ELEMENT_NAME (element));
g_signal_handler_disconnect (obj, overrun_sig);
/* We have disconnected this signal, remove the signal_id from the object
* data */
g_object_set_data (obj, "overrun_signal_id", NULL);
}
}
static void
remove_groups (GstPlayBaseBin * play_base_bin)
{
@ -918,19 +945,27 @@ probe_triggered (GstPad * pad, GstEvent * event, gpointer user_data)
{
GstPlayBaseGroup *group;
GstPlayBaseBin *play_base_bin;
GstStreamInfo *info = GST_STREAM_INFO (user_data);
GstStreamInfo *info;
gboolean res;
GstEventType type;
type = GST_EVENT_TYPE (event);
GST_DEBUG ("probe triggered, (%d) %s", type, gst_event_type_get_name (type));
/* we only care about EOS */
if (type != GST_EVENT_EOS)
return TRUE;
info = GST_STREAM_INFO (user_data);
group = (GstPlayBaseGroup *) g_object_get_data (G_OBJECT (info), "group");
play_base_bin = group->bin;
GST_DEBUG ("probe triggered");
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
if (type == GST_EVENT_EOS) {
gint num_groups = 0;
gboolean have_left;
GST_DEBUG ("probe got EOS in group %p", group);
GST_DEBUG_OBJECT (play_base_bin, "probe got EOS in group %p", group);
GROUP_LOCK (play_base_bin);