adaptivedemux: Allow application to force EOS

Adaptive demuxers are special demuxers that runs their own sources
internally. In this patch we flag the demuxer as being a source in order
to receive the downstream events. We then handle the EOS event by
resetting the internal state and pushing EOS on all pads. This handling
is done asynchronously to avoid blocking user thread.

https://bugzilla.gnome.org/show_bug.cgi?id=723868
This commit is contained in:
Nicolas Dufresne 2017-07-03 15:28:25 -04:00
parent 7e6a44388b
commit 8a070cf9af

View file

@ -210,6 +210,10 @@ struct _GstAdaptiveDemuxPrivate
* without needing to stop tasks when they just want to
* update the segment boundaries */
GMutex segment_lock;
/* Used to keep application EOS event asynchronous, protected by the
* MANIFEST_LOC */
gboolean pending_eos;
};
typedef struct _GstAdaptiveDemuxTimer
@ -229,6 +233,9 @@ static void gst_adaptive_demux_finalize (GObject * object);
static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
element, GstStateChange transition);
static gboolean gst_adaptive_demux_send_event (GstElement * element,
GstEvent * event);
static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
@ -246,7 +253,8 @@ gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
stream);
static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux,
GstEvent * event);
static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
gboolean first_and_live);
static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
@ -422,6 +430,7 @@ gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state = gst_adaptive_demux_change_state;
gstelement_class->send_event = gst_adaptive_demux_send_event;
gstbin_class->handle_message = gst_adaptive_demux_handle_message;
@ -452,6 +461,9 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux,
demux->have_group_id = FALSE;
demux->group_id = G_MAXUINT;
/* To receive downstream events, so we can handle EOS */
GST_OBJECT_FLAG_SET (demux, GST_ELEMENT_FLAG_SOURCE);
gst_segment_init (&demux->segment, GST_FORMAT_TIME);
gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
@ -561,12 +573,12 @@ gst_adaptive_demux_change_state (GstElement * element,
case GST_STATE_CHANGE_PAUSED_TO_READY:
GST_MANIFEST_LOCK (demux);
demux->running = FALSE;
gst_adaptive_demux_reset (demux);
gst_adaptive_demux_reset (demux, NULL);
GST_MANIFEST_UNLOCK (demux);
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_MANIFEST_LOCK (demux);
gst_adaptive_demux_reset (demux);
gst_adaptive_demux_reset (demux, NULL);
/* Clear "cancelled" flag in uridownloader since subclass might want to
* use uridownloader to fetch another manifest */
gst_uri_downloader_reset (demux->downloader);
@ -590,6 +602,40 @@ gst_adaptive_demux_change_state (GstElement * element,
return result;
}
static void
gst_adaptive_demux_force_eos (GstElement * element, gpointer user_data)
{
GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
GstEvent *event = user_data;
GST_MANIFEST_LOCK (demux);
if (demux->priv->pending_eos) {
GST_DEBUG_OBJECT (demux, "Forcing EOS now.");
gst_adaptive_demux_reset (demux, event);
}
GST_MANIFEST_UNLOCK (demux);
}
static gboolean
gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
{
GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
gboolean result = FALSE;
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
GST_MANIFEST_LOCK (demux);
demux->priv->pending_eos = TRUE;
gst_element_call_async (element, gst_adaptive_demux_force_eos, event,
(GDestroyNotify) gst_event_unref);
result = TRUE;
GST_MANIFEST_UNLOCK (demux);
} else {
gst_event_unref (event);
}
return result;
}
static gboolean
gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event)
@ -602,7 +648,7 @@ gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
GST_API_LOCK (demux);
GST_MANIFEST_LOCK (demux);
gst_adaptive_demux_reset (demux);
gst_adaptive_demux_reset (demux, NULL);
ret = gst_pad_event_default (pad, parent, event);
@ -760,12 +806,11 @@ gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
/* must be called with manifest_lock taken */
static void
gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
gst_adaptive_demux_reset (GstAdaptiveDemux * demux, GstEvent * eos)
{
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
GList *iter;
GList *old_streams;
GstEvent *eos;
/* take ownership of old_streams before releasing the manifest_lock in
* gst_adaptive_demux_stop_tasks
@ -778,7 +823,11 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
if (klass->reset)
klass->reset (demux);
eos = gst_event_new_eos ();
if (eos)
gst_event_ref (eos);
else
eos = gst_event_new_eos ();
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
if (stream->pad) {
@ -790,6 +839,7 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
gst_adaptive_demux_stream_free (stream);
}
gst_event_unref (eos);
demux->priv->pending_eos = FALSE;
g_list_free (demux->streams);
demux->streams = NULL;
if (demux->prepared_streams) {