mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 20:21:24 +00:00
dashdemux: move the buffers queues to the streams
Store the buffers separately for each stream, this is clearer than having a queue with a list of buffers. It also allows easier selection of buffers to push in later refactors
This commit is contained in:
parent
1556d8cb03
commit
27b1abbda3
2 changed files with 90 additions and 44 deletions
|
@ -219,6 +219,8 @@ static float gst_dash_demux_get_buffering_ratio (GstDashDemux * demux);
|
||||||
static GstBuffer *gst_dash_demux_merge_buffer_list (GstFragment * fragment);
|
static GstBuffer *gst_dash_demux_merge_buffer_list (GstFragment * fragment);
|
||||||
static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
|
static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
|
||||||
GstActiveStream * stream);
|
GstActiveStream * stream);
|
||||||
|
static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
|
||||||
|
* stream);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
_do_init (GType type)
|
_do_init (GType type)
|
||||||
|
@ -282,8 +284,6 @@ gst_dash_demux_dispose (GObject * obj)
|
||||||
|
|
||||||
gst_dash_demux_reset (demux, TRUE);
|
gst_dash_demux_reset (demux, TRUE);
|
||||||
|
|
||||||
g_queue_free (demux->queue);
|
|
||||||
|
|
||||||
G_OBJECT_CLASS (parent_class)->dispose (obj);
|
G_OBJECT_CLASS (parent_class)->dispose (obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,7 +349,6 @@ gst_dash_demux_init (GstDashDemux * demux, GstDashDemuxClass * klass)
|
||||||
demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE;
|
demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE;
|
||||||
demux->max_bitrate = DEFAULT_MAX_BITRATE;
|
demux->max_bitrate = DEFAULT_MAX_BITRATE;
|
||||||
|
|
||||||
demux->queue = g_queue_new ();
|
|
||||||
/* Updates task */
|
/* Updates task */
|
||||||
g_static_rec_mutex_init (&demux->download_lock);
|
g_static_rec_mutex_init (&demux->download_lock);
|
||||||
demux->download_task =
|
demux->download_task =
|
||||||
|
@ -457,20 +456,33 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static gboolean
|
||||||
gst_dash_demux_clear_queue (GstDashDemux * demux)
|
gst_dash_demux_all_queues_have_data (GstDashDemux * demux)
|
||||||
{
|
{
|
||||||
while (!g_queue_is_empty (demux->queue)) {
|
GSList *iter;
|
||||||
GList *listfragment = g_queue_pop_head (demux->queue);
|
|
||||||
guint j = 0;
|
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
|
||||||
while (j < g_list_length (listfragment)) {
|
GstDashDemuxStream *stream = iter->data;
|
||||||
GstFragment *fragment = g_list_nth_data (listfragment, j);
|
if (g_queue_is_empty (stream->queue)) {
|
||||||
g_object_unref (fragment);
|
return FALSE;
|
||||||
j++;
|
|
||||||
}
|
}
|
||||||
g_list_free (listfragment);
|
|
||||||
}
|
}
|
||||||
g_queue_clear (demux->queue);
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_dash_demux_clear_queues (GstDashDemux * demux)
|
||||||
|
{
|
||||||
|
GSList *iter;
|
||||||
|
|
||||||
|
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
|
||||||
|
GstDashDemuxStream *stream = iter->data;
|
||||||
|
while (!g_queue_is_empty (stream->queue)) {
|
||||||
|
GstFragment *fragment = g_queue_pop_head (stream->queue);
|
||||||
|
g_object_unref (fragment);
|
||||||
|
}
|
||||||
|
g_queue_clear (stream->queue);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
|
@ -587,7 +599,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
|
||||||
|
|
||||||
/* Clear the buffering queue */
|
/* Clear the buffering queue */
|
||||||
/* FIXME: allow seeking in the buffering queue */
|
/* FIXME: allow seeking in the buffering queue */
|
||||||
gst_dash_demux_clear_queue (demux);
|
gst_dash_demux_clear_queues (demux);
|
||||||
|
|
||||||
//GST_MPD_CLIENT_LOCK (demux->client);
|
//GST_MPD_CLIENT_LOCK (demux->client);
|
||||||
GST_DEBUG_OBJECT (demux, "Seeking to sequence %d", current_sequence);
|
GST_DEBUG_OBJECT (demux, "Seeking to sequence %d", current_sequence);
|
||||||
|
@ -674,6 +686,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
|
||||||
|
|
||||||
stream = g_new0 (GstDashDemuxStream, 1);
|
stream = g_new0 (GstDashDemuxStream, 1);
|
||||||
caps = gst_dash_demux_get_input_caps (demux, active_stream);
|
caps = gst_dash_demux_get_input_caps (demux, active_stream);
|
||||||
|
stream->queue = g_queue_new ();
|
||||||
|
|
||||||
stream->index = i;
|
stream->index = i;
|
||||||
stream->input_caps = caps;
|
stream->input_caps = caps;
|
||||||
|
@ -900,7 +913,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
|
||||||
if (oldpad) {
|
if (oldpad) {
|
||||||
oldpads = g_slist_prepend (oldpads, oldpad);
|
oldpads = g_slist_prepend (oldpads, oldpad);
|
||||||
GST_DEBUG_OBJECT (demux,
|
GST_DEBUG_OBJECT (demux,
|
||||||
"Switching pads (oldpad:%p) %" GST_PTR_FORMAT, oldpad);
|
"Switching pads (oldpad:%p) %" GST_PTR_FORMAT, oldpad, oldpad);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* Create and activate new pads */
|
/* Create and activate new pads */
|
||||||
|
@ -914,7 +927,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
|
||||||
gst_pad_set_element_private (stream->pad, demux);
|
gst_pad_set_element_private (stream->pad, demux);
|
||||||
gst_pad_set_active (stream->pad, TRUE);
|
gst_pad_set_active (stream->pad, TRUE);
|
||||||
gst_pad_set_caps (stream->pad, stream->output_caps);
|
gst_pad_set_caps (stream->pad, stream->output_caps);
|
||||||
gst_element_add_pad (GST_ELEMENT (demux), stream->pad);
|
gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad));
|
||||||
GST_INFO_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
|
GST_INFO_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
|
||||||
GST_DEBUG_PAD_NAME (stream->pad), stream->output_caps);
|
GST_DEBUG_PAD_NAME (stream->pad), stream->output_caps);
|
||||||
}
|
}
|
||||||
|
@ -928,6 +941,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
|
||||||
gst_pad_push_event (pad, gst_event_new_eos ());
|
gst_pad_push_event (pad, gst_event_new_eos ());
|
||||||
gst_pad_set_active (pad, FALSE);
|
gst_pad_set_active (pad, FALSE);
|
||||||
gst_element_remove_pad (GST_ELEMENT (demux), pad);
|
gst_element_remove_pad (GST_ELEMENT (demux), pad);
|
||||||
|
gst_object_unref (pad);
|
||||||
}
|
}
|
||||||
g_slist_free (oldpads);
|
g_slist_free (oldpads);
|
||||||
}
|
}
|
||||||
|
@ -948,7 +962,7 @@ switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
static gboolean
|
static gboolean
|
||||||
needs_pad_switch (GstDashDemux * demux, GList * fragment)
|
needs_pad_switch (GstDashDemux * demux)
|
||||||
{
|
{
|
||||||
gboolean switch_pad = FALSE;
|
gboolean switch_pad = FALSE;
|
||||||
guint i = 0;
|
guint i = 0;
|
||||||
|
@ -956,7 +970,7 @@ needs_pad_switch (GstDashDemux * demux, GList * fragment)
|
||||||
|
|
||||||
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
|
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
|
||||||
GstDashDemuxStream *stream = iter->data;
|
GstDashDemuxStream *stream = iter->data;
|
||||||
GstFragment *newFragment = g_list_nth_data (fragment, i);
|
GstFragment *newFragment = g_queue_peek_head (stream->queue);
|
||||||
GstCaps *srccaps = NULL;
|
GstCaps *srccaps = NULL;
|
||||||
|
|
||||||
if (newFragment == NULL) {
|
if (newFragment == NULL) {
|
||||||
|
@ -1002,7 +1016,6 @@ needs_pad_switch (GstDashDemux * demux, GList * fragment)
|
||||||
static void
|
static void
|
||||||
gst_dash_demux_stream_loop (GstDashDemux * demux)
|
gst_dash_demux_stream_loop (GstDashDemux * demux)
|
||||||
{
|
{
|
||||||
GList *listfragment;
|
|
||||||
GstFlowReturn ret;
|
GstFlowReturn ret;
|
||||||
GstBufferList *buffer_list;
|
GstBufferList *buffer_list;
|
||||||
guint nb_adaptation_set = 0;
|
guint nb_adaptation_set = 0;
|
||||||
|
@ -1011,10 +1024,13 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
|
||||||
guint i = 0;
|
guint i = 0;
|
||||||
GSList *iter;
|
GSList *iter;
|
||||||
|
|
||||||
if (g_queue_is_empty (demux->queue)) {
|
GST_LOG_OBJECT (demux, "Starting stream loop");
|
||||||
|
|
||||||
|
if (!gst_dash_demux_all_queues_have_data (demux)) {
|
||||||
if (demux->end_of_manifest)
|
if (demux->end_of_manifest)
|
||||||
goto end_of_manifest;
|
goto end_of_manifest;
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (demux, "Ending stream loop, no buffers to push");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1031,19 +1047,22 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
|
||||||
100 * gst_dash_demux_get_buffering_ratio (demux)));
|
100 * gst_dash_demux_get_buffering_ratio (demux)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
listfragment = g_queue_pop_head (demux->queue);
|
|
||||||
nb_adaptation_set = g_list_length (listfragment);
|
|
||||||
|
|
||||||
/* Figure out if we need to create/switch pads */
|
/* Figure out if we need to create/switch pads */
|
||||||
switch_pad = needs_pad_switch (demux, listfragment);
|
switch_pad = needs_pad_switch (demux);
|
||||||
if (switch_pad) {
|
if (switch_pad) {
|
||||||
GST_WARNING ("Switching pads");
|
GST_WARNING ("Switching pads");
|
||||||
switch_pads (demux, nb_adaptation_set);
|
switch_pads (demux, nb_adaptation_set);
|
||||||
demux->need_segment = TRUE;
|
demux->need_segment = TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
|
for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
|
||||||
GstDashDemuxStream *stream = iter->data;
|
GstDashDemuxStream *stream = iter->data;
|
||||||
GstFragment *fragment = g_list_nth_data (listfragment, i);
|
GstFragment *fragment = g_queue_pop_head (stream->queue);
|
||||||
|
|
||||||
|
if (!fragment)
|
||||||
|
continue;
|
||||||
|
|
||||||
active_stream = gst_mpdparser_get_active_stream_by_index (demux->client, i);
|
active_stream = gst_mpdparser_get_active_stream_by_index (demux->client, i);
|
||||||
if (demux->need_segment) {
|
if (demux->need_segment) {
|
||||||
GstClockTime start = fragment->start_time + demux->position_shift;
|
GstClockTime start = fragment->start_time + demux->position_shift;
|
||||||
|
@ -1065,7 +1084,6 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
|
||||||
}
|
}
|
||||||
demux->need_segment = FALSE;
|
demux->need_segment = FALSE;
|
||||||
demux->position_shift = 0;
|
demux->position_shift = 0;
|
||||||
g_list_free (listfragment);
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -1092,6 +1110,22 @@ error_pushing:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_dash_demux_stream_free (GstDashDemuxStream * stream)
|
||||||
|
{
|
||||||
|
if (stream->input_caps)
|
||||||
|
gst_caps_unref (stream->input_caps);
|
||||||
|
if (stream->output_caps)
|
||||||
|
gst_caps_unref (stream->output_caps);
|
||||||
|
if (stream->pad)
|
||||||
|
gst_object_unref (stream->pad);
|
||||||
|
|
||||||
|
/* TODO flush the queue */
|
||||||
|
g_queue_free (stream->queue);
|
||||||
|
|
||||||
|
g_free (stream);
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
|
gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
|
||||||
{
|
{
|
||||||
|
@ -1101,13 +1135,14 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
|
||||||
demux->end_of_manifest = FALSE;
|
demux->end_of_manifest = FALSE;
|
||||||
demux->cancelled = FALSE;
|
demux->cancelled = FALSE;
|
||||||
|
|
||||||
|
gst_dash_demux_clear_queues (demux);
|
||||||
|
|
||||||
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
|
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
|
||||||
GstDashDemuxStream *stream = iter->data;
|
GstDashDemuxStream *stream = iter->data;
|
||||||
if (stream->input_caps) {
|
gst_dash_demux_stream_free (stream);
|
||||||
gst_caps_unref (stream->input_caps);
|
|
||||||
stream->input_caps = NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
g_slist_free (demux->streams);
|
||||||
|
demux->streams = NULL;
|
||||||
|
|
||||||
if (demux->manifest) {
|
if (demux->manifest) {
|
||||||
gst_buffer_unref (demux->manifest);
|
gst_buffer_unref (demux->manifest);
|
||||||
|
@ -1121,8 +1156,6 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
|
||||||
demux->client = gst_mpd_client_new ();
|
demux->client = gst_mpd_client_new ();
|
||||||
}
|
}
|
||||||
|
|
||||||
gst_dash_demux_clear_queue (demux);
|
|
||||||
|
|
||||||
demux->last_manifest_update = GST_CLOCK_TIME_NONE;
|
demux->last_manifest_update = GST_CLOCK_TIME_NONE;
|
||||||
demux->position = 0;
|
demux->position = 0;
|
||||||
demux->position_shift = 0;
|
demux->position_shift = 0;
|
||||||
|
@ -1134,18 +1167,31 @@ static GstClockTime
|
||||||
gst_dash_demux_get_buffering_time (GstDashDemux * demux)
|
gst_dash_demux_get_buffering_time (GstDashDemux * demux)
|
||||||
{
|
{
|
||||||
GstClockTime buffer_time = 0;
|
GstClockTime buffer_time = 0;
|
||||||
GList *listfragment;
|
GSList *iter;
|
||||||
GstFragment *first_fragment, *last_fragment;
|
|
||||||
|
|
||||||
if (g_queue_is_empty (demux->queue))
|
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
|
||||||
return 0;
|
buffer_time = gst_dash_demux_stream_get_buffering_time (iter->data);
|
||||||
|
|
||||||
|
if (buffer_time)
|
||||||
|
return buffer_time;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static GstClockTime
|
||||||
|
gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
|
||||||
|
{
|
||||||
|
GstFragment *first_fragment, *last_fragment;
|
||||||
|
GstClockTime buffer_time = 0;
|
||||||
|
|
||||||
/* get first fragment */
|
/* get first fragment */
|
||||||
listfragment = g_queue_peek_head (demux->queue);
|
first_fragment = g_queue_peek_head (stream->queue);
|
||||||
first_fragment = listfragment->data;
|
|
||||||
/* get last fragment */
|
/* get last fragment */
|
||||||
listfragment = g_queue_peek_tail (demux->queue);
|
last_fragment = g_queue_peek_tail (stream->queue);
|
||||||
last_fragment = listfragment->data;
|
|
||||||
|
if (!first_fragment && !last_fragment)
|
||||||
|
return 0;
|
||||||
|
|
||||||
if (first_fragment && last_fragment) {
|
if (first_fragment && last_fragment) {
|
||||||
buffer_time = last_fragment->stop_time - first_fragment->start_time;
|
buffer_time = last_fragment->stop_time - first_fragment->start_time;
|
||||||
|
@ -1746,11 +1792,10 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
|
||||||
}
|
}
|
||||||
|
|
||||||
gst_fragment_set_caps (download, stream->input_caps);
|
gst_fragment_set_caps (download, stream->input_caps);
|
||||||
fragment_set = g_list_append (fragment_set, download);
|
g_queue_push_tail (stream->queue, download);
|
||||||
size_buffer += gst_fragment_get_buffer_size (download);
|
size_buffer += gst_fragment_get_buffer_size (download);
|
||||||
}
|
}
|
||||||
/* Push fragment set into the queue */
|
|
||||||
g_queue_push_tail (demux->queue, fragment_set);
|
|
||||||
/* Wake the download task up */
|
/* Wake the download task up */
|
||||||
GST_TASK_SIGNAL (demux->download_task);
|
GST_TASK_SIGNAL (demux->download_task);
|
||||||
g_get_current_time (&now);
|
g_get_current_time (&now);
|
||||||
|
|
|
@ -63,6 +63,8 @@ struct _GstDashDemuxStream
|
||||||
|
|
||||||
GstCaps *output_caps;
|
GstCaps *output_caps;
|
||||||
GstCaps *input_caps;
|
GstCaps *input_caps;
|
||||||
|
|
||||||
|
GQueue *queue;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -80,7 +82,6 @@ struct _GstDashDemux
|
||||||
GstBuffer *manifest;
|
GstBuffer *manifest;
|
||||||
GstUriDownloader *downloader;
|
GstUriDownloader *downloader;
|
||||||
GstMpdClient *client; /* MPD client */
|
GstMpdClient *client; /* MPD client */
|
||||||
GQueue *queue; /* Video/Audio/Application List of fragment storing the fetched fragments */
|
|
||||||
gboolean end_of_period;
|
gboolean end_of_period;
|
||||||
gboolean end_of_manifest;
|
gboolean end_of_manifest;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue