rtpsource: Don't do probation for RTX sources

Disable probation for RTX sources as packets will arrive very
irregularly and waiting for a second packet usually exceeds the deadline
of the retransmission.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/issues/181

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3112>
This commit is contained in:
Sebastian Dröge 2022-10-03 20:28:47 +03:00 committed by GStreamer Marge Bot
parent 5568cb33f7
commit bd5a4d321b
5 changed files with 76 additions and 84 deletions

View file

@ -298,7 +298,7 @@ static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data); RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess, static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
GstBuffer * buffer, gpointer user_data); GstBuffer * buffer, gpointer user_data);
static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, static GstCaps *gst_rtp_session_caps (RTPSession * sess, guint8 payload,
gpointer user_data); gpointer user_data);
static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data); static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
static void gst_rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, static void gst_rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
@ -328,7 +328,7 @@ static RTPSessionCallbacks callbacks = {
gst_rtp_session_send_rtp, gst_rtp_session_send_rtp,
gst_rtp_session_sync_rtcp, gst_rtp_session_sync_rtcp,
gst_rtp_session_send_rtcp, gst_rtp_session_send_rtcp,
gst_rtp_session_clock_rate, gst_rtp_session_caps,
gst_rtp_session_reconsider, gst_rtp_session_reconsider,
gst_rtp_session_request_key_unit, gst_rtp_session_request_key_unit,
gst_rtp_session_request_time, gst_rtp_session_request_time,
@ -1678,41 +1678,12 @@ no_caps:
} }
/* called when the session manager needs the clock rate */ /* called when the session manager needs the clock rate */
static gint static GstCaps *
gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, gst_rtp_session_caps (RTPSession * sess, guint8 payload, gpointer user_data)
gpointer user_data)
{ {
gint result = -1; GstRtpSession *rtpsession = GST_RTP_SESSION_CAST (user_data);
GstRtpSession *rtpsession;
GstCaps *caps;
const GstStructure *s;
rtpsession = GST_RTP_SESSION_CAST (user_data); return gst_rtp_session_get_caps_for_pt (rtpsession, payload);
caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
if (!caps)
goto done;
s = gst_caps_get_structure (caps, 0);
if (!gst_structure_get_int (s, "clock-rate", &result))
goto no_clock_rate;
gst_caps_unref (caps);
GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
done:
return result;
/* ERRORS */
no_clock_rate:
{
gst_caps_unref (caps);
GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
goto done;
}
} }
/* called when the session manager asks us to reconsider the timeout */ /* called when the session manager asks us to reconsider the timeout */

View file

@ -1224,9 +1224,9 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
sess->callbacks.sync_rtcp = callbacks->sync_rtcp; sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
sess->sync_rtcp_user_data = user_data; sess->sync_rtcp_user_data = user_data;
} }
if (callbacks->clock_rate) { if (callbacks->caps) {
sess->callbacks.clock_rate = callbacks->clock_rate; sess->callbacks.caps = callbacks->caps;
sess->clock_rate_user_data = user_data; sess->caps_user_data = user_data;
} }
if (callbacks->reconsider) { if (callbacks->reconsider) {
sess->callbacks.reconsider = callbacks->reconsider; sess->callbacks.reconsider = callbacks->reconsider;
@ -1331,7 +1331,7 @@ rtp_session_set_sync_rtcp_callback (RTPSession * sess,
} }
/** /**
* rtp_session_set_clock_rate_callback: * rtp_session_set_caps_callback:
* @sess: an #RTPSession * @sess: an #RTPSession
* @callback: callback to set * @callback: callback to set
* @user_data: user data passed in the callback * @user_data: user data passed in the callback
@ -1339,13 +1339,13 @@ rtp_session_set_sync_rtcp_callback (RTPSession * sess,
* Configure only the clock_rate callback to be notified of the clock_rate action. * Configure only the clock_rate callback to be notified of the clock_rate action.
*/ */
void void
rtp_session_set_clock_rate_callback (RTPSession * sess, rtp_session_set_caps_callback (RTPSession * sess,
RTPSessionClockRate callback, gpointer user_data) RTPSessionCaps callback, gpointer user_data)
{ {
g_return_if_fail (RTP_IS_SESSION (sess)); g_return_if_fail (RTP_IS_SESSION (sess));
sess->callbacks.clock_rate = callback; sess->callbacks.caps = callback;
sess->clock_rate_user_data = user_data; sess->caps_user_data = user_data;
} }
/** /**
@ -1549,30 +1549,26 @@ source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
return result; return result;
} }
static gint static GstCaps *
source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session) source_caps (RTPSource * source, guint8 pt, RTPSession * session)
{ {
gint result; GstCaps *result = NULL;
RTP_SESSION_UNLOCK (session); RTP_SESSION_UNLOCK (session);
if (session->callbacks.clock_rate) if (session->callbacks.caps)
result = result = session->callbacks.caps (session, pt, session->caps_user_data);
session->callbacks.clock_rate (session, pt,
session->clock_rate_user_data);
else
result = -1;
RTP_SESSION_LOCK (session); RTP_SESSION_LOCK (session);
GST_DEBUG ("got clock-rate %d for pt %d", result, pt); GST_DEBUG ("got caps %" GST_PTR_FORMAT " for pt %d", result, pt);
return result; return result;
} }
static RTPSourceCallbacks callbacks = { static RTPSourceCallbacks callbacks = {
(RTPSourcePushRTP) source_push_rtp, (RTPSourcePushRTP) source_push_rtp,
(RTPSourceClockRate) source_clock_rate, (RTPSourceCaps) source_caps,
}; };
@ -1924,7 +1920,8 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
source->validated = TRUE; source->validated = TRUE;
source->internal = TRUE; source->internal = TRUE;
source->probation = FALSE; source->probation = 0;
source->curr_probation = 0;
rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes)); rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
rtp_source_set_callbacks (source, &callbacks, sess); rtp_source_set_callbacks (source, &callbacks, sess);

View file

@ -97,16 +97,16 @@ typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, G
typedef GstFlowReturn (*RTPSessionSyncRTCP) (RTPSession *sess, GstBuffer *buffer, gpointer user_data); typedef GstFlowReturn (*RTPSessionSyncRTCP) (RTPSession *sess, GstBuffer *buffer, gpointer user_data);
/** /**
* RTPSessionClockRate: * RTPSessionCaps:
* @sess: an #RTPSession * @sess: an #RTPSession
* @payload: the payload * @payload: the payload
* @user_data: user data specified when registering * @user_data: user data specified when registering
* *
* This callback will be called when @sess needs the clock-rate of @payload. * This callback will be called when @sess needs the caps of @payload.
* *
* Returns: the clock-rate of @pt. * Returns: the caps of @pt.
*/ */
typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data); typedef GstCaps * (*RTPSessionCaps) (RTPSession *sess, guint8 payload, gpointer user_data);
/** /**
* RTPSessionReconsider: * RTPSessionReconsider:
@ -209,7 +209,7 @@ typedef struct {
RTPSessionSendRTP send_rtp; RTPSessionSendRTP send_rtp;
RTPSessionSyncRTCP sync_rtcp; RTPSessionSyncRTCP sync_rtcp;
RTPSessionSendRTCP send_rtcp; RTPSessionSendRTCP send_rtcp;
RTPSessionClockRate clock_rate; RTPSessionCaps caps;
RTPSessionReconsider reconsider; RTPSessionReconsider reconsider;
RTPSessionRequestKeyUnit request_key_unit; RTPSessionRequestKeyUnit request_key_unit;
RTPSessionRequestTime request_time; RTPSessionRequestTime request_time;
@ -288,7 +288,7 @@ struct _RTPSession {
gpointer send_rtp_user_data; gpointer send_rtp_user_data;
gpointer send_rtcp_user_data; gpointer send_rtcp_user_data;
gpointer sync_rtcp_user_data; gpointer sync_rtcp_user_data;
gpointer clock_rate_user_data; gpointer caps_user_data;
gpointer reconsider_user_data; gpointer reconsider_user_data;
gpointer request_key_unit_user_data; gpointer request_key_unit_user_data;
gpointer request_time_user_data; gpointer request_time_user_data;
@ -375,8 +375,8 @@ void rtp_session_set_send_rtcp_callback (RTPSession * sess,
void rtp_session_set_sync_rtcp_callback (RTPSession * sess, void rtp_session_set_sync_rtcp_callback (RTPSession * sess,
RTPSessionSyncRTCP callback, RTPSessionSyncRTCP callback,
gpointer user_data); gpointer user_data);
void rtp_session_set_clock_rate_callback (RTPSession * sess, void rtp_session_set_caps_callback (RTPSession * sess,
RTPSessionClockRate callback, RTPSessionCaps callback,
gpointer user_data); gpointer user_data);
void rtp_session_set_reconsider_callback (RTPSession * sess, void rtp_session_set_reconsider_callback (RTPSession * sess,
RTPSessionReconsider callback, RTPSessionReconsider callback,

View file

@ -345,7 +345,7 @@ rtp_source_finalize (GObject * object)
g_free (src->bye_reason); g_free (src->bye_reason);
gst_caps_replace (&src->send_caps, NULL); gst_caps_replace (&src->caps, NULL);
g_list_free_full (src->conflicting_addresses, g_list_free_full (src->conflicting_addresses,
(GDestroyNotify) rtp_conflicting_address_free); (GDestroyNotify) rtp_conflicting_address_free);
@ -649,7 +649,7 @@ rtp_source_set_callbacks (RTPSource * src, RTPSourceCallbacks * cb,
g_return_if_fail (RTP_IS_SOURCE (src)); g_return_if_fail (RTP_IS_SOURCE (src));
src->callbacks.push_rtp = cb->push_rtp; src->callbacks.push_rtp = cb->push_rtp;
src->callbacks.clock_rate = cb->clock_rate; src->callbacks.caps = cb->caps;
src->user_data = user_data; src->user_data = user_data;
} }
@ -829,7 +829,7 @@ rtp_source_update_send_caps (RTPSource * src, GstCaps * caps)
gboolean rtx; gboolean rtx;
/* nothing changed, return */ /* nothing changed, return */
if (caps == NULL || src->send_caps == caps) if (caps == NULL || src->caps == caps)
return; return;
s = gst_caps_get_structure (caps, 0); s = gst_caps_get_structure (caps, 0);
@ -869,7 +869,7 @@ rtp_source_update_send_caps (RTPSource * src, GstCaps * caps)
GST_DEBUG ("got %sseqnum-offset %" G_GINT32_FORMAT, rtx ? "rtx " : "", GST_DEBUG ("got %sseqnum-offset %" G_GINT32_FORMAT, rtx ? "rtx " : "",
src->seqnum_offset); src->seqnum_offset);
gst_caps_replace (&src->send_caps, caps); gst_caps_replace (&src->caps, caps);
if (rtx) { if (rtx) {
src->media_ssrc = ssrc; src->media_ssrc = ssrc;
@ -940,7 +940,7 @@ push_packet (RTPSource * src, GstBuffer * buffer)
} }
static void static void
fetch_clock_rate_from_payload (RTPSource * src, guint8 payload) fetch_caps_for_payload (RTPSource * src, guint8 payload)
{ {
if (src->payload == -1) { if (src->payload == -1) {
/* first payload received, nothing was in the caps, lock on to this payload */ /* first payload received, nothing was in the caps, lock on to this payload */
@ -954,16 +954,40 @@ fetch_clock_rate_from_payload (RTPSource * src, guint8 payload)
src->stats.transit = -1; src->stats.transit = -1;
} }
if (src->clock_rate == -1) { if (src->clock_rate == -1 || !src->caps) {
GstCaps *caps = NULL;
if (src->callbacks.caps) {
caps = src->callbacks.caps (src, payload, src->user_data);
}
GST_DEBUG ("got caps %" GST_PTR_FORMAT, caps);
if (caps) {
const GstStructure *s;
gint clock_rate = -1; gint clock_rate = -1;
const gchar *encoding_name;
if (src->callbacks.clock_rate) s = gst_caps_get_structure (caps, 0);
clock_rate = src->callbacks.clock_rate (src, payload, src->user_data);
GST_DEBUG ("got clock-rate %d", clock_rate);
if (gst_structure_get_int (s, "clock-rate", &clock_rate)) {
src->clock_rate = clock_rate; src->clock_rate = clock_rate;
gst_rtp_packet_rate_ctx_reset (&src->packet_rate_ctx, clock_rate); gst_rtp_packet_rate_ctx_reset (&src->packet_rate_ctx, clock_rate);
} else {
GST_DEBUG ("No clock-rate in caps!");
}
encoding_name = gst_structure_get_string (s, "encoding-name");
/* Disable probation for RTX sources as packets will arrive very
* irregularly and waiting for a second packet usually exceeds the
* deadline of the retransmission */
if (g_strcmp0 (encoding_name, "rtx") == 0) {
src->probation = src->curr_probation = 0;
}
}
gst_caps_replace (&src->caps, caps);
gst_clear_caps (&caps);
} }
} }
@ -1280,7 +1304,7 @@ rtp_source_process_rtp (RTPSource * src, RTPPacketInfo * pinfo)
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR); g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR);
fetch_clock_rate_from_payload (src, pinfo->pt); fetch_caps_for_payload (src, pinfo->pt);
if (!update_receiver_stats (src, pinfo, TRUE)) if (!update_receiver_stats (src, pinfo, TRUE))
return GST_FLOW_OK; return GST_FLOW_OK;
@ -1572,7 +1596,7 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime,
if (src->clock_rate == -1 && src->pt_set) { if (src->clock_rate == -1 && src->pt_set) {
GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt, GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt,
src->ssrc); src->ssrc);
fetch_clock_rate_from_payload (src, src->pt); fetch_caps_for_payload (src, src->pt);
} }
if (src->clock_rate != -1) { if (src->clock_rate != -1) {

View file

@ -86,29 +86,29 @@ typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, gpointer data,
gpointer user_data); gpointer user_data);
/** /**
* RTPSourceClockRate: * RTPSourceCaps:
* @src: an #RTPSource * @src: an #RTPSource
* @payload: a payload type * @payload: a payload type
* @user_data: user data specified when registering * @user_data: user data specified when registering
* *
* This callback will be called when @src needs the clock-rate of the * This callback will be called when @src needs the caps of the
* @payload. * @payload.
* *
* Returns: a clock-rate for @payload. * Returns: a caps for @payload.
*/ */
typedef gint (*RTPSourceClockRate) (RTPSource *src, guint8 payload, gpointer user_data); typedef GstCaps * (*RTPSourceCaps) (RTPSource *src, guint8 payload, gpointer user_data);
/** /**
* RTPSourceCallbacks: * RTPSourceCallbacks:
* @push_rtp: a packet becomes available for handling * @push_rtp: a packet becomes available for handling
* @clock_rate: a clock-rate is requested * @caps: a caps is requested
* @get_time: the current clock time is requested * @get_time: the current clock time is requested
* *
* Callbacks performed by #RTPSource when actions need to be performed. * Callbacks performed by #RTPSource when actions need to be performed.
*/ */
typedef struct { typedef struct {
RTPSourcePushRTP push_rtp; RTPSourcePushRTP push_rtp;
RTPSourceClockRate clock_rate; RTPSourceCaps caps;
} RTPSourceCallbacks; } RTPSourceCallbacks;
/** /**
@ -161,7 +161,7 @@ struct _RTPSource {
GSocketAddress *rtcp_from; GSocketAddress *rtcp_from;
gint payload; gint payload;
GstCaps *send_caps; GstCaps *caps;
gint clock_rate; gint clock_rate;
gint32 seqnum_offset; gint32 seqnum_offset;