gstreamer/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-track.c
Jan Schmidt a03f3df626 adaptivedemux2: Rework input download wakeups
Change the way streams are woken up to download more data.

Instead of checking the level on tracks that are being
output as data is dequeued, calculate a 'wakeup time'
at which it should download more data, and wake up
the stream when the global output position crosses
that threshold.

For efficiency, compute the earliest wakeup time
for all streams and store it on the period, so the
output loop can quickly check only a single value
to decide if something needs waking up.

Does the same buffering as the previous method,
but ensures that as we approach the end of
one period, the next period continues incrementally
downloading data so that it is fully buffered when
the period starts.

Fixes issues with multi-period VOD content where
download of the second period resumes only after
the first period is completely drained.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3055>
2022-09-20 19:48:17 +00:00

966 lines
31 KiB
C

/* GStreamer
*
* Copyright (C) 2014 Samsung Electronics. All rights reserved.
* Author: Thiago Santos <thiagoss@osg.samsung.com>
*
* Copyright (C) 2021-2022 Centricular Ltd
* Author: Edward Hervey <edward@centricular.com>
* Author: Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstadaptivedemux.h"
#include "gstadaptivedemux-private.h"
GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
#define GST_CAT_DEFAULT adaptivedemux2_debug
/* Resets @track: clears the item queue, flushes the sticky event store,
 * re-initialises the input and output segments and resets every position,
 * level and state flag touched below.
 *
 * TRACKS_LOCK held */
void
gst_adaptive_demux_track_flush (GstAdaptiveDemuxTrack * track)
{
  GST_DEBUG_OBJECT (track->demux, "Flushing track '%s' with %u queued items",
      track->stream_id, gst_queue_array_get_length (track->queue));

  /* Pending data */
  gst_queue_array_clear (track->queue);
  gst_event_store_flush (&track->sticky_events);

  /* Input side */
  gst_segment_init (&track->input_segment, GST_FORMAT_TIME);
  track->input_segment_seqnum = GST_SEQNUM_INVALID;
  track->input_time = 0;
  track->lowest_input_time = GST_CLOCK_STIME_NONE;

  /* Output side */
  gst_segment_init (&track->output_segment, GST_FORMAT_TIME);
  track->gap_position = GST_CLOCK_TIME_NONE;
  track->gap_duration = GST_CLOCK_TIME_NONE;
  track->next_position = GST_CLOCK_STIME_NONE;
  track->output_time = GST_CLOCK_STIME_NONE;

  /* Levels and flags */
  track->level_time = 0;
  track->level_bytes = 0;
  track->output_discont = FALSE;
  track->update_next_segment = FALSE;
  track->eos = FALSE;
}
/* Query handler of the track sink pad.
 *
 * Only ACCEPT_CAPS is answered (always positively); every other query type
 * is reported as unhandled by returning FALSE. */
static gboolean
_track_sink_query_function (GstPad * pad, GstObject * parent, GstQuery * query)
{
  GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
  GstAdaptiveDemux *demux = track->demux;

  GST_DEBUG_OBJECT (pad, "query %" GST_PTR_FORMAT, query);

  if (GST_QUERY_TYPE (query) != GST_QUERY_ACCEPT_CAPS)
    return FALSE;

  /* Should we intersect by track caps as a safety check ? */
  GST_DEBUG_OBJECT (demux, "We accept any caps on %s:%s",
      GST_DEBUG_PAD_NAME (pad));
  gst_query_set_accept_caps_result (query, TRUE);

  return TRUE;
}
/* Pops the head of the track queue into @out_item.
 *
 * Returns FALSE (leaving @out_item untouched) when the queue is empty,
 * TRUE otherwise.
 *
 * TRACKS_LOCK hold */
static gboolean
track_dequeue_item_locked (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxTrack * track, TrackQueueItem * out_item)
{
  TrackQueueItem *head = gst_queue_array_peek_head_struct (track->queue);

  if (head != NULL) {
    /* Copy the struct out before removing it from the array */
    *out_item = *head;
    gst_queue_array_pop_head (track->queue);

    GST_LOG_OBJECT (demux,
        "track %s (period %u) item running_time %" GST_STIME_FORMAT " end %"
        GST_STIME_FORMAT, track->stream_id, track->period_num,
        GST_STIME_ARGS (out_item->runningtime),
        GST_STIME_ARGS (out_item->runningtime_end));

    return TRUE;
  }

  return FALSE;
}
/* Forward declaration: converts a position in @segment to a signed running
* time. The definition is further down in this file. */
static inline GstClockTimeDiff my_segment_to_running_time (GstSegment * segment,
GstClockTime val);
/* Dequeue or generate a buffer/event from the track queue and update the buffering levels
* TRACKS_LOCK hold
*
* @check_sticky_events: when TRUE, a pending event from the track's sticky
* event store is returned before anything else.
*
* Returns the next object to output, or NULL when no gap is being filled and
* the track queue is empty. */
GstMiniObject *
track_dequeue_data_locked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxTrack * track, gboolean check_sticky_events)
{
GstMiniObject *res = NULL;
gboolean is_pending_sticky = FALSE;
GstEvent *event;
GstClockTimeDiff running_time;
GstClockTimeDiff running_time_buffering = GST_CLOCK_STIME_NONE;
GstClockTimeDiff running_time_end;
gsize item_size = 0;
if (check_sticky_events) {
/* If there are any sticky events to send, do that before anything else */
event = gst_event_store_get_next_pending (&track->sticky_events);
if (event != NULL) {
res = (GstMiniObject *) event;
/* Pending sticky events are untimed: the buffering level is not updated */
running_time_buffering = running_time = running_time_end =
GST_CLOCK_STIME_NONE;
GST_DEBUG_OBJECT (demux,
"track %s (period %u) dequeued pending sticky event %" GST_PTR_FORMAT,
track->stream_id, track->period_num, event);
is_pending_sticky = TRUE;
goto handle_event;
}
}
do {
TrackQueueItem item;
/* If we're filling a gap, generate a gap event */
if (track->gap_position != GST_CLOCK_TIME_NONE) {
GstClockTime pos = track->gap_position;
GstClockTime duration = track->gap_duration;
/* Gaps are emitted in chunks of at most 100 ms */
if (duration > 100 * GST_MSECOND) {
duration = 100 * GST_MSECOND;
track->gap_position += duration;
track->gap_duration -= duration;
} else {
/* Duration dropped below 100 ms, this is the last
* gap of the sequence */
track->gap_position = GST_CLOCK_TIME_NONE;
track->gap_duration = GST_CLOCK_TIME_NONE;
}
res = (GstMiniObject *) gst_event_new_gap (pos, duration);
/* The running time of the chunk start/end depends on the playback
* direction: for a non-positive rate the two ends are swapped */
if (track->output_segment.rate > 0.0) {
running_time = my_segment_to_running_time (&track->output_segment, pos);
running_time_buffering = running_time_end =
my_segment_to_running_time (&track->output_segment, pos + duration);
} else {
running_time =
my_segment_to_running_time (&track->output_segment, pos + duration);
running_time_buffering = running_time_end =
my_segment_to_running_time (&track->output_segment, pos);
}
/* Generated gap chunks do not account for any queued bytes */
item_size = 0;
break;
}
/* Otherwise, try and pop something from the item queue */
if (!track_dequeue_item_locked (demux, track, &item))
return NULL;
res = item.item;
running_time = item.runningtime;
running_time_end = item.runningtime_end;
running_time_buffering = item.runningtime_buffering;
item_size = item.size;
/* Special case for a gap event, to drain them out little-by-little.
* See if it can be output directly, otherwise set up to fill a gap and loop again */
if (GST_IS_EVENT (res) && GST_EVENT_TYPE (res) == GST_EVENT_GAP
&& GST_CLOCK_STIME_IS_VALID (running_time)) {
GstClockTime pos, duration;
GstClockTime cstart, cstop;
gst_event_parse_gap (GST_EVENT_CAST (res), &pos, &duration);
/* Handle a track with no duration as 0 duration. This can only
* happen if an element in parsebin emits such a gap event */
if (duration == GST_CLOCK_TIME_NONE)
duration = 0;
/* We *can* end up with a gap outside of the segment range due to segment
* base updating when (re)activating a track. In that case, just let the gap
* event flow out normally.
* Otherwise, this gap crosses into the segment, clip it to the ends and set up to fill the gap */
if (!gst_segment_clip (&track->output_segment, GST_FORMAT_TIME, pos,
pos + duration, &cstart, &cstop))
break;
pos = cstart;
duration = cstop - cstart;
GST_DEBUG_OBJECT (demux,
"track %s (period %u) Starting gap for runningtime %" GST_STIME_FORMAT
" - clipped position %" GST_TIME_FORMAT " duration %" GST_TIME_FORMAT,
track->stream_id, track->period_num, GST_STIME_ARGS (running_time),
GST_TIME_ARGS (pos), GST_TIME_ARGS (duration));
track->gap_position = pos;
track->gap_duration = duration;
/* The queued gap event is replaced by the generated chunks: drop it.
* 'continue' jumps to the loop condition, which is true since res is
* NULL, so the next iteration emits the first chunk */
gst_mini_object_unref (res);
res = NULL;
continue;
}
} while (res == NULL);
handle_event:
if (GST_IS_EVENT (res)) {
event = (GstEvent *) res;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &track->output_segment);
/* First segment seen on the output side: initialise output_time from the
* segment start (forward) or stop (reverse) */
if (!GST_CLOCK_STIME_IS_VALID (track->output_time)) {
if (track->output_segment.rate > 0.0) {
track->output_time =
my_segment_to_running_time (&track->output_segment,
track->output_segment.start);
} else {
track->output_time =
my_segment_to_running_time (&track->output_segment,
track->output_segment.stop);
}
}
/* When requested, replace the event with a new segment whose running time
* is set to the demuxer's global output position and which carries the
* demuxer's segment seqnum */
if (track->update_next_segment) {
GstClockTimeDiff global_output_position =
demux->priv->global_output_position;
GST_DEBUG ("track %s: Override segment for running time %"
GST_STIME_FORMAT " : %" GST_PTR_FORMAT, track->stream_id,
GST_STIME_ARGS (global_output_position), event);
gst_event_unref (event);
gst_segment_set_running_time (&track->output_segment, GST_FORMAT_TIME,
global_output_position);
event = gst_event_new_segment (&track->output_segment);
gst_event_set_seqnum (event, track->demux->priv->segment_seqnum);
res = (GstMiniObject *) event;
running_time = global_output_position;
track->update_next_segment = FALSE;
/* Replace the stored sticky event with this one */
is_pending_sticky = FALSE;
}
break;
default:
break;
}
/* Store any sticky event in the cache, unless this is already an event
* from the pending sticky_events store */
if (!is_pending_sticky && GST_EVENT_IS_STICKY (event)) {
GST_DEBUG_OBJECT (demux,
"track %s Storing sticky event %" GST_PTR_FORMAT,
track->stream_id, event);
gst_event_store_insert_event (&track->sticky_events, event, FALSE);
}
}
/* Update track buffering levels */
if (GST_CLOCK_STIME_IS_VALID (running_time_buffering)) {
track->output_time = running_time_buffering;
GST_LOG_OBJECT (demux,
"track %s buffering time:%" GST_STIME_FORMAT,
track->stream_id, GST_STIME_ARGS (running_time_buffering));
gst_adaptive_demux_track_update_level_locked (track);
} else {
GST_LOG_OBJECT (demux, "track %s popping untimed item %" GST_PTR_FORMAT,
track->stream_id, res);
}
/* item_size is 0 for pending sticky events and generated gap chunks */
track->level_bytes -= item_size;
return res;
}
/* Discards data queued on @track until one of the following holds:
 *  - the track's next position is at or after @drain_running_time,
 *  - the track is filling a gap that ends at or after @drain_running_time,
 *  - the head of the queue has a running time at or after
 *    @drain_running_time,
 *  - the queue is empty.
 *
 * Discarded buffers flag the track so that the next output buffer is marked
 * as discont. Items are removed through track_dequeue_data_locked(), so the
 * TRACKS_LOCK is expected to be held by the caller.
 *
 * NOTE(review): @drain_running_time is unsigned while the track positions it
 * is compared against are signed (GstClockTimeDiff), so the comparisons below
 * are done as unsigned - confirm this is what callers expect for negative
 * running times. */
void
gst_adaptive_demux_track_drain_to (GstAdaptiveDemuxTrack * track,
    GstClockTime drain_running_time)
{
  GstAdaptiveDemux *demux = track->demux;

  GST_DEBUG_OBJECT (demux,
      "Track '%s' draining to running time %" GST_STIME_FORMAT,
      track->stream_id, GST_STIME_ARGS (drain_running_time));

  while (track->next_position == GST_CLOCK_STIME_NONE ||
      track->next_position < drain_running_time) {
    TrackQueueItem *item;
    GstMiniObject *next_mo = NULL;

    /* If we're in a gap, and the end time is after the target running time,
     * exit */
    if (track->gap_position != GST_CLOCK_TIME_NONE) {
      GstClockTimeDiff running_time_end;
      GstClockTimeDiff gap_end = track->gap_position;

      /* In reverse playback, the start of the gap is the highest
       * running time, so only add duration for forward play */
      if (track->output_segment.rate > 0)
        gap_end += track->gap_duration;

      running_time_end =
          my_segment_to_running_time (&track->output_segment, gap_end);

      if (running_time_end >= drain_running_time) {
        GST_DEBUG_OBJECT (demux,
            "Track '%s' drained to GAP with running time %" GST_STIME_FORMAT,
            track->stream_id, GST_STIME_ARGS (running_time_end));
        return;
      }

      /* Otherwise this gap is complete, so skip it.
       * The "no gap" marker tested everywhere else is GST_CLOCK_TIME_NONE;
       * using the signed GST_CLOCK_STIME_NONE here would leave the track
       * looking like it is still filling a gap at a bogus position. */
      track->gap_position = GST_CLOCK_TIME_NONE;
      track->gap_duration = GST_CLOCK_TIME_NONE;
    }

    /* Otherwise check what's enqueued */
    item = gst_queue_array_peek_head_struct (track->queue);
    /* track is empty, we're done */
    if (item == NULL) {
      GST_DEBUG_OBJECT (demux, "Track '%s' completely drained",
          track->stream_id);
      return;
    }

    /* If the item has a running time, and it's after the drain_running_time
     * we're done. */
    if (item->runningtime != GST_CLOCK_STIME_NONE
        && item->runningtime >= drain_running_time) {
      GST_DEBUG_OBJECT (demux, "Track '%s' drained to item %" GST_PTR_FORMAT
          " with running time %" GST_STIME_FORMAT,
          track->stream_id, item->item, GST_STIME_ARGS (item->runningtime));
      return;
    }

    GST_DEBUG_OBJECT (demux, "Track '%s' discarding %" GST_PTR_FORMAT
        " with running time %" GST_STIME_FORMAT,
        track->stream_id, item->item, GST_STIME_ARGS (item->runningtime));

    /* Dequeue the item and discard. Sticky events
     * will be collected by the dequeue function, gaps will be started.
     * If it's a buffer, mark the track as discont to get the flag set
     * on the next output buffer */
    next_mo = track_dequeue_data_locked (demux, track, FALSE);
    if (next_mo != NULL) {
      if (GST_IS_BUFFER (next_mo)) {
        track->output_discont = TRUE;
      }
      gst_mini_object_unref (next_mo);
    }

    gst_adaptive_demux_track_update_next_position (track);
  }

  GST_DEBUG_OBJECT (demux,
      "Track '%s' drained to running time %" GST_STIME_FORMAT, track->stream_id,
      GST_STIME_ARGS (track->next_position));
}
/* Converts @val, a GST_FORMAT_TIME position in @segment, to a signed running
 * time.
 *
 * Returns GST_CLOCK_STIME_NONE when @val is not a valid time or when
 * gst_segment_to_running_time_full() returns 0; otherwise the converted
 * value, negated when that function returns a negative sign. */
static inline GstClockTimeDiff
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
{
  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;

  if (GST_CLOCK_TIME_IS_VALID (val)) {
    /* The result is tri-state (positive / zero / negative), so it is kept in
     * a gint rather than a gboolean */
    gint sign =
        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);

    if (sign > 0)
      res = val;
    else if (sign < 0)
      res = -val;
  }

  return res;
}
/* Queues an item on a track queue and updates the buffering levels
* TRACKS_LOCK hold
*
* @object: the buffer or event to store in the queue item
* @size: number of bytes added to the track's level_bytes
* @timestamp: position of the item in the input segment, or
* GST_CLOCK_TIME_NONE for an untimed item (no running times are computed and
* the time level is not updated)
* @duration: duration of the item, or GST_CLOCK_TIME_NONE
* @is_discont: only consulted for reverse playback, where it moves the lowest
* input time seen so far into the track input time */
static void
track_queue_data_locked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxTrack * track, GstMiniObject * object, gsize size,
GstClockTime timestamp, GstClockTime duration, gboolean is_discont)
{
TrackQueueItem item;
item.item = object;
item.size = size;
item.runningtime = GST_CLOCK_STIME_NONE;
item.runningtime_end = GST_CLOCK_STIME_NONE;
item.runningtime_buffering = GST_CLOCK_STIME_NONE;
if (timestamp != GST_CLOCK_TIME_NONE) {
GstClockTimeDiff input_time;
/* Set the running time of the item */
input_time = item.runningtime_end = item.runningtime =
my_segment_to_running_time (&track->input_segment, timestamp);
/* Update segment position (include duration if valid) */
track->input_segment.position = timestamp;
if (GST_CLOCK_TIME_IS_VALID (duration)) {
if (track->input_segment.rate > 0.0) {
/* Forward playback, add duration onto our position and update
* the input time to match */
track->input_segment.position += duration;
item.runningtime_end = input_time =
my_segment_to_running_time (&track->input_segment,
track->input_segment.position);
} else {
/* Otherwise, the end of the buffer has the smaller running time and
* we need to change the item.runningtime, but input_time and runningtime_end
* are already set to the larger running time */
item.runningtime = my_segment_to_running_time (&track->input_segment,
timestamp + duration);
}
}
/* Update track input time and level */
/* lowest_input_time is seeded from the current track input time the first
* time a timed item is queued (it is reset to NONE by a flush) */
if (!GST_CLOCK_STIME_IS_VALID (track->lowest_input_time))
track->lowest_input_time = track->input_time;
if (track->input_segment.rate > 0.0) {
/* Forward playback: input_time only ever moves forward */
if (input_time > track->input_time) {
track->input_time = input_time;
}
} else {
/* In reverse playback, we track input time differently, to do buffering
* across the reversed GOPs. Each GOP arrives in reverse order, with
* running time moving backward, then jumping forward at the start of
* each GOP. At each point, we want the input time to be the lowest
* running time of the previous GOP. Therefore, we track input times
* into a different variable, and transfer it across when a discont buffer
* arrives */
if (is_discont) {
track->input_time = track->lowest_input_time;
track->lowest_input_time = input_time;
} else if (input_time < track->lowest_input_time) {
track->lowest_input_time = input_time;
}
}
/* Store the maximum running time we've seen as
* this item's "buffering running time" */
item.runningtime_buffering = track->input_time;
/* Configure the track output time if nothing was dequeued yet,
* so buffering level is updated correctly */
if (!GST_CLOCK_STIME_IS_VALID (track->output_time)) {
track->output_time = track->lowest_input_time;
GST_LOG_OBJECT (track->sinkpad,
"track %s (period %u) set output_time = lowest input_time = %"
GST_STIME_FORMAT, track->stream_id, track->period_num,
GST_STIME_ARGS (track->output_time));
}
gst_adaptive_demux_track_update_level_locked (track);
}
GST_LOG_OBJECT (track->sinkpad,
"track %s item running_time :%" GST_STIME_FORMAT " end :%"
GST_STIME_FORMAT, track->stream_id, GST_STIME_ARGS (item.runningtime),
GST_STIME_ARGS (item.runningtime_end));
/* The byte level is updated for every item, timed or not */
track->level_bytes += size;
/* The item struct is copied into the queue array */
gst_queue_array_push_tail_struct (track->queue, &item);
/* If we were waiting for this track to add something, notify output thread */
/* FIXME: This should be in adaptive demux */
if (track->waiting_add) {
g_cond_signal (&demux->priv->tracks_add);
}
}
/* Chain function of the track sink pad.
 *
 * Queues @buffer on the track (preceded by a gap event when the buffer
 * timestamp is more than 100 ms past the input segment position), then
 * recalculates and posts the buffering state.
 *
 * Returns GST_FLOW_ERROR (after releasing @buffer) when neither the buffer
 * nor the input segment position provide a usable timestamp, GST_FLOW_OK
 * otherwise. Takes and releases the TRACKS_LOCK. */
static GstFlowReturn
_track_sink_chain_function (GstPad * pad, GstObject * parent,
    GstBuffer * buffer)
{
  GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
  GstAdaptiveDemux *demux = track->demux;
  GstClockTime ts;

  GST_DEBUG_OBJECT (pad, "buffer %" GST_PTR_FORMAT, buffer);

  TRACKS_LOCK (demux);

  ts = GST_BUFFER_DTS_OR_PTS (buffer);

  /* Buffers coming out of parsebin *should* always be timestamped (it's the
   * goal of parsebin after all). The tracks will use that (converted to
   * running-time) in order to track position and buffering levels.
   *
   * Unfortunately there are valid cases where the parsers won't be able to
   * timestamp all frames (due to the underlying formats or muxing). For those
   * cases, we use the last incoming timestamp (via the track input GstSegment
   * position):
   *
   * * If buffers were previously received, that segment position will
   *   correspond to the last timestamped-buffer PTS/DTS
   *
   * * If *no* buffers were previously received, the segment position *should*
   *   correspond to the valid initial position (in buffer timestamps). If not
   *   set, we need to bail out.
   */
  if (!GST_CLOCK_TIME_IS_VALID (ts)) {
    if (!GST_CLOCK_TIME_IS_VALID (track->input_segment.position)) {
      GST_ERROR_OBJECT (pad, "initial buffer doesn't have any pts or dts !");
      gst_buffer_unref (buffer);
      TRACKS_UNLOCK (demux);
      return GST_FLOW_ERROR;
    }

    GST_WARNING_OBJECT (pad,
        "buffer doesn't have any pts or dts, using segment position (%"
        GST_TIME_FORMAT ")", GST_TIME_ARGS (track->input_segment.position));
    ts = track->input_segment.position;
  }

  /* The buffer starts after the current position and inside the segment:
   * see whether the hole is big enough to warrant a gap event */
  if (GST_CLOCK_TIME_IS_VALID (track->input_segment.position) &&
      ts > track->input_segment.position &&
      ts > track->input_segment.start) {
    GstClockTime missing = ts - track->input_segment.position;

    if (missing > 100 * GST_MSECOND) {
      GstEvent *gap =
          gst_event_new_gap (track->input_segment.position, missing);

      /* Insert gap event to ensure coherent interleave */
      GST_DEBUG_OBJECT (pad,
          "Inserting gap for %" GST_TIME_FORMAT " vs %" GST_TIME_FORMAT,
          GST_TIME_ARGS (ts), GST_TIME_ARGS (track->input_segment.position));
      track_queue_data_locked (demux, track, (GstMiniObject *) gap, 0,
          track->input_segment.position, missing, FALSE);
    }
  }

  track_queue_data_locked (demux, track, (GstMiniObject *) buffer,
      gst_buffer_get_size (buffer), ts, GST_BUFFER_DURATION (buffer),
      GST_BUFFER_IS_DISCONT (buffer));

  /* Recalculate buffering */
  demux_update_buffering_locked (demux);
  demux_post_buffering_locked (demux);

  /* UNLOCK */
  TRACKS_UNLOCK (demux);

  return GST_FLOW_OK;
}
/* Event function of the track sink pad.
*
* Stream-collection, stream-start, flush-start/stop and all non-serialized
* events are dropped, as is EOS while a pad switch is pending, a segment
* repeating the last stored seqnum and a gap without a valid timestamp.
* Every other event is queued on the track, after which the buffering state
* is recalculated and posted.
*
* Takes and releases the TRACKS_LOCK. Always returns TRUE, including for
* dropped events. */
static gboolean
_track_sink_event_function (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstAdaptiveDemuxTrack *track = gst_pad_get_element_private (pad);
GstAdaptiveDemux *demux = track->demux;
GstClockTime timestamp = GST_CLOCK_TIME_NONE;
GstClockTime duration = GST_CLOCK_TIME_NONE;
gboolean drop = FALSE;
gboolean is_discont = FALSE;
GST_DEBUG_OBJECT (pad, "event %" GST_PTR_FORMAT, event);
TRACKS_LOCK (demux);
/* First pass: decide whether the event has to be dropped */
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_STREAM_COLLECTION:
{
/* Replace upstream collection with demux collection */
GST_DEBUG_OBJECT (pad, "Dropping stream-collection, we send our own");
drop = TRUE;
break;
}
case GST_EVENT_STREAM_START:
{
GST_DEBUG_OBJECT (pad, "Dropping stream-start, we send our own");
/* A new stream-start after EOS: remove the most recently queued EOS and
* clear the track EOS flag */
if (track->eos) {
gint i, len;
/* Find and drop latest EOS if present */
len = gst_queue_array_get_length (track->queue);
for (i = len - 1; i >= 0; i--) {
TrackQueueItem *item =
gst_queue_array_peek_nth_struct (track->queue, i);
if (GST_IS_EVENT (item->item)
&& GST_EVENT_TYPE (item->item) == GST_EVENT_EOS) {
TrackQueueItem sub;
GST_DEBUG_OBJECT (pad, "Removing previously received EOS (pos:%d)",
i);
if (gst_queue_array_drop_struct (track->queue, i, &sub))
gst_mini_object_unref (sub.item);
break;
}
}
track->eos = FALSE;
}
drop = TRUE;
break;
}
case GST_EVENT_EOS:
{
if (track->pending_srcpad != NULL) {
GST_DEBUG_OBJECT (pad,
"Dropping EOS because we have a pending pad switch");
drop = TRUE;
} else {
/* The EOS event itself is queued further down */
track->eos = TRUE;
}
break;
}
case GST_EVENT_FLUSH_STOP:
case GST_EVENT_FLUSH_START:
{
/* Drop flush events */
drop = TRUE;
break;
}
default:
break;
}
if (drop || !GST_EVENT_IS_SERIALIZED (event)) {
GST_DEBUG_OBJECT (pad, "dropping event %s", GST_EVENT_TYPE_NAME (event));
gst_event_unref (event);
TRACKS_UNLOCK (demux);
/* Silently "accept" them */
return TRUE;
}
/* Second pass: extract the timing information used when queueing the event */
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
{
guint64 seg_seqnum = gst_event_get_seqnum (event);
if (track->input_segment_seqnum == seg_seqnum) {
GST_DEBUG_OBJECT (pad, "Ignoring duplicate segment");
gst_event_unref (event);
TRACKS_UNLOCK (demux);
return TRUE;
}
track->input_segment_seqnum = seg_seqnum;
gst_event_copy_segment (event, &track->input_segment);
/* The initial position is the segment start for forward playback and the
* segment stop for reverse playback */
if (track->input_segment.rate >= 0)
track->input_segment.position = track->input_segment.start;
else
track->input_segment.position = track->input_segment.stop;
GST_DEBUG_OBJECT (pad, "track %s stored segment %" GST_SEGMENT_FORMAT,
track->stream_id, &track->input_segment);
timestamp = track->input_segment.position;
is_discont = TRUE;
break;
}
case GST_EVENT_GAP:
{
gst_event_parse_gap (event, &timestamp, &duration);
if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
GST_DEBUG_OBJECT (pad, "Dropping gap event with invalid timestamp");
goto drop_ok;
}
break;
}
default:
break;
}
/* Events without timing are queued with timestamp and duration left at
* GST_CLOCK_TIME_NONE */
track_queue_data_locked (demux, track, (GstMiniObject *) event, 0,
timestamp, duration, is_discont);
/* Recalculate buffering */
demux_update_buffering_locked (demux);
demux_post_buffering_locked (demux);
TRACKS_UNLOCK (demux);
return TRUE;
/* Not an error: release the event and the lock, and report it as handled */
drop_ok:
{
gst_event_unref (event);
TRACKS_UNLOCK (demux);
return TRUE;
}
}
/* "unlinked" signal handler of the track sink pad.
 *
 * If the track has a pending source pad, try to link it to @sinkpad, then
 * drop the track's reference on it and clear the field (whether or not the
 * link succeeded). */
static void
track_sinkpad_unlinked_cb (GstPad * sinkpad, GstPad * parsebin_srcpad,
    GstAdaptiveDemuxTrack * track)
{
  GST_DEBUG_OBJECT (sinkpad, "Got unlinked from %s:%s",
      GST_DEBUG_PAD_NAME (parsebin_srcpad));

  /* Nothing waiting to take over this pad */
  if (!track->pending_srcpad)
    return;

  GST_DEBUG_OBJECT (sinkpad, "linking to pending pad %s:%s",
      GST_DEBUG_PAD_NAME (track->pending_srcpad));

  if (gst_pad_link (track->pending_srcpad, sinkpad) != GST_PAD_LINK_OK) {
    GST_ERROR_OBJECT (sinkpad, "could not link pending pad !");
  }

  gst_object_unref (track->pending_srcpad);
  track->pending_srcpad = NULL;
}
/* Recomputes track->next_position, the running time of the next timed data
 * the track will output:
 *  - the running time of the gap position while a gap is being filled,
 *  - else the running time of the first queued item that has one,
 *  - else GST_CLOCK_STIME_NONE.
 *
 * TRACKS_LOCK held */
void
gst_adaptive_demux_track_update_next_position (GstAdaptiveDemuxTrack * track)
{
  guint idx, n_items;

  /* If filling a gap, the next position is the gap position */
  if (track->gap_position != GST_CLOCK_TIME_NONE) {
    track->next_position =
        my_segment_to_running_time (&track->output_segment,
        track->gap_position);
    return;
  }

  n_items = gst_queue_array_get_length (track->queue);
  for (idx = 0; idx < n_items; idx++) {
    TrackQueueItem *cur = gst_queue_array_peek_nth_struct (track->queue, idx);

    /* Skip untimed items */
    if (cur->runningtime == GST_CLOCK_STIME_NONE)
      continue;

    GST_DEBUG_OBJECT (track->demux,
        "Track '%s' next position %" GST_STIME_FORMAT, track->stream_id,
        GST_STIME_ARGS (cur->runningtime));
    track->next_position = cur->runningtime;
    return;
  }

  track->next_position = GST_CLOCK_STIME_NONE;
  GST_DEBUG_OBJECT (track->demux,
      "Track '%s' doesn't have any pending timed data", track->stream_id);
}
/* Recomputes track->level_time as the distance between the track input time
 * and its effective output time, clamped to 0.
 *
 * The effective output time is the larger of the track output time and the
 * demuxer global output position when the track has a valid output time,
 * and the smaller of the track input time and the global output position
 * otherwise.
 *
 * TRACKS_LOCK held. */
void
gst_adaptive_demux_track_update_level_locked (GstAdaptiveDemuxTrack * track)
{
  GstAdaptiveDemux *demux = track->demux;
  GstClockTimeDiff output_time;

  if (!GST_CLOCK_STIME_IS_VALID (track->output_time)) {
    output_time =
        MIN (track->input_time, demux->priv->global_output_position);
  } else {
    output_time =
        MAX (track->output_time, demux->priv->global_output_position);
  }

  if (track->input_time < output_time)
    track->level_time = 0;
  else
    track->level_time = track->input_time - output_time;

  GST_LOG_OBJECT (track->sinkpad,
      "track %s (period %u) input_time:%" GST_STIME_FORMAT " output_time:%"
      GST_STIME_FORMAT " level:%" GST_TIME_FORMAT,
      track->stream_id, track->period_num, GST_STIME_ARGS (track->input_time),
      GST_STIME_ARGS (track->output_time), GST_TIME_ARGS (track->level_time));
}
/* Releases everything owned by @track and frees the track itself.
 * Called when the last reference is dropped. */
static void
_demux_track_free (GstAdaptiveDemuxTrack * track)
{
  GST_DEBUG_OBJECT (track->demux, "freeing track %p '%s'", track,
      track->stream_id);

  /* Identifiers */
  g_free (track->stream_id);
  g_free (track->upstream_stream_id);

  /* Referenced objects; the optional ones are only released when set */
  if (track->pending_srcpad != NULL)
    gst_object_unref (track->pending_srcpad);

  if (track->generic_caps != NULL)
    gst_caps_unref (track->generic_caps);

  gst_object_unref (track->stream_object);

  if (track->tags != NULL)
    gst_tag_list_unref (track->tags);

  /* Queued data and cached sticky events */
  gst_queue_array_free (track->queue);
  gst_event_store_deinit (&track->sticky_events);

  /* Shut down the internal element before removing it from the demuxer */
  if (track->element) {
    gst_element_set_state (track->element, GST_STATE_NULL);
    gst_bin_remove (GST_BIN_CAST (track->demux), track->element);
  }

  g_free (track);
}
/* Atomically increments the reference count of @track.
 *
 * Returns @track, or NULL if @track is NULL. */
GstAdaptiveDemuxTrack *
gst_adaptive_demux_track_ref (GstAdaptiveDemuxTrack * track)
{
  g_return_val_if_fail (track != NULL, NULL);

  GST_TRACE ("%p %d -> %d", track, track->ref_count, track->ref_count + 1);

  g_atomic_int_inc (&track->ref_count);

  return track;
}
/* Atomically decrements the reference count of @track and frees the track
 * when the count reaches zero. */
void
gst_adaptive_demux_track_unref (GstAdaptiveDemuxTrack * track)
{
  g_return_if_fail (track != NULL);

  GST_TRACE ("%p %d -> %d", track, track->ref_count, track->ref_count - 1);

  /* Still referenced elsewhere */
  if (!g_atomic_int_dec_and_test (&track->ref_count))
    return;

  _demux_track_free (track);
}
/* Clear function for queue entries: drops the reference held on the queued
 * mini object, if any, and resets the pointer. */
static void
_track_queue_item_clear (TrackQueueItem * item)
{
  if (item->item == NULL)
    return;

  gst_mini_object_unref ((GstMiniObject *) item->item);
  item->item = NULL;
}
/* Internal function which actually adds the elements to the demuxer */
gboolean
gst_adaptive_demux_track_add_elements (GstAdaptiveDemuxTrack * track,
guint period_num)
{
GstAdaptiveDemux *demux = track->demux;
gchar *internal_name;
guint i, len;
/* Store the period number for debugging output */
track->period_num = period_num;
internal_name =
g_strdup_printf ("track-period%d-%s", period_num, track->stream_id);
len = strlen (internal_name);
for (i = 0; i < len; i++)
if (internal_name[i] == ' ')
internal_name[i] = '_';
track->element = gst_bin_new (internal_name);
g_free (internal_name);
internal_name =
g_strdup_printf ("track-period%d-sink-%s", period_num, track->stream_id);
len = strlen (internal_name);
for (i = 0; i < len; i++)
if (internal_name[i] == ' ')
internal_name[i] = '_';
track->sinkpad = gst_pad_new (internal_name, GST_PAD_SINK);
g_signal_connect (track->sinkpad, "unlinked",
(GCallback) track_sinkpad_unlinked_cb, track);
g_free (internal_name);
gst_element_add_pad (GST_ELEMENT_CAST (track->element), track->sinkpad);
gst_pad_set_element_private (track->sinkpad, track);
gst_pad_set_chain_function (track->sinkpad, _track_sink_chain_function);
gst_pad_set_event_function (track->sinkpad, _track_sink_event_function);
gst_pad_set_query_function (track->sinkpad, _track_sink_query_function);
if (!gst_bin_add (GST_BIN_CAST (demux), track->element)) {
track->element = NULL;
return FALSE;
}
gst_element_sync_state_with_parent (track->element);
return TRUE;
}
/**
 * gst_adaptive_demux_track_new:
 * @demux: a #GstAdaptiveDemux
 * @type: a #GstStreamType, must not be %GST_STREAM_TYPE_UNKNOWN
 * @flags: a #GstStreamFlags
 * @stream_id: (transfer none): The stream id for the new track (copied)
 * @caps: (transfer full): The caps for the track, stored in the track without
 *   taking an additional reference
 * @tags: (allow-none): The tags for the track. The track takes its own
 *   reference on them.
 *
 * Create a new #GstAdaptiveDemuxTrack, along with its #GstStream object, its
 * item queue and its sticky event store. The internal bin and sink pad are
 * not created here.
 *
 * NOTE(review): @tags used to be documented as (transfer full), but the
 * reference passed in is not released here - confirm whether callers are
 * expected to unref their tag list.
 *
 * Returns: The new track, created with a single reference, or %NULL if
 * @stream_id is %NULL or @type is not a known stream type.
 */
GstAdaptiveDemuxTrack *
gst_adaptive_demux_track_new (GstAdaptiveDemux * demux,
    GstStreamType type,
    GstStreamFlags flags, gchar * stream_id, GstCaps * caps, GstTagList * tags)
{
  GstAdaptiveDemuxTrack *track;

  g_return_val_if_fail (stream_id != NULL, NULL);
  g_return_val_if_fail (type && type != GST_STREAM_TYPE_UNKNOWN, NULL);

  GST_DEBUG_OBJECT (demux, "type:%s stream_id:%s caps:%" GST_PTR_FORMAT,
      gst_stream_type_get_name (type), stream_id, caps);

  track = g_new0 (GstAdaptiveDemuxTrack, 1);
  g_atomic_int_set (&track->ref_count, 1);
  track->demux = demux;
  track->type = type;
  track->flags = flags;
  track->stream_id = g_strdup (stream_id);
  /* No period assigned yet */
  track->period_num = (guint) (-1);
  track->generic_caps = caps;
  track->stream_object = gst_stream_new (stream_id, caps, type, flags);
  if (tags) {
    track->tags = gst_tag_list_ref (tags);
    gst_stream_set_tags (track->stream_object, tags);
  }

  track->selected = FALSE;
  track->active = FALSE;
  track->draining = FALSE;

  track->queue = gst_queue_array_new_for_struct (sizeof (TrackQueueItem), 50);
  gst_queue_array_set_clear_func (track->queue,
      (GDestroyNotify) _track_queue_item_clear);

  gst_event_store_init (&track->sticky_events);

  track->waiting_add = TRUE;

  /* We have no fragment duration yet, so the buffering threshold is just the
   * low watermark in time for now */
  GST_OBJECT_LOCK (demux);
  track->buffering_threshold = demux->buffering_low_watermark_time;
  GST_OBJECT_UNLOCK (demux);

  /* Start from the same state as gst_adaptive_demux_track_flush() leaves
   * behind, including the "unset" marker for lowest_input_time */
  gst_segment_init (&track->input_segment, GST_FORMAT_TIME);
  track->lowest_input_time = GST_CLOCK_STIME_NONE;
  track->input_time = 0;
  track->input_segment_seqnum = GST_SEQNUM_INVALID;

  gst_segment_init (&track->output_segment, GST_FORMAT_TIME);
  track->gap_position = track->gap_duration = GST_CLOCK_TIME_NONE;

  track->output_time = GST_CLOCK_STIME_NONE;
  track->next_position = GST_CLOCK_STIME_NONE;

  track->update_next_segment = FALSE;

  track->level_bytes = 0;
  track->level_time = 0;

  return track;
}