mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 12:11:13 +00:00
gst/rtpmanager/gstrtpbin.*: Add signal to notify listeners when a sender becomes a receiver.
Original commit message from CVS: * gst/rtpmanager/gstrtpbin.c: (on_sender_timeout), (create_session), (gst_rtp_bin_associate), (gst_rtp_bin_sync_chain), (gst_rtp_bin_class_init), (gst_rtp_bin_request_new_pad): * gst/rtpmanager/gstrtpbin.h: Add signal to notify listeners when a sender becomes a receiver. Tweak lip-sync code, don't store our own copy of the ts-offset of the jitterbuffer, don't adjust sync if the change is less than 4msec. Get the RTP timestamp <-> GStreamer timestamp relation directly from the jitterbuffer instead of our inaccurate version from the source. * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_get_sync): * gst/rtpmanager/gstrtpjitterbuffer.h: Add G_LIKELY macros, use global defines for max packet reorder and dropouts. Reset the jitterbuffer clock skew detection when packets seqnums are changed unexpectedly. * gst/rtpmanager/gstrtpsession.c: (on_sender_timeout), (gst_rtp_session_class_init), (gst_rtp_session_init): * gst/rtpmanager/gstrtpsession.h: Add sender timeout signal. * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew), (calculate_skew), (rtp_jitter_buffer_insert), (rtp_jitter_buffer_get_sync): * gst/rtpmanager/rtpjitterbuffer.h: Add some G_LIKELY macros. Keep track of the extended RTP timestamp so that we can report the RTP timestamp <-> GStreamer timestamp relation for lip-sync. Remove server timestamp gap detection code, the server can sometimes make a huge gap in timestamps (talk spurts,...) see #549774. Detect timetamp weirdness instead by observing the sender/receiver timestamp relation and resync if it changes more than 1 second. Add method to report about the current rtp <-> gst timestamp relation which is needed for lip-sync. * gst/rtpmanager/rtpsession.c: (rtp_session_class_init), (on_sender_timeout), (check_collision), (rtp_session_process_sr), (session_cleanup): * gst/rtpmanager/rtpsession.h: Add sender timeout signal. Remove inaccurate rtp <-> gst timestamp relation code, the jitterbuffer can now do an accurate reporting about this. * gst/rtpmanager/rtpsource.c: (rtp_source_init), (rtp_source_update_caps), (calculate_jitter), (rtp_source_process_rtp): * gst/rtpmanager/rtpsource.h: Remove inaccurate rtp <-> gst timestamp relation code. * gst/rtpmanager/rtpstats.h: Define global max-reorder and max-dropout constants for use in various subsystems.
This commit is contained in:
parent
5c89bb2ab3
commit
85e26f6546
13 changed files with 218 additions and 107 deletions
|
@ -120,6 +120,7 @@
|
|||
#include "gstrtpbin-marshal.h"
|
||||
#include "gstrtpbin.h"
|
||||
#include "gstrtpsession.h"
|
||||
#include "gstrtpjitterbuffer.h"
|
||||
|
||||
GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
|
||||
#define GST_CAT_DEFAULT gst_rtp_bin_debug
|
||||
|
@ -236,6 +237,7 @@ enum
|
|||
SIGNAL_ON_BYE_SSRC,
|
||||
SIGNAL_ON_BYE_TIMEOUT,
|
||||
SIGNAL_ON_TIMEOUT,
|
||||
SIGNAL_ON_SENDER_TIMEOUT,
|
||||
LAST_SIGNAL
|
||||
};
|
||||
|
||||
|
@ -323,7 +325,6 @@ struct _GstRtpBinStream
|
|||
guint64 clock_base_time;
|
||||
gint clock_rate;
|
||||
gint64 ts_offset;
|
||||
gint64 prev_ts_offset;
|
||||
gint last_pt;
|
||||
};
|
||||
|
||||
|
@ -455,6 +456,13 @@ on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
|
|||
sess->id, ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
|
||||
{
|
||||
g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
|
||||
sess->id, ssrc);
|
||||
}
|
||||
|
||||
/* create a session with the given id. Must be called with RTP_BIN_LOCK */
|
||||
static GstRtpBinSession *
|
||||
create_session (GstRtpBin * rtpbin, gint id)
|
||||
|
@ -507,6 +515,8 @@ create_session (GstRtpBin * rtpbin, gint id)
|
|||
g_signal_connect (sess->session, "on-bye-timeout",
|
||||
(GCallback) on_bye_timeout, sess);
|
||||
g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
|
||||
g_signal_connect (sess->session, "on-sender-timeout",
|
||||
(GCallback) on_sender_timeout, sess);
|
||||
|
||||
/* FIXME, change state only to what's needed */
|
||||
gst_bin_add (GST_BIN_CAST (rtpbin), session);
|
||||
|
@ -863,32 +873,31 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
|
|||
/* calculate offsets for each stream */
|
||||
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
|
||||
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
|
||||
|
||||
if (ostream->unix_delta == 0)
|
||||
continue;
|
||||
gint64 prev_ts_offset;
|
||||
|
||||
ostream->ts_offset = ostream->unix_delta - min;
|
||||
|
||||
g_object_get (ostream->buffer, "ts-offset", &prev_ts_offset, NULL);
|
||||
|
||||
/* delta changed, see how much */
|
||||
if (ostream->prev_ts_offset != ostream->ts_offset) {
|
||||
if (prev_ts_offset != ostream->ts_offset) {
|
||||
gint64 diff;
|
||||
|
||||
if (ostream->prev_ts_offset > ostream->ts_offset)
|
||||
diff = ostream->prev_ts_offset - ostream->ts_offset;
|
||||
if (prev_ts_offset > ostream->ts_offset)
|
||||
diff = prev_ts_offset - ostream->ts_offset;
|
||||
else
|
||||
diff = ostream->ts_offset - ostream->prev_ts_offset;
|
||||
diff = ostream->ts_offset - prev_ts_offset;
|
||||
|
||||
GST_DEBUG_OBJECT (bin,
|
||||
"ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT
|
||||
", diff: %" G_GINT64_FORMAT, ostream->ts_offset,
|
||||
ostream->prev_ts_offset, diff);
|
||||
", diff: %" G_GINT64_FORMAT, ostream->ts_offset, prev_ts_offset,
|
||||
diff);
|
||||
|
||||
/* only change diff when it changed more than 1 millisecond. This
|
||||
/* only change diff when it changed more than 4 milliseconds. This
|
||||
* compensates for rounding errors in NTP to RTP timestamp
|
||||
* conversions */
|
||||
if (diff > GST_MSECOND && diff < (3 * GST_SECOND)) {
|
||||
if (diff > 4 * GST_MSECOND && diff < (3 * GST_SECOND)) {
|
||||
g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL);
|
||||
ostream->prev_ts_offset = ostream->ts_offset;
|
||||
}
|
||||
}
|
||||
GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
|
||||
|
@ -937,8 +946,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
|
|||
gboolean have_sr, have_sdes;
|
||||
gboolean more;
|
||||
guint64 clock_base;
|
||||
|
||||
clock_base = GST_BUFFER_OFFSET (buffer);
|
||||
guint64 clock_base_time;
|
||||
|
||||
stream = gst_pad_get_element_private (pad);
|
||||
bin = stream->bin;
|
||||
|
@ -948,6 +956,12 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
|
|||
if (!gst_rtcp_buffer_validate (buffer))
|
||||
goto invalid_rtcp;
|
||||
|
||||
/* get the last relation between the rtp timestamps and the gstreamer
|
||||
* timestamps. We get this info directly from the jitterbuffer which
|
||||
* constructs gstreamer timestamps from rtp timestamps */
|
||||
gst_rtp_jitter_buffer_get_sync (GST_RTP_JITTER_BUFFER (stream->buffer),
|
||||
&clock_base, &clock_base_time);
|
||||
|
||||
/* clock base changes when there is a huge gap in the timestamps or seqnum.
|
||||
* When this happens we don't want to calculate the extended timestamp based
|
||||
* on the previous one but reset the calculation. */
|
||||
|
@ -1008,7 +1022,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
|
|||
|
||||
if (type == GST_RTCP_SDES_CNAME) {
|
||||
stream->clock_base = clock_base;
|
||||
stream->clock_base_time = GST_BUFFER_OFFSET_END (buffer);
|
||||
stream->clock_base_time = clock_base_time;
|
||||
/* associate the stream to CNAME */
|
||||
gst_rtp_bin_associate (bin, stream, len, data);
|
||||
}
|
||||
|
@ -1328,6 +1342,19 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
|
|||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
|
||||
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
|
||||
G_TYPE_UINT, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRtpBin::on-sender-timeout:
|
||||
* @rtpbin: the object which received the signal
|
||||
* @session: the session
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of a sender SSRC that has timed out and became a receiver
|
||||
*/
|
||||
gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
|
||||
g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
|
||||
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
|
||||
G_TYPE_UINT, G_TYPE_UINT);
|
||||
|
||||
g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
|
||||
g_param_spec_string ("sdes-cname", "SDES CNAME",
|
||||
|
@ -2332,6 +2359,7 @@ gst_rtp_bin_request_new_pad (GstElement * element,
|
|||
GstRtpBin *rtpbin;
|
||||
GstElementClass *klass;
|
||||
GstPad *result;
|
||||
|
||||
gchar *pad_name = NULL;
|
||||
|
||||
g_return_val_if_fail (templ != NULL, NULL);
|
||||
|
|
|
@ -74,6 +74,7 @@ struct _GstRtpBinClass {
|
|||
void (*on_bye_ssrc) (GstRtpBin *rtpbin, guint session, guint32 ssrc);
|
||||
void (*on_bye_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc);
|
||||
void (*on_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc);
|
||||
void (*on_sender_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc);
|
||||
};
|
||||
|
||||
GType gst_rtp_bin_get_type (void);
|
||||
|
|
|
@ -65,6 +65,7 @@
|
|||
|
||||
#include "gstrtpjitterbuffer.h"
|
||||
#include "rtpjitterbuffer.h"
|
||||
#include "rtpstats.h"
|
||||
|
||||
GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
|
||||
#define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
|
||||
|
@ -108,7 +109,7 @@ enum
|
|||
|
||||
#define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \
|
||||
JBUF_LOCK (priv); \
|
||||
if (priv->srcresult != GST_FLOW_OK) \
|
||||
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
|
||||
goto label; \
|
||||
} G_STMT_END
|
||||
|
||||
|
@ -117,7 +118,7 @@ enum
|
|||
|
||||
#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \
|
||||
JBUF_WAIT(priv); \
|
||||
if (priv->srcresult != GST_FLOW_OK) \
|
||||
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
|
||||
goto label; \
|
||||
} G_STMT_END
|
||||
|
||||
|
@ -830,12 +831,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
|
||||
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
|
||||
|
||||
if (!gst_rtp_buffer_validate (buffer))
|
||||
if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer)))
|
||||
goto invalid_buffer;
|
||||
|
||||
priv = jitterbuffer->priv;
|
||||
|
||||
if (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer)) {
|
||||
if (G_UNLIKELY (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer))) {
|
||||
GstCaps *caps;
|
||||
|
||||
priv->last_pt = gst_rtp_buffer_get_payload_type (buffer);
|
||||
|
@ -848,14 +849,14 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
}
|
||||
}
|
||||
|
||||
if (priv->clock_rate == -1) {
|
||||
if (G_UNLIKELY (priv->clock_rate == -1)) {
|
||||
guint8 pt;
|
||||
|
||||
/* no clock rate given on the caps, try to get one with the signal */
|
||||
pt = gst_rtp_buffer_get_payload_type (buffer);
|
||||
|
||||
gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
|
||||
if (priv->clock_rate == -1)
|
||||
if (G_UNLIKELY (priv->clock_rate == -1))
|
||||
goto not_negotiated;
|
||||
}
|
||||
|
||||
|
@ -875,34 +876,41 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
|
||||
JBUF_LOCK_CHECK (priv, out_flushing);
|
||||
/* don't accept more data on EOS */
|
||||
if (priv->eos)
|
||||
if (G_UNLIKELY (priv->eos))
|
||||
goto have_eos;
|
||||
|
||||
/* let's check if this buffer is too late, we can only accept packets with
|
||||
* bigger seqnum than the one we last pushed. */
|
||||
if (priv->last_popped_seqnum != -1) {
|
||||
if (G_LIKELY (priv->last_popped_seqnum != -1)) {
|
||||
gint gap;
|
||||
gboolean reset = FALSE;
|
||||
|
||||
gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
|
||||
|
||||
if (gap <= 0) {
|
||||
if (G_UNLIKELY (gap <= 0)) {
|
||||
/* priv->last_popped_seqnum >= seqnum, this packet is too late or the
|
||||
* sender might have been restarted with different seqnum. */
|
||||
if (gap < -100) {
|
||||
if (gap < -RTP_MAX_MISORDER) {
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
|
||||
priv->last_popped_seqnum = -1;
|
||||
priv->next_seqnum = -1;
|
||||
reset = TRUE;
|
||||
} else {
|
||||
goto too_late;
|
||||
}
|
||||
} else {
|
||||
/* priv->last_popped_seqnum < seqnum, this is a new packet */
|
||||
if (gap > 3000) {
|
||||
if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
|
||||
gap);
|
||||
reset = TRUE;
|
||||
} else {
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "dropped packets %d but <= %d", gap,
|
||||
RTP_MAX_DROPOUT);
|
||||
}
|
||||
}
|
||||
if (G_UNLIKELY (reset)) {
|
||||
priv->last_popped_seqnum = -1;
|
||||
priv->next_seqnum = -1;
|
||||
}
|
||||
rtp_jitter_buffer_reset_skew (priv->jbuf);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -915,7 +923,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
latency_ts =
|
||||
gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
|
||||
|
||||
if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
|
||||
if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
|
||||
GstBuffer *old_buf;
|
||||
|
||||
old_buf = rtp_jitter_buffer_pop (priv->jbuf);
|
||||
|
@ -934,8 +942,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
/* now insert the packet into the queue in sorted order. This function returns
|
||||
* FALSE if a packet with the same seqnum was already in the queue, meaning we
|
||||
* have a duplicate. */
|
||||
if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
|
||||
priv->clock_rate, &tail))
|
||||
if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
|
||||
priv->clock_rate, &tail)))
|
||||
goto duplicate;
|
||||
|
||||
/* signal addition of new buffer when the _loop is waiting. */
|
||||
|
@ -944,7 +952,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
|
||||
/* let's unschedule and unblock any waiting buffers. We only want to do this
|
||||
* when the tail buffer changed */
|
||||
if (priv->clock_id && tail) {
|
||||
if (G_UNLIKELY (priv->clock_id && tail)) {
|
||||
GST_DEBUG_OBJECT (jitterbuffer,
|
||||
"Unscheduling waiting buffer, new tail buffer");
|
||||
gst_clock_id_unschedule (priv->clock_id);
|
||||
|
@ -1051,12 +1059,12 @@ again:
|
|||
GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
|
||||
while (TRUE) {
|
||||
/* always wait if we are blocked */
|
||||
if (!priv->blocked) {
|
||||
if (G_LIKELY (!priv->blocked)) {
|
||||
/* if we have a packet, we can exit the loop and grab it */
|
||||
if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
|
||||
break;
|
||||
/* no packets but we are EOS, do eos logic */
|
||||
if (priv->eos)
|
||||
if (G_UNLIKELY (priv->eos))
|
||||
goto do_eos;
|
||||
}
|
||||
/* underrun, wait for packets or flushing now */
|
||||
|
@ -1091,12 +1099,12 @@ again:
|
|||
|
||||
/* get the gap between this and the previous packet. If we don't know the
|
||||
* previous packet seqnum assume no gap. */
|
||||
if (next_seqnum != -1) {
|
||||
if (G_LIKELY (next_seqnum != -1)) {
|
||||
gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
|
||||
|
||||
/* if we have a packet that we already pushed or considered dropped, pop it
|
||||
* off and get the next packet */
|
||||
if (gap < 0) {
|
||||
if (G_UNLIKELY (gap < 0)) {
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
|
||||
seqnum, next_seqnum);
|
||||
outbuf = rtp_jitter_buffer_pop (priv->jbuf);
|
||||
|
@ -1116,7 +1124,7 @@ again:
|
|||
* determine if we have missing a packet. If we have a missing packet (which
|
||||
* must be before this packet) we can wait for it until the deadline for this
|
||||
* packet expires. */
|
||||
if (gap != 0 && out_time != -1) {
|
||||
if (G_UNLIKELY (gap != 0 && out_time != -1)) {
|
||||
GstClockID id;
|
||||
GstClockTime sync_time;
|
||||
GstClockReturn ret;
|
||||
|
@ -1188,7 +1196,8 @@ again:
|
|||
/* at this point, the clock could have been unlocked by a timeout, a new
|
||||
* tail element was added to the queue or because we are shutting down. Check
|
||||
* for shutdown first. */
|
||||
if (priv->srcresult != GST_FLOW_OK)
|
||||
if G_UNLIKELY
|
||||
((priv->srcresult != GST_FLOW_OK))
|
||||
goto flushing;
|
||||
|
||||
/* if we got unscheduled and we are not flushing, it's because a new tail
|
||||
|
@ -1239,7 +1248,7 @@ push_buffer:
|
|||
/* when we get here we are ready to pop and push the buffer */
|
||||
outbuf = rtp_jitter_buffer_pop (priv->jbuf);
|
||||
|
||||
if (discont || priv->discont) {
|
||||
if (G_UNLIKELY (discont || priv->discont)) {
|
||||
/* set DISCONT flag when we missed a packet. We pushed the buffer writable
|
||||
* into the jitterbuffer so we can modify now. */
|
||||
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
|
||||
|
@ -1261,7 +1270,7 @@ push_buffer:
|
|||
"Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
|
||||
GST_TIME_ARGS (out_time));
|
||||
result = gst_pad_push (priv->srcpad, outbuf);
|
||||
if (result != GST_FLOW_OK)
|
||||
if (G_UNLIKELY (result != GST_FLOW_OK))
|
||||
goto pause;
|
||||
|
||||
return;
|
||||
|
@ -1451,3 +1460,18 @@ gst_rtp_jitter_buffer_get_property (GObject * object,
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime,
|
||||
guint64 * timestamp)
|
||||
{
|
||||
GstRtpJitterBufferPrivate *priv;
|
||||
|
||||
g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer));
|
||||
|
||||
priv = buffer->priv;
|
||||
|
||||
JBUF_LOCK (priv);
|
||||
rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp);
|
||||
JBUF_UNLOCK (priv);
|
||||
}
|
||||
|
|
|
@ -79,6 +79,9 @@ struct _GstRtpJitterBufferClass
|
|||
|
||||
GType gst_rtp_jitter_buffer_get_type (void);
|
||||
|
||||
void gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer *buffer,
|
||||
guint64 *rtptime, guint64 *timestamp);
|
||||
|
||||
G_END_DECLS
|
||||
|
||||
#endif /* __GST_RTP_JITTER_BUFFER_H__ */
|
||||
|
|
|
@ -193,6 +193,7 @@ enum
|
|||
SIGNAL_ON_BYE_SSRC,
|
||||
SIGNAL_ON_BYE_TIMEOUT,
|
||||
SIGNAL_ON_TIMEOUT,
|
||||
SIGNAL_ON_SENDER_TIMEOUT,
|
||||
LAST_SIGNAL
|
||||
};
|
||||
|
||||
|
@ -416,6 +417,13 @@ on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
|
|||
src->ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
|
||||
{
|
||||
g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
|
||||
src->ssrc);
|
||||
}
|
||||
|
||||
GST_BOILERPLATE (GstRtpSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
|
||||
|
||||
static void
|
||||
|
@ -574,6 +582,18 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
|
|||
g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRtpSession::on-sender-timeout:
|
||||
* @sess: the object which received the signal
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of a sender SSRC that has timed out and became a receiver
|
||||
*/
|
||||
gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
|
||||
g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
|
||||
on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
|
||||
G_TYPE_NONE, 1, G_TYPE_UINT);
|
||||
|
||||
g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE,
|
||||
g_param_spec_uint64 ("ntp-ns-base", "NTP base time",
|
||||
|
@ -655,6 +675,7 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
|
|||
rtpsession->priv->lock = g_mutex_new ();
|
||||
rtpsession->priv->sysclock = gst_system_clock_obtain ();
|
||||
rtpsession->priv->session = rtp_session_new ();
|
||||
|
||||
/* configure callbacks */
|
||||
rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
|
||||
/* configure signals */
|
||||
|
@ -674,6 +695,8 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
|
|||
(GCallback) on_bye_timeout, rtpsession);
|
||||
g_signal_connect (rtpsession->priv->session, "on-timeout",
|
||||
(GCallback) on_timeout, rtpsession);
|
||||
g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
|
||||
(GCallback) on_sender_timeout, rtpsession);
|
||||
rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
|
||||
(GDestroyNotify) gst_caps_unref);
|
||||
|
||||
|
|
|
@ -71,6 +71,7 @@ struct _GstRtpSessionClass {
|
|||
void (*on_bye_ssrc) (GstRtpSession *sess, guint32 ssrc);
|
||||
void (*on_bye_timeout) (GstRtpSession *sess, guint32 ssrc);
|
||||
void (*on_timeout) (GstRtpSession *sess, guint32 ssrc);
|
||||
void (*on_sender_timeout) (GstRtpSession *sess, guint32 ssrc);
|
||||
};
|
||||
|
||||
GType gst_rtp_session_get_type (void);
|
||||
|
|
|
@ -104,6 +104,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
|
|||
{
|
||||
jbuf->base_time = -1;
|
||||
jbuf->base_rtptime = -1;
|
||||
jbuf->base_extrtp = -1;
|
||||
jbuf->ext_rtptime = -1;
|
||||
jbuf->window_pos = 0;
|
||||
jbuf->window_filling = TRUE;
|
||||
|
@ -185,21 +186,23 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time,
|
|||
|
||||
gstrtptime = gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, clock_rate);
|
||||
|
||||
again:
|
||||
/* first time, lock on to time and gstrtptime */
|
||||
if (jbuf->base_time == -1)
|
||||
if (G_UNLIKELY (jbuf->base_time == -1))
|
||||
jbuf->base_time = time;
|
||||
if (jbuf->base_rtptime == -1)
|
||||
if (G_UNLIKELY (jbuf->base_rtptime == -1)) {
|
||||
jbuf->base_rtptime = gstrtptime;
|
||||
jbuf->base_extrtp = ext_rtptime;
|
||||
}
|
||||
|
||||
if (gstrtptime >= jbuf->base_rtptime)
|
||||
if (G_LIKELY (gstrtptime >= jbuf->base_rtptime))
|
||||
send_diff = gstrtptime - jbuf->base_rtptime;
|
||||
else {
|
||||
/* elapsed time at sender, timestamps can go backwards and thus be smaller
|
||||
* than our base time, take a new base time in that case. */
|
||||
GST_DEBUG ("backward timestamps at server, taking new base time");
|
||||
jbuf->base_rtptime = gstrtptime;
|
||||
jbuf->base_time = time;
|
||||
jbuf->base_rtptime = gstrtptime;
|
||||
jbuf->base_extrtp = ext_rtptime;
|
||||
send_diff = 0;
|
||||
}
|
||||
|
||||
|
@ -208,27 +211,6 @@ again:
|
|||
GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime),
|
||||
GST_TIME_ARGS (send_diff));
|
||||
|
||||
if (jbuf->prev_send_diff != -1 && time != -1) {
|
||||
gint64 delta_diff;
|
||||
|
||||
if (send_diff > jbuf->prev_send_diff)
|
||||
delta_diff = send_diff - jbuf->prev_send_diff;
|
||||
else
|
||||
delta_diff = jbuf->prev_send_diff - send_diff;
|
||||
|
||||
/* server changed rtp timestamps too quickly, reset skew detection and start
|
||||
* again. This value is sortof arbitrary and can be a bad measurement up if
|
||||
* there are many packets missing because then we get a big gap that is
|
||||
* unrelated to a timestamp switch. */
|
||||
if (delta_diff > GST_SECOND) {
|
||||
GST_DEBUG ("delta changed too quickly %" GST_TIME_FORMAT " reset skew",
|
||||
GST_TIME_ARGS (delta_diff));
|
||||
rtp_jitter_buffer_reset_skew (jbuf);
|
||||
goto again;
|
||||
}
|
||||
}
|
||||
jbuf->prev_send_diff = send_diff;
|
||||
|
||||
/* we don't have an arrival timestamp so we can't do skew detection. we
|
||||
* should still apply a timestamp based on RTP timestamp and base_time */
|
||||
if (time == -1)
|
||||
|
@ -244,17 +226,30 @@ again:
|
|||
/* measure the diff */
|
||||
delta = ((gint64) recv_diff) - ((gint64) send_diff);
|
||||
|
||||
/* if the difference between the sender timeline and the receiver timeline
|
||||
* changed too quickly we have to resync because the server likely restarted
|
||||
* its timestamps. */
|
||||
if (ABS (delta - jbuf->skew) > GST_SECOND) {
|
||||
GST_DEBUG ("delta %" GST_TIME_FORMAT " too big, reset skew",
|
||||
delta - jbuf->skew);
|
||||
jbuf->base_time = time;
|
||||
jbuf->base_rtptime = gstrtptime;
|
||||
jbuf->base_extrtp = ext_rtptime;
|
||||
send_diff = 0;
|
||||
delta = 0;
|
||||
}
|
||||
|
||||
pos = jbuf->window_pos;
|
||||
|
||||
if (jbuf->window_filling) {
|
||||
if (G_UNLIKELY (jbuf->window_filling)) {
|
||||
/* we are filling the window */
|
||||
GST_DEBUG ("filling %d, delta %" G_GINT64_FORMAT, pos, delta);
|
||||
jbuf->window[pos++] = delta;
|
||||
/* calc the min delta we observed */
|
||||
if (pos == 1 || delta < jbuf->window_min)
|
||||
if (G_UNLIKELY (pos == 1 || delta < jbuf->window_min))
|
||||
jbuf->window_min = delta;
|
||||
|
||||
if (send_diff >= MAX_TIME || pos >= MAX_WINDOW) {
|
||||
if (G_UNLIKELY (send_diff >= MAX_TIME || pos >= MAX_WINDOW)) {
|
||||
jbuf->window_size = pos;
|
||||
|
||||
/* window filled */
|
||||
|
@ -288,11 +283,11 @@ again:
|
|||
old = jbuf->window[pos];
|
||||
jbuf->window[pos++] = delta;
|
||||
|
||||
if (delta <= jbuf->window_min) {
|
||||
if (G_UNLIKELY (delta <= jbuf->window_min)) {
|
||||
/* if the new value we inserted is smaller or equal to the current min,
|
||||
* it becomes the new min */
|
||||
jbuf->window_min = delta;
|
||||
} else if (old == jbuf->window_min) {
|
||||
} else if (G_UNLIKELY (old == jbuf->window_min)) {
|
||||
gint64 min = G_MAXINT64;
|
||||
|
||||
/* if we removed the old min, we have to find a new min */
|
||||
|
@ -313,7 +308,7 @@ again:
|
|||
delta, jbuf->window_min);
|
||||
}
|
||||
/* wrap around in the window */
|
||||
if (pos >= jbuf->window_size)
|
||||
if (G_UNLIKELY (pos >= jbuf->window_size))
|
||||
pos = 0;
|
||||
jbuf->window_pos = pos;
|
||||
|
||||
|
@ -382,14 +377,14 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
|
|||
time = calculate_skew (jbuf, rtptime, time, clock_rate);
|
||||
GST_BUFFER_TIMESTAMP (buf) = time;
|
||||
|
||||
if (list)
|
||||
if (G_LIKELY (list))
|
||||
g_queue_insert_before (jbuf->packets, list, buf);
|
||||
else
|
||||
g_queue_push_tail (jbuf->packets, buf);
|
||||
|
||||
/* tail was changed when we did not find a previous packet, we set the return
|
||||
* flag when requested. */
|
||||
if (tail)
|
||||
if (G_UNLIKELY (tail))
|
||||
*tail = (list == NULL);
|
||||
|
||||
return TRUE;
|
||||
|
@ -514,3 +509,22 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf)
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_jitter_buffer_get_sync:
|
||||
* @jbuf: an #RTPJitterBuffer
|
||||
* @rtptime: result RTP time
|
||||
* @timestamp: result GStreamer timestamp
|
||||
*
|
||||
* Returns the relation between the RTP timestamp and the GStreamer timestamp
|
||||
* used for constructing timestamps.
|
||||
*/
|
||||
void
|
||||
rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime,
|
||||
guint64 * timestamp)
|
||||
{
|
||||
if (rtptime)
|
||||
*rtptime = jbuf->base_extrtp;
|
||||
if (timestamp)
|
||||
*timestamp = jbuf->base_time + jbuf->skew;
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@
|
|||
|
||||
#include <gst/gst.h>
|
||||
#include <gst/rtp/gstrtcpbuffer.h>
|
||||
#include <gst/netbuffer/gstnetbuffer.h>
|
||||
|
||||
typedef struct _RTPJitterBuffer RTPJitterBuffer;
|
||||
typedef struct _RTPJitterBufferClass RTPJitterBufferClass;
|
||||
|
@ -57,6 +56,7 @@ struct _RTPJitterBuffer {
|
|||
/* for calculating skew */
|
||||
GstClockTime base_time;
|
||||
GstClockTime base_rtptime;
|
||||
GstClockTime base_extrtp;
|
||||
guint64 ext_rtptime;
|
||||
gint64 window[RTP_JITTER_BUFFER_MAX_WINDOW];
|
||||
guint window_pos;
|
||||
|
@ -90,4 +90,8 @@ void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf)
|
|||
guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf);
|
||||
guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf);
|
||||
|
||||
void rtp_jitter_buffer_get_sync (RTPJitterBuffer *jbuf, guint64 *rtptime,
|
||||
guint64 *timestamp);
|
||||
|
||||
|
||||
#endif /* __RTP_JITTER_BUFFER_H__ */
|
||||
|
|
|
@ -40,6 +40,7 @@ enum
|
|||
SIGNAL_ON_BYE_SSRC,
|
||||
SIGNAL_ON_BYE_TIMEOUT,
|
||||
SIGNAL_ON_TIMEOUT,
|
||||
SIGNAL_ON_SENDER_TIMEOUT,
|
||||
LAST_SIGNAL
|
||||
};
|
||||
|
||||
|
@ -212,6 +213,18 @@ rtp_session_class_init (RTPSessionClass * klass)
|
|||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
|
||||
RTP_TYPE_SOURCE);
|
||||
/**
|
||||
* RTPSession::on-sender-timeout:
|
||||
* @session: the object which received the signal
|
||||
* @src: the RTPSource that timed out
|
||||
*
|
||||
* Notify of an SSRC that was a sender but timed out and became a receiver.
|
||||
*/
|
||||
rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
|
||||
g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
|
||||
RTP_TYPE_SOURCE);
|
||||
|
||||
g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
|
||||
g_param_spec_object ("internal-source", "Internal Source",
|
||||
|
@ -513,6 +526,15 @@ on_timeout (RTPSession * sess, RTPSource * source)
|
|||
RTP_SESSION_LOCK (sess);
|
||||
}
|
||||
|
||||
static void
|
||||
on_sender_timeout (RTPSession * sess, RTPSource * source)
|
||||
{
|
||||
RTP_SESSION_UNLOCK (sess);
|
||||
g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
|
||||
source);
|
||||
RTP_SESSION_LOCK (sess);
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_session_new:
|
||||
*
|
||||
|
@ -908,9 +930,8 @@ check_collision (RTPSession * sess, RTPSource * source,
|
|||
RTPArrivalStats * arrival, gboolean rtp)
|
||||
{
|
||||
/* If we have not arrival address, we can't do collision checking */
|
||||
if (!arrival->have_address) {
|
||||
if (!arrival->have_address)
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
if (sess->source != source) {
|
||||
/* This is not our local source, but lets check if two remote
|
||||
|
@ -1479,12 +1500,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
|
|||
if (!source)
|
||||
return;
|
||||
|
||||
/* we somehow need to transfer the clock_base and the base time to the next
|
||||
* element, we use the offset and offset_end fields in the buffer for this
|
||||
* hack */
|
||||
GST_BUFFER_OFFSET (packet->buffer) = source->clock_base;
|
||||
GST_BUFFER_OFFSET_END (packet->buffer) = source->clock_base_time;
|
||||
|
||||
prevsender = RTP_SOURCE_IS_SENDER (source);
|
||||
|
||||
/* first update the source */
|
||||
|
@ -2096,6 +2111,7 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
|
|||
{
|
||||
gboolean remove = FALSE;
|
||||
gboolean byetimeout = FALSE;
|
||||
gboolean sendertimeout = FALSE;
|
||||
gboolean is_sender, is_active;
|
||||
RTPSession *sess = data->sess;
|
||||
GstClockTime interval;
|
||||
|
@ -2138,6 +2154,7 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
|
|||
GST_TIME_ARGS (source->last_rtp_activity));
|
||||
source->is_sender = FALSE;
|
||||
sess->stats.sender_sources--;
|
||||
sendertimeout = TRUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2153,6 +2170,9 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
|
|||
on_bye_timeout (sess, source);
|
||||
else
|
||||
on_timeout (sess, source);
|
||||
} else {
|
||||
if (sendertimeout)
|
||||
on_sender_timeout (sess, source);
|
||||
}
|
||||
return remove;
|
||||
}
|
||||
|
|
|
@ -228,6 +228,7 @@ struct _RTPSessionClass {
|
|||
void (*on_bye_ssrc) (RTPSession *sess, RTPSource *source);
|
||||
void (*on_bye_timeout) (RTPSession *sess, RTPSource *source);
|
||||
void (*on_timeout) (RTPSession *sess, RTPSource *source);
|
||||
void (*on_sender_timeout) (RTPSession *sess, RTPSource *source);
|
||||
};
|
||||
|
||||
GType rtp_session_get_type (void);
|
||||
|
|
|
@ -170,8 +170,6 @@ rtp_source_init (RTPSource * src)
|
|||
|
||||
src->payload = 0;
|
||||
src->clock_rate = -1;
|
||||
src->clock_base = -1;
|
||||
src->clock_base_time = -1;
|
||||
src->packets = g_queue_new ();
|
||||
src->seqnum_base = -1;
|
||||
src->last_rtptime = -1;
|
||||
|
@ -527,10 +525,6 @@ rtp_source_update_caps (RTPSource * src, GstCaps * caps)
|
|||
gst_structure_get_int (s, "clock-rate", &src->clock_rate);
|
||||
GST_DEBUG ("got clock-rate %d", src->clock_rate);
|
||||
|
||||
if (gst_structure_get_uint (s, "clock-base", &val))
|
||||
src->clock_base = val;
|
||||
GST_DEBUG ("got clock-base %" G_GINT64_FORMAT, src->clock_base);
|
||||
|
||||
if (gst_structure_get_uint (s, "seqnum-base", &val))
|
||||
src->seqnum_base = val;
|
||||
GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base);
|
||||
|
@ -771,13 +765,6 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
|
|||
|
||||
rtptime = gst_rtp_buffer_get_timestamp (buffer);
|
||||
|
||||
/* no clock-base, take first rtptime as base */
|
||||
if (src->clock_base == -1) {
|
||||
GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime);
|
||||
src->clock_base = rtptime;
|
||||
src->clock_base_time = arrival->timestamp;
|
||||
}
|
||||
|
||||
/* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
|
||||
* care about the absolute value, just the difference. */
|
||||
rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND);
|
||||
|
@ -923,13 +910,11 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
|
|||
} else {
|
||||
/* unacceptable jump */
|
||||
stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1);
|
||||
src->clock_base = -1;
|
||||
goto bad_sequence;
|
||||
}
|
||||
} else {
|
||||
/* duplicate or reordered packet, will be filtered by jitterbuffer. */
|
||||
GST_WARNING ("duplicate or reordered packet");
|
||||
src->clock_base = -1;
|
||||
}
|
||||
|
||||
src->stats.octets_received += arrival->payload_len;
|
||||
|
|
|
@ -32,8 +32,6 @@
|
|||
#define RTP_DEFAULT_PROBATION 2
|
||||
|
||||
#define RTP_SEQ_MOD (1 << 16)
|
||||
#define RTP_MAX_DROPOUT 3000
|
||||
#define RTP_MAX_MISORDER 100
|
||||
|
||||
typedef struct _RTPSource RTPSource;
|
||||
typedef struct _RTPSourceClass RTPSourceClass;
|
||||
|
@ -133,8 +131,6 @@ struct _RTPSource {
|
|||
GstCaps *caps;
|
||||
gint clock_rate;
|
||||
gint32 seqnum_base;
|
||||
gint64 clock_base;
|
||||
guint64 clock_base_time;
|
||||
|
||||
GstClockTime bye_time;
|
||||
GstClockTime last_activity;
|
||||
|
|
|
@ -150,11 +150,22 @@ typedef struct {
|
|||
#define RTP_STATS_RECEIVER_FRACTION (1.0 - RTP_STATS_SENDER_FRACTION)
|
||||
|
||||
/*
|
||||
* When receiving a BYE from a source, remove the source fomr the database
|
||||
* When receiving a BYE from a source, remove the source from the database
|
||||
* after this timeout.
|
||||
*/
|
||||
#define RTP_STATS_BYE_TIMEOUT (2 * GST_SECOND)
|
||||
|
||||
/*
|
||||
* The maximum number of missing packets we tollerate. These are packets with a
|
||||
* sequence number bigger than the last seen packet.
|
||||
*/
|
||||
#define RTP_MAX_DROPOUT 3000
|
||||
/*
|
||||
* The maximum number of misordered packets we tollerate. These are packets with
|
||||
* a sequence number smaller than the last seen packet.
|
||||
*/
|
||||
#define RTP_MAX_MISORDER 100
|
||||
|
||||
/**
|
||||
* RTPSessionStats:
|
||||
*
|
||||
|
|
Loading…
Reference in a new issue