mssdemux: initial implementation of seek event handling

Adds basic handling for seek in time events. Needs to cooperate
with the downstream qtdemux so that it forwards the seeks and
the corresponding newsegments
This commit is contained in:
Thiago Santos 2012-12-28 00:48:33 -03:00
parent 33d872916b
commit cb7e3d1f3b
4 changed files with 231 additions and 8 deletions

View file

@ -164,6 +164,12 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
stream->stream_task = NULL; stream->stream_task = NULL;
} }
if (stream->pending_newsegment) {
gst_event_unref (stream->pending_newsegment);
stream->pending_newsegment = NULL;
}
if (stream->downloader != NULL) { if (stream->downloader != NULL) {
g_object_unref (stream->downloader); g_object_unref (stream->downloader);
stream->downloader = NULL; stream->downloader = NULL;
@ -264,6 +270,20 @@ gst_mss_demux_start (GstMssDemux * mssdemux)
} }
} }
static gboolean
gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
{
GSList *iter;
gboolean ret = TRUE;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_event_ref (event);
ret = ret & gst_pad_push_event (stream->pad, event);
}
return ret;
}
static gboolean static gboolean
gst_mss_demux_event (GstPad * pad, GstEvent * event) gst_mss_demux_event (GstPad * pad, GstEvent * event)
{ {
@ -295,6 +315,101 @@ gst_mss_demux_event (GstPad * pad, GstEvent * event)
return ret; return ret;
} }
static gboolean
gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
{
GstMssDemux *mssdemux;
mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
switch (event->type) {
case GST_EVENT_SEEK:
{
gdouble rate;
GstFormat format;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
GstEvent *newsegment;
GSList *iter;
GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
&stop_type, &stop);
if (format != GST_FORMAT_TIME)
return FALSE;
GST_DEBUG_OBJECT (mssdemux,
"seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
if (flags & GST_SEEK_FLAG_FLUSH) {
GstEvent *flush = gst_event_new_flush_start ();
GST_DEBUG_OBJECT (mssdemux, "sending flush start");
gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
gst_mss_demux_push_src_event (mssdemux, flush);
gst_event_unref (flush);
}
/* stop the tasks */
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_uri_downloader_cancel (stream->downloader);
gst_task_pause (stream->stream_task);
}
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
g_static_rec_mutex_lock (&stream->stream_lock);
}
if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
return FALSE;
}
newsegment =
gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
stream->pending_newsegment = gst_event_ref (newsegment);
}
gst_event_unref (newsegment);
if (flags & GST_SEEK_FLAG_FLUSH) {
GstEvent *flush = gst_event_new_flush_stop ();
GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
gst_mss_demux_push_src_event (mssdemux, flush);
gst_event_unref (flush);
}
/* restart tasks */
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
g_static_rec_mutex_unlock (&stream->stream_lock);
}
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_task_start (stream->stream_task);
}
return TRUE;
}
default:
break;
}
return gst_pad_event_default (pad, event);
}
static gboolean static gboolean
gst_mss_demux_src_query (GstPad * pad, GstQuery * query) gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
{ {
@ -313,14 +428,8 @@ gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
gst_query_parse_duration (query, &fmt, NULL); gst_query_parse_duration (query, &fmt, NULL);
if (fmt == GST_FORMAT_TIME && mssdemux->manifest) { if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
/* TODO should we use the streams accumulated duration? */ /* TODO should we use the streams accumulated duration or the main manifest duration? */
guint64 dur = gst_mss_manifest_get_duration (mssdemux->manifest); duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
guint64 timescale = gst_mss_manifest_get_timescale (mssdemux->manifest);
if (dur != -1 && timescale != -1)
duration =
(GstClockTime) gst_util_uint64_scale_round (dur, GST_SECOND,
timescale);
if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) { if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
gst_query_set_duration (query, GST_FORMAT_TIME, duration); gst_query_set_duration (query, GST_FORMAT_TIME, duration);
@ -335,6 +444,25 @@ gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
gst_query_set_latency (query, FALSE, 0, -1); gst_query_set_latency (query, FALSE, 0, -1);
ret = TRUE; ret = TRUE;
break; break;
case GST_QUERY_SEEKING:{
GstFormat fmt;
gint64 stop = -1;
gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
fmt);
if (fmt == GST_FORMAT_TIME) {
GstClockTime duration;
duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
stop = duration;
gst_query_set_seeking (query, fmt, TRUE, 0, stop);
ret = TRUE;
GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
GST_TIME_FORMAT, GST_TIME_ARGS (stop));
}
break;
}
default: default:
/* Don't fordward queries upstream because of the special nature of this /* Don't fordward queries upstream because of the special nature of this
* "demuxer", which relies on the upstream element only to be fed * "demuxer", which relies on the upstream element only to be fed
@ -350,6 +478,7 @@ static void
_set_src_pad_functions (GstPad * pad) _set_src_pad_functions (GstPad * pad)
{ {
gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query)); gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
} }
static void static void
@ -508,10 +637,21 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
g_free (path); g_free (path);
g_free (url); g_free (url);
if (!fragment) {
GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
/* TODO check if we are truly stoping */
return;
}
buffer = gst_fragment_get_buffer (fragment); buffer = gst_fragment_get_buffer (fragment);
buffer = gst_buffer_make_metadata_writable (buffer); buffer = gst_buffer_make_metadata_writable (buffer);
gst_buffer_set_caps (buffer, GST_PAD_CAPS (stream->pad)); gst_buffer_set_caps (buffer, GST_PAD_CAPS (stream->pad));
if (G_UNLIKELY (stream->pending_newsegment)) {
gst_pad_push_event (stream->pad, stream->pending_newsegment);
stream->pending_newsegment = NULL;
}
ret = gst_pad_push (stream->pad, buffer); ret = gst_pad_push (stream->pad, buffer);
switch (ret) { switch (ret) {
case GST_FLOW_UNEXPECTED: case GST_FLOW_UNEXPECTED:

View file

@ -59,6 +59,8 @@ struct _GstMssDemuxStream {
GstUriDownloader *downloader; GstUriDownloader *downloader;
GstEvent *pending_newsegment;
/* Streaming task */ /* Streaming task */
GstTask *stream_task; GstTask *stream_task;
GStaticRecMutex stream_lock; GStaticRecMutex stream_lock;
@ -78,6 +80,7 @@ struct _GstMssDemux {
GSList *streams; GSList *streams;
guint n_videos; guint n_videos;
guint n_audios; guint n_audios;
}; };
struct _GstMssDemuxClass { struct _GstMssDemuxClass {

View file

@ -496,6 +496,28 @@ gst_mss_manifest_get_duration (GstMssManifest * manifest)
return dur; return dur;
} }
/**
* Gets the duration in nanoseconds
*/
GstClockTime
gst_mss_manifest_get_gst_duration (GstMssManifest * manifest)
{
guint64 duration = -1;
guint64 timescale;
GstClockTime gstdur = GST_CLOCK_TIME_NONE;
duration = gst_mss_manifest_get_duration (manifest);
timescale = gst_mss_manifest_get_timescale (manifest);
if (duration != -1 && timescale != -1)
gstdur =
(GstClockTime) gst_util_uint64_scale_round (duration, GST_SECOND,
timescale);
return gstdur;
}
GstCaps * GstCaps *
gst_mss_stream_get_caps (GstMssStream * stream) gst_mss_stream_get_caps (GstMssStream * stream)
{ {
@ -565,3 +587,58 @@ gst_mss_stream_type_name (GstMssStreamType streamtype)
return "unknown"; return "unknown";
} }
} }
/**
* Seeks all streams to the fragment that contains the set time
*
* @time: time in nanoseconds
*/
gboolean
gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time)
{
gboolean ret = TRUE;
GSList *iter;
for (iter = manifest->streams; iter; iter = g_slist_next (iter)) {
ret = gst_mss_stream_seek (iter->data, time) & ret;
}
return ret;
}
/**
* Seeks this stream to the fragment that contains the sample at time
*
* @time: time in nanoseconds
*/
gboolean
gst_mss_stream_seek (GstMssStream * stream, guint64 time)
{
GList *iter;
guint64 timescale;
timescale = gst_mss_stream_get_timescale (stream);
time = gst_util_uint64_scale_round (time, timescale, GST_SECOND);
for (iter = stream->fragments; iter; iter = g_list_next (iter)) {
GList *next = g_list_next (iter);
if (next) {
GstMssStreamFragment *fragment = next->data;
if (fragment->time > time) {
stream->current_fragment = iter;
break;
}
} else {
GstMssStreamFragment *fragment = iter->data;
if (fragment->time + fragment->duration > time) {
stream->current_fragment = iter;
} else {
stream->current_fragment = NULL; /* EOS */
}
break;
}
}
return TRUE;
}

View file

@ -43,12 +43,15 @@ void gst_mss_manifest_free (GstMssManifest * manifest);
GSList * gst_mss_manifest_get_streams (GstMssManifest * manifest); GSList * gst_mss_manifest_get_streams (GstMssManifest * manifest);
guint64 gst_mss_manifest_get_timescale (GstMssManifest * manifest); guint64 gst_mss_manifest_get_timescale (GstMssManifest * manifest);
guint64 gst_mss_manifest_get_duration (GstMssManifest * manifest); guint64 gst_mss_manifest_get_duration (GstMssManifest * manifest);
GstClockTime gst_mss_manifest_get_gst_duration (GstMssManifest * manifest);
gboolean gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time);
GstMssStreamType gst_mss_stream_get_type (GstMssStream *stream); GstMssStreamType gst_mss_stream_get_type (GstMssStream *stream);
GstCaps * gst_mss_stream_get_caps (GstMssStream * stream); GstCaps * gst_mss_stream_get_caps (GstMssStream * stream);
guint64 gst_mss_stream_get_timescale (GstMssStream * stream); guint64 gst_mss_stream_get_timescale (GstMssStream * stream);
GstFlowReturn gst_mss_stream_get_fragment_url (GstMssStream * stream, gchar ** url); GstFlowReturn gst_mss_stream_get_fragment_url (GstMssStream * stream, gchar ** url);
GstFlowReturn gst_mss_stream_advance_fragment (GstMssStream * stream); GstFlowReturn gst_mss_stream_advance_fragment (GstMssStream * stream);
gboolean gst_mss_stream_seek (GstMssStream * stream, guint64 time);
const gchar * gst_mss_stream_type_name (GstMssStreamType streamtype); const gchar * gst_mss_stream_type_name (GstMssStreamType streamtype);