adaptivedemux: Preroll streams before exposing them

To ensure that pads have caps when they are exposed, do
the exposing when all pending streams have prerolled an
output buffer, and only then EOS and remove any old pads.

Improves the switching sequence by making caps available
as soon as a pad appears.

With fixes from Seungha Yang <sh.yang@lge.com>

https://bugzilla.gnome.org/show_bug.cgi?id=758257
This commit is contained in:
Jan Schmidt 2017-01-07 12:12:05 +09:00
parent 9b778f7264
commit b2113f69c6
2 changed files with 180 additions and 31 deletions

View file

@ -198,6 +198,11 @@ struct _GstAdaptiveDemuxPrivate
GCond manifest_cond;
GMutex manifest_update_lock;
/* Lock and condition for prerolling streams before exposing */
GMutex preroll_lock;
GCond preroll_cond;
gint preroll_pending;
GMutex api_lock;
/* Protects demux and stream segment information
@ -242,8 +247,9 @@ static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
stream);
static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
gboolean first_and_live);
static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
@ -270,7 +276,8 @@ static GstFlowReturn
gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
GstEvent * event);
static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
gboolean start_preroll_streams);
static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux);
static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
demux);
@ -483,6 +490,9 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux,
g_mutex_init (&demux->priv->api_lock);
g_mutex_init (&demux->priv->segment_lock);
g_cond_init (&demux->priv->preroll_cond);
g_mutex_init (&demux->priv->preroll_lock);
pad_template =
gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
g_return_if_fail (pad_template != NULL);
@ -525,6 +535,9 @@ gst_adaptive_demux_finalize (GObject * object)
demux->realtime_clock = NULL;
}
g_cond_clear (&demux->priv->preroll_cond);
g_mutex_clear (&demux->priv->preroll_lock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -684,9 +697,9 @@ gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
}
if (demux->next_streams) {
gst_adaptive_demux_expose_streams (demux,
gst_adaptive_demux_prepare_streams (demux,
gst_adaptive_demux_is_live (demux));
gst_adaptive_demux_start_tasks (demux);
gst_adaptive_demux_start_tasks (demux, TRUE);
if (gst_adaptive_demux_is_live (demux)) {
g_mutex_lock (&demux->priv->updates_timed_lock);
demux->priv->stop_updates_task = FALSE;
@ -775,6 +788,16 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
gst_event_unref (eos);
g_list_free (demux->streams);
demux->streams = NULL;
if (demux->prepared_streams) {
g_list_free_full (demux->prepared_streams,
(GDestroyNotify) gst_adaptive_demux_stream_free);
demux->prepared_streams = NULL;
}
if (demux->next_streams) {
g_list_free_full (demux->next_streams,
(GDestroyNotify) gst_adaptive_demux_stream_free);
demux->next_streams = NULL;
}
if (old_streams) {
g_list_free_full (old_streams,
@ -872,10 +895,9 @@ gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
/* must be called with manifest_lock taken */
static gboolean
gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
gboolean ret;
GstPad *pad = stream->pad;
gchar *name = gst_pad_get_name (pad);
GstEvent *event;
@ -907,16 +929,33 @@ gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
g_free (stream_id);
g_free (name);
GST_DEBUG_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
GST_DEBUG_PAD_NAME (pad), stream->pending_caps);
GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
stream->discont = TRUE;
return TRUE;
}
static gboolean
gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
gboolean ret;
GstPad *pad = stream->pad;
GstCaps *caps;
if (stream->pending_caps) {
gst_pad_set_caps (pad, stream->pending_caps);
gst_caps_unref (stream->pending_caps);
caps = stream->pending_caps;
stream->pending_caps = NULL;
} else {
caps = gst_pad_get_current_caps (pad);
}
stream->discont = TRUE;
GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
GST_DEBUG_PAD_NAME (pad), caps);
if (caps)
gst_caps_unref (caps);
gst_object_ref (pad);
@ -959,17 +998,21 @@ gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
/* must be called with manifest_lock taken */
static gboolean
gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
gboolean first_and_live)
{
GList *iter;
GList *old_streams;
GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
g_return_val_if_fail (demux->next_streams != NULL, FALSE);
if (demux->prepared_streams != NULL) {
/* Old streams that were never exposed, due to a seek or so */
GST_FIXME_OBJECT (demux,
"Preparing new streams without cleaning up old ones!");
return FALSE;
}
old_streams = demux->streams;
demux->streams = demux->next_streams;
demux->prepared_streams = demux->next_streams;
demux->next_streams = NULL;
if (!demux->running) {
@ -977,10 +1020,12 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
return TRUE;
}
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
if (!gst_adaptive_demux_expose_stream (demux,
stream->do_block = TRUE;
if (!gst_adaptive_demux_prepare_stream (demux,
GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
/* TODO act on error */
}
@ -1011,7 +1056,7 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
GST_SEEK_TYPE_NONE, -1, NULL);
}
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
GstClockTime offset;
@ -1091,8 +1136,36 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
stream->pending_segment = gst_event_new_segment (&stream->segment);
gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
GST_DEBUG ("Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
&stream->segment, stream);
}
return TRUE;
}
static gboolean
gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
{
GList *iter;
GList *old_streams;
g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
old_streams = demux->streams;
demux->streams = demux->prepared_streams;
demux->prepared_streams = NULL;
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
if (!gst_adaptive_demux_expose_stream (demux,
GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
/* TODO act on error */
}
}
demux->priv->preroll_pending = 0;
GST_MANIFEST_UNLOCK (demux);
gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
GST_MANIFEST_LOCK (demux);
@ -1142,6 +1215,14 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
g_list_concat (demux->priv->old_streams, old_streams);
}
/* Unblock after removing oldstreams */
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
stream->do_block = FALSE;
}
GST_DEBUG_OBJECT (demux, "All streams are exposed");
return TRUE;
}
@ -1166,6 +1247,11 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
gst_pad_set_element_private (pad, stream);
g_mutex_lock (&demux->priv->preroll_lock);
stream->do_block = TRUE;
demux->priv->preroll_pending++;
g_mutex_unlock (&demux->priv->preroll_lock);
gst_pad_set_query_function (pad,
GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
gst_pad_set_event_function (pad,
@ -1509,7 +1595,10 @@ gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
}
if (demux->next_streams) {
gst_adaptive_demux_expose_streams (demux, FALSE);
/* If the seek generated new streams, get them
* to preroll */
gst_adaptive_demux_prepare_streams (demux, FALSE);
gst_adaptive_demux_start_tasks (demux, TRUE);
} else {
GList *iter;
GstClockTime period_start =
@ -1540,10 +1629,11 @@ gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
}
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
/* Restart the demux */
gst_adaptive_demux_start_tasks (demux, FALSE);
}
/* Restart the demux */
gst_adaptive_demux_start_tasks (demux);
if (gst_adaptive_demux_is_live (demux)) {
g_mutex_lock (&demux->priv->updates_timed_lock);
demux->priv->stop_updates_task = FALSE;
@ -1730,7 +1820,8 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
/* must be called with manifest_lock taken */
static void
gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
gboolean start_preroll_streams)
{
GList *iter;
@ -1740,12 +1831,17 @@ gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
}
GST_INFO_OBJECT (demux, "Starting streams' tasks");
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
for (; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
g_mutex_lock (&stream->fragment_download_lock);
stream->cancelled = FALSE;
g_mutex_unlock (&stream->fragment_download_lock);
if (!start_preroll_streams) {
g_mutex_lock (&stream->fragment_download_lock);
stream->cancelled = FALSE;
g_mutex_unlock (&stream->fragment_download_lock);
}
stream->last_ret = GST_FLOW_OK;
gst_task_start (stream->download_task);
@ -1770,8 +1866,6 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
g_cond_signal (&demux->priv->updates_timed_cond);
g_mutex_unlock (&demux->priv->updates_timed_lock);
gst_uri_downloader_cancel (demux->downloader);
GST_LOG_OBJECT (demux, "Stopping tasks");
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
@ -1784,6 +1878,12 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
g_mutex_unlock (&stream->fragment_download_lock);
}
g_mutex_lock (&demux->priv->preroll_lock);
g_cond_broadcast (&demux->priv->preroll_cond);
g_mutex_unlock (&demux->priv->preroll_lock);
gst_uri_downloader_cancel (demux->downloader);
g_mutex_lock (&demux->priv->manifest_update_lock);
g_cond_broadcast (&demux->priv->manifest_cond);
g_mutex_unlock (&demux->priv->manifest_update_lock);
@ -1973,6 +2073,21 @@ gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
return GST_FLOW_OK;
}
/* Called with preroll_lock */
static void
gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
demux->priv->preroll_pending--;
if (demux->priv->preroll_pending == 0) {
/* That was the last one, time to release all streams
* and expose them */
GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
gst_adaptive_demux_expose_streams (demux);
g_cond_broadcast (&demux->priv->preroll_cond);
}
}
/* must be called with manifest_lock taken.
* Temporarily releases manifest_lock
*/
@ -2043,6 +2158,35 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
gst_caps_unref (stream->pending_caps);
stream->pending_caps = NULL;
}
if (stream->do_block) {
g_mutex_lock (&demux->priv->preroll_lock);
/* If we are preroll state, set caps in here */
if (pending_caps) {
gst_pad_push_event (stream->pad, pending_caps);
pending_caps = NULL;
}
gst_adaptive_demux_handle_preroll (demux, stream);
GST_MANIFEST_UNLOCK (demux);
while (stream->do_block && !stream->cancelled) {
GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
}
if (stream->cancelled) {
GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
gst_buffer_unref (buffer);
g_mutex_unlock (&demux->priv->preroll_lock);
return GST_FLOW_FLUSHING;
}
g_mutex_unlock (&demux->priv->preroll_lock);
GST_MANIFEST_LOCK (demux);
}
if (G_UNLIKELY (stream->pending_segment)) {
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
pending_segment = stream->pending_segment;
@ -2098,6 +2242,8 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
pending_events = g_list_delete_link (pending_events, pending_events);
}
/* Wait for preroll if blocking */
ret = gst_pad_push (stream->pad, buffer);
GST_MANIFEST_LOCK (demux);
@ -3786,8 +3932,8 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
if (can_expose) {
GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
"to do bitrate switching");
gst_adaptive_demux_expose_streams (demux, FALSE);
gst_adaptive_demux_start_tasks (demux);
gst_adaptive_demux_prepare_streams (demux, FALSE);
gst_adaptive_demux_start_tasks (demux, TRUE);
} else {
GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
}
@ -3946,8 +4092,8 @@ gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
GST_DEBUG_OBJECT (demux, "Advancing to next period");
klass->advance_period (demux);
gst_adaptive_demux_expose_streams (demux, FALSE);
gst_adaptive_demux_start_tasks (demux);
gst_adaptive_demux_prepare_streams (demux, FALSE);
gst_adaptive_demux_start_tasks (demux, TRUE);
}
/**

View file

@ -185,6 +185,8 @@ struct _GstAdaptiveDemuxStream
/* TODO check if used */
gboolean eos;
gboolean do_block; /* TRUE if stream should block on preroll */
};
/**
@ -207,6 +209,7 @@ struct _GstAdaptiveDemux
GstUriDownloader *downloader;
GList *streams;
GList *prepared_streams;
GList *next_streams;
GstSegment segment;