rtp: make rtp packet probation configurable (bug #682512)

This commit is contained in:
Aleix Conchillo Flaque 2012-08-22 16:36:21 -07:00 committed by Wim Taymans
parent 67f3d6ac16
commit 4a200b670f
7 changed files with 83 additions and 14 deletions

View file

@ -201,6 +201,7 @@ enum
#define DEFAULT_NUM_ACTIVE_SOURCES 0 #define DEFAULT_NUM_ACTIVE_SOURCES 0
#define DEFAULT_USE_PIPELINE_CLOCK FALSE #define DEFAULT_USE_PIPELINE_CLOCK FALSE
#define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND) #define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND)
#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION
enum enum
{ {
@ -215,6 +216,7 @@ enum
PROP_INTERNAL_SESSION, PROP_INTERNAL_SESSION,
PROP_USE_PIPELINE_CLOCK, PROP_USE_PIPELINE_CLOCK,
PROP_RTCP_MIN_INTERVAL, PROP_RTCP_MIN_INTERVAL,
PROP_PROBATION,
PROP_LAST PROP_LAST
}; };
@ -572,6 +574,12 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL, 0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PROBATION,
g_param_spec_uint ("probation", "Number of probations",
"Consecutive packet sequence numbers to accept the source",
0, G_MAXUINT, DEFAULT_PROBATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state = gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
gstelement_class->request_new_pad = gstelement_class->request_new_pad =
@ -696,6 +704,9 @@ gst_rtp_session_set_property (GObject * object, guint prop_id,
g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval", g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval",
value); value);
break; break;
case PROP_PROBATION:
g_object_set_property (G_OBJECT (priv->session), "probation", value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -747,6 +758,9 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval", g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval",
value); value);
break; break;
case PROP_PROBATION:
g_object_get_property (G_OBJECT (priv->session), "probation", value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;

View file

@ -66,6 +66,7 @@ enum
#define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND) #define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND)
#define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND) #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
#define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3) #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION
enum enum
{ {
@ -85,6 +86,7 @@ enum
PROP_RTCP_MIN_INTERVAL, PROP_RTCP_MIN_INTERVAL,
PROP_RTCP_FEEDBACK_RETENTION_WINDOW, PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
PROP_PROBATION,
PROP_LAST PROP_LAST
}; };
@ -439,6 +441,12 @@ rtp_session_class_init (RTPSessionClass * klass)
0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, 0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PROBATION,
g_param_spec_uint ("probation", "Number of probations",
"Consecutive packet sequence numbers to accept the source",
0, G_MAXUINT, DEFAULT_PROBATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
klass->get_source_by_ssrc = klass->get_source_by_ssrc =
GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc); GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp); klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp);
@ -489,6 +497,8 @@ rtp_session_init (RTPSession * sess)
sess->header_len = 28; sess->header_len = 28;
sess->mtu = DEFAULT_RTCP_MTU; sess->mtu = DEFAULT_RTCP_MTU;
sess->probation = DEFAULT_PROBATION;
/* some default SDES entries */ /* some default SDES entries */
/* we do not want to leak details like the username or hostname here */ /* we do not want to leak details like the username or hostname here */
@ -616,6 +626,10 @@ rtp_session_set_property (GObject * object, guint prop_id,
case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value); sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
break; break;
case PROP_PROBATION:
sess->probation = g_value_get_uint (value);
g_object_set_property (G_OBJECT (sess->source), "probation", value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -673,6 +687,10 @@ rtp_session_get_property (GObject * object, guint prop_id,
case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold); g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
break; break;
case PROP_PROBATION:
g_value_set_uint (value, sess->probation);
g_object_get_property (G_OBJECT (sess->source), "probation", value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -1344,9 +1362,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
* packets of an SSRC, on the other hand, is a strong indication that we * packets of an SSRC, on the other hand, is a strong indication that we
* are dealing with a valid source. */ * are dealing with a valid source. */
if (rtp) if (rtp)
source->probation = RTP_DEFAULT_PROBATION; g_object_set (source, "probation", sess->probation, NULL);
else else
source->probation = 0; g_object_set (source, "probation", 0, NULL);
/* store from address, if any */ /* store from address, if any */
if (arrival->address) { if (arrival->address) {

View file

@ -79,7 +79,7 @@ typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, gp
* *
* Returns: a #GstFlowReturn. * Returns: a #GstFlowReturn.
*/ */
typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer,
gboolean eos, gpointer user_data); gboolean eos, gpointer user_data);
/** /**
@ -113,7 +113,7 @@ typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer
* @sess: an #RTPSession * @sess: an #RTPSession
* @user_data: user data specified when registering * @user_data: user data specified when registering
* *
* This callback will be called when @sess needs to cancel the current timeout. * This callback will be called when @sess needs to cancel the current timeout.
* The currently running timeout should be canceled and a new reporting interval * The currently running timeout should be canceled and a new reporting interval
* should be requested from @sess. * should be requested from @sess.
*/ */
@ -189,6 +189,8 @@ struct _RTPSession {
guint header_len; guint header_len;
guint mtu; guint mtu;
guint probation;
/* bandwidths */ /* bandwidths */
gboolean recalc_bandwidth; gboolean recalc_bandwidth;
guint bandwidth; guint bandwidth;

View file

@ -39,6 +39,7 @@ enum
#define DEFAULT_IS_VALIDATED FALSE #define DEFAULT_IS_VALIDATED FALSE
#define DEFAULT_IS_SENDER FALSE #define DEFAULT_IS_SENDER FALSE
#define DEFAULT_SDES NULL #define DEFAULT_SDES NULL
#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION
enum enum
{ {
@ -49,6 +50,7 @@ enum
PROP_IS_SENDER, PROP_IS_SENDER,
PROP_SDES, PROP_SDES,
PROP_STATS, PROP_STATS,
PROP_PROBATION,
PROP_LAST PROP_LAST
}; };
@ -199,6 +201,12 @@ rtp_source_class_init (RTPSourceClass * klass)
"The stats of this source", GST_TYPE_STRUCTURE, "The stats of this source", GST_TYPE_STRUCTURE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PROBATION,
g_param_spec_uint ("probation", "Number of probations",
"Consecutive packet sequence numbers to accept the source",
0, G_MAXUINT, DEFAULT_PROBATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source"); GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
} }
@ -227,7 +235,8 @@ rtp_source_init (RTPSource * src)
* packets or a valid RTCP packet */ * packets or a valid RTCP packet */
src->validated = FALSE; src->validated = FALSE;
src->internal = FALSE; src->internal = FALSE;
src->probation = RTP_DEFAULT_PROBATION; src->probation = DEFAULT_PROBATION;
src->curr_probation = src->probation;
src->closing = FALSE; src->closing = FALSE;
src->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes"); src->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
@ -461,6 +470,9 @@ rtp_source_set_property (GObject * object, guint prop_id,
case PROP_SSRC: case PROP_SSRC:
src->ssrc = g_value_get_uint (value); src->ssrc = g_value_get_uint (value);
break; break;
case PROP_PROBATION:
src->probation = g_value_get_uint (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -494,6 +506,9 @@ rtp_source_get_property (GObject * object, guint prop_id,
case PROP_STATS: case PROP_STATS:
g_value_take_boxed (value, rtp_source_create_stats (src)); g_value_take_boxed (value, rtp_source_create_stats (src));
break; break;
case PROP_PROBATION:
g_value_set_uint (value, src->probation);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -1057,28 +1072,28 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
/* first time we heard of this source */ /* first time we heard of this source */
init_seq (src, seqnr); init_seq (src, seqnr);
src->stats.max_seq = seqnr - 1; src->stats.max_seq = seqnr - 1;
src->probation = RTP_DEFAULT_PROBATION; src->curr_probation = src->probation;
} }
udelta = seqnr - stats->max_seq; udelta = seqnr - stats->max_seq;
/* if we are still on probation, check seqnum */ /* if we are still on probation, check seqnum */
if (src->probation) { if (src->curr_probation) {
expected = src->stats.max_seq + 1; expected = src->stats.max_seq + 1;
/* when in probation, we require consecutive seqnums */ /* when in probation, we require consecutive seqnums */
if (seqnr == expected) { if (seqnr == expected) {
/* expected packet */ /* expected packet */
GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
src->probation--; src->curr_probation--;
src->stats.max_seq = seqnr; src->stats.max_seq = seqnr;
if (src->probation == 0) { if (src->curr_probation == 0) {
GST_DEBUG ("probation done!"); GST_DEBUG ("probation done!");
init_seq (src, seqnr); init_seq (src, seqnr);
} else { } else {
GstBuffer *q; GstBuffer *q;
GST_DEBUG ("probation %d: queue buffer", src->probation); GST_DEBUG ("probation %d: queue buffer", src->curr_probation);
/* when still in probation, keep packets in a list. */ /* when still in probation, keep packets in a list. */
g_queue_push_tail (src->packets, buffer); g_queue_push_tail (src->packets, buffer);
/* remove packets from queue if there are too many */ /* remove packets from queue if there are too many */
@ -1113,7 +1128,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
} }
} else { } else {
/* duplicate or reordered packet, will be filtered by jitterbuffer. */ /* duplicate or reordered packet, will be filtered by jitterbuffer. */
GST_WARNING ("duplicate or reordered packet"); GST_WARNING ("duplicate or reordered packet (seqnr %d)", seqnr);
} }
src->stats.octets_received += arrival->payload_len; src->stats.octets_received += arrival->payload_len;
@ -1149,7 +1164,7 @@ bad_sequence:
probation_seqnum: probation_seqnum:
{ {
GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected); GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected);
src->probation = RTP_DEFAULT_PROBATION; src->curr_probation = src->probation;
src->stats.max_seq = seqnr; src->stats.max_seq = seqnr;
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
return GST_FLOW_OK; return GST_FLOW_OK;

View file

@ -72,7 +72,7 @@ typedef struct _RTPSourceClass RTPSourceClass;
* *
* Returns: a #GstFlowReturn. * Returns: a #GstFlowReturn.
*/ */
typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer,
gpointer user_data); gpointer user_data);
/** /**
@ -127,7 +127,8 @@ struct _RTPSource {
/*< private >*/ /*< private >*/
guint32 ssrc; guint32 ssrc;
gint probation; guint probation;
guint curr_probation;
gboolean validated; gboolean validated;
gboolean internal; gboolean internal;
gboolean is_csrc; gboolean is_csrc;

View file

@ -179,6 +179,7 @@ gst_rtsp_src_buffer_mode_get_type (void)
#define DEFAULT_BUFFER_MODE BUFFER_MODE_AUTO #define DEFAULT_BUFFER_MODE BUFFER_MODE_AUTO
#define DEFAULT_PORT_RANGE NULL #define DEFAULT_PORT_RANGE NULL
#define DEFAULT_SHORT_HEADER FALSE #define DEFAULT_SHORT_HEADER FALSE
#define DEFAULT_PROBATION 2
enum enum
{ {
@ -203,6 +204,7 @@ enum
PROP_PORT_RANGE, PROP_PORT_RANGE,
PROP_UDP_BUFFER_SIZE, PROP_UDP_BUFFER_SIZE,
PROP_SHORT_HEADER, PROP_SHORT_HEADER,
PROP_PROBATION,
PROP_LAST PROP_LAST
}; };
@ -482,6 +484,12 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
"Only send the basic RTSP headers for broken encoders", "Only send the basic RTSP headers for broken encoders",
DEFAULT_SHORT_HEADER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); DEFAULT_SHORT_HEADER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PROBATION,
g_param_spec_uint ("probation", "Number of probations",
"Consecutive packet sequence numbers to accept the source",
0, G_MAXUINT, DEFAULT_PROBATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->send_event = gst_rtspsrc_send_event; gstelement_class->send_event = gst_rtspsrc_send_event;
gstelement_class->change_state = gst_rtspsrc_change_state; gstelement_class->change_state = gst_rtspsrc_change_state;
@ -525,6 +533,7 @@ gst_rtspsrc_init (GstRTSPSrc * src)
src->client_port_range.max = 0; src->client_port_range.max = 0;
src->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE; src->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE;
src->short_header = DEFAULT_SHORT_HEADER; src->short_header = DEFAULT_SHORT_HEADER;
src->probation = DEFAULT_PROBATION;
/* get a list of all extensions */ /* get a list of all extensions */
src->extensions = gst_rtsp_ext_list_get (); src->extensions = gst_rtsp_ext_list_get ();
@ -719,6 +728,9 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value,
case PROP_SHORT_HEADER: case PROP_SHORT_HEADER:
rtspsrc->short_header = g_value_get_boolean (value); rtspsrc->short_header = g_value_get_boolean (value);
break; break;
case PROP_PROBATION:
rtspsrc->probation = g_value_get_uint (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -820,6 +832,9 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_SHORT_HEADER: case PROP_SHORT_HEADER:
g_value_set_boolean (value, rtspsrc->short_header); g_value_set_boolean (value, rtspsrc->short_header);
break; break;
case PROP_PROBATION:
g_value_set_uint (value, rtspsrc->probation);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -2475,6 +2490,9 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
g_object_set (rtpsession, "rtcp-rs-bandwidth", stream->rs_bandwidth, g_object_set (rtpsession, "rtcp-rs-bandwidth", stream->rs_bandwidth,
NULL); NULL);
} }
g_object_set (rtpsession, "probation", src->probation, NULL);
g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc, g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc,
stream); stream);
g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout, g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout,

View file

@ -217,6 +217,7 @@ struct _GstRTSPSrc {
GstRTSPRange client_port_range; GstRTSPRange client_port_range;
gint udp_buffer_size; gint udp_buffer_size;
gboolean short_header; gboolean short_header;
guint probation;
/* state */ /* state */
GstRTSPState state; GstRTSPState state;