diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index a70fbf33ef..3c3e9e6b5a 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -89,6 +89,13 @@ typedef enum RTCP_PAD } PadType; +#define DEFAULT_MAX_STREAMS G_MAXUINT +enum +{ + PROP_0, + PROP_MAX_STREAMS +}; + /* signals */ enum { @@ -273,6 +280,7 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gchar *padname; GstRtpSsrcDemuxPad *demuxpad; GstPad *retpad; + guint num_streams; INTERNAL_STREAM_LOCK (demux); @@ -281,6 +289,13 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, INTERNAL_STREAM_UNLOCK (demux); return retpad; } + /* We create 2 src pads per ssrc (RTP & RTCP). Checking if we are allowed + to create 2 more pads */ + num_streams = (GST_ELEMENT_CAST (demux)->numsrcpads) >> 1; + if (num_streams >= demux->max_streams) { + INTERNAL_STREAM_UNLOCK (demux); + return NULL; + } GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc); @@ -347,6 +362,40 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, return retpad; } +static void +gst_rtp_ssrc_demux_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtpSsrcDemux *demux; + + demux = GST_RTP_SSRC_DEMUX (object); + switch (prop_id) { + case PROP_MAX_STREAMS: + demux->max_streams = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_ssrc_demux_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRtpSsrcDemux *demux; + + demux = GST_RTP_SSRC_DEMUX (object); + switch (prop_id) { + case PROP_MAX_STREAMS: + g_value_set_uint (value, demux->max_streams); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + static void gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass) { @@ -360,6 +409,14 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass) gobject_klass->dispose = gst_rtp_ssrc_demux_dispose; gobject_klass->finalize = gst_rtp_ssrc_demux_finalize; + gobject_klass->set_property = gst_rtp_ssrc_demux_set_property; + gobject_klass->get_property = gst_rtp_ssrc_demux_get_property; + + g_object_class_install_property (gobject_klass, PROP_MAX_STREAMS, + g_param_spec_uint ("max-streams", "Max Streams", + "The maximum number of streams allowed", + 0, G_MAXUINT, DEFAULT_MAX_STREAMS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstRtpSsrcDemux::new-ssrc-pad: @@ -451,6 +508,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux) gst_rtp_ssrc_demux_iterate_internal_links_sink); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); + demux->max_streams = DEFAULT_MAX_STREAMS; + g_rec_mutex_init (&demux->padlock); } @@ -644,10 +703,11 @@ invalid_payload: } create_failed: { - GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), - ("Could not create new pad")); gst_buffer_unref (buf); - return GST_FLOW_ERROR; + GST_WARNING_OBJECT (demux, + "Dropping buffer SSRC %08x. " + "Max streams number reached (%u)", ssrc, demux->max_streams); + return GST_FLOW_OK; } } @@ -738,10 +798,11 @@ unexpected_rtcp: } create_failed: { - GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), - ("Could not create new pad")); gst_buffer_unref (buf); - return GST_FLOW_ERROR; + GST_WARNING_OBJECT (demux, + "Dropping buffer SSRC %08x. " + "Max streams number reached (%u)", ssrc, demux->max_streams); + return GST_FLOW_OK; } } diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h index 82df4449e8..3bb210c8e1 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.h +++ b/gst/rtpmanager/gstrtpssrcdemux.h @@ -41,6 +41,7 @@ struct _GstRtpSsrcDemux GRecMutex padlock; GSList *srcpads; + guint max_streams; }; struct _GstRtpSsrcDemuxClass diff --git a/tests/check/elements/rtpssrcdemux.c b/tests/check/elements/rtpssrcdemux.c index e5e1e26167..dfb8e11c3a 100644 --- a/tests/check/elements/rtpssrcdemux.c +++ b/tests/check/elements/rtpssrcdemux.c @@ -2,6 +2,8 @@ * * Copyright (C) 2018 Collabora Ltd. * Author: Nicolas Dufresne + * Copyright (C) 2019 Pexip + * Author: Havard Graff * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -241,6 +243,40 @@ GST_START_TEST (test_oob_event_locking) GST_END_TEST; + +static void +new_ssrc_pad_found (GstElement * element, G_GNUC_UNUSED guint ssrc, + GstPad * pad, GSList ** src_h) +{ + GstHarness *h = gst_harness_new_with_element (element, NULL, NULL); + gst_harness_add_element_src_pad (h, pad); + *src_h = g_slist_prepend (*src_h, h); +} + +GST_START_TEST (test_rtpssrcdemux_max_streams) +{ + GstHarness *h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL); + GSList *src_h = NULL; + gint i; + + g_object_set (h->element, "max-streams", 64, NULL); + gst_harness_set_src_caps_str (h, "application/x-rtp"); + g_signal_connect (h->element, + "new-ssrc-pad", (GCallback) new_ssrc_pad_found, &src_h); + gst_harness_play (h); + + for (i = 0; i < 128; ++i) { + fail_unless_equals_int (GST_FLOW_OK, + gst_harness_push (h, create_buffer (0, i))); + } + + fail_unless_equals_int (g_slist_length (src_h), 64); + g_slist_free_full (src_h, (GDestroyNotify) gst_harness_teardown); + gst_harness_teardown (h); +} + +GST_END_TEST; + static Suite * rtpssrcdemux_suite (void) { @@ -250,6 +286,7 @@ rtpssrcdemux_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_event_forwarding); tcase_add_test (tc_chain, test_oob_event_locking); + tcase_add_test (tc_chain, test_rtpssrcdemux_max_streams); return s; }