diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index be12782a4c..70196a1091 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -2533,6 +2533,7 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) g_object_ref (trans); add_data_seq (client, ct->interleaved.min); add_data_seq (client, ct->interleaved.max); + gst_rtsp_stream_set_watch_context (stream, priv->watch_context); } /* create and serialize the server transport */ diff --git a/gst/rtsp-server/rtsp-latency-bin.c b/gst/rtsp-server/rtsp-latency-bin.c index 6a12fbe9de..cf7cdf1c33 100644 --- a/gst/rtsp-server/rtsp-latency-bin.c +++ b/gst/rtsp-server/rtsp-latency-bin.c @@ -126,7 +126,7 @@ gst_rtsp_latency_bin_set_property (GObject * object, guint propid, switch (propid) { case PROP_ELEMENT: if (!gst_rtsp_latency_bin_add_element (latency_bin, - g_value_get_object (value))) { + g_value_get_object (value))) { GST_WARNING_OBJECT (latency_bin, "Could not add the element"); } break; @@ -199,8 +199,7 @@ set_target_failed: static gboolean -gst_rtsp_latency_bin_element_query (GstElement * element, - GstQuery * query) +gst_rtsp_latency_bin_element_query (GstElement * element, GstQuery * query) { gboolean ret = TRUE; @@ -215,8 +214,8 @@ gst_rtsp_latency_bin_element_query (GstElement * element, break; default: ret = - GST_ELEMENT_CLASS (gst_rtsp_latency_bin_parent_class)->query ( - GST_ELEMENT (element), query); + GST_ELEMENT_CLASS (gst_rtsp_latency_bin_parent_class)->query + (GST_ELEMENT (element), query); break; } @@ -224,8 +223,7 @@ gst_rtsp_latency_bin_element_query (GstElement * element, } static gboolean -gst_rtsp_latency_bin_element_event (GstElement * element, - GstEvent * event) +gst_rtsp_latency_bin_element_event (GstElement * element, GstEvent * event) { gboolean ret = TRUE; @@ -239,9 +237,9 @@ gst_rtsp_latency_bin_element_event (GstElement * element, gst_event_unref (event); break; default: - ret = GST_ELEMENT_CLASS ( - gst_rtsp_latency_bin_parent_class)->send_event ( - GST_ELEMENT (element), event); + ret = + GST_ELEMENT_CLASS (gst_rtsp_latency_bin_parent_class)->send_event + (GST_ELEMENT (element), event); break; } @@ -302,8 +300,8 @@ gst_rtsp_latency_bin_message_handler (GstBin * bin, GstMessage * message) break; } default: - GST_BIN_CLASS (gst_rtsp_latency_bin_parent_class)->handle_message ( - bin, message); + GST_BIN_CLASS (gst_rtsp_latency_bin_parent_class)->handle_message (bin, + message); break; } } diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 18ec2925c0..60c13301d8 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -181,6 +181,7 @@ struct _GstRTSPStreamPrivate GHashTable *ptmap; GstRTSPPublishClockMode publish_clock_mode; + GMainContext *watch_context; }; #define DEFAULT_CONTROL NULL @@ -297,6 +298,7 @@ gst_rtsp_stream_init (GstRTSPStream * stream) NULL, (GDestroyNotify) gst_caps_unref); priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); + priv->watch_context = NULL; } typedef struct _UdpClientAddrInfo UdpClientAddrInfo; @@ -376,6 +378,9 @@ gst_rtsp_stream_finalize (GObject * obj) g_hash_table_unref (priv->keys); g_hash_table_destroy (priv->ptmap); + if (priv->watch_context) + g_main_context_unref (priv->watch_context); + G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); } @@ -4327,12 +4332,37 @@ mcast_error: } } +static gboolean +cb_send_tcp_message (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + gint idx = -1; + gint i; + + g_mutex_lock (&priv->lock); + + /* iterate from 1 and down, so we prioritize RTCP over RTP */ + for (i = 1; i >= 0; i--) { + if (priv->have_buffer[i]) { + /* send message */ + idx = i; + break; + } + } + + if (idx != -1) + send_tcp_message (stream, idx); + g_mutex_unlock (&priv->lock); + return G_SOURCE_REMOVE; +} + static void on_message_sent (gpointer user_data) { GstRTSPStream *stream = user_data; GstRTSPStreamPrivate *priv = stream->priv; gint idx = -1; + GSource *idle_src; GST_DEBUG_OBJECT (stream, "message send complete"); @@ -4357,9 +4387,24 @@ on_message_sent (gpointer user_data) } } - if (idx != -1) - send_tcp_message (stream, idx); - + if (idx != -1) { + /* When appsink running this callback we want to send as much as we can + * But when idle callback or watch callback is running we will first + * queue an idle probe. This so we prevent a loop to occur were callback + * is sending more data that then call the callback that sends more data + * and so on. If the loop occur then it will starve out handling off + * other events that are handled by watch's context. */ + if (priv->watch_context && g_main_context_is_owner (priv->watch_context)) { + /* underlaying layer is running this callback */ + idle_src = g_idle_source_new (); + g_source_set_callback (idle_src, (GSourceFunc) cb_send_tcp_message, + g_object_ref (stream), g_object_unref); + g_source_attach (idle_src, priv->watch_context); + } else { + /* appsink is running this callback */ + send_tcp_message (stream, idx); + } + } g_mutex_unlock (&priv->lock); return; @@ -5728,3 +5773,28 @@ gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream) return res; } + +/** + * gst_rtsp_stream_set_watch_context: + * @stream: a #GstRTSPStream + * @context: a #GMainContext + * + * Sets stream private watch_context. + * + */ +void +gst_rtsp_stream_set_watch_context (GstRTSPStream * stream, + GMainContext * context) +{ + GstRTSPStreamPrivate *priv; + priv = stream->priv; + + g_mutex_lock (&priv->lock); + if (priv->watch_context != NULL) { + g_main_context_unref (priv->watch_context); + priv->watch_context = NULL; + } + if (context) + priv->watch_context = g_main_context_ref (context); + g_mutex_unlock (&priv->lock); +} diff --git a/gst/rtsp-server/rtsp-stream.h b/gst/rtsp-server/rtsp-stream.h index 7910bb05d7..53ad57c942 100644 --- a/gst/rtsp-server/rtsp-stream.h +++ b/gst/rtsp-server/rtsp-stream.h @@ -354,6 +354,9 @@ void gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream *stream, GST_RTSP_SERVER_API guint gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream *stream); +GST_RTSP_SERVER_API +void gst_rtsp_stream_set_watch_context (GstRTSPStream * stream, GMainContext * context); + /** * GstRTSPStreamTransportFilterFunc: * @stream: a #GstRTSPStream object