From 8a070cf9aff8a122b1a52597441bab61c0476ef9 Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Mon, 3 Jul 2017 15:28:25 -0400 Subject: [PATCH] adaptivedemux: Allow application to force EOS Adaptive demuxers are special demuxers that runs their own sources internally. In this patch we flag the demuxer as being a source in order to receive the downstream events. We then handle the EOS event by resetting the internal state and pushing EOS on all pads. This handling is done asynchronously to avoid blocking user thread. https://bugzilla.gnome.org/show_bug.cgi?id=723868 --- gst-libs/gst/adaptivedemux/gstadaptivedemux.c | 64 +++++++++++++++++-- 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index 15c7f77d20..014f90f087 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -210,6 +210,10 @@ struct _GstAdaptiveDemuxPrivate * without needing to stop tasks when they just want to * update the segment boundaries */ GMutex segment_lock; + + /* Used to keep application EOS event asynchronous, protected by the + * MANIFEST_LOC */ + gboolean pending_eos; }; typedef struct _GstAdaptiveDemuxTimer @@ -229,6 +233,9 @@ static void gst_adaptive_demux_finalize (GObject * object); static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement * element, GstStateChange transition); +static gboolean gst_adaptive_demux_send_event (GstElement * element, + GstEvent * event); + static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg); static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent, @@ -246,7 +253,8 @@ gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event); static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux); static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream); -static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux); +static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux, + GstEvent * event); static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux, gboolean first_and_live); static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux); @@ -422,6 +430,7 @@ gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gstelement_class->change_state = gst_adaptive_demux_change_state; + gstelement_class->send_event = gst_adaptive_demux_send_event; gstbin_class->handle_message = gst_adaptive_demux_handle_message; @@ -452,6 +461,9 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux, demux->have_group_id = FALSE; demux->group_id = G_MAXUINT; + /* To receive downstream events, so we can handle EOS */ + GST_OBJECT_FLAG_SET (demux, GST_ELEMENT_FLAG_SOURCE); + gst_segment_init (&demux->segment, GST_FORMAT_TIME); gst_bin_set_suppressed_flags (GST_BIN_CAST (demux), @@ -561,12 +573,12 @@ gst_adaptive_demux_change_state (GstElement * element, case GST_STATE_CHANGE_PAUSED_TO_READY: GST_MANIFEST_LOCK (demux); demux->running = FALSE; - gst_adaptive_demux_reset (demux); + gst_adaptive_demux_reset (demux, NULL); GST_MANIFEST_UNLOCK (demux); break; case GST_STATE_CHANGE_READY_TO_PAUSED: GST_MANIFEST_LOCK (demux); - gst_adaptive_demux_reset (demux); + gst_adaptive_demux_reset (demux, NULL); /* Clear "cancelled" flag in uridownloader since subclass might want to * use uridownloader to fetch another manifest */ gst_uri_downloader_reset (demux->downloader); @@ -590,6 +602,40 @@ gst_adaptive_demux_change_state (GstElement * element, return result; } +static void +gst_adaptive_demux_force_eos (GstElement * element, gpointer user_data) +{ + GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element); + GstEvent *event = user_data; + + GST_MANIFEST_LOCK (demux); + if (demux->priv->pending_eos) { + GST_DEBUG_OBJECT (demux, "Forcing EOS now."); + gst_adaptive_demux_reset (demux, event); + } + GST_MANIFEST_UNLOCK (demux); +} + +static gboolean +gst_adaptive_demux_send_event (GstElement * element, GstEvent * event) +{ + GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element); + gboolean result = FALSE; + + if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + GST_MANIFEST_LOCK (demux); + demux->priv->pending_eos = TRUE; + gst_element_call_async (element, gst_adaptive_demux_force_eos, event, + (GDestroyNotify) gst_event_unref); + result = TRUE; + GST_MANIFEST_UNLOCK (demux); + } else { + gst_event_unref (event); + } + + return result; +} + static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) @@ -602,7 +648,7 @@ gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent, GST_API_LOCK (demux); GST_MANIFEST_LOCK (demux); - gst_adaptive_demux_reset (demux); + gst_adaptive_demux_reset (demux, NULL); ret = gst_pad_event_default (pad, parent, event); @@ -760,12 +806,11 @@ gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent, /* must be called with manifest_lock taken */ static void -gst_adaptive_demux_reset (GstAdaptiveDemux * demux) +gst_adaptive_demux_reset (GstAdaptiveDemux * demux, GstEvent * eos) { GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux); GList *iter; GList *old_streams; - GstEvent *eos; /* take ownership of old_streams before releasing the manifest_lock in * gst_adaptive_demux_stop_tasks @@ -778,7 +823,11 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux) if (klass->reset) klass->reset (demux); - eos = gst_event_new_eos (); + if (eos) + gst_event_ref (eos); + else + eos = gst_event_new_eos (); + for (iter = demux->streams; iter; iter = g_list_next (iter)) { GstAdaptiveDemuxStream *stream = iter->data; if (stream->pad) { @@ -790,6 +839,7 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux) gst_adaptive_demux_stream_free (stream); } gst_event_unref (eos); + demux->priv->pending_eos = FALSE; g_list_free (demux->streams); demux->streams = NULL; if (demux->prepared_streams) {