rtsp-stream: use idle source in on_message_sent

When the underlying layer runs the on_message_sent callback, the callback
sometimes sends more data, which causes the underlying layer to run
on_message_sent again once that data has been sent. This can go on
indefinitely.

To break this chain, we introduce an idle source that takes care of
sending the remaining data when the callback is invoked from the watch's
context.

https://bugzilla.gnome.org/show_bug.cgi?id=797289
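
In GLib terms, the fix is the usual trick of deferring follow-up work to a
one-shot idle source attached to the same GMainContext, instead of recursing
from inside the callback. A minimal, self-contained sketch of that pattern
follows; the helper names are hypothetical and not code from this commit:

#include <glib.h>

static gboolean
send_pending_cb (gpointer user_data)
{
  /* Runs on the next iteration of the context, outside the stack frame
   * of whatever callback scheduled it. */
  g_print ("sending pending data\n");
  return G_SOURCE_REMOVE;       /* one-shot: detach after running once */
}

static void
on_send_complete (GMainContext * watch_context)
{
  if (g_main_context_is_owner (watch_context)) {
    /* Called from the thread that owns the watch's context: sending
     * directly could re-enter this callback, so defer instead. */
    GSource *idle_src = g_idle_source_new ();
    g_source_set_callback (idle_src, send_pending_cb, NULL, NULL);
    g_source_attach (idle_src, watch_context);
    g_source_unref (idle_src);  /* the context now holds a reference */
  } else {
    /* Called from another thread: safe to send synchronously. */
    send_pending_cb (NULL);
  }
}

int
main (void)
{
  GMainContext *ctx = g_main_context_new ();

  g_main_context_acquire (ctx);     /* this thread now owns ctx */
  on_send_complete (ctx);           /* owner path: defers via idle source */
  while (g_main_context_pending (ctx))
    g_main_context_iteration (ctx, FALSE);      /* runs send_pending_cb */
  g_main_context_release (ctx);
  g_main_context_unref (ctx);
  return 0;
}

g_main_context_is_owner() reports whether the calling thread has acquired the
context (dispatching a watch does acquire it), which is exactly the check the
patch adds to on_message_sent() below.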
Author:    Göran Jönsson
Date:      2018-10-18 07:25:05 +02:00
Committer: Sebastian Dröge
Commit:    7cfd59820a (parent: ebafccb65a)

4 changed files with 87 additions and 15 deletions

gst/rtsp-server/rtsp-client.c

@@ -2533,6 +2533,7 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
       g_object_ref (trans);
       add_data_seq (client, ct->interleaved.min);
       add_data_seq (client, ct->interleaved.max);
+      gst_rtsp_stream_set_watch_context (stream, priv->watch_context);
     }

   /* create and serialize the server transport */

gst/rtsp-server/rtsp-latency-bin.c

@@ -126,7 +126,7 @@ gst_rtsp_latency_bin_set_property (GObject * object, guint propid,
   switch (propid) {
     case PROP_ELEMENT:
       if (!gst_rtsp_latency_bin_add_element (latency_bin,
-          g_value_get_object (value))) {
+              g_value_get_object (value))) {
         GST_WARNING_OBJECT (latency_bin, "Could not add the element");
       }
       break;
@@ -199,8 +199,7 @@ set_target_failed:
 static gboolean
-gst_rtsp_latency_bin_element_query (GstElement * element,
-    GstQuery * query)
+gst_rtsp_latency_bin_element_query (GstElement * element, GstQuery * query)
 {
   gboolean ret = TRUE;
@@ -215,8 +214,8 @@ gst_rtsp_latency_bin_element_query (GstElement * element,
       break;
     default:
       ret =
-          GST_ELEMENT_CLASS (gst_rtsp_latency_bin_parent_class)->query (
-          GST_ELEMENT (element), query);
+          GST_ELEMENT_CLASS (gst_rtsp_latency_bin_parent_class)->query
+          (GST_ELEMENT (element), query);
       break;
   }
@@ -224,8 +223,7 @@ gst_rtsp_latency_bin_element_query (GstElement * element,
 }

 static gboolean
-gst_rtsp_latency_bin_element_event (GstElement * element,
-    GstEvent * event)
+gst_rtsp_latency_bin_element_event (GstElement * element, GstEvent * event)
 {
   gboolean ret = TRUE;
@@ -239,9 +237,9 @@ gst_rtsp_latency_bin_element_event (GstElement * element,
       gst_event_unref (event);
       break;
     default:
-      ret = GST_ELEMENT_CLASS (
-          gst_rtsp_latency_bin_parent_class)->send_event (
-          GST_ELEMENT (element), event);
+      ret =
+          GST_ELEMENT_CLASS (gst_rtsp_latency_bin_parent_class)->send_event
+          (GST_ELEMENT (element), event);
       break;
   }
@@ -302,8 +300,8 @@ gst_rtsp_latency_bin_message_handler (GstBin * bin, GstMessage * message)
       break;
     }
     default:
-      GST_BIN_CLASS (gst_rtsp_latency_bin_parent_class)->handle_message (
-          bin, message);
+      GST_BIN_CLASS (gst_rtsp_latency_bin_parent_class)->handle_message (bin,
+          message);
       break;
   }
 }

gst/rtsp-server/rtsp-stream.c

@@ -181,6 +181,7 @@ struct _GstRTSPStreamPrivate
   GHashTable *ptmap;

   GstRTSPPublishClockMode publish_clock_mode;
+  GMainContext *watch_context;
 };

 #define DEFAULT_CONTROL NULL
@@ -297,6 +298,7 @@ gst_rtsp_stream_init (GstRTSPStream * stream)
       NULL, (GDestroyNotify) gst_caps_unref);
   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
       (GDestroyNotify) gst_caps_unref);
+  priv->watch_context = NULL;
 }

 typedef struct _UdpClientAddrInfo UdpClientAddrInfo;
@@ -376,6 +378,9 @@ gst_rtsp_stream_finalize (GObject * obj)
   g_hash_table_unref (priv->keys);
   g_hash_table_destroy (priv->ptmap);

+  if (priv->watch_context)
+    g_main_context_unref (priv->watch_context);
+
   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
 }
@@ -4327,12 +4332,37 @@ mcast_error:
   }
 }

+static gboolean
+cb_send_tcp_message (GstRTSPStream * stream)
+{
+  GstRTSPStreamPrivate *priv = stream->priv;
+  gint idx = -1;
+  gint i;
+
+  g_mutex_lock (&priv->lock);
+  /* iterate from 1 and down, so we prioritize RTCP over RTP */
+  for (i = 1; i >= 0; i--) {
+    if (priv->have_buffer[i]) {
+      /* send message */
+      idx = i;
+      break;
+    }
+  }
+
+  if (idx != -1)
+    send_tcp_message (stream, idx);
+  g_mutex_unlock (&priv->lock);
+
+  return G_SOURCE_REMOVE;
+}
+
 static void
 on_message_sent (gpointer user_data)
 {
   GstRTSPStream *stream = user_data;
   GstRTSPStreamPrivate *priv = stream->priv;
   gint idx = -1;
+  GSource *idle_src;

   GST_DEBUG_OBJECT (stream, "message send complete");
@@ -4357,9 +4387,24 @@ on_message_sent (gpointer user_data)
     }
   }

-  if (idx != -1)
-    send_tcp_message (stream, idx);
+  if (idx != -1) {
+    /* When the appsink is running this callback we want to send as much as
+     * we can, but when the idle callback or the watch callback is running
+     * we first queue an idle source. This prevents a loop where the
+     * callback sends more data, which then invokes the callback again,
+     * and so on. Such a loop would starve the handling of the other
+     * events served by the watch's context. */
+    if (priv->watch_context && g_main_context_is_owner (priv->watch_context)) {
+      /* the underlying layer is running this callback */
+      idle_src = g_idle_source_new ();
+      g_source_set_callback (idle_src, (GSourceFunc) cb_send_tcp_message,
+          g_object_ref (stream), g_object_unref);
+      g_source_attach (idle_src, priv->watch_context);
+    } else {
+      /* the appsink is running this callback */
+      send_tcp_message (stream, idx);
+    }
+  }

   g_mutex_unlock (&priv->lock);

   return;
@@ -5728,3 +5773,28 @@ gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)

   return res;
 }
+
+/**
+ * gst_rtsp_stream_set_watch_context:
+ * @stream: a #GstRTSPStream
+ * @context: a #GMainContext
+ *
+ * Sets the stream's private watch context.
+ */
+void
+gst_rtsp_stream_set_watch_context (GstRTSPStream * stream,
+    GMainContext * context)
+{
+  GstRTSPStreamPrivate *priv;
+
+  priv = stream->priv;
+
+  g_mutex_lock (&priv->lock);
+  if (priv->watch_context != NULL) {
+    g_main_context_unref (priv->watch_context);
+    priv->watch_context = NULL;
+  }
+  if (context)
+    priv->watch_context = g_main_context_ref (context);
+  g_mutex_unlock (&priv->lock);
+}
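
As a usage note, a caller is expected to hand the watch's context to the
stream once during setup, the way handle_setup_request does above. A small
sketch of the set/clear semantics; the wrapper functions are illustrative
only, not part of the commit:

#include <gst/rtsp-server/rtsp-server.h>

static void
attach_client_watch_context (GstRTSPStream * stream, GMainContext * context)
{
  /* The stream takes its own reference on the context. */
  gst_rtsp_stream_set_watch_context (stream, context);
}

static void
detach_client_watch_context (GstRTSPStream * stream)
{
  /* Passing NULL releases the previously set context. */
  gst_rtsp_stream_set_watch_context (stream, NULL);
}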

gst/rtsp-server/rtsp-stream.h

@@ -354,6 +354,9 @@ void gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream *stream,
 GST_RTSP_SERVER_API
 guint gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream *stream);

+GST_RTSP_SERVER_API
+void gst_rtsp_stream_set_watch_context (GstRTSPStream * stream, GMainContext * context);
+
 /**
  * GstRTSPStreamTransportFilterFunc:
  * @stream: a #GstRTSPStream object