From 85e26f65468b6407ef753220c70695ef87700045 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 5 Sep 2008 13:52:34 +0000 Subject: [PATCH] gst/rtpmanager/gstrtpbin.*: Add signal to notify listeners when a sender becomes a receiver. Original commit message from CVS: * gst/rtpmanager/gstrtpbin.c: (on_sender_timeout), (create_session), (gst_rtp_bin_associate), (gst_rtp_bin_sync_chain), (gst_rtp_bin_class_init), (gst_rtp_bin_request_new_pad): * gst/rtpmanager/gstrtpbin.h: Add signal to notify listeners when a sender becomes a receiver. Tweak lip-sync code, don't store our own copy of the ts-offset of the jitterbuffer, don't adjust sync if the change is less than 4msec. Get the RTP timestamp <-> GStreamer timestamp relation directly from the jitterbuffer instead of our inaccurate version from the source. * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_get_sync): * gst/rtpmanager/gstrtpjitterbuffer.h: Add G_LIKELY macros, use global defines for max packet reorder and dropouts. Reset the jitterbuffer clock skew detection when packets seqnums are changed unexpectedly. * gst/rtpmanager/gstrtpsession.c: (on_sender_timeout), (gst_rtp_session_class_init), (gst_rtp_session_init): * gst/rtpmanager/gstrtpsession.h: Add sender timeout signal. * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew), (calculate_skew), (rtp_jitter_buffer_insert), (rtp_jitter_buffer_get_sync): * gst/rtpmanager/rtpjitterbuffer.h: Add some G_LIKELY macros. Keep track of the extended RTP timestamp so that we can report the RTP timestamp <-> GStreamer timestamp relation for lip-sync. Remove server timestamp gap detection code, the server can sometimes make a huge gap in timestamps (talk spurts,...) see #549774. Detect timetamp weirdness instead by observing the sender/receiver timestamp relation and resync if it changes more than 1 second. Add method to report about the current rtp <-> gst timestamp relation which is needed for lip-sync. * gst/rtpmanager/rtpsession.c: (rtp_session_class_init), (on_sender_timeout), (check_collision), (rtp_session_process_sr), (session_cleanup): * gst/rtpmanager/rtpsession.h: Add sender timeout signal. Remove inaccurate rtp <-> gst timestamp relation code, the jitterbuffer can now do an accurate reporting about this. * gst/rtpmanager/rtpsource.c: (rtp_source_init), (rtp_source_update_caps), (calculate_jitter), (rtp_source_process_rtp): * gst/rtpmanager/rtpsource.h: Remove inaccurate rtp <-> gst timestamp relation code. * gst/rtpmanager/rtpstats.h: Define global max-reorder and max-dropout constants for use in various subsystems. --- gst/rtpmanager/gstrtpbin.c | 60 +++++++++++++++------ gst/rtpmanager/gstrtpbin.h | 1 + gst/rtpmanager/gstrtpjitterbuffer.c | 80 ++++++++++++++++++---------- gst/rtpmanager/gstrtpjitterbuffer.h | 3 ++ gst/rtpmanager/gstrtpsession.c | 23 ++++++++ gst/rtpmanager/gstrtpsession.h | 1 + gst/rtpmanager/rtpjitterbuffer.c | 82 +++++++++++++++++------------ gst/rtpmanager/rtpjitterbuffer.h | 6 ++- gst/rtpmanager/rtpsession.c | 36 ++++++++++--- gst/rtpmanager/rtpsession.h | 1 + gst/rtpmanager/rtpsource.c | 15 ------ gst/rtpmanager/rtpsource.h | 4 -- gst/rtpmanager/rtpstats.h | 13 ++++- 13 files changed, 218 insertions(+), 107 deletions(-) diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 46ef4bb912..7f402c36b4 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -120,6 +120,7 @@ #include "gstrtpbin-marshal.h" #include "gstrtpbin.h" #include "gstrtpsession.h" +#include "gstrtpjitterbuffer.h" GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug); #define GST_CAT_DEFAULT gst_rtp_bin_debug @@ -236,6 +237,7 @@ enum SIGNAL_ON_BYE_SSRC, SIGNAL_ON_BYE_TIMEOUT, SIGNAL_ON_TIMEOUT, + SIGNAL_ON_SENDER_TIMEOUT, LAST_SIGNAL }; @@ -323,7 +325,6 @@ struct _GstRtpBinStream guint64 clock_base_time; gint clock_rate; gint64 ts_offset; - gint64 prev_ts_offset; gint last_pt; }; @@ -455,6 +456,13 @@ on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess) sess->id, ssrc); } +static void +on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0, + sess->id, ssrc); +} + /* create a session with the given id. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * create_session (GstRtpBin * rtpbin, gint id) @@ -507,6 +515,8 @@ create_session (GstRtpBin * rtpbin, gint id) g_signal_connect (sess->session, "on-bye-timeout", (GCallback) on_bye_timeout, sess); g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess); + g_signal_connect (sess->session, "on-sender-timeout", + (GCallback) on_sender_timeout, sess); /* FIXME, change state only to what's needed */ gst_bin_add (GST_BIN_CAST (rtpbin), session); @@ -863,32 +873,31 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, /* calculate offsets for each stream */ for (walk = client->streams; walk; walk = g_slist_next (walk)) { GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; - - if (ostream->unix_delta == 0) - continue; + gint64 prev_ts_offset; ostream->ts_offset = ostream->unix_delta - min; + g_object_get (ostream->buffer, "ts-offset", &prev_ts_offset, NULL); + /* delta changed, see how much */ - if (ostream->prev_ts_offset != ostream->ts_offset) { + if (prev_ts_offset != ostream->ts_offset) { gint64 diff; - if (ostream->prev_ts_offset > ostream->ts_offset) - diff = ostream->prev_ts_offset - ostream->ts_offset; + if (prev_ts_offset > ostream->ts_offset) + diff = prev_ts_offset - ostream->ts_offset; else - diff = ostream->ts_offset - ostream->prev_ts_offset; + diff = ostream->ts_offset - prev_ts_offset; GST_DEBUG_OBJECT (bin, "ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT - ", diff: %" G_GINT64_FORMAT, ostream->ts_offset, - ostream->prev_ts_offset, diff); + ", diff: %" G_GINT64_FORMAT, ostream->ts_offset, prev_ts_offset, + diff); - /* only change diff when it changed more than 1 millisecond. This + /* only change diff when it changed more than 4 milliseconds. This * compensates for rounding errors in NTP to RTP timestamp * conversions */ - if (diff > GST_MSECOND && diff < (3 * GST_SECOND)) { + if (diff > 4 * GST_MSECOND && diff < (3 * GST_SECOND)) { g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL); - ostream->prev_ts_offset = ostream->ts_offset; } } GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT, @@ -937,8 +946,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) gboolean have_sr, have_sdes; gboolean more; guint64 clock_base; - - clock_base = GST_BUFFER_OFFSET (buffer); + guint64 clock_base_time; stream = gst_pad_get_element_private (pad); bin = stream->bin; @@ -948,6 +956,12 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) if (!gst_rtcp_buffer_validate (buffer)) goto invalid_rtcp; + /* get the last relation between the rtp timestamps and the gstreamer + * timestamps. We get this info directly from the jitterbuffer which + * constructs gstreamer timestamps from rtp timestamps */ + gst_rtp_jitter_buffer_get_sync (GST_RTP_JITTER_BUFFER (stream->buffer), + &clock_base, &clock_base_time); + /* clock base changes when there is a huge gap in the timestamps or seqnum. * When this happens we don't want to calculate the extended timestamp based * on the previous one but reset the calculation. */ @@ -1008,7 +1022,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) if (type == GST_RTCP_SDES_CNAME) { stream->clock_base = clock_base; - stream->clock_base_time = GST_BUFFER_OFFSET_END (buffer); + stream->clock_base_time = clock_base_time; /* associate the stream to CNAME */ gst_rtp_bin_associate (bin, stream, len, data); } @@ -1328,6 +1342,19 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout), NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT); + /** + * GstRtpBin::on-sender-timeout: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the SSRC + * + * Notify of a sender SSRC that has timed out and became a receiver + */ + gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] = + g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, + G_TYPE_UINT, G_TYPE_UINT); g_object_class_install_property (gobject_class, PROP_SDES_CNAME, g_param_spec_string ("sdes-cname", "SDES CNAME", @@ -2332,6 +2359,7 @@ gst_rtp_bin_request_new_pad (GstElement * element, GstRtpBin *rtpbin; GstElementClass *klass; GstPad *result; + gchar *pad_name = NULL; g_return_val_if_fail (templ != NULL, NULL); diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 898b6dbbd2..7ef605d1f9 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -74,6 +74,7 @@ struct _GstRtpBinClass { void (*on_bye_ssrc) (GstRtpBin *rtpbin, guint session, guint32 ssrc); void (*on_bye_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc); void (*on_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc); + void (*on_sender_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc); }; GType gst_rtp_bin_get_type (void); diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index b9b1569111..d48bc40fac 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -65,6 +65,7 @@ #include "gstrtpjitterbuffer.h" #include "rtpjitterbuffer.h" +#include "rtpstats.h" GST_DEBUG_CATEGORY (rtpjitterbuffer_debug); #define GST_CAT_DEFAULT (rtpjitterbuffer_debug) @@ -108,7 +109,7 @@ enum #define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \ JBUF_LOCK (priv); \ - if (priv->srcresult != GST_FLOW_OK) \ + if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ goto label; \ } G_STMT_END @@ -117,7 +118,7 @@ enum #define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \ JBUF_WAIT(priv); \ - if (priv->srcresult != GST_FLOW_OK) \ + if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ goto label; \ } G_STMT_END @@ -830,12 +831,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); - if (!gst_rtp_buffer_validate (buffer)) + if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer))) goto invalid_buffer; priv = jitterbuffer->priv; - if (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer)) { + if (G_UNLIKELY (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer))) { GstCaps *caps; priv->last_pt = gst_rtp_buffer_get_payload_type (buffer); @@ -848,14 +849,14 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) } } - if (priv->clock_rate == -1) { + if (G_UNLIKELY (priv->clock_rate == -1)) { guint8 pt; /* no clock rate given on the caps, try to get one with the signal */ pt = gst_rtp_buffer_get_payload_type (buffer); gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt); - if (priv->clock_rate == -1) + if (G_UNLIKELY (priv->clock_rate == -1)) goto not_negotiated; } @@ -875,35 +876,42 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) JBUF_LOCK_CHECK (priv, out_flushing); /* don't accept more data on EOS */ - if (priv->eos) + if (G_UNLIKELY (priv->eos)) goto have_eos; /* let's check if this buffer is too late, we can only accept packets with * bigger seqnum than the one we last pushed. */ - if (priv->last_popped_seqnum != -1) { + if (G_LIKELY (priv->last_popped_seqnum != -1)) { gint gap; + gboolean reset = FALSE; gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum); - if (gap <= 0) { + if (G_UNLIKELY (gap <= 0)) { /* priv->last_popped_seqnum >= seqnum, this packet is too late or the * sender might have been restarted with different seqnum. */ - if (gap < -100) { + if (gap < -RTP_MAX_MISORDER) { GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap); - priv->last_popped_seqnum = -1; - priv->next_seqnum = -1; + reset = TRUE; } else { goto too_late; } } else { /* priv->last_popped_seqnum < seqnum, this is a new packet */ - if (gap > 3000) { + if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) { GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d", gap); - priv->last_popped_seqnum = -1; - priv->next_seqnum = -1; + reset = TRUE; + } else { + GST_DEBUG_OBJECT (jitterbuffer, "dropped packets %d but <= %d", gap, + RTP_MAX_DROPOUT); } } + if (G_UNLIKELY (reset)) { + priv->last_popped_seqnum = -1; + priv->next_seqnum = -1; + rtp_jitter_buffer_reset_skew (priv->jbuf); + } } /* let's drop oldest packet if the queue is already full and drop-on-latency @@ -915,7 +923,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) latency_ts = gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000); - if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) { + if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) { GstBuffer *old_buf; old_buf = rtp_jitter_buffer_pop (priv->jbuf); @@ -934,8 +942,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) /* now insert the packet into the queue in sorted order. This function returns * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ - if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp, - priv->clock_rate, &tail)) + if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp, + priv->clock_rate, &tail))) goto duplicate; /* signal addition of new buffer when the _loop is waiting. */ @@ -944,7 +952,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) /* let's unschedule and unblock any waiting buffers. We only want to do this * when the tail buffer changed */ - if (priv->clock_id && tail) { + if (G_UNLIKELY (priv->clock_id && tail)) { GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer, new tail buffer"); gst_clock_id_unschedule (priv->clock_id); @@ -1051,12 +1059,12 @@ again: GST_DEBUG_OBJECT (jitterbuffer, "Peeking item"); while (TRUE) { /* always wait if we are blocked */ - if (!priv->blocked) { + if (G_LIKELY (!priv->blocked)) { /* if we have a packet, we can exit the loop and grab it */ if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0) break; /* no packets but we are EOS, do eos logic */ - if (priv->eos) + if (G_UNLIKELY (priv->eos)) goto do_eos; } /* underrun, wait for packets or flushing now */ @@ -1091,12 +1099,12 @@ again: /* get the gap between this and the previous packet. If we don't know the * previous packet seqnum assume no gap. */ - if (next_seqnum != -1) { + if (G_LIKELY (next_seqnum != -1)) { gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum); /* if we have a packet that we already pushed or considered dropped, pop it * off and get the next packet */ - if (gap < 0) { + if (G_UNLIKELY (gap < 0)) { GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping", seqnum, next_seqnum); outbuf = rtp_jitter_buffer_pop (priv->jbuf); @@ -1116,7 +1124,7 @@ again: * determine if we have missing a packet. If we have a missing packet (which * must be before this packet) we can wait for it until the deadline for this * packet expires. */ - if (gap != 0 && out_time != -1) { + if (G_UNLIKELY (gap != 0 && out_time != -1)) { GstClockID id; GstClockTime sync_time; GstClockReturn ret; @@ -1188,8 +1196,9 @@ again: /* at this point, the clock could have been unlocked by a timeout, a new * tail element was added to the queue or because we are shutting down. Check * for shutdown first. */ - if (priv->srcresult != GST_FLOW_OK) - goto flushing; + if G_UNLIKELY + ((priv->srcresult != GST_FLOW_OK)) + goto flushing; /* if we got unscheduled and we are not flushing, it's because a new tail * element became available in the queue. Grab it and try to push or sync. */ @@ -1239,7 +1248,7 @@ push_buffer: /* when we get here we are ready to pop and push the buffer */ outbuf = rtp_jitter_buffer_pop (priv->jbuf); - if (discont || priv->discont) { + if (G_UNLIKELY (discont || priv->discont)) { /* set DISCONT flag when we missed a packet. We pushed the buffer writable * into the jitterbuffer so we can modify now. */ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); @@ -1261,7 +1270,7 @@ push_buffer: "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (out_time)); result = gst_pad_push (priv->srcpad, outbuf); - if (result != GST_FLOW_OK) + if (G_UNLIKELY (result != GST_FLOW_OK)) goto pause; return; @@ -1451,3 +1460,18 @@ gst_rtp_jitter_buffer_get_property (GObject * object, break; } } + +void +gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime, + guint64 * timestamp) +{ + GstRtpJitterBufferPrivate *priv; + + g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer)); + + priv = buffer->priv; + + JBUF_LOCK (priv); + rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp); + JBUF_UNLOCK (priv); +} diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h index 290aee0944..15185a2527 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.h +++ b/gst/rtpmanager/gstrtpjitterbuffer.h @@ -79,6 +79,9 @@ struct _GstRtpJitterBufferClass GType gst_rtp_jitter_buffer_get_type (void); +void gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer *buffer, + guint64 *rtptime, guint64 *timestamp); + G_END_DECLS #endif /* __GST_RTP_JITTER_BUFFER_H__ */ diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index cc794b626a..e78e972d6d 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -193,6 +193,7 @@ enum SIGNAL_ON_BYE_SSRC, SIGNAL_ON_BYE_TIMEOUT, SIGNAL_ON_TIMEOUT, + SIGNAL_ON_SENDER_TIMEOUT, LAST_SIGNAL }; @@ -416,6 +417,13 @@ on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess) src->ssrc); } +static void +on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0, + src->ssrc); +} + GST_BOILERPLATE (GstRtpSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT); static void @@ -574,6 +582,18 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + /** + * GstRtpSession::on-sender-timeout: + * @sess: the object which received the signal + * @ssrc: the SSRC + * + * Notify of a sender SSRC that has timed out and became a receiver + */ + gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] = + g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, + on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT, + G_TYPE_NONE, 1, G_TYPE_UINT); g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE, g_param_spec_uint64 ("ntp-ns-base", "NTP base time", @@ -655,6 +675,7 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass) rtpsession->priv->lock = g_mutex_new (); rtpsession->priv->sysclock = gst_system_clock_obtain (); rtpsession->priv->session = rtp_session_new (); + /* configure callbacks */ rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession); /* configure signals */ @@ -674,6 +695,8 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass) (GCallback) on_bye_timeout, rtpsession); g_signal_connect (rtpsession->priv->session, "on-timeout", (GCallback) on_timeout, rtpsession); + g_signal_connect (rtpsession->priv->session, "on-sender-timeout", + (GCallback) on_sender_timeout, rtpsession); rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h index 5bbf377a1f..9481a1c25f 100644 --- a/gst/rtpmanager/gstrtpsession.h +++ b/gst/rtpmanager/gstrtpsession.h @@ -71,6 +71,7 @@ struct _GstRtpSessionClass { void (*on_bye_ssrc) (GstRtpSession *sess, guint32 ssrc); void (*on_bye_timeout) (GstRtpSession *sess, guint32 ssrc); void (*on_timeout) (GstRtpSession *sess, guint32 ssrc); + void (*on_sender_timeout) (GstRtpSession *sess, guint32 ssrc); }; GType gst_rtp_session_get_type (void); diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index 70a49c1e7b..050adef0e4 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -104,6 +104,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf) { jbuf->base_time = -1; jbuf->base_rtptime = -1; + jbuf->base_extrtp = -1; jbuf->ext_rtptime = -1; jbuf->window_pos = 0; jbuf->window_filling = TRUE; @@ -185,21 +186,23 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, gstrtptime = gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, clock_rate); -again: /* first time, lock on to time and gstrtptime */ - if (jbuf->base_time == -1) + if (G_UNLIKELY (jbuf->base_time == -1)) jbuf->base_time = time; - if (jbuf->base_rtptime == -1) + if (G_UNLIKELY (jbuf->base_rtptime == -1)) { jbuf->base_rtptime = gstrtptime; + jbuf->base_extrtp = ext_rtptime; + } - if (gstrtptime >= jbuf->base_rtptime) + if (G_LIKELY (gstrtptime >= jbuf->base_rtptime)) send_diff = gstrtptime - jbuf->base_rtptime; else { /* elapsed time at sender, timestamps can go backwards and thus be smaller * than our base time, take a new base time in that case. */ GST_DEBUG ("backward timestamps at server, taking new base time"); - jbuf->base_rtptime = gstrtptime; jbuf->base_time = time; + jbuf->base_rtptime = gstrtptime; + jbuf->base_extrtp = ext_rtptime; send_diff = 0; } @@ -208,27 +211,6 @@ again: GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime), GST_TIME_ARGS (send_diff)); - if (jbuf->prev_send_diff != -1 && time != -1) { - gint64 delta_diff; - - if (send_diff > jbuf->prev_send_diff) - delta_diff = send_diff - jbuf->prev_send_diff; - else - delta_diff = jbuf->prev_send_diff - send_diff; - - /* server changed rtp timestamps too quickly, reset skew detection and start - * again. This value is sortof arbitrary and can be a bad measurement up if - * there are many packets missing because then we get a big gap that is - * unrelated to a timestamp switch. */ - if (delta_diff > GST_SECOND) { - GST_DEBUG ("delta changed too quickly %" GST_TIME_FORMAT " reset skew", - GST_TIME_ARGS (delta_diff)); - rtp_jitter_buffer_reset_skew (jbuf); - goto again; - } - } - jbuf->prev_send_diff = send_diff; - /* we don't have an arrival timestamp so we can't do skew detection. we * should still apply a timestamp based on RTP timestamp and base_time */ if (time == -1) @@ -244,17 +226,30 @@ again: /* measure the diff */ delta = ((gint64) recv_diff) - ((gint64) send_diff); + /* if the difference between the sender timeline and the receiver timeline + * changed too quickly we have to resync because the server likely restarted + * its timestamps. */ + if (ABS (delta - jbuf->skew) > GST_SECOND) { + GST_DEBUG ("delta %" GST_TIME_FORMAT " too big, reset skew", + delta - jbuf->skew); + jbuf->base_time = time; + jbuf->base_rtptime = gstrtptime; + jbuf->base_extrtp = ext_rtptime; + send_diff = 0; + delta = 0; + } + pos = jbuf->window_pos; - if (jbuf->window_filling) { + if (G_UNLIKELY (jbuf->window_filling)) { /* we are filling the window */ GST_DEBUG ("filling %d, delta %" G_GINT64_FORMAT, pos, delta); jbuf->window[pos++] = delta; /* calc the min delta we observed */ - if (pos == 1 || delta < jbuf->window_min) + if (G_UNLIKELY (pos == 1 || delta < jbuf->window_min)) jbuf->window_min = delta; - if (send_diff >= MAX_TIME || pos >= MAX_WINDOW) { + if (G_UNLIKELY (send_diff >= MAX_TIME || pos >= MAX_WINDOW)) { jbuf->window_size = pos; /* window filled */ @@ -288,11 +283,11 @@ again: old = jbuf->window[pos]; jbuf->window[pos++] = delta; - if (delta <= jbuf->window_min) { + if (G_UNLIKELY (delta <= jbuf->window_min)) { /* if the new value we inserted is smaller or equal to the current min, * it becomes the new min */ jbuf->window_min = delta; - } else if (old == jbuf->window_min) { + } else if (G_UNLIKELY (old == jbuf->window_min)) { gint64 min = G_MAXINT64; /* if we removed the old min, we have to find a new min */ @@ -313,7 +308,7 @@ again: delta, jbuf->window_min); } /* wrap around in the window */ - if (pos >= jbuf->window_size) + if (G_UNLIKELY (pos >= jbuf->window_size)) pos = 0; jbuf->window_pos = pos; @@ -382,14 +377,14 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, time = calculate_skew (jbuf, rtptime, time, clock_rate); GST_BUFFER_TIMESTAMP (buf) = time; - if (list) + if (G_LIKELY (list)) g_queue_insert_before (jbuf->packets, list, buf); else g_queue_push_tail (jbuf->packets, buf); /* tail was changed when we did not find a previous packet, we set the return * flag when requested. */ - if (tail) + if (G_UNLIKELY (tail)) *tail = (list == NULL); return TRUE; @@ -514,3 +509,22 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf) } return result; } + +/** + * rtp_jitter_buffer_get_sync: + * @jbuf: an #RTPJitterBuffer + * @rtptime: result RTP time + * @timestamp: result GStreamer timestamp + * + * Returns the relation between the RTP timestamp and the GStreamer timestamp + * used for constructing timestamps. + */ +void +rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime, + guint64 * timestamp) +{ + if (rtptime) + *rtptime = jbuf->base_extrtp; + if (timestamp) + *timestamp = jbuf->base_time + jbuf->skew; +} diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index ffd73ff930..62f3f47eed 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -22,7 +22,6 @@ #include #include -#include typedef struct _RTPJitterBuffer RTPJitterBuffer; typedef struct _RTPJitterBufferClass RTPJitterBufferClass; @@ -57,6 +56,7 @@ struct _RTPJitterBuffer { /* for calculating skew */ GstClockTime base_time; GstClockTime base_rtptime; + GstClockTime base_extrtp; guint64 ext_rtptime; gint64 window[RTP_JITTER_BUFFER_MAX_WINDOW]; guint window_pos; @@ -90,4 +90,8 @@ void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf) guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf); guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf); +void rtp_jitter_buffer_get_sync (RTPJitterBuffer *jbuf, guint64 *rtptime, + guint64 *timestamp); + + #endif /* __RTP_JITTER_BUFFER_H__ */ diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 947fef7e3b..428181f2ff 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -40,6 +40,7 @@ enum SIGNAL_ON_BYE_SSRC, SIGNAL_ON_BYE_TIMEOUT, SIGNAL_ON_TIMEOUT, + SIGNAL_ON_SENDER_TIMEOUT, LAST_SIGNAL }; @@ -212,6 +213,18 @@ rtp_session_class_init (RTPSessionClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); + /** + * RTPSession::on-sender-timeout: + * @session: the object which received the signal + * @src: the RTPSource that timed out + * + * Notify of an SSRC that was a sender but timed out and became a receiver. + */ + rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] = + g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + RTP_TYPE_SOURCE); g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE, g_param_spec_object ("internal-source", "Internal Source", @@ -513,6 +526,15 @@ on_timeout (RTPSession * sess, RTPSource * source) RTP_SESSION_LOCK (sess); } +static void +on_sender_timeout (RTPSession * sess, RTPSource * source) +{ + RTP_SESSION_UNLOCK (sess); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0, + source); + RTP_SESSION_LOCK (sess); +} + /** * rtp_session_new: * @@ -908,9 +930,8 @@ check_collision (RTPSession * sess, RTPSource * source, RTPArrivalStats * arrival, gboolean rtp) { /* If we have not arrival address, we can't do collision checking */ - if (!arrival->have_address) { + if (!arrival->have_address) return FALSE; - } if (sess->source != source) { /* This is not our local source, but lets check if two remote @@ -1479,12 +1500,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, if (!source) return; - /* we somehow need to transfer the clock_base and the base time to the next - * element, we use the offset and offset_end fields in the buffer for this - * hack */ - GST_BUFFER_OFFSET (packet->buffer) = source->clock_base; - GST_BUFFER_OFFSET_END (packet->buffer) = source->clock_base_time; - prevsender = RTP_SOURCE_IS_SENDER (source); /* first update the source */ @@ -2096,6 +2111,7 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) { gboolean remove = FALSE; gboolean byetimeout = FALSE; + gboolean sendertimeout = FALSE; gboolean is_sender, is_active; RTPSession *sess = data->sess; GstClockTime interval; @@ -2138,6 +2154,7 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) GST_TIME_ARGS (source->last_rtp_activity)); source->is_sender = FALSE; sess->stats.sender_sources--; + sendertimeout = TRUE; } } } @@ -2153,6 +2170,9 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) on_bye_timeout (sess, source); else on_timeout (sess, source); + } else { + if (sendertimeout) + on_sender_timeout (sess, source); } return remove; } diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index a32f91151b..dd3fbc1354 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -228,6 +228,7 @@ struct _RTPSessionClass { void (*on_bye_ssrc) (RTPSession *sess, RTPSource *source); void (*on_bye_timeout) (RTPSession *sess, RTPSource *source); void (*on_timeout) (RTPSession *sess, RTPSource *source); + void (*on_sender_timeout) (RTPSession *sess, RTPSource *source); }; GType rtp_session_get_type (void); diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index ddbf733be5..8d9d6ecf4c 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -170,8 +170,6 @@ rtp_source_init (RTPSource * src) src->payload = 0; src->clock_rate = -1; - src->clock_base = -1; - src->clock_base_time = -1; src->packets = g_queue_new (); src->seqnum_base = -1; src->last_rtptime = -1; @@ -527,10 +525,6 @@ rtp_source_update_caps (RTPSource * src, GstCaps * caps) gst_structure_get_int (s, "clock-rate", &src->clock_rate); GST_DEBUG ("got clock-rate %d", src->clock_rate); - if (gst_structure_get_uint (s, "clock-base", &val)) - src->clock_base = val; - GST_DEBUG ("got clock-base %" G_GINT64_FORMAT, src->clock_base); - if (gst_structure_get_uint (s, "seqnum-base", &val)) src->seqnum_base = val; GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base); @@ -771,13 +765,6 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, rtptime = gst_rtp_buffer_get_timestamp (buffer); - /* no clock-base, take first rtptime as base */ - if (src->clock_base == -1) { - GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime); - src->clock_base = rtptime; - src->clock_base_time = arrival->timestamp; - } - /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't * care about the absolute value, just the difference. */ rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND); @@ -923,13 +910,11 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, } else { /* unacceptable jump */ stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1); - src->clock_base = -1; goto bad_sequence; } } else { /* duplicate or reordered packet, will be filtered by jitterbuffer. */ GST_WARNING ("duplicate or reordered packet"); - src->clock_base = -1; } src->stats.octets_received += arrival->payload_len; diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index a2ba2d611b..c4c23a8bf7 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -32,8 +32,6 @@ #define RTP_DEFAULT_PROBATION 2 #define RTP_SEQ_MOD (1 << 16) -#define RTP_MAX_DROPOUT 3000 -#define RTP_MAX_MISORDER 100 typedef struct _RTPSource RTPSource; typedef struct _RTPSourceClass RTPSourceClass; @@ -133,8 +131,6 @@ struct _RTPSource { GstCaps *caps; gint clock_rate; gint32 seqnum_base; - gint64 clock_base; - guint64 clock_base_time; GstClockTime bye_time; GstClockTime last_activity; diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h index f82c9585df..3408300d31 100644 --- a/gst/rtpmanager/rtpstats.h +++ b/gst/rtpmanager/rtpstats.h @@ -150,11 +150,22 @@ typedef struct { #define RTP_STATS_RECEIVER_FRACTION (1.0 - RTP_STATS_SENDER_FRACTION) /* - * When receiving a BYE from a source, remove the source fomr the database + * When receiving a BYE from a source, remove the source from the database * after this timeout. */ #define RTP_STATS_BYE_TIMEOUT (2 * GST_SECOND) +/* + * The maximum number of missing packets we tollerate. These are packets with a + * sequence number bigger than the last seen packet. + */ +#define RTP_MAX_DROPOUT 3000 +/* + * The maximum number of misordered packets we tollerate. These are packets with + * a sequence number smaller than the last seen packet. + */ +#define RTP_MAX_MISORDER 100 + /** * RTPSessionStats: *