mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-04 06:29:31 +00:00
98e4d90519
The initial goal was to support the case where we are paused watching a live stream, and when we resume we can no longer resume from the previously downloaded position. In that case we internally do a flushing seek back to the "current live head position". This was also extended since to be able to handle (utterly broken) servers when we can't really figure out where we are anymore and therefore trigger that lost sync so we can try to get back on our feet. This does fix the issue... but results in spurious FLUSH_{START|STOP} events being sent downstream. While that's fine for regular playback scenarios, it's a bit of a wild scenario since a lot of pipelines/applications don't expect such events when it wasn't triggered by downstream/application. Fixes #3605 Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7005>
993 lines
32 KiB
C
993 lines
32 KiB
C
/* GStreamer
|
|
*
|
|
* Copyright (C) 2014 Samsung Electronics. All rights reserved.
|
|
* Author: Thiago Santos <thiagoss@osg.samsung.com>
|
|
*
|
|
* Copyright (C) 2021-2022 Centricular Ltd
|
|
* Author: Edward Hervey <edward@centricular.com>
|
|
* Author: Jan Schmidt <jan@centricular.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include "gstadaptivedemux.h"
|
|
#include "gstadaptivedemux-private.h"
|
|
|
|
GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
|
|
#define GST_CAT_DEFAULT adaptivedemux2_debug
|
|
|
|
/* TRACKS_LOCK held
|
|
* Flushes all data in the track and resets it */
|
|
void
|
|
gst_adaptive_demux_track_flush (GstAdaptiveDemuxTrack * track)
|
|
{
|
|
GST_DEBUG_ID (track->id,
|
|
"Flushing track with %" G_GSIZE_FORMAT " queued items",
|
|
gst_vec_deque_get_length (track->queue));
|
|
gst_vec_deque_clear (track->queue);
|
|
|
|
gst_event_store_flush (&track->sticky_events);
|
|
|
|
gst_segment_init (&track->input_segment, GST_FORMAT_TIME);
|
|
track->lowest_input_time = GST_CLOCK_STIME_NONE;
|
|
track->input_time = 0;
|
|
track->input_segment_seqnum = GST_SEQNUM_INVALID;
|
|
|
|
gst_segment_init (&track->output_segment, GST_FORMAT_TIME);
|
|
track->gap_position = track->gap_duration = GST_CLOCK_TIME_NONE;
|
|
|
|
track->output_time = GST_CLOCK_STIME_NONE;
|
|
track->next_position = GST_CLOCK_STIME_NONE;
|
|
|
|
track->level_bytes = 0;
|
|
track->level_time = 0;
|
|
|
|
track->eos = FALSE;
|
|
|
|
track->update_next_segment = FALSE;
|
|
|
|
track->output_discont = FALSE;
|
|
}
|
|
|
|
static gboolean
|
|
_track_sink_query_function (GstPad * pad, GstObject * parent, GstQuery * query)
|
|
{
|
|
GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
|
|
gboolean ret = FALSE;
|
|
|
|
GST_DEBUG_ID (track->id, "query %" GST_PTR_FORMAT, query);
|
|
|
|
switch (GST_QUERY_TYPE (query)) {
|
|
case GST_QUERY_ACCEPT_CAPS:
|
|
/* Should we intersect by track caps as a safety check ? */
|
|
GST_DEBUG_ID (track->id, "We accept any caps on %s:%s",
|
|
GST_DEBUG_PAD_NAME (pad));
|
|
gst_query_set_accept_caps_result (query, TRUE);
|
|
ret = TRUE;
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
/* Dequeue an item from the track queue for processing
|
|
* TRACKS_LOCK hold */
|
|
static gboolean
|
|
track_dequeue_item_locked (GstAdaptiveDemux * demux,
|
|
GstAdaptiveDemuxTrack * track, TrackQueueItem * out_item)
|
|
{
|
|
TrackQueueItem *item = gst_vec_deque_peek_head_struct (track->queue);
|
|
|
|
if (item == NULL)
|
|
return FALSE;
|
|
|
|
*out_item = *item;
|
|
gst_vec_deque_pop_head (track->queue);
|
|
|
|
GST_LOG_ID (track->id,
|
|
"item running_time %" GST_STIME_FORMAT " end %"
|
|
GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (out_item->runningtime),
|
|
GST_STIME_ARGS (out_item->runningtime_end));
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static inline GstClockTimeDiff my_segment_to_running_time (GstSegment * segment,
|
|
GstClockTime val);
|
|
|
|
/* Dequeue or generate a buffer/event from the track queue and update the buffering levels
|
|
* TRACKS_LOCK hold */
|
|
GstMiniObject *
|
|
gst_adaptive_demux_track_dequeue_data_locked (GstAdaptiveDemux * demux,
|
|
GstAdaptiveDemuxTrack * track, gboolean check_sticky_events)
|
|
{
|
|
GstMiniObject *res = NULL;
|
|
gboolean is_pending_sticky = FALSE;
|
|
GstEvent *event;
|
|
GstClockTimeDiff running_time;
|
|
GstClockTimeDiff running_time_buffering = GST_CLOCK_STIME_NONE;
|
|
GstClockTimeDiff running_time_end;
|
|
gsize item_size = 0;
|
|
|
|
if (check_sticky_events) {
|
|
/* If there are any sticky events to send, do that before anything else */
|
|
event = gst_event_store_get_next_pending (&track->sticky_events);
|
|
if (event != NULL) {
|
|
res = (GstMiniObject *) event;
|
|
running_time_buffering = running_time = running_time_end =
|
|
GST_CLOCK_STIME_NONE;
|
|
GST_DEBUG_ID (track->id,
|
|
"dequeued pending sticky event %" GST_PTR_FORMAT, event);
|
|
is_pending_sticky = TRUE;
|
|
goto handle_event;
|
|
}
|
|
}
|
|
|
|
do {
|
|
TrackQueueItem item;
|
|
|
|
/* If we're filling a gap, generate a gap event */
|
|
if (track->gap_position != GST_CLOCK_TIME_NONE) {
|
|
GstClockTime pos = track->gap_position;
|
|
GstClockTime duration = track->gap_duration;
|
|
|
|
if (duration > 100 * GST_MSECOND) {
|
|
duration = 100 * GST_MSECOND;
|
|
track->gap_position += duration;
|
|
track->gap_duration -= duration;
|
|
} else {
|
|
/* Duration dropped below 100 ms, this is the last
|
|
* gap of the sequence */
|
|
track->gap_position = GST_CLOCK_TIME_NONE;
|
|
track->gap_duration = GST_CLOCK_TIME_NONE;
|
|
}
|
|
|
|
res = (GstMiniObject *) gst_event_new_gap (pos, duration);
|
|
if (track->output_segment.rate > 0.0) {
|
|
running_time = my_segment_to_running_time (&track->output_segment, pos);
|
|
running_time_buffering = running_time_end =
|
|
my_segment_to_running_time (&track->output_segment, pos + duration);
|
|
} else {
|
|
running_time =
|
|
my_segment_to_running_time (&track->output_segment, pos + duration);
|
|
running_time_buffering = running_time_end =
|
|
my_segment_to_running_time (&track->output_segment, pos);
|
|
}
|
|
item_size = 0;
|
|
break;
|
|
}
|
|
|
|
/* Otherwise, try and pop something from the item queue */
|
|
if (!track_dequeue_item_locked (demux, track, &item))
|
|
return NULL;
|
|
|
|
res = item.item;
|
|
running_time = item.runningtime;
|
|
running_time_end = item.runningtime_end;
|
|
running_time_buffering = item.runningtime_buffering;
|
|
item_size = item.size;
|
|
|
|
/* Special case for a gap event, to drain them out little-by-little.
|
|
* See if it can be output directly, otherwise set up to fill a gap and loop again */
|
|
if (GST_IS_EVENT (res) && GST_EVENT_TYPE (res) == GST_EVENT_GAP
|
|
&& GST_CLOCK_STIME_IS_VALID (running_time)) {
|
|
GstClockTime pos, duration;
|
|
GstClockTime cstart, cstop;
|
|
|
|
gst_event_parse_gap (GST_EVENT_CAST (res), &pos, &duration);
|
|
|
|
/* Handle a track with no duration as 0 duration. This can only
|
|
* happen if an element in parsebin emits such a gap event */
|
|
if (duration == GST_CLOCK_TIME_NONE)
|
|
duration = 0;
|
|
|
|
/* We *can* end up with a gap outside of the segment range due to segment
|
|
* base updating when (re)activating a track. In that case, just let the gap
|
|
* event flow out normally.
|
|
* Otherwise, this gap crosses into the segment, clip it to the ends and set up to fill the gap */
|
|
if (!gst_segment_clip (&track->output_segment, GST_FORMAT_TIME, pos,
|
|
pos + duration, &cstart, &cstop))
|
|
break;
|
|
|
|
pos = cstart;
|
|
duration = cstop - cstart;
|
|
|
|
GST_DEBUG_ID (track->id,
|
|
"Starting gap for runningtime %" GST_STIME_FORMAT
|
|
" - clipped position %" GST_TIME_FORMAT " duration %" GST_TIME_FORMAT,
|
|
GST_STIME_ARGS (running_time),
|
|
GST_TIME_ARGS (pos), GST_TIME_ARGS (duration));
|
|
|
|
track->gap_position = pos;
|
|
track->gap_duration = duration;
|
|
|
|
gst_mini_object_unref (res);
|
|
res = NULL;
|
|
continue;
|
|
}
|
|
} while (res == NULL);
|
|
|
|
handle_event:
|
|
if (GST_IS_EVENT (res)) {
|
|
event = (GstEvent *) res;
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_SEGMENT:
|
|
gst_event_copy_segment (event, &track->output_segment);
|
|
|
|
if (!GST_CLOCK_STIME_IS_VALID (track->output_time)) {
|
|
if (track->output_segment.rate > 0.0) {
|
|
track->output_time =
|
|
my_segment_to_running_time (&track->output_segment,
|
|
track->output_segment.start);
|
|
} else {
|
|
track->output_time =
|
|
my_segment_to_running_time (&track->output_segment,
|
|
track->output_segment.stop);
|
|
}
|
|
}
|
|
|
|
if (track->update_next_segment) {
|
|
GstClockTimeDiff global_output_position =
|
|
demux->priv->global_output_position;
|
|
|
|
GST_DEBUG_ID (track->id, "Override segment for running time %"
|
|
GST_STIME_FORMAT " : %" GST_PTR_FORMAT,
|
|
GST_STIME_ARGS (global_output_position), event);
|
|
gst_event_unref (event);
|
|
gst_segment_set_running_time (&track->output_segment, GST_FORMAT_TIME,
|
|
global_output_position);
|
|
|
|
event = gst_event_new_segment (&track->output_segment);
|
|
gst_event_set_seqnum (event, track->demux->priv->segment_seqnum);
|
|
|
|
res = (GstMiniObject *) event;
|
|
running_time = global_output_position;
|
|
|
|
track->update_next_segment = FALSE;
|
|
|
|
/* Replace the stored sticky event with this one */
|
|
is_pending_sticky = FALSE;
|
|
}
|
|
|
|
if (demux->priv->base_offset != 0) {
|
|
GstSegment ev_segment;
|
|
GstEvent *new_event;
|
|
GST_DEBUG_ID (track->id,
|
|
"Offsetting segment base by %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (demux->priv->base_offset));
|
|
gst_event_copy_segment (event, &ev_segment);
|
|
ev_segment.base += demux->priv->base_offset;
|
|
new_event = gst_event_new_segment (&ev_segment);
|
|
gst_event_set_seqnum (new_event, gst_event_get_seqnum (event));
|
|
gst_mini_object_unref (res);
|
|
event = new_event;
|
|
res = (GstMiniObject *) event;
|
|
/* Replace the stored sticky event with this one */
|
|
is_pending_sticky = FALSE;
|
|
}
|
|
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
/* Store any sticky event in the cache, unless this is already an event
|
|
* from the pending sticky_events store */
|
|
if (!is_pending_sticky && GST_EVENT_IS_STICKY (event)) {
|
|
GST_DEBUG_ID (track->id, "Storing sticky event %" GST_PTR_FORMAT, event);
|
|
gst_event_store_insert_event (&track->sticky_events, event, FALSE);
|
|
}
|
|
}
|
|
|
|
/* Update track buffering levels */
|
|
if (GST_CLOCK_STIME_IS_VALID (running_time_buffering)) {
|
|
track->output_time = running_time_buffering;
|
|
|
|
GST_LOG_ID (track->id,
|
|
"buffering time:%" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (running_time_buffering));
|
|
|
|
gst_adaptive_demux_track_update_level_locked (track);
|
|
} else {
|
|
GST_LOG_ID (track->id, "popping untimed item %" GST_PTR_FORMAT, res);
|
|
}
|
|
|
|
track->level_bytes -= item_size;
|
|
|
|
return res;
|
|
}
|
|
|
|
void
|
|
gst_adaptive_demux_track_drain_to (GstAdaptiveDemuxTrack * track,
|
|
GstClockTime drain_running_time)
|
|
{
|
|
GstAdaptiveDemux *demux = track->demux;
|
|
|
|
GST_DEBUG_ID (track->id,
|
|
"draining to running time %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (drain_running_time));
|
|
|
|
while (track->next_position == GST_CLOCK_STIME_NONE ||
|
|
track->next_position < drain_running_time) {
|
|
TrackQueueItem *item;
|
|
GstMiniObject *next_mo = NULL;
|
|
|
|
/* If we're in a gap, and the end time is after the target running time,
|
|
* exit */
|
|
if (track->gap_position != GST_CLOCK_TIME_NONE) {
|
|
GstClockTimeDiff running_time_end;
|
|
GstClockTimeDiff gap_end = track->gap_position;
|
|
|
|
/* In reverse playback, the start of the gap is the highest
|
|
* running time, so only add duration for forward play */
|
|
if (track->output_segment.rate > 0)
|
|
gap_end += track->gap_duration;
|
|
|
|
running_time_end =
|
|
my_segment_to_running_time (&track->output_segment, gap_end);
|
|
|
|
if (running_time_end >= drain_running_time) {
|
|
GST_DEBUG_ID (track->id,
|
|
"drained to GAP with running time %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (running_time_end));
|
|
return;
|
|
}
|
|
|
|
/* Otherwise this gap is complete, so skip it */
|
|
track->gap_position = GST_CLOCK_STIME_NONE;
|
|
}
|
|
|
|
/* Otherwise check what's enqueued */
|
|
item = gst_vec_deque_peek_head_struct (track->queue);
|
|
/* track is empty, we're done */
|
|
if (item == NULL) {
|
|
GST_DEBUG_ID (track->id, "Track completely drained");
|
|
return;
|
|
}
|
|
|
|
/* If the item has a running time, and it's after the drain_running_time
|
|
* we're done. */
|
|
if (item->runningtime != GST_CLOCK_STIME_NONE
|
|
&& item->runningtime >= drain_running_time) {
|
|
GST_DEBUG_ID (track->id, "Track drained to item %" GST_PTR_FORMAT
|
|
" with running time %" GST_STIME_FORMAT,
|
|
item->item, GST_STIME_ARGS (item->runningtime));
|
|
return;
|
|
}
|
|
|
|
GST_DEBUG_ID (track->id, "discarding %" GST_PTR_FORMAT
|
|
" with running time %" GST_STIME_FORMAT,
|
|
item->item, GST_STIME_ARGS (item->runningtime));
|
|
|
|
/* Dequeue the item and discard. Sticky events
|
|
* will be collected by the dequeue function, gaps will be started.
|
|
* If it's a buffer, mark the track as discont to get the flag set
|
|
* on the next output buffer */
|
|
next_mo =
|
|
gst_adaptive_demux_track_dequeue_data_locked (demux, track, FALSE);
|
|
if (GST_IS_BUFFER (next_mo)) {
|
|
track->output_discont = TRUE;
|
|
}
|
|
gst_mini_object_unref (next_mo);
|
|
gst_adaptive_demux_track_update_next_position (track);
|
|
}
|
|
|
|
GST_DEBUG_ID (track->id,
|
|
"drained to running time %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (track->next_position));
|
|
}
|
|
|
|
static inline GstClockTimeDiff
|
|
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
|
|
{
|
|
GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
|
|
|
|
if (GST_CLOCK_TIME_IS_VALID (val)) {
|
|
gboolean sign =
|
|
gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
|
|
if (sign > 0)
|
|
res = val;
|
|
else if (sign < 0)
|
|
res = -val;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
/* Queues an item on a track queue and updates the buffering levels
|
|
* TRACKS_LOCK hold */
|
|
static void
|
|
track_queue_data_locked (GstAdaptiveDemux * demux,
|
|
GstAdaptiveDemuxTrack * track, GstMiniObject * object, gsize size,
|
|
GstClockTime timestamp, GstClockTime duration, gboolean is_discont)
|
|
{
|
|
TrackQueueItem item;
|
|
|
|
item.item = object;
|
|
item.size = size;
|
|
item.runningtime = GST_CLOCK_STIME_NONE;
|
|
item.runningtime_end = GST_CLOCK_STIME_NONE;
|
|
item.runningtime_buffering = GST_CLOCK_STIME_NONE;
|
|
|
|
if (timestamp != GST_CLOCK_TIME_NONE) {
|
|
GstClockTimeDiff input_time;
|
|
|
|
/* Set the running time of the item */
|
|
input_time = item.runningtime_end = item.runningtime =
|
|
my_segment_to_running_time (&track->input_segment, timestamp);
|
|
|
|
/* Update segment position (include duration if valid) */
|
|
track->input_segment.position = timestamp;
|
|
|
|
if (GST_CLOCK_TIME_IS_VALID (duration)) {
|
|
if (track->input_segment.rate > 0.0) {
|
|
/* Forward playback, add duration onto our position and update
|
|
* the input time to match */
|
|
track->input_segment.position += duration;
|
|
item.runningtime_end = input_time =
|
|
my_segment_to_running_time (&track->input_segment,
|
|
track->input_segment.position);
|
|
} else {
|
|
/* Otherwise, the end of the buffer has the smaller running time and
|
|
* we need to change the item.runningtime, but input_time and runningtime_end
|
|
* are already set to the larger running time */
|
|
item.runningtime = my_segment_to_running_time (&track->input_segment,
|
|
timestamp + duration);
|
|
}
|
|
}
|
|
|
|
/* Update track input time and level */
|
|
if (!GST_CLOCK_STIME_IS_VALID (track->lowest_input_time))
|
|
track->lowest_input_time = track->input_time;
|
|
|
|
if (track->input_segment.rate > 0.0) {
|
|
if (input_time > track->input_time) {
|
|
track->input_time = input_time;
|
|
}
|
|
} else {
|
|
/* In reverse playback, we track input time differently, to do buffering
|
|
* across the reversed GOPs. Each GOP arrives in reverse order, with
|
|
* running time moving backward, then jumping forward at the start of
|
|
* each GOP. At each point, we want the input time to be the lowest
|
|
* running time of the previous GOP. Therefore, we track input times
|
|
* into a different variable, and transfer it across when a discont buffer
|
|
* arrives */
|
|
if (is_discont) {
|
|
track->input_time = track->lowest_input_time;
|
|
track->lowest_input_time = input_time;
|
|
} else if (input_time < track->lowest_input_time) {
|
|
track->lowest_input_time = input_time;
|
|
}
|
|
}
|
|
|
|
/* Store the maximum running time we've seen as
|
|
* this item's "buffering running time" */
|
|
item.runningtime_buffering = track->input_time;
|
|
|
|
/* Configure the track output time if nothing was dequeued yet,
|
|
* so buffering level is updated correctly */
|
|
if (!GST_CLOCK_STIME_IS_VALID (track->output_time)) {
|
|
track->output_time = track->lowest_input_time;
|
|
GST_LOG_ID (track->id,
|
|
"setting output_time = lowest input_time = %"
|
|
GST_STIME_FORMAT, GST_STIME_ARGS (track->output_time));
|
|
}
|
|
|
|
gst_adaptive_demux_track_update_level_locked (track);
|
|
}
|
|
|
|
GST_LOG_ID (track->id,
|
|
"item running_time :%" GST_STIME_FORMAT " end :%"
|
|
GST_STIME_FORMAT, GST_STIME_ARGS (item.runningtime),
|
|
GST_STIME_ARGS (item.runningtime_end));
|
|
|
|
track->level_bytes += size;
|
|
gst_vec_deque_push_tail_struct (track->queue, &item);
|
|
|
|
/* If we were waiting for this track to add something, notify output thread */
|
|
/* FIXME: This should be in adaptive demux */
|
|
if (track->waiting_add) {
|
|
g_cond_signal (&demux->priv->tracks_add);
|
|
}
|
|
}
|
|
|
|
static GstFlowReturn
|
|
_track_sink_chain_function (GstPad * pad, GstObject * parent,
|
|
GstBuffer * buffer)
|
|
{
|
|
GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
|
|
GstAdaptiveDemux *demux = track->demux;
|
|
GstClockTime ts;
|
|
|
|
GST_DEBUG_ID (track->id, "buffer %" GST_PTR_FORMAT, buffer);
|
|
|
|
TRACKS_LOCK (demux);
|
|
|
|
/* Discard buffers that are received outside of a valid segment. This can
|
|
* happen if a flushing seek (which resets the track segment seqnums) was
|
|
* received but the stream is still providing buffers before returning.
|
|
*/
|
|
if (track->input_segment_seqnum == GST_SEQNUM_INVALID) {
|
|
GST_DEBUG_OBJECT (pad,
|
|
"Dropping buffer because we do not have a valid input segment");
|
|
gst_buffer_unref (buffer);
|
|
TRACKS_UNLOCK (demux);
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
ts = GST_BUFFER_DTS_OR_PTS (buffer);
|
|
|
|
/* Buffers coming out of parsebin *should* always be timestamped (it's the
|
|
* goal of parsebin after all). The tracks will use that (converted to
|
|
* running-time) in order to track position and buffering levels.
|
|
*
|
|
* Unfortunately there are valid cases were the parsers won't be able to
|
|
* timestamp all frames (due to the underlying formats or muxing). For those
|
|
* cases, we use the last incoming timestamp (via the track input GstSegment
|
|
* position):
|
|
*
|
|
* * If buffers were previously received, that segment position will
|
|
* correspond to the last timestamped-buffer PTS/DTS
|
|
*
|
|
* * If *no* buffers were previously received, the segment position *should*
|
|
* correspond to the valid initial position (in buffer timestamps). If not
|
|
* set, we need to bail out.
|
|
*/
|
|
if (!GST_CLOCK_TIME_IS_VALID (ts)) {
|
|
if (GST_CLOCK_TIME_IS_VALID (track->input_segment.position)) {
|
|
GST_WARNING_ID (track->id,
|
|
"buffer doesn't have any pts or dts, using segment position (%"
|
|
GST_TIME_FORMAT ")", GST_TIME_ARGS (track->input_segment.position));
|
|
ts = track->input_segment.position;
|
|
} else {
|
|
GST_ERROR_ID (track->id, "initial buffer doesn't have any pts or dts !");
|
|
gst_buffer_unref (buffer);
|
|
TRACKS_UNLOCK (demux);
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
}
|
|
|
|
if (GST_CLOCK_TIME_IS_VALID (track->input_segment.position) &&
|
|
ts > track->input_segment.position &&
|
|
ts > track->input_segment.start &&
|
|
ts - track->input_segment.position > 100 * GST_MSECOND) {
|
|
GstClockTime duration = ts - track->input_segment.position;
|
|
GstEvent *gap = gst_event_new_gap (track->input_segment.position, duration);
|
|
/* Insert gap event to ensure coherent interleave */
|
|
GST_DEBUG_ID (track->id,
|
|
"Inserting gap for %" GST_TIME_FORMAT " vs %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (ts), GST_TIME_ARGS (track->input_segment.position));
|
|
track_queue_data_locked (demux, track, (GstMiniObject *) gap, 0,
|
|
track->input_segment.position, duration, FALSE);
|
|
}
|
|
|
|
track_queue_data_locked (demux, track, (GstMiniObject *) buffer,
|
|
gst_buffer_get_size (buffer), ts, GST_BUFFER_DURATION (buffer),
|
|
GST_BUFFER_IS_DISCONT (buffer));
|
|
|
|
/* Recalculate buffering */
|
|
demux_update_buffering_locked (demux);
|
|
demux_post_buffering_locked (demux);
|
|
/* UNLOCK */
|
|
TRACKS_UNLOCK (demux);
|
|
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
static gboolean
|
|
_track_sink_event_function (GstPad * pad, GstObject * parent, GstEvent * event)
|
|
{
|
|
GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
|
|
GstAdaptiveDemux *demux = track->demux;
|
|
GstClockTime timestamp = GST_CLOCK_TIME_NONE;
|
|
GstClockTime duration = GST_CLOCK_TIME_NONE;
|
|
gboolean drop = FALSE;
|
|
gboolean is_discont = FALSE;
|
|
|
|
GST_DEBUG_ID (track->id, "event %" GST_PTR_FORMAT, event);
|
|
|
|
TRACKS_LOCK (demux);
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_STREAM_COLLECTION:
|
|
{
|
|
/* Replace upstream collection with demux collection */
|
|
GST_DEBUG_ID (track->id, "Dropping stream-collection, we send our own");
|
|
drop = TRUE;
|
|
break;
|
|
}
|
|
case GST_EVENT_STREAM_START:
|
|
{
|
|
GST_DEBUG_ID (track->id, "Dropping stream-start, we send our own");
|
|
if (track->eos) {
|
|
gint i, len;
|
|
/* Find and drop latest EOS if present */
|
|
len = gst_vec_deque_get_length (track->queue);
|
|
for (i = len - 1; i >= 0; i--) {
|
|
TrackQueueItem *item =
|
|
gst_vec_deque_peek_nth_struct (track->queue, i);
|
|
if (GST_IS_EVENT (item->item)
|
|
&& GST_EVENT_TYPE (item->item) == GST_EVENT_EOS) {
|
|
TrackQueueItem sub;
|
|
GST_DEBUG_ID (track->id,
|
|
"Removing previously received EOS (pos:%d)", i);
|
|
if (gst_vec_deque_drop_struct (track->queue, i, &sub))
|
|
gst_mini_object_unref (sub.item);
|
|
break;
|
|
}
|
|
}
|
|
track->eos = FALSE;
|
|
}
|
|
drop = TRUE;
|
|
break;
|
|
}
|
|
case GST_EVENT_EOS:
|
|
{
|
|
if (track->pending_srcpad != NULL) {
|
|
GST_DEBUG_ID (track->id,
|
|
"Dropping EOS because we have a pending pad switch");
|
|
drop = TRUE;
|
|
} else {
|
|
track->eos = TRUE;
|
|
}
|
|
break;
|
|
}
|
|
case GST_EVENT_FLUSH_STOP:
|
|
case GST_EVENT_FLUSH_START:
|
|
{
|
|
/* Drop flush events */
|
|
drop = TRUE;
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if (drop || !GST_EVENT_IS_SERIALIZED (event)) {
|
|
GST_DEBUG_ID (track->id, "dropping event %s", GST_EVENT_TYPE_NAME (event));
|
|
gst_event_unref (event);
|
|
TRACKS_UNLOCK (demux);
|
|
/* Silently "accept" them */
|
|
return TRUE;
|
|
}
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_SEGMENT:
|
|
{
|
|
guint64 seg_seqnum = gst_event_get_seqnum (event);
|
|
|
|
if (track->input_segment_seqnum == seg_seqnum) {
|
|
GST_DEBUG_ID (track->id, "Ignoring duplicate segment");
|
|
gst_event_unref (event);
|
|
TRACKS_UNLOCK (demux);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
if (seg_seqnum != demux->priv->segment_seqnum) {
|
|
GST_DEBUG_OBJECT (pad, "Ignoring non-current segment");
|
|
gst_event_unref (event);
|
|
TRACKS_UNLOCK (demux);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
track->input_segment_seqnum = seg_seqnum;
|
|
gst_event_copy_segment (event, &track->input_segment);
|
|
if (track->input_segment.rate >= 0)
|
|
track->input_segment.position = track->input_segment.start;
|
|
else
|
|
track->input_segment.position = track->input_segment.stop;
|
|
GST_DEBUG_ID (track->id, "stored segment %" GST_SEGMENT_FORMAT,
|
|
&track->input_segment);
|
|
timestamp = track->input_segment.position;
|
|
is_discont = TRUE;
|
|
|
|
break;
|
|
}
|
|
case GST_EVENT_GAP:
|
|
{
|
|
gst_event_parse_gap (event, ×tamp, &duration);
|
|
|
|
if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
|
|
GST_DEBUG_ID (track->id, "Dropping gap event with invalid timestamp");
|
|
goto drop_ok;
|
|
}
|
|
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
track_queue_data_locked (demux, track, (GstMiniObject *) event, 0,
|
|
timestamp, duration, is_discont);
|
|
|
|
/* Recalculate buffering */
|
|
demux_update_buffering_locked (demux);
|
|
demux_post_buffering_locked (demux);
|
|
|
|
TRACKS_UNLOCK (demux);
|
|
|
|
return TRUE;
|
|
|
|
/* errors */
|
|
drop_ok:
|
|
{
|
|
gst_event_unref (event);
|
|
TRACKS_UNLOCK (demux);
|
|
return TRUE;
|
|
}
|
|
}
|
|
|
|
static void
|
|
track_sinkpad_unlinked_cb (GstPad * sinkpad, GstPad * parsebin_srcpad,
|
|
GstAdaptiveDemuxTrack * track)
|
|
{
|
|
GST_DEBUG_OBJECT (sinkpad, "Got unlinked from %s:%s",
|
|
GST_DEBUG_PAD_NAME (parsebin_srcpad));
|
|
|
|
if (track->pending_srcpad) {
|
|
GST_DEBUG_OBJECT (sinkpad, "linking to pending pad %s:%s",
|
|
GST_DEBUG_PAD_NAME (track->pending_srcpad));
|
|
|
|
if (gst_pad_link (track->pending_srcpad, sinkpad) != GST_PAD_LINK_OK) {
|
|
GST_ERROR_OBJECT (sinkpad, "could not link pending pad !");
|
|
}
|
|
gst_object_unref (track->pending_srcpad);
|
|
track->pending_srcpad = NULL;
|
|
}
|
|
}
|
|
|
|
/* TRACKS_LOCK held
|
|
* Call this to update the track next_position with timed data */
|
|
void
|
|
gst_adaptive_demux_track_update_next_position (GstAdaptiveDemuxTrack * track)
|
|
{
|
|
guint i, len;
|
|
|
|
/* If filling a gap, the next position is the gap position */
|
|
if (track->gap_position != GST_CLOCK_TIME_NONE) {
|
|
track->next_position =
|
|
my_segment_to_running_time (&track->output_segment,
|
|
track->gap_position);
|
|
return;
|
|
}
|
|
|
|
len = gst_vec_deque_get_length (track->queue);
|
|
for (i = 0; i < len; i++) {
|
|
TrackQueueItem *item = gst_vec_deque_peek_nth_struct (track->queue, i);
|
|
|
|
if (item->runningtime != GST_CLOCK_STIME_NONE) {
|
|
GST_DEBUG_ID (track->id,
|
|
"next position %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (item->runningtime));
|
|
track->next_position = item->runningtime;
|
|
return;
|
|
}
|
|
}
|
|
track->next_position = GST_CLOCK_STIME_NONE;
|
|
|
|
GST_DEBUG_ID (track->id, "Track doesn't have any pending timed data");
|
|
}
|
|
|
|
/* TRACKS_LOCK held. Recomputes the level_time for the track */
|
|
void
|
|
gst_adaptive_demux_track_update_level_locked (GstAdaptiveDemuxTrack * track)
|
|
{
|
|
GstAdaptiveDemux *demux = track->demux;
|
|
GstClockTimeDiff output_time;
|
|
|
|
if (GST_CLOCK_STIME_IS_VALID (track->output_time))
|
|
output_time = MAX (track->output_time, demux->priv->global_output_position);
|
|
else
|
|
output_time = MIN (track->input_time, demux->priv->global_output_position);
|
|
|
|
if (track->input_time >= output_time)
|
|
track->level_time = track->input_time - output_time;
|
|
else
|
|
track->level_time = 0;
|
|
|
|
GST_LOG_ID (track->id,
|
|
"input_time:%" GST_STIME_FORMAT " output_time:%"
|
|
GST_STIME_FORMAT " level:%" GST_TIME_FORMAT,
|
|
GST_STIME_ARGS (track->input_time),
|
|
GST_STIME_ARGS (track->output_time), GST_TIME_ARGS (track->level_time));
|
|
}
|
|
|
|
static void
|
|
_demux_track_free (GstAdaptiveDemuxTrack * track)
|
|
{
|
|
GST_DEBUG_ID (track->id, "freeing track");
|
|
|
|
g_free (track->stream_id);
|
|
g_free (track->upstream_stream_id);
|
|
g_free (track->id);
|
|
|
|
if (track->pending_srcpad)
|
|
gst_object_unref (track->pending_srcpad);
|
|
|
|
if (track->generic_caps)
|
|
gst_caps_unref (track->generic_caps);
|
|
gst_object_unref (track->stream_object);
|
|
if (track->tags)
|
|
gst_tag_list_unref (track->tags);
|
|
gst_vec_deque_free (track->queue);
|
|
|
|
gst_event_store_deinit (&track->sticky_events);
|
|
|
|
if (track->element != NULL) {
|
|
gst_element_set_state (track->element, GST_STATE_NULL);
|
|
gst_bin_remove (GST_BIN_CAST (track->demux), track->element);
|
|
}
|
|
|
|
g_free (track);
|
|
}
|
|
|
|
GstAdaptiveDemuxTrack *
|
|
gst_adaptive_demux_track_ref (GstAdaptiveDemuxTrack * track)
|
|
{
|
|
g_return_val_if_fail (track != NULL, NULL);
|
|
GST_TRACE_ID (track->id, "%d -> %d", track->ref_count, track->ref_count + 1);
|
|
g_atomic_int_inc (&track->ref_count);
|
|
|
|
return track;
|
|
}
|
|
|
|
void
|
|
gst_adaptive_demux_track_unref (GstAdaptiveDemuxTrack * track)
|
|
{
|
|
g_return_if_fail (track != NULL);
|
|
|
|
GST_TRACE_ID (track->id, "%d -> %d", track->ref_count, track->ref_count - 1);
|
|
if (g_atomic_int_dec_and_test (&track->ref_count)) {
|
|
_demux_track_free (track);
|
|
}
|
|
}
|
|
|
|
static void
|
|
_track_queue_item_clear (TrackQueueItem * item)
|
|
{
|
|
if (item->item) {
|
|
gst_mini_object_unref ((GstMiniObject *) item->item);
|
|
item->item = NULL;
|
|
}
|
|
}
|
|
|
|
/* Internal function which actually adds the elements to the demuxer */
|
|
gboolean
|
|
gst_adaptive_demux_track_add_elements (GstAdaptiveDemuxTrack * track,
|
|
guint period_num)
|
|
{
|
|
GstAdaptiveDemux *demux = track->demux;
|
|
gchar *tmpid;
|
|
guint i, len;
|
|
|
|
/* Store the period number for debugging output */
|
|
track->period_num = period_num;
|
|
|
|
tmpid = g_strdup_printf ("%s-period%d", track->id, period_num);
|
|
g_free (track->id);
|
|
track->id = tmpid;
|
|
len = strlen (track->id);
|
|
for (i = 0; i < len; i++)
|
|
if (track->id[i] == ' ')
|
|
track->id[i] = '_';
|
|
track->element = gst_bin_new (track->id);
|
|
|
|
track->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
|
|
g_signal_connect (track->sinkpad, "unlinked",
|
|
(GCallback) track_sinkpad_unlinked_cb, track);
|
|
gst_element_add_pad (GST_ELEMENT_CAST (track->element), track->sinkpad);
|
|
gst_pad_set_element_private (track->sinkpad, track);
|
|
gst_pad_set_chain_function (track->sinkpad, _track_sink_chain_function);
|
|
gst_pad_set_event_function (track->sinkpad, _track_sink_event_function);
|
|
gst_pad_set_query_function (track->sinkpad, _track_sink_query_function);
|
|
|
|
if (!gst_bin_add (GST_BIN_CAST (demux), track->element)) {
|
|
track->element = NULL;
|
|
return FALSE;
|
|
}
|
|
|
|
gst_element_sync_state_with_parent (track->element);
|
|
return TRUE;
|
|
}
|
|
|
|
/**
|
|
* gst_adaptive_demux_track_new:
|
|
* @demux: a #GstAdaptiveDemux
|
|
* @type: a #GstStreamType
|
|
* @flags: a #GstStreamFlags
|
|
* @stream_id: (transfer none): The stream id for the new track
|
|
* @caps: (transfer full): The caps for the track
|
|
* @tags: (allow-none) (transfer full): The tags for the track
|
|
*
|
|
* Create and register a new #GstAdaptiveDemuxTrack
|
|
*
|
|
* Returns: (transfer none) The new track
|
|
*/
|
|
GstAdaptiveDemuxTrack *
|
|
gst_adaptive_demux_track_new (GstAdaptiveDemux * demux,
|
|
GstStreamType type,
|
|
GstStreamFlags flags, gchar * stream_id, GstCaps * caps, GstTagList * tags)
|
|
{
|
|
GstAdaptiveDemuxTrack *track;
|
|
|
|
g_return_val_if_fail (stream_id != NULL, NULL);
|
|
g_return_val_if_fail (type && type != GST_STREAM_TYPE_UNKNOWN, NULL);
|
|
|
|
|
|
GST_DEBUG_OBJECT (demux, "type:%s stream_id:%s caps:%" GST_PTR_FORMAT,
|
|
gst_stream_type_get_name (type), stream_id, caps);
|
|
|
|
track = g_new0 (GstAdaptiveDemuxTrack, 1);
|
|
g_atomic_int_set (&track->ref_count, 1);
|
|
track->demux = demux;
|
|
track->type = type;
|
|
track->flags = flags;
|
|
track->stream_id =
|
|
gst_element_decorate_stream_id (GST_ELEMENT (demux), stream_id);
|
|
track->id = g_strdup_printf ("track-%s", stream_id);
|
|
track->period_num = (guint) (-1);
|
|
track->generic_caps = caps;
|
|
track->stream_object = gst_stream_new (track->stream_id, caps, type, flags);
|
|
if (tags) {
|
|
gst_stream_set_tags (track->stream_object, tags);
|
|
track->tags = tags;
|
|
}
|
|
|
|
track->selected = FALSE;
|
|
track->active = FALSE;
|
|
track->draining = FALSE;
|
|
|
|
track->queue = gst_vec_deque_new_for_struct (sizeof (TrackQueueItem), 50);
|
|
gst_vec_deque_set_clear_func (track->queue,
|
|
(GDestroyNotify) _track_queue_item_clear);
|
|
|
|
gst_event_store_init (&track->sticky_events);
|
|
|
|
track->waiting_add = TRUE;
|
|
|
|
/* We have no fragment duration yet, so the buffering threshold is just the
|
|
* low watermark in time for now */
|
|
GST_OBJECT_LOCK (demux);
|
|
track->buffering_threshold = demux->buffering_low_watermark_time;
|
|
GST_OBJECT_UNLOCK (demux);
|
|
|
|
gst_segment_init (&track->input_segment, GST_FORMAT_TIME);
|
|
track->input_time = 0;
|
|
track->input_segment_seqnum = GST_SEQNUM_INVALID;
|
|
|
|
gst_segment_init (&track->output_segment, GST_FORMAT_TIME);
|
|
track->gap_position = track->gap_duration = GST_CLOCK_TIME_NONE;
|
|
|
|
track->output_time = GST_CLOCK_STIME_NONE;
|
|
track->next_position = GST_CLOCK_STIME_NONE;
|
|
|
|
track->update_next_segment = FALSE;
|
|
|
|
track->level_bytes = 0;
|
|
track->level_time = 0;
|
|
|
|
return track;
|
|
}
|