basesink: Post a latency message whenever we're ready to answer the query

Usually the latency message is only posted whenever latency of an
element changes but that might be too early as the sinks might not be
able to query the latency at that point yet.

Similarly adding a new sink should cause latency reconfiguration once
that new sink is able to report its latency.

This fixes latency configuration in pipelines where webrtcbin is the
only "sink", i.e. it is used in a sendonly session. Before, the latency
would always be configured to 0.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/843>
This commit is contained in:
Sebastian Dröge 2021-06-24 11:28:28 +03:00
parent 81bd8913b5
commit ba294415d7
4 changed files with 52 additions and 25 deletions

View file

@ -1769,6 +1769,9 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
next, pending, GST_STATE_VOID_PENDING)); next, pending, GST_STATE_VOID_PENDING));
} }
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_latency (GST_OBJECT_CAST (basesink)));
GST_STATE_BROADCAST (basesink); GST_STATE_BROADCAST (basesink);
return TRUE; return TRUE;
@ -1797,6 +1800,9 @@ nothing_pending:
/* we can report latency queries now */ /* we can report latency queries now */
basesink->priv->have_latency = TRUE; basesink->priv->have_latency = TRUE;
GST_OBJECT_UNLOCK (basesink); GST_OBJECT_UNLOCK (basesink);
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_latency (GST_OBJECT_CAST (basesink)));
return TRUE; return TRUE;
} }
stopping_unlocked: stopping_unlocked:
@ -5693,6 +5699,8 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
gst_message_new_async_start (GST_OBJECT_CAST (basesink))); gst_message_new_async_start (GST_OBJECT_CAST (basesink)));
} else { } else {
priv->have_latency = TRUE; priv->have_latency = TRUE;
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_latency (GST_OBJECT_CAST (basesink)));
} }
GST_BASE_SINK_PREROLL_UNLOCK (basesink); GST_BASE_SINK_PREROLL_UNLOCK (basesink);
break; break;

View file

@ -42,7 +42,22 @@ pop_async_done (GstBus * bus)
} }
static void static void
pop_messages (GstBus * bus, int count) pop_latency (GstBus * bus)
{
GstMessage *message;
GST_DEBUG ("popping async-done message");
message = gst_bus_poll (bus, GST_MESSAGE_LATENCY, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_LATENCY, "did not get GST_MESSAGE_LATENCY");
gst_message_unref (message);
GST_DEBUG ("popped message");
}
static void
pop_state_changed (GstBus * bus, int count)
{ {
GstMessage *message; GstMessage *message;
@ -561,7 +576,7 @@ GST_START_TEST (test_message_state_changed_children)
ASSERT_OBJECT_REFCOUNT (sink, "sink", 2); ASSERT_OBJECT_REFCOUNT (sink, "sink", 2);
ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2); ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2);
pop_messages (bus, 3); pop_state_changed (bus, 3);
fail_if (gst_bus_have_pending (bus), "unexpected pending messages"); fail_if (gst_bus_have_pending (bus), "unexpected pending messages");
ASSERT_OBJECT_REFCOUNT (bus, "bus", 2); ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@ -599,18 +614,19 @@ GST_START_TEST (test_message_state_changed_children)
ASSERT_OBJECT_REFCOUNT (src, "src", 4); ASSERT_OBJECT_REFCOUNT (src, "src", 4);
/* refcount can be 4 if the bin is still processing the async_done message of /* refcount can be 4 if the bin is still processing the async_done message of
* the sink. */ * the sink. */
ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 3); ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4);
/* 3 or 4 is valid, because the pipeline might still be posting /* 3 or 4 is valid, because the pipeline might still be posting
* its state_change message */ * its state_change message */
ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 3, 4); ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 3, 4);
pop_messages (bus, 3); pop_state_changed (bus, 3);
pop_async_done (bus); pop_async_done (bus);
pop_latency (bus);
fail_if ((gst_bus_pop (bus)) != NULL); fail_if ((gst_bus_pop (bus)) != NULL);
ASSERT_OBJECT_REFCOUNT (bus, "bus", 2); ASSERT_OBJECT_REFCOUNT_BETWEEN (bus, "bus", 2, 3);
ASSERT_OBJECT_REFCOUNT (src, "src", 1); ASSERT_OBJECT_REFCOUNT (src, "src", 1);
ASSERT_OBJECT_REFCOUNT (sink, "sink", 2); ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 3);
ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 1); ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 1);
/* change state to PLAYING, spawning three messages */ /* change state to PLAYING, spawning three messages */
@ -632,7 +648,7 @@ GST_START_TEST (test_message_state_changed_children)
ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4); ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4);
ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3); ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
pop_messages (bus, 3); pop_state_changed (bus, 3);
fail_if ((gst_bus_pop (bus)) != NULL); fail_if ((gst_bus_pop (bus)) != NULL);
ASSERT_OBJECT_REFCOUNT (bus, "bus", 2); ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@ -650,10 +666,10 @@ GST_START_TEST (test_message_state_changed_children)
/* each object is referenced by two messages, the source also has the /* each object is referenced by two messages, the source also has the
* stream-status message referencing it */ * stream-status message referencing it */
ASSERT_OBJECT_REFCOUNT (src, "src", 4); ASSERT_OBJECT_REFCOUNT (src, "src", 4);
ASSERT_OBJECT_REFCOUNT (sink, "sink", 3); ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 3, 4);
ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3); ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
pop_messages (bus, 6); pop_state_changed (bus, 6);
fail_if ((gst_bus_pop (bus)) != NULL); fail_if ((gst_bus_pop (bus)) != NULL);
ASSERT_OBJECT_REFCOUNT (src, "src", 1); ASSERT_OBJECT_REFCOUNT (src, "src", 1);
@ -706,8 +722,9 @@ GST_START_TEST (test_watch_for_state_change)
GST_CLOCK_TIME_NONE); GST_CLOCK_TIME_NONE);
fail_unless (ret == GST_STATE_CHANGE_SUCCESS); fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
pop_messages (bus, 6); pop_state_changed (bus, 6);
pop_async_done (bus); pop_async_done (bus);
pop_latency (bus);
fail_unless (gst_bus_have_pending (bus) == FALSE, fail_unless (gst_bus_have_pending (bus) == FALSE,
"Unexpected messages on bus"); "Unexpected messages on bus");
@ -715,15 +732,17 @@ GST_START_TEST (test_watch_for_state_change)
ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING); ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING);
fail_unless (ret == GST_STATE_CHANGE_SUCCESS); fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
pop_messages (bus, 3); pop_state_changed (bus, 3);
/* this one might return either SUCCESS or ASYNC, likely SUCCESS */ /* this one might return either SUCCESS or ASYNC, likely SUCCESS */
ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED); ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE); gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE);
pop_messages (bus, 3); pop_state_changed (bus, 3);
if (ret == GST_STATE_CHANGE_ASYNC) if (ret == GST_STATE_CHANGE_ASYNC) {
pop_async_done (bus); pop_async_done (bus);
pop_latency (bus);
}
fail_unless (gst_bus_have_pending (bus) == FALSE, fail_unless (gst_bus_have_pending (bus) == FALSE,
"Unexpected messages on bus"); "Unexpected messages on bus");
@ -932,7 +951,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107); ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107);
#else #else
pop_messages (bus, 2); /* pop remaining ready => paused messages off the bus */ pop_state_changed (bus, 2); /* pop remaining ready => paused messages off the bus */
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED, ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
108); 108);
pop_async_done (bus); pop_async_done (bus);
@ -953,8 +972,8 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed"); fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
/* TODO: do we need to check downwards state change order as well? */ /* TODO: do we need to check downwards state change order as well? */
pop_messages (bus, 4); /* pop playing => paused messages off the bus */ pop_state_changed (bus, 4); /* pop playing => paused messages off the bus */
pop_messages (bus, 4); /* pop paused => ready messages off the bus */ pop_state_changed (bus, 4); /* pop paused => ready messages off the bus */
while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1) while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)
THREAD_SWITCH (); THREAD_SWITCH ();
@ -1037,7 +1056,7 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206); ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206);
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207); ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207);
#else #else
pop_messages (bus, 2); /* pop remaining ready => paused messages off the bus */ pop_state_changed (bus, 2); /* pop remaining ready => paused messages off the bus */
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED, ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
208); 208);
pop_async_done (bus); pop_async_done (bus);
@ -1057,8 +1076,8 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed"); fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
/* TODO: do we need to check downwards state change order as well? */ /* TODO: do we need to check downwards state change order as well? */
pop_messages (bus, 4); /* pop playing => paused messages off the bus */ pop_state_changed (bus, 4); /* pop playing => paused messages off the bus */
pop_messages (bus, 4); /* pop paused => ready messages off the bus */ pop_state_changed (bus, 4); /* pop paused => ready messages off the bus */
GST_DEBUG ("waiting for pipeline to reach refcount 1"); GST_DEBUG ("waiting for pipeline to reach refcount 1");
while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1) while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)

View file

@ -92,7 +92,7 @@ GST_START_TEST (test_pipeline_unref)
run_pipeline (pipeline, s, run_pipeline (pipeline, s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE | GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
GST_MESSAGE_STREAM_START, GST_MESSAGE_EOS); GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_EOS);
while (GST_OBJECT_REFCOUNT_VALUE (src) > 1) while (GST_OBJECT_REFCOUNT_VALUE (src) > 1)
THREAD_SWITCH (); THREAD_SWITCH ();
ASSERT_OBJECT_REFCOUNT (src, "src", 1); ASSERT_OBJECT_REFCOUNT (src, "src", 1);

View file

@ -103,31 +103,31 @@ GST_START_TEST (test_2_elements)
run_pipeline (setup_pipeline (s), s, run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE | GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
GST_MESSAGE_STREAM_START, GST_MESSAGE_UNKNOWN); GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_UNKNOWN);
s = "fakesrc can-activate-push=true ! fakesink can-activate-pull=false"; s = "fakesrc can-activate-push=true ! fakesink can-activate-pull=false";
run_pipeline (setup_pipeline (s), s, run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE | GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
GST_MESSAGE_STREAM_START, GST_MESSAGE_UNKNOWN); GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_UNKNOWN);
s = "fakesrc can-activate-push=false num-buffers=10 ! fakesink can-activate-pull=true"; s = "fakesrc can-activate-push=false num-buffers=10 ! fakesink can-activate-pull=true";
run_pipeline (setup_pipeline (s), s, run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE | GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
GST_MESSAGE_STREAM_START, GST_MESSAGE_EOS); GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_EOS);
s = "fakesrc can-activate-push=true num-buffers=10 ! fakesink can-activate-pull=false"; s = "fakesrc can-activate-push=true num-buffers=10 ! fakesink can-activate-pull=false";
run_pipeline (setup_pipeline (s), s, run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE | GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
GST_MESSAGE_STREAM_START, GST_MESSAGE_EOS); GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_EOS);
s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=false"; s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=false";
ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s, ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE | GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ASYNC_DONE |
GST_MESSAGE_STREAM_START, GST_MESSAGE_UNKNOWN)); GST_MESSAGE_STREAM_START | GST_MESSAGE_LATENCY, GST_MESSAGE_UNKNOWN));
} }
GST_END_TEST; GST_END_TEST;