mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-03-29 20:35:40 +00:00
rtponviftimestamp: Do not rearange order of data
If a buffer or a buffer list is cached, no events serialized with the data stream should get through. The cached buffers and events should be purged when we stop flushing. https://bugzilla.gnome.org/show_bug.cgi?id=757688
This commit is contained in:
parent
fd0ca0a972
commit
a58826292e
3 changed files with 479 additions and 60 deletions
|
@ -42,6 +42,11 @@ static GstFlowReturn gst_rtp_onvif_timestamp_chain (GstPad * pad,
|
|||
static GstFlowReturn gst_rtp_onvif_timestamp_chain_list (GstPad * pad,
|
||||
GstObject * parent, GstBufferList * list);
|
||||
|
||||
static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self,
|
||||
GstBuffer * buf, gboolean end_contiguous);
|
||||
static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
|
||||
GstBufferList * list, gboolean end_contiguous);
|
||||
|
||||
static GstStaticPadTemplate sink_template_factory =
|
||||
GST_STATIC_PAD_TEMPLATE ("sink",
|
||||
GST_PAD_SINK,
|
||||
|
@ -112,6 +117,65 @@ gst_rtp_onvif_timestamp_set_property (GObject * object,
|
|||
}
|
||||
}
|
||||
|
||||
/* send cached buffer or list, and events, if present */
|
||||
static GstFlowReturn
|
||||
send_cached_buffer_and_events (GstRtpOnvifTimestamp * self,
|
||||
gboolean end_contiguous)
|
||||
{
|
||||
GstFlowReturn ret = GST_FLOW_OK;
|
||||
|
||||
g_assert (!(self->buffer && self->list));
|
||||
|
||||
if (self->buffer) {
|
||||
GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->buffer);
|
||||
ret = handle_and_push_buffer (self, self->buffer, end_contiguous);
|
||||
self->buffer = NULL;
|
||||
}
|
||||
if (self->list) {
|
||||
GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->list);
|
||||
ret = handle_and_push_buffer_list (self, self->list, end_contiguous);
|
||||
self->list = NULL;
|
||||
}
|
||||
|
||||
if (ret != GST_FLOW_OK)
|
||||
goto out;
|
||||
|
||||
while (!g_queue_is_empty (self->event_queue)) {
|
||||
GstEvent *event;
|
||||
|
||||
event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
|
||||
GST_LOG_OBJECT (self->sinkpad, "sending %" GST_PTR_FORMAT, event);
|
||||
(void) gst_pad_send_event (self->sinkpad, event);
|
||||
}
|
||||
|
||||
out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
purge_cached_buffer_and_events (GstRtpOnvifTimestamp * self)
|
||||
{
|
||||
g_assert (!(self->buffer && self->list));
|
||||
|
||||
if (self->buffer) {
|
||||
GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->buffer);
|
||||
gst_buffer_unref (self->buffer);
|
||||
self->buffer = NULL;
|
||||
}
|
||||
if (self->list) {
|
||||
GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->list);
|
||||
gst_buffer_list_unref (self->list);
|
||||
self->list = NULL;
|
||||
}
|
||||
|
||||
while (!g_queue_is_empty (self->event_queue)) {
|
||||
GstEvent *event;
|
||||
|
||||
event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
|
||||
gst_event_unref (event);
|
||||
}
|
||||
}
|
||||
|
||||
static GstStateChangeReturn
|
||||
gst_rtp_onvif_timestamp_change_state (GstElement * element,
|
||||
GstStateChange transition)
|
||||
|
@ -121,6 +185,7 @@ gst_rtp_onvif_timestamp_change_state (GstElement * element,
|
|||
|
||||
switch (transition) {
|
||||
case GST_STATE_CHANGE_PAUSED_TO_READY:
|
||||
purge_cached_buffer_and_events (self);
|
||||
gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
|
||||
break;
|
||||
default:
|
||||
|
@ -161,10 +226,7 @@ gst_rtp_onvif_timestamp_finalize (GObject * object)
|
|||
{
|
||||
GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
|
||||
|
||||
if (self->buffer)
|
||||
gst_buffer_unref (self->buffer);
|
||||
if (self->list)
|
||||
gst_buffer_list_unref (self->list);
|
||||
g_queue_free (self->event_queue);
|
||||
|
||||
G_OBJECT_CLASS (gst_rtp_onvif_timestamp_parent_class)->finalize (object);
|
||||
}
|
||||
|
@ -219,42 +281,62 @@ gst_rtp_onvif_timestamp_class_init (GstRtpOnvifTimestampClass * klass)
|
|||
0, "ONVIF NTP timestamps RTP extension");
|
||||
}
|
||||
|
||||
static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self,
|
||||
GstBuffer * buf, gboolean end_contiguous);
|
||||
static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
|
||||
GstBufferList * list, gboolean end_contiguous);
|
||||
|
||||
static gboolean
|
||||
gst_rtp_onvif_timestamp_sink_event (GstPad * pad, GstObject * parent,
|
||||
GstEvent * event)
|
||||
{
|
||||
GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
|
||||
gboolean drop = FALSE;
|
||||
gboolean ret = TRUE;
|
||||
|
||||
GST_DEBUG_OBJECT (pad, "handling event %s", GST_EVENT_TYPE_NAME (event));
|
||||
|
||||
/* handle serialized events, which, should not be enqueued */
|
||||
switch (GST_EVENT_TYPE (event)) {
|
||||
case GST_EVENT_SEGMENT:
|
||||
gst_event_copy_segment (event, &self->segment);
|
||||
break;
|
||||
case GST_EVENT_FLUSH_STOP:
|
||||
gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
|
||||
break;
|
||||
case GST_EVENT_EOS:
|
||||
/* Push pending buffers, if any */
|
||||
if (self->buffer) {
|
||||
handle_and_push_buffer (self, self->buffer, TRUE);
|
||||
self->buffer = NULL;
|
||||
}
|
||||
if (self->list) {
|
||||
handle_and_push_buffer_list (self, self->list, TRUE);
|
||||
self->list = NULL;
|
||||
{
|
||||
GstFlowReturn res;
|
||||
/* Push pending buffers, if any */
|
||||
res = send_cached_buffer_and_events (self, TRUE);
|
||||
if (res != GST_FLOW_OK) {
|
||||
drop = TRUE;
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case GST_EVENT_FLUSH_STOP:
|
||||
purge_cached_buffer_and_events (self);
|
||||
gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return gst_pad_event_default (pad, parent, event);
|
||||
/* enqueue serialized events if there is a cached buffer */
|
||||
if (GST_EVENT_IS_SERIALIZED (event) && (self->buffer || self->list)) {
|
||||
GST_DEBUG ("enqueueing serialized event");
|
||||
g_queue_push_tail (self->event_queue, event);
|
||||
event = NULL;
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* handle rest of the events */
|
||||
switch (GST_EVENT_TYPE (event)) {
|
||||
case GST_EVENT_SEGMENT:
|
||||
gst_event_copy_segment (event, &self->segment);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
out:
|
||||
if (drop)
|
||||
gst_event_unref (event);
|
||||
else if (event)
|
||||
ret = gst_pad_event_default (pad, parent, event);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -278,10 +360,11 @@ gst_rtp_onvif_timestamp_init (GstRtpOnvifTimestamp * self)
|
|||
self->prop_ntp_offset = DEFAULT_NTP_OFFSET;
|
||||
self->prop_set_e_bit = DEFAULT_SET_E_BIT;
|
||||
|
||||
gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
|
||||
|
||||
self->event_queue = g_queue_new ();
|
||||
self->buffer = NULL;
|
||||
self->list = NULL;
|
||||
|
||||
gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
|
||||
}
|
||||
|
||||
#define EXTENSION_ID 0xABAC
|
||||
|
@ -414,7 +497,7 @@ done:
|
|||
return TRUE;
|
||||
}
|
||||
|
||||
/* @buf: (transfer all) */
|
||||
/* @buf: (transfer full) */
|
||||
static GstFlowReturn
|
||||
handle_and_push_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf,
|
||||
gboolean end_contiguous)
|
||||
|
@ -439,20 +522,15 @@ gst_rtp_onvif_timestamp_chain (GstPad * pad, GstObject * parent,
|
|||
return handle_and_push_buffer (self, buf, FALSE);
|
||||
}
|
||||
|
||||
/* We have to wait for the *next* buffer before pushing this one */
|
||||
/* send any previously cached item(s), this leaves an empty queue */
|
||||
result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf));
|
||||
|
||||
if (self->buffer) {
|
||||
/* push the *previous* buffer received */
|
||||
result = handle_and_push_buffer (self, self->buffer,
|
||||
GST_BUFFER_IS_DISCONT (buf));
|
||||
}
|
||||
|
||||
/* Transfer ownership */
|
||||
/* enqueue the new item, as the only item in the queue */
|
||||
self->buffer = buf;
|
||||
return result;
|
||||
}
|
||||
|
||||
/* @buf: (transfer all) */
|
||||
/* @buf: (transfer full) */
|
||||
static GstFlowReturn
|
||||
handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
|
||||
GstBufferList * list, gboolean end_contiguous)
|
||||
|
@ -477,24 +555,18 @@ gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent,
|
|||
GstBufferList * list)
|
||||
{
|
||||
GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
|
||||
GstFlowReturn result = GST_FLOW_OK;
|
||||
GstBuffer *buf;
|
||||
GstFlowReturn result = GST_FLOW_OK;
|
||||
|
||||
if (!self->prop_set_e_bit) {
|
||||
return handle_and_push_buffer_list (self, list, FALSE);
|
||||
}
|
||||
|
||||
/* We have to wait for the *next* list before pushing this one */
|
||||
/* send any previously cached item(s), this leaves an empty queue */
|
||||
buf = gst_buffer_list_get (list, 0);
|
||||
result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf));
|
||||
|
||||
if (self->list) {
|
||||
/* push the *previous* list received */
|
||||
buf = gst_buffer_list_get (list, 0);
|
||||
|
||||
result = handle_and_push_buffer_list (self, self->list,
|
||||
GST_BUFFER_IS_DISCONT (buf));
|
||||
}
|
||||
|
||||
/* Transfer ownership */
|
||||
/* enqueue the new item, as the only item in the queue */
|
||||
self->list = list;
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -55,8 +55,8 @@ struct _GstRtpOnvifTimestamp {
|
|||
GstClockTime ntp_offset;
|
||||
|
||||
GstSegment segment;
|
||||
gboolean received_segment;
|
||||
/* Buffer waiting to be handled, only used if prop_set_e_bit is TRUE */
|
||||
GQueue *event_queue;
|
||||
GstBuffer *buffer;
|
||||
GstBufferList *list;
|
||||
};
|
||||
|
|
|
@ -27,6 +27,11 @@
|
|||
static GstElement *element;
|
||||
static GstPad *mysrcpad;
|
||||
static GstPad *mysinkpad;
|
||||
/* These are global mainly because they are used from the setup/cleanup
|
||||
* fixture functions */
|
||||
static gulong myprobe;
|
||||
static GList *mypushedevents;
|
||||
static GList *myreceivedevents;
|
||||
|
||||
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
|
||||
GST_PAD_SINK,
|
||||
|
@ -39,8 +44,86 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
|
|||
GST_STATIC_CAPS ("application/x-rtp")
|
||||
);
|
||||
|
||||
#define NTP_OFFSET ((guint64) 1245)
|
||||
#define TIMESTAMP ((GstClockTime) 42)
|
||||
#define NTP_OFFSET ((guint64) 1245)
|
||||
#define TIMESTAMP ((GstClockTime)42)
|
||||
#define COMPARE TRUE
|
||||
#define NO_COMPARE FALSE
|
||||
|
||||
static GstPadProbeReturn
|
||||
event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
|
||||
{
|
||||
GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
|
||||
|
||||
GST_DEBUG ("got %s", GST_EVENT_TYPE_NAME (event));
|
||||
myreceivedevents = g_list_append (myreceivedevents, gst_event_ref (event));
|
||||
|
||||
return GST_PAD_PROBE_OK;
|
||||
}
|
||||
|
||||
static GstEvent *
|
||||
create_event (GstEventType type)
|
||||
{
|
||||
GstEvent *event = NULL;
|
||||
|
||||
switch (type) {
|
||||
case GST_EVENT_CUSTOM_DOWNSTREAM:
|
||||
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
|
||||
gst_structure_new ("x-app/test", "test-field", G_TYPE_STRING,
|
||||
"test-value", NULL));
|
||||
break;
|
||||
case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
|
||||
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB,
|
||||
gst_structure_new ("x-app/test", "test-field", G_TYPE_STRING,
|
||||
"test-value", NULL));
|
||||
break;
|
||||
case GST_EVENT_EOS:
|
||||
event = gst_event_new_eos ();
|
||||
break;
|
||||
default:
|
||||
g_assert_not_reached ();
|
||||
break;
|
||||
}
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
static void
|
||||
create_and_push_event (GstEventType type)
|
||||
{
|
||||
GstEvent *event = create_event (type);
|
||||
|
||||
mypushedevents = g_list_append (mypushedevents, event);
|
||||
fail_unless (gst_pad_push_event (mysrcpad, event));
|
||||
}
|
||||
|
||||
static void
|
||||
check_and_clear_events (gint expected, gboolean compare)
|
||||
{
|
||||
GList *p;
|
||||
GList *r;
|
||||
|
||||
/* verify that there's as many queued events as expected */
|
||||
fail_unless_equals_int (g_list_length (myreceivedevents), expected);
|
||||
|
||||
if (compare) {
|
||||
fail_unless_equals_int (expected, g_list_length (mypushedevents));
|
||||
|
||||
/* verify that the events are queued in the expected order */
|
||||
r = myreceivedevents;
|
||||
p = mypushedevents;
|
||||
|
||||
while (p != NULL) {
|
||||
fail_unless_equals_pointer (p->data, r->data);
|
||||
p = g_list_next (p);
|
||||
r = g_list_next (r);
|
||||
}
|
||||
}
|
||||
|
||||
g_list_free_full (myreceivedevents, (GDestroyNotify)gst_event_unref);
|
||||
myreceivedevents = NULL;
|
||||
g_list_free (mypushedevents);
|
||||
mypushedevents = NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
setup (void)
|
||||
|
@ -54,7 +137,6 @@ setup (void)
|
|||
gst_pad_set_active (mysrcpad, TRUE);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cleanup (void)
|
||||
{
|
||||
|
@ -70,6 +152,30 @@ cleanup (void)
|
|||
|
||||
gst_check_teardown_element (element);
|
||||
element = NULL;
|
||||
|
||||
gst_check_drop_buffers ();
|
||||
}
|
||||
|
||||
static void
|
||||
setup_with_event (void)
|
||||
{
|
||||
setup ();
|
||||
|
||||
myprobe = gst_pad_add_probe (mysinkpad,
|
||||
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, event_probe, NULL, NULL);
|
||||
myreceivedevents = NULL;
|
||||
mypushedevents = NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
cleanup_with_event (void)
|
||||
{
|
||||
gst_pad_remove_probe (mysinkpad, myprobe);
|
||||
myprobe = 0;
|
||||
myreceivedevents = NULL;
|
||||
mypushedevents = NULL;
|
||||
|
||||
cleanup ();
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -290,22 +396,263 @@ GST_START_TEST (test_apply_e_bit)
|
|||
|
||||
GST_END_TEST;
|
||||
|
||||
GST_START_TEST (test_flushing)
|
||||
{
|
||||
GstBuffer *buffer;
|
||||
|
||||
/* set the e-bit, so the element use caching */
|
||||
g_object_set (element, "set-e-bit", TRUE, NULL);
|
||||
/* set the ntp-offset, since no one will provide a clock */
|
||||
g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
||||
gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
|
||||
|
||||
/* create and push the first buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* no buffers should have made it through */
|
||||
fail_unless_equals_int (g_list_length (buffers), 0);
|
||||
|
||||
/* flush the element */
|
||||
fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_flush_start ()));
|
||||
fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_flush_stop (FALSE)));
|
||||
|
||||
/* resend events */
|
||||
gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
|
||||
|
||||
/* create and push a second buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* still no buffers should have made it through (the first one should have
|
||||
* been dropped during flushing) */
|
||||
fail_unless_equals_int (g_list_length (buffers), 0);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
GST_START_TEST (test_reusable_element_no_e_bit)
|
||||
{
|
||||
GstBuffer *buffer;
|
||||
|
||||
/* set the ntp-offset, since no one will provide a clock */
|
||||
g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
||||
gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
|
||||
|
||||
/* create and push the first buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* create and push a second buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* create and push a third buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
|
||||
|
||||
fail_unless_equals_int (g_list_length (buffers), 3);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
||||
gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
|
||||
|
||||
/* create and push the first buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* create and push a second buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* create and push a third buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
|
||||
|
||||
fail_unless_equals_int (g_list_length (buffers), 6);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
GST_START_TEST (test_reusable_element_e_bit)
|
||||
{
|
||||
GstBuffer *buffer;
|
||||
|
||||
/* set the e-bit, so the element use caching */
|
||||
g_object_set (element, "set-e-bit", TRUE, NULL);
|
||||
/* set the ntp-offset, since no one will provide a clock */
|
||||
g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
||||
gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
|
||||
|
||||
/* create and push the first buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* create and push a second buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* create and push a third buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
|
||||
|
||||
fail_unless_equals_int (g_list_length (buffers), 2);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
||||
gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
|
||||
|
||||
/* create and push the first buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* create and push a second buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
/* create and push a third buffer */
|
||||
buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
|
||||
|
||||
fail_unless_equals_int (g_list_length (buffers), 4);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
GST_START_TEST (test_serialized_events)
|
||||
{
|
||||
GstBuffer *buffer;
|
||||
|
||||
/* we want the e-bit set so that buffers are cached */
|
||||
g_object_set (element, "set-e-bit", TRUE, NULL);
|
||||
g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
||||
|
||||
/* send intitial events (stream-start and segment) */
|
||||
gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
|
||||
check_and_clear_events (2, NO_COMPARE);
|
||||
|
||||
/* events received while no buffer is cached should be forwarded */
|
||||
create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
|
||||
check_and_clear_events (1, NO_COMPARE);
|
||||
|
||||
/* create and push the first buffer, which should be cached */
|
||||
buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
fail_unless_equals_int (g_list_length (buffers), 0);
|
||||
/* serialized events should be queued when there's a buffer cached */
|
||||
create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
|
||||
fail_unless_equals_int (g_list_length (myreceivedevents), 0);
|
||||
/* there's still a buffer cached... */
|
||||
create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
|
||||
fail_unless_equals_int (g_list_length (myreceivedevents), 0);
|
||||
|
||||
/* receiving a new buffer should let the first through, along with the
|
||||
* queued serialized events */
|
||||
buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
fail_unless_equals_int (g_list_length (buffers), 1);
|
||||
check_and_clear_events (2, COMPARE);
|
||||
|
||||
/* there's still a buffer cached, a new serialized event should be quueud */
|
||||
create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
|
||||
fail_unless_equals_int (g_list_length (myreceivedevents), 0);
|
||||
|
||||
/* when receiving an EOS cached buffer and queued events should be forwarded */
|
||||
create_and_push_event (GST_EVENT_EOS);
|
||||
check_and_clear_events (2, COMPARE);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
GST_START_TEST (test_non_serialized_events)
|
||||
{
|
||||
GstEvent *event;
|
||||
GstBuffer *buffer;
|
||||
|
||||
/* we want the e-bit set so that buffers are cached */
|
||||
g_object_set (element, "set-e-bit", TRUE, NULL);
|
||||
g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
|
||||
|
||||
ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
||||
|
||||
/* send intitial events (stream-start and segment) */
|
||||
gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
|
||||
fail_unless_equals_int (g_list_length (myreceivedevents), 2);
|
||||
check_and_clear_events (2, NO_COMPARE);
|
||||
|
||||
/* events received while no buffer is cached should be forwarded */
|
||||
create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB);
|
||||
check_and_clear_events (1, COMPARE);
|
||||
|
||||
/* create and push the first buffer, which should be cached */
|
||||
buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
|
||||
fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
|
||||
fail_unless_equals_int (g_list_length (buffers), 0);
|
||||
/* non-serialized events should be forwarded regardless of whether
|
||||
* there is a cached buffer */
|
||||
create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB);
|
||||
check_and_clear_events (1, COMPARE);
|
||||
|
||||
/* there's still a buffer cached, push a serialized event and make sure
|
||||
* it's queued */
|
||||
create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
|
||||
fail_unless_equals_int (g_list_length (myreceivedevents), 0);
|
||||
/* non-serialized events should be forwarded regardless of whether there
|
||||
* are serialized events queued, thus the g_list_prepend below */
|
||||
event = create_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB);
|
||||
mypushedevents = g_list_prepend (mypushedevents, event);
|
||||
fail_unless (gst_pad_push_event (mysrcpad, event));
|
||||
fail_unless_equals_int (g_list_length (myreceivedevents), 1);
|
||||
|
||||
/* when receiving an EOS cached buffer and queued events should be forwarded */
|
||||
create_and_push_event (GST_EVENT_EOS);
|
||||
fail_unless_equals_int (g_list_length (buffers), 1);
|
||||
check_and_clear_events (3, COMPARE);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
static Suite *
|
||||
onviftimestamp_suite (void)
|
||||
{
|
||||
Suite *s = suite_create ("onviftimestamp");
|
||||
TCase *tc_chain;
|
||||
TCase *tc_general, *tc_events;
|
||||
|
||||
tc_chain = tcase_create ("apply");
|
||||
tc_general = tcase_create ("apply");
|
||||
suite_add_tcase (s, tc_general);
|
||||
tcase_add_checked_fixture (tc_general, setup, cleanup);
|
||||
|
||||
suite_add_tcase (s, tc_chain);
|
||||
tcase_add_checked_fixture (tc_chain, setup, cleanup);
|
||||
tcase_add_test (tc_general, test_apply_discont);
|
||||
tcase_add_test (tc_general, test_apply_not_discont);
|
||||
tcase_add_test (tc_general, test_apply_clean_point);
|
||||
tcase_add_test (tc_general, test_apply_no_e_bit);
|
||||
tcase_add_test (tc_general, test_apply_e_bit);
|
||||
tcase_add_test (tc_general, test_flushing);
|
||||
tcase_add_test (tc_general, test_reusable_element_no_e_bit);
|
||||
tcase_add_test (tc_general, test_reusable_element_e_bit);
|
||||
|
||||
tcase_add_test (tc_chain, test_apply_discont);
|
||||
tcase_add_test (tc_chain, test_apply_not_discont);
|
||||
tcase_add_test (tc_chain, test_apply_clean_point);
|
||||
tcase_add_test (tc_chain, test_apply_no_e_bit);
|
||||
tcase_add_test (tc_chain, test_apply_e_bit);
|
||||
tc_events = tcase_create ("events");
|
||||
suite_add_tcase (s, tc_events);
|
||||
tcase_add_checked_fixture (tc_events, setup_with_event, cleanup_with_event);
|
||||
|
||||
tcase_add_test (tc_events, test_serialized_events);
|
||||
tcase_add_test (tc_events, test_non_serialized_events);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue