appsrc: serialize custom events with buffers flow

Application may want to inject events to the pipeline and keep them
synchronized with the buffers flow.

Fix 

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1046>
This commit is contained in:
Guillaume Desmottes 2021-02-22 13:17:18 +01:00
parent 0a657d6db5
commit c148ecf2cb
2 changed files with 135 additions and 21 deletions
gst-libs/gst/app
tests/check/elements

View file

@ -1014,6 +1014,17 @@ gst_app_src_send_event (GstElement * element, GstEvent * event)
g_mutex_unlock (&priv->mutex);
break;
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
g_mutex_lock (&priv->mutex);
gst_queue_array_push_tail (priv->queue, event);
if ((priv->wait_status & STREAM_WAITING))
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex);
return TRUE;
}
break;
}
@ -1596,6 +1607,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
}
while (TRUE) {
/* Our lock may have been release to push events or caps, check out
* state in case we are now flushing. */
if (G_UNLIKELY (priv->flushing))
goto flushing;
/* return data as long as we have some */
if (!gst_queue_array_is_empty (priv->queue)) {
GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
@ -1618,13 +1634,6 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
if (caps_changed)
gst_app_src_do_negotiate (bsrc);
/* Lock has released so now may need
*- flushing
*- new caps change
*- check queue has data */
if (G_UNLIKELY (priv->flushing))
goto flushing;
/* Continue checks caps and queue */
continue;
}
@ -1661,24 +1670,56 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
*buf = NULL;
} else if (GST_IS_EVENT (obj)) {
GstEvent *event = GST_EVENT (obj);
const GstSegment *segment = NULL;
gst_event_parse_segment (event, &segment);
g_assert (segment != NULL);
GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
if (!gst_segment_is_equal (&priv->current_segment, segment)) {
GST_DEBUG_OBJECT (appsrc,
"Update new segment %" GST_PTR_FORMAT, event);
if (!gst_base_src_new_segment (bsrc, segment)) {
GST_ERROR_OBJECT (appsrc,
"Couldn't set new segment %" GST_PTR_FORMAT, event);
gst_event_unref (event);
goto invalid_segment;
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
const GstSegment *segment = NULL;
gst_event_parse_segment (event, &segment);
g_assert (segment != NULL);
if (!gst_segment_is_equal (&priv->current_segment, segment)) {
GST_DEBUG_OBJECT (appsrc,
"Update new segment %" GST_PTR_FORMAT, event);
if (!gst_base_src_new_segment (bsrc, segment)) {
GST_ERROR_OBJECT (appsrc,
"Couldn't set new segment %" GST_PTR_FORMAT, event);
gst_event_unref (event);
goto invalid_segment;
}
gst_segment_copy_into (segment, &priv->current_segment);
}
gst_segment_copy_into (segment, &priv->current_segment);
}
gst_event_unref (event);
gst_event_unref (event);
} else {
GstEvent *seg_event;
GstSegment last_segment = priv->last_segment;
/* event is serialized with the buffers flow */
/* We are about to push an event, release out lock */
g_mutex_unlock (&priv->mutex);
seg_event =
gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
GST_EVENT_SEGMENT, 0);
if (!seg_event) {
seg_event = gst_event_new_segment (&last_segment);
GST_DEBUG_OBJECT (appsrc,
"received serialized event before first buffer, push default segment %"
GST_PTR_FORMAT, seg_event);
gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
} else {
gst_event_unref (seg_event);
}
gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
g_mutex_lock (&priv->mutex);
}
continue;
} else {
g_assert_not_reached ();

View file

@ -1368,6 +1368,78 @@ GST_START_TEST (test_appsrc_limits)
GST_END_TEST;
static GstFlowReturn
send_event_chain_func (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
++expect_offset;
gst_buffer_unref (buf);
if (expect_offset == 2) {
/* test is done */
g_mutex_lock (&check_mutex);
done = TRUE;
g_cond_signal (&check_cond);
g_mutex_unlock (&check_mutex);
}
return GST_FLOW_OK;
}
static gboolean
send_event_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
{
GST_LOG ("event %" GST_PTR_FORMAT, event);
if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM) {
/* this event should arrive after the first buffer */
fail_unless_equals_int (expect_offset, 1);
}
gst_event_unref (event);
return TRUE;
}
/* check that custom downstream events are properly serialized with buffers */
GST_START_TEST (test_appsrc_send_custom_event)
{
GstElement *src;
GstBuffer *buf;
src = setup_appsrc ();
ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
expect_offset = 0;
gst_pad_set_chain_function (mysinkpad, send_event_chain_func);
gst_pad_set_event_function (mysinkpad, send_event_event_func);
/* send a buffer, a custom event and a second buffer */
buf = gst_buffer_new_and_alloc (1);
GST_BUFFER_OFFSET (buf) = 0;
fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
buf) == GST_FLOW_OK);
gst_element_send_event (src,
gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
gst_structure_new ("custom", NULL, NULL)));
buf = gst_buffer_new_and_alloc (2);
GST_BUFFER_OFFSET (buf) = 1;
fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
buf) == GST_FLOW_OK);
g_mutex_lock (&check_mutex);
while (!done)
g_cond_wait (&check_cond, &check_mutex);
g_mutex_unlock (&check_mutex);
ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsrc (src);
}
GST_END_TEST;
static Suite *
appsrc_suite (void)
{
@ -1382,6 +1454,7 @@ appsrc_suite (void)
tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment);
tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
tcase_add_test (tc_chain, test_appsrc_limits);
tcase_add_test (tc_chain, test_appsrc_send_custom_event);
if (RUNNING_ON_VALGRIND)
tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);