libs/gst/base/gstbasesrc.c: Make sending an EOS event to the basesrc non-blocking even if the implementation does blo...

Original commit message from CVS:
Based on patch by: Bjarne Rosengren <bjarne at axis dot com>
* libs/gst/base/gstbasesrc.c: (gst_base_src_send_event),
(gst_base_src_get_range), (gst_base_src_pad_get_range),
(gst_base_src_loop), (gst_base_src_set_flushing),
(gst_base_src_change_state):
Make sending an EOS event to the basesrc non-blocking even if the
implementation does blocking waits in the create function. This is done
by unlocking the create function when EOS is sent.
Fixes #535218.
This commit is contained in:
Bjarne Rosengren 2008-05-28 13:48:17 +00:00 committed by Wim Taymans
parent 3d2df075e3
commit eae0c4cb9a
2 changed files with 76 additions and 34 deletions

View file

@ -1,3 +1,16 @@
2008-05-28 Wim Taymans <wim.taymans@collabora.co.uk>
Based on patch by: Bjarne Rosengren <bjarne at axis dot com>
* libs/gst/base/gstbasesrc.c: (gst_base_src_send_event),
(gst_base_src_get_range), (gst_base_src_pad_get_range),
(gst_base_src_loop), (gst_base_src_set_flushing),
(gst_base_src_change_state):
Make sending an EOS event to the basesrc non-blocking even if the
implementation does blocking waits in the create function. This is done
by unlocking the create function when EOS is sent.
Fixes #535218.
2008-05-28 Sebastian Dröge <slomo@circular-chaos.org> 2008-05-28 Sebastian Dröge <slomo@circular-chaos.org>
* tools/gst-inspect.c: (print_element_properties_info): * tools/gst-inspect.c: (print_element_properties_info):

View file

@ -155,7 +155,8 @@
* </para> * </para>
* <para> * <para>
* Since GStreamer 0.10.16 an application may send an EOS event to a source * Since GStreamer 0.10.16 an application may send an EOS event to a source
* element to make it send an EOS event downstream. This can typically be done * element to make it perform the EOS logic (send EOS event downstream or post a
* #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
* with the gst_element_send_event() function on the element or its parent bin. * with the gst_element_send_event() function on the element or its parent bin.
* </para> * </para>
* <para> * <para>
@ -234,8 +235,8 @@ struct _GstBaseSrcPrivate
GstEvent *close_segment; GstEvent *close_segment;
GstEvent *start_segment; GstEvent *start_segment;
/* if EOS is pending */ /* if EOS is pending (atomic) */
gboolean pending_eos; gint pending_eos;
/* startup latency is the time it takes between going to PLAYING and producing /* startup latency is the time it takes between going to PLAYING and producing
* the first BUFFER with running_time 0. This value is included in the latency * the first BUFFER with running_time 0. This value is included in the latency
@ -1306,6 +1307,8 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
src = GST_BASE_SRC (element); src = GST_BASE_SRC (element);
GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
/* bidirectional events */ /* bidirectional events */
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
@ -1316,13 +1319,46 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
/* downstream serialized events */ /* downstream serialized events */
case GST_EVENT_EOS: case GST_EVENT_EOS:
/* queue EOS and make sure the task or pull function {
* performs the EOS actions. */ GstBaseSrcClass *bclass;
bclass = GST_BASE_SRC_GET_CLASS (src);
/* queue EOS and make sure the task or pull function performs the EOS
* actions.
*
* We have two possibilities:
*
* - Before we are to enter the _create function, we check the pending_eos
* first and do EOS instead of entering it.
* - If we are in the _create function or we did not manage to set the
* flag fast enough and we are about to enter the _create function,
* we unlock it so that we exit with WRONG_STATE immediatly. We then
* check the EOS flag and do the EOS logic.
*/
g_atomic_int_set (&src->priv->pending_eos, TRUE);
GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
/* unlock the _create function so that we can check the pending_eos flag
* and we can do EOS. This will eventually release the LIVE_LOCK again so
* that we can grab it and stop the unlock again. We don't take the stream
* lock so that this operation is guaranteed to never block. */
if (bclass->unlock)
bclass->unlock (src);
GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
GST_LIVE_LOCK (src); GST_LIVE_LOCK (src);
src->priv->pending_eos = TRUE; GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
/* now stop the unlock of the streaming thread again. Grabbing the live
* lock is enough because that protects the create function. */
if (bclass->unlock_stop)
bclass->unlock_stop (src);
GST_LIVE_UNLOCK (src); GST_LIVE_UNLOCK (src);
result = TRUE; result = TRUE;
break; break;
}
case GST_EVENT_NEWSEGMENT: case GST_EVENT_NEWSEGMENT:
/* sending random NEWSEGMENT downstream can break sync. */ /* sending random NEWSEGMENT downstream can break sync. */
break; break;
@ -1812,11 +1848,28 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
src->num_buffers_left--; src->num_buffers_left--;
} }
/* don't enter the create function if a pending EOS event was set. For the
* logic of the pending_eos, check the event function of this class. */
if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
goto eos;
GST_DEBUG_OBJECT (src, GST_DEBUG_OBJECT (src,
"calling create offset %" G_GUINT64_FORMAT " length %u, time %" "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
G_GINT64_FORMAT, offset, length, src->segment.time); G_GINT64_FORMAT, offset, length, src->segment.time);
ret = bclass->create (src, offset, length, buf); ret = bclass->create (src, offset, length, buf);
/* The create function could be unlocked because we have a pending EOS. It's
* possible that we have a valid buffer from create that we need to
* discard when the create function returned _OK. */
if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
if (ret == GST_FLOW_OK) {
gst_buffer_unref (*buf);
*buf = NULL;
}
goto eos;
}
if (G_UNLIKELY (ret != GST_FLOW_OK)) if (G_UNLIKELY (ret != GST_FLOW_OK))
goto not_ok; goto not_ok;
@ -1836,9 +1889,6 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
if (G_UNLIKELY (src->priv->flushing)) if (G_UNLIKELY (src->priv->flushing))
goto flushing; goto flushing;
if (G_UNLIKELY (src->priv->pending_eos))
goto eos;
switch (status) { switch (status) {
case GST_CLOCK_EARLY: case GST_CLOCK_EARLY:
/* the buffer is too late. We currently don't drop the buffer. */ /* the buffer is too late. We currently don't drop the buffer. */
@ -1915,8 +1965,6 @@ flushing:
eos: eos:
{ {
GST_DEBUG_OBJECT (src, "we are EOS"); GST_DEBUG_OBJECT (src, "we are EOS");
gst_buffer_unref (*buf);
*buf = NULL;
return GST_FLOW_UNEXPECTED; return GST_FLOW_UNEXPECTED;
} }
} }
@ -1934,10 +1982,6 @@ gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
if (G_UNLIKELY (src->priv->flushing)) if (G_UNLIKELY (src->priv->flushing))
goto flushing; goto flushing;
/* if we're EOS, return right away */
if (G_UNLIKELY (src->priv->pending_eos))
goto eos;
res = gst_base_src_get_range (src, offset, length, buf); res = gst_base_src_get_range (src, offset, length, buf);
done: done:
@ -1954,12 +1998,6 @@ flushing:
res = GST_FLOW_WRONG_STATE; res = GST_FLOW_WRONG_STATE;
goto done; goto done;
} }
eos:
{
GST_DEBUG_OBJECT (src, "we are EOS");
res = GST_FLOW_UNEXPECTED;
goto done;
}
} }
static gboolean static gboolean
@ -2034,13 +2072,10 @@ gst_base_src_loop (GstPad * pad)
src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
GST_LIVE_LOCK (src); GST_LIVE_LOCK (src);
if (G_UNLIKELY (src->priv->flushing)) if (G_UNLIKELY (src->priv->flushing))
goto flushing; goto flushing;
/* if we're EOS, return right away */
if (G_UNLIKELY (src->priv->pending_eos))
goto eos;
src->priv->last_sent_eos = FALSE; src->priv->last_sent_eos = FALSE;
blocksize = src->blocksize; blocksize = src->blocksize;
@ -2179,13 +2214,6 @@ flushing:
ret = GST_FLOW_WRONG_STATE; ret = GST_FLOW_WRONG_STATE;
goto pause; goto pause;
} }
eos:
{
GST_DEBUG_OBJECT (src, "we are EOS");
GST_LIVE_UNLOCK (src);
ret = GST_FLOW_UNEXPECTED;
goto pause;
}
pause: pause:
{ {
const gchar *reason = gst_flow_get_name (ret); const gchar *reason = gst_flow_get_name (ret);
@ -2468,8 +2496,9 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
if (flushing) { if (flushing) {
/* if we are locked in the live lock, signal it to make it flush */ /* if we are locked in the live lock, signal it to make it flush */
basesrc->live_running = TRUE; basesrc->live_running = TRUE;
/* clear pending EOS if any */ /* clear pending EOS if any */
basesrc->priv->pending_eos = FALSE; g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
/* step 1, now that we have the LIVE lock, clear our unlock request */ /* step 1, now that we have the LIVE lock, clear our unlock request */
if (bclass->unlock_stop) if (bclass->unlock_stop)
@ -2725,7 +2754,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ()); gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
basesrc->priv->last_sent_eos = TRUE; basesrc->priv->last_sent_eos = TRUE;
} }
basesrc->priv->pending_eos = FALSE; g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
event_p = &basesrc->data.ABI.pending_seek; event_p = &basesrc->data.ABI.pending_seek;
gst_event_replace (event_p, NULL); gst_event_replace (event_p, NULL);
event_p = &basesrc->priv->close_segment; event_p = &basesrc->priv->close_segment;