From 44d3751d685a015952805bd882a7da5e1c294577 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Sat, 31 Dec 2022 23:22:29 +1100 Subject: [PATCH] adaptivedemux2: Add gst_adaptive_demux2_stream_wait_prepared() Add a method that waits for a stream to signal the prepare_cond after it returns a BUSY flow return. Part-of: --- .../adaptivedemux2/gstadaptivedemux-stream.c | 44 ++++++++++++++++++- .../adaptivedemux2/gstadaptivedemux-stream.h | 5 +++ .../ext/adaptivedemux2/gstadaptivedemux.c | 34 ++++++++++++-- 3 files changed, 79 insertions(+), 4 deletions(-) diff --git a/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-stream.c b/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-stream.c index 058bda1af7..5c8e5e3060 100644 --- a/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-stream.c +++ b/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-stream.c @@ -91,6 +91,9 @@ gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream) stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE; + g_mutex_init (&stream->prepare_lock); + g_cond_init (&stream->prepare_cond); + gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME); } @@ -148,6 +151,9 @@ gst_adaptive_demux2_stream_finalize (GObject * object) gst_clear_tag_list (&stream->pending_tags); g_clear_pointer (&stream->stream_collection, gst_object_unref); + g_mutex_clear (&stream->prepare_lock); + g_cond_clear (&stream->prepare_cond); + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -1787,12 +1793,21 @@ gst_adaptive_demux2_stream_on_can_download_fragments (GstAdaptiveDemux2Stream * * Called by a subclass that has returned GST_ADAPTIVE_DEMUX_FLOW_BUSY * from update_fragment_info() to indicate that it is ready to continue * downloading now. + * + * Called from the scheduler task */ void gst_adaptive_demux2_stream_mark_prepared (GstAdaptiveDemux2Stream * stream) { GstAdaptiveDemux *demux = stream->demux; + /* hlsdemux calls this method whenever a playlist is updated, so also + * use it to wake up a stream that's waiting at the live edge */ + if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE) { + gst_adaptive_demux2_stream_on_manifest_update (stream); + } + + g_cond_broadcast (&stream->prepare_cond); if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE) return; @@ -1805,6 +1820,22 @@ gst_adaptive_demux2_stream_mark_prepared (GstAdaptiveDemux2Stream * stream) gst_object_ref (stream), (GDestroyNotify) gst_object_unref); } +/* Called by external threads (manifest input on sinkpad, and seek handling) + * when it requires the stream to be prepared before they can continue + * Must be held with the SCHEDULER lock held */ +gboolean +gst_adaptive_demux2_stream_wait_prepared (GstAdaptiveDemux2Stream * stream) +{ + GstAdaptiveDemux *demux = stream->demux; + + g_mutex_lock (&stream->prepare_lock); + GST_ADAPTIVE_SCHEDULER_UNLOCK (demux); + g_cond_wait (&stream->prepare_cond, &stream->prepare_lock); + g_mutex_unlock (&stream->prepare_lock); + + return GST_ADAPTIVE_SCHEDULER_LOCK (demux); +} + static void gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream * stream) @@ -1857,8 +1888,12 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream) GST_DEBUG_OBJECT (stream, "Fragment info update result: %d %s", ret, gst_flow_get_name (ret)); - if (ret == GST_FLOW_OK) + if (ret == GST_FLOW_OK) { + /* Wake anyone that's waiting for this stream to get prepared */ + if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE) + g_cond_broadcast (&stream->prepare_cond); stream->starting_fragment = TRUE; + } break; case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING: break; @@ -1878,7 +1913,12 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream) if (ret == GST_ADAPTIVE_DEMUX_FLOW_BUSY) { GST_LOG_OBJECT (stream, "Sub-class returned BUSY flow return. Waiting in PREPARE state"); + /* Need to take the prepare lock specifically when switching + * to WAITING_PREPARE state, to avoid a race in _wait_prepared(); + */ + g_mutex_lock (&stream->prepare_lock); stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE; + g_mutex_unlock (&stream->prepare_lock); return FALSE; } @@ -1941,6 +1981,7 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream) case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC: GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position"); stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED; + g_cond_broadcast (&stream->prepare_cond); gst_adaptive_demux_handle_lost_sync (demux); return FALSE; case GST_FLOW_NOT_LINKED: @@ -2140,6 +2181,7 @@ gst_adaptive_demux2_stream_stop_default (GstAdaptiveDemux2Stream * stream) GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state); stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED; + g_cond_broadcast (&stream->prepare_cond); if (stream->pending_cb_id != 0) { gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task, diff --git a/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-stream.h b/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-stream.h index 3463bf922c..8bd607baa6 100644 --- a/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-stream.h +++ b/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux-stream.h @@ -309,6 +309,10 @@ struct _GstAdaptiveDemux2Stream GstAdaptiveDemux2StreamState state; guint pending_cb_id; gboolean download_active; + + GMutex prepare_lock; + GCond prepare_cond; + /* The (global output) time at which this stream should be woken * to download more input */ GstClockTimeDiff next_input_wakeup_time; @@ -376,6 +380,7 @@ gboolean gst_adaptive_demux2_stream_handle_collection (GstAdaptiveDemux2Stream * GstStreamCollection *collection, gboolean *had_pending_tracks); void gst_adaptive_demux2_stream_mark_prepared(GstAdaptiveDemux2Stream *stream); +gboolean gst_adaptive_demux2_stream_wait_prepared(GstAdaptiveDemux2Stream *stream); void gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f); diff --git a/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux.c b/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux.c index 8989dfc31b..36ee337eda 100644 --- a/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux.c +++ b/subprojects/gst-plugins-good/ext/adaptivedemux2/gstadaptivedemux.c @@ -1683,7 +1683,21 @@ gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux, /* TODO we only need the first timestamp, maybe create a simple function to * get the current PTS of a fragment ? */ GST_DEBUG_OBJECT (stream, "Calling update_fragment_info"); - gst_adaptive_demux2_stream_update_fragment_info (stream); + GstFlowReturn flow_ret = + gst_adaptive_demux2_stream_update_fragment_info (stream); + + /* Handle fragment info waiting on BUSY */ + while (flow_ret == GST_ADAPTIVE_DEMUX_FLOW_BUSY) { + if (!gst_adaptive_demux2_stream_wait_prepared (stream)) + break; + flow_ret = gst_adaptive_demux2_stream_update_fragment_info (stream); + } + + if (flow_ret != GST_FLOW_OK) { + GST_WARNING_OBJECT (stream, "Could not update fragment info. flow: %s", + gst_flow_get_name (flow_ret)); + continue; + } GST_DEBUG_OBJECT (stream, "Got stream time %" GST_STIME_FORMAT, @@ -2285,8 +2299,22 @@ gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, } } - if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags, - ts, &ts) != GST_FLOW_OK) { + GstFlowReturn flow_ret = + gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags, + ts, &ts); + + /* Handle fragment info waiting on BUSY */ + while (flow_ret == GST_ADAPTIVE_DEMUX_FLOW_BUSY) { + if (!gst_adaptive_demux2_stream_wait_prepared (stream)) + break; + flow_ret = gst_adaptive_demux2_stream_update_fragment_info (stream); + } + + if (flow_ret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (demux, + "Seek on stream %" GST_PTR_FORMAT " failed with flow return %s", + stream, gst_flow_get_name (flow_ret)); + GST_ADAPTIVE_SCHEDULER_UNLOCK (demux); GST_API_UNLOCK (demux);