rtpbasedepayload: Fixed HdrExt handling for aggregated output buffer

If a depayloader aggregates multiple RTP buffers into one buffer only
the last RTP buffer was checked for header extensions. Now the
depayloader remembers all RTP packets pushed before a output buffer is
pushed and checks all RTP buffers for header extensions.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4979>
This commit is contained in:
Jochen Henneberg 2023-07-06 10:13:06 +02:00
parent 62f09513e5
commit d086491909
5 changed files with 634 additions and 8 deletions

View file

@ -2332,7 +2332,39 @@ audio codec</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbaseaudiopayload.h"/>
</record>
<class name="RTPBaseDepayload" c:symbol-prefix="rtp_base_depayload" c:type="GstRTPBaseDepayload" parent="Gst.Element" abstract="1" glib:type-name="GstRTPBaseDepayload" glib:get-type="gst_rtp_base_depayload_get_type" glib:type-struct="RTPBaseDepayloadClass">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Provides a base class for RTP depayloaders</doc>
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Provides a base class for RTP depayloaders
In order to handle RTP header extensions correctly if the
depayloader aggregates multiple RTP packet payloads into one output
buffer this class provides the function
gst_rtp_base_depayload_set_aggregate_hdrext_enabled(). If the
aggregation is enabled the virtual functions
@GstRTPBaseDepayload.process or
@GstRTPBaseDepayload.process_rtp_packet must tell the base class
what happens to the current RTP packet. By default the base class
assumes that the packet payload is used with the next output
buffer.
If the RTP packet will not be used with an output buffer
gst_rtp_base_depayload_dropped() must be called. A typical
situation would be if we are waiting for a keyframe.
If the RTP packet will be used but not with the current output
buffer but with the next one gst_rtp_base_depayload_delayed() must
be called. This may happen if the current RTP packet signals the
start of a new output buffer and the currently processed output
buffer will be pushed first. The undelay happens implicitly once
the current buffer has been pushed or
gst_rtp_base_depayload_flush() has been called.
If gst_rtp_base_depayload_flush() is called all RTP packets that
have not been dropped since the last output buffer are dropped,
e.g. if an output buffer is discarded due to malformed data. This
may or may not include the current RTP packet depending on the 2nd
parameter @keep_current.
Be aware that in case gst_rtp_base_depayload_push_list() is used
each buffer will see the same list of RTP header extensions.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
<virtual-method name="handle_event">
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
@ -2404,6 +2436,100 @@ audio codec</doc>
</parameter>
</parameters>
</virtual-method>
<method name="delayed" c:identifier="gst_rtp_base_depayload_delayed" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Called from @GstRTPBaseDepayload.process or
@GstRTPBaseDepayload.process_rtp_packet when the depayloader needs
to keep the current input RTP header for use with the next output
buffer.
The delayed buffer will remain until the end of processing the
current output buffer and then enqueued for processing with the
next output buffer.
A typical use-case is when the depayloader implementation will
start a new output buffer for the current input RTP buffer but push
the current output buffer first.
Must be called with the stream lock held.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
<return-value transfer-ownership="none">
<type name="none" c:type="void"/>
</return-value>
<parameters>
<instance-parameter name="depayload" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
<type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
</instance-parameter>
</parameters>
</method>
<method name="dropped" c:identifier="gst_rtp_base_depayload_dropped" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Called from @GstRTPBaseDepayload.process or
@GstRTPBaseDepayload.process_rtp_packet if the depayloader does not
use the current buffer for the output buffer. This will either drop
the delayed buffer or the last buffer from the header extension
cache.
A typical use-case is when the depayloader implementation is
dropping an input RTP buffer while waiting for the first keyframe.
Must be called with the stream lock held.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
<return-value transfer-ownership="none">
<type name="none" c:type="void"/>
</return-value>
<parameters>
<instance-parameter name="depayload" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
<type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
</instance-parameter>
</parameters>
</method>
<method name="flush" c:identifier="gst_rtp_base_depayload_flush" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">If @GstRTPBaseDepayload.process or
@GstRTPBaseDepayload.process_rtp_packet drop an output buffer this
function tells the base class to flush header extension cache as
well.
This will not drop an input RTP header marked as delayed from
gst_rtp_base_depayload_delayed().
If @keep_current is %TRUE the current input RTP header will be kept
and enqueued after flushing the previous input RTP headers.
A typical use-case for @keep_current is when the depayloader
implementation invalidates the current output buffer and starts a
new one with the current RTP input buffer.
Must be called with the stream lock held.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
<return-value transfer-ownership="none">
<type name="none" c:type="void"/>
</return-value>
<parameters>
<instance-parameter name="depayload" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
<type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
</instance-parameter>
<parameter name="keep_current" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">if the current RTP buffer shall be kept</doc>
<type name="gboolean" c:type="gboolean"/>
</parameter>
</parameters>
</method>
<method name="is_aggregate_hdrext_enabled" c:identifier="gst_rtp_base_depayload_is_aggregate_hdrext_enabled" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Queries whether header extensions will be aggregated per depayloaded buffers.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
<return-value transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">%TRUE if aggregate-header-extension is enabled.</doc>
<type name="gboolean" c:type="gboolean"/>
</return-value>
<parameters>
<instance-parameter name="depayload" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
<type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
</instance-parameter>
</parameters>
</method>
<method name="is_source_info_enabled" c:identifier="gst_rtp_base_depayload_is_source_info_enabled" version="1.16">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Queries whether #GstRTPSourceMeta will be added to depayloaded buffers.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
@ -2459,6 +2585,23 @@ the outgoing buffer when it didn't have a timestamp already.</doc>
</parameter>
</parameters>
</method>
<method name="set_aggregate_hdrext_enabled" c:identifier="gst_rtp_base_depayload_set_aggregate_hdrext_enabled" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Enable or disable aggregating header extensions.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
<return-value transfer-ownership="none">
<type name="none" c:type="void"/>
</return-value>
<parameters>
<instance-parameter name="depayload" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
<type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
</instance-parameter>
<parameter name="enable" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">whether to aggregate header extensions per output buffer</doc>
<type name="gboolean" c:type="gboolean"/>
</parameter>
</parameters>
</method>
<method name="set_source_info_enabled" c:identifier="gst_rtp_base_depayload_set_source_info_enabled" version="1.16">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Enable or disable adding #GstRTPSourceMeta to depayloaded buffers.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>

View file

@ -24,6 +24,38 @@
* @short_description: Base class for RTP depayloader
*
* Provides a base class for RTP depayloaders
*
* In order to handle RTP header extensions correctly if the
* depayloader aggregates multiple RTP packet payloads into one output
* buffer this class provides the function
* gst_rtp_base_depayload_set_aggregate_hdrext_enabled(). If the
* aggregation is enabled the virtual functions
* @GstRTPBaseDepayload.process or
* @GstRTPBaseDepayload.process_rtp_packet must tell the base class
* what happens to the current RTP packet. By default the base class
* assumes that the packet payload is used with the next output
* buffer.
*
* If the RTP packet will not be used with an output buffer
* gst_rtp_base_depayload_dropped() must be called. A typical
* situation would be if we are waiting for a keyframe.
*
* If the RTP packet will be used but not with the current output
* buffer but with the next one gst_rtp_base_depayload_delayed() must
* be called. This may happen if the current RTP packet signals the
* start of a new output buffer and the currently processed output
* buffer will be pushed first. The undelay happens implicitly once
* the current buffer has been pushed or
* gst_rtp_base_depayload_flush() has been called.
*
* If gst_rtp_base_depayload_flush() is called all RTP packets that
* have not been dropped since the last output buffer are dropped,
* e.g. if an output buffer is discarded due to malformed data. This
* may or may not include the current RTP packet depending on the 2nd
* parameter @keep_current.
*
* Be aware that in case gst_rtp_base_depayload_push_list() is used
* each buffer will see the same list of RTP header extensions.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
@ -75,6 +107,13 @@ struct _GstRTPBaseDepayloadPrivate
/* array of GstRTPHeaderExtension's * */
GPtrArray *header_exts;
/* maintain buffer list for header extensions read() */
gboolean hdrext_aggregate;
GstBufferList *hdrext_buffers;
GstBuffer *hdrext_delayed;
GstBuffer *hdrext_outbuf;
gboolean hdrext_read_result;
};
/* Filter signals and args */
@ -139,6 +178,11 @@ static void gst_rtp_base_depayload_add_extension (GstRTPBaseDepayload *
static void gst_rtp_base_depayload_clear_extensions (GstRTPBaseDepayload *
rtpbasepayload);
static gboolean gst_rtp_base_depayload_operate_hdrext_buffer (GstBuffer **
buffer, guint idx, gpointer depayloader);
static void gst_rtp_base_depayload_reset_hdrext_buffers (GstRTPBaseDepayload *
rtpbasepayload);
GType
gst_rtp_base_depayload_get_type (void)
{
@ -404,11 +448,13 @@ gst_rtp_base_depayload_init (GstRTPBaseDepayload * filter,
priv->source_info = DEFAULT_SOURCE_INFO;
priv->max_reorder = DEFAULT_MAX_REORDER;
priv->auto_hdr_ext = DEFAULT_AUTO_HEADER_EXTENSION;
priv->hdrext_aggregate = FALSE;
gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
priv->header_exts =
g_ptr_array_new_with_free_func ((GDestroyNotify) gst_object_unref);
priv->hdrext_buffers = gst_buffer_list_new ();
}
static void
@ -417,7 +463,7 @@ gst_rtp_base_depayload_finalize (GObject * object)
GstRTPBaseDepayload *rtpbasedepayload = GST_RTP_BASE_DEPAYLOAD (object);
g_ptr_array_unref (rtpbasedepayload->priv->header_exts);
rtpbasedepayload->priv->header_exts = NULL;
gst_clear_buffer_list (&rtpbasedepayload->priv->hdrext_buffers);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -808,6 +854,22 @@ gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter,
priv->input_buffer = in;
if (discont) {
gst_rtp_base_depayload_reset_hdrext_buffers (filter);
g_assert_null (priv->hdrext_delayed);
}
/* update RTP buffer cache for header extensions */
if (priv->hdrext_aggregate) {
GstBuffer *b = gst_buffer_new ();
/* make a copy of the buffer that only contains the RTP header
with the extensions to not waste too much memory */
guint s = gst_rtp_buffer_get_header_len (&rtp);
gst_buffer_copy_into (b, in,
GST_BUFFER_COPY_MEMORY | GST_BUFFER_COPY_DEEP, 0, s);
gst_buffer_list_add (priv->hdrext_buffers, b);
}
if (process_rtp_packet_func != NULL) {
out_buf = process_rtp_packet_func (filter, &rtp);
gst_rtp_buffer_unmap (&rtp);
@ -820,10 +882,22 @@ gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter,
/* let's send it out to processing */
if (out_buf) {
if (priv->process_flow_ret == GST_FLOW_OK)
if (priv->process_flow_ret == GST_FLOW_OK) {
priv->process_flow_ret = gst_rtp_base_depayload_push (filter, out_buf);
else
} else {
gst_buffer_unref (out_buf);
gst_rtp_base_depayload_reset_hdrext_buffers (filter);
}
}
/* if the current buffer is delayed the depayloader should either
have called gst_rtp_base_depayload_push() internally or returned
a buffer that's pushed, either way the buffer cache should be
empty here and we append the delayed buffer */
if (priv->hdrext_delayed) {
g_assert_true (gst_buffer_list_length (priv->hdrext_buffers) == 0);
gst_buffer_list_add (priv->hdrext_buffers, priv->hdrext_delayed);
priv->hdrext_delayed = NULL;
}
gst_buffer_unref (in);
@ -1283,12 +1357,34 @@ out:
return needs_src_caps_update;
}
static gboolean
gst_rtp_base_depayload_operate_hdrext_buffer (GstBuffer ** buffer,
guint idx, gpointer depayloader)
{
GstRTPBaseDepayload *depayload = depayloader;
depayload->priv->hdrext_read_result |=
read_rtp_header_extensions (depayload, *buffer,
depayload->priv->hdrext_outbuf);
return TRUE;
}
static void
gst_rtp_base_depayload_reset_hdrext_buffers (GstRTPBaseDepayload * depayload)
{
GstRTPBaseDepayloadPrivate *priv = depayload->priv;
gst_buffer_list_unref (priv->hdrext_buffers);
priv->hdrext_buffers = gst_buffer_list_new ();
}
static gboolean
gst_rtp_base_depayload_set_headers (GstRTPBaseDepayload * depayload,
GstBuffer * buffer)
{
GstRTPBaseDepayloadPrivate *priv = depayload->priv;
GstClockTime pts, dts, duration;
gboolean ret = FALSE;
pts = GST_BUFFER_PTS (buffer);
dts = GST_BUFFER_DTS (buffer);
@ -1318,10 +1414,25 @@ gst_rtp_base_depayload_set_headers (GstRTPBaseDepayload * depayload,
if (priv->source_info)
add_rtp_source_meta (buffer, priv->input_buffer);
return read_rtp_header_extensions (depayload, priv->input_buffer, buffer);
if (priv->hdrext_aggregate) {
priv->hdrext_read_result = FALSE;
priv->hdrext_outbuf = buffer;
/* if we have an empty list but a delayed RTP buffer let's use it */
if (!gst_buffer_list_length (priv->hdrext_buffers) &&
priv->hdrext_delayed) {
gst_buffer_list_add (priv->hdrext_buffers, priv->hdrext_delayed);
priv->hdrext_delayed = NULL;
}
gst_buffer_list_foreach (priv->hdrext_buffers,
gst_rtp_base_depayload_operate_hdrext_buffer, depayload);
ret = priv->hdrext_read_result;
priv->hdrext_outbuf = NULL;
} else {
ret = read_rtp_header_extensions (depayload, priv->input_buffer, buffer);
}
}
return FALSE;
return ret;
}
static GstFlowReturn
@ -1447,6 +1558,8 @@ gst_rtp_base_depayload_do_push (GstRTPBaseDepayload * filter, gboolean is_list,
gst_clear_buffer (&buf);
}
gst_rtp_base_depayload_reset_hdrext_buffers (filter);
return res;
}
@ -1718,3 +1831,146 @@ gst_rtp_base_depayload_is_source_info_enabled (GstRTPBaseDepayload * depayload)
{
return depayload->priv->source_info;
}
/**
* gst_rtp_base_depayload_set_aggregate_hdrext_enabled:
* @depayload: a #GstRTPBaseDepayload
* @enable: whether to aggregate header extensions per output buffer
*
* Enable or disable aggregating header extensions.
*
* Since: 1.24
**/
void
gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GstRTPBaseDepayload *
depayload, gboolean enable)
{
depayload->priv->hdrext_aggregate = enable;
if (!enable)
gst_rtp_base_depayload_reset_hdrext_buffers (depayload);
}
/**
* gst_rtp_base_depayload_is_aggregate_hdrext_enabled:
* @depayload: a #GstRTPBaseDepayload
*
* Queries whether header extensions will be aggregated per depayloaded buffers.
*
* Returns: %TRUE if aggregate-header-extension is enabled.
*
* Since: 1.24
**/
gboolean
gst_rtp_base_depayload_is_aggregate_hdrext_enabled (GstRTPBaseDepayload *
depayload)
{
return depayload->priv->hdrext_aggregate;
}
/**
* gst_rtp_base_depayload_dropped:
* @depayload: a #GstRTPBaseDepayload
*
* Called from @GstRTPBaseDepayload.process or
* @GstRTPBaseDepayload.process_rtp_packet if the depayloader does not
* use the current buffer for the output buffer. This will either drop
* the delayed buffer or the last buffer from the header extension
* cache.
*
* A typical use-case is when the depayloader implementation is
* dropping an input RTP buffer while waiting for the first keyframe.
*
* Must be called with the stream lock held.
*
* Since: 1.24
**/
void
gst_rtp_base_depayload_dropped (GstRTPBaseDepayload * depayload)
{
GstRTPBaseDepayloadPrivate *priv = depayload->priv;
guint l = gst_buffer_list_length (priv->hdrext_buffers);
if (priv->hdrext_delayed) {
gst_clear_buffer (&priv->hdrext_delayed);
} else if (l) {
gst_buffer_list_remove (priv->hdrext_buffers, l - 1, 1);
}
}
/**
* gst_rtp_base_depayload_delayed:
* @depayload: a #GstRTPBaseDepayload
*
* Called from @GstRTPBaseDepayload.process or
* @GstRTPBaseDepayload.process_rtp_packet when the depayloader needs
* to keep the current input RTP header for use with the next output
* buffer.
*
* The delayed buffer will remain until the end of processing the
* current output buffer and then enqueued for processing with the
* next output buffer.
*
* A typical use-case is when the depayloader implementation will
* start a new output buffer for the current input RTP buffer but push
* the current output buffer first.
*
* Must be called with the stream lock held.
*
* Since: 1.24
**/
void
gst_rtp_base_depayload_delayed (GstRTPBaseDepayload * depayload)
{
GstRTPBaseDepayloadPrivate *priv = depayload->priv;
guint l = gst_buffer_list_length (priv->hdrext_buffers);
if (l) {
priv->hdrext_delayed = gst_buffer_list_get (priv->hdrext_buffers, l - 1);
gst_buffer_ref (priv->hdrext_delayed);
gst_buffer_list_remove (priv->hdrext_buffers, l - 1, 1);
}
}
/**
* gst_rtp_base_depayload_flush:
* @depayload: a #GstRTPBaseDepayload
* @keep_current: if the current RTP buffer shall be kept
*
* If @GstRTPBaseDepayload.process or
* @GstRTPBaseDepayload.process_rtp_packet drop an output buffer this
* function tells the base class to flush header extension cache as
* well.
*
* This will not drop an input RTP header marked as delayed from
* gst_rtp_base_depayload_delayed().
*
* If @keep_current is %TRUE the current input RTP header will be kept
* and enqueued after flushing the previous input RTP headers.
*
* A typical use-case for @keep_current is when the depayloader
* implementation invalidates the current output buffer and starts a
* new one with the current RTP input buffer.
*
* Must be called with the stream lock held.
*
* Since: 1.24
**/
void
gst_rtp_base_depayload_flush (GstRTPBaseDepayload * depayload,
gboolean keep_current)
{
GstRTPBaseDepayloadPrivate *priv = depayload->priv;
guint l = gst_buffer_list_length (priv->hdrext_buffers);
/* if the current buffer shall not be kept or has already been
removed from the cache clear the cache */
if (!keep_current || priv->hdrext_delayed) {
gst_rtp_base_depayload_reset_hdrext_buffers (depayload);
} else if (l) {
/* clear all cached buffers (if any) except the delayed */
GstBuffer *b = gst_buffer_list_get (priv->hdrext_buffers, l - 1);
gst_buffer_ref (b);
gst_rtp_base_depayload_reset_hdrext_buffers (depayload);
gst_buffer_list_add (priv->hdrext_buffers, b);
}
}

View file

@ -127,6 +127,22 @@ GST_RTP_API
void gst_rtp_base_depayload_set_source_info_enabled (GstRTPBaseDepayload * depayload,
gboolean enable);
GST_RTP_API
void gst_rtp_base_depayload_dropped (GstRTPBaseDepayload * depayload);
GST_RTP_API
void gst_rtp_base_depayload_delayed (GstRTPBaseDepayload * depayload);
GST_RTP_API
void gst_rtp_base_depayload_flush (GstRTPBaseDepayload * depayload,
gboolean keep_current);
GST_RTP_API
gboolean gst_rtp_base_depayload_is_aggregate_hdrext_enabled (GstRTPBaseDepayload * depayload);
GST_RTP_API
void gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GstRTPBaseDepayload * depayload,
gboolean enable);
G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstRTPBaseDepayload, gst_object_unref)

View file

@ -48,8 +48,17 @@ typedef enum
GST_RTP_DUMMY_RETURN_TO_PUSH,
GST_RTP_DUMMY_USE_PUSH_FUNC,
GST_RTP_DUMMY_USE_PUSH_LIST_FUNC,
GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC,
} GstRtpDummyPushMethod;
typedef enum
{
GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT,
GST_RTP_DUMMY_PUSH_AGGREGATE_DROP,
GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED,
GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH,
} GstRtpDummyPushAggregateMethod;
typedef struct _GstRtpDummyDepay GstRtpDummyDepay;
typedef struct _GstRtpDummyDepayClass GstRtpDummyDepayClass;
@ -60,6 +69,10 @@ struct _GstRtpDummyDepay
GstRtpDummyPushMethod push_method;
guint num_buffers_in_blist;
GstRtpDummyPushAggregateMethod aggregate_method;
guint num_buffers_to_aggregate;
guint num_buffers_aggregated;
};
struct _GstRtpDummyDepayClass
@ -112,6 +125,8 @@ gst_rtp_dummy_depay_init (GstRtpDummyDepay * depay)
{
depay->rtptime = 0;
depay->num_buffers_in_blist = 1;
depay->num_buffers_to_aggregate = 1;
depay->num_buffers_aggregated = 0;
}
static GstRtpDummyDepay *
@ -180,6 +195,34 @@ gst_rtp_dummy_depay_process (GstRTPBaseDepayload * depayload, GstBuffer * buf)
gst_rtp_base_depayload_push_list (depayload, blist);
break;
}
case GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC:
++self->num_buffers_aggregated;
if (self->num_buffers_aggregated != self->num_buffers_to_aggregate) {
switch (self->aggregate_method) {
case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP:
gst_rtp_base_depayload_dropped (depayload);
break;
case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT:
case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED:
case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH:
break;
}
gst_clear_buffer (&outbuf);
} else {
switch (self->aggregate_method) {
case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED:
gst_rtp_base_depayload_delayed (depayload);
break;
case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH:
gst_rtp_base_depayload_flush (depayload, TRUE);
break;
case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP:
case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT:
break;
}
self->num_buffers_aggregated = 0;
}
break;
case GST_RTP_DUMMY_RETURN_TO_PUSH:
break;
}
@ -1877,6 +1920,170 @@ GST_START_TEST (rtp_base_depayload_hdr_ext_caps_change)
GST_END_TEST;
static GstFlowReturn
hdr_ext_aggregate_chain_func (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
{
GstFlowReturn res;
GstCaps *caps;
guint val;
GstPad *srcpad;
GstElement *depay;
static gboolean first = TRUE;
static guint expected_caps_val = 0;
res = gst_check_chain_func (pad, parent, buffer);
if (res != GST_FLOW_OK) {
return res;
}
caps = gst_pad_get_current_caps (pad);
fail_unless (gst_structure_get_uint (gst_caps_get_structure (caps, 0),
"dummy-hdrext-val", &val));
srcpad = gst_pad_get_peer (pad);
depay = gst_pad_get_parent_element (srcpad);
switch (GST_RTP_DUMMY_DEPAY (depay)->aggregate_method) {
case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT:
/* Every fifth buffer increments "dummy-hdrext-val", but we
aggregate 5 buffers per output buffer so we increment for every
output buffer. */
expected_caps_val++;
break;
case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP:
/* We aggregate 5 buffers per output buffer but drop 4 of them
from the buffer cache. */
if (g_list_length (buffers) % 5 == 1) {
expected_caps_val++;
}
break;
case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED:
/* We aggregate 6 buffers per output buffer but delay the 6th one
which will then account to the 2nd output buffer. Thus the 1st
output buffer will process 5 header extensions (val increments
by one) whereas the 2nd buffer will process 6 (val increments
by two)! */
if (first) {
first = FALSE;
expected_caps_val++;
} else {
expected_caps_val += 2;
}
break;
case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH:
/* We aggregate 5 buffers per output buffer but flush 4 of them
from the hdr ext buffer cache. */
if (g_list_length (buffers) % 5 == 1) {
expected_caps_val++;
}
break;
}
gst_object_unref (depay);
gst_object_unref (srcpad);
fail_unless_equals_int (expected_caps_val, val);
gst_caps_unref (caps);
return res;
}
static void
hdr_ext_aggregate_test (gint n_buffers, gint n_aggregate,
GstRtpDummyPushAggregateMethod method)
{
GstRTPHeaderExtension *ext;
State *state;
guint i;
state = create_depayloader ("application/x-rtp", NULL);
gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GST_RTP_BASE_DEPAYLOAD
(state->element), TRUE);
gst_pad_set_chain_function (state->sinkpad, hdr_ext_aggregate_chain_func);
ext = rtp_dummy_hdr_ext_new ();
gst_rtp_header_extension_set_id (ext, 1);
GST_RTP_DUMMY_DEPAY (state->element)->push_method =
GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC;
GST_RTP_DUMMY_DEPAY (state->element)->num_buffers_to_aggregate = n_aggregate;
GST_RTP_DUMMY_DEPAY (state->element)->aggregate_method = method;
g_signal_emit_by_name (state->element, "add-extension", ext);
set_state (state, GST_STATE_PLAYING);
for (i = 0; i < n_buffers; ++i) {
push_rtp_buffer (state, "pts", 0 * GST_SECOND,
"rtptime", G_GUINT64_CONSTANT (0x1234), "seq", 0x4242 + i, "hdrext-1",
ext, NULL);
}
set_state (state, GST_STATE_NULL);
validate_buffers_received (n_buffers / n_aggregate);
gst_object_unref (ext);
destroy_depayloader (state);
}
GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate)
{
const gint num_buffers = 30;
const gint num_buffers_aggregate = 5; /* must match the modulo from
hdrext */
fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0);
hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate,
GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT);
}
GST_END_TEST;
GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_drop)
{
const gint num_buffers = 30;
const gint num_buffers_aggregate = 5; /* must match the modulo from
hdrext */
fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0);
hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate,
GST_RTP_DUMMY_PUSH_AGGREGATE_DROP);
}
GST_END_TEST;
GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_delayed)
{
const gint num_buffers = 12; /* must be two times
num_buffers_aggregate */
const gint num_buffers_aggregate = 6; /* must match the modulo from
hdrext + 1 */
fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0);
fail_unless_equals_int (num_buffers / num_buffers_aggregate, 2);
hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate,
GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED);
}
GST_END_TEST;
GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_flush)
{
const gint num_buffers = 30;
const gint num_buffers_aggregate = 5; /* must match the modulo from
hdrext */
fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0);
hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate,
GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH);
}
GST_END_TEST;
static Suite *
rtp_basepayloading_suite (void)
{
@ -1921,7 +2128,10 @@ rtp_basepayloading_suite (void)
tcase_add_test (tc_chain, rtp_base_depayload_multiple_exts);
tcase_add_test (tc_chain, rtp_base_depayload_caps_request_ignored);
tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_caps_change);
tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate);
tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_drop);
tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_delayed);
tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_flush);
return s;
}

View file

@ -199,6 +199,7 @@ gst_rtp_dummy_hdr_ext_read (GstRTPHeaderExtension * ext,
if (dummy->read_count % 5 == 1) {
/* Every fifth buffer triggers caps change. */
++dummy->caps_field_value;
gst_rtp_header_extension_set_wants_update_non_rtp_src_caps (ext, TRUE);
}
@ -236,7 +237,7 @@ gst_rtp_dummy_hdr_ext_update_non_rtp_src_caps (GstRTPHeaderExtension * ext,
GstRTPDummyHdrExt *dummy = GST_RTP_DUMMY_HDR_EXT (ext);
gst_caps_set_simple (caps, "dummy-hdrext-val", G_TYPE_UINT,
++dummy->caps_field_value, NULL);
dummy->caps_field_value, NULL);
return TRUE;
}