diff --git a/ChangeLog b/ChangeLog index 007eecd8c0..815f7c9f95 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,51 @@ +2007-09-12 Wim Taymans + + * gst/rtpmanager/gstrtpbin.c: (calc_ntp_ns_base), + (gst_rtp_bin_change_state), (new_payload_found), (create_send_rtp): + Calculate and configure the NTP base time so that we can generate better + NTP times in SR packets. + Set caps on new ghostpad. + + * gst/rtpmanager/gstrtpjitterbuffer.c: + (gst_rtp_jitter_buffer_loop): + Clean debug statement. + + * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), + (gst_rtp_session_init), (gst_rtp_session_set_property), + (gst_rtp_session_get_property), (get_current_ntp_ns_time), + (rtcp_thread), (gst_rtp_session_event_recv_rtp_sink), + (gst_rtp_session_internal_links), (gst_rtp_session_chain_recv_rtp), + (gst_rtp_session_event_send_rtp_sink), + (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), + (create_send_rtp_sink): + * gst/rtpmanager/gstrtpsession.h: + Add ntp-ns-base property to convert running_time to NTP time. + Handle NEWSEGMENT events on send and recv RTP pads so that we can + calculate the running time and thus NTP time of the packets. + Simplify getting the current NTP time using the pipeline clock. + Implement internal links functions. + Use the buffer timestamp to calculate the NTP time instead of the clock. + + * gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc), + (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event), + (gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_rtcp_chain), + (gst_rtp_ssrc_demux_internal_links), + (gst_rtp_ssrc_demux_src_query): + * gst/rtpmanager/gstrtpssrcdemux.h: + Implement internal links function. + Calculate the diff between different streams, this might be used later + to get the inter stream latency. + + * gst/rtpmanager/rtpsession.c: (rtp_session_send_rtp): + Simple cleanup. + + * gst/rtpmanager/rtpsource.c: (rtp_source_init), + (calculate_jitter), (rtp_source_send_rtp), (rtp_source_get_new_sr): + Make the clock skew window a little bigger. + Apply the clock skew to all buffers, not just one with a new timestamp. + Calculate and debug sender clock drift. + Use extended last timestamp to interpollate for SR reports. + 2007-09-12 Tim-Philipp Müller Patch by: Peter Kjellerstedt diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index a4ba67c3ea..7d5fb5d134 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -208,6 +208,8 @@ GST_STATIC_PAD_TEMPLATE ("sink_%d", struct _GstRtpBinPrivate { GMutex *bin_lock; + + GstClockTime ntp_ns_base; }; /* signals and args */ @@ -1142,6 +1144,30 @@ gst_rtp_bin_provide_clock (GstElement * element) return GST_CLOCK_CAST (gst_object_ref (rtpbin->provided_clock)); } +static void +calc_ntp_ns_base (GstRtpBin * bin) +{ + GstClockTime now; + GTimeVal current; + GSList *walk; + + /* get the current time and convert it to NTP time in nanoseconds */ + g_get_current_time (¤t); + now = GST_TIMEVAL_TO_TIME (current); + now += (2208988800LL * GST_SECOND); + + GST_RTP_BIN_LOCK (bin); + bin->priv->ntp_ns_base = now; + for (walk = bin->sessions; walk; walk = g_slist_next (walk)) { + GstRtpBinSession *session = (GstRtpBinSession *) walk->data; + + g_object_set (session->session, "ntp-ns-base", now, NULL); + } + GST_RTP_BIN_UNLOCK (bin); + + return; +} + static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element, GstStateChange transition) { @@ -1156,6 +1182,7 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + calc_ntp_ns_base (rtpbin); break; default: break; @@ -1199,6 +1226,7 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad, gpad = gst_ghost_pad_new_from_template (padname, pad, templ); g_free (padname); + gst_pad_set_caps (gpad, GST_PAD_CAPS (pad)); gst_pad_set_active (gpad, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); } @@ -1553,9 +1581,6 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->send_rtp_sink == NULL) goto pad_failed; - g_signal_connect (session->send_rtp_sink, "notify::caps", - (GCallback) caps_changed, session); - result = gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ); gst_pad_set_active (result, TRUE); diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index a23fbb87a1..08a55f2bc8 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -977,7 +977,7 @@ again: GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT - ",now %d left", seqnum, rtp_time, exttimestamp, + ", now %d left", seqnum, rtp_time, exttimestamp, rtp_jitter_buffer_num_packets (priv->jbuf)); /* If we don't know what the next seqnum should be (== -1) we have to wait diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 87948a4bbe..70833655b0 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -214,6 +214,8 @@ enum LAST_SIGNAL }; +#define DEFAULT_NTP_NS_BASE 0 + enum { PROP_0, @@ -462,6 +464,11 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE, + g_param_spec_uint64 ("ntp-ns-base", "NTP base time", + "The NTP base time corresponding to running_time 0", 0, + G_MAXUINT64, DEFAULT_NTP_NS_BASE, G_PARAM_READWRITE)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); gstelement_class->request_new_pad = @@ -497,6 +504,9 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass) g_signal_connect (rtpsession->priv->session, "on-timeout", (GCallback) on_timeout, rtpsession); rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL); + + gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED); + gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED); } static void @@ -521,7 +531,11 @@ gst_rtp_session_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_NTP_NS_BASE: + GST_OBJECT_LOCK (rtpsession); rtpsession->priv->ntpnsbase = g_value_get_uint64 (value); + GST_DEBUG_OBJECT (rtpsession, "setting NTP base to %" GST_TIME_FORMAT, + GST_TIME_ARGS (rtpsession->priv->ntpnsbase)); + GST_OBJECT_UNLOCK (rtpsession); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -539,7 +553,9 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_NTP_NS_BASE: + GST_OBJECT_LOCK (rtpsession); g_value_set_uint64 (value, rtpsession->priv->ntpnsbase); + GST_OBJECT_UNLOCK (rtpsession); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -548,19 +564,31 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, } static guint64 -get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) +get_current_ntp_ns_time (GstRtpSession * rtpsession) { guint64 ntpnstime; + GstClock *clock; + GstClockTime base_time, ntpnsbase; + + GST_OBJECT_LOCK (rtpsession); + if ((clock = GST_ELEMENT_CLOCK (rtpsession))) { + base_time = GST_ELEMENT_CAST (rtpsession)->base_time; + ntpnsbase = rtpsession->priv->ntpnsbase; + gst_object_ref (clock); + GST_OBJECT_UNLOCK (rtpsession); - if (clock) { /* get current NTP time */ ntpnstime = gst_clock_get_time (clock); /* convert to running time */ - ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession)); + ntpnstime -= base_time; /* add NTP base offset */ - ntpnstime += rtpsession->priv->ntpnsbase; - } else + ntpnstime += ntpnsbase; + + gst_object_unref (clock); + } else { + GST_OBJECT_UNLOCK (rtpsession); ntpnstime = -1; + } return ntpnstime; } @@ -568,7 +596,7 @@ get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) static void rtcp_thread (GstRtpSession * rtpsession) { - GstClock *sysclock, *clock; + GstClock *sysclock; GstClockID id; GstClockTime current_time; GstClockTime next_timeout; @@ -579,9 +607,6 @@ rtcp_thread (GstRtpSession * rtpsession) if (sysclock == NULL) goto no_sysclock; - /* to get the current NTP time, we use the pipeline clock */ - clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); - current_time = gst_clock_get_time (sysclock); GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); @@ -619,7 +644,7 @@ rtcp_thread (GstRtpSession * rtpsession) current_time = gst_clock_get_time (sysclock); /* get current NTP time */ - ntpnstime = get_current_ntp_ns_time (rtpsession, clock); + ntpnstime = get_current_ntp_ns_time (rtpsession); /* we get unlocked because we need to perform reconsideration, don't perform * the timeout but get a new reporting estimate. */ @@ -969,6 +994,41 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) GST_EVENT_TYPE_NAME (event)); switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_STOP: + gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED); + break; + case GST_EVENT_NEWSEGMENT: + { + gboolean update; + gdouble rate, arate; + GstFormat format; + gint64 start, stop, time; + GstSegment *segment; + + segment = &rtpsession->recv_rtp_seg; + + /* the newsegment event is needed to convert the RTP timestamp to + * running_time, which is needed to generate a mapping from RTP to NTP + * timestamps in SR reports */ + gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format, + &start, &stop, &time); + + GST_DEBUG_OBJECT (rtpsession, + "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, " + "format GST_FORMAT_TIME, " + "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT + ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT, + update, rate, arate, GST_TIME_ARGS (segment->start), + GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time), + GST_TIME_ARGS (segment->accum)); + + gst_segment_set_newsegment_full (segment, update, rate, + arate, format, start, stop, time); + + /* push event forward */ + ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); + break; + } default: ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); break; @@ -976,6 +1036,31 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) gst_object_unref (rtpsession); return ret; + +} +static GList * +gst_rtp_session_internal_links (GstPad * pad) +{ + GstRtpSession *rtpsession; + GstRtpSessionPrivate *priv; + GList *res = NULL; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; + + if (pad == rtpsession->recv_rtp_src) { + res = g_list_prepend (res, rtpsession->recv_rtp_sink); + } else if (pad == rtpsession->recv_rtp_sink) { + res = g_list_prepend (res, rtpsession->recv_rtp_src); + } else if (pad == rtpsession->send_rtp_src) { + res = g_list_prepend (res, rtpsession->send_rtp_sink); + } else if (pad == rtpsession->send_rtp_sink) { + res = g_list_prepend (res, rtpsession->send_rtp_src); + } + + gst_object_unref (rtpsession); + + return res; } static gboolean @@ -1006,14 +1091,25 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) GstRtpSessionPrivate *priv; GstFlowReturn ret; guint64 ntpnstime; + GstClockTime timestamp; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ntpnstime = - get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession)); + /* get NTP time when this packet was captured, this depends on the timestamp. */ + timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + /* convert to running time using the segment values */ + ntpnstime = + gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME, + timestamp); + /* add constant to convert running time to NTP time */ + ntpnstime += priv->ntpnsbase; + } else { + ntpnstime = get_current_ntp_ns_time (rtpsession); + } ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime); @@ -1084,6 +1180,9 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) GST_DEBUG_OBJECT (rtpsession, "received event"); switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_STOP: + gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED); + break; case GST_EVENT_NEWSEGMENT: { gboolean update; @@ -1146,7 +1245,10 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) timestamp = GST_BUFFER_TIMESTAMP (buffer); if (GST_CLOCK_TIME_IS_VALID (timestamp)) { /* convert to running time using the segment start value. */ - ntpnstime = timestamp - rtpsession->send_rtp_seg.start; + ntpnstime = + gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME, + timestamp); + /* convert to NTP time by adding the NTP base */ ntpnstime += priv->ntpnsbase; } else ntpnstime = -1; @@ -1175,6 +1277,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) gst_rtp_session_event_recv_rtp_sink); gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink, gst_rtp_session_sink_setcaps); + gst_pad_set_internal_link_function (rtpsession->recv_rtp_sink, + gst_rtp_session_internal_links); gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_sink); @@ -1183,6 +1287,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) rtpsession->recv_rtp_src = gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, "recv_rtp_src"); + gst_pad_set_internal_link_function (rtpsession->recv_rtp_src, + gst_rtp_session_internal_links); gst_pad_use_fixed_caps (rtpsession->recv_rtp_src); gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); @@ -1235,8 +1341,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession) gst_rtp_session_chain_send_rtp); gst_pad_set_event_function (rtpsession->send_rtp_sink, gst_rtp_session_event_send_rtp_sink); - gst_pad_set_setcaps_function (rtpsession->send_rtp_sink, - gst_rtp_session_sink_setcaps); + gst_pad_set_internal_link_function (rtpsession->send_rtp_sink, + gst_rtp_session_internal_links); gst_pad_set_active (rtpsession->send_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_sink); @@ -1245,6 +1351,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession) gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, "send_rtp_src"); gst_pad_use_fixed_caps (rtpsession->send_rtp_src); + gst_pad_set_internal_link_function (rtpsession->send_rtp_src, + gst_rtp_session_internal_links); gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h index 09565ac1e0..3fffb0679c 100644 --- a/gst/rtpmanager/gstrtpsession.h +++ b/gst/rtpmanager/gstrtpsession.h @@ -43,6 +43,7 @@ struct _GstRtpSession { /*< private >*/ GstPad *recv_rtp_sink; + GstSegment recv_rtp_seg; GstPad *recv_rtcp_sink; GstPad *send_rtp_sink; GstSegment send_rtp_seg; diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 5457bc3513..c728a6d589 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -125,6 +125,8 @@ static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, /* srcpad stuff */ static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); +static GList *gst_rtp_ssrc_demux_internal_links (GstPad * pad); +static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query); static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; @@ -137,6 +139,7 @@ struct _GstRtpSsrcDemuxPad GstPad *rtp_pad; GstCaps *caps; GstPad *rtcp_pad; + GstClockTime first_ts; }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found @@ -156,7 +159,8 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) } static GstRtpSsrcDemuxPad * -create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) +create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, + GstClockTime timestamp) { GstPad *rtp_pad, *rtcp_pad; GstElementClass *klass; @@ -177,13 +181,27 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) rtcp_pad = gst_pad_new_from_template (templ, padname); g_free (padname); + /* we use the first timestamp received to calculate the difference between + * timestamps on all streams */ + GST_DEBUG_OBJECT (demux, "SSRC %08x, first timestamp %" GST_TIME_FORMAT, + ssrc, GST_TIME_ARGS (timestamp)); + /* wrap in structure and add to list */ demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1); demuxpad->ssrc = ssrc; demuxpad->rtp_pad = rtp_pad; demuxpad->rtcp_pad = rtcp_pad; + demuxpad->first_ts = timestamp; + + GST_DEBUG_OBJECT (demux, "first timestamp %" GST_TIME_FORMAT, + GST_TIME_ARGS (timestamp)); + + gst_pad_set_element_private (rtp_pad, demuxpad); + gst_pad_set_element_private (rtcp_pad, demuxpad); demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + + /* unlock to perform the remainder and to fire our signal */ GST_OBJECT_UNLOCK (demux); /* copy caps from input */ @@ -193,7 +211,13 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) gst_pad_use_fixed_caps (rtcp_pad); gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event); + gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query); + gst_pad_set_internal_link_function (rtp_pad, + gst_rtp_ssrc_demux_internal_links); gst_pad_set_active (rtp_pad, TRUE); + + gst_pad_set_internal_link_function (rtcp_pad, + gst_rtp_ssrc_demux_internal_links); gst_pad_set_active (rtcp_pad, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); @@ -277,6 +301,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux, gst_pad_set_event_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_sink_event); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); + + gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED); } static void @@ -298,6 +324,9 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event) demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_STOP: + gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED); + break; case GST_EVENT_NEWSEGMENT: default: { @@ -370,7 +399,9 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) GST_OBJECT_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL) { - if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) + if (!(dpad = + create_demux_pad_for_ssrc (demux, ssrc, + GST_BUFFER_TIMESTAMP (buf)))) goto create_failed; } GST_OBJECT_UNLOCK (demux); @@ -419,6 +450,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) /* first packet must be SR or RR or else the validate would have failed */ switch (gst_rtcp_packet_get_type (&packet)) { case GST_RTCP_TYPE_SR: + /* get the ssrc so that we can route it to the right source pad */ gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL, NULL); break; @@ -435,7 +467,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL) { GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); - if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) + if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1))) goto create_failed; } GST_OBJECT_UNLOCK (demux); @@ -482,6 +514,84 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event) return res; } +static GList * +gst_rtp_ssrc_demux_internal_links (GstPad * pad) +{ + GstRtpSsrcDemux *demux; + GList *res = NULL; + GSList *walk; + + demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); + + GST_OBJECT_LOCK (demux); + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { + GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; + + if (pad == demux->rtp_sink) { + res = g_list_prepend (res, dpad->rtp_pad); + } else if (pad == demux->rtcp_sink) { + res = g_list_prepend (res, dpad->rtcp_pad); + } else if (pad == dpad->rtp_pad) { + res = g_list_prepend (res, demux->rtp_sink); + break; + } else if (pad == dpad->rtcp_pad) { + res = g_list_prepend (res, demux->rtcp_sink); + break; + } + } + GST_OBJECT_UNLOCK (demux); + + gst_object_unref (demux); + return res; +} + +static gboolean +gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query) +{ + GstRtpSsrcDemux *demux; + gboolean res = FALSE; + + demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_LATENCY: + { + + if ((res = gst_pad_peer_query (demux->rtp_sink, query))) { + gboolean live; + GstClockTime min_latency, max_latency; + GstRtpSsrcDemuxPad *demuxpad; + + demuxpad = gst_pad_get_element_private (pad); + + gst_query_parse_latency (query, &live, &min_latency, &max_latency); + + GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT, + GST_TIME_ARGS (min_latency)); + + GST_DEBUG_OBJECT (demux, + "latency for SSRC %08x, latency %" GST_TIME_FORMAT, demuxpad->ssrc, + GST_TIME_ARGS (demuxpad->first_ts)); + +#if 0 + min_latency += demuxpad->first_ts; + if (max_latency != -1) + max_latency += demuxpad->first_ts; +#endif + + gst_query_set_latency (query, live, min_latency, max_latency); + } + break; + } + default: + res = gst_pad_query_default (pad, query); + break; + } + gst_object_unref (demux); + + return res; +} + static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement * element, GstStateChange transition) diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h index bea2769d7a..88aeed8038 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.h +++ b/gst/rtpmanager/gstrtpssrcdemux.h @@ -36,6 +36,8 @@ struct _GstRtpSsrcDemux { GstElement parent; + GstSegment segment; + GstPad *rtp_sink; GstPad *rtcp_sink; GSList *srcpads; diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index e7f72b403d..724fa24a0f 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -1423,7 +1423,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntptime) prevsender = RTP_SOURCE_IS_SENDER (source); /* we use our own source to send */ - result = rtp_source_send_rtp (sess->source, buffer, ntptime); + result = rtp_source_send_rtp (source, buffer, ntptime); if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 63543358e5..c4152474b0 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -74,6 +74,7 @@ rtp_source_init (RTPSource * src) src->prev_ext_rtptime = -1; src->packets = g_queue_new (); src->seqnum_base = -1; + src->last_rtptime = -1; src->stats.cycles = -1; src->stats.jitter = 0; @@ -320,22 +321,19 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, * out of sync and we must compensate. */ skew = ntpdiff - rtpdiff; /* average out the skew to get a smooth value. */ - src->avg_skew = (31 * src->avg_skew + skew) / 32; + src->avg_skew = (63 * src->avg_skew + skew) / 64; - GST_DEBUG ("skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew, + GST_DEBUG ("new skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew, src->avg_skew); - if (src->avg_skew != 0) { - guint32 timestamp; - - /* patch the buffer RTP timestamp with the skew */ - GST_DEBUG ("adjusting timestamp %" G_GINT64_FORMAT, src->avg_skew); - timestamp = gst_rtp_buffer_get_timestamp (buffer); - timestamp += src->avg_skew; - gst_rtp_buffer_set_timestamp (buffer, timestamp); - } /* store previous extended timestamp */ src->prev_ext_rtptime = ext_rtptime; } + if (src->avg_skew != 0) { + /* patch the buffer RTP timestamp with the skew */ + GST_DEBUG ("skew timestamp RTP %" G_GUINT32_FORMAT " -> %" G_GINT64_FORMAT, + rtptime, rtptime + src->avg_skew); + gst_rtp_buffer_set_timestamp (buffer, rtptime + src->avg_skew); + } /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't * care about the absolute value, just the difference. */ @@ -555,6 +553,9 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) { GstFlowReturn result = GST_FLOW_OK; guint len; + guint32 rtptime; + guint64 ext_rtptime; + guint64 ntp_diff, rtp_diff; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -570,9 +571,27 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) src->stats.packets_sent++; src->stats.octets_sent += len; + rtptime = gst_rtp_buffer_get_timestamp (buffer); + ext_rtptime = src->last_rtptime; + ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime); + + GST_DEBUG ("SSRC %08x, RTP %" G_GUINT64_FORMAT ", NTP %" GST_TIME_FORMAT, + src->ssrc, ext_rtptime, GST_TIME_ARGS (ntpnstime)); + + if (ext_rtptime > src->last_rtptime) { + rtp_diff = ext_rtptime - src->last_rtptime; + ntp_diff = ntpnstime - src->last_ntpnstime; + + /* calc the diff so we can detect drift at the sender. This can also be used + * to guestimate the clock rate if the NTP time is locked to the RTP + * timestamps (as is the case when the capture device is providing the clock). */ + GST_DEBUG ("SSRC %08x, diff RTP %" G_GUINT64_FORMAT ", diff NTP %" + GST_TIME_FORMAT, src->ssrc, rtp_diff, GST_TIME_ARGS (ntp_diff)); + } + /* we keep track of the last received RTP timestamp and the corresponding * NTP timestamp so that we can use this info when constructing SR reports */ - src->last_rtptime = gst_rtp_buffer_get_timestamp (buffer); + src->last_rtptime = ext_rtptime; src->last_ntpnstime = ntpnstime; /* push packet */ @@ -587,7 +606,8 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) * get the correct SSRC. */ buffer = gst_buffer_make_writable (buffer); - GST_DEBUG ("updating SSRC from %08x to %08x", ssrc, src->ssrc); + GST_WARNING ("updating SSRC from %08x to %08x, fix the payloader", ssrc, + src->ssrc); gst_rtp_buffer_set_ssrc (buffer, src->ssrc); } GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT, @@ -716,7 +736,7 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime, guint64 * ntptime, guint32 * rtptime, guint32 * packet_count, guint32 * octet_count) { - guint32 t_rtp; + guint64 t_rtp; guint64 t_current_ntp; GstClockTimeDiff diff; @@ -730,7 +750,7 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime, t_rtp = src->last_rtptime; GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %" - G_GUINT32_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp); + G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp); if (src->clock_rate != -1) { /* get the diff with the SR time */ @@ -752,11 +772,12 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime, GST_WARNING ("no clock-rate, cannot interpollate rtp time"); } + /* convert the NTP time in nanoseconds to 32.32 fixed point */ t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND); GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT, (guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff), - t_rtp); + (guint32) t_rtp); if (ntptime) *ntptime = t_current_ntp;