diff --git a/ChangeLog b/ChangeLog index f3ee6eaf6d..2a4f6516bb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,17 @@ +2007-12-19 Wim Taymans + + * libs/gst/base/gstbasesrc.c: (gst_base_src_send_event), + (gst_base_src_get_range), (gst_base_src_pad_get_range), + (gst_base_src_loop), (gst_base_src_set_flushing), + (gst_base_src_change_state): + Allow sending EOS to the source to make it send out an EOS event from + the streaming thread. + Update docs and deprecate the old NULL/READY shutdown method. + + * tests/check/libs/basesrc.c: (GST_START_TEST), + (gst_basesrc_suite): + Add unit test for controlled shutdown. + 2007-12-19 Wim Taymans * docs/design/part-synchronisation.txt: diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index 11ac86c6dc..d113e35b4f 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -154,36 +154,21 @@ * been processed and the pipeline can safely be stopped. * * - * Since GStreamer 0.10.3 an application may simply set the source - * element to NULL or READY state to make it send an EOS event downstream. - * The application should lock the state of the source afterwards, so that - * shutting down the pipeline from PLAYING doesn't temporarily start up the - * source element for a second time: - * - * ... - * // stop recording - * gst_element_set_state (audio_source, #GST_STATE_NULL); - * gst_element_set_locked_state (audio_source, %TRUE); - * ... - * - * Now the application should wait for an EOS message - * to be posted on the pipeline's bus. Once it has received - * an EOS message, it may safely shut down the entire pipeline: - * - * ... - * // everything done - shut down pipeline - * gst_element_set_state (pipeline, #GST_STATE_NULL); - * gst_element_set_locked_state (audio_source, %FALSE); - * ... - * + * Since GStreamer 0.10.16 an application may send an EOS event to a source + * element to make it send an EOS event downstream. This can typically be done + * with the gst_element_send_event() function on the element or its parent bin. * * - * Note that setting the source element to NULL or READY when the - * pipeline is in the PAUSED state may cause a deadlock since the streaming - * thread might be blocked in PREROLL. + * After the EOS has been sent to the element, the application should wait for + * an EOS message to be posted on the pipeline's bus. Once this EOS message is + * received, it may safely shut down the entire pipeline. * * - * Last reviewed on 2007-09-13 (0.10.15) + * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3 + * is still available but deprecated as it is dangerous and less flexible. + * + * + * Last reviewed on 2007-12-19 (0.10.16) * * */ @@ -249,6 +234,9 @@ struct _GstBaseSrcPrivate GstEvent *close_segment; GstEvent *start_segment; + /* if EOS is pending */ + gboolean pending_eos; + /* startup latency is the time it takes between going to PLAYING and producing * the first BUFFER with running_time 0. This value is included in the latency * reporting. */ @@ -1278,8 +1266,12 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) /* downstream serialized events */ case GST_EVENT_EOS: - /* FIXME, queue EOS and make sure the task or pull function - * perform the EOS actions. */ + /* queue EOS and make sure the task or pull function + * performs the EOS actions. */ + GST_LIVE_LOCK (src); + src->priv->pending_eos = TRUE; + GST_LIVE_UNLOCK (src); + result = TRUE; break; case GST_EVENT_NEWSEGMENT: /* sending random NEWSEGMENT downstream can break sync. */ @@ -1787,9 +1779,12 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, status = gst_base_src_do_sync (src, *buf); /* waiting for the clock could have made us flushing */ - if (src->priv->flushing) + if (G_UNLIKELY (src->priv->flushing)) goto flushing; + if (G_UNLIKELY (src->priv->pending_eos)) + goto eos; + switch (status) { case GST_CLOCK_EARLY: /* the buffer is too late. We currently don't drop the buffer. */ @@ -1863,6 +1858,13 @@ flushing: *buf = NULL; return GST_FLOW_WRONG_STATE; } +eos: + { + GST_DEBUG_OBJECT (src, "we are EOS"); + gst_buffer_unref (*buf); + *buf = NULL; + return GST_FLOW_UNEXPECTED; + } } static GstFlowReturn @@ -1875,9 +1877,13 @@ gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length, src = GST_BASE_SRC (gst_pad_get_parent (pad)); GST_LIVE_LOCK (src); - if (src->priv->flushing) + if (G_UNLIKELY (src->priv->flushing)) goto flushing; + /* if we're EOS, return right away */ + if (G_UNLIKELY (src->priv->pending_eos)) + goto eos; + res = gst_base_src_get_range (src, offset, length, buf); done: @@ -1894,6 +1900,12 @@ flushing: res = GST_FLOW_WRONG_STATE; goto done; } +eos: + { + GST_DEBUG_OBJECT (src, "we are EOS"); + res = GST_FLOW_UNEXPECTED; + goto done; + } } static gboolean @@ -1967,9 +1979,13 @@ gst_base_src_loop (GstPad * pad) src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); GST_LIVE_LOCK (src); - if (src->priv->flushing) + if (G_UNLIKELY (src->priv->flushing)) goto flushing; + /* if we're EOS, return right away */ + if (G_UNLIKELY (src->priv->pending_eos)) + goto eos; + src->priv->last_sent_eos = FALSE; /* if we operate in bytes, we can calculate an offset */ @@ -1990,11 +2006,11 @@ gst_base_src_loop (GstPad * pad) goto null_buffer; /* push events to close/start our segment before we push the buffer. */ - if (src->priv->close_segment) { + if (G_UNLIKELY (src->priv->close_segment)) { gst_pad_push_event (pad, src->priv->close_segment); src->priv->close_segment = NULL; } - if (src->priv->start_segment) { + if (G_UNLIKELY (src->priv->start_segment)) { gst_pad_push_event (pad, src->priv->start_segment); src->priv->start_segment = NULL; } @@ -2051,7 +2067,7 @@ gst_base_src_loop (GstPad * pad) goto pause; } - if (eos) { + if (G_UNLIKELY (eos)) { GST_INFO_OBJECT (src, "pausing after end of segment"); ret = GST_FLOW_UNEXPECTED; goto pause; @@ -2068,6 +2084,13 @@ flushing: ret = GST_FLOW_WRONG_STATE; goto pause; } +eos: + { + GST_DEBUG_OBJECT (src, "we are EOS"); + GST_LIVE_UNLOCK (src); + ret = GST_FLOW_UNEXPECTED; + goto pause; + } pause: { const gchar *reason = gst_flow_get_name (ret); @@ -2340,6 +2363,8 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, if (flushing) { /* if we are locked in the live lock, signal it to make it flush */ basesrc->live_running = TRUE; + /* clear pending EOS if any */ + basesrc->priv->pending_eos = FALSE; /* step 1, now that we have the LIVE lock, clear our unlock request */ if (bclass->unlock_stop) @@ -2595,6 +2620,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ()); basesrc->priv->last_sent_eos = TRUE; } + basesrc->priv->pending_eos = FALSE; event_p = &basesrc->data.ABI.pending_seek; gst_event_replace (event_p, NULL); event_p = &basesrc->priv->close_segment; diff --git a/tests/check/libs/basesrc.c b/tests/check/libs/basesrc.c index 77a022d1b9..0b5a51c99a 100644 --- a/tests/check/libs/basesrc.c +++ b/tests/check/libs/basesrc.c @@ -328,6 +328,160 @@ GST_START_TEST (basesrc_eos_events_pull) GST_END_TEST; +/* basesrc_eos_events_push_live_eos: + * - make sure the source stops and emits EOS when we send an EOS event to the + * pipeline. + */ +GST_START_TEST (basesrc_eos_events_push_live_eos) +{ + GstStateChangeReturn state_ret; + GstElement *src, *sink, *pipe; + GstMessage *msg; + GstBus *bus; + GstPad *srcpad; + guint probe, num_eos = 0; + gboolean res; + + pipe = gst_pipeline_new ("pipeline"); + sink = gst_element_factory_make ("fakesink", "sink"); + src = gst_element_factory_make ("fakesrc", "src"); + + g_assert (pipe != NULL); + g_assert (sink != NULL); + g_assert (src != NULL); + + fail_unless (gst_bin_add (GST_BIN (pipe), src) == TRUE); + fail_unless (gst_bin_add (GST_BIN (pipe), sink) == TRUE); + + fail_unless (gst_element_link (src, sink) == TRUE); + + g_object_set (sink, "can-activate-push", TRUE, NULL); + g_object_set (sink, "can-activate-pull", FALSE, NULL); + + g_object_set (src, "can-activate-push", TRUE, NULL); + g_object_set (src, "can-activate-pull", FALSE, NULL); + + /* set up event probe to count EOS events */ + srcpad = gst_element_get_pad (src, "src"); + fail_unless (srcpad != NULL); + + probe = gst_pad_add_event_probe (srcpad, + G_CALLBACK (eos_event_counter), &num_eos); + + bus = gst_element_get_bus (pipe); + + gst_element_set_state (pipe, GST_STATE_PLAYING); + state_ret = gst_element_get_state (pipe, NULL, NULL, -1); + fail_unless (state_ret == GST_STATE_CHANGE_SUCCESS); + + /* wait a second, then emit the EOS */ + g_usleep (GST_USECOND * 1); + + /* shut down source only (should send EOS event) ... */ + res = gst_element_send_event (pipe, gst_event_new_eos ()); + fail_unless (res == TRUE); + + /* ... and wait for the EOS message from the sink */ + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_unless (msg != NULL); + fail_unless (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_ERROR); + fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS); + + /* should be exactly one EOS event */ + fail_unless (num_eos == 1); + + gst_element_set_state (pipe, GST_STATE_NULL); + gst_element_get_state (pipe, NULL, NULL, -1); + + /* make sure source hasn't sent a second one when going PAUSED => READY */ + fail_unless (num_eos == 1); + + gst_pad_remove_event_probe (srcpad, probe); + gst_object_unref (srcpad); + gst_message_unref (msg); + gst_object_unref (bus); + gst_object_unref (pipe); +} + +GST_END_TEST; + +/* basesrc_eos_events_pull_live_eos: + * - make sure the source stops and emits EOS when we send an EOS event to the + * pipeline. + */ +GST_START_TEST (basesrc_eos_events_pull_live_eos) +{ + GstStateChangeReturn state_ret; + GstElement *src, *sink, *pipe; + GstMessage *msg; + GstBus *bus; + GstPad *srcpad; + guint probe, num_eos = 0; + gboolean res; + + pipe = gst_pipeline_new ("pipeline"); + sink = gst_element_factory_make ("fakesink", "sink"); + src = gst_element_factory_make ("fakesrc", "src"); + + g_assert (pipe != NULL); + g_assert (sink != NULL); + g_assert (src != NULL); + + fail_unless (gst_bin_add (GST_BIN (pipe), src) == TRUE); + fail_unless (gst_bin_add (GST_BIN (pipe), sink) == TRUE); + + fail_unless (gst_element_link (src, sink) == TRUE); + + g_object_set (sink, "can-activate-push", FALSE, NULL); + g_object_set (sink, "can-activate-pull", TRUE, NULL); + + g_object_set (src, "can-activate-push", FALSE, NULL); + g_object_set (src, "can-activate-pull", TRUE, NULL); + + /* set up event probe to count EOS events */ + srcpad = gst_element_get_pad (src, "src"); + fail_unless (srcpad != NULL); + + probe = gst_pad_add_event_probe (srcpad, + G_CALLBACK (eos_event_counter), &num_eos); + + bus = gst_element_get_bus (pipe); + + gst_element_set_state (pipe, GST_STATE_PLAYING); + state_ret = gst_element_get_state (pipe, NULL, NULL, -1); + fail_unless (state_ret == GST_STATE_CHANGE_SUCCESS); + + /* wait a second, then emit the EOS */ + g_usleep (GST_USECOND * 1); + + /* shut down source only (should send EOS event) ... */ + res = gst_element_send_event (pipe, gst_event_new_eos ()); + fail_unless (res == TRUE); + + /* ... and wait for the EOS message from the sink */ + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_unless (msg != NULL); + fail_unless (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_ERROR); + fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS); + + /* no EOS in pull mode */ + fail_unless (num_eos == 0); + + gst_element_set_state (pipe, GST_STATE_NULL); + gst_element_get_state (pipe, NULL, NULL, -1); + + /* make sure source hasn't sent a second one when going PAUSED => READY */ + fail_unless (num_eos == 0); + + gst_pad_remove_event_probe (srcpad, probe); + gst_object_unref (srcpad); + gst_message_unref (msg); + gst_object_unref (bus); + gst_object_unref (pipe); +} + +GST_END_TEST; + Suite * gst_basesrc_suite (void) { @@ -339,6 +493,8 @@ gst_basesrc_suite (void) tcase_add_test (tc, basesrc_eos_events_push); tcase_add_test (tc, basesrc_eos_events_push_live_op); tcase_add_test (tc, basesrc_eos_events_pull_live_op); + tcase_add_test (tc, basesrc_eos_events_push_live_eos); + tcase_add_test (tc, basesrc_eos_events_pull_live_eos); return s; }