From 12bc7258b923effe77e4efc92295a0cc42c01f10 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 23 Dec 2010 15:24:29 +0100 Subject: [PATCH] rtspsrc: improve RTP session handling Store the RTP session in the stream so that we can more efficiently perform actions on the stream based on RTP signals. --- gst/rtsp/gstrtspsrc.c | 165 +++++++++++++++++++++--------------------- gst/rtsp/gstrtspsrc.h | 9 ++- 2 files changed, 90 insertions(+), 84 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index a21a14ad2a..5b2e84d70f 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -1126,6 +1126,10 @@ gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream) gst_object_unref (stream->rtcppad); stream->rtcppad = NULL; } + if (stream->session) { + g_object_unref (stream->session); + stream->session = NULL; + } g_free (stream); } @@ -1143,14 +1147,14 @@ gst_rtspsrc_cleanup (GstRTSPSrc * src) } g_list_free (src->streams); src->streams = NULL; - if (src->session) { - if (src->session_sig_id) { - g_signal_handler_disconnect (src->session, src->session_sig_id); - src->session_sig_id = 0; + if (src->manager) { + if (src->manager_sig_id) { + g_signal_handler_disconnect (src->manager, src->manager_sig_id); + src->manager_sig_id = 0; } - gst_element_set_state (src->session, GST_STATE_NULL); - gst_bin_remove (GST_BIN_CAST (src), src->session); - src->session = NULL; + gst_element_set_state (src->manager, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (src), src->manager); + src->manager = NULL; } src->numstreams = 0; if (src->props) @@ -1672,8 +1676,8 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush) if (base_time != -1) gst_element_set_base_time (GST_ELEMENT_CAST (src), base_time); /* to manage jitterbuffer buffer mode */ - if (src->session) - gst_element_set_base_time (GST_ELEMENT_CAST (src->session), base_time); + if (src->manager) + gst_element_set_base_time (GST_ELEMENT_CAST (src->manager), base_time); } static GstRTSPResult @@ -2138,7 +2142,7 @@ was_ok: /* this callback is called when the session manager generated a new src pad with * payloaded RTP packets. We simply ghost the pad here. */ static void -new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src) +new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src) { gchar *name; GstPadTemplate *template; @@ -2147,7 +2151,7 @@ new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src) GstRTSPStream *stream; gboolean all_added; - GST_DEBUG_OBJECT (src, "got new session pad %" GST_PTR_FORMAT, pad); + GST_DEBUG_OBJECT (src, "got new manager pad %" GST_PTR_FORMAT, pad); GST_RTSP_STATE_LOCK (src); /* find stream */ @@ -2210,7 +2214,7 @@ unknown_stream: } static GstCaps * -request_pt_map (GstElement * sess, guint session, guint pt, GstRTSPSrc * src) +request_pt_map (GstElement * manager, guint session, guint pt, GstRTSPSrc * src) { GstRTSPStream *stream; GstCaps *caps; @@ -2238,17 +2242,12 @@ unknown_stream: } static void -gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, guint session) +gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, GstRTSPStream * stream) { - GstRTSPStream *stream; + guint session = stream->id; GST_DEBUG_OBJECT (src, "setting stream for session %u to EOS", session); - /* get stream for session */ - stream = find_stream (src, &session, (gpointer) find_stream_by_id); - if (!stream) - goto unknown_stream; - if (stream->eos) goto was_eos; @@ -2257,11 +2256,6 @@ gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, guint session) return; /* ERRORS */ -unknown_stream: - { - GST_DEBUG_OBJECT (src, "unknown stream for session %u", session); - return; - } was_eos: { GST_DEBUG_OBJECT (src, "stream for session %u was already EOS", session); @@ -2270,30 +2264,41 @@ was_eos: } static void -on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc, - GstRTSPSrc * src) +on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream) { - GST_DEBUG_OBJECT (src, "SSRC %08x in session %u received BYE", ssrc, session); + GstRTSPSrc *src = stream->parent; - gst_rtspsrc_do_stream_eos (src, session); + GST_DEBUG_OBJECT (src, "source in session %u received BYE", stream->id); + + gst_rtspsrc_do_stream_eos (src, stream); } static void -on_timeout (GstElement * manager, guint session, guint32 ssrc, GstRTSPSrc * src) +on_timeout (GObject * session, GObject * source, GstRTSPStream * stream) { - GST_DEBUG_OBJECT (src, "SSRC %08x in session %u timed out", ssrc, session); + GstRTSPSrc *src = stream->parent; - gst_rtspsrc_do_stream_eos (src, session); + GST_DEBUG_OBJECT (src, "source in session %u timed out", stream->id); + + gst_rtspsrc_do_stream_eos (src, stream); } static void -on_npt_stop (GstElement * manager, guint session, guint32 ssrc, - GstRTSPSrc * src) +on_npt_stop (GObject * session, GObject * source, GstRTSPStream * stream) { - GST_DEBUG_OBJECT (src, "SSRC %08x in session %u reached the NPT stop", ssrc, - session); + GstRTSPSrc *src = stream->parent; - gst_rtspsrc_do_stream_eos (src, session); + GST_DEBUG_OBJECT (src, "source in session %u reached NPT stop", stream->id); + + gst_rtspsrc_do_stream_eos (src, stream); +} + +static void +on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream) +{ + GstRTSPSrc *src = stream->parent; + + GST_DEBUG_OBJECT (src, "source in session %u is active", stream->id); } /* try to get and configure a manager */ @@ -2313,11 +2318,11 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, GST_DEBUG_OBJECT (src, "using manager %s", manager); /* configure the manager */ - if (src->session == NULL) { + if (src->manager == NULL) { GObjectClass *klass; GstState target; - if (!(src->session = gst_element_factory_make (manager, NULL))) { + if (!(src->manager = gst_element_factory_make (manager, NULL))) { /* fallback */ if (gst_rtsp_transport_get_manager (transport->trans, &manager, 1) < 0) goto no_manager; @@ -2325,27 +2330,27 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, if (!manager) goto use_no_manager; - if (!(src->session = gst_element_factory_make (manager, NULL))) + if (!(src->manager = gst_element_factory_make (manager, NULL))) goto manager_failed; } /* we manage this element */ - gst_bin_add (GST_BIN_CAST (src), src->session); + gst_bin_add (GST_BIN_CAST (src), src->manager); GST_OBJECT_LOCK (src); target = GST_STATE_TARGET (src); GST_OBJECT_UNLOCK (src); - ret = gst_element_set_state (src->session, target); + ret = gst_element_set_state (src->manager, target); if (ret == GST_STATE_CHANGE_FAILURE) - goto start_session_failure; + goto start_manager_failure; - g_object_set (src->session, "latency", src->latency, NULL); + g_object_set (src->manager, "latency", src->latency, NULL); - klass = G_OBJECT_GET_CLASS (G_OBJECT (src->session)); + klass = G_OBJECT_GET_CLASS (G_OBJECT (src->manager)); if (g_object_class_find_property (klass, "buffer-mode")) { if (src->buffer_mode != BUFFER_MODE_AUTO) { - g_object_set (src->session, "buffer-mode", src->buffer_mode, NULL); + g_object_set (src->manager, "buffer-mode", src->buffer_mode, NULL); } else { gboolean need_slave; GstStructure *s; @@ -2368,11 +2373,11 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, if (GST_CLOCK_TIME_IS_VALID (src->segment.duration) && src->segment.duration && !need_slave) { GST_DEBUG_OBJECT (src, "selected buffer"); - g_object_set (src->session, "buffer-mode", BUFFER_MODE_BUFFER, + g_object_set (src->manager, "buffer-mode", BUFFER_MODE_BUFFER, NULL); } else { GST_DEBUG_OBJECT (src, "selected slave"); - g_object_set (src->session, "buffer-mode", BUFFER_MODE_SLAVE, NULL); + g_object_set (src->manager, "buffer-mode", BUFFER_MODE_SLAVE, NULL); } } } @@ -2380,46 +2385,35 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, /* connect to signals if we did not already do so */ GST_DEBUG_OBJECT (src, "connect to signals on session manager, stream %p", stream); - src->session_sig_id = - g_signal_connect (src->session, "pad-added", - (GCallback) new_session_pad, src); - src->session_ptmap_id = - g_signal_connect (src->session, "request-pt-map", + src->manager_sig_id = + g_signal_connect (src->manager, "pad-added", + (GCallback) new_manager_pad, src); + src->manager_ptmap_id = + g_signal_connect (src->manager, "request-pt-map", (GCallback) request_pt_map, src); - g_signal_connect (src->session, "on-bye-ssrc", (GCallback) on_bye_ssrc, - src); - g_signal_connect (src->session, "on-bye-timeout", (GCallback) on_timeout, - src); - g_signal_connect (src->session, "on-timeout", (GCallback) on_timeout, - src); - /* FIXME: remove this once the rdtmanager is released */ - if (g_signal_lookup ("on-npt-stop", G_OBJECT_TYPE (src->session)) != 0) { - g_signal_connect (src->session, "on-npt-stop", (GCallback) on_npt_stop, - src); - } else { - GST_INFO_OBJECT (src, "skipping on-npt-stop handling, not implemented"); - } } /* we stream directly to the manager, get some pads. Each RTSP stream goes * into a separate RTP session. */ name = g_strdup_printf ("recv_rtp_sink_%d", stream->id); - stream->channelpad[0] = gst_element_get_request_pad (src->session, name); + stream->channelpad[0] = gst_element_get_request_pad (src->manager, name); g_free (name); name = g_strdup_printf ("recv_rtcp_sink_%d", stream->id); - stream->channelpad[1] = gst_element_get_request_pad (src->session, name); + stream->channelpad[1] = gst_element_get_request_pad (src->manager, name); g_free (name); - /* now configure the bandwidth in the session */ + /* now configure the bandwidth in the manager */ if (g_signal_lookup ("get-internal-session", - G_OBJECT_TYPE (src->session)) != 0) { + G_OBJECT_TYPE (src->manager)) != 0) { GObject *rtpsession; - g_signal_emit_by_name (src->session, "get-internal-session", stream->id, + g_signal_emit_by_name (src->manager, "get-internal-session", stream->id, &rtpsession); if (rtpsession) { GST_INFO_OBJECT (src, "configure bandwidth in session %p", rtpsession); + stream->session = rtpsession; + if (stream->as_bandwidth != -1) { GST_INFO_OBJECT (src, "setting AS: %f", (gdouble) (stream->as_bandwidth * 1000)); @@ -2436,7 +2430,16 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, g_object_set (rtpsession, "rtcp-rs-bandwidth", stream->rs_bandwidth, NULL); } - g_object_unref (rtpsession); + g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc, + stream); + g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout, + stream); + g_signal_connect (rtpsession, "on-timeout", (GCallback) on_timeout, + stream); + g_signal_connect (rtpsession, "on-npt-stop", (GCallback) on_npt_stop, + stream); + g_signal_connect (rtpsession, "on-ssrc-active", + (GCallback) on_ssrc_active, stream); } } } @@ -2455,9 +2458,9 @@ manager_failed: GST_DEBUG_OBJECT (src, "no session manager element %s found", manager); return FALSE; } -start_session_failure: +start_manager_failure: { - GST_DEBUG_OBJECT (src, "could not start session"); + GST_DEBUG_OBJECT (src, "could not start session manager"); return FALSE; } } @@ -2545,7 +2548,7 @@ gst_rtspsrc_stream_configure_tcp (GstRTSPSrc * src, GstRTSPStream * stream, gst_object_unref (template); } /* setup RTCP transport back to the server if we have to. */ - if (src->session && src->do_rtcp) { + if (src->manager && src->do_rtcp) { GstPad *pad; template = gst_static_pad_template_get (&anysinktemplate); @@ -2557,7 +2560,7 @@ gst_rtspsrc_stream_configure_tcp (GstRTSPSrc * src, GstRTSPStream * stream, /* get session RTCP pad */ name = g_strdup_printf ("send_rtcp_src_%d", stream->id); - pad = gst_element_get_request_pad (src->session, name); + pad = gst_element_get_request_pad (src->manager, name); g_free (name); /* and link */ @@ -2780,7 +2783,7 @@ gst_rtspsrc_stream_configure_udp_sinks (GstRTSPSrc * src, do_rtp = (rtp_port != -1); /* it's possible that the server does not want us to send RTCP in which case * the port is -1 */ - do_rtcp = (rtcp_port != -1 && src->session != NULL && src->do_rtcp); + do_rtcp = (rtcp_port != -1 && src->manager != NULL && src->do_rtcp); /* we need a destination when we have RTP or RTCP ports */ if (destination == NULL && (do_rtp || do_rtcp)) @@ -2889,7 +2892,7 @@ gst_rtspsrc_stream_configure_udp_sinks (GstRTSPSrc * src, /* get session RTCP pad */ name = g_strdup_printf ("send_rtcp_src_%d", stream->id); - pad = gst_element_get_request_pad (src->session, name); + pad = gst_element_get_request_pad (src->manager, name); g_free (name); /* and link */ @@ -3066,7 +3069,7 @@ gst_rtspsrc_activate_streams (GstRTSPSrc * src) if (stream->srcpad) { /* if we don't have a session manager, set the caps now. If we have a * session, we will get a notification of the pad and the caps. */ - if (!src->session) { + if (!src->manager) { GST_DEBUG_OBJECT (src, "setting pad caps for stream %p", stream); gst_pad_set_caps (stream->srcpad, stream->caps); } @@ -3134,9 +3137,9 @@ gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment) } GST_DEBUG_OBJECT (src, "stream %p, caps %" GST_PTR_FORMAT, stream, caps); } - if (src->session) { + if (src->manager) { GST_DEBUG_OBJECT (src, "clear session"); - g_signal_emit_by_name (src->session, "clear-pt-map", NULL); + g_signal_emit_by_name (src->manager, "clear-pt-map", NULL); } } @@ -3592,7 +3595,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) src->need_activate = FALSE; } - if (!src->session) { + if (!src->manager) { /* set stream caps on buffer when we don't have a session manager to do it * for us */ gst_buffer_set_caps (buf, stream->caps); diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index bdaff49b0d..00861b6da9 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -138,6 +138,9 @@ struct _GstRTSPStream { /* per stream connection */ GstRTSPConnInfo conninfo; + /* session */ + GObject *session; + /* bandwidth */ guint as_bandwidth; guint rs_bandwidth; @@ -233,9 +236,9 @@ struct _GstRTSPSrc { gboolean seekable; /* session management */ - GstElement *session; - gulong session_sig_id; - gulong session_ptmap_id; + GstElement *manager; + gulong manager_sig_id; + gulong manager_ptmap_id; GstRTSPConnInfo conninfo;