diff --git a/gst/hls/gsthlsdemux.c b/gst/hls/gsthlsdemux.c index 900f25fc1b..1aff78893b 100644 --- a/gst/hls/gsthlsdemux.c +++ b/gst/hls/gsthlsdemux.c @@ -105,12 +105,11 @@ static GstFlowReturn gst_hls_demux_fetcher_chain (GstPad * pad, GstBuffer * buf); static gboolean gst_hls_demux_fetcher_sink_event (GstPad * pad, GstEvent * event); -static void gst_hls_demux_loop (GstHLSDemux * demux); +static void gst_hls_demux_stream_loop (GstHLSDemux * demux); +static void gst_hls_demux_updates_loop (GstHLSDemux * demux); static void gst_hls_demux_stop (GstHLSDemux * demux); static void gst_hls_demux_stop_fetcher_locked (GstHLSDemux * demux, gboolean cancelled); -static void gst_hls_demux_stop_update (GstHLSDemux * demux); -static gboolean gst_hls_demux_start_update (GstHLSDemux * demux); static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux); static gboolean gst_hls_demux_schedule (GstHLSDemux * demux); static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux); @@ -156,12 +155,28 @@ gst_hls_demux_dispose (GObject * obj) g_cond_free (demux->fetcher_cond); g_mutex_free (demux->fetcher_lock); - g_cond_free (demux->thread_cond); - g_mutex_free (demux->thread_lock); + if (demux->stream_task) { + if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { + GST_DEBUG_OBJECT (demux, "Leaving streaming task"); + gst_task_stop (demux->stream_task); + gst_task_join (demux->stream_task); + } + gst_object_unref (demux->stream_task); + g_static_rec_mutex_free (&demux->stream_lock); + demux->stream_task = NULL; + } - gst_task_join (demux->task); - gst_object_unref (demux->task); - g_static_rec_mutex_free (&demux->task_lock); + if (demux->updates_task) { + if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { + GST_DEBUG_OBJECT (demux, "Leaving updates task"); + gst_task_stop (demux->updates_task); + gst_task_join (demux->updates_task); + } + gst_object_unref (demux->updates_task); + g_mutex_free (demux->updates_timed_lock); + g_static_rec_mutex_free (&demux->updates_lock); + demux->updates_task = NULL; + } gst_object_unref (demux->fetcher_bus); gst_object_unref (demux->fetcherpad); @@ -236,15 +251,22 @@ gst_hls_demux_init (GstHLSDemux * demux, GstHLSDemuxClass * klass) demux->fetcher_bus = gst_bus_new (); gst_bus_set_sync_handler (demux->fetcher_bus, gst_hls_demux_fetcher_bus_handler, demux); - demux->thread_cond = g_cond_new (); - demux->thread_lock = g_mutex_new (); demux->fetcher_cond = g_cond_new (); demux->fetcher_lock = g_mutex_new (); demux->queue = g_queue_new (); - g_static_rec_mutex_init (&demux->task_lock); - /* FIXME: This really should be a pad task instead */ - demux->task = gst_task_create ((GstTaskFunction) gst_hls_demux_loop, demux); - gst_task_set_lock (demux->task, &demux->task_lock); + + /* Updates task */ + g_static_rec_mutex_init (&demux->updates_lock); + demux->updates_task = + gst_task_create ((GstTaskFunction) gst_hls_demux_updates_loop, demux); + gst_task_set_lock (demux->updates_task, &demux->updates_lock); + demux->updates_timed_lock = g_mutex_new (); + + /* Streaming task */ + g_static_rec_mutex_init (&demux->stream_lock); + demux->stream_task = + gst_task_create ((GstTaskFunction) gst_hls_demux_stream_loop, demux); + gst_task_set_lock (demux->stream_task, &demux->stream_lock); } static void @@ -301,7 +323,7 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition) state and we filled our queue with enough cached fragments */ if (gst_m3u8_client_get_uri (demux->client)[0] != '\0') - gst_hls_demux_start_update (demux); + gst_task_start (demux->updates_task); break; default: break; @@ -311,12 +333,12 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - gst_hls_demux_stop_update (demux); + gst_task_stop (demux->updates_task); break; case GST_STATE_CHANGE_PAUSED_TO_READY: demux->cancelled = TRUE; gst_hls_demux_stop (demux); - gst_task_join (demux->task); + gst_task_join (demux->stream_task); gst_hls_demux_reset (demux, FALSE); break; default: @@ -390,15 +412,15 @@ gst_hls_demux_src_event (GstPad * pad, GstEvent * event) } demux->cancelled = TRUE; - gst_task_pause (demux->task); + gst_task_pause (demux->stream_task); g_mutex_lock (demux->fetcher_lock); gst_hls_demux_stop_fetcher_locked (demux, TRUE); g_mutex_unlock (demux->fetcher_lock); - gst_hls_demux_stop_update (demux); - gst_task_pause (demux->task); + gst_task_stop (demux->updates_task); + gst_task_pause (demux->stream_task); /* wait for streaming to finish */ - g_static_rec_mutex_lock (&demux->task_lock); + g_static_rec_mutex_lock (&demux->stream_lock); demux->need_cache = TRUE; while (!g_queue_is_empty (demux->queue)) { @@ -423,8 +445,8 @@ gst_hls_demux_src_event (GstPad * pad, GstEvent * event) } demux->cancelled = FALSE; - gst_task_start (demux->task); - g_static_rec_mutex_unlock (&demux->task_lock); + gst_task_start (demux->stream_task); + g_static_rec_mutex_unlock (&demux->stream_lock); return TRUE; } @@ -487,7 +509,7 @@ gst_hls_demux_sink_event (GstPad * pad, GstEvent * event) return FALSE; } - gst_task_start (demux->task); + gst_task_start (demux->stream_task); gst_event_unref (event); return TRUE; } @@ -670,8 +692,15 @@ gst_hls_demux_stop (GstHLSDemux * demux) g_mutex_lock (demux->fetcher_lock); gst_hls_demux_stop_fetcher_locked (demux, TRUE); g_mutex_unlock (demux->fetcher_lock); - gst_task_stop (demux->task); - gst_hls_demux_stop_update (demux); + + if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { + demux->stop_stream_task = TRUE; + gst_task_stop (demux->updates_task); + GST_TASK_SIGNAL (demux->updates_task); + } + + if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) + gst_task_stop (demux->stream_task); } static void @@ -715,7 +744,7 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps) } static void -gst_hls_demux_loop (GstHLSDemux * demux) +gst_hls_demux_stream_loop (GstHLSDemux * demux) { GstBuffer *buf; GstFlowReturn ret; @@ -732,7 +761,7 @@ gst_hls_demux_loop (GstHLSDemux * demux) /* we can start now the updates thread (only if on playing) */ if (GST_STATE (demux) == GST_STATE_PLAYING) - gst_hls_demux_start_update (demux); + gst_task_start (demux->updates_task); GST_INFO_OBJECT (demux, "First fragments cached successfully"); } @@ -783,7 +812,7 @@ end_of_playlist: cache_error: { - gst_task_pause (demux->task); + gst_task_pause (demux->stream_task); if (!demux->cancelled) { GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND, ("Could not cache the first fragments"), (NULL)); @@ -802,7 +831,7 @@ error: pause_task: { - gst_task_pause (demux->task); + gst_task_pause (demux->stream_task); return; } } @@ -855,7 +884,6 @@ static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) { demux->need_cache = TRUE; - demux->thread_return = FALSE; demux->accumulated_delay = 0; demux->end_of_playlist = FALSE; demux->cancelled = FALSE; @@ -903,8 +931,8 @@ gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri) return TRUE; } -static gboolean -gst_hls_demux_update_thread (GstHLSDemux * demux) +void +gst_hls_demux_updates_loop (GstHLSDemux * demux) { /* Loop for the updates. It's started when the first fragments are cached and * schedules the next update of the playlist (for lives sources) and the next @@ -912,15 +940,14 @@ gst_hls_demux_update_thread (GstHLSDemux * demux) * download time with the next scheduled update to check if we can or should * switch to a different bitrate */ - g_mutex_lock (demux->thread_lock); - GST_DEBUG_OBJECT (demux, "Started updates thread"); + /* block until the next scheduled update or the signal to quit this thread */ + g_mutex_lock (demux->updates_timed_lock); + GST_DEBUG_OBJECT (demux, "Started updates task"); while (TRUE) { - /* block until the next scheduled update or the signal to quit this thread */ - if (g_cond_timed_wait (demux->thread_cond, demux->thread_lock, - &demux->next_update)) { + if (g_cond_timed_wait (GST_TASK_GET_COND (demux->updates_task), + demux->updates_timed_lock, &demux->next_update)) { goto quit; } - /* update the playlist for live sources */ if (gst_m3u8_client_is_live (demux->client)) { if (!gst_hls_demux_update_playlist (demux)) { @@ -976,41 +1003,11 @@ gst_hls_demux_update_thread (GstHLSDemux * demux) quit: { - GST_DEBUG_OBJECT (demux, "Stopped updates thread"); - demux->updates_thread = NULL; - g_mutex_unlock (demux->thread_lock); - return TRUE; + gst_hls_demux_stop (demux); + g_mutex_unlock (demux->updates_timed_lock); } } - -static void -gst_hls_demux_stop_update (GstHLSDemux * demux) -{ - GST_DEBUG_OBJECT (demux, "Stopping updates thread"); - while (demux->updates_thread) { - g_mutex_lock (demux->thread_lock); - g_cond_signal (demux->thread_cond); - g_mutex_unlock (demux->thread_lock); - } -} - -static gboolean -gst_hls_demux_start_update (GstHLSDemux * demux) -{ - GError *error; - - /* creates a new thread for the updates */ - g_mutex_lock (demux->thread_lock); - if (demux->updates_thread == NULL) { - GST_DEBUG_OBJECT (demux, "Starting updates thread"); - demux->updates_thread = g_thread_create ( - (GThreadFunc) gst_hls_demux_update_thread, demux, FALSE, &error); - } - g_mutex_unlock (demux->thread_lock); - return (error != NULL); -} - static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux) { @@ -1314,7 +1311,7 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux) &next_fragment_uri, &duration, ×tamp)) { GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments"); demux->end_of_playlist = TRUE; - gst_task_start (demux->task); + gst_task_start (demux->stream_task); return FALSE; } @@ -1356,7 +1353,7 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux) } g_queue_push_tail (demux->queue, buf); - gst_task_start (demux->task); + gst_task_start (demux->stream_task); gst_adapter_clear (demux->download); return TRUE; } diff --git a/gst/hls/gsthlsdemux.h b/gst/hls/gsthlsdemux.h index a09a88b769..f304a06e73 100644 --- a/gst/hls/gsthlsdemux.h +++ b/gst/hls/gsthlsdemux.h @@ -51,8 +51,6 @@ struct _GstHLSDemux { GstElement parent; - GstTask *task; - GStaticRecMutex task_lock; GstPad *srcpad; GstPad *sinkpad; GstBuffer *playlist; @@ -67,13 +65,10 @@ struct _GstHLSDemux guint fragments_cache; /* number of fragments needed to be cached to start playing */ gfloat bitrate_switch_tol; /* tolerance with respect to the fragment duration to switch the bitarate*/ - /* Updates thread */ - GThread *updates_thread; /* Thread handling the playlist and fragments updates */ - GMutex *thread_lock; /* Thread lock */ - GCond *thread_cond; /* Signals the thread to quit */ - gboolean thread_return; /* Instructs the thread to return after the thread_quit condition is meet */ - GTimeVal next_update; /* Time of the next update */ - gint64 accumulated_delay; /* Delay accumulated fetching fragments, used to decide a playlist switch */ + /* Streaming task */ + GstTask *stream_task; + GStaticRecMutex stream_lock; + gboolean stop_stream_task; /* Fragments fetcher */ GstElement *fetcher; @@ -87,6 +82,13 @@ struct _GstHLSDemux gboolean cancelled; GstAdapter *download; + /* Updates task */ + GstTask *updates_task; + GStaticRecMutex updates_lock; + GMutex *updates_timed_lock; + GTimeVal next_update; /* Time of the next update */ + gint64 accumulated_delay; /* Delay accumulated fetching fragments, used to decide a playlist switch */ + /* Position in the stream */ GstClockTime position; GstClockTime position_shift;