From a25cedb415a2bfb8646376866066f278960943e1 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 12 Feb 2007 11:32:22 +0000 Subject: [PATCH] docs/design/draft-latency.txt: Small update. Original commit message from CVS: * docs/design/draft-latency.txt: Small update. * docs/libs/gstreamer-libs-sections.txt: * libs/gst/base/gstbasesink.c: (gst_base_sink_class_init), (gst_base_sink_get_latency), (gst_base_sink_query_latency), (gst_base_sink_wait_clock), (gst_base_sink_send_qos), (gst_base_sink_perform_qos), (gst_base_sink_queue_object_unlocked), (gst_base_sink_chain_unlocked), (gst_base_sink_send_event), (gst_base_sink_get_position), (gst_base_sink_query), (gst_base_sink_change_state): * libs/gst/base/gstbasesink.h: API: gst_base_sink_query_latency() to let subclasses query the upstream latency. API: gst_base_sink_get_latency() to let subclasses query the configured latency in the sink. Implement query and set latency. Update some docs. As spotted by Will Newton : Make sure we don't continue preroll when we are flushing. Fixes #405284. * tests/check/pipelines/stress.c: (change_state_timeout), (quit_timeout), (GST_START_TEST), (stress_suite): Test for #405284. --- ChangeLog | 27 ++++ docs/design/draft-latency.txt | 3 +- docs/libs/gstreamer-libs-sections.txt | 2 + libs/gst/base/gstbasesink.c | 195 +++++++++++++++++++++++--- libs/gst/base/gstbasesink.h | 4 + tests/check/pipelines/stress.c | 60 ++++++++ 6 files changed, 267 insertions(+), 24 deletions(-) diff --git a/ChangeLog b/ChangeLog index ecd9d1d54b..6957ca89b9 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,30 @@ +2007-02-12 Wim Taymans,,, + + * docs/design/draft-latency.txt: + Small update. + + * docs/libs/gstreamer-libs-sections.txt: + * libs/gst/base/gstbasesink.c: (gst_base_sink_class_init), + (gst_base_sink_get_latency), (gst_base_sink_query_latency), + (gst_base_sink_wait_clock), (gst_base_sink_send_qos), + (gst_base_sink_perform_qos), (gst_base_sink_queue_object_unlocked), + (gst_base_sink_chain_unlocked), (gst_base_sink_send_event), + (gst_base_sink_get_position), (gst_base_sink_query), + (gst_base_sink_change_state): + * libs/gst/base/gstbasesink.h: + API: gst_base_sink_query_latency() to let subclasses query the upstream + latency. + API: gst_base_sink_get_latency() to let subclasses query the configured + latency in the sink. + Implement query and set latency. + Update some docs. + As spotted by Will Newton : Make sure we + don't continue preroll when we are flushing. Fixes #405284. + + * tests/check/pipelines/stress.c: (change_state_timeout), + (quit_timeout), (GST_START_TEST), (stress_suite): + Test for #405284. + 2007-02-09 Tim-Philipp Müller Patch by: René Stadler diff --git a/docs/design/draft-latency.txt b/docs/design/draft-latency.txt index 90ca400b21..583e213394 100644 --- a/docs/design/draft-latency.txt +++ b/docs/design/draft-latency.txt @@ -313,7 +313,8 @@ Dynamically adjusting latency An element that want to change the latency in the pipeline can do this by posting a LATENCY message on the bus. This message instructs the pipeline to: - - query the latency in the pipeline (which might now have changed) + - query the latency in the pipeline (which might now have changed) with a + LATENCY query. - redistribute a new global latency to all elements with a LATENCY event. A use case where the latency in a pipeline can change could be a network element diff --git a/docs/libs/gstreamer-libs-sections.txt b/docs/libs/gstreamer-libs-sections.txt index 46953ef9fb..692d4a3977 100644 --- a/docs/libs/gstreamer-libs-sections.txt +++ b/docs/libs/gstreamer-libs-sections.txt @@ -176,6 +176,8 @@ gst_base_src_get_type GstBaseSink GstBaseSinkClass +gst_base_sink_query_latency +gst_base_sink_get_latency gst_base_sink_wait_preroll gst_base_sink_set_sync gst_base_sink_get_sync diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index 8ad9c5aaef..3f51932be5 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -125,8 +125,10 @@ * * The qos property will enable the quality-of-service features of the basesink * which gather statistics about the real-time performance of the clock - * synchronisation. For each dropped buffer it will also send a QoS message - * upstream. + * synchronisation. For each buffer received in the sink, statistics are + * gathered and a QOS event is send upstream with these numbers. This + * information can then be used by upstream elements to reduce their processing + * rate, for example. * * Last reviewed on 2006-09-27 (0.10.11) */ @@ -187,6 +189,9 @@ struct _GstBaseSinkPrivate /* number of rendered and dropped frames */ guint64 rendered; guint64 dropped; + + /* latency stuff */ + GstClockTime latency; }; #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size)) @@ -286,6 +291,7 @@ static gboolean gst_base_sink_pad_activate (GstPad * pad); static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active); static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active); static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event); +static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query); /* check if an object was too late */ static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink, @@ -334,8 +340,9 @@ gst_base_sink_class_init (GstBaseSinkClass * klass) G_PARAM_READWRITE)); g_object_class_install_property (gobject_class, PROP_QOS, - g_param_spec_boolean ("qos", "Qos", "Generate QoS events upstream", - DEFAULT_QOS, G_PARAM_READWRITE)); + g_param_spec_boolean ("qos", "Qos", + "Generate Quality-of-Service events upstream", DEFAULT_QOS, + G_PARAM_READWRITE)); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_base_sink_change_state); @@ -597,7 +604,7 @@ gst_base_sink_get_max_lateness (GstBaseSink * sink) * @sink: the sink * @enabled: the new qos value. * - * Configures @sink to send QoS events upstream. + * Configures @sink to send Quality-of-Service events upstream. * * Since: 0.10.5 */ @@ -611,10 +618,10 @@ gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled) * gst_base_sink_is_qos_enabled: * @sink: the sink * - * Checks if @sink is currently configured to send QoS events + * Checks if @sink is currently configured to send Quality-of-Service events * upstream. * - * Returns: TRUE if the sink is configured to perform QoS. + * Returns: TRUE if the sink is configured to perform Quality-of-Service. * * Since: 0.10.5 */ @@ -628,6 +635,101 @@ gst_base_sink_is_qos_enabled (GstBaseSink * sink) return res; } +/** + * gst_base_sink_get_latency: + * @sink: the sink + * + * Get the currently configured latency. + * + * Returns: The configured latency. + * + * Since: 0.10.12 + */ +GstClockTime +gst_base_sink_get_latency (GstBaseSink * sink) +{ + GstClockTime res; + + GST_OBJECT_LOCK (sink); + res = sink->priv->latency; + GST_OBJECT_UNLOCK (sink); + + return res; +} + +/** + * gst_base_sink_query_latency: + * @sink: the sink + * @live: if the sink is live + * @upstream_live: if an upstream element is live + * @min_latency: the min latency of the upstream elements + * @max_latency: the max latency of the upstream elements + * + * Query the sink for the latency parameters. The latency will be queried from + * the upstream elements. @live will be TRUE if @sink is configured to + * synchronize against the clock. @upstream_live will be TRUE if an upstream + * element is live. + * + * If both @live and @upstream_live are TRUE, the sink will want to compensate + * for the latency introduced by the upstream elements by setting the + * @min_latency to a strictly possitive value. + * + * This function is mostly used by subclasses. + * + * Returns: TRUE if the query succeeded. + * + * Since: 0.10.12 + */ +gboolean +gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live, + gboolean * upstream_live, GstClockTime * min_latency, + GstClockTime * max_latency) +{ + gboolean l, us_live, res; + GstClockTime min, max; + GstQuery *query; + + /* we are live when we sync to the clock */ + l = gst_base_sink_get_sync (sink); + + /* assume no latency */ + min = 0; + max = -1; + us_live = FALSE; + + query = gst_query_new_latency (); + + /* ask the peer for the latency */ + if ((res = gst_base_sink_peer_query (sink, query))) { + GstClockTime us_min, us_max; + + /* get upstream min and max latency */ + gst_query_parse_latency (query, &us_live, &us_min, &us_max); + if (us_live) { + /* upstream live, use its latency, subclasses should use these + * values to create the complete latency. */ + min = us_min; + max = us_max; + } + } + gst_query_unref (query); + + GST_DEBUG_OBJECT (sink, "latency query: live: %d, upstream: %d, min %" + GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l, us_live, + GST_TIME_ARGS (min), GST_TIME_ARGS (max)); + + if (live) + *live = l; + if (upstream_live) + *upstream_live = us_live; + if (min_latency) + *min_latency = min; + if (max_latency) + *max_latency = max; + + return res; +} + static void gst_base_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -1041,7 +1143,6 @@ gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time, GstClockID id; GstClockReturn ret; GstClock *clock; - GstClockTime base_time; if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) goto invalid_time; @@ -1053,8 +1154,11 @@ gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time, if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL)) goto no_clock; - base_time = GST_ELEMENT_CAST (basesink)->base_time; - id = gst_clock_new_single_shot_id (clock, base_time + time); + /* add base time and latency */ + time += GST_ELEMENT_CAST (basesink)->base_time; + time += basesink->priv->latency; + + id = gst_clock_new_single_shot_id (clock, time); GST_OBJECT_UNLOCK (basesink); basesink->clock_id = id; @@ -1275,7 +1379,7 @@ gst_base_sink_send_qos (GstBaseSink * basesink, GstEvent *event; gboolean res; - /* generate QoS event */ + /* generate Quality-of-Service event */ GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink, "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %" GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time)); @@ -1302,7 +1406,7 @@ gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped) start = priv->current_rstart; - /* if QoS disabled, do nothing */ + /* if Quality-of-Service disabled, do nothing */ if (!g_atomic_int_get (&priv->qos_enabled) || start == -1) return; @@ -1739,6 +1843,8 @@ gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad, /* FIXME, do something with the return value? */ ret = gst_base_sink_render_object (basesink, pad, o); + if (ret != GST_FLOW_OK) + goto dequeue_failed; } /* now render the object */ @@ -1763,6 +1869,13 @@ more_preroll: g_queue_push_tail (basesink->preroll_queue, obj); return GST_FLOW_OK; } +dequeue_failed: + { + GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s", + gst_flow_get_name (ret)); + gst_mini_object_unref (obj); + return ret; + } } /* with STREAM_LOCK @@ -1962,10 +2075,7 @@ gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad, if (G_UNLIKELY (!basesink->have_newsegment)) { gboolean sync; - GST_OBJECT_LOCK (basesink); - sync = basesink->sync; - GST_OBJECT_UNLOCK (basesink); - + sync = gst_base_sink_get_sync (basesink); if (sync) { GST_ELEMENT_WARNING (basesink, STREAM, FAILED, (_("Internal data flow problem.")), @@ -2359,17 +2469,36 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event) { GstPad *pad; GstBaseSink *basesink = GST_BASE_SINK (element); - gboolean result; + gboolean forward = TRUE, result = TRUE; - GST_OBJECT_LOCK (element); - pad = basesink->sinkpad; - gst_object_ref (pad); - GST_OBJECT_UNLOCK (element); + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_LATENCY: + { + GstClockTime latency; - result = gst_pad_push_event (pad, event); + gst_event_parse_latency (event, &latency); - gst_object_unref (pad); + GST_OBJECT_LOCK (element); + basesink->priv->latency = latency; + GST_OBJECT_UNLOCK (element); + GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT, + GST_TIME_ARGS (latency)); + forward = FALSE; + break; + } + default: + break; + } + if (forward) { + GST_OBJECT_LOCK (element); + pad = gst_object_ref (basesink->sinkpad); + GST_OBJECT_UNLOCK (element); + + result = gst_pad_push_event (pad, event); + + gst_object_unref (pad); + } return result; } @@ -2579,7 +2708,25 @@ gst_base_sink_query (GstElement * element, GstQuery * query) res = gst_base_sink_peer_query (basesink, query); break; case GST_QUERY_LATENCY: + { + gboolean live, us_live; + GstClockTime min, max; + + if ((res = + gst_base_sink_query_latency (basesink, &live, &us_live, &min, + &max))) { + /* if we or the upstream elements are not live, we don't need latency + * compensation */ + if (!live || !us_live) { + GST_DEBUG_OBJECT (basesink, + "no latency compensation, we or upstream are not live"); + min = 0; + max = -1; + } + gst_query_set_latency (query, live, min, max); + } break; + } case GST_QUERY_JITTER: break; case GST_QUERY_RATE: @@ -2633,6 +2780,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) basesink->priv->current_sstart = 0; basesink->priv->current_sstop = 0; basesink->priv->eos_rtime = -1; + basesink->priv->latency = 0; basesink->eos = FALSE; gst_base_sink_reset_qos (basesink); ret = GST_STATE_CHANGE_ASYNC; @@ -2676,6 +2824,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED"); + /* FIXME, make sure we cannot enter _render first */ /* we need to call ::unlock before locking PREROLL_LOCK * since we lock it before going into ::render */ diff --git a/libs/gst/base/gstbasesink.h b/libs/gst/base/gstbasesink.h index 0f10a7e759..589ed30785 100644 --- a/libs/gst/base/gstbasesink.h +++ b/libs/gst/base/gstbasesink.h @@ -190,6 +190,10 @@ gint64 gst_base_sink_get_max_lateness (GstBaseSink *sink); void gst_base_sink_set_qos_enabled (GstBaseSink *sink, gboolean enabled); gboolean gst_base_sink_is_qos_enabled (GstBaseSink *sink); +gboolean gst_base_sink_query_latency (GstBaseSink *sink, gboolean *live, gboolean *upstream_live, + GstClockTime *min_latency, GstClockTime *max_latency); +GstClockTime gst_base_sink_get_latency (GstBaseSink *sink); + G_END_DECLS #endif /* __GST_BASE_SINK_H__ */ diff --git a/tests/check/pipelines/stress.c b/tests/check/pipelines/stress.c index 0116685f9b..9d63ea5c4d 100644 --- a/tests/check/pipelines/stress.c +++ b/tests/check/pipelines/stress.c @@ -20,6 +20,64 @@ #include +static int playing = 1; +static int quit = 0; + +static gboolean +change_state_timeout (gpointer data) +{ + GstElement *pipeline = (GstElement *) data; + + if (quit) + return FALSE; + + if (playing) { + playing = 0; + gst_element_set_state (pipeline, GST_STATE_NULL); + } else { + playing = 1; + gst_element_set_state (pipeline, GST_STATE_PLAYING); + } + + return TRUE; +} + +static gboolean +quit_timeout (gpointer data) +{ + quit = 1; + return FALSE; +} + +GST_START_TEST (test_stress_preroll) +{ + GstElement *fakesrc, *fakesink; + GstElement *pipeline; + + fakesrc = gst_element_factory_make ("fakesrc", NULL); + fakesink = gst_element_factory_make ("fakesink", NULL); + pipeline = gst_element_factory_make ("pipeline", NULL); + + g_return_if_fail (fakesrc && fakesink && pipeline); + + g_object_set (G_OBJECT (fakesink), "preroll-queue-len", 4, NULL); + + gst_bin_add_many (GST_BIN (pipeline), fakesrc, fakesink, NULL); + gst_element_link (fakesrc, fakesink); + + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + g_timeout_add (500, &change_state_timeout, pipeline); + g_timeout_add (10000, &quit_timeout, NULL); + + while (!quit) { + g_main_context_iteration (NULL, TRUE); + } + + gst_object_unref (pipeline); +} + +GST_END_TEST; GST_START_TEST (test_stress) { @@ -67,6 +125,8 @@ stress_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_stress); + tcase_add_test (tc_chain, test_stress_preroll); + return s; }