mssdemux: remove uridownloader in favor of internal source

Refactor mssdemux to remove uridownloader to use an internal
source element which reduces startup latency and provides smaller
buffers for better buffering management downstream
This commit is contained in:
Thiago Santos 2014-05-12 13:40:19 -03:00
parent 4c59f7df4c
commit 90577c3ae7
2 changed files with 366 additions and 84 deletions

View file

@ -114,7 +114,7 @@ GST_STATIC_PAD_TEMPLATE ("audio_%02u",
GST_STATIC_CAPS_ANY);
#define gst_mss_demux_parent_class parent_class
G_DEFINE_TYPE (GstMssDemux, gst_mss_demux, GST_TYPE_ELEMENT);
G_DEFINE_TYPE (GstMssDemux, gst_mss_demux, GST_TYPE_BIN);
static void gst_mss_demux_dispose (GObject * object);
static void gst_mss_demux_set_property (GObject * object, guint prop_id,
@ -131,8 +131,6 @@ static gboolean gst_mss_demux_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
static GstFlowReturn gst_mss_demux_stream_push (GstMssDemuxStream * stream,
GstBuffer * buffer);
static GstFlowReturn gst_mss_demux_stream_push_event (GstMssDemuxStream *
stream, GstEvent * event);
static GstFlowReturn gst_mss_demux_combine_flows (GstMssDemux * mssdemux);
@ -215,7 +213,6 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
GstMssDemuxStream *stream;
stream = g_new0 (GstMssDemuxStream, 1);
stream->downloader = gst_uri_downloader_new ();
/* Downloading task */
g_rec_mutex_init (&stream->download_lock);
@ -232,6 +229,8 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
DOWNLOAD_RATE_MAX_HISTORY_LENGTH);
gst_segment_init (&stream->segment, GST_FORMAT_TIME);
g_cond_init (&stream->fragment_download_cond);
g_mutex_init (&stream->fragment_download_lock);
return stream;
}
@ -243,7 +242,7 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
gst_uri_downloader_cancel (stream->downloader);
g_cond_signal (&stream->fragment_download_cond);
gst_task_stop (stream->download_task);
GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
gst_task_join (stream->download_task);
@ -255,15 +254,25 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
}
gst_download_rate_deinit (&stream->download_rate);
if (stream->pending_newsegment) {
gst_event_unref (stream->pending_newsegment);
stream->pending_newsegment = NULL;
if (stream->pending_segment) {
gst_event_unref (stream->pending_segment);
stream->pending_segment = NULL;
}
if (stream->downloader != NULL) {
g_object_unref (stream->downloader);
stream->downloader = NULL;
if (stream->src_srcpad) {
gst_object_unref (stream->src_srcpad);
stream->src_srcpad = NULL;
}
if (stream->src) {
gst_element_set_state (stream->src, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (stream->parent), stream->src);
stream->src = NULL;
}
g_cond_clear (&stream->fragment_download_cond);
g_mutex_clear (&stream->fragment_download_lock);
if (stream->pad) {
gst_object_unref (stream->pad);
stream->pad = NULL;
@ -479,7 +488,7 @@ gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
gst_task_stop (stream->download_task);
stream->cancelled = TRUE;
if (immediate)
gst_uri_downloader_cancel (stream->downloader);
g_cond_signal (&stream->fragment_download_cond);
}
GST_OBJECT_UNLOCK (mssdemux);
@ -497,7 +506,6 @@ gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_uri_downloader_reset (stream->downloader);
stream->cancelled = FALSE;
gst_task_start (stream->download_task);
}
@ -562,9 +570,9 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
newsegment = gst_event_new_segment (&stream->segment);
gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
if (stream->pending_newsegment)
gst_event_unref (stream->pending_newsegment);
stream->pending_newsegment = newsegment;
if (stream->pending_segment)
gst_event_unref (stream->pending_segment);
stream->pending_segment = newsegment;
}
if (flags & GST_SEEK_FLAG_FLUSH) {
@ -698,9 +706,10 @@ _set_src_pad_functions (GstPad * pad)
static GstPad *
_create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
{
gchar *name;
gchar *name = NULL;
GstPad *srcpad = NULL;
GstMssStreamType streamtype;
GstPadTemplate *tmpl = NULL;
streamtype = gst_mss_stream_get_type (manifeststream);
GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
@ -709,18 +718,18 @@ _create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
/* TODO use stream's name/bitrate/index as the pad name? */
if (streamtype == MSS_STREAM_TYPE_VIDEO) {
name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
srcpad =
gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
name);
g_free (name);
tmpl = gst_static_pad_template_get (&gst_mss_demux_videosrc_template);
} else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
srcpad =
gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
name);
g_free (name);
tmpl = gst_static_pad_template_get (&gst_mss_demux_audiosrc_template);
}
if (tmpl != NULL) {
srcpad =
GST_PAD_CAST (gst_ghost_pad_new_no_target_from_template (name, tmpl));
g_free (name);
gst_object_unref (tmpl);
}
if (!srcpad) {
GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
return NULL;
@ -756,6 +765,7 @@ gst_mss_demux_create_streams (GstMssDemux * mssdemux)
}
stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
gst_pad_set_element_private (srcpad, stream);
gst_mss_stream_set_active (manifeststream, TRUE);
mssdemux->streams = g_slist_append (mssdemux->streams, stream);
}
@ -977,13 +987,14 @@ gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
if (stream->caps)
gst_caps_unref (stream->caps);
stream->caps = create_mss_caps (stream, caps);
gst_caps_unref (caps);
GST_DEBUG_OBJECT (stream->pad,
"Stream changed bitrate to %" G_GUINT64_FORMAT " caps: %"
GST_PTR_FORMAT,
gst_mss_stream_get_current_bitrate (stream->manifest_stream), caps);
gst_caps_unref (caps);
capsevent = gst_event_new_caps (stream->caps);
GST_DEBUG_OBJECT (stream->pad, "Finished streams reconfiguration");
}
@ -991,16 +1002,288 @@ gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
}
static GstFlowReturn
gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
GstBuffer ** _buffer)
_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstPad *srcpad = (GstPad *) parent;
GstMssDemux *demux = (GstMssDemux *) GST_PAD_PARENT (srcpad);
GstMssDemuxStream *stream = gst_pad_get_element_private (srcpad);
GstFlowReturn ret;
gboolean discont = FALSE;
if (stream->starting_fragment) {
#if 0
if (demux->segment.rate < 0)
/* Set DISCONT flag for every first buffer in reverse playback mode
* as each fragment for its own has to be reversed */
discont = TRUE;
#endif
stream->starting_fragment = FALSE;
GST_BUFFER_PTS (buffer) =
gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
if (stream->pending_segment) {
/* And send a newsegment */
gst_pad_push_event (stream->pad, stream->pending_segment);
stream->pending_segment = NULL;
}
#if 0
stream->position = demux->segment.position = GST_BUFFER_PTS (buffer);
#endif
} else {
GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
}
if (discont) {
GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
} else {
GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
}
#if 0
GST_BUFFER_DURATION (buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
#endif
GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
/* accumulate time and size to get this chunk */
stream->download_total_time +=
g_get_monotonic_time () - stream->download_start_time;
stream->download_total_bytes += gst_buffer_get_size (buffer);
ret = gst_proxy_pad_chain_default (pad, parent, buffer);
stream->download_start_time = g_get_monotonic_time ();
GST_LOG_OBJECT (pad, "Chain res: %d %s", ret, gst_flow_get_name (ret));
if (ret != GST_FLOW_OK) {
if (ret < GST_FLOW_EOS) {
GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
("stream stopped, reason %s", gst_flow_get_name (ret)));
/* TODO push this on all pads */
gst_pad_push_event (stream->pad, gst_event_new_eos ());
} else {
GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
gst_flow_get_name (ret));
}
/* TODO properly stop tasks */
/* gst_hls_demux_pause_tasks (demux); */
}
/* avoid having the source handle the same error again */
stream->last_ret = ret;
ret = GST_FLOW_OK;
return ret;
}
static gboolean
_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstPad *srcpad = GST_PAD_CAST (parent);
GstMssDemuxStream *stream = gst_pad_get_element_private (srcpad);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
g_cond_signal (&stream->fragment_download_cond);
break;
default:
break;
}
gst_event_unref (event);
return TRUE;
}
static gboolean
_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_ALLOCATION:
return FALSE;
break;
default:
break;
}
return gst_pad_query_default (pad, parent, query);
}
static gboolean
gst_mss_demux_stream_update_source (GstMssDemuxStream * stream,
const gchar * uri, const gchar * referer, gboolean refresh,
gboolean allow_cache)
{
GstMssDemux *demux = stream->parent;
if (!gst_uri_is_valid (uri)) {
GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
stream->last_ret = GST_FLOW_ERROR;
return FALSE;
}
if (stream->src != NULL) {
gchar *old_protocol, *new_protocol;
gchar *old_uri;
old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->src));
old_protocol = gst_uri_get_protocol (old_uri);
new_protocol = gst_uri_get_protocol (uri);
if (!g_str_equal (old_protocol, new_protocol)) {
gst_object_unref (stream->src_srcpad);
gst_element_set_state (stream->src, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (demux), stream->src);
stream->src = NULL;
stream->src_srcpad = NULL;
GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
} else {
GError *err = NULL;
GST_DEBUG_OBJECT (demux, "Re-using old source element");
if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->src), uri, &err)) {
GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
err->message);
g_clear_error (&err);
gst_element_set_state (stream->src, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (demux), stream->src);
stream->src = NULL;
}
}
g_free (old_uri);
g_free (old_protocol);
g_free (new_protocol);
}
if (stream->src == NULL) {
GObjectClass *gobject_class;
GstPad *internal_pad;
stream->src = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
if (stream->src == NULL) {
GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
("Missing plugin to handle URI: '%s'", uri), (NULL));
return FALSE;
}
gobject_class = G_OBJECT_GET_CLASS (stream->src);
if (g_object_class_find_property (gobject_class, "compress"))
g_object_set (stream->src, "compress", FALSE, NULL);
if (g_object_class_find_property (gobject_class, "keep-alive"))
g_object_set (stream->src, "keep-alive", TRUE, NULL);
if (g_object_class_find_property (gobject_class, "extra-headers")) {
if (referer || refresh || !allow_cache) {
GstStructure *extra_headers = gst_structure_new_empty ("headers");
if (referer)
gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
NULL);
if (!allow_cache)
gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
"no-cache", NULL);
else if (refresh)
gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
"max-age=0", NULL);
g_object_set (stream->src, "extra-headers", extra_headers, NULL);
gst_structure_free (extra_headers);
} else {
g_object_set (stream->src, "extra-headers", NULL, NULL);
}
}
gst_element_set_locked_state (stream->src, TRUE);
gst_bin_add (GST_BIN_CAST (demux), stream->src);
stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
gst_ghost_pad_set_target (GST_GHOST_PAD_CAST (stream->pad),
stream->src_srcpad);
/* set up our internal pad to drop all events from
* the http src we don't care about. On the chain function
* we just push the buffer forward, but this way dash can get
* the flow return from downstream */
internal_pad =
GST_PAD_CAST (gst_proxy_pad_get_internal (GST_PROXY_PAD (stream->pad)));
gst_pad_set_chain_function (GST_PAD_CAST (internal_pad), _src_chain);
gst_pad_set_event_function (GST_PAD_CAST (internal_pad), _src_event);
/* need to set query otherwise deadlocks happen with allocation queries */
gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query);
gst_object_unref (internal_pad);
}
return TRUE;
}
/* must be called with the stream's fragment_download_lock */
static void
gst_mss_demux_stream_download_uri (GstMssDemux * demux,
GstMssDemuxStream * stream, const gchar * uri, gint64 start, gint64 end)
{
GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT
" - %" G_GINT64_FORMAT, uri, start, end);
if (!gst_mss_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
return;
}
if (gst_element_set_state (stream->src,
GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
if (start != 0 || end != -1) {
if (!gst_element_send_event (stream->src, gst_event_new_seek (1.0,
GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
/* looks like the source can't handle seeks in READY */
/*
*err = g_error_new (GST_CORE_ERROR, GST_CORE_ERROR_NOT_IMPLEMENTED,
"Source element can't handle range requests");
*/
stream->last_ret = GST_FLOW_ERROR;
}
}
if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
/* flush the proxypads so that the EOS state is reset */
gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_start ());
gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_stop (TRUE));
stream->download_start_time = g_get_monotonic_time ();
gst_element_sync_state_with_parent (stream->src);
/* wait for the fragment to be completely downloaded */
GST_DEBUG_OBJECT (stream->pad,
"Waiting for fragment download to finish: %s", uri);
g_cond_wait (&stream->fragment_download_cond,
&stream->fragment_download_lock);
}
} else {
stream->last_ret = GST_FLOW_CUSTOM_ERROR;
}
gst_element_set_state (stream->src, GST_STATE_READY);
}
static GstFlowReturn
gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream)
{
GstMssDemux *mssdemux = stream->parent;
gchar *path;
gchar *url;
GstFragment *fragment;
GstBuffer *buffer;
gchar *path = NULL;
gchar *url = NULL;
GstFlowReturn ret = GST_FLOW_OK;
#if 0
guint64 before_download, after_download;
#endif
/* special case for not-linked streams */
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
@ -1008,21 +1291,30 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
stream);
return GST_FLOW_NOT_LINKED;
}
#if 0
before_download = g_get_real_time ();
#endif
g_mutex_lock (&stream->fragment_download_lock);
GST_DEBUG_OBJECT (stream->pad, "Getting url for stream");
ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
switch (ret) {
case GST_FLOW_OK:
break; /* all is good, let's go */
case GST_FLOW_EOS:
g_free (path);
g_free (url);
if (gst_mss_manifest_is_live (mssdemux->manifest)) {
gst_mss_demux_reload_manifest (mssdemux);
g_mutex_unlock (&stream->fragment_download_lock);
return GST_FLOW_OK;
}
g_mutex_unlock (&stream->fragment_download_lock);
return GST_FLOW_EOS;
case GST_FLOW_ERROR:
g_mutex_unlock (&stream->fragment_download_lock);
g_free (path);
g_free (url);
goto error;
default:
break;
@ -1036,32 +1328,26 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream);
fragment =
gst_uri_downloader_fetch_uri (stream->downloader, url,
mssdemux->manifest_uri, FALSE, FALSE, TRUE, NULL);
stream->starting_fragment = TRUE;
gst_mss_demux_stream_download_uri (mssdemux, stream, url, 0, -1);
g_free (path);
g_free (url);
g_mutex_unlock (&stream->fragment_download_lock);
if (!fragment) {
if (stream->last_ret != GST_FLOW_OK) {
GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
/* TODO check if we are truly stoping */
if (gst_mss_manifest_is_live (mssdemux->manifest)) {
if (stream->last_ret != GST_FLOW_ERROR
&& gst_mss_manifest_is_live (mssdemux->manifest)) {
/* looks like there is no way of knowing when a live stream has ended
* Have to assume we are falling behind and cause a manifest reload */
return GST_FLOW_OK;
}
return GST_FLOW_ERROR;
return stream->last_ret;
}
buffer = gst_fragment_get_buffer (fragment);
*_buffer = buffer = gst_buffer_make_writable (buffer);
GST_BUFFER_TIMESTAMP (buffer) =
gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
GST_BUFFER_DURATION (buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
g_object_unref (fragment);
#if 0
after_download = g_get_real_time ();
{
#ifndef GST_DISABLE_GST_DEBUG
@ -1076,8 +1362,9 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
gst_buffer_get_size (buffer),
1000 * (after_download - before_download));
}
#endif
return ret;
return stream->last_ret;
no_url_error:
{
@ -1100,8 +1387,6 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
{
GstMssDemux *mssdemux = stream->parent;
GstFlowReturn ret;
GstBuffer *buffer = NULL;
gboolean buffer_downloaded = FALSE;
GstEvent *gap = NULL;
GstEvent *capsevent = NULL;
@ -1153,23 +1438,13 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
if (G_UNLIKELY (capsevent != NULL))
gst_pad_push_event (stream->pad, capsevent);
ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
buffer_downloaded = buffer != NULL;
ret = gst_mss_demux_stream_download_fragment (stream);
GST_OBJECT_LOCK (mssdemux);
if (stream->cancelled) {
if (buffer)
gst_buffer_unref (buffer);
GST_OBJECT_UNLOCK (mssdemux);
goto cancelled;
}
GST_OBJECT_UNLOCK (mssdemux);
if (buffer) {
ret = gst_mss_demux_stream_push (stream, buffer);
}
GST_OBJECT_LOCK (mssdemux);
stream->last_ret = ret;
if (stream->cancelled) {
@ -1179,6 +1454,8 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
switch (ret) {
case GST_FLOW_OK:
stream->download_error_count = 0;
gst_mss_stream_advance_fragment (stream->manifest_stream);
break; /* all is good, let's go */
case GST_FLOW_EOS:
@ -1211,16 +1488,11 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
default:
if (ret <= GST_FLOW_ERROR) {
if (buffer_downloaded) {
GST_ERROR_OBJECT (mssdemux, "Error while pushing fragment");
} else {
GST_WARNING_OBJECT (mssdemux, "Error while downloading fragment");
if (++stream->download_error_count >=
DOWNLOAD_RATE_MAX_HISTORY_LENGTH) {
GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
(_("Couldn't download fragments")),
("fragment downloading has failed too much consecutive times"));
}
GST_WARNING_OBJECT (mssdemux, "Error while downloading fragment");
if (++stream->download_error_count >= DOWNLOAD_RATE_MAX_HISTORY_LENGTH) {
GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
(_("Couldn't download fragments")),
("fragment downloading has failed too much consecutive times"));
}
}
break;
@ -1231,11 +1503,6 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
gst_mss_demux_stream_push_event (stream, gst_event_new_eos ());
}
if (buffer_downloaded) {
stream->download_error_count = 0;
gst_mss_stream_advance_fragment (stream->manifest_stream);
}
GST_LOG_OBJECT (stream->pad, "download loop end");
return;
@ -1267,14 +1534,15 @@ gst_mss_demux_combine_flows (GstMssDemux * mssdemux)
return GST_FLOW_OK;
}
#if 0
static gboolean
gst_mss_demux_stream_push (GstMssDemuxStream * stream, GstBuffer * buf)
{
GstFlowReturn ret;
if (G_UNLIKELY (stream->pending_newsegment)) {
gst_pad_push_event (stream->pad, stream->pending_newsegment);
stream->pending_newsegment = NULL;
if (G_UNLIKELY (stream->pending_segment)) {
gst_pad_push_event (stream->pad, stream->pending_segment);
stream->pending_segment = NULL;
}
if (GST_BUFFER_TIMESTAMP (buf) != stream->next_timestamp) {
@ -1304,6 +1572,7 @@ gst_mss_demux_stream_push (GstMssDemuxStream * stream, GstBuffer * buf)
return ret;
}
#endif
static gboolean
gst_mss_demux_stream_push_event (GstMssDemuxStream * stream, GstEvent * event)

View file

@ -61,9 +61,11 @@ struct _GstMssDemuxStream {
GstMssStream *manifest_stream;
#if 0
GstUriDownloader *downloader;
#endif
GstEvent *pending_newsegment;
GstEvent *pending_segment;
GstClockTime next_timestamp;
GstSegment segment;
@ -81,10 +83,21 @@ struct _GstMssDemuxStream {
GstDownloadRate download_rate;
guint download_error_count;
/* download tooling */
GstElement *src;
GstPad *src_srcpad;
GMutex fragment_download_lock;
GCond fragment_download_cond;
gboolean starting_fragment;
gint64 download_start_time;
gint64 download_total_time;
gint64 download_total_bytes;
gint current_download_rate;
};
struct _GstMssDemux {
GstElement element;
GstBin bin;
/* pads */
GstPad *sinkpad;
@ -111,7 +124,7 @@ struct _GstMssDemux {
};
struct _GstMssDemuxClass {
GstElementClass parent_class;
GstBinClass parent_class;
};
GType gst_mss_demux_get_type (void);