mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-22 00:06:36 +00:00
basesrc: Don't hold LIVE_LOCK in create/alloc/fill
Holding this lock on live source prevents the source from changing the caps in ::create() without risking a deadlock. This has consequences as the LIVE_LOCK was replacing the STREAM_LOCK in many situation. As a side effect: - We no longer need to unlock when doing play/pause as the LIVE_LOCK isn't held. We then let the create() call finish, but will block if the state have changed meanwhile. This has the benefit that wait_preroll() calls in subclass is no longer needed. - We no longer need to change the state to unlock, simplifying the set_flushing() interface - We need different handling for EOS depending if we are in push or pull mode. This patch also document the locking of each private class member and the locking order. https://bugzilla.gnome.org/show_bug.cgi?id=783301
This commit is contained in:
parent
946622ec3f
commit
523de1a9dc
1 changed files with 180 additions and 153 deletions
|
@ -204,59 +204,64 @@ enum
|
||||||
#define GST_BASE_SRC_GET_PRIVATE(obj) \
|
#define GST_BASE_SRC_GET_PRIVATE(obj) \
|
||||||
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
|
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
|
||||||
|
|
||||||
|
/* The basesrc implementation need to respect the following locking order:
|
||||||
|
* 1. STREAM_LOCK
|
||||||
|
* 2. LIVE_LOCK
|
||||||
|
* 3. OBJECT_LOCK
|
||||||
|
*/
|
||||||
struct _GstBaseSrcPrivate
|
struct _GstBaseSrcPrivate
|
||||||
{
|
{
|
||||||
gboolean discont;
|
gboolean discont; /* STREAM_LOCK */
|
||||||
gboolean flushing;
|
gboolean flushing; /* LIVE_LOCK */
|
||||||
|
|
||||||
GstFlowReturn start_result;
|
GstFlowReturn start_result; /* OBJECT_LOCK */
|
||||||
gboolean async;
|
gboolean async; /* OBJECT_LOCK */
|
||||||
|
|
||||||
/* if a stream-start event should be sent */
|
/* if a stream-start event should be sent */
|
||||||
gboolean stream_start_pending;
|
gboolean stream_start_pending; /* STREAM_LOCK */
|
||||||
|
|
||||||
/* if segment should be sent and a
|
/* if segment should be sent and a
|
||||||
* seqnum if it was originated by a seek */
|
* seqnum if it was originated by a seek */
|
||||||
gboolean segment_pending;
|
gboolean segment_pending; /* OBJECT_LOCK */
|
||||||
guint32 segment_seqnum;
|
guint32 segment_seqnum; /* OBJECT_LOCK */
|
||||||
|
|
||||||
/* if EOS is pending (atomic) */
|
/* if EOS is pending (atomic) */
|
||||||
GstEvent *pending_eos;
|
GstEvent *pending_eos; /* OBJECT_LOCK */
|
||||||
gint has_pending_eos;
|
gint has_pending_eos; /* atomic */
|
||||||
|
|
||||||
/* if the eos was caused by a forced eos from the application */
|
/* if the eos was caused by a forced eos from the application */
|
||||||
gboolean forced_eos;
|
gboolean forced_eos; /* LIVE_LOCK */
|
||||||
|
|
||||||
/* startup latency is the time it takes between going to PLAYING and producing
|
/* startup latency is the time it takes between going to PLAYING and producing
|
||||||
* the first BUFFER with running_time 0. This value is included in the latency
|
* the first BUFFER with running_time 0. This value is included in the latency
|
||||||
* reporting. */
|
* reporting. */
|
||||||
GstClockTime latency;
|
GstClockTime latency; /* OBJECT_LOCK */
|
||||||
/* timestamp offset, this is the offset add to the values of gst_times for
|
/* timestamp offset, this is the offset add to the values of gst_times for
|
||||||
* pseudo live sources */
|
* pseudo live sources */
|
||||||
GstClockTimeDiff ts_offset;
|
GstClockTimeDiff ts_offset; /* OBJECT_LOCK */
|
||||||
|
|
||||||
gboolean do_timestamp;
|
gboolean do_timestamp; /* OBJECT_LOCK */
|
||||||
volatile gint dynamic_size;
|
volatile gint dynamic_size; /* atomic */
|
||||||
volatile gint automatic_eos;
|
volatile gint automatic_eos; /* atomic */
|
||||||
|
|
||||||
/* stream sequence number */
|
/* stream sequence number */
|
||||||
guint32 seqnum;
|
guint32 seqnum; /* STREAM_LOCK */
|
||||||
|
|
||||||
/* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
|
/* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
|
||||||
* pushed in the data stream */
|
* pushed in the data stream */
|
||||||
GList *pending_events;
|
GList *pending_events; /* OBJECT_LOCK */
|
||||||
volatile gint have_events;
|
volatile gint have_events; /* OBJECT_LOCK */
|
||||||
|
|
||||||
/* QoS *//* with LOCK */
|
/* QoS *//* with LOCK */
|
||||||
gboolean qos_enabled;
|
gboolean qos_enabled; /* unused */
|
||||||
gdouble proportion;
|
gdouble proportion; /* OBJECT_LOCK */
|
||||||
GstClockTime earliest_time;
|
GstClockTime earliest_time; /* OBJECT_LOCK */
|
||||||
|
|
||||||
GstBufferPool *pool;
|
GstBufferPool *pool; /* OBJECT_LOCK */
|
||||||
GstAllocator *allocator;
|
GstAllocator *allocator; /* OBJECT_LOCK */
|
||||||
GstAllocationParams params;
|
GstAllocationParams params; /* OBJECT_LOCK */
|
||||||
|
|
||||||
GCond async_cond;
|
GCond async_cond; /* OBJECT_LOCK */
|
||||||
};
|
};
|
||||||
|
|
||||||
static GstElementClass *parent_class = NULL;
|
static GstElementClass *parent_class = NULL;
|
||||||
|
@ -328,7 +333,7 @@ static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
|
||||||
GstQuery * query);
|
GstQuery * query);
|
||||||
|
|
||||||
static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
|
static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
|
||||||
gboolean flushing, gboolean live_play, gboolean * playing);
|
gboolean flushing);
|
||||||
|
|
||||||
static gboolean gst_base_src_start (GstBaseSrc * basesrc);
|
static gboolean gst_base_src_start (GstBaseSrc * basesrc);
|
||||||
static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
|
static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
|
||||||
|
@ -484,6 +489,31 @@ gst_base_src_finalize (GObject * object)
|
||||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Call with LIVE_LOCK held */
|
||||||
|
static GstFlowReturn
|
||||||
|
gst_base_src_wait_playing_unlocked (GstBaseSrc * src)
|
||||||
|
{
|
||||||
|
while (G_UNLIKELY (!src->live_running && !src->priv->flushing)) {
|
||||||
|
/* block until the state changes, or we get a flush, or something */
|
||||||
|
GST_DEBUG_OBJECT (src, "live source waiting for running state");
|
||||||
|
GST_LIVE_WAIT (src);
|
||||||
|
GST_DEBUG_OBJECT (src, "live source unlocked");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (src->priv->flushing)
|
||||||
|
goto flushing;
|
||||||
|
|
||||||
|
return GST_FLOW_OK;
|
||||||
|
|
||||||
|
/* ERRORS */
|
||||||
|
flushing:
|
||||||
|
{
|
||||||
|
GST_DEBUG_OBJECT (src, "we are flushing");
|
||||||
|
return GST_FLOW_FLUSHING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* gst_base_src_wait_playing:
|
* gst_base_src_wait_playing:
|
||||||
* @src: the src
|
* @src: the src
|
||||||
|
@ -503,25 +533,15 @@ gst_base_src_finalize (GObject * object)
|
||||||
GstFlowReturn
|
GstFlowReturn
|
||||||
gst_base_src_wait_playing (GstBaseSrc * src)
|
gst_base_src_wait_playing (GstBaseSrc * src)
|
||||||
{
|
{
|
||||||
|
GstFlowReturn ret;
|
||||||
|
|
||||||
g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
|
g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
|
||||||
|
|
||||||
do {
|
GST_LIVE_LOCK (src);
|
||||||
/* block until the state changes, or we get a flush, or something */
|
ret = gst_base_src_wait_playing_unlocked (src);
|
||||||
GST_DEBUG_OBJECT (src, "live source waiting for running state");
|
GST_LIVE_UNLOCK (src);
|
||||||
GST_LIVE_WAIT (src);
|
|
||||||
GST_DEBUG_OBJECT (src, "live source unlocked");
|
|
||||||
if (src->priv->flushing)
|
|
||||||
goto flushing;
|
|
||||||
} while (G_UNLIKELY (!src->live_running));
|
|
||||||
|
|
||||||
return GST_FLOW_OK;
|
return ret;
|
||||||
|
|
||||||
/* ERRORS */
|
|
||||||
flushing:
|
|
||||||
{
|
|
||||||
GST_DEBUG_OBJECT (src, "we are flushing");
|
|
||||||
return GST_FLOW_FLUSHING;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -855,6 +875,7 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* called with STREAM_LOCK */
|
||||||
static gboolean
|
static gboolean
|
||||||
gst_base_src_send_stream_start (GstBaseSrc * src)
|
gst_base_src_send_stream_start (GstBaseSrc * src)
|
||||||
{
|
{
|
||||||
|
@ -1577,7 +1598,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
|
||||||
GstSeekFlags flags;
|
GstSeekFlags flags;
|
||||||
GstSeekType start_type, stop_type;
|
GstSeekType start_type, stop_type;
|
||||||
gint64 start, stop;
|
gint64 start, stop;
|
||||||
gboolean flush, playing;
|
gboolean flush;
|
||||||
gboolean update;
|
gboolean update;
|
||||||
gboolean relative_seek = FALSE;
|
gboolean relative_seek = FALSE;
|
||||||
gboolean seekseg_configured = FALSE;
|
gboolean seekseg_configured = FALSE;
|
||||||
|
@ -1629,7 +1650,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
|
||||||
|
|
||||||
/* unblock streaming thread. */
|
/* unblock streaming thread. */
|
||||||
if (unlock)
|
if (unlock)
|
||||||
gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
|
gst_base_src_set_flushing (src, TRUE);
|
||||||
|
|
||||||
/* grab streaming lock, this should eventually be possible, either
|
/* grab streaming lock, this should eventually be possible, either
|
||||||
* because the task is paused, our streaming thread stopped
|
* because the task is paused, our streaming thread stopped
|
||||||
|
@ -1645,7 +1666,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unlock)
|
if (unlock)
|
||||||
gst_base_src_set_flushing (src, FALSE, playing, NULL);
|
gst_base_src_set_flushing (src, FALSE);
|
||||||
|
|
||||||
/* If we configured the seeksegment above, don't overwrite it now. Otherwise
|
/* If we configured the seeksegment above, don't overwrite it now. Otherwise
|
||||||
* copy the current segment info into the temp segment that we can actually
|
* copy the current segment info into the temp segment that we can actually
|
||||||
|
@ -1762,51 +1783,30 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
|
||||||
/* bidirectional events */
|
/* bidirectional events */
|
||||||
case GST_EVENT_FLUSH_START:
|
case GST_EVENT_FLUSH_START:
|
||||||
GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
|
GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
|
||||||
result = gst_pad_push_event (src->srcpad, event);
|
|
||||||
/* also unblock the create function */
|
|
||||||
gst_base_src_activate_pool (src, FALSE);
|
|
||||||
/* unlock any subclasses, we need to do this before grabbing the
|
|
||||||
* LIVE_LOCK since we hold this lock before going into ::create. We pass an
|
|
||||||
* unlock to the params because of backwards compat (see seek handler)*/
|
|
||||||
if (bclass->unlock)
|
|
||||||
bclass->unlock (src);
|
|
||||||
|
|
||||||
/* the live lock is released when we are blocked, waiting for playing or
|
result = gst_pad_push_event (src->srcpad, event);
|
||||||
* when we sync to the clock. */
|
gst_base_src_set_flushing (src, TRUE);
|
||||||
GST_LIVE_LOCK (src);
|
|
||||||
src->priv->flushing = TRUE;
|
|
||||||
/* clear pending EOS if any */
|
|
||||||
if (g_atomic_int_get (&src->priv->has_pending_eos)) {
|
|
||||||
GST_OBJECT_LOCK (src);
|
|
||||||
CLEAR_PENDING_EOS (src);
|
|
||||||
src->priv->forced_eos = FALSE;
|
|
||||||
GST_OBJECT_UNLOCK (src);
|
|
||||||
}
|
|
||||||
if (bclass->unlock_stop)
|
|
||||||
bclass->unlock_stop (src);
|
|
||||||
if (src->clock_id)
|
|
||||||
gst_clock_id_unschedule (src->clock_id);
|
|
||||||
GST_DEBUG_OBJECT (src, "signal");
|
|
||||||
GST_LIVE_SIGNAL (src);
|
|
||||||
GST_LIVE_UNLOCK (src);
|
|
||||||
event = NULL;
|
event = NULL;
|
||||||
break;
|
break;
|
||||||
case GST_EVENT_FLUSH_STOP:
|
case GST_EVENT_FLUSH_STOP:
|
||||||
{
|
{
|
||||||
gboolean start;
|
gboolean start;
|
||||||
|
|
||||||
GST_LIVE_LOCK (src);
|
GST_PAD_STREAM_LOCK (src->srcpad);
|
||||||
src->priv->segment_pending = TRUE;
|
gst_base_src_set_flushing (src, FALSE);
|
||||||
src->priv->flushing = FALSE;
|
|
||||||
GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
|
GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
|
||||||
result = gst_pad_push_event (src->srcpad, event);
|
result = gst_pad_push_event (src->srcpad, event);
|
||||||
|
|
||||||
gst_base_src_activate_pool (src, TRUE);
|
/* For external flush, restart the task .. */
|
||||||
|
GST_LIVE_LOCK (src);
|
||||||
|
src->priv->segment_pending = TRUE;
|
||||||
|
|
||||||
GST_OBJECT_LOCK (src->srcpad);
|
GST_OBJECT_LOCK (src->srcpad);
|
||||||
start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
|
start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
|
||||||
GST_OBJECT_UNLOCK (src->srcpad);
|
GST_OBJECT_UNLOCK (src->srcpad);
|
||||||
|
|
||||||
|
/* ... and for live sources, only if in playing state */
|
||||||
if (src->is_live) {
|
if (src->is_live) {
|
||||||
if (!src->live_running)
|
if (!src->live_running)
|
||||||
start = FALSE;
|
start = FALSE;
|
||||||
|
@ -1815,7 +1815,10 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
|
||||||
if (start)
|
if (start)
|
||||||
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
|
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
|
||||||
src->srcpad, NULL);
|
src->srcpad, NULL);
|
||||||
|
|
||||||
GST_LIVE_UNLOCK (src);
|
GST_LIVE_UNLOCK (src);
|
||||||
|
GST_PAD_STREAM_UNLOCK (src->srcpad);
|
||||||
|
|
||||||
event = NULL;
|
event = NULL;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1823,47 +1826,68 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
|
||||||
/* downstream serialized events */
|
/* downstream serialized events */
|
||||||
case GST_EVENT_EOS:
|
case GST_EVENT_EOS:
|
||||||
{
|
{
|
||||||
|
gboolean push_mode;
|
||||||
|
|
||||||
/* queue EOS and make sure the task or pull function performs the EOS
|
/* queue EOS and make sure the task or pull function performs the EOS
|
||||||
* actions.
|
* actions.
|
||||||
*
|
*
|
||||||
* We have two possibilities:
|
* For push mode, This will be done in 3 steps. It is required to not
|
||||||
|
* block here as gst_element_send_event() will hold the STATE_LOCK, hence
|
||||||
|
* blocking would prevent asynchronous state change to complete.
|
||||||
*
|
*
|
||||||
* - Before we are to enter the _create function, we check the has_pending_eos
|
* 1. We stop the streaming thread
|
||||||
* first and do EOS instead of entering it.
|
* 2. We set the pending eos
|
||||||
* - If we are in the _create function or we did not manage to set the
|
* 3. We start the streaming thread again, so it is performed
|
||||||
* flag fast enough and we are about to enter the _create function,
|
* asynchronously.
|
||||||
* we unlock it so that we exit with FLUSHING immediately. We then
|
*
|
||||||
* check the EOS flag and do the EOS logic.
|
* For pull mode, we simply mark the pending EOS without flushing.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
GST_OBJECT_LOCK (src->srcpad);
|
||||||
|
push_mode = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
|
||||||
|
GST_OBJECT_UNLOCK (src->srcpad);
|
||||||
|
|
||||||
|
if (push_mode) {
|
||||||
|
gst_base_src_set_flushing (src, TRUE);
|
||||||
|
|
||||||
|
GST_PAD_STREAM_LOCK (src->srcpad);
|
||||||
|
gst_base_src_set_flushing (src, FALSE);
|
||||||
|
|
||||||
GST_OBJECT_LOCK (src);
|
GST_OBJECT_LOCK (src);
|
||||||
g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
|
g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
|
||||||
if (src->priv->pending_eos)
|
if (src->priv->pending_eos)
|
||||||
gst_event_unref (src->priv->pending_eos);
|
gst_event_unref (src->priv->pending_eos);
|
||||||
src->priv->pending_eos = event;
|
src->priv->pending_eos = event;
|
||||||
event = NULL;
|
|
||||||
GST_OBJECT_UNLOCK (src);
|
GST_OBJECT_UNLOCK (src);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
|
GST_DEBUG_OBJECT (src,
|
||||||
|
"EOS marked, start task for asynchronous handling");
|
||||||
|
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
|
||||||
|
src->srcpad, NULL);
|
||||||
|
|
||||||
|
GST_PAD_STREAM_UNLOCK (src->srcpad);
|
||||||
|
} else {
|
||||||
|
/* In pull mode, we need not to return flushing to downstream, though
|
||||||
|
* the stream lock is not kept after getrange was unblocked */
|
||||||
|
GST_OBJECT_LOCK (src);
|
||||||
|
g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
|
||||||
|
if (src->priv->pending_eos)
|
||||||
|
gst_event_unref (src->priv->pending_eos);
|
||||||
|
src->priv->pending_eos = event;
|
||||||
|
GST_OBJECT_UNLOCK (src);
|
||||||
|
|
||||||
/* unlock the _create function so that we can check the has_pending_eos flag
|
|
||||||
* and we can do EOS. This will eventually release the LIVE_LOCK again so
|
|
||||||
* that we can grab it and stop the unlock again. We don't take the stream
|
|
||||||
* lock so that this operation is guaranteed to never block. */
|
|
||||||
gst_base_src_activate_pool (src, FALSE);
|
gst_base_src_activate_pool (src, FALSE);
|
||||||
if (bclass->unlock)
|
if (bclass->unlock)
|
||||||
bclass->unlock (src);
|
bclass->unlock (src);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
|
GST_PAD_STREAM_LOCK (src->srcpad);
|
||||||
|
|
||||||
GST_LIVE_LOCK (src);
|
|
||||||
GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
|
|
||||||
/* now stop the unlock of the streaming thread again. Grabbing the live
|
|
||||||
* lock is enough because that protects the create function. */
|
|
||||||
if (bclass->unlock_stop)
|
if (bclass->unlock_stop)
|
||||||
bclass->unlock_stop (src);
|
bclass->unlock_stop (src);
|
||||||
gst_base_src_activate_pool (src, TRUE);
|
GST_PAD_STREAM_UNLOCK (src->srcpad);
|
||||||
GST_LIVE_UNLOCK (src);
|
}
|
||||||
|
|
||||||
|
|
||||||
|
event = NULL;
|
||||||
result = TRUE;
|
result = TRUE;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2005,10 +2029,10 @@ gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
|
||||||
case GST_EVENT_FLUSH_START:
|
case GST_EVENT_FLUSH_START:
|
||||||
/* cancel any blocking getrange, is normally called
|
/* cancel any blocking getrange, is normally called
|
||||||
* when in pull mode. */
|
* when in pull mode. */
|
||||||
result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
|
result = gst_base_src_set_flushing (src, TRUE);
|
||||||
break;
|
break;
|
||||||
case GST_EVENT_FLUSH_STOP:
|
case GST_EVENT_FLUSH_STOP:
|
||||||
result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
|
result = gst_base_src_set_flushing (src, FALSE);
|
||||||
break;
|
break;
|
||||||
case GST_EVENT_QOS:
|
case GST_EVENT_QOS:
|
||||||
{
|
{
|
||||||
|
@ -2427,7 +2451,7 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
|
||||||
again:
|
again:
|
||||||
if (src->is_live) {
|
if (src->is_live) {
|
||||||
if (G_UNLIKELY (!src->live_running)) {
|
if (G_UNLIKELY (!src->live_running)) {
|
||||||
ret = gst_base_src_wait_playing (src);
|
ret = gst_base_src_wait_playing_unlocked (src);
|
||||||
if (ret != GST_FLOW_OK)
|
if (ret != GST_FLOW_OK)
|
||||||
goto stopped;
|
goto stopped;
|
||||||
}
|
}
|
||||||
|
@ -2470,7 +2494,23 @@ again:
|
||||||
|
|
||||||
res_buf = in_buf = *buf;
|
res_buf = in_buf = *buf;
|
||||||
|
|
||||||
|
GST_LIVE_UNLOCK (src);
|
||||||
ret = bclass->create (src, offset, length, &res_buf);
|
ret = bclass->create (src, offset, length, &res_buf);
|
||||||
|
GST_LIVE_LOCK (src);
|
||||||
|
|
||||||
|
/* As we released the LIVE_LOCK, the state may have changed */
|
||||||
|
if (src->is_live) {
|
||||||
|
if (G_UNLIKELY (!src->live_running)) {
|
||||||
|
GstFlowReturn wait_ret;
|
||||||
|
wait_ret = gst_base_src_wait_playing_unlocked (src);
|
||||||
|
if (wait_ret != GST_FLOW_OK) {
|
||||||
|
if (ret == GST_FLOW_OK && *buf == NULL)
|
||||||
|
gst_buffer_unref (res_buf);
|
||||||
|
ret = wait_ret;
|
||||||
|
goto stopped;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* The create function could be unlocked because we have a pending EOS. It's
|
/* The create function could be unlocked because we have a pending EOS. It's
|
||||||
* possible that we have a valid buffer from create that we need to
|
* possible that we have a valid buffer from create that we need to
|
||||||
|
@ -2677,6 +2717,7 @@ start_failed:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Called with STREAM_LOCK */
|
||||||
static void
|
static void
|
||||||
gst_base_src_loop (GstPad * pad)
|
gst_base_src_loop (GstPad * pad)
|
||||||
{
|
{
|
||||||
|
@ -2698,6 +2739,14 @@ gst_base_src_loop (GstPad * pad)
|
||||||
goto flushing;
|
goto flushing;
|
||||||
GST_LIVE_UNLOCK (src);
|
GST_LIVE_UNLOCK (src);
|
||||||
|
|
||||||
|
/* Just return if EOS is pushed again, as the app might be unaware that an
|
||||||
|
* EOS have been sent already */
|
||||||
|
if (GST_PAD_IS_EOS (pad)) {
|
||||||
|
GST_DEBUG_OBJECT (src, "Pad is marked as EOS, pause the task");
|
||||||
|
gst_pad_pause_task (pad);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
|
||||||
gst_base_src_send_stream_start (src);
|
gst_base_src_send_stream_start (src);
|
||||||
|
|
||||||
/* The stream-start event could've caused something to flush us */
|
/* The stream-start event could've caused something to flush us */
|
||||||
|
@ -3427,7 +3476,7 @@ gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
|
||||||
|
|
||||||
/* stop flushing now but for live sources, still block in the LIVE lock when
|
/* stop flushing now but for live sources, still block in the LIVE lock when
|
||||||
* we are not yet PLAYING */
|
* we are not yet PLAYING */
|
||||||
gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
|
gst_base_src_set_flushing (basesrc, FALSE);
|
||||||
|
|
||||||
gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
|
gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
|
||||||
|
|
||||||
|
@ -3546,7 +3595,7 @@ gst_base_src_stop (GstBaseSrc * basesrc)
|
||||||
GST_DEBUG_OBJECT (basesrc, "stopping source");
|
GST_DEBUG_OBJECT (basesrc, "stopping source");
|
||||||
|
|
||||||
/* flush all */
|
/* flush all */
|
||||||
gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
|
gst_base_src_set_flushing (basesrc, TRUE);
|
||||||
/* stop the task */
|
/* stop the task */
|
||||||
gst_pad_stop_task (basesrc->srcpad);
|
gst_pad_stop_task (basesrc->srcpad);
|
||||||
|
|
||||||
|
@ -3579,34 +3628,26 @@ was_stopped:
|
||||||
/* start or stop flushing dataprocessing
|
/* start or stop flushing dataprocessing
|
||||||
*/
|
*/
|
||||||
static gboolean
|
static gboolean
|
||||||
gst_base_src_set_flushing (GstBaseSrc * basesrc,
|
gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing)
|
||||||
gboolean flushing, gboolean live_play, gboolean * playing)
|
|
||||||
{
|
{
|
||||||
GstBaseSrcClass *bclass;
|
GstBaseSrcClass *bclass;
|
||||||
|
|
||||||
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
|
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
|
GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing);
|
||||||
|
|
||||||
if (flushing) {
|
if (flushing) {
|
||||||
gst_base_src_activate_pool (basesrc, FALSE);
|
gst_base_src_activate_pool (basesrc, FALSE);
|
||||||
/* unlock any subclasses, we need to do this before grabbing the
|
/* unlock any subclasses to allow turning off the streaming thread */
|
||||||
* LIVE_LOCK since we hold this lock before going into ::create. We pass an
|
|
||||||
* unlock to the params because of backwards compat (see seek handler)*/
|
|
||||||
if (bclass->unlock)
|
if (bclass->unlock)
|
||||||
bclass->unlock (basesrc);
|
bclass->unlock (basesrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* the live lock is released when we are blocked, waiting for playing or
|
/* the live lock is released when we are blocked, waiting for playing,
|
||||||
* when we sync to the clock. */
|
* when we sync to the clock or creating a buffer */
|
||||||
GST_LIVE_LOCK (basesrc);
|
GST_LIVE_LOCK (basesrc);
|
||||||
if (playing)
|
|
||||||
*playing = basesrc->live_running;
|
|
||||||
basesrc->priv->flushing = flushing;
|
basesrc->priv->flushing = flushing;
|
||||||
if (flushing) {
|
if (flushing) {
|
||||||
/* if we are locked in the live lock, signal it to make it flush */
|
|
||||||
basesrc->live_running = TRUE;
|
|
||||||
|
|
||||||
/* clear pending EOS if any */
|
/* clear pending EOS if any */
|
||||||
if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
|
if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
|
||||||
GST_OBJECT_LOCK (basesrc);
|
GST_OBJECT_LOCK (basesrc);
|
||||||
|
@ -3615,17 +3656,10 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
|
||||||
GST_OBJECT_UNLOCK (basesrc);
|
GST_OBJECT_UNLOCK (basesrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* step 1, now that we have the LIVE lock, clear our unlock request */
|
/* unblock clock sync (if any) or any other blocking thing */
|
||||||
if (bclass->unlock_stop)
|
|
||||||
bclass->unlock_stop (basesrc);
|
|
||||||
|
|
||||||
/* step 2, unblock clock sync (if any) or any other blocking thing */
|
|
||||||
if (basesrc->clock_id)
|
if (basesrc->clock_id)
|
||||||
gst_clock_id_unschedule (basesrc->clock_id);
|
gst_clock_id_unschedule (basesrc->clock_id);
|
||||||
} else {
|
} else {
|
||||||
/* signal the live source that it can start playing */
|
|
||||||
basesrc->live_running = live_play;
|
|
||||||
|
|
||||||
gst_base_src_activate_pool (basesrc, TRUE);
|
gst_base_src_activate_pool (basesrc, TRUE);
|
||||||
|
|
||||||
/* Drop all delayed events */
|
/* Drop all delayed events */
|
||||||
|
@ -3639,9 +3673,18 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
|
||||||
}
|
}
|
||||||
GST_OBJECT_UNLOCK (basesrc);
|
GST_OBJECT_UNLOCK (basesrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_LIVE_SIGNAL (basesrc);
|
GST_LIVE_SIGNAL (basesrc);
|
||||||
GST_LIVE_UNLOCK (basesrc);
|
GST_LIVE_UNLOCK (basesrc);
|
||||||
|
|
||||||
|
if (!flushing) {
|
||||||
|
/* Now wait for the stream lock to be released and clear our unlock request */
|
||||||
|
GST_PAD_STREAM_LOCK (basesrc->srcpad);
|
||||||
|
if (bclass->unlock_stop)
|
||||||
|
bclass->unlock_stop (basesrc);
|
||||||
|
GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
|
||||||
|
}
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3650,17 +3693,6 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
|
||||||
static gboolean
|
static gboolean
|
||||||
gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
|
gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
|
||||||
{
|
{
|
||||||
GstBaseSrcClass *bclass;
|
|
||||||
|
|
||||||
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
|
|
||||||
|
|
||||||
/* unlock subclasses locked in ::create, we only do this when we stop playing. */
|
|
||||||
if (!live_play) {
|
|
||||||
GST_DEBUG_OBJECT (basesrc, "unlock");
|
|
||||||
if (bclass->unlock)
|
|
||||||
bclass->unlock (basesrc);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* we are now able to grab the LIVE lock, when we get it, we can be
|
/* we are now able to grab the LIVE lock, when we get it, we can be
|
||||||
* waiting for PLAYING while blocked in the LIVE cond or we can be waiting
|
* waiting for PLAYING while blocked in the LIVE cond or we can be waiting
|
||||||
* for the clock. */
|
* for the clock. */
|
||||||
|
@ -3678,11 +3710,6 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
|
||||||
if (live_play) {
|
if (live_play) {
|
||||||
gboolean start;
|
gboolean start;
|
||||||
|
|
||||||
/* clear our unlock request when going to PLAYING */
|
|
||||||
GST_DEBUG_OBJECT (basesrc, "unlock stop");
|
|
||||||
if (bclass->unlock_stop)
|
|
||||||
bclass->unlock_stop (basesrc);
|
|
||||||
|
|
||||||
/* for live sources we restart the timestamp correction */
|
/* for live sources we restart the timestamp correction */
|
||||||
GST_OBJECT_LOCK (basesrc);
|
GST_OBJECT_LOCK (basesrc);
|
||||||
basesrc->priv->latency = -1;
|
basesrc->priv->latency = -1;
|
||||||
|
@ -3840,7 +3867,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
|
||||||
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
|
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
|
||||||
GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
|
GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
|
||||||
if (gst_base_src_is_live (basesrc)) {
|
if (gst_base_src_is_live (basesrc)) {
|
||||||
/* make sure we block in the live lock in PAUSED */
|
/* make sure we block in the live cond in PAUSED */
|
||||||
gst_base_src_set_playing (basesrc, FALSE);
|
gst_base_src_set_playing (basesrc, FALSE);
|
||||||
no_preroll = TRUE;
|
no_preroll = TRUE;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue