adaptivedemux: fix seeking that just updates stop position

Fixed adaptivedemux seeking without flushing that just wants
to update stop position. This required protecting the segment
variables with a new mutex so that the seeking thread and the
download threads could safely manipulate the segment and
events related to it.

This contention is only locked/unlocked when starting a new
download, when the first fragment of a segment is received and
when seeking so, hopefully, it won't damage performance.
This commit is contained in:
Thiago Santos 2016-01-07 15:21:22 -03:00
parent eaace4922c
commit 16a2f7f5c0

View file

@ -144,6 +144,10 @@ GST_DEBUG_CATEGORY (adaptivedemux_debug);
#define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
#define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
#define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
#define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
enum
{
PROP_0,
@ -188,6 +192,12 @@ struct _GstAdaptiveDemuxPrivate
GMutex manifest_update_lock;
GMutex api_lock;
/* Protects demux and stream segment information
* Needed because seeks can update segment information
* without needing to stop tasks when they just want to
* update the segment boundaries */
GMutex segment_lock;
};
static GstBinClass *parent_class = NULL;
@ -415,6 +425,7 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux,
g_rec_mutex_init (&demux->priv->manifest_lock);
g_mutex_init (&demux->priv->api_lock);
g_mutex_init (&demux->priv->segment_lock);
pad_template =
gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
@ -452,6 +463,7 @@ gst_adaptive_demux_finalize (GObject * object)
g_rec_mutex_clear (&priv->updates_lock);
g_rec_mutex_clear (&demux->priv->manifest_lock);
g_mutex_clear (&demux->priv->api_lock);
g_mutex_clear (&demux->priv->segment_lock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -1247,9 +1259,6 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
/* have a backup in case seek fails */
gst_segment_copy_into (&demux->segment, &oldsegment);
gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
start, stop_type, stop, &update);
if (flags & GST_SEEK_FLAG_FLUSH) {
GstEvent *fevent;
@ -1257,8 +1266,27 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
fevent = gst_event_new_flush_start ();
gst_event_set_seqnum (fevent, seqnum);
gst_adaptive_demux_push_src_event (demux, fevent);
gst_adaptive_demux_stop_tasks (demux);
} else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
(rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
gst_adaptive_demux_stop_tasks (demux);
}
gst_adaptive_demux_stop_tasks (demux);
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
start, stop_type, stop, &update);
/* FIXME - this seems unatural, do_seek() is updating base when we
* only want the start/stop position to change, maybe do_seek() needs
* some fixing? */
if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
&& start_type == GST_SEEK_TYPE_NONE) || (rate < 0
&& stop_type == GST_SEEK_TYPE_NONE))) {
demux->segment.base = oldsegment.base;
}
GST_DEBUG_OBJECT (demux, "Seeking to segment %" GST_SEGMENT_FORMAT,
&demux->segment);
@ -1270,6 +1298,7 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
} else {
demux->priv->segment_seqnum = seqnum;
}
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
if (flags & GST_SEEK_FLAG_FLUSH) {
GstEvent *fevent;
@ -1287,6 +1316,7 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
GstClockTime period_start =
gst_adaptive_demux_get_period_start_time (demux);
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
GstEvent *seg_evt;
@ -1307,6 +1337,8 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
gst_event_replace (&stream->pending_segment, seg_evt);
gst_event_unref (seg_evt);
}
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
}
/* Restart the demux */
@ -1675,6 +1707,7 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
GstClockTime period_start =
gst_adaptive_demux_get_period_start_time (demux);
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
if (demux->segment.rate < 0)
/* Set DISCONT flag for every first buffer in reverse playback mode
* as each fragment for its own has to be reversed */
@ -1694,6 +1727,8 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
demux->segment.position =
stream->segment.position - offset + period_start;
}
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
GST_LOG_OBJECT (stream->pad,
"Going to push buffer with PTS %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
@ -1723,8 +1758,10 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
stream->pending_caps = NULL;
}
if (G_UNLIKELY (stream->pending_segment)) {
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
pending_segment = stream->pending_segment;
stream->pending_segment = NULL;
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
}
if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
GstTagList *tags = stream->pending_tags;
@ -1864,6 +1901,7 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
if (GST_BUFFER_PTS_IS_VALID (buffer)) {
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
stream->segment.position = GST_BUFFER_PTS (buffer);
/* Convert from position inside the stream's segment to the demuxer's
@ -1872,6 +1910,7 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
demux->segment.position)
demux->segment.position =
stream->segment.position - offset + period_start;
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
}
} else {
@ -2563,9 +2602,11 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
g_mutex_unlock (&stream->fragment_download_lock);
/* Check if we're done with our segment */
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
if (demux->segment.rate > 0) {
if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
&& stream->segment.position >= stream->segment.stop) {
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
ret = GST_FLOW_EOS;
gst_task_stop (stream->download_task);
goto end_of_manifest;
@ -2573,11 +2614,13 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
} else {
if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
&& stream->segment.position <= stream->segment.start) {
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
ret = GST_FLOW_EOS;
gst_task_stop (stream->download_task);
goto end_of_manifest;
}
}
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
/* Cleanup old streams if any */
if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
@ -2604,16 +2647,12 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
if (G_UNLIKELY (stream->restart_download)) {
GstSegment segment;
GstEvent *seg_event;
GstClockTime cur, ts;
GstClockTime cur, ts = 0;
gint64 pos;
GST_DEBUG_OBJECT (stream->pad,
"Activating stream due to reconfigure event");
cur = ts =
gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
stream->segment.position);
if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
ts = (GstClockTime) pos;
GST_DEBUG_OBJECT (demux, "Downstream position: %"
@ -2638,6 +2677,11 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
}
}
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
cur =
gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
stream->segment.position);
/* we might have already pushed this data */
ts = MAX (ts, cur);
@ -2664,6 +2708,8 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
* updated position */
seg_event = gst_event_new_segment (&stream->segment);
gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
GST_PTR_FORMAT, seg_event);
gst_pad_push_event (stream->pad, seg_event);
@ -3084,6 +3130,7 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
stream->download_total_time * GST_USECOND, NULL)));
/* Don't update to the end of the segment if in reverse playback */
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
GstClockTime offset =
gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
@ -3099,6 +3146,7 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
demux->segment.position =
stream->segment.position - offset + period_start;
}
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
if (gst_adaptive_demux_is_live (demux)
|| gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {