From ea7b843244f3494a96b43d0a5dcaed69b8a5bf56 Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Tue, 1 Jul 2014 10:07:40 -0300 Subject: [PATCH] gdppay: put all sticky events in streamheader Use the sticky events to compose the streamheader as they are the ones that are persisted to config new pads linked. Instead of storing them ourselves rely on the pad storage that already orders it for us https://bugzilla.gnome.org/show_bug.cgi?id=732596 --- gst/gdp/gstgdppay.c | 244 +++++++++++++--------------------- gst/gdp/gstgdppay.h | 9 +- tests/check/elements/gdppay.c | 23 ++-- 3 files changed, 111 insertions(+), 165 deletions(-) diff --git a/gst/gdp/gstgdppay.c b/gst/gdp/gstgdppay.c index d8bc818dceb..708e13790e4 100644 --- a/gst/gdp/gstgdppay.c +++ b/gst/gdp/gstgdppay.c @@ -188,23 +188,11 @@ gst_gdp_pay_reset (GstGDPPay * this) gst_caps_unref (this->caps); this->caps = NULL; } - if (this->caps_buf) { - gst_buffer_unref (this->caps_buf); - this->caps_buf = NULL; - } - if (this->tag_buf) { - gst_buffer_unref (this->tag_buf); - this->tag_buf = NULL; - } - if (this->new_segment_buf) { - gst_buffer_unref (this->new_segment_buf); - this->new_segment_buf = NULL; - } - if (this->streamstartid_buf) { - gst_buffer_unref (this->streamstartid_buf); - this->streamstartid_buf = NULL; - } + this->have_caps = FALSE; + this->have_segment = FALSE; + this->have_streamstartid = FALSE; this->sent_streamheader = FALSE; + this->reset_streamheader = FALSE; this->offset = 0; } @@ -308,6 +296,48 @@ no_event: } } +static void +gdp_streamheader_array_append_buffer (GValue * array, GstBuffer * buf) +{ + GValue value = { 0, }; + + g_value_init (&value, GST_TYPE_BUFFER); + gst_value_set_buffer (&value, buf); + gst_value_array_append_value (array, &value); + g_value_unset (&value); +} + +typedef struct +{ + GstGDPPay *gdppay; + GValue *array; +} GstGDPPayAndArray; + +static gboolean +gdp_streamheader_array_store_events (GstPad * pad, GstEvent ** event, + gpointer udata) +{ + GstGDPPayAndArray *gdp_and_array = udata; + GstGDPPay *this = gdp_and_array->gdppay; + GValue *array = gdp_and_array->array; + GstBuffer *buf; + + /* Need to handle caps differently to keep compatibility with 1.0 */ + if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) { + GstCaps *caps; + + gst_event_parse_caps (*event, &caps); + buf = gst_gdp_buffer_from_caps (this, caps); + } else { + buf = gst_gdp_buffer_from_event (this, *event); + } + + GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_HEADER); + gst_gdp_stamp_buffer (this, buf); + gdp_streamheader_array_append_buffer (array, buf); + + return TRUE; +} /* set our caps with streamheader, based on the latest newsegment and caps, * and (possibly) GDP-serialized buffers of the streamheaders on the src pad */ @@ -315,14 +345,15 @@ static GstFlowReturn gst_gdp_pay_reset_streamheader (GstGDPPay * this) { GstCaps *caps; - /* We use copies of these to avoid circular refcounts */ - GstBuffer *new_segment_buf, *caps_buf, *tag_buf, *streamstartid_buf; GstStructure *structure; GstFlowReturn r = GST_FLOW_OK; gboolean version_one_zero = TRUE; + GstGDPPayAndArray gdp_and_array; GValue array = { 0 }; - GValue value = { 0 }; + + gdp_and_array.gdppay = this; + gdp_and_array.array = &array; GST_DEBUG_OBJECT (this, "start"); /* In version 0.2, we didn't need or send new segment or tags */ @@ -330,13 +361,13 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) version_one_zero = FALSE; if (version_one_zero) { - if (!this->new_segment_buf || !this->caps_buf || !this->streamstartid_buf) { + if (!this->have_segment || !this->have_caps || !this->have_streamstartid) { GST_DEBUG_OBJECT (this, "1.0, missing new_segment or caps or stream " "start id, returning"); return GST_FLOW_OK; } } else { - if (!this->caps_buf) { + if (!this->have_caps) { GST_DEBUG_OBJECT (this, "0.2, missing caps, returning"); return GST_FLOW_OK; } @@ -346,51 +377,24 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) * Stamp the buffers with offset and offset_end as well. * We do this here so the offsets match the order the buffers go out in */ g_value_init (&array, GST_TYPE_ARRAY); - if (version_one_zero) { - gst_gdp_stamp_buffer (this, this->streamstartid_buf); - GST_DEBUG_OBJECT (this, "appending copy of stream start id buffer %p", - this->streamstartid_buf); - streamstartid_buf = gst_buffer_copy (this->streamstartid_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, streamstartid_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (streamstartid_buf); - } + gst_pad_sticky_events_foreach (this->sinkpad, + gdp_streamheader_array_store_events, &gdp_and_array); + } else { + GstEvent *capsevent = + gst_pad_get_sticky_event (this->sinkpad, GST_EVENT_CAPS, 0); + if (capsevent) { + GstCaps *caps; + GstBuffer *capsbuffer = NULL; - gst_gdp_stamp_buffer (this, this->caps_buf); - GST_DEBUG_OBJECT (this, "appending copy of caps buffer %p", this->caps_buf); - caps_buf = gst_buffer_copy (this->caps_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, caps_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (caps_buf); + gst_event_parse_caps (capsevent, &caps); + capsbuffer = gst_gdp_buffer_from_caps (this, caps); - if (version_one_zero) { - gst_gdp_stamp_buffer (this, this->new_segment_buf); - GST_DEBUG_OBJECT (this, "1.0, appending copy of new segment buffer %p", - this->new_segment_buf); - new_segment_buf = gst_buffer_copy (this->new_segment_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, new_segment_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (new_segment_buf); + gst_gdp_stamp_buffer (this, capsbuffer); + gdp_streamheader_array_append_buffer (&array, capsbuffer); - if (this->tag_buf) { - gst_gdp_stamp_buffer (this, this->tag_buf); - GST_DEBUG_OBJECT (this, "1.0, appending current tags buffer %p", - this->tag_buf); - tag_buf = this->tag_buf; - this->tag_buf = NULL; - - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, tag_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (tag_buf); + gst_buffer_unref (capsbuffer); + gst_event_unref (capsevent); } } @@ -399,11 +403,8 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) structure = gst_caps_get_structure (this->caps, 0); if (gst_structure_has_field (structure, "streamheader")) { const GValue *sh; - GArray *buffers; - GstBuffer *buffer; - int i; sh = gst_structure_get_value (structure, "streamheader"); @@ -440,10 +441,7 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) GST_BUFFER_OFFSET_END (outbuffer) = GST_BUFFER_OFFSET_NONE; GST_BUFFER_TIMESTAMP (outbuffer) = GST_CLOCK_TIME_NONE; - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, outbuffer); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); + gdp_streamheader_array_append_buffer (&array, outbuffer); gst_buffer_unref (outbuffer); } @@ -478,40 +476,6 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) } } - /* push out these streamheader buffers, then flush our internal queue */ - GST_DEBUG_OBJECT (this, "Pushing GDP stream-start-id buffer %p", - this->streamstartid_buf); - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->streamstartid_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP stream-start-id buffer returned %d", - r); - goto done; - } - GST_DEBUG_OBJECT (this, "Pushing GDP caps buffer %p", this->caps_buf); - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->caps_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP caps buffer returned %d", r); - goto done; - } - GST_DEBUG_OBJECT (this, "Pushing GDP new_segment buffer %p with offset %" - G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, this->new_segment_buf, - GST_BUFFER_OFFSET (this->new_segment_buf), - GST_BUFFER_OFFSET_END (this->new_segment_buf)); - /* we stored these bufs with refcount 1, so make sure we keep a ref */ - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->new_segment_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP newsegment buffer returned %d", r); - goto done; - } - if (this->tag_buf) { - GST_DEBUG_OBJECT (this, "Pushing GDP tag buffer %p", this->tag_buf); - /* we stored these bufs with refcount 1, so make sure we keep a ref */ - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->tag_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP tag buffer returned %d", r); - goto done; - } - } this->sent_streamheader = TRUE; GST_DEBUG_OBJECT (this, "need to push %d queued buffers", g_list_length (this->queue)); @@ -524,7 +488,6 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) /* delete buffer from queue now */ this->queue = g_list_delete_link (this->queue, this->queue); - /* set caps and push */ r = gst_pad_push (this->srcpad, buffer); if (r != GST_FLOW_OK) { GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r); @@ -532,6 +495,8 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) } } + this->reset_streamheader = FALSE; + done: gst_caps_unref (caps); GST_DEBUG_OBJECT (this, "stop"); @@ -551,7 +516,7 @@ no_buffer: static GstFlowReturn gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer) { - if (this->sent_streamheader) { + if (this->sent_streamheader && !this->reset_streamheader) { GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT, buffer, this->caps); return gst_pad_push (this->srcpad, buffer); @@ -559,12 +524,10 @@ gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer) /* store it on an internal queue. buffer remains reffed. */ this->queue = g_list_append (this->queue, buffer); - GST_DEBUG_OBJECT (this, "streamheader not sent yet, " + GST_DEBUG_OBJECT (this, "streamheader not sent yet or needs update, " "queued buffer %p, now %d buffers queued", buffer, g_list_length (this->queue)); - gst_gdp_pay_reset_streamheader (this); - return GST_FLOW_OK; } @@ -582,7 +545,7 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) /* we should have received a new_segment before, otherwise it's a bug. * fake one in that case */ - if (!this->new_segment_buf) { + if (!this->have_segment) { GstEvent *event; GstSegment segment; @@ -603,7 +566,7 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf", outbuffer); - this->new_segment_buf = outbuffer; + this->have_segment = TRUE; } } /* make sure we've received caps before */ @@ -629,6 +592,9 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer); GST_BUFFER_DURATION (outbuffer) = GST_BUFFER_DURATION (buffer); + if (this->reset_streamheader) + gst_gdp_pay_reset_streamheader (this); + ret = gst_gdp_queue_buffer (this, outbuffer); done: @@ -689,28 +655,16 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) * and not send it on */ switch (GST_EVENT_TYPE (event)) { case GST_EVENT_STREAM_START: - GST_DEBUG_OBJECT (this, "Storing stream start id in buffer %p", - outbuffer); - - if (this->streamstartid_buf) - gst_buffer_unref (this->streamstartid_buf); - this->streamstartid_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); + GST_DEBUG_OBJECT (this, "Received stream start id"); + this->have_streamstartid = TRUE; break; case GST_EVENT_SEGMENT: - GST_DEBUG_OBJECT (this, "Storing in caps buffer %p as new_segment_buf", - outbuffer); - - if (this->new_segment_buf) - gst_buffer_unref (this->new_segment_buf); - this->new_segment_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); + GST_DEBUG_OBJECT (this, "Received segment %" GST_PTR_FORMAT, event); + this->have_segment = TRUE; break; case GST_EVENT_CAPS:{ + GST_DEBUG_OBJECT (this, "Received caps %" GST_PTR_FORMAT, event); + this->have_caps = TRUE; gst_event_parse_caps (event, &caps); gst_buffer_replace (&outbuffer, NULL); if (this->caps == NULL || !gst_caps_is_equal (this->caps, caps)) { @@ -721,34 +675,18 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) goto no_buffer_from_caps; GST_BUFFER_DURATION (outbuffer) = 0; - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - if (this->caps_buf) - gst_buffer_unref (this->caps_buf); - this->caps_buf = outbuffer; - gst_gdp_pay_reset_streamheader (this); } break; } - case GST_EVENT_TAG: - GST_DEBUG_OBJECT (this, "Storing in caps buffer %p as tag_buf", - outbuffer); - - if (this->tag_buf) - gst_buffer_unref (this->tag_buf); - this->tag_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); - break; default: - GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer, - event); - flowret = gst_gdp_queue_buffer (this, outbuffer); - if (flowret != GST_FLOW_OK) - goto push_error; break; } + if (GST_EVENT_IS_STICKY (event)) { + GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); + this->reset_streamheader = TRUE; + } + /* if we have EOS, we should send on EOS ourselves */ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS || GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) { @@ -757,6 +695,14 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) ret = gst_pad_push_event (this->srcpad, gst_event_ref (event)); } + if (GST_EVENT_TYPE (event) != GST_EVENT_EOS) { + GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer, + event); + flowret = gst_gdp_queue_buffer (this, outbuffer); + if (flowret != GST_FLOW_OK) + goto push_error; + } + done: gst_event_unref (event); diff --git a/gst/gdp/gstgdppay.h b/gst/gdp/gstgdppay.h index d4433ef7752..1dc71aef146 100644 --- a/gst/gdp/gstgdppay.h +++ b/gst/gdp/gstgdppay.h @@ -52,10 +52,11 @@ struct _GstGDPPay GstCaps *caps; /* incoming caps */ - GstBuffer *streamstartid_buf; - GstBuffer *caps_buf; - GstBuffer *new_segment_buf; - GstBuffer *tag_buf; + gboolean have_streamstartid; + gboolean have_caps; + gboolean have_segment; + + gboolean reset_streamheader; gboolean sent_streamheader; /* TRUE after the first streamheaders are sent */ GList *queue; /* list of queued buffers before streamheaders are sent */ diff --git a/tests/check/elements/gdppay.c b/tests/check/elements/gdppay.c index 704c54f2c0f..ccf97d697c8 100644 --- a/tests/check/elements/gdppay.c +++ b/tests/check/elements/gdppay.c @@ -110,7 +110,7 @@ check_caps_buffer (gint refcount, GstCaps * caps) fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL); buffers = g_list_remove (buffers, outbuffer); - ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", 2); + ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", refcount); length = GST_DP_HEADER_LENGTH + (strlen (caps_string) + 1); fail_unless_equals_int (gst_buffer_get_size (outbuffer), length); gst_buffer_unref (outbuffer); @@ -159,14 +159,13 @@ GST_START_TEST (test_audio) fail_unless_equals_int (g_list_length (buffers), 4); /* first buffer is the stream-start event */ - check_stream_start_buffer (2); + check_stream_start_buffer (1); - /* second buffer is the serialized caps; - * the element also holds a ref to it */ - check_caps_buffer (2, caps); + /* second buffer is the serialized caps */ + check_caps_buffer (1, caps); /* third buffer is the serialized new_segment event */ - check_segment_buffer (2); + check_segment_buffer (1); /* the fourth buffer is the GDP buffer for our pushed buffer */ fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL); @@ -319,15 +318,15 @@ GST_START_TEST (test_streamheader) gst_caps_unref (sinkcaps); /* first buffer is the stream-start event */ - check_stream_start_buffer (2); + check_stream_start_buffer (1); /* second buffer is the serialized caps; * the element also holds a ref to it */ - check_caps_buffer (2, caps); + check_caps_buffer (1, caps); /* third buffer is the serialized new_segment event; * the element also holds a ref to it */ - check_segment_buffer (2); + check_segment_buffer (1); /* the fourth buffer is the GDP buffer for our pushed buffer */ fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL); @@ -500,16 +499,16 @@ GST_START_TEST (test_crc) fail_unless_equals_int (g_list_length (buffers), 4); /* first buffer is the stream-start event */ - check_stream_start_buffer (2); + check_stream_start_buffer (1); /* second buffer is the serialized caps; * the element also holds a ref to it */ - check_caps_buffer (2, caps); + check_caps_buffer (1, caps); /* third buffer is the serialized new_segment event */ fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL); buffers = g_list_remove (buffers, outbuffer); - ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", 2); + ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", 1); /* verify the header checksum */ /* CRC's start at 58 in the header */