gst/rtpmanager/gstrtpbin.c: Calculate and configure the NTP base time so that we can generate better

Original commit message from CVS:
* gst/rtpmanager/gstrtpbin.c: (calc_ntp_ns_base),
(gst_rtp_bin_change_state), (new_payload_found), (create_send_rtp):
Calculate and configure the NTP base time so that we can generate better
NTP times in SR packets.
Set caps on new ghostpad.
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_loop):
Clean debug statement.
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init),
(gst_rtp_session_init), (gst_rtp_session_set_property),
(gst_rtp_session_get_property), (get_current_ntp_ns_time),
(rtcp_thread), (gst_rtp_session_event_recv_rtp_sink),
(gst_rtp_session_internal_links), (gst_rtp_session_chain_recv_rtp),
(gst_rtp_session_event_send_rtp_sink),
(gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink),
(create_send_rtp_sink):
* gst/rtpmanager/gstrtpsession.h:
Add ntp-ns-base property to convert running_time to NTP time.
Handle NEWSEGMENT events on send and recv RTP pads so that we can
calculate the running time and thus NTP time of the packets.
Simplify getting the current NTP time using the pipeline clock.
Implement internal links functions.
Use the buffer timestamp to calculate the NTP time instead of the clock.
* gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc),
(gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event),
(gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_rtcp_chain),
(gst_rtp_ssrc_demux_internal_links),
(gst_rtp_ssrc_demux_src_query):
* gst/rtpmanager/gstrtpssrcdemux.h:
Implement internal links function.
Calculate the diff between different streams, this might be used later
to get the inter stream latency.
* gst/rtpmanager/rtpsession.c: (rtp_session_send_rtp):
Simple cleanup.
* gst/rtpmanager/rtpsource.c: (rtp_source_init),
(calculate_jitter), (rtp_source_send_rtp), (rtp_source_get_new_sr):
Make the clock skew window a little bigger.
Apply the clock skew to all buffers, not just one with a new timestamp.
Calculate and debug sender clock drift.
Use extended last timestamp to interpollate for SR reports.
This commit is contained in:
Wim Taymans 2007-09-12 18:04:32 +00:00 committed by Tim-Philipp Müller
parent aa8985d1e4
commit 919deb4490
8 changed files with 306 additions and 39 deletions

View file

@ -208,6 +208,8 @@ GST_STATIC_PAD_TEMPLATE ("sink_%d",
struct _GstRtpBinPrivate struct _GstRtpBinPrivate
{ {
GMutex *bin_lock; GMutex *bin_lock;
GstClockTime ntp_ns_base;
}; };
/* signals and args */ /* signals and args */
@ -1142,6 +1144,30 @@ gst_rtp_bin_provide_clock (GstElement * element)
return GST_CLOCK_CAST (gst_object_ref (rtpbin->provided_clock)); return GST_CLOCK_CAST (gst_object_ref (rtpbin->provided_clock));
} }
static void
calc_ntp_ns_base (GstRtpBin * bin)
{
GstClockTime now;
GTimeVal current;
GSList *walk;
/* get the current time and convert it to NTP time in nanoseconds */
g_get_current_time (&current);
now = GST_TIMEVAL_TO_TIME (current);
now += (2208988800LL * GST_SECOND);
GST_RTP_BIN_LOCK (bin);
bin->priv->ntp_ns_base = now;
for (walk = bin->sessions; walk; walk = g_slist_next (walk)) {
GstRtpBinSession *session = (GstRtpBinSession *) walk->data;
g_object_set (session->session, "ntp-ns-base", now, NULL);
}
GST_RTP_BIN_UNLOCK (bin);
return;
}
static GstStateChangeReturn static GstStateChangeReturn
gst_rtp_bin_change_state (GstElement * element, GstStateChange transition) gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
{ {
@ -1156,6 +1182,7 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
break; break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING: case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
calc_ntp_ns_base (rtpbin);
break; break;
default: default:
break; break;
@ -1199,6 +1226,7 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad,
gpad = gst_ghost_pad_new_from_template (padname, pad, templ); gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
g_free (padname); g_free (padname);
gst_pad_set_caps (gpad, GST_PAD_CAPS (pad));
gst_pad_set_active (gpad, TRUE); gst_pad_set_active (gpad, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
} }
@ -1553,9 +1581,6 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session->send_rtp_sink == NULL) if (session->send_rtp_sink == NULL)
goto pad_failed; goto pad_failed;
g_signal_connect (session->send_rtp_sink, "notify::caps",
(GCallback) caps_changed, session);
result = result =
gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ); gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
gst_pad_set_active (result, TRUE); gst_pad_set_active (result, TRUE);

View file

@ -977,7 +977,7 @@ again:
GST_DEBUG_OBJECT (jitterbuffer, GST_DEBUG_OBJECT (jitterbuffer,
"Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT "Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT
",now %d left", seqnum, rtp_time, exttimestamp, ", now %d left", seqnum, rtp_time, exttimestamp,
rtp_jitter_buffer_num_packets (priv->jbuf)); rtp_jitter_buffer_num_packets (priv->jbuf));
/* If we don't know what the next seqnum should be (== -1) we have to wait /* If we don't know what the next seqnum should be (== -1) we have to wait

View file

@ -214,6 +214,8 @@ enum
LAST_SIGNAL LAST_SIGNAL
}; };
#define DEFAULT_NTP_NS_BASE 0
enum enum
{ {
PROP_0, PROP_0,
@ -462,6 +464,11 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE,
g_param_spec_uint64 ("ntp-ns-base", "NTP base time",
"The NTP base time corresponding to running_time 0", 0,
G_MAXUINT64, DEFAULT_NTP_NS_BASE, G_PARAM_READWRITE));
gstelement_class->change_state = gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
gstelement_class->request_new_pad = gstelement_class->request_new_pad =
@ -497,6 +504,9 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
g_signal_connect (rtpsession->priv->session, "on-timeout", g_signal_connect (rtpsession->priv->session, "on-timeout",
(GCallback) on_timeout, rtpsession); (GCallback) on_timeout, rtpsession);
rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL); rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL);
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
} }
static void static void
@ -521,7 +531,11 @@ gst_rtp_session_set_property (GObject * object, guint prop_id,
switch (prop_id) { switch (prop_id) {
case PROP_NTP_NS_BASE: case PROP_NTP_NS_BASE:
GST_OBJECT_LOCK (rtpsession);
rtpsession->priv->ntpnsbase = g_value_get_uint64 (value); rtpsession->priv->ntpnsbase = g_value_get_uint64 (value);
GST_DEBUG_OBJECT (rtpsession, "setting NTP base to %" GST_TIME_FORMAT,
GST_TIME_ARGS (rtpsession->priv->ntpnsbase));
GST_OBJECT_UNLOCK (rtpsession);
break; break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -539,7 +553,9 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
switch (prop_id) { switch (prop_id) {
case PROP_NTP_NS_BASE: case PROP_NTP_NS_BASE:
GST_OBJECT_LOCK (rtpsession);
g_value_set_uint64 (value, rtpsession->priv->ntpnsbase); g_value_set_uint64 (value, rtpsession->priv->ntpnsbase);
GST_OBJECT_UNLOCK (rtpsession);
break; break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -548,19 +564,31 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
} }
static guint64 static guint64
get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) get_current_ntp_ns_time (GstRtpSession * rtpsession)
{ {
guint64 ntpnstime; guint64 ntpnstime;
GstClock *clock;
GstClockTime base_time, ntpnsbase;
GST_OBJECT_LOCK (rtpsession);
if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
ntpnsbase = rtpsession->priv->ntpnsbase;
gst_object_ref (clock);
GST_OBJECT_UNLOCK (rtpsession);
if (clock) {
/* get current NTP time */ /* get current NTP time */
ntpnstime = gst_clock_get_time (clock); ntpnstime = gst_clock_get_time (clock);
/* convert to running time */ /* convert to running time */
ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession)); ntpnstime -= base_time;
/* add NTP base offset */ /* add NTP base offset */
ntpnstime += rtpsession->priv->ntpnsbase; ntpnstime += ntpnsbase;
} else
gst_object_unref (clock);
} else {
GST_OBJECT_UNLOCK (rtpsession);
ntpnstime = -1; ntpnstime = -1;
}
return ntpnstime; return ntpnstime;
} }
@ -568,7 +596,7 @@ get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock)
static void static void
rtcp_thread (GstRtpSession * rtpsession) rtcp_thread (GstRtpSession * rtpsession)
{ {
GstClock *sysclock, *clock; GstClock *sysclock;
GstClockID id; GstClockID id;
GstClockTime current_time; GstClockTime current_time;
GstClockTime next_timeout; GstClockTime next_timeout;
@ -579,9 +607,6 @@ rtcp_thread (GstRtpSession * rtpsession)
if (sysclock == NULL) if (sysclock == NULL)
goto no_sysclock; goto no_sysclock;
/* to get the current NTP time, we use the pipeline clock */
clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
current_time = gst_clock_get_time (sysclock); current_time = gst_clock_get_time (sysclock);
GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
@ -619,7 +644,7 @@ rtcp_thread (GstRtpSession * rtpsession)
current_time = gst_clock_get_time (sysclock); current_time = gst_clock_get_time (sysclock);
/* get current NTP time */ /* get current NTP time */
ntpnstime = get_current_ntp_ns_time (rtpsession, clock); ntpnstime = get_current_ntp_ns_time (rtpsession);
/* we get unlocked because we need to perform reconsideration, don't perform /* we get unlocked because we need to perform reconsideration, don't perform
* the timeout but get a new reporting estimate. */ * the timeout but get a new reporting estimate. */
@ -969,6 +994,41 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
GST_EVENT_TYPE_NAME (event)); GST_EVENT_TYPE_NAME (event));
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
break;
case GST_EVENT_NEWSEGMENT:
{
gboolean update;
gdouble rate, arate;
GstFormat format;
gint64 start, stop, time;
GstSegment *segment;
segment = &rtpsession->recv_rtp_seg;
/* the newsegment event is needed to convert the RTP timestamp to
* running_time, which is needed to generate a mapping from RTP to NTP
* timestamps in SR reports */
gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
&start, &stop, &time);
GST_DEBUG_OBJECT (rtpsession,
"configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
"format GST_FORMAT_TIME, "
"%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
update, rate, arate, GST_TIME_ARGS (segment->start),
GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
GST_TIME_ARGS (segment->accum));
gst_segment_set_newsegment_full (segment, update, rate,
arate, format, start, stop, time);
/* push event forward */
ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
break;
}
default: default:
ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
break; break;
@ -976,6 +1036,31 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
gst_object_unref (rtpsession); gst_object_unref (rtpsession);
return ret; return ret;
}
static GList *
gst_rtp_session_internal_links (GstPad * pad)
{
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
GList *res = NULL;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
if (pad == rtpsession->recv_rtp_src) {
res = g_list_prepend (res, rtpsession->recv_rtp_sink);
} else if (pad == rtpsession->recv_rtp_sink) {
res = g_list_prepend (res, rtpsession->recv_rtp_src);
} else if (pad == rtpsession->send_rtp_src) {
res = g_list_prepend (res, rtpsession->send_rtp_sink);
} else if (pad == rtpsession->send_rtp_sink) {
res = g_list_prepend (res, rtpsession->send_rtp_src);
}
gst_object_unref (rtpsession);
return res;
} }
static gboolean static gboolean
@ -1006,14 +1091,25 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
GstRtpSessionPrivate *priv; GstRtpSessionPrivate *priv;
GstFlowReturn ret; GstFlowReturn ret;
guint64 ntpnstime; guint64 ntpnstime;
GstClockTime timestamp;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv; priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
ntpnstime = /* get NTP time when this packet was captured, this depends on the timestamp. */
get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession)); timestamp = GST_BUFFER_TIMESTAMP (buffer);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
/* convert to running time using the segment values */
ntpnstime =
gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
timestamp);
/* add constant to convert running time to NTP time */
ntpnstime += priv->ntpnsbase;
} else {
ntpnstime = get_current_ntp_ns_time (rtpsession);
}
ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime); ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
@ -1084,6 +1180,9 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
GST_DEBUG_OBJECT (rtpsession, "received event"); GST_DEBUG_OBJECT (rtpsession, "received event");
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
break;
case GST_EVENT_NEWSEGMENT: case GST_EVENT_NEWSEGMENT:
{ {
gboolean update; gboolean update;
@ -1146,7 +1245,10 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
timestamp = GST_BUFFER_TIMESTAMP (buffer); timestamp = GST_BUFFER_TIMESTAMP (buffer);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) { if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
/* convert to running time using the segment start value. */ /* convert to running time using the segment start value. */
ntpnstime = timestamp - rtpsession->send_rtp_seg.start; ntpnstime =
gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
timestamp);
/* convert to NTP time by adding the NTP base */
ntpnstime += priv->ntpnsbase; ntpnstime += priv->ntpnsbase;
} else } else
ntpnstime = -1; ntpnstime = -1;
@ -1175,6 +1277,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
gst_rtp_session_event_recv_rtp_sink); gst_rtp_session_event_recv_rtp_sink);
gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink, gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink,
gst_rtp_session_sink_setcaps); gst_rtp_session_sink_setcaps);
gst_pad_set_internal_link_function (rtpsession->recv_rtp_sink,
gst_rtp_session_internal_links);
gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE); gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->recv_rtp_sink); rtpsession->recv_rtp_sink);
@ -1183,6 +1287,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
rtpsession->recv_rtp_src = rtpsession->recv_rtp_src =
gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
"recv_rtp_src"); "recv_rtp_src");
gst_pad_set_internal_link_function (rtpsession->recv_rtp_src,
gst_rtp_session_internal_links);
gst_pad_use_fixed_caps (rtpsession->recv_rtp_src); gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
@ -1235,8 +1341,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
gst_rtp_session_chain_send_rtp); gst_rtp_session_chain_send_rtp);
gst_pad_set_event_function (rtpsession->send_rtp_sink, gst_pad_set_event_function (rtpsession->send_rtp_sink,
gst_rtp_session_event_send_rtp_sink); gst_rtp_session_event_send_rtp_sink);
gst_pad_set_setcaps_function (rtpsession->send_rtp_sink, gst_pad_set_internal_link_function (rtpsession->send_rtp_sink,
gst_rtp_session_sink_setcaps); gst_rtp_session_internal_links);
gst_pad_set_active (rtpsession->send_rtp_sink, TRUE); gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->send_rtp_sink); rtpsession->send_rtp_sink);
@ -1245,6 +1351,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
"send_rtp_src"); "send_rtp_src");
gst_pad_use_fixed_caps (rtpsession->send_rtp_src); gst_pad_use_fixed_caps (rtpsession->send_rtp_src);
gst_pad_set_internal_link_function (rtpsession->send_rtp_src,
gst_rtp_session_internal_links);
gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);

View file

@ -43,6 +43,7 @@ struct _GstRtpSession {
/*< private >*/ /*< private >*/
GstPad *recv_rtp_sink; GstPad *recv_rtp_sink;
GstSegment recv_rtp_seg;
GstPad *recv_rtcp_sink; GstPad *recv_rtcp_sink;
GstPad *send_rtp_sink; GstPad *send_rtp_sink;
GstSegment send_rtp_seg; GstSegment send_rtp_seg;

View file

@ -125,6 +125,8 @@ static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
/* srcpad stuff */ /* srcpad stuff */
static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
static GList *gst_rtp_ssrc_demux_internal_links (GstPad * pad);
static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query);
static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
@ -137,6 +139,7 @@ struct _GstRtpSsrcDemuxPad
GstPad *rtp_pad; GstPad *rtp_pad;
GstCaps *caps; GstCaps *caps;
GstPad *rtcp_pad; GstPad *rtcp_pad;
GstClockTime first_ts;
}; };
/* find a src pad for a given SSRC, returns NULL if the SSRC was not found /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
@ -156,7 +159,8 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
} }
static GstRtpSsrcDemuxPad * static GstRtpSsrcDemuxPad *
create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
GstClockTime timestamp)
{ {
GstPad *rtp_pad, *rtcp_pad; GstPad *rtp_pad, *rtcp_pad;
GstElementClass *klass; GstElementClass *klass;
@ -177,13 +181,27 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
rtcp_pad = gst_pad_new_from_template (templ, padname); rtcp_pad = gst_pad_new_from_template (templ, padname);
g_free (padname); g_free (padname);
/* we use the first timestamp received to calculate the difference between
* timestamps on all streams */
GST_DEBUG_OBJECT (demux, "SSRC %08x, first timestamp %" GST_TIME_FORMAT,
ssrc, GST_TIME_ARGS (timestamp));
/* wrap in structure and add to list */ /* wrap in structure and add to list */
demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1); demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
demuxpad->ssrc = ssrc; demuxpad->ssrc = ssrc;
demuxpad->rtp_pad = rtp_pad; demuxpad->rtp_pad = rtp_pad;
demuxpad->rtcp_pad = rtcp_pad; demuxpad->rtcp_pad = rtcp_pad;
demuxpad->first_ts = timestamp;
GST_DEBUG_OBJECT (demux, "first timestamp %" GST_TIME_FORMAT,
GST_TIME_ARGS (timestamp));
gst_pad_set_element_private (rtp_pad, demuxpad);
gst_pad_set_element_private (rtcp_pad, demuxpad);
demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
/* unlock to perform the remainder and to fire our signal */
GST_OBJECT_UNLOCK (demux); GST_OBJECT_UNLOCK (demux);
/* copy caps from input */ /* copy caps from input */
@ -193,7 +211,13 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
gst_pad_use_fixed_caps (rtcp_pad); gst_pad_use_fixed_caps (rtcp_pad);
gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event); gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
gst_pad_set_internal_link_function (rtp_pad,
gst_rtp_ssrc_demux_internal_links);
gst_pad_set_active (rtp_pad, TRUE); gst_pad_set_active (rtp_pad, TRUE);
gst_pad_set_internal_link_function (rtcp_pad,
gst_rtp_ssrc_demux_internal_links);
gst_pad_set_active (rtcp_pad, TRUE); gst_pad_set_active (rtcp_pad, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
@ -277,6 +301,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux,
gst_pad_set_event_function (demux->rtcp_sink, gst_pad_set_event_function (demux->rtcp_sink,
gst_rtp_ssrc_demux_rtcp_sink_event); gst_rtp_ssrc_demux_rtcp_sink_event);
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
} }
static void static void
@ -298,6 +324,9 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
break;
case GST_EVENT_NEWSEGMENT: case GST_EVENT_NEWSEGMENT:
default: default:
{ {
@ -370,7 +399,9 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
GST_OBJECT_LOCK (demux); GST_OBJECT_LOCK (demux);
dpad = find_demux_pad_for_ssrc (demux, ssrc); dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL) { if (dpad == NULL) {
if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) if (!(dpad =
create_demux_pad_for_ssrc (demux, ssrc,
GST_BUFFER_TIMESTAMP (buf))))
goto create_failed; goto create_failed;
} }
GST_OBJECT_UNLOCK (demux); GST_OBJECT_UNLOCK (demux);
@ -419,6 +450,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
/* first packet must be SR or RR or else the validate would have failed */ /* first packet must be SR or RR or else the validate would have failed */
switch (gst_rtcp_packet_get_type (&packet)) { switch (gst_rtcp_packet_get_type (&packet)) {
case GST_RTCP_TYPE_SR: case GST_RTCP_TYPE_SR:
/* get the ssrc so that we can route it to the right source pad */
gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL, gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
NULL); NULL);
break; break;
@ -435,7 +467,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
dpad = find_demux_pad_for_ssrc (demux, ssrc); dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL) { if (dpad == NULL) {
GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1)))
goto create_failed; goto create_failed;
} }
GST_OBJECT_UNLOCK (demux); GST_OBJECT_UNLOCK (demux);
@ -482,6 +514,84 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
return res; return res;
} }
static GList *
gst_rtp_ssrc_demux_internal_links (GstPad * pad)
{
GstRtpSsrcDemux *demux;
GList *res = NULL;
GSList *walk;
demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
GST_OBJECT_LOCK (demux);
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
if (pad == demux->rtp_sink) {
res = g_list_prepend (res, dpad->rtp_pad);
} else if (pad == demux->rtcp_sink) {
res = g_list_prepend (res, dpad->rtcp_pad);
} else if (pad == dpad->rtp_pad) {
res = g_list_prepend (res, demux->rtp_sink);
break;
} else if (pad == dpad->rtcp_pad) {
res = g_list_prepend (res, demux->rtcp_sink);
break;
}
}
GST_OBJECT_UNLOCK (demux);
gst_object_unref (demux);
return res;
}
static gboolean
gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
{
GstRtpSsrcDemux *demux;
gboolean res = FALSE;
demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_LATENCY:
{
if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
gboolean live;
GstClockTime min_latency, max_latency;
GstRtpSsrcDemuxPad *demuxpad;
demuxpad = gst_pad_get_element_private (pad);
gst_query_parse_latency (query, &live, &min_latency, &max_latency);
GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
GST_TIME_ARGS (min_latency));
GST_DEBUG_OBJECT (demux,
"latency for SSRC %08x, latency %" GST_TIME_FORMAT, demuxpad->ssrc,
GST_TIME_ARGS (demuxpad->first_ts));
#if 0
min_latency += demuxpad->first_ts;
if (max_latency != -1)
max_latency += demuxpad->first_ts;
#endif
gst_query_set_latency (query, live, min_latency, max_latency);
}
break;
}
default:
res = gst_pad_query_default (pad, query);
break;
}
gst_object_unref (demux);
return res;
}
static GstStateChangeReturn static GstStateChangeReturn
gst_rtp_ssrc_demux_change_state (GstElement * element, gst_rtp_ssrc_demux_change_state (GstElement * element,
GstStateChange transition) GstStateChange transition)

View file

@ -36,6 +36,8 @@ struct _GstRtpSsrcDemux
{ {
GstElement parent; GstElement parent;
GstSegment segment;
GstPad *rtp_sink; GstPad *rtp_sink;
GstPad *rtcp_sink; GstPad *rtcp_sink;
GSList *srcpads; GSList *srcpads;

View file

@ -1423,7 +1423,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntptime)
prevsender = RTP_SOURCE_IS_SENDER (source); prevsender = RTP_SOURCE_IS_SENDER (source);
/* we use our own source to send */ /* we use our own source to send */
result = rtp_source_send_rtp (sess->source, buffer, ntptime); result = rtp_source_send_rtp (source, buffer, ntptime);
if (RTP_SOURCE_IS_SENDER (source) && !prevsender) if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
sess->stats.sender_sources++; sess->stats.sender_sources++;

View file

@ -74,6 +74,7 @@ rtp_source_init (RTPSource * src)
src->prev_ext_rtptime = -1; src->prev_ext_rtptime = -1;
src->packets = g_queue_new (); src->packets = g_queue_new ();
src->seqnum_base = -1; src->seqnum_base = -1;
src->last_rtptime = -1;
src->stats.cycles = -1; src->stats.cycles = -1;
src->stats.jitter = 0; src->stats.jitter = 0;
@ -320,22 +321,19 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
* out of sync and we must compensate. */ * out of sync and we must compensate. */
skew = ntpdiff - rtpdiff; skew = ntpdiff - rtpdiff;
/* average out the skew to get a smooth value. */ /* average out the skew to get a smooth value. */
src->avg_skew = (31 * src->avg_skew + skew) / 32; src->avg_skew = (63 * src->avg_skew + skew) / 64;
GST_DEBUG ("skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew, GST_DEBUG ("new skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew,
src->avg_skew); src->avg_skew);
if (src->avg_skew != 0) {
guint32 timestamp;
/* patch the buffer RTP timestamp with the skew */
GST_DEBUG ("adjusting timestamp %" G_GINT64_FORMAT, src->avg_skew);
timestamp = gst_rtp_buffer_get_timestamp (buffer);
timestamp += src->avg_skew;
gst_rtp_buffer_set_timestamp (buffer, timestamp);
}
/* store previous extended timestamp */ /* store previous extended timestamp */
src->prev_ext_rtptime = ext_rtptime; src->prev_ext_rtptime = ext_rtptime;
} }
if (src->avg_skew != 0) {
/* patch the buffer RTP timestamp with the skew */
GST_DEBUG ("skew timestamp RTP %" G_GUINT32_FORMAT " -> %" G_GINT64_FORMAT,
rtptime, rtptime + src->avg_skew);
gst_rtp_buffer_set_timestamp (buffer, rtptime + src->avg_skew);
}
/* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
* care about the absolute value, just the difference. */ * care about the absolute value, just the difference. */
@ -555,6 +553,9 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
{ {
GstFlowReturn result = GST_FLOW_OK; GstFlowReturn result = GST_FLOW_OK;
guint len; guint len;
guint32 rtptime;
guint64 ext_rtptime;
guint64 ntp_diff, rtp_diff;
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
@ -570,9 +571,27 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
src->stats.packets_sent++; src->stats.packets_sent++;
src->stats.octets_sent += len; src->stats.octets_sent += len;
rtptime = gst_rtp_buffer_get_timestamp (buffer);
ext_rtptime = src->last_rtptime;
ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
GST_DEBUG ("SSRC %08x, RTP %" G_GUINT64_FORMAT ", NTP %" GST_TIME_FORMAT,
src->ssrc, ext_rtptime, GST_TIME_ARGS (ntpnstime));
if (ext_rtptime > src->last_rtptime) {
rtp_diff = ext_rtptime - src->last_rtptime;
ntp_diff = ntpnstime - src->last_ntpnstime;
/* calc the diff so we can detect drift at the sender. This can also be used
* to guestimate the clock rate if the NTP time is locked to the RTP
* timestamps (as is the case when the capture device is providing the clock). */
GST_DEBUG ("SSRC %08x, diff RTP %" G_GUINT64_FORMAT ", diff NTP %"
GST_TIME_FORMAT, src->ssrc, rtp_diff, GST_TIME_ARGS (ntp_diff));
}
/* we keep track of the last received RTP timestamp and the corresponding /* we keep track of the last received RTP timestamp and the corresponding
* NTP timestamp so that we can use this info when constructing SR reports */ * NTP timestamp so that we can use this info when constructing SR reports */
src->last_rtptime = gst_rtp_buffer_get_timestamp (buffer); src->last_rtptime = ext_rtptime;
src->last_ntpnstime = ntpnstime; src->last_ntpnstime = ntpnstime;
/* push packet */ /* push packet */
@ -587,7 +606,8 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
* get the correct SSRC. */ * get the correct SSRC. */
buffer = gst_buffer_make_writable (buffer); buffer = gst_buffer_make_writable (buffer);
GST_DEBUG ("updating SSRC from %08x to %08x", ssrc, src->ssrc); GST_WARNING ("updating SSRC from %08x to %08x, fix the payloader", ssrc,
src->ssrc);
gst_rtp_buffer_set_ssrc (buffer, src->ssrc); gst_rtp_buffer_set_ssrc (buffer, src->ssrc);
} }
GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT, GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT,
@ -716,7 +736,7 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
guint64 * ntptime, guint32 * rtptime, guint32 * packet_count, guint64 * ntptime, guint32 * rtptime, guint32 * packet_count,
guint32 * octet_count) guint32 * octet_count)
{ {
guint32 t_rtp; guint64 t_rtp;
guint64 t_current_ntp; guint64 t_current_ntp;
GstClockTimeDiff diff; GstClockTimeDiff diff;
@ -730,7 +750,7 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
t_rtp = src->last_rtptime; t_rtp = src->last_rtptime;
GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %" GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %"
G_GUINT32_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp); G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp);
if (src->clock_rate != -1) { if (src->clock_rate != -1) {
/* get the diff with the SR time */ /* get the diff with the SR time */
@ -752,11 +772,12 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
GST_WARNING ("no clock-rate, cannot interpollate rtp time"); GST_WARNING ("no clock-rate, cannot interpollate rtp time");
} }
/* convert the NTP time in nanoseconds to 32.32 fixed point */
t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND); t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND);
GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT, GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT,
(guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff), (guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff),
t_rtp); (guint32) t_rtp);
if (ntptime) if (ntptime)
*ntptime = t_current_ntp; *ntptime = t_current_ntp;