client: Add drop-backlog property

When we have too many messages queued for a client (currently hardcoded
to 100) we overflow and drop the messages. Add a drop-backlog property
to control this behaviour. Setting this property to FALSE will retry
to send the messages to the client by waiting for more room in the
backlog.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=725898
This commit is contained in:
Göran Jönsson 2014-04-01 13:04:21 +02:00 committed by Wim Taymans
parent 0493a63a65
commit 11369d38ef
2 changed files with 60 additions and 7 deletions

View file

@ -81,6 +81,8 @@ struct _GstRTSPClientPrivate
GList *transports;
GList *sessions;
gboolean drop_backlog;
};
static GMutex tunnels_lock;
@ -88,12 +90,14 @@ static GHashTable *tunnels; /* protected by tunnels_lock */
#define DEFAULT_SESSION_POOL NULL
#define DEFAULT_MOUNT_POINTS NULL
#define DEFAULT_DROP_BACKLOG TRUE
enum
{
PROP_0,
PROP_SESSION_POOL,
PROP_MOUNT_POINTS,
PROP_DROP_BACKLOG,
PROP_LAST
};
@ -174,6 +178,11 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass)
GST_TYPE_RTSP_MOUNT_POINTS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_DROP_BACKLOG,
g_param_spec_boolean ("drop-backlog", "Drop Backlog",
"Drop data when the backlog queue is full",
DEFAULT_DROP_BACKLOG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_rtsp_client_signals[SIGNAL_CLOSED] =
g_signal_new ("closed", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstRTSPClientClass, closed), NULL, NULL,
@ -255,6 +264,7 @@ gst_rtsp_client_init (GstRTSPClient * client)
g_mutex_init (&priv->lock);
g_mutex_init (&priv->send_lock);
priv->close_seq = 0;
priv->drop_backlog = DEFAULT_DROP_BACKLOG;
}
static GstRTSPFilterResult
@ -341,6 +351,8 @@ gst_rtsp_client_finalize (GObject * obj)
GST_INFO ("finalize client %p", client);
if (priv->watch)
gst_rtsp_watch_set_flushing (priv->watch, TRUE);
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
if (priv->watch)
@ -378,6 +390,7 @@ gst_rtsp_client_get_property (GObject * object, guint propid,
GValue * value, GParamSpec * pspec)
{
GstRTSPClient *client = GST_RTSP_CLIENT (object);
GstRTSPClientPrivate *priv = client->priv;
switch (propid) {
case PROP_SESSION_POOL:
@ -386,6 +399,9 @@ gst_rtsp_client_get_property (GObject * object, guint propid,
case PROP_MOUNT_POINTS:
g_value_take_object (value, gst_rtsp_client_get_mount_points (client));
break;
case PROP_DROP_BACKLOG:
g_value_set_boolean (value, priv->drop_backlog);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
}
@ -396,6 +412,7 @@ gst_rtsp_client_set_property (GObject * object, guint propid,
const GValue * value, GParamSpec * pspec)
{
GstRTSPClient *client = GST_RTSP_CLIENT (object);
GstRTSPClientPrivate *priv = client->priv;
switch (propid) {
case PROP_SESSION_POOL:
@ -404,6 +421,11 @@ gst_rtsp_client_set_property (GObject * object, guint propid,
case PROP_MOUNT_POINTS:
gst_rtsp_client_set_mount_points (client, g_value_get_object (value));
break;
case PROP_DROP_BACKLOG:
g_mutex_lock (&priv->lock);
priv->drop_backlog = g_value_get_boolean (value);
g_mutex_unlock (&priv->lock);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
}
@ -2842,11 +2864,42 @@ do_send_message (GstRTSPClient * client, GstRTSPMessage * message,
gboolean close, gpointer user_data)
{
GstRTSPClientPrivate *priv = client->priv;
GstRTSPResult ret;
GTimeVal time;
/* send the response and store the seq number so we can wait until it's
* written to the client to close the connection */
return gst_rtsp_watch_send_message (priv->watch, message, close ?
&priv->close_seq : NULL);
time.tv_sec = 1;
time.tv_usec = 0;
do {
/* send the response and store the seq number so we can wait until it's
* written to the client to close the connection */
ret =
gst_rtsp_watch_send_message (priv->watch, message,
close ? &priv->close_seq : NULL);
if (ret == GST_RTSP_OK)
break;
if (ret != GST_RTSP_ENOMEM)
goto error;
/* drop backlog */
if (priv->drop_backlog)
break;
/* queue was full, wait for more space */
GST_DEBUG_OBJECT (client, "waiting for backlog");
ret = gst_rtsp_watch_wait_backlog (priv->watch, &time);
GST_DEBUG_OBJECT (client, "Resend due to backlog full");
} while (ret != GST_RTSP_EINTR);
return ret;
/* ERRORS */
error:
{
GST_DEBUG_OBJECT (client, "got error %d", ret);
return ret;
}
}
static GstRTSPResult
@ -2886,6 +2939,7 @@ closed (GstRTSPWatch * watch, gpointer user_data)
g_mutex_unlock (&tunnels_lock);
}
gst_rtsp_watch_set_flushing (watch, TRUE);
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
return GST_RTSP_OK;

View file

@ -1194,7 +1194,6 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
name = g_strdup_printf ("dynpay%d", i);
if ((elem = gst_bin_get_by_name (GST_BIN (element), name))) {
/* a stream that will dynamically create pads to provide RTP packets */
GST_INFO ("found dynamic element %d, %p", i, elem);
g_mutex_lock (&priv->lock);
@ -1936,7 +1935,7 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
/* join the element in the PAUSED state because this callback is
* called from the streaming thread and it is PAUSED */
if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
priv->rtpbin, GST_STATE_PAUSED)) {
priv->rtpbin, GST_STATE_PAUSED)) {
GST_WARNING ("failed to join bin element");
}
@ -2092,7 +2091,7 @@ start_prepare (GstRTSPMedia * media)
stream = g_ptr_array_index (priv->streams, i);
if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
priv->rtpbin, GST_STATE_NULL)) {
priv->rtpbin, GST_STATE_NULL)) {
goto join_bin_failed;
}
}