dashdemux: Replace GQueue by GstDataQueue

GstDataQueue has proper locking and provides functions to limit the
size of the queue. Also has blocking calls that are useful to
our multithread scenario in Dash.
This commit is contained in:
Thiago Santos 2013-01-28 18:52:04 -03:00
parent 27b1abbda3
commit 7330225ac8
5 changed files with 135 additions and 239 deletions

View file

@ -216,7 +216,6 @@ static gboolean gst_dash_demux_get_next_fragment_set (GstDashDemux * demux);
static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose); static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux); static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
static float gst_dash_demux_get_buffering_ratio (GstDashDemux * demux); static float gst_dash_demux_get_buffering_ratio (GstDashDemux * demux);
static GstBuffer *gst_dash_demux_merge_buffer_list (GstFragment * fragment);
static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux, static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
GstActiveStream * stream); GstActiveStream * stream);
static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
@ -463,7 +462,7 @@ gst_dash_demux_all_queues_have_data (GstDashDemux * demux)
for (iter = demux->streams; iter; iter = g_slist_next (iter)) { for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
if (g_queue_is_empty (stream->queue)) { if (gst_data_queue_is_empty (stream->queue)) {
return FALSE; return FALSE;
} }
} }
@ -477,14 +476,42 @@ gst_dash_demux_clear_queues (GstDashDemux * demux)
for (iter = demux->streams; iter; iter = g_slist_next (iter)) { for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
while (!g_queue_is_empty (stream->queue)) {
GstFragment *fragment = g_queue_pop_head (stream->queue); gst_data_queue_flush (stream->queue);
g_object_unref (fragment);
}
g_queue_clear (stream->queue);
} }
} }
static gboolean
_check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time,
GstDashDemuxStream * stream)
{
/* TODO add limits */
return FALSE;
}
static void
_data_queue_item_destroy (GstDataQueueItem * item)
{
gst_mini_object_unref (item->object);
g_free (item);
}
static void
gst_dash_demux_stream_push_data (GstDashDemuxStream * stream,
GstBuffer * fragment)
{
GstDataQueueItem *item = g_new0 (GstDataQueueItem, 1);
item->object = GST_MINI_OBJECT_CAST (fragment);
item->duration = GST_BUFFER_DURATION (fragment);
item->visible = TRUE;
item->size = GST_BUFFER_SIZE (fragment);
item->destroy = (GDestroyNotify) _data_queue_item_destroy;
gst_data_queue_push (stream->queue, item);
}
static gboolean static gboolean
gst_dash_demux_src_event (GstPad * pad, GstEvent * event) gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
{ {
@ -563,8 +590,8 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
chunk = list->data; chunk = list->data;
current_pos = chunk->start_time; current_pos = chunk->start_time;
//current_sequence = chunk->number; //current_sequence = chunk->number;
GST_WARNING_OBJECT (demux, "%i <= %i (%i)", current_pos, target_pos, GST_WARNING_OBJECT (demux, "%llu <= %llu (%llu)", current_pos,
chunk->duration); target_pos, chunk->duration);
if (current_pos <= target_pos if (current_pos <= target_pos
&& target_pos < current_pos + chunk->duration) { && target_pos < current_pos + chunk->duration) {
break; break;
@ -626,6 +653,10 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
/* Restart the demux */ /* Restart the demux */
demux->cancelled = FALSE; demux->cancelled = FALSE;
demux->end_of_manifest = FALSE; demux->end_of_manifest = FALSE;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_data_queue_set_flushing (stream->queue, FALSE);
}
gst_dash_demux_resume_download_task (demux); gst_dash_demux_resume_download_task (demux);
gst_dash_demux_resume_stream_task (demux); gst_dash_demux_resume_stream_task (demux);
g_static_rec_mutex_unlock (&demux->stream_lock); g_static_rec_mutex_unlock (&demux->stream_lock);
@ -686,7 +717,9 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
stream = g_new0 (GstDashDemuxStream, 1); stream = g_new0 (GstDashDemuxStream, 1);
caps = gst_dash_demux_get_input_caps (demux, active_stream); caps = gst_dash_demux_get_input_caps (demux, active_stream);
stream->queue = g_queue_new (); stream->queue =
gst_data_queue_new ((GstDataQueueCheckFullFunction) _check_queue_full,
stream);
stream->index = i; stream->index = i;
stream->input_caps = caps; stream->input_caps = caps;
@ -878,7 +911,14 @@ gst_dash_demux_pad (GstPad * pad, GstBuffer * buf)
static void static void
gst_dash_demux_stop (GstDashDemux * demux) gst_dash_demux_stop (GstDashDemux * demux)
{ {
GSList *iter;
gst_uri_downloader_cancel (demux->downloader); gst_uri_downloader_cancel (demux->downloader);
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_data_queue_set_flushing (stream->queue, TRUE);
}
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) { if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->download_task); GST_TASK_SIGNAL (demux->download_task);
@ -969,16 +1009,18 @@ needs_pad_switch (GstDashDemux * demux)
GSList *iter; GSList *iter;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) { for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDataQueueItem *item;
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
GstFragment *newFragment = g_queue_peek_head (stream->queue);
GstCaps *srccaps = NULL; GstCaps *srccaps = NULL;
GstBuffer *buffer;
if (newFragment == NULL) { if (!gst_data_queue_peek (stream->queue, &item))
continue; continue;
}
if (stream->output_caps) buffer = GST_BUFFER_CAST (item->object);
gst_caps_unref (stream->output_caps);
stream->output_caps = gst_fragment_get_caps (newFragment); gst_caps_replace (&stream->output_caps, GST_BUFFER_CAPS (buffer));
if (G_LIKELY (stream->pad)) if (G_LIKELY (stream->pad))
srccaps = gst_pad_get_negotiated_caps (stream->pad); srccaps = gst_pad_get_negotiated_caps (stream->pad);
if (G_UNLIKELY (!srccaps if (G_UNLIKELY (!srccaps
@ -1017,7 +1059,6 @@ static void
gst_dash_demux_stream_loop (GstDashDemux * demux) gst_dash_demux_stream_loop (GstDashDemux * demux)
{ {
GstFlowReturn ret; GstFlowReturn ret;
GstBufferList *buffer_list;
guint nb_adaptation_set = 0; guint nb_adaptation_set = 0;
GstActiveStream *active_stream; GstActiveStream *active_stream;
gboolean switch_pad; gboolean switch_pad;
@ -1057,15 +1098,18 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
} }
for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) { for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
GstDataQueueItem *item;
GstBuffer *buffer;
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
GstFragment *fragment = g_queue_pop_head (stream->queue); if (!gst_data_queue_pop (stream->queue, &item))
if (!fragment)
continue; continue;
buffer = GST_BUFFER_CAST (item->object);
active_stream = gst_mpdparser_get_active_stream_by_index (demux->client, i); active_stream = gst_mpdparser_get_active_stream_by_index (demux->client, i);
if (demux->need_segment) { if (demux->need_segment) {
GstClockTime start = fragment->start_time + demux->position_shift; GstClockTime start =
GST_BUFFER_TIMESTAMP (buffer) + demux->position_shift;
/* And send a newsegment */ /* And send a newsegment */
GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%" GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
GST_TIME_FORMAT, GST_TIME_ARGS (start)); GST_TIME_FORMAT, GST_TIME_ARGS (start));
@ -1074,11 +1118,13 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
start, GST_CLOCK_TIME_NONE, start)); start, GST_CLOCK_TIME_NONE, start));
} }
GST_DEBUG_OBJECT (demux, "Pushing fragment #%d (stream %i)", GST_DEBUG_OBJECT (demux,
fragment->index, i); "Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
buffer_list = gst_fragment_get_buffer_list (fragment); GST_TIME_FORMAT, buffer, GST_BUFFER_OFFSET (buffer), i,
g_object_unref (fragment); GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
ret = gst_pad_push_list (stream->pad, buffer_list); GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer));
item->destroy (item);
if ((ret != GST_FLOW_OK) && (active_stream->mimeType == GST_STREAM_VIDEO)) if ((ret != GST_FLOW_OK) && (active_stream->mimeType == GST_STREAM_VIDEO))
goto error_pushing; goto error_pushing;
} }
@ -1121,7 +1167,7 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream)
gst_object_unref (stream->pad); gst_object_unref (stream->pad);
/* TODO flush the queue */ /* TODO flush the queue */
g_queue_free (stream->queue); g_object_unref (stream->queue);
g_free (stream); g_free (stream);
} }
@ -1182,22 +1228,11 @@ gst_dash_demux_get_buffering_time (GstDashDemux * demux)
static GstClockTime static GstClockTime
gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream) gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
{ {
GstFragment *first_fragment, *last_fragment; GstDataQueueSize level;
GstClockTime buffer_time = 0;
/* get first fragment */ gst_data_queue_get_level (stream->queue, &level);
first_fragment = g_queue_peek_head (stream->queue);
/* get last fragment */
last_fragment = g_queue_peek_tail (stream->queue);
if (!first_fragment && !last_fragment) return (GstClockTime) level.time;
return 0;
if (first_fragment && last_fragment) {
buffer_time = last_fragment->stop_time - first_fragment->start_time;
}
return buffer_time;
} }
static float static float
@ -1210,34 +1245,6 @@ gst_dash_demux_get_buffering_ratio (GstDashDemux * demux)
return buffering_time / demux->min_buffering_time; return buffering_time / demux->min_buffering_time;
} }
static GstBuffer *
gst_dash_demux_merge_buffer_list (GstFragment * fragment)
{
GstBufferList *list;
GstBufferListIterator *it;
GstBuffer *buffer, *ret = NULL;
GstAdapter *adapter;
gsize size;
adapter = gst_adapter_new ();
list = gst_fragment_get_buffer_list (fragment);
it = gst_buffer_list_iterate (list);
while (gst_buffer_list_iterator_next_group (it)) {
while ((buffer = gst_buffer_list_iterator_next (it)) != NULL) {
gst_adapter_push (adapter, gst_buffer_ref (buffer));
}
}
gst_buffer_list_iterator_free (it);
gst_buffer_list_unref (list);
size = gst_adapter_available (adapter);
if (size > 0)
ret = gst_adapter_take_buffer (adapter, size);
GST_DEBUG ("Extracted a buffer of size %d from the fragment", size);
g_object_unref (adapter);
return ret;
}
/* gst_dash_demux_download_loop: /* gst_dash_demux_download_loop:
* *
* Loop for the "download' task that fetches fragments based on the * Loop for the "download' task that fetches fragments based on the
@ -1301,7 +1308,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
"Failed to update the manifest file from URL %s", "Failed to update the manifest file from URL %s",
demux->client->mpd_uri); demux->client->mpd_uri);
} else { } else {
buffer = gst_dash_demux_merge_buffer_list (download); buffer = gst_fragment_get_buffer (download);
g_object_unref (download); g_object_unref (download);
/* parse the manifest file */ /* parse the manifest file */
if (buffer == NULL) { if (buffer == NULL) {
@ -1554,44 +1561,6 @@ gst_dash_demux_get_next_header (GstDashDemux * demux, guint stream_idx)
return fragment; return fragment;
} }
static GstBufferListItem
gst_dash_demux_add_buffer_cb (GstBuffer ** buffer,
guint group, guint idx, gpointer user_data)
{
GstFragment *frag = GST_FRAGMENT (user_data);
/* This buffer still belongs to the original fragment */
/* so we need to increase refcount */
gst_fragment_add_buffer (frag, gst_buffer_ref (*buffer));
return GST_BUFFER_LIST_CONTINUE;
}
/* Since we cannot add headers after the chunk has been downloaded, we have to recreate a new fragment */
static GstFragment *
gst_dash_demux_prepend_header (GstDashDemux * demux,
GstFragment * frag, GstFragment * header)
{
GstBufferList *list;
GstFragment *res = gst_fragment_new ();
res->name = g_strdup (frag->name);
res->download_start_time = frag->download_start_time;
res->download_stop_time = frag->download_stop_time;
res->start_time = frag->start_time;
res->stop_time = frag->stop_time;
res->index = frag->index;
res->discontinuous = frag->discontinuous;
list = gst_fragment_get_buffer_list (header);
gst_buffer_list_foreach (list, gst_dash_demux_add_buffer_cb, res);
gst_buffer_list_unref (list);
list = gst_fragment_get_buffer_list (frag);
gst_buffer_list_foreach (list, gst_dash_demux_add_buffer_cb, res);
gst_buffer_list_unref (list);
res->completed = TRUE;
return res;
}
static GstCaps * static GstCaps *
gst_dash_demux_get_video_input_caps (GstDashDemux * demux, gst_dash_demux_get_video_input_caps (GstDashDemux * demux,
GstActiveStream * stream) GstActiveStream * stream)
@ -1749,6 +1718,7 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
for (iter = demux->streams; iter; iter = g_slist_next (iter)) { for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
guint stream_idx = stream->index; guint stream_idx = stream->index;
GstBuffer *buffer;
if (!gst_mpd_client_get_next_fragment (demux->client, if (!gst_mpd_client_get_next_fragment (demux->client,
stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) { stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) {
@ -1758,7 +1728,10 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
} }
GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx); GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri); GST_INFO_OBJECT (demux,
"Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp),
GST_TIME_ARGS (duration));
download = gst_uri_downloader_fetch_uri (demux->downloader, download = gst_uri_downloader_fetch_uri (demux->downloader,
next_fragment_uri); next_fragment_uri);
@ -1767,33 +1740,36 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
if (download == NULL) if (download == NULL)
return FALSE; return FALSE;
download->start_time = timestamp; buffer = gst_fragment_get_buffer (download);
download->stop_time = timestamp + duration;
active_stream = active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx); gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
if (stream == NULL) if (stream == NULL) /* TODO unref fragments */
return FALSE; return FALSE;
download->index = gst_mpd_client_get_segment_index (active_stream) - 1;
if (need_header) { if (need_header) {
/* We need to fetch a new header */ /* We need to fetch a new header */
if ((header = gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) { if ((header = gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
GST_INFO_OBJECT (demux, "Unable to fetch header"); GST_INFO_OBJECT (demux, "Unable to fetch header");
} else { } else {
GstBuffer *header_buffer;
/* Replace fragment with a new one including the header */ /* Replace fragment with a new one including the header */
GstFragment *new_fragment =
gst_dash_demux_prepend_header (demux, download, header); header_buffer = gst_fragment_get_buffer (header);
g_object_unref (header); buffer = gst_buffer_join (header_buffer, buffer);
g_object_unref (download);
download = new_fragment;
} }
} }
gst_fragment_set_caps (download, stream->input_caps); buffer = gst_buffer_make_metadata_writable (buffer);
g_queue_push_tail (stream->queue, download);
size_buffer += gst_fragment_get_buffer_size (download); GST_BUFFER_TIMESTAMP (buffer) = timestamp;
GST_BUFFER_DURATION (buffer) = duration;
GST_BUFFER_OFFSET (buffer) =
gst_mpd_client_get_segment_index (active_stream) - 1;
gst_buffer_set_caps (buffer, stream->input_caps);
gst_dash_demux_stream_push_data (stream, buffer);
size_buffer += GST_BUFFER_SIZE (buffer);
} }
/* Wake the download task up */ /* Wake the download task up */

View file

@ -32,8 +32,7 @@
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/base/gstadapter.h> #include <gst/base/gstadapter.h>
#include <gst/gst.h> #include <gst/base/gstdataqueue.h>
#include <gst/base/gstadapter.h>
#include "gstmpdparser.h" #include "gstmpdparser.h"
#include "gstfragmented.h" #include "gstfragmented.h"
#include "gsturidownloader.h" #include "gsturidownloader.h"
@ -53,7 +52,6 @@ G_BEGIN_DECLS
typedef struct _GstDashDemuxStream GstDashDemuxStream; typedef struct _GstDashDemuxStream GstDashDemuxStream;
typedef struct _GstDashDemux GstDashDemux; typedef struct _GstDashDemux GstDashDemux;
typedef struct _GstDashDemuxClass GstDashDemuxClass; typedef struct _GstDashDemuxClass GstDashDemuxClass;
#define MAX_LANGUAGES 20
struct _GstDashDemuxStream struct _GstDashDemuxStream
{ {
@ -64,7 +62,7 @@ struct _GstDashDemuxStream
GstCaps *output_caps; GstCaps *output_caps;
GstCaps *input_caps; GstCaps *input_caps;
GQueue *queue; GstDataQueue *queue;
}; };
/** /**

View file

@ -20,7 +20,7 @@
*/ */
#include <glib.h> #include <glib.h>
#include <gst/base/gsttypefindhelper.h> #include <gst/base/gstadapter.h>
#include "gstfragmented.h" #include "gstfragmented.h"
#include "gstfragment.h" #include "gstfragment.h"
@ -35,18 +35,13 @@ enum
PROP_NAME, PROP_NAME,
PROP_DURATION, PROP_DURATION,
PROP_DISCONTINOUS, PROP_DISCONTINOUS,
PROP_BUFFER_LIST,
PROP_CAPS,
PROP_LAST PROP_LAST
}; };
struct _GstFragmentPrivate struct _GstFragmentPrivate
{ {
GstBufferList *buffer_list; GstAdapter *adapter;
guint64 size; GstBuffer *buffer;
GstBufferListIterator *buffer_iterator;
GstCaps *caps;
GMutex lock;
}; };
G_DEFINE_TYPE (GstFragment, gst_fragment, G_TYPE_OBJECT); G_DEFINE_TYPE (GstFragment, gst_fragment, G_TYPE_OBJECT);
@ -54,24 +49,6 @@ G_DEFINE_TYPE (GstFragment, gst_fragment, G_TYPE_OBJECT);
static void gst_fragment_dispose (GObject * object); static void gst_fragment_dispose (GObject * object);
static void gst_fragment_finalize (GObject * object); static void gst_fragment_finalize (GObject * object);
static void
gst_fragment_set_property (GObject * object,
guint property_id, const GValue * value, GParamSpec * pspec)
{
GstFragment *fragment = GST_FRAGMENT (object);
switch (property_id) {
case PROP_CAPS:
gst_fragment_set_caps (fragment, g_value_get_boxed (value));
break;
default:
/* We don't have any other property... */
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
}
}
static void static void
gst_fragment_get_property (GObject * object, gst_fragment_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec) guint property_id, GValue * value, GParamSpec * pspec)
@ -95,14 +72,6 @@ gst_fragment_get_property (GObject * object,
g_value_set_boolean (value, fragment->discontinuous); g_value_set_boolean (value, fragment->discontinuous);
break; break;
case PROP_BUFFER_LIST:
g_value_set_object (value, gst_fragment_get_buffer_list (fragment));
break;
case PROP_CAPS:
g_value_set_boxed (value, gst_fragment_get_caps (fragment));
break;
default: default:
/* We don't have any other property... */ /* We don't have any other property... */
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
@ -110,8 +79,6 @@ gst_fragment_get_property (GObject * object,
} }
} }
static void static void
gst_fragment_class_init (GstFragmentClass * klass) gst_fragment_class_init (GstFragmentClass * klass)
{ {
@ -119,7 +86,6 @@ gst_fragment_class_init (GstFragmentClass * klass)
g_type_class_add_private (klass, sizeof (GstFragmentPrivate)); g_type_class_add_private (klass, sizeof (GstFragmentPrivate));
gobject_class->set_property = gst_fragment_set_property;
gobject_class->get_property = gst_fragment_get_property; gobject_class->get_property = gst_fragment_get_property;
gobject_class->dispose = gst_fragment_dispose; gobject_class->dispose = gst_fragment_dispose;
gobject_class->finalize = gst_fragment_finalize; gobject_class->finalize = gst_fragment_finalize;
@ -140,16 +106,6 @@ gst_fragment_class_init (GstFragmentClass * klass)
g_object_class_install_property (gobject_class, PROP_DURATION, g_object_class_install_property (gobject_class, PROP_DURATION,
g_param_spec_uint64 ("duration", "Fragment duration", g_param_spec_uint64 ("duration", "Fragment duration",
"Duration of the fragment", 0, G_MAXUINT64, 0, G_PARAM_READABLE)); "Duration of the fragment", 0, G_MAXUINT64, 0, G_PARAM_READABLE));
g_object_class_install_property (gobject_class, PROP_BUFFER_LIST,
g_param_spec_object ("buffer-list", "Buffer List",
"A list with the fragment's buffers", GST_TYPE_FRAGMENT,
G_PARAM_READABLE));
g_object_class_install_property (gobject_class, PROP_CAPS,
g_param_spec_boxed ("caps", "Fragment caps",
"The caps of the fragment's buffer. (NULL = detect)", GST_TYPE_CAPS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
} }
static void static void
@ -159,12 +115,8 @@ gst_fragment_init (GstFragment * fragment)
fragment->priv = priv = GST_FRAGMENT_GET_PRIVATE (fragment); fragment->priv = priv = GST_FRAGMENT_GET_PRIVATE (fragment);
g_mutex_init (&fragment->priv->lock); priv->adapter = gst_adapter_new ();
priv->buffer_list = gst_buffer_list_new (); fragment->download_start_time = gst_util_get_timestamp ();
priv->size = 0;
priv->buffer_iterator = gst_buffer_list_iterate (priv->buffer_list);
gst_buffer_list_iterator_add_group (priv->buffer_iterator);
fragment->download_start_time = g_get_real_time ();
fragment->start_time = 0; fragment->start_time = 0;
fragment->stop_time = 0; fragment->stop_time = 0;
fragment->index = 0; fragment->index = 0;
@ -185,7 +137,6 @@ gst_fragment_finalize (GObject * gobject)
GstFragment *fragment = GST_FRAGMENT (gobject); GstFragment *fragment = GST_FRAGMENT (gobject);
g_free (fragment->name); g_free (fragment->name);
g_mutex_clear (&fragment->priv->lock);
G_OBJECT_CLASS (gst_fragment_parent_class)->finalize (gobject); G_OBJECT_CLASS (gst_fragment_parent_class)->finalize (gobject);
} }
@ -195,74 +146,34 @@ gst_fragment_dispose (GObject * object)
{ {
GstFragmentPrivate *priv = GST_FRAGMENT (object)->priv; GstFragmentPrivate *priv = GST_FRAGMENT (object)->priv;
if (priv->buffer_list != NULL) { if (priv->adapter) {
gst_buffer_list_iterator_free (priv->buffer_iterator); gst_object_unref (priv->adapter);
gst_buffer_list_unref (priv->buffer_list); priv->adapter = NULL;
priv->buffer_list = NULL;
priv->size = 0;
} }
if (priv->buffer) {
if (priv->caps != NULL) { gst_buffer_unref (priv->buffer);
gst_caps_unref (priv->caps); priv->buffer = NULL;
priv->caps = NULL;
} }
G_OBJECT_CLASS (gst_fragment_parent_class)->dispose (object); G_OBJECT_CLASS (gst_fragment_parent_class)->dispose (object);
} }
GstBufferList * GstBuffer *
gst_fragment_get_buffer_list (GstFragment * fragment) gst_fragment_get_buffer (GstFragment * fragment)
{ {
g_return_val_if_fail (fragment != NULL, NULL); g_return_val_if_fail (fragment != NULL, NULL);
if (!fragment->completed) if (!fragment->completed)
return NULL; return NULL;
gst_buffer_list_ref (fragment->priv->buffer_list); if (!fragment->priv->buffer) {
return fragment->priv->buffer_list; fragment->priv->buffer = gst_adapter_take_buffer (fragment->priv->adapter,
} gst_adapter_available (fragment->priv->adapter));
void
gst_fragment_set_caps (GstFragment * fragment, GstCaps * caps)
{
g_return_if_fail (fragment != NULL);
g_mutex_lock (&fragment->priv->lock);
gst_caps_replace (&fragment->priv->caps, caps);
g_mutex_unlock (&fragment->priv->lock);
}
GstCaps *
gst_fragment_get_caps (GstFragment * fragment)
{
g_return_val_if_fail (fragment != NULL, NULL);
if (!fragment->completed)
return NULL;
g_mutex_lock (&fragment->priv->lock);
if (fragment->priv->caps == NULL) {
GstBuffer *buf = gst_buffer_list_get (fragment->priv->buffer_list, 0, 0);
fragment->priv->caps = gst_type_find_helper_for_buffer (NULL, buf, NULL);
} }
gst_caps_ref (fragment->priv->caps);
g_mutex_unlock (&fragment->priv->lock);
return fragment->priv->caps; return gst_buffer_ref (fragment->priv->buffer);
} }
guint64
gst_fragment_get_buffer_size (GstFragment * fragment)
{
g_return_val_if_fail (fragment != NULL, 0);
if (!fragment->completed)
return 0;
return fragment->priv->size;
}
gboolean gboolean
gst_fragment_add_buffer (GstFragment * fragment, GstBuffer * buffer) gst_fragment_add_buffer (GstFragment * fragment, GstBuffer * buffer)
{ {
@ -274,8 +185,20 @@ gst_fragment_add_buffer (GstFragment * fragment, GstBuffer * buffer)
return FALSE; return FALSE;
} }
gst_buffer_list_iterator_add (fragment->priv->buffer_iterator, buffer); GST_DEBUG ("Adding new buffer to the fragment");
fragment->priv->size = fragment->priv->size + GST_BUFFER_SIZE (buffer); /* We steal the buffers you pass in */
gst_adapter_push (fragment->priv->adapter, buffer);
return TRUE; return TRUE;
} }
gsize
gst_fragment_get_total_size (GstFragment * fragment)
{
g_return_val_if_fail (GST_IS_FRAGMENT (fragment), 0);
if (fragment->priv->buffer)
return GST_BUFFER_SIZE (fragment->priv->buffer);
return gst_adapter_available (fragment->priv->adapter);
}

View file

@ -60,11 +60,10 @@ struct _GstFragmentClass
GType gst_fragment_get_type (void); GType gst_fragment_get_type (void);
guint64 gst_fragment_get_buffer_size (GstFragment * fragment); GstBuffer * gst_fragment_get_buffer (GstFragment *fragment);
GstBufferList * gst_fragment_get_buffer_list (GstFragment *fragment); gboolean gst_fragment_set_headers (GstFragment *fragment, GstBuffer **buffer, guint count);
void gst_fragment_set_caps (GstFragment * fragment, GstCaps * caps);
GstCaps * gst_fragment_get_caps (GstFragment * fragment);
gboolean gst_fragment_add_buffer (GstFragment *fragment, GstBuffer *buffer); gboolean gst_fragment_add_buffer (GstFragment *fragment, GstBuffer *buffer);
gsize gst_fragment_get_total_size (GstFragment * fragment);
GstFragment * gst_fragment_new (void); GstFragment * gst_fragment_new (void);
G_END_DECLS G_END_DECLS

View file

@ -3256,7 +3256,7 @@ gst_mpd_client_get_next_fragment_duration (GstMpdClient * client)
GstActiveStream *stream; GstActiveStream *stream;
GstMediaSegment *media_segment; GstMediaSegment *media_segment;
GST_WARNING ("Stream index: %i", client->stream_idx); GST_DEBUG ("Stream index: %i", client->stream_idx);
stream = g_list_nth_data (client->active_streams, client->stream_idx); stream = g_list_nth_data (client->active_streams, client->stream_idx);
g_return_val_if_fail (stream != NULL, 0); g_return_val_if_fail (stream != NULL, 0);