diff --git a/ChangeLog b/ChangeLog index 31348bbe22..69fb5ca6cf 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,36 @@ +2008-11-19 Wim Taymans + + * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_associate), + (gst_rtp_bin_handle_sync), (create_stream), (free_stream), + (new_ssrc_pad_found): + Remove internal sync pad, use signals instead to get lip-sync + notifications. + + * gst/rtpmanager/gstrtpjitterbuffer.c: + (gst_rtp_jitter_buffer_base_init), + (gst_rtp_jitter_buffer_class_init), + (gst_rtp_jitter_buffer_internal_links), (create_rtcp_sink), + (remove_rtcp_sink), (gst_rtp_jitter_buffer_request_new_pad), + (gst_rtp_jitter_buffer_release_pad), + (gst_rtp_jitter_buffer_sink_rtcp_event), + (gst_rtp_jitter_buffer_chain_rtcp), + (gst_rtp_jitter_buffer_get_property): + * gst/rtpmanager/gstrtpjitterbuffer.h: + Make it possible to send SR packets to the jitterbuffer. + Check if the SR timestamps are valid by comparing them to the RTP + timestamps. + Signal the SR packet and the timing information to listeners. + + * gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc), + (gst_rtp_ssrc_demux_rtcp_chain), (gst_rtp_ssrc_demux_src_query): + Remove some unused code. + + * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew), + (calculate_skew), (rtp_jitter_buffer_get_sync): + * gst/rtpmanager/rtpjitterbuffer.h: + Keep track of the last seen RTP timestamp so that we can filter out + invalid SR packets. + 2008-11-18 Alessandro Decina * ext/metadata/gstbasemetadata.c: diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 197be52833..11d14d516e 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -176,14 +176,6 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d", GST_STATIC_CAPS ("application/x-rtp") ); -/* padtemplate for the internal pad */ -static GstStaticPadTemplate rtpbin_sync_sink_template = -GST_STATIC_PAD_TEMPLATE ("sink_%d", - GST_PAD_SINK, - GST_PAD_SOMETIMES, - GST_STATIC_CAPS ("application/x-rtcp") - ); - #define GST_RTP_BIN_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate)) @@ -310,8 +302,7 @@ struct _GstRtpBinStream gulong demux_ptreq_sig; gulong demux_pt_change_sig; - /* the internal pad we use to get RTCP sync messages */ - GstPad *sync_pad; + /* data for the RTCP sync signal */ gboolean have_sync; guint64 last_unix; guint64 last_extrtptime; @@ -818,7 +809,8 @@ free_client (GstRtpBinClient * client) } /* associate a stream to the given CNAME. This will make sure all streams for - * that CNAME are synchronized together. */ + * that CNAME are synchronized together. + * Must be called with GST_RTP_BIN_LOCK */ static void gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, guint8 * data) @@ -828,7 +820,6 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, GSList *walk; /* first find or create the CNAME */ - GST_RTP_BIN_LOCK (bin); client = get_client (bin, len, data, &created); /* find stream in the client */ @@ -851,13 +842,6 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, stream->ssrc, client, client->cname); } - /* we can only continue if we know the local clock-base and clock-rate */ - if (stream->clock_base == -1) - goto no_clock_base; - - if (stream->clock_rate <= 0) - goto no_clock_rate; - /* take the extended rtptime we found in the SR packet and map it to the * local rtptime. The local rtp time is used to construct timestamps on the * buffers. */ @@ -897,7 +881,7 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, * offsets to streams, delaying their playback instead of trying to speed up * other streams (which might be imposible when we have to create negative * latencies). - * The stream that has the smalest diff is selected as the reference stream, + * The stream that has the smallest diff is selected as the reference stream, * all other streams will have a positive offset to this difference. */ min = G_MAXINT64; for (walk = client->streams; walk; walk = g_slist_next (walk)) { @@ -955,22 +939,7 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, ostream->ssrc, ostream->ts_offset); } } - GST_RTP_BIN_UNLOCK (bin); - return; - -no_clock_base: - { - GST_WARNING_OBJECT (bin, "we have no clock-base"); - GST_RTP_BIN_UNLOCK (bin); - return; - } -no_clock_rate: - { - GST_WARNING_OBJECT (bin, "we have no clock-rate"); - GST_RTP_BIN_UNLOCK (bin); - return; - } } #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \ @@ -985,44 +954,37 @@ no_clock_rate: for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \ (b) = gst_rtcp_packet_sdes_next_entry ((packet))) -static GstFlowReturn -gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) +static void +gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s, + GstRtpBinStream * stream) { - GstFlowReturn ret = GST_FLOW_OK; - GstRtpBinStream *stream; GstRtpBin *bin; GstRTCPPacket packet; guint32 ssrc; guint64 ntptime; - guint32 rtptime; gboolean have_sr, have_sdes; gboolean more; guint64 clock_base; guint64 clock_base_time; guint clock_rate; + guint64 extrtptime; + GstBuffer *buffer; - stream = gst_pad_get_element_private (pad); bin = stream->bin; - GST_DEBUG_OBJECT (bin, "received sync packet"); - - if (!gst_rtcp_buffer_validate (buffer)) - goto invalid_rtcp; + GST_DEBUG_OBJECT (bin, "sync handler called"); /* get the last relation between the rtp timestamps and the gstreamer * timestamps. We get this info directly from the jitterbuffer which * constructs gstreamer timestamps from rtp timestamps and so it know exactly * what the current situation is. */ - gst_rtp_jitter_buffer_get_sync (GST_RTP_JITTER_BUFFER (stream->buffer), - &clock_base, &clock_base_time, &clock_rate); - - /* clock base changes when there is a huge gap in the timestamps or seqnum. - * When this happens we don't want to calculate the extended timestamp based - * on the previous one but reset the calculation. */ - if (stream->last_clock_base != clock_base) { - stream->last_extrtptime = -1; - stream->last_clock_base = clock_base; - } + clock_base = g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime")); + clock_base_time = + g_value_get_uint64 (gst_structure_get_value (s, "base-time")); + clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate")); + extrtptime = + g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime")); + buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer")); have_sr = FALSE; have_sdes = FALSE; @@ -1035,7 +997,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) if (have_sr) break; /* get NTP and RTP times */ - gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime, + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL, NULL, NULL); GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc); @@ -1044,12 +1006,6 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) continue; have_sr = TRUE; - - /* store values in the stream */ - stream->have_sync = TRUE; - stream->last_unix = gst_rtcp_ntp_to_unix (ntptime); - /* use extended timestamp */ - gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime); break; case GST_RTCP_TYPE_SDES: { @@ -1075,11 +1031,17 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data); if (type == GST_RTCP_SDES_CNAME) { + GST_RTP_BIN_LOCK (bin); + /* store values in the stream */ + stream->have_sync = TRUE; + stream->last_unix = gst_rtcp_ntp_to_unix (ntptime); + stream->last_extrtptime = extrtptime; stream->clock_base = clock_base; stream->clock_base_time = clock_base_time; stream->clock_rate = clock_rate; /* associate the stream to CNAME */ gst_rtp_bin_associate (bin, stream, len, data); + GST_RTP_BIN_UNLOCK (bin); } } } @@ -1091,20 +1053,6 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) break; } } - - gst_buffer_unref (buffer); - - return ret; - - /* ERRORS */ -invalid_rtcp: - { - /* this is fatal and should be filtered earlier */ - GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL), - ("invalid RTCP packet received")); - gst_buffer_unref (buffer); - return GST_FLOW_ERROR; - } } /* create a new stream with @ssrc in @session. Must be called with @@ -1114,8 +1062,6 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) { GstElement *buffer, *demux; GstRtpBinStream *stream; - GstPadTemplate *templ; - gchar *padname; if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL))) goto no_jitterbuffer; @@ -1134,19 +1080,6 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) stream->have_sync = FALSE; session->streams = g_slist_prepend (session->streams, stream); - /* make an internal sinkpad for RTCP sync packets. Take ownership of the - * pad. We will link this pad later. */ - padname = g_strdup_printf ("sync_%d", ssrc); - templ = gst_static_pad_template_get (&rtpbin_sync_sink_template); - stream->sync_pad = gst_pad_new_from_template (templ, padname); - gst_object_unref (templ); - g_free (padname); - gst_object_ref (stream->sync_pad); - gst_object_sink (stream->sync_pad); - gst_pad_set_element_private (stream->sync_pad, stream); - gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain); - gst_pad_set_active (stream->sync_pad, TRUE); - /* provide clock_rate to the jitterbuffer when needed */ g_signal_connect (buffer, "request-pt-map", (GCallback) pt_map_requested, session); @@ -1192,8 +1125,6 @@ free_stream (GstRtpBinStream * stream) gst_bin_remove (GST_BIN_CAST (session->bin), stream->buffer); gst_bin_remove (GST_BIN_CAST (session->bin), stream->demux); - gst_object_unref (stream->sync_pad); - session->streams = g_slist_remove (session->streams, stream); g_free (stream); @@ -1985,7 +1916,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, } /* get pad and link */ - GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer"); + GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP"); padname = g_strdup_printf ("src_%d", ssrc); srcpad = gst_element_get_static_pad (element, padname); g_free (padname); @@ -1994,14 +1925,20 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, gst_object_unref (sinkpad); gst_object_unref (srcpad); - /* get the RTCP sync pad */ - GST_DEBUG_OBJECT (rtpbin, "linking sync pad"); + GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP"); padname = g_strdup_printf ("rtcp_src_%d", ssrc); srcpad = gst_element_get_static_pad (element, padname); g_free (padname); - gst_pad_link (srcpad, stream->sync_pad); + sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp"); + gst_pad_link (srcpad, sinkpad); + gst_object_unref (sinkpad); gst_object_unref (srcpad); + /* connect to the RTCP sync signal from the jitterbuffer */ + GST_DEBUG_OBJECT (rtpbin, "connecting sync signal"); + g_signal_connect (stream->buffer, + "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream); + /* connect to the new-pad signal of the payload demuxer, this will expose the * new pad by ghosting it. */ stream->demux_newpad_sig = g_signal_connect (stream->demux, diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index bd47bde43a..e9c2f79494 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -87,6 +87,7 @@ enum { SIGNAL_REQUEST_PT_MAP, SIGNAL_CLEAR_PT_MAP, + SIGNAL_HANDLE_SYNC, LAST_SIGNAL }; @@ -127,6 +128,7 @@ enum struct _GstRtpJitterBufferPrivate { GstPad *sinkpad, *srcpad; + GstPad *rtcpsinkpad; RTPJitterBuffer *jbuf; GMutex *jbuf_lock; @@ -190,6 +192,13 @@ GST_STATIC_PAD_TEMPLATE ("sink", */ ) ); +static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template = +GST_STATIC_PAD_TEMPLATE ("sink_rtcp", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtcp") + ); + static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, @@ -216,20 +225,30 @@ static void gst_rtp_jitter_buffer_finalize (GObject * object); /* element overrides */ static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement * element, GstStateChange transition); +static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name); +static void gst_rtp_jitter_buffer_release_pad (GstElement * element, + GstPad * pad); /* pad overrides */ static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad); +static GList *gst_rtp_jitter_buffer_internal_links (GstPad * pad); /* sinkpad overrides */ static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps); -static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad, - GstEvent * event); static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event); static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer); +static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, + GstEvent * event); +static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, + GstBuffer * buffer); + /* srcpad overrides */ +static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad, + GstEvent * event); static gboolean gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active); static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer); @@ -247,6 +266,9 @@ gst_rtp_jitter_buffer_base_init (gpointer klass) gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template)); gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template)); + gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details); } @@ -320,6 +342,19 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass, request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1, G_TYPE_UINT); + /** + * GstRtpJitterBuffer::handle-sync: + * @buffer: the object which received the signal + * @struct: a GstStructure containing sync values. + * + * Be notified of new sync values. + */ + gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] = + g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass, + handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED, + G_TYPE_NONE, 1, GST_TYPE_STRUCTURE); + /** * GstRtpJitterBuffer::clear-pt-map: * @buffer: the object which received the signal @@ -329,11 +364,16 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) */ gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] = g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass, - clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, - G_TYPE_NONE, 0, G_TYPE_NONE); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); - gstelement_class->change_state = gst_rtp_jitter_buffer_change_state; + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad); klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map); @@ -403,6 +443,139 @@ gst_rtp_jitter_buffer_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } +static GList * +gst_rtp_jitter_buffer_internal_links (GstPad * pad) +{ + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + GList *res = NULL; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + if (pad == priv->sinkpad) { + res = g_list_prepend (res, priv->srcpad); + } else if (pad == priv->srcpad) { + res = g_list_prepend (res, priv->sinkpad); + } else if (pad == priv->rtcpsinkpad) { + res = NULL; + } + + gst_object_unref (jitterbuffer); + + return res; +} + +static GstPad * +create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad"); + + priv->rtcpsinkpad = + gst_pad_new_from_static_template + (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp"); + gst_pad_set_chain_function (priv->rtcpsinkpad, + gst_rtp_jitter_buffer_chain_rtcp); + gst_pad_set_event_function (priv->rtcpsinkpad, + (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event); + gst_pad_set_internal_link_function (priv->rtcpsinkpad, + gst_rtp_jitter_buffer_internal_links); + gst_pad_set_active (priv->rtcpsinkpad, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad); + + return priv->rtcpsinkpad; +} + +static void +remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad"); + + gst_pad_set_active (priv->rtcpsinkpad, FALSE); + + gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad); + priv->rtcpsinkpad = NULL; +} + +static GstPad * +gst_rtp_jitter_buffer_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name) +{ + GstRtpJitterBuffer *jitterbuffer; + GstElementClass *klass; + GstPad *result; + GstRtpJitterBufferPrivate *priv; + + g_return_val_if_fail (templ != NULL, NULL); + g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL); + + jitterbuffer = GST_RTP_JITTER_BUFFER (element); + priv = jitterbuffer->priv; + klass = GST_ELEMENT_GET_CLASS (element); + + GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name)); + + /* figure out the template */ + if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) { + if (priv->rtcpsinkpad != NULL) + goto exists; + + result = create_rtcp_sink (jitterbuffer); + } else + goto wrong_template; + + return result; + + /* ERRORS */ +wrong_template: + { + g_warning ("gstrtpjitterbuffer: this is not our template"); + return NULL; + } +exists: + { + g_warning ("gstrtpjitterbuffer: pad already requested"); + return NULL; + } +} + +static void +gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad) +{ + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element)); + g_return_if_fail (GST_IS_PAD (pad)); + + jitterbuffer = GST_RTP_JITTER_BUFFER (element); + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + if (priv->rtcpsinkpad == pad) { + remove_rtcp_sink (jitterbuffer); + } else + goto wrong_pad; + + return; + + /* ERRORS */ +wrong_pad: + { + g_warning ("gstjitterbuffer: asked to release an unknown pad"); + return; + } +} + static void gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer) { @@ -786,6 +959,31 @@ newseg_wrong_format: } } +static gboolean +gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstEvent * event) +{ + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + break; + case GST_EVENT_FLUSH_STOP: + break; + default: + break; + } + gst_event_unref (event); + gst_object_unref (jitterbuffer); + + return TRUE; +} + static gboolean gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer, guint8 pt) @@ -1335,6 +1533,124 @@ pause: } } +static GstFlowReturn +gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstBuffer * buffer) +{ + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + GstFlowReturn ret; + guint64 base_rtptime, timestamp; + guint32 clock_rate; + guint64 last_rtptime; + guint32 ssrc; + GstRTCPPacket packet; + guint64 ext_rtptime, diff; + guint32 rtptime; + gboolean drop = FALSE; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + + if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer))) + goto invalid_buffer; + + priv = jitterbuffer->priv; + + if (!gst_rtcp_buffer_get_first_packet (buffer, &packet)) + goto invalid_buffer; + + /* first packet must be SR or RR or else the validate would have failed */ + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime, + NULL, NULL); + break; + default: + goto ignore_buffer; + } + + GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc); + + JBUF_LOCK (priv); + /* convert the RTP timestamp to our extended timestamp, using the same offset + * we used in the jitterbuffer */ + ext_rtptime = priv->jbuf->ext_rtptime; + ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime); + + /* get the last values from the jitterbuffer */ + rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, ×tamp, + &clock_rate, &last_rtptime); + + GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %" + G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT, + ext_rtptime, base_rtptime, clock_rate); + + if (base_rtptime == -1 || clock_rate == -1 || timestamp == -1) { + GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values"); + drop = TRUE; + } else { + /* we can't accept anything that happened before we did the last resync */ + if (base_rtptime > ext_rtptime) { + GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time"); + drop = TRUE; + } else { + /* the SR RTP timestamp must be something close to what we last observed + * in the jitterbuffer */ + if (ext_rtptime > last_rtptime) { + /* check how far ahead it is to our RTP timestamps */ + diff = ext_rtptime - last_rtptime; + /* if bigger than 1 second, we drop it */ + if (diff > clock_rate) { + GST_DEBUG_OBJECT (jitterbuffer, "dropping, too far ahead"); + drop = TRUE; + } + GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %" + G_GUINT64_FORMAT, last_rtptime, diff); + } + } + } + JBUF_UNLOCK (priv); + + if (!drop) { + GstStructure *s; + + s = gst_structure_new ("application/x-rtp-sync", + "base-rtptime", G_TYPE_UINT64, base_rtptime, + "base-time", G_TYPE_UINT64, timestamp, + "clock-rate", G_TYPE_UINT, clock_rate, + "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime, + "sr-buffer", GST_TYPE_BUFFER, buffer, NULL); + + GST_DEBUG_OBJECT (jitterbuffer, "signaling sync"); + g_signal_emit (jitterbuffer, + gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s); + gst_structure_free (s); + } else { + GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet"); + ret = GST_FLOW_OK; + } + +done: + gst_buffer_unref (buffer); + gst_object_unref (jitterbuffer); + + return ret; + +invalid_buffer: + { + /* this is not fatal but should be filtered earlier */ + GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL), + ("Received invalid RTCP payload, dropping")); + ret = GST_FLOW_OK; + goto done; + } +ignore_buffer: + { + GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet"); + ret = GST_FLOW_OK; + goto done; + } +} + static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query) { @@ -1485,18 +1801,3 @@ gst_rtp_jitter_buffer_get_property (GObject * object, break; } } - -void -gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime, - guint64 * timestamp, guint32 * clock_rate) -{ - GstRtpJitterBufferPrivate *priv; - - g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer)); - - priv = buffer->priv; - - JBUF_LOCK (priv); - rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp, clock_rate); - JBUF_UNLOCK (priv); -} diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h index 40908eabf3..45e689793a 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.h +++ b/gst/rtpmanager/gstrtpjitterbuffer.h @@ -71,6 +71,9 @@ struct _GstRtpJitterBufferClass /* signals */ GstCaps* (*request_pt_map) (GstRtpJitterBuffer *buffer, guint pt); + void (*handle_sync) (GstRtpJitterBuffer *buffer, GstStructure *s); + + /* actions */ void (*clear_pt_map) (GstRtpJitterBuffer *buffer); /*< private > */ @@ -79,10 +82,6 @@ struct _GstRtpJitterBufferClass GType gst_rtp_jitter_buffer_get_type (void); -void gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer *buffer, - guint64 *rtptime, guint64 *timestamp, - guint32 *clock_rate); - G_END_DECLS #endif /* __GST_RTP_JITTER_BUFFER_H__ */ diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index e5b989f7f8..64394c45d4 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -137,7 +137,6 @@ struct _GstRtpSsrcDemuxPad GstPad *rtp_pad; GstCaps *caps; GstPad *rtcp_pad; - GstClockTime first_ts; }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found @@ -190,7 +189,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, demuxpad->ssrc = ssrc; demuxpad->rtp_pad = rtp_pad; demuxpad->rtcp_pad = rtcp_pad; - demuxpad->first_ts = timestamp; GST_DEBUG_OBJECT (demux, "first timestamp %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp)); @@ -484,9 +482,6 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL, NULL); break; - case GST_RTCP_TYPE_RR: - ssrc = gst_rtcp_packet_rr_get_ssrc (&packet); - break; default: goto invalid_rtcp; } @@ -599,9 +594,7 @@ gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query) GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT, GST_TIME_ARGS (min_latency)); - GST_DEBUG_OBJECT (demux, - "latency for SSRC %08x, latency %" GST_TIME_FORMAT, demuxpad->ssrc, - GST_TIME_ARGS (demuxpad->first_ts)); + GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", demuxpad->ssrc); gst_query_set_latency (query, live, min_latency, max_latency); } diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index 6cfbab6198..d027dc2e96 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -107,6 +107,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf) jbuf->base_extrtp = -1; jbuf->clock_rate = -1; jbuf->ext_rtptime = -1; + jbuf->last_rtptime = -1; jbuf->window_pos = 0; jbuf->window_filling = TRUE; jbuf->window_min = 0; @@ -188,11 +189,15 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, gstrtptime = gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, clock_rate); - if (jbuf->clock_rate != -1 && jbuf->clock_rate != clock_rate) { - GST_DEBUG ("Clock rate changed from %" G_GUINT32_FORMAT " to %" + /* keep track of the last extended rtptime */ + jbuf->last_rtptime = ext_rtptime; + + if (jbuf->clock_rate != clock_rate) { + GST_WARNING ("Clock rate changed from %" G_GUINT32_FORMAT " to %" G_GUINT32_FORMAT, jbuf->clock_rate, clock_rate); jbuf->base_time = -1; jbuf->base_rtptime = -1; + jbuf->clock_rate = clock_rate; } /* first time, lock on to time and gstrtptime */ @@ -202,7 +207,6 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, } if (G_UNLIKELY (jbuf->base_rtptime == -1)) { jbuf->base_rtptime = gstrtptime; - jbuf->clock_rate = clock_rate; jbuf->base_extrtp = ext_rtptime; GST_DEBUG ("Taking new base rtptime %" GST_TIME_FORMAT, GST_TIME_ARGS (gstrtptime)); @@ -213,10 +217,9 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, else { /* elapsed time at sender, timestamps can go backwards and thus be smaller * than our base time, take a new base time in that case. */ - GST_DEBUG ("backward timestamps at server, taking new base time"); + GST_WARNING ("backward timestamps at server, taking new base time"); jbuf->base_time = time; jbuf->base_rtptime = gstrtptime; - jbuf->clock_rate = clock_rate; jbuf->base_extrtp = ext_rtptime; send_diff = 0; } @@ -245,12 +248,11 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, * changed too quickly we have to resync because the server likely restarted * its timestamps. */ if (ABS (delta - jbuf->skew) > GST_SECOND) { - GST_DEBUG ("delta %" GST_TIME_FORMAT " too big, reset skew", + GST_WARNING ("delta %" GST_TIME_FORMAT " too big, reset skew", GST_TIME_ARGS (delta - jbuf->skew)); jbuf->base_time = time; jbuf->base_rtptime = gstrtptime; jbuf->base_extrtp = ext_rtptime; - jbuf->clock_rate = clock_rate; send_diff = 0; delta = 0; } @@ -536,13 +538,20 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf) * @rtptime: result RTP time * @timestamp: result GStreamer timestamp * @clock_rate: clock-rate of @rtptime + * @last_rtptime: last seen rtptime. * * Returns the relation between the RTP timestamp and the GStreamer timestamp * used for constructing timestamps. + * + * For extended RTP timestamp @rtptime with a clock-rate of @clock_rate, + * the GStreamer timestamp is currently @timestamp. + * + * The last seen extended RTP timestamp with clock-rate @clock-rate is returned in + * @last_rtptime. */ void rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime, - guint64 * timestamp, guint32 * clock_rate) + guint64 * timestamp, guint32 * clock_rate, guint64 * last_rtptime) { if (rtptime) *rtptime = jbuf->base_extrtp; @@ -550,4 +559,6 @@ rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime, *timestamp = jbuf->base_time + jbuf->skew; if (clock_rate) *clock_rate = jbuf->clock_rate; + if (last_rtptime) + *last_rtptime = jbuf->last_rtptime; } diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index 325f8f7ba9..aa00919045 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -59,6 +59,7 @@ struct _RTPJitterBuffer { guint32 clock_rate; GstClockTime base_extrtp; guint64 ext_rtptime; + guint64 last_rtptime; gint64 window[RTP_JITTER_BUFFER_MAX_WINDOW]; guint window_pos; guint window_size; @@ -92,7 +93,8 @@ guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf) guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf); void rtp_jitter_buffer_get_sync (RTPJitterBuffer *jbuf, guint64 *rtptime, - guint64 *timestamp, guint32 *clock_rate); + guint64 *timestamp, guint32 *clock_rate, + guint64 *last_rtptime); #endif /* __RTP_JITTER_BUFFER_H__ */