gstreamer/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-track.c
Edward Hervey 12b689f6b5 adaptivedemux2: Fix early seeking
When seeking is handled by the collection posting thread, there is a possibility
that some leftover data will be pushed by the stream thread.

Properly detect and reject those early segments (and buffers) by comparing it to
the main segment seqnum

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4922>
2023-06-23 08:42:03 +02:00

986 lines
32 KiB
C

/* GStreamer
*
* Copyright (C) 2014 Samsung Electronics. All rights reserved.
* Author: Thiago Santos <thiagoss@osg.samsung.com>
*
* Copyright (C) 2021-2022 Centricular Ltd
* Author: Edward Hervey <edward@centricular.com>
* Author: Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstadaptivedemux.h"
#include "gstadaptivedemux-private.h"
GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
#define GST_CAT_DEFAULT adaptivedemux2_debug
/* TRACKS_LOCK held
* Flushes all data in the track and resets it */
void
gst_adaptive_demux_track_flush (GstAdaptiveDemuxTrack * track)
{
GST_DEBUG_OBJECT (track->demux, "Flushing track '%s' with %u queued items",
track->stream_id, gst_queue_array_get_length (track->queue));
gst_queue_array_clear (track->queue);
gst_event_store_flush (&track->sticky_events);
gst_segment_init (&track->input_segment, GST_FORMAT_TIME);
track->lowest_input_time = GST_CLOCK_STIME_NONE;
track->input_time = 0;
track->input_segment_seqnum = GST_SEQNUM_INVALID;
gst_segment_init (&track->output_segment, GST_FORMAT_TIME);
track->gap_position = track->gap_duration = GST_CLOCK_TIME_NONE;
track->output_time = GST_CLOCK_STIME_NONE;
track->next_position = GST_CLOCK_STIME_NONE;
track->level_bytes = 0;
track->level_time = 0;
track->eos = FALSE;
track->update_next_segment = FALSE;
track->output_discont = FALSE;
}
static gboolean
_track_sink_query_function (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
GstAdaptiveDemux *demux = track->demux;
gboolean ret = FALSE;
GST_DEBUG_OBJECT (pad, "query %" GST_PTR_FORMAT, query);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_ACCEPT_CAPS:
/* Should we intersect by track caps as a safety check ? */
GST_DEBUG_OBJECT (demux, "We accept any caps on %s:%s",
GST_DEBUG_PAD_NAME (pad));
gst_query_set_accept_caps_result (query, TRUE);
ret = TRUE;
break;
default:
break;
}
return ret;
}
/* Dequeue an item from the track queue for processing
* TRACKS_LOCK hold */
static gboolean
track_dequeue_item_locked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxTrack * track, TrackQueueItem * out_item)
{
TrackQueueItem *item = gst_queue_array_peek_head_struct (track->queue);
if (item == NULL)
return FALSE;
*out_item = *item;
gst_queue_array_pop_head (track->queue);
GST_LOG_OBJECT (demux,
"track %s (period %u) item running_time %" GST_STIME_FORMAT " end %"
GST_STIME_FORMAT, track->stream_id, track->period_num,
GST_STIME_ARGS (out_item->runningtime),
GST_STIME_ARGS (out_item->runningtime_end));
return TRUE;
}
static inline GstClockTimeDiff my_segment_to_running_time (GstSegment * segment,
GstClockTime val);
/* Dequeue or generate a buffer/event from the track queue and update the buffering levels
* TRACKS_LOCK hold */
GstMiniObject *
track_dequeue_data_locked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxTrack * track, gboolean check_sticky_events)
{
GstMiniObject *res = NULL;
gboolean is_pending_sticky = FALSE;
GstEvent *event;
GstClockTimeDiff running_time;
GstClockTimeDiff running_time_buffering = GST_CLOCK_STIME_NONE;
GstClockTimeDiff running_time_end;
gsize item_size = 0;
if (check_sticky_events) {
/* If there are any sticky events to send, do that before anything else */
event = gst_event_store_get_next_pending (&track->sticky_events);
if (event != NULL) {
res = (GstMiniObject *) event;
running_time_buffering = running_time = running_time_end =
GST_CLOCK_STIME_NONE;
GST_DEBUG_OBJECT (demux,
"track %s (period %u) dequeued pending sticky event %" GST_PTR_FORMAT,
track->stream_id, track->period_num, event);
is_pending_sticky = TRUE;
goto handle_event;
}
}
do {
TrackQueueItem item;
/* If we're filling a gap, generate a gap event */
if (track->gap_position != GST_CLOCK_TIME_NONE) {
GstClockTime pos = track->gap_position;
GstClockTime duration = track->gap_duration;
if (duration > 100 * GST_MSECOND) {
duration = 100 * GST_MSECOND;
track->gap_position += duration;
track->gap_duration -= duration;
} else {
/* Duration dropped below 100 ms, this is the last
* gap of the sequence */
track->gap_position = GST_CLOCK_TIME_NONE;
track->gap_duration = GST_CLOCK_TIME_NONE;
}
res = (GstMiniObject *) gst_event_new_gap (pos, duration);
if (track->output_segment.rate > 0.0) {
running_time = my_segment_to_running_time (&track->output_segment, pos);
running_time_buffering = running_time_end =
my_segment_to_running_time (&track->output_segment, pos + duration);
} else {
running_time =
my_segment_to_running_time (&track->output_segment, pos + duration);
running_time_buffering = running_time_end =
my_segment_to_running_time (&track->output_segment, pos);
}
item_size = 0;
break;
}
/* Otherwise, try and pop something from the item queue */
if (!track_dequeue_item_locked (demux, track, &item))
return NULL;
res = item.item;
running_time = item.runningtime;
running_time_end = item.runningtime_end;
running_time_buffering = item.runningtime_buffering;
item_size = item.size;
/* Special case for a gap event, to drain them out little-by-little.
* See if it can be output directly, otherwise set up to fill a gap and loop again */
if (GST_IS_EVENT (res) && GST_EVENT_TYPE (res) == GST_EVENT_GAP
&& GST_CLOCK_STIME_IS_VALID (running_time)) {
GstClockTime pos, duration;
GstClockTime cstart, cstop;
gst_event_parse_gap (GST_EVENT_CAST (res), &pos, &duration);
/* Handle a track with no duration as 0 duration. This can only
* happen if an element in parsebin emits such a gap event */
if (duration == GST_CLOCK_TIME_NONE)
duration = 0;
/* We *can* end up with a gap outside of the segment range due to segment
* base updating when (re)activating a track. In that case, just let the gap
* event flow out normally.
* Otherwise, this gap crosses into the segment, clip it to the ends and set up to fill the gap */
if (!gst_segment_clip (&track->output_segment, GST_FORMAT_TIME, pos,
pos + duration, &cstart, &cstop))
break;
pos = cstart;
duration = cstop - cstart;
GST_DEBUG_OBJECT (demux,
"track %s (period %u) Starting gap for runningtime %" GST_STIME_FORMAT
" - clipped position %" GST_TIME_FORMAT " duration %" GST_TIME_FORMAT,
track->stream_id, track->period_num, GST_STIME_ARGS (running_time),
GST_TIME_ARGS (pos), GST_TIME_ARGS (duration));
track->gap_position = pos;
track->gap_duration = duration;
gst_mini_object_unref (res);
res = NULL;
continue;
}
} while (res == NULL);
handle_event:
if (GST_IS_EVENT (res)) {
event = (GstEvent *) res;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &track->output_segment);
if (!GST_CLOCK_STIME_IS_VALID (track->output_time)) {
if (track->output_segment.rate > 0.0) {
track->output_time =
my_segment_to_running_time (&track->output_segment,
track->output_segment.start);
} else {
track->output_time =
my_segment_to_running_time (&track->output_segment,
track->output_segment.stop);
}
}
if (track->update_next_segment) {
GstClockTimeDiff global_output_position =
demux->priv->global_output_position;
GST_DEBUG ("track %s: Override segment for running time %"
GST_STIME_FORMAT " : %" GST_PTR_FORMAT, track->stream_id,
GST_STIME_ARGS (global_output_position), event);
gst_event_unref (event);
gst_segment_set_running_time (&track->output_segment, GST_FORMAT_TIME,
global_output_position);
event = gst_event_new_segment (&track->output_segment);
gst_event_set_seqnum (event, track->demux->priv->segment_seqnum);
res = (GstMiniObject *) event;
running_time = global_output_position;
track->update_next_segment = FALSE;
/* Replace the stored sticky event with this one */
is_pending_sticky = FALSE;
}
break;
default:
break;
}
/* Store any sticky event in the cache, unless this is already an event
* from the pending sticky_events store */
if (!is_pending_sticky && GST_EVENT_IS_STICKY (event)) {
GST_DEBUG_OBJECT (demux,
"track %s Storing sticky event %" GST_PTR_FORMAT,
track->stream_id, event);
gst_event_store_insert_event (&track->sticky_events, event, FALSE);
}
}
/* Update track buffering levels */
if (GST_CLOCK_STIME_IS_VALID (running_time_buffering)) {
track->output_time = running_time_buffering;
GST_LOG_OBJECT (demux,
"track %s buffering time:%" GST_STIME_FORMAT,
track->stream_id, GST_STIME_ARGS (running_time_buffering));
gst_adaptive_demux_track_update_level_locked (track);
} else {
GST_LOG_OBJECT (demux, "track %s popping untimed item %" GST_PTR_FORMAT,
track->stream_id, res);
}
track->level_bytes -= item_size;
return res;
}
void
gst_adaptive_demux_track_drain_to (GstAdaptiveDemuxTrack * track,
GstClockTime drain_running_time)
{
GstAdaptiveDemux *demux = track->demux;
GST_DEBUG_OBJECT (demux,
"Track '%s' draining to running time %" GST_STIME_FORMAT,
track->stream_id, GST_STIME_ARGS (drain_running_time));
while (track->next_position == GST_CLOCK_STIME_NONE ||
track->next_position < drain_running_time) {
TrackQueueItem *item;
GstMiniObject *next_mo = NULL;
/* If we're in a gap, and the end time is after the target running time,
* exit */
if (track->gap_position != GST_CLOCK_TIME_NONE) {
GstClockTimeDiff running_time_end;
GstClockTimeDiff gap_end = track->gap_position;
/* In reverse playback, the start of the gap is the highest
* running time, so only add duration for forward play */
if (track->output_segment.rate > 0)
gap_end += track->gap_duration;
running_time_end =
my_segment_to_running_time (&track->output_segment, gap_end);
if (running_time_end >= drain_running_time) {
GST_DEBUG_OBJECT (demux,
"Track '%s' drained to GAP with running time %" GST_STIME_FORMAT,
track->stream_id, GST_STIME_ARGS (running_time_end));
return;
}
/* Otherwise this gap is complete, so skip it */
track->gap_position = GST_CLOCK_STIME_NONE;
}
/* Otherwise check what's enqueued */
item = gst_queue_array_peek_head_struct (track->queue);
/* track is empty, we're done */
if (item == NULL) {
GST_DEBUG_OBJECT (demux, "Track '%s' completely drained",
track->stream_id);
return;
}
/* If the item has a running time, and it's after the drain_running_time
* we're done. */
if (item->runningtime != GST_CLOCK_STIME_NONE
&& item->runningtime >= drain_running_time) {
GST_DEBUG_OBJECT (demux, "Track '%s' drained to item %" GST_PTR_FORMAT
" with running time %" GST_STIME_FORMAT,
track->stream_id, item->item, GST_STIME_ARGS (item->runningtime));
return;
}
GST_DEBUG_OBJECT (demux, "Track '%s' discarding %" GST_PTR_FORMAT
" with running time %" GST_STIME_FORMAT,
track->stream_id, item->item, GST_STIME_ARGS (item->runningtime));
/* Dequeue the item and discard. Sticky events
* will be collected by the dequeue function, gaps will be started.
* If it's a buffer, mark the track as discont to get the flag set
* on the next output buffer */
next_mo = track_dequeue_data_locked (demux, track, FALSE);
if (GST_IS_BUFFER (next_mo)) {
track->output_discont = TRUE;
}
gst_mini_object_unref (next_mo);
gst_adaptive_demux_track_update_next_position (track);
}
GST_DEBUG_OBJECT (demux,
"Track '%s' drained to running time %" GST_STIME_FORMAT, track->stream_id,
GST_STIME_ARGS (track->next_position));
}
static inline GstClockTimeDiff
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
{
GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (val)) {
gboolean sign =
gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
if (sign > 0)
res = val;
else if (sign < 0)
res = -val;
}
return res;
}
/* Queues an item on a track queue and updates the buffering levels
* TRACKS_LOCK hold */
static void
track_queue_data_locked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxTrack * track, GstMiniObject * object, gsize size,
GstClockTime timestamp, GstClockTime duration, gboolean is_discont)
{
TrackQueueItem item;
item.item = object;
item.size = size;
item.runningtime = GST_CLOCK_STIME_NONE;
item.runningtime_end = GST_CLOCK_STIME_NONE;
item.runningtime_buffering = GST_CLOCK_STIME_NONE;
if (timestamp != GST_CLOCK_TIME_NONE) {
GstClockTimeDiff input_time;
/* Set the running time of the item */
input_time = item.runningtime_end = item.runningtime =
my_segment_to_running_time (&track->input_segment, timestamp);
/* Update segment position (include duration if valid) */
track->input_segment.position = timestamp;
if (GST_CLOCK_TIME_IS_VALID (duration)) {
if (track->input_segment.rate > 0.0) {
/* Forward playback, add duration onto our position and update
* the input time to match */
track->input_segment.position += duration;
item.runningtime_end = input_time =
my_segment_to_running_time (&track->input_segment,
track->input_segment.position);
} else {
/* Otherwise, the end of the buffer has the smaller running time and
* we need to change the item.runningtime, but input_time and runningtime_end
* are already set to the larger running time */
item.runningtime = my_segment_to_running_time (&track->input_segment,
timestamp + duration);
}
}
/* Update track input time and level */
if (!GST_CLOCK_STIME_IS_VALID (track->lowest_input_time))
track->lowest_input_time = track->input_time;
if (track->input_segment.rate > 0.0) {
if (input_time > track->input_time) {
track->input_time = input_time;
}
} else {
/* In reverse playback, we track input time differently, to do buffering
* across the reversed GOPs. Each GOP arrives in reverse order, with
* running time moving backward, then jumping forward at the start of
* each GOP. At each point, we want the input time to be the lowest
* running time of the previous GOP. Therefore, we track input times
* into a different variable, and transfer it across when a discont buffer
* arrives */
if (is_discont) {
track->input_time = track->lowest_input_time;
track->lowest_input_time = input_time;
} else if (input_time < track->lowest_input_time) {
track->lowest_input_time = input_time;
}
}
/* Store the maximum running time we've seen as
* this item's "buffering running time" */
item.runningtime_buffering = track->input_time;
/* Configure the track output time if nothing was dequeued yet,
* so buffering level is updated correctly */
if (!GST_CLOCK_STIME_IS_VALID (track->output_time)) {
track->output_time = track->lowest_input_time;
GST_LOG_OBJECT (track->sinkpad,
"track %s (period %u) set output_time = lowest input_time = %"
GST_STIME_FORMAT, track->stream_id, track->period_num,
GST_STIME_ARGS (track->output_time));
}
gst_adaptive_demux_track_update_level_locked (track);
}
GST_LOG_OBJECT (track->sinkpad,
"track %s item running_time :%" GST_STIME_FORMAT " end :%"
GST_STIME_FORMAT, track->stream_id, GST_STIME_ARGS (item.runningtime),
GST_STIME_ARGS (item.runningtime_end));
track->level_bytes += size;
gst_queue_array_push_tail_struct (track->queue, &item);
/* If we were waiting for this track to add something, notify output thread */
/* FIXME: This should be in adaptive demux */
if (track->waiting_add) {
g_cond_signal (&demux->priv->tracks_add);
}
}
static GstFlowReturn
_track_sink_chain_function (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
{
GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
GstAdaptiveDemux *demux = track->demux;
GstClockTime ts;
GST_DEBUG_OBJECT (pad, "buffer %" GST_PTR_FORMAT, buffer);
TRACKS_LOCK (demux);
/* Discard buffers that are received outside of a valid segment. This can
* happen if a flushing seek (which resets the track segment seqnums) was
* received but the stream is still providing buffers before returning.
*/
if (track->input_segment_seqnum == GST_SEQNUM_INVALID) {
GST_DEBUG_OBJECT (pad,
"Dropping buffer because we do not have a valid input segment");
gst_buffer_unref (buffer);
TRACKS_UNLOCK (demux);
return GST_FLOW_OK;
}
ts = GST_BUFFER_DTS_OR_PTS (buffer);
/* Buffers coming out of parsebin *should* always be timestamped (it's the
* goal of parsebin after all). The tracks will use that (converted to
* running-time) in order to track position and buffering levels.
*
* Unfortunately there are valid cases were the parsers won't be able to
* timestamp all frames (due to the underlying formats or muxing). For those
* cases, we use the last incoming timestamp (via the track input GstSegment
* position):
*
* * If buffers were previously received, that segment position will
* correspond to the last timestamped-buffer PTS/DTS
*
* * If *no* buffers were previously received, the segment position *should*
* correspond to the valid initial position (in buffer timestamps). If not
* set, we need to bail out.
*/
if (!GST_CLOCK_TIME_IS_VALID (ts)) {
if (GST_CLOCK_TIME_IS_VALID (track->input_segment.position)) {
GST_WARNING_OBJECT (pad,
"buffer doesn't have any pts or dts, using segment position (%"
GST_TIME_FORMAT ")", GST_TIME_ARGS (track->input_segment.position));
ts = track->input_segment.position;
} else {
GST_ERROR_OBJECT (pad, "initial buffer doesn't have any pts or dts !");
gst_buffer_unref (buffer);
TRACKS_UNLOCK (demux);
return GST_FLOW_ERROR;
}
}
if (GST_CLOCK_TIME_IS_VALID (track->input_segment.position) &&
ts > track->input_segment.position &&
ts > track->input_segment.start &&
ts - track->input_segment.position > 100 * GST_MSECOND) {
GstClockTime duration = ts - track->input_segment.position;
GstEvent *gap = gst_event_new_gap (track->input_segment.position, duration);
/* Insert gap event to ensure coherent interleave */
GST_DEBUG_OBJECT (pad,
"Inserting gap for %" GST_TIME_FORMAT " vs %" GST_TIME_FORMAT,
GST_TIME_ARGS (ts), GST_TIME_ARGS (track->input_segment.position));
track_queue_data_locked (demux, track, (GstMiniObject *) gap, 0,
track->input_segment.position, duration, FALSE);
}
track_queue_data_locked (demux, track, (GstMiniObject *) buffer,
gst_buffer_get_size (buffer), ts, GST_BUFFER_DURATION (buffer),
GST_BUFFER_IS_DISCONT (buffer));
/* Recalculate buffering */
demux_update_buffering_locked (demux);
demux_post_buffering_locked (demux);
/* UNLOCK */
TRACKS_UNLOCK (demux);
return GST_FLOW_OK;
}
static gboolean
_track_sink_event_function (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
GstAdaptiveDemux *demux = track->demux;
GstClockTime timestamp = GST_CLOCK_TIME_NONE;
GstClockTime duration = GST_CLOCK_TIME_NONE;
gboolean drop = FALSE;
gboolean is_discont = FALSE;
GST_DEBUG_OBJECT (pad, "event %" GST_PTR_FORMAT, event);
TRACKS_LOCK (demux);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_STREAM_COLLECTION:
{
/* Replace upstream collection with demux collection */
GST_DEBUG_OBJECT (pad, "Dropping stream-collection, we send our own");
drop = TRUE;
break;
}
case GST_EVENT_STREAM_START:
{
GST_DEBUG_OBJECT (pad, "Dropping stream-start, we send our own");
if (track->eos) {
gint i, len;
/* Find and drop latest EOS if present */
len = gst_queue_array_get_length (track->queue);
for (i = len - 1; i >= 0; i--) {
TrackQueueItem *item =
gst_queue_array_peek_nth_struct (track->queue, i);
if (GST_IS_EVENT (item->item)
&& GST_EVENT_TYPE (item->item) == GST_EVENT_EOS) {
TrackQueueItem sub;
GST_DEBUG_OBJECT (pad, "Removing previously received EOS (pos:%d)",
i);
if (gst_queue_array_drop_struct (track->queue, i, &sub))
gst_mini_object_unref (sub.item);
break;
}
}
track->eos = FALSE;
}
drop = TRUE;
break;
}
case GST_EVENT_EOS:
{
if (track->pending_srcpad != NULL) {
GST_DEBUG_OBJECT (pad,
"Dropping EOS because we have a pending pad switch");
drop = TRUE;
} else {
track->eos = TRUE;
}
break;
}
case GST_EVENT_FLUSH_STOP:
case GST_EVENT_FLUSH_START:
{
/* Drop flush events */
drop = TRUE;
break;
}
default:
break;
}
if (drop || !GST_EVENT_IS_SERIALIZED (event)) {
GST_DEBUG_OBJECT (pad, "dropping event %s", GST_EVENT_TYPE_NAME (event));
gst_event_unref (event);
TRACKS_UNLOCK (demux);
/* Silently "accept" them */
return TRUE;
}
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
{
guint64 seg_seqnum = gst_event_get_seqnum (event);
if (track->input_segment_seqnum == seg_seqnum) {
GST_DEBUG_OBJECT (pad, "Ignoring duplicate segment");
gst_event_unref (event);
TRACKS_UNLOCK (demux);
return TRUE;
}
if (seg_seqnum != demux->priv->segment_seqnum) {
GST_DEBUG_OBJECT (pad, "Ignoring non-current segment");
gst_event_unref (event);
TRACKS_UNLOCK (demux);
return TRUE;
}
track->input_segment_seqnum = seg_seqnum;
gst_event_copy_segment (event, &track->input_segment);
if (track->input_segment.rate >= 0)
track->input_segment.position = track->input_segment.start;
else
track->input_segment.position = track->input_segment.stop;
GST_DEBUG_OBJECT (pad, "track %s stored segment %" GST_SEGMENT_FORMAT,
track->stream_id, &track->input_segment);
timestamp = track->input_segment.position;
is_discont = TRUE;
break;
}
case GST_EVENT_GAP:
{
gst_event_parse_gap (event, &timestamp, &duration);
if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
GST_DEBUG_OBJECT (pad, "Dropping gap event with invalid timestamp");
goto drop_ok;
}
break;
}
default:
break;
}
track_queue_data_locked (demux, track, (GstMiniObject *) event, 0,
timestamp, duration, is_discont);
/* Recalculate buffering */
demux_update_buffering_locked (demux);
demux_post_buffering_locked (demux);
TRACKS_UNLOCK (demux);
return TRUE;
/* errors */
drop_ok:
{
gst_event_unref (event);
TRACKS_UNLOCK (demux);
return TRUE;
}
}
static void
track_sinkpad_unlinked_cb (GstPad * sinkpad, GstPad * parsebin_srcpad,
GstAdaptiveDemuxTrack * track)
{
GST_DEBUG_OBJECT (sinkpad, "Got unlinked from %s:%s",
GST_DEBUG_PAD_NAME (parsebin_srcpad));
if (track->pending_srcpad) {
GST_DEBUG_OBJECT (sinkpad, "linking to pending pad %s:%s",
GST_DEBUG_PAD_NAME (track->pending_srcpad));
if (gst_pad_link (track->pending_srcpad, sinkpad) != GST_PAD_LINK_OK) {
GST_ERROR_OBJECT (sinkpad, "could not link pending pad !");
}
gst_object_unref (track->pending_srcpad);
track->pending_srcpad = NULL;
}
}
/* TRACKS_LOCK held
* Call this to update the track next_position with timed data */
void
gst_adaptive_demux_track_update_next_position (GstAdaptiveDemuxTrack * track)
{
guint i, len;
/* If filling a gap, the next position is the gap position */
if (track->gap_position != GST_CLOCK_TIME_NONE) {
track->next_position =
my_segment_to_running_time (&track->output_segment,
track->gap_position);
return;
}
len = gst_queue_array_get_length (track->queue);
for (i = 0; i < len; i++) {
TrackQueueItem *item = gst_queue_array_peek_nth_struct (track->queue, i);
if (item->runningtime != GST_CLOCK_STIME_NONE) {
GST_DEBUG_OBJECT (track->demux,
"Track '%s' next position %" GST_STIME_FORMAT, track->stream_id,
GST_STIME_ARGS (item->runningtime));
track->next_position = item->runningtime;
return;
}
}
track->next_position = GST_CLOCK_STIME_NONE;
GST_DEBUG_OBJECT (track->demux,
"Track '%s' doesn't have any pending timed data", track->stream_id);
}
/* TRACKS_LOCK held. Recomputes the level_time for the track */
void
gst_adaptive_demux_track_update_level_locked (GstAdaptiveDemuxTrack * track)
{
GstAdaptiveDemux *demux = track->demux;
GstClockTimeDiff output_time;
if (GST_CLOCK_STIME_IS_VALID (track->output_time))
output_time = MAX (track->output_time, demux->priv->global_output_position);
else
output_time = MIN (track->input_time, demux->priv->global_output_position);
if (track->input_time >= output_time)
track->level_time = track->input_time - output_time;
else
track->level_time = 0;
GST_LOG_OBJECT (track->sinkpad,
"track %s (period %u) input_time:%" GST_STIME_FORMAT " output_time:%"
GST_STIME_FORMAT " level:%" GST_TIME_FORMAT,
track->stream_id, track->period_num, GST_STIME_ARGS (track->input_time),
GST_STIME_ARGS (track->output_time), GST_TIME_ARGS (track->level_time));
}
static void
_demux_track_free (GstAdaptiveDemuxTrack * track)
{
GST_DEBUG_OBJECT (track->demux, "freeing track %p '%s'", track,
track->stream_id);
g_free (track->stream_id);
g_free (track->upstream_stream_id);
if (track->pending_srcpad)
gst_object_unref (track->pending_srcpad);
if (track->generic_caps)
gst_caps_unref (track->generic_caps);
gst_object_unref (track->stream_object);
if (track->tags)
gst_tag_list_unref (track->tags);
gst_queue_array_free (track->queue);
gst_event_store_deinit (&track->sticky_events);
if (track->element != NULL) {
gst_element_set_state (track->element, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (track->demux), track->element);
}
g_free (track);
}
GstAdaptiveDemuxTrack *
gst_adaptive_demux_track_ref (GstAdaptiveDemuxTrack * track)
{
g_return_val_if_fail (track != NULL, NULL);
GST_TRACE ("%p %d -> %d", track, track->ref_count, track->ref_count + 1);
g_atomic_int_inc (&track->ref_count);
return track;
}
void
gst_adaptive_demux_track_unref (GstAdaptiveDemuxTrack * track)
{
g_return_if_fail (track != NULL);
GST_TRACE ("%p %d -> %d", track, track->ref_count, track->ref_count - 1);
if (g_atomic_int_dec_and_test (&track->ref_count)) {
_demux_track_free (track);
}
}
static void
_track_queue_item_clear (TrackQueueItem * item)
{
if (item->item) {
gst_mini_object_unref ((GstMiniObject *) item->item);
item->item = NULL;
}
}
/* Internal function which actually adds the elements to the demuxer */
gboolean
gst_adaptive_demux_track_add_elements (GstAdaptiveDemuxTrack * track,
guint period_num)
{
GstAdaptiveDemux *demux = track->demux;
gchar *internal_name;
guint i, len;
/* Store the period number for debugging output */
track->period_num = period_num;
internal_name =
g_strdup_printf ("track-period%d-%s", period_num, track->stream_id);
len = strlen (internal_name);
for (i = 0; i < len; i++)
if (internal_name[i] == ' ')
internal_name[i] = '_';
track->element = gst_bin_new (internal_name);
g_free (internal_name);
internal_name =
g_strdup_printf ("track-period%d-sink-%s", period_num, track->stream_id);
len = strlen (internal_name);
for (i = 0; i < len; i++)
if (internal_name[i] == ' ')
internal_name[i] = '_';
track->sinkpad = gst_pad_new (internal_name, GST_PAD_SINK);
g_signal_connect (track->sinkpad, "unlinked",
(GCallback) track_sinkpad_unlinked_cb, track);
g_free (internal_name);
gst_element_add_pad (GST_ELEMENT_CAST (track->element), track->sinkpad);
gst_pad_set_element_private (track->sinkpad, track);
gst_pad_set_chain_function (track->sinkpad, _track_sink_chain_function);
gst_pad_set_event_function (track->sinkpad, _track_sink_event_function);
gst_pad_set_query_function (track->sinkpad, _track_sink_query_function);
if (!gst_bin_add (GST_BIN_CAST (demux), track->element)) {
track->element = NULL;
return FALSE;
}
gst_element_sync_state_with_parent (track->element);
return TRUE;
}
/**
* gst_adaptive_demux_track_new:
* @demux: a #GstAdaptiveDemux
* @type: a #GstStreamType
* @flags: a #GstStreamFlags
* @stream_id: (transfer none): The stream id for the new track
* @caps: (transfer full): The caps for the track
* @tags: (allow-none) (transfer full): The tags for the track
*
* Create and register a new #GstAdaptiveDemuxTrack
*
* Returns: (transfer none) The new track
*/
GstAdaptiveDemuxTrack *
gst_adaptive_demux_track_new (GstAdaptiveDemux * demux,
GstStreamType type,
GstStreamFlags flags, gchar * stream_id, GstCaps * caps, GstTagList * tags)
{
GstAdaptiveDemuxTrack *track;
g_return_val_if_fail (stream_id != NULL, NULL);
g_return_val_if_fail (type && type != GST_STREAM_TYPE_UNKNOWN, NULL);
GST_DEBUG_OBJECT (demux, "type:%s stream_id:%s caps:%" GST_PTR_FORMAT,
gst_stream_type_get_name (type), stream_id, caps);
track = g_new0 (GstAdaptiveDemuxTrack, 1);
g_atomic_int_set (&track->ref_count, 1);
track->demux = demux;
track->type = type;
track->flags = flags;
track->stream_id = g_strdup (stream_id);
track->period_num = (guint) (-1);
track->generic_caps = caps;
track->stream_object = gst_stream_new (stream_id, caps, type, flags);
if (tags) {
track->tags = gst_tag_list_ref (tags);
gst_stream_set_tags (track->stream_object, tags);
}
track->selected = FALSE;
track->active = FALSE;
track->draining = FALSE;
track->queue = gst_queue_array_new_for_struct (sizeof (TrackQueueItem), 50);
gst_queue_array_set_clear_func (track->queue,
(GDestroyNotify) _track_queue_item_clear);
gst_event_store_init (&track->sticky_events);
track->waiting_add = TRUE;
/* We have no fragment duration yet, so the buffering threshold is just the
* low watermark in time for now */
GST_OBJECT_LOCK (demux);
track->buffering_threshold = demux->buffering_low_watermark_time;
GST_OBJECT_UNLOCK (demux);
gst_segment_init (&track->input_segment, GST_FORMAT_TIME);
track->input_time = 0;
track->input_segment_seqnum = GST_SEQNUM_INVALID;
gst_segment_init (&track->output_segment, GST_FORMAT_TIME);
track->gap_position = track->gap_duration = GST_CLOCK_TIME_NONE;
track->output_time = GST_CLOCK_STIME_NONE;
track->next_position = GST_CLOCK_STIME_NONE;
track->update_next_segment = FALSE;
track->level_bytes = 0;
track->level_time = 0;
return track;
}