mssdemux: implement bitrate switching

When connection-speed changes, signal that we might need a bitrate
switch. During the switch, a new pad group is added and the old one
is drained and removed.

New pads also need to push newsegments before starting to stream
This commit is contained in:
Thiago Santos 2013-01-10 15:16:36 -03:00
parent b9aec0ad0d
commit da4fad2f6f
2 changed files with 146 additions and 54 deletions

View file

@ -124,9 +124,9 @@ gst_mss_demux_class_init (GstMssDemuxClass * klass)
gobject_class->get_property = gst_mss_demux_get_property;
g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
g_param_spec_uint64 ("connection-speed", "Connection Speed",
g_param_spec_uint ("connection-speed", "Connection Speed",
"Network connection speed in kbps (0 = unknown)",
0, G_MAXUINT64 / 1000, DEFAULT_CONNECTION_SPEED,
0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state =
@ -246,7 +246,12 @@ gst_mss_demux_set_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_CONNECTION_SPEED:
GST_OBJECT_LOCK (mssdemux);
mssdemux->connection_speed = g_value_get_uint (value) * 1000;
mssdemux->update_bitrates = TRUE;
GST_DEBUG_OBJECT (mssdemux, "Connection speed set to %llu",
mssdemux->connection_speed);
GST_OBJECT_UNLOCK (mssdemux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -369,6 +374,38 @@ gst_mss_demux_event (GstPad * pad, GstEvent * event)
return ret;
}
static void
gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
{
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
if (immediate)
gst_uri_downloader_cancel (stream->downloader);
gst_task_pause (stream->stream_task);
}
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
g_static_rec_mutex_lock (&stream->stream_lock);
}
}
static void
gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
{
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
g_static_rec_mutex_unlock (&stream->stream_lock);
}
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_task_start (stream->stream_task);
}
}
static gboolean
gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
{
@ -408,17 +445,7 @@ gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
gst_event_unref (flush);
}
/* stop the tasks */
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_uri_downloader_cancel (stream->downloader);
gst_task_pause (stream->stream_task);
}
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
g_static_rec_mutex_lock (&stream->stream_lock);
}
gst_mss_demux_stop_tasks (mssdemux, TRUE);
if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
@ -444,16 +471,7 @@ gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
gst_event_unref (flush);
}
/* restart tasks */
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
g_static_rec_mutex_unlock (&stream->stream_lock);
}
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_task_start (stream->stream_task);
}
gst_mss_demux_restart_tasks (mssdemux);
return TRUE;
}
@ -535,6 +553,41 @@ _set_src_pad_functions (GstPad * pad)
gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
}
static GstPad *
_create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
{
gchar *name;
GstPad *srcpad = NULL;
GstMssStreamType streamtype;
streamtype = gst_mss_stream_get_type (manifeststream);
GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
gst_mss_stream_type_name (streamtype));
/* TODO use stream's name/bitrate/index as the pad name? */
if (streamtype == MSS_STREAM_TYPE_VIDEO) {
name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
srcpad =
gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
name);
g_free (name);
} else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
srcpad =
gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
name);
g_free (name);
}
if (!srcpad) {
GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
return NULL;
}
_set_src_pad_functions (srcpad);
return srcpad;
}
static void
gst_mss_demux_create_streams (GstMssDemux * mssdemux)
{
@ -550,46 +603,29 @@ gst_mss_demux_create_streams (GstMssDemux * mssdemux)
}
for (iter = streams; iter; iter = g_slist_next (iter)) {
gchar *name;
GstPad *srcpad = NULL;
GstMssDemuxStream *stream = NULL;
GstMssStream *manifeststream = iter->data;
GstMssStreamType streamtype;
streamtype = gst_mss_stream_get_type (manifeststream);
GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
gst_mss_stream_type_name (streamtype));
/* TODO use stream's name as the pad name? */
if (streamtype == MSS_STREAM_TYPE_VIDEO) {
name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
srcpad =
gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
name);
g_free (name);
} else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
srcpad =
gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
name);
g_free (name);
}
srcpad = _create_pad (mssdemux, manifeststream);
if (!srcpad) {
GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
continue;
}
_set_src_pad_functions (srcpad);
stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
gst_mss_stream_set_active (manifeststream, TRUE);
mssdemux->streams = g_slist_append (mssdemux->streams, stream);
}
/* select initial bitrates */
GST_OBJECT_LOCK (mssdemux);
GST_INFO_OBJECT (mssdemux, "Changing max bitrate to %llu",
mssdemux->connection_speed);
gst_mss_manifest_change_bitrate (mssdemux->manifest,
mssdemux->connection_speed);
mssdemux->update_bitrates = FALSE;
GST_OBJECT_UNLOCK (mssdemux);
}
static gboolean
@ -685,6 +721,47 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
gst_element_no_more_pads (GST_ELEMENT_CAST (mssdemux));
}
static void
gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
{
GSList *oldpads = NULL;
GSList *iter;
gst_mss_demux_stop_tasks (mssdemux, FALSE);
if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
mssdemux->connection_speed)) {
GST_DEBUG_OBJECT (mssdemux, "Creating new pad group");
/* if we changed the bitrate, we need to add new pads */
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
GstClockTime ts =
gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
oldpads = g_slist_prepend (oldpads, stream->pad);
stream->pad = _create_pad (mssdemux, stream->manifest_stream);
/* TODO keep the same playback rate */
stream->pending_newsegment =
gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, ts, -1, ts);
gst_mss_demux_expose_stream (mssdemux, stream);
}
gst_element_no_more_pads (GST_ELEMENT (mssdemux));
for (iter = oldpads; iter; iter = g_slist_next (iter)) {
GstPad *oldpad = iter->data;
/* Push out EOS */
gst_pad_push_event (oldpad, gst_event_new_eos ());
gst_pad_set_active (oldpad, FALSE);
gst_element_remove_pad (GST_ELEMENT (mssdemux), oldpad);
gst_object_unref (oldpad);
}
}
gst_mss_demux_restart_tasks (mssdemux);
}
static void
gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
{
@ -695,6 +772,20 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
GstBuffer *buffer;
GstFlowReturn ret;
GST_OBJECT_LOCK (mssdemux);
if (mssdemux->update_bitrates) {
mssdemux->update_bitrates = FALSE;
GST_OBJECT_UNLOCK (mssdemux);
GST_DEBUG_OBJECT (mssdemux,
"Starting streams reconfiguration due to bitrate changes");
g_thread_create ((GThreadFunc) gst_mss_demux_reconfigure, mssdemux, FALSE,
NULL);
GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
} else {
GST_OBJECT_UNLOCK (mssdemux);
}
GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
switch (ret) {
@ -732,6 +823,12 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
GST_BUFFER_DURATION (buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
if (GST_BUFFER_TIMESTAMP (buffer) > 10 * GST_SECOND
&& mssdemux->connection_speed != 1000) {
mssdemux->connection_speed = 1000;
mssdemux->update_bitrates = TRUE;
}
if (G_UNLIKELY (stream->pending_newsegment)) {
gst_pad_push_event (stream->pad, stream->pending_newsegment);
stream->pending_newsegment = NULL;
@ -764,14 +861,6 @@ eos:
gst_task_stop (stream->stream_task);
return;
}
no_url_error:
{
GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
(_("Failed to get fragment URL.")),
("An error happened when getting fragment URL"));
gst_task_stop (stream->stream_task);
return;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");

View file

@ -64,6 +64,7 @@ struct _GstMssDemuxStream {
/* Streaming task */
GstTask *stream_task;
GStaticRecMutex stream_lock;
};
struct _GstMssDemux {
@ -81,6 +82,8 @@ struct _GstMssDemux {
guint n_videos;
guint n_audios;
gboolean update_bitrates;
/* properties */
guint64 connection_speed; /* in bps */
};