diff --git a/girs/GstRtp-1.0.gir b/girs/GstRtp-1.0.gir index 4dad967d07..6ae0dfdb77 100644 --- a/girs/GstRtp-1.0.gir +++ b/girs/GstRtp-1.0.gir @@ -2332,7 +2332,39 @@ audio codec - Provides a base class for RTP depayloaders + Provides a base class for RTP depayloaders + +In order to handle RTP header extensions correctly if the +depayloader aggregates multiple RTP packet payloads into one output +buffer this class provides the function +gst_rtp_base_depayload_set_aggregate_hdrext_enabled(). If the +aggregation is enabled the virtual functions +@GstRTPBaseDepayload.process or +@GstRTPBaseDepayload.process_rtp_packet must tell the base class +what happens to the current RTP packet. By default the base class +assumes that the packet payload is used with the next output +buffer. + +If the RTP packet will not be used with an output buffer +gst_rtp_base_depayload_dropped() must be called. A typical +situation would be if we are waiting for a keyframe. + +If the RTP packet will be used but not with the current output +buffer but with the next one gst_rtp_base_depayload_delayed() must +be called. This may happen if the current RTP packet signals the +start of a new output buffer and the currently processed output +buffer will be pushed first. The undelay happens implicitly once +the current buffer has been pushed or +gst_rtp_base_depayload_flush() has been called. + +If gst_rtp_base_depayload_flush() is called all RTP packets that +have not been dropped since the last output buffer are dropped, +e.g. if an output buffer is discarded due to malformed data. This +may or may not include the current RTP packet depending on the 2nd +parameter @keep_current. + +Be aware that in case gst_rtp_base_depayload_push_list() is used +each buffer will see the same list of RTP header extensions. @@ -2404,6 +2436,100 @@ audio codec + + Called from @GstRTPBaseDepayload.process or +@GstRTPBaseDepayload.process_rtp_packet when the depayloader needs +to keep the current input RTP header for use with the next output +buffer. + +The delayed buffer will remain until the end of processing the +current output buffer and then enqueued for processing with the +next output buffer. + +A typical use-case is when the depayloader implementation will +start a new output buffer for the current input RTP buffer but push +the current output buffer first. + +Must be called with the stream lock held. + + + + + + + a #GstRTPBaseDepayload + + + + + + Called from @GstRTPBaseDepayload.process or +@GstRTPBaseDepayload.process_rtp_packet if the depayloader does not +use the current buffer for the output buffer. This will either drop +the delayed buffer or the last buffer from the header extension +cache. + +A typical use-case is when the depayloader implementation is +dropping an input RTP buffer while waiting for the first keyframe. + +Must be called with the stream lock held. + + + + + + + a #GstRTPBaseDepayload + + + + + + If @GstRTPBaseDepayload.process or +@GstRTPBaseDepayload.process_rtp_packet drop an output buffer this +function tells the base class to flush header extension cache as +well. + +This will not drop an input RTP header marked as delayed from +gst_rtp_base_depayload_delayed(). + +If @keep_current is %TRUE the current input RTP header will be kept +and enqueued after flushing the previous input RTP headers. + +A typical use-case for @keep_current is when the depayloader +implementation invalidates the current output buffer and starts a +new one with the current RTP input buffer. + +Must be called with the stream lock held. + + + + + + + a #GstRTPBaseDepayload + + + + if the current RTP buffer shall be kept + + + + + + Queries whether header extensions will be aggregated per depayloaded buffers. + + + %TRUE if aggregate-header-extension is enabled. + + + + + a #GstRTPBaseDepayload + + + + Queries whether #GstRTPSourceMeta will be added to depayloaded buffers. @@ -2459,6 +2585,23 @@ the outgoing buffer when it didn't have a timestamp already. + + Enable or disable aggregating header extensions. + + + + + + + a #GstRTPBaseDepayload + + + + whether to aggregate header extensions per output buffer + + + + Enable or disable adding #GstRTPSourceMeta to depayloaded buffers. diff --git a/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c b/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c index 0fd2d827a5..c788898ef2 100644 --- a/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c +++ b/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c @@ -24,6 +24,38 @@ * @short_description: Base class for RTP depayloader * * Provides a base class for RTP depayloaders + * + * In order to handle RTP header extensions correctly if the + * depayloader aggregates multiple RTP packet payloads into one output + * buffer this class provides the function + * gst_rtp_base_depayload_set_aggregate_hdrext_enabled(). If the + * aggregation is enabled the virtual functions + * @GstRTPBaseDepayload.process or + * @GstRTPBaseDepayload.process_rtp_packet must tell the base class + * what happens to the current RTP packet. By default the base class + * assumes that the packet payload is used with the next output + * buffer. + * + * If the RTP packet will not be used with an output buffer + * gst_rtp_base_depayload_dropped() must be called. A typical + * situation would be if we are waiting for a keyframe. + * + * If the RTP packet will be used but not with the current output + * buffer but with the next one gst_rtp_base_depayload_delayed() must + * be called. This may happen if the current RTP packet signals the + * start of a new output buffer and the currently processed output + * buffer will be pushed first. The undelay happens implicitly once + * the current buffer has been pushed or + * gst_rtp_base_depayload_flush() has been called. + * + * If gst_rtp_base_depayload_flush() is called all RTP packets that + * have not been dropped since the last output buffer are dropped, + * e.g. if an output buffer is discarded due to malformed data. This + * may or may not include the current RTP packet depending on the 2nd + * parameter @keep_current. + * + * Be aware that in case gst_rtp_base_depayload_push_list() is used + * each buffer will see the same list of RTP header extensions. */ #ifdef HAVE_CONFIG_H #include "config.h" @@ -75,6 +107,13 @@ struct _GstRTPBaseDepayloadPrivate /* array of GstRTPHeaderExtension's * */ GPtrArray *header_exts; + + /* maintain buffer list for header extensions read() */ + gboolean hdrext_aggregate; + GstBufferList *hdrext_buffers; + GstBuffer *hdrext_delayed; + GstBuffer *hdrext_outbuf; + gboolean hdrext_read_result; }; /* Filter signals and args */ @@ -139,6 +178,11 @@ static void gst_rtp_base_depayload_add_extension (GstRTPBaseDepayload * static void gst_rtp_base_depayload_clear_extensions (GstRTPBaseDepayload * rtpbasepayload); +static gboolean gst_rtp_base_depayload_operate_hdrext_buffer (GstBuffer ** + buffer, guint idx, gpointer depayloader); +static void gst_rtp_base_depayload_reset_hdrext_buffers (GstRTPBaseDepayload * + rtpbasepayload); + GType gst_rtp_base_depayload_get_type (void) { @@ -404,11 +448,13 @@ gst_rtp_base_depayload_init (GstRTPBaseDepayload * filter, priv->source_info = DEFAULT_SOURCE_INFO; priv->max_reorder = DEFAULT_MAX_REORDER; priv->auto_hdr_ext = DEFAULT_AUTO_HEADER_EXTENSION; + priv->hdrext_aggregate = FALSE; gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED); priv->header_exts = g_ptr_array_new_with_free_func ((GDestroyNotify) gst_object_unref); + priv->hdrext_buffers = gst_buffer_list_new (); } static void @@ -417,7 +463,7 @@ gst_rtp_base_depayload_finalize (GObject * object) GstRTPBaseDepayload *rtpbasedepayload = GST_RTP_BASE_DEPAYLOAD (object); g_ptr_array_unref (rtpbasedepayload->priv->header_exts); - rtpbasedepayload->priv->header_exts = NULL; + gst_clear_buffer_list (&rtpbasedepayload->priv->hdrext_buffers); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -808,6 +854,22 @@ gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter, priv->input_buffer = in; + if (discont) { + gst_rtp_base_depayload_reset_hdrext_buffers (filter); + g_assert_null (priv->hdrext_delayed); + } + + /* update RTP buffer cache for header extensions */ + if (priv->hdrext_aggregate) { + GstBuffer *b = gst_buffer_new (); + /* make a copy of the buffer that only contains the RTP header + with the extensions to not waste too much memory */ + guint s = gst_rtp_buffer_get_header_len (&rtp); + gst_buffer_copy_into (b, in, + GST_BUFFER_COPY_MEMORY | GST_BUFFER_COPY_DEEP, 0, s); + gst_buffer_list_add (priv->hdrext_buffers, b); + } + if (process_rtp_packet_func != NULL) { out_buf = process_rtp_packet_func (filter, &rtp); gst_rtp_buffer_unmap (&rtp); @@ -820,10 +882,22 @@ gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter, /* let's send it out to processing */ if (out_buf) { - if (priv->process_flow_ret == GST_FLOW_OK) + if (priv->process_flow_ret == GST_FLOW_OK) { priv->process_flow_ret = gst_rtp_base_depayload_push (filter, out_buf); - else + } else { gst_buffer_unref (out_buf); + gst_rtp_base_depayload_reset_hdrext_buffers (filter); + } + } + + /* if the current buffer is delayed the depayloader should either + have called gst_rtp_base_depayload_push() internally or returned + a buffer that's pushed, either way the buffer cache should be + empty here and we append the delayed buffer */ + if (priv->hdrext_delayed) { + g_assert_true (gst_buffer_list_length (priv->hdrext_buffers) == 0); + gst_buffer_list_add (priv->hdrext_buffers, priv->hdrext_delayed); + priv->hdrext_delayed = NULL; } gst_buffer_unref (in); @@ -1283,12 +1357,34 @@ out: return needs_src_caps_update; } +static gboolean +gst_rtp_base_depayload_operate_hdrext_buffer (GstBuffer ** buffer, + guint idx, gpointer depayloader) +{ + GstRTPBaseDepayload *depayload = depayloader; + + depayload->priv->hdrext_read_result |= + read_rtp_header_extensions (depayload, *buffer, + depayload->priv->hdrext_outbuf); + return TRUE; +} + +static void +gst_rtp_base_depayload_reset_hdrext_buffers (GstRTPBaseDepayload * depayload) +{ + GstRTPBaseDepayloadPrivate *priv = depayload->priv; + + gst_buffer_list_unref (priv->hdrext_buffers); + priv->hdrext_buffers = gst_buffer_list_new (); +} + static gboolean gst_rtp_base_depayload_set_headers (GstRTPBaseDepayload * depayload, GstBuffer * buffer) { GstRTPBaseDepayloadPrivate *priv = depayload->priv; GstClockTime pts, dts, duration; + gboolean ret = FALSE; pts = GST_BUFFER_PTS (buffer); dts = GST_BUFFER_DTS (buffer); @@ -1318,10 +1414,25 @@ gst_rtp_base_depayload_set_headers (GstRTPBaseDepayload * depayload, if (priv->source_info) add_rtp_source_meta (buffer, priv->input_buffer); - return read_rtp_header_extensions (depayload, priv->input_buffer, buffer); + if (priv->hdrext_aggregate) { + priv->hdrext_read_result = FALSE; + priv->hdrext_outbuf = buffer; + /* if we have an empty list but a delayed RTP buffer let's use it */ + if (!gst_buffer_list_length (priv->hdrext_buffers) && + priv->hdrext_delayed) { + gst_buffer_list_add (priv->hdrext_buffers, priv->hdrext_delayed); + priv->hdrext_delayed = NULL; + } + gst_buffer_list_foreach (priv->hdrext_buffers, + gst_rtp_base_depayload_operate_hdrext_buffer, depayload); + ret = priv->hdrext_read_result; + priv->hdrext_outbuf = NULL; + } else { + ret = read_rtp_header_extensions (depayload, priv->input_buffer, buffer); + } } - return FALSE; + return ret; } static GstFlowReturn @@ -1447,6 +1558,8 @@ gst_rtp_base_depayload_do_push (GstRTPBaseDepayload * filter, gboolean is_list, gst_clear_buffer (&buf); } + gst_rtp_base_depayload_reset_hdrext_buffers (filter); + return res; } @@ -1718,3 +1831,146 @@ gst_rtp_base_depayload_is_source_info_enabled (GstRTPBaseDepayload * depayload) { return depayload->priv->source_info; } + +/** + * gst_rtp_base_depayload_set_aggregate_hdrext_enabled: + * @depayload: a #GstRTPBaseDepayload + * @enable: whether to aggregate header extensions per output buffer + * + * Enable or disable aggregating header extensions. + * + * Since: 1.24 + **/ +void +gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GstRTPBaseDepayload * + depayload, gboolean enable) +{ + depayload->priv->hdrext_aggregate = enable; + if (!enable) + gst_rtp_base_depayload_reset_hdrext_buffers (depayload); +} + +/** + * gst_rtp_base_depayload_is_aggregate_hdrext_enabled: + * @depayload: a #GstRTPBaseDepayload + * + * Queries whether header extensions will be aggregated per depayloaded buffers. + * + * Returns: %TRUE if aggregate-header-extension is enabled. + * + * Since: 1.24 + **/ +gboolean +gst_rtp_base_depayload_is_aggregate_hdrext_enabled (GstRTPBaseDepayload * + depayload) +{ + return depayload->priv->hdrext_aggregate; +} + +/** + * gst_rtp_base_depayload_dropped: + * @depayload: a #GstRTPBaseDepayload + * + * Called from @GstRTPBaseDepayload.process or + * @GstRTPBaseDepayload.process_rtp_packet if the depayloader does not + * use the current buffer for the output buffer. This will either drop + * the delayed buffer or the last buffer from the header extension + * cache. + * + * A typical use-case is when the depayloader implementation is + * dropping an input RTP buffer while waiting for the first keyframe. + * + * Must be called with the stream lock held. + * + * Since: 1.24 + **/ +void +gst_rtp_base_depayload_dropped (GstRTPBaseDepayload * depayload) +{ + GstRTPBaseDepayloadPrivate *priv = depayload->priv; + guint l = gst_buffer_list_length (priv->hdrext_buffers); + + if (priv->hdrext_delayed) { + gst_clear_buffer (&priv->hdrext_delayed); + } else if (l) { + gst_buffer_list_remove (priv->hdrext_buffers, l - 1, 1); + } +} + +/** + * gst_rtp_base_depayload_delayed: + * @depayload: a #GstRTPBaseDepayload + * + * Called from @GstRTPBaseDepayload.process or + * @GstRTPBaseDepayload.process_rtp_packet when the depayloader needs + * to keep the current input RTP header for use with the next output + * buffer. + * + * The delayed buffer will remain until the end of processing the + * current output buffer and then enqueued for processing with the + * next output buffer. + * + * A typical use-case is when the depayloader implementation will + * start a new output buffer for the current input RTP buffer but push + * the current output buffer first. + * + * Must be called with the stream lock held. + * + * Since: 1.24 + **/ +void +gst_rtp_base_depayload_delayed (GstRTPBaseDepayload * depayload) +{ + GstRTPBaseDepayloadPrivate *priv = depayload->priv; + guint l = gst_buffer_list_length (priv->hdrext_buffers); + + if (l) { + priv->hdrext_delayed = gst_buffer_list_get (priv->hdrext_buffers, l - 1); + gst_buffer_ref (priv->hdrext_delayed); + gst_buffer_list_remove (priv->hdrext_buffers, l - 1, 1); + } +} + +/** + * gst_rtp_base_depayload_flush: + * @depayload: a #GstRTPBaseDepayload + * @keep_current: if the current RTP buffer shall be kept + * + * If @GstRTPBaseDepayload.process or + * @GstRTPBaseDepayload.process_rtp_packet drop an output buffer this + * function tells the base class to flush header extension cache as + * well. + * + * This will not drop an input RTP header marked as delayed from + * gst_rtp_base_depayload_delayed(). + * + * If @keep_current is %TRUE the current input RTP header will be kept + * and enqueued after flushing the previous input RTP headers. + * + * A typical use-case for @keep_current is when the depayloader + * implementation invalidates the current output buffer and starts a + * new one with the current RTP input buffer. + * + * Must be called with the stream lock held. + * + * Since: 1.24 + **/ +void +gst_rtp_base_depayload_flush (GstRTPBaseDepayload * depayload, + gboolean keep_current) +{ + GstRTPBaseDepayloadPrivate *priv = depayload->priv; + guint l = gst_buffer_list_length (priv->hdrext_buffers); + + /* if the current buffer shall not be kept or has already been + removed from the cache clear the cache */ + if (!keep_current || priv->hdrext_delayed) { + gst_rtp_base_depayload_reset_hdrext_buffers (depayload); + } else if (l) { + /* clear all cached buffers (if any) except the delayed */ + GstBuffer *b = gst_buffer_list_get (priv->hdrext_buffers, l - 1); + gst_buffer_ref (b); + gst_rtp_base_depayload_reset_hdrext_buffers (depayload); + gst_buffer_list_add (priv->hdrext_buffers, b); + } +} diff --git a/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h b/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h index 341a61551c..1b778fc43b 100644 --- a/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h +++ b/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h @@ -127,6 +127,22 @@ GST_RTP_API void gst_rtp_base_depayload_set_source_info_enabled (GstRTPBaseDepayload * depayload, gboolean enable); +GST_RTP_API +void gst_rtp_base_depayload_dropped (GstRTPBaseDepayload * depayload); + +GST_RTP_API +void gst_rtp_base_depayload_delayed (GstRTPBaseDepayload * depayload); + +GST_RTP_API +void gst_rtp_base_depayload_flush (GstRTPBaseDepayload * depayload, + gboolean keep_current); + +GST_RTP_API +gboolean gst_rtp_base_depayload_is_aggregate_hdrext_enabled (GstRTPBaseDepayload * depayload); + +GST_RTP_API +void gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GstRTPBaseDepayload * depayload, + gboolean enable); G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstRTPBaseDepayload, gst_object_unref) diff --git a/subprojects/gst-plugins-base/tests/check/libs/rtpbasedepayload.c b/subprojects/gst-plugins-base/tests/check/libs/rtpbasedepayload.c index 464b95fca2..29e9098663 100644 --- a/subprojects/gst-plugins-base/tests/check/libs/rtpbasedepayload.c +++ b/subprojects/gst-plugins-base/tests/check/libs/rtpbasedepayload.c @@ -48,8 +48,17 @@ typedef enum GST_RTP_DUMMY_RETURN_TO_PUSH, GST_RTP_DUMMY_USE_PUSH_FUNC, GST_RTP_DUMMY_USE_PUSH_LIST_FUNC, + GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC, } GstRtpDummyPushMethod; +typedef enum +{ + GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT, + GST_RTP_DUMMY_PUSH_AGGREGATE_DROP, + GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED, + GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH, +} GstRtpDummyPushAggregateMethod; + typedef struct _GstRtpDummyDepay GstRtpDummyDepay; typedef struct _GstRtpDummyDepayClass GstRtpDummyDepayClass; @@ -60,6 +69,10 @@ struct _GstRtpDummyDepay GstRtpDummyPushMethod push_method; guint num_buffers_in_blist; + + GstRtpDummyPushAggregateMethod aggregate_method; + guint num_buffers_to_aggregate; + guint num_buffers_aggregated; }; struct _GstRtpDummyDepayClass @@ -112,6 +125,8 @@ gst_rtp_dummy_depay_init (GstRtpDummyDepay * depay) { depay->rtptime = 0; depay->num_buffers_in_blist = 1; + depay->num_buffers_to_aggregate = 1; + depay->num_buffers_aggregated = 0; } static GstRtpDummyDepay * @@ -180,6 +195,34 @@ gst_rtp_dummy_depay_process (GstRTPBaseDepayload * depayload, GstBuffer * buf) gst_rtp_base_depayload_push_list (depayload, blist); break; } + case GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC: + ++self->num_buffers_aggregated; + if (self->num_buffers_aggregated != self->num_buffers_to_aggregate) { + switch (self->aggregate_method) { + case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP: + gst_rtp_base_depayload_dropped (depayload); + break; + case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT: + case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED: + case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH: + break; + } + gst_clear_buffer (&outbuf); + } else { + switch (self->aggregate_method) { + case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED: + gst_rtp_base_depayload_delayed (depayload); + break; + case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH: + gst_rtp_base_depayload_flush (depayload, TRUE); + break; + case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP: + case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT: + break; + } + self->num_buffers_aggregated = 0; + } + break; case GST_RTP_DUMMY_RETURN_TO_PUSH: break; } @@ -1877,6 +1920,170 @@ GST_START_TEST (rtp_base_depayload_hdr_ext_caps_change) GST_END_TEST; +static GstFlowReturn +hdr_ext_aggregate_chain_func (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstFlowReturn res; + GstCaps *caps; + guint val; + GstPad *srcpad; + GstElement *depay; + static gboolean first = TRUE; + static guint expected_caps_val = 0; + + res = gst_check_chain_func (pad, parent, buffer); + if (res != GST_FLOW_OK) { + return res; + } + + caps = gst_pad_get_current_caps (pad); + + fail_unless (gst_structure_get_uint (gst_caps_get_structure (caps, 0), + "dummy-hdrext-val", &val)); + + srcpad = gst_pad_get_peer (pad); + depay = gst_pad_get_parent_element (srcpad); + + switch (GST_RTP_DUMMY_DEPAY (depay)->aggregate_method) { + case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT: + /* Every fifth buffer increments "dummy-hdrext-val", but we + aggregate 5 buffers per output buffer so we increment for every + output buffer. */ + expected_caps_val++; + break; + case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP: + /* We aggregate 5 buffers per output buffer but drop 4 of them + from the buffer cache. */ + if (g_list_length (buffers) % 5 == 1) { + expected_caps_val++; + } + break; + case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED: + /* We aggregate 6 buffers per output buffer but delay the 6th one + which will then account to the 2nd output buffer. Thus the 1st + output buffer will process 5 header extensions (val increments + by one) whereas the 2nd buffer will process 6 (val increments + by two)! */ + if (first) { + first = FALSE; + expected_caps_val++; + } else { + expected_caps_val += 2; + } + break; + case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH: + /* We aggregate 5 buffers per output buffer but flush 4 of them + from the hdr ext buffer cache. */ + if (g_list_length (buffers) % 5 == 1) { + expected_caps_val++; + } + break; + } + + gst_object_unref (depay); + gst_object_unref (srcpad); + + fail_unless_equals_int (expected_caps_val, val); + + gst_caps_unref (caps); + + return res; +} + +static void +hdr_ext_aggregate_test (gint n_buffers, gint n_aggregate, + GstRtpDummyPushAggregateMethod method) +{ + GstRTPHeaderExtension *ext; + State *state; + guint i; + + state = create_depayloader ("application/x-rtp", NULL); + gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GST_RTP_BASE_DEPAYLOAD + (state->element), TRUE); + gst_pad_set_chain_function (state->sinkpad, hdr_ext_aggregate_chain_func); + ext = rtp_dummy_hdr_ext_new (); + gst_rtp_header_extension_set_id (ext, 1); + + GST_RTP_DUMMY_DEPAY (state->element)->push_method = + GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC; + GST_RTP_DUMMY_DEPAY (state->element)->num_buffers_to_aggregate = n_aggregate; + GST_RTP_DUMMY_DEPAY (state->element)->aggregate_method = method; + + g_signal_emit_by_name (state->element, "add-extension", ext); + set_state (state, GST_STATE_PLAYING); + + for (i = 0; i < n_buffers; ++i) { + push_rtp_buffer (state, "pts", 0 * GST_SECOND, + "rtptime", G_GUINT64_CONSTANT (0x1234), "seq", 0x4242 + i, "hdrext-1", + ext, NULL); + } + + set_state (state, GST_STATE_NULL); + validate_buffers_received (n_buffers / n_aggregate); + gst_object_unref (ext); + destroy_depayloader (state); +} + +GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate) +{ + const gint num_buffers = 30; + const gint num_buffers_aggregate = 5; /* must match the modulo from + hdrext */ + + fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0); + + hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate, + GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT); +} + +GST_END_TEST; + +GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_drop) +{ + const gint num_buffers = 30; + const gint num_buffers_aggregate = 5; /* must match the modulo from + hdrext */ + + fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0); + + hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate, + GST_RTP_DUMMY_PUSH_AGGREGATE_DROP); +} + +GST_END_TEST; + +GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_delayed) +{ + const gint num_buffers = 12; /* must be two times + num_buffers_aggregate */ + const gint num_buffers_aggregate = 6; /* must match the modulo from + hdrext + 1 */ + + fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0); + fail_unless_equals_int (num_buffers / num_buffers_aggregate, 2); + + hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate, + GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED); +} + +GST_END_TEST; + +GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_flush) +{ + const gint num_buffers = 30; + const gint num_buffers_aggregate = 5; /* must match the modulo from + hdrext */ + + fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0); + + hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate, + GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH); +} + +GST_END_TEST; + static Suite * rtp_basepayloading_suite (void) { @@ -1921,7 +2128,10 @@ rtp_basepayloading_suite (void) tcase_add_test (tc_chain, rtp_base_depayload_multiple_exts); tcase_add_test (tc_chain, rtp_base_depayload_caps_request_ignored); tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_caps_change); - + tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate); + tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_drop); + tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_delayed); + tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_flush); return s; } diff --git a/subprojects/gst-plugins-base/tests/check/libs/rtpdummyhdrextimpl.c b/subprojects/gst-plugins-base/tests/check/libs/rtpdummyhdrextimpl.c index ca961328d2..9910d9d61d 100644 --- a/subprojects/gst-plugins-base/tests/check/libs/rtpdummyhdrextimpl.c +++ b/subprojects/gst-plugins-base/tests/check/libs/rtpdummyhdrextimpl.c @@ -199,6 +199,7 @@ gst_rtp_dummy_hdr_ext_read (GstRTPHeaderExtension * ext, if (dummy->read_count % 5 == 1) { /* Every fifth buffer triggers caps change. */ + ++dummy->caps_field_value; gst_rtp_header_extension_set_wants_update_non_rtp_src_caps (ext, TRUE); } @@ -236,7 +237,7 @@ gst_rtp_dummy_hdr_ext_update_non_rtp_src_caps (GstRTPHeaderExtension * ext, GstRTPDummyHdrExt *dummy = GST_RTP_DUMMY_HDR_EXT (ext); gst_caps_set_simple (caps, "dummy-hdrext-val", G_TYPE_UINT, - ++dummy->caps_field_value, NULL); + dummy->caps_field_value, NULL); return TRUE; }