rtspconnection: add limit to queued messages

Add a limit to the amount of queued bytes or messages we allow on the watch.

API: GstRTSPConnection::gst_rtsp_watch_set_send_backlog()
API: GstRTSPConnection::gst_rtsp_watch_get_send_backlog()
This commit is contained in:
Wim Taymans 2012-12-14 11:36:58 +01:00
parent 66cafcc34d
commit 65c5ecd270
2 changed files with 91 additions and 3 deletions

View file

@ -2873,10 +2873,13 @@ struct _GstRTSPWatch
guint id; guint id;
GMutex mutex; GMutex mutex;
GQueue *messages; GQueue *messages;
gsize messages_bytes;
guint8 *write_data; guint8 *write_data;
guint write_off; guint write_off;
guint write_size; guint write_size;
guint write_id; guint write_id;
gsize max_bytes;
guint max_messages;
GstRTSPWatchFuncs funcs; GstRTSPWatchFuncs funcs;
@ -3029,6 +3032,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
if (rec == NULL) if (rec == NULL)
break; break;
watch->messages_bytes -= rec->size;
watch->write_off = 0; watch->write_off = 0;
watch->write_data = rec->data; watch->write_data = rec->data;
watch->write_size = rec->size; watch->write_size = rec->size;
@ -3134,6 +3139,7 @@ gst_rtsp_source_finalize (GSource * source)
g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
g_queue_free (watch->messages); g_queue_free (watch->messages);
watch->messages = NULL; watch->messages = NULL;
watch->messages_bytes = 0;
g_free (watch->write_data); g_free (watch->write_data);
g_mutex_clear (&watch->mutex); g_mutex_clear (&watch->mutex);
@ -3262,10 +3268,64 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
g_source_unref ((GSource *) watch); g_source_unref ((GSource *) watch);
} }
/**
* gst_rtsp_watch_set_send_backlog:
* @watch: a #GstRTSPWatch
* @bytes: maximum bytes
* @messages: maximum messages
*
* Set the maximum amount of bytes and messages that will be queued in @watch.
* When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
* gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
*
* A value of 0 for @bytes or @messages means no limits.
*
* Since: 1.1.1
*/
void
gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
gsize bytes, guint messages)
{
g_return_if_fail (watch != NULL);
g_mutex_lock (&watch->mutex);
watch->max_bytes = bytes;
watch->max_messages = messages;
g_mutex_unlock (&watch->mutex);
GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
bytes, messages);
}
/**
* gst_rtsp_watch_get_send_backlog:
* @watch: a #GstRTSPWatch
* @bytes: (out) (allow-none): maximum bytes
* @messages: (out) (allow-none): maximum messages
*
* Get the maximum amount of bytes and messages that will be queued in @watch.
* See gst_rtsp_watch_set_send_backlog().
*
* Since: 1.1.1
*/
void
gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
gsize * bytes, guint * messages)
{
g_return_if_fail (watch != NULL);
g_mutex_lock (&watch->mutex);
if (bytes)
*bytes = watch->max_bytes;
if (messages)
*messages = watch->max_messages;
g_mutex_unlock (&watch->mutex);
}
/** /**
* gst_rtsp_watch_write_data: * gst_rtsp_watch_write_data:
* @watch: a #GstRTSPWatch * @watch: a #GstRTSPWatch
* @data: the data to queue * @data: (array length=size) (transfer full): the data to queue
* @size: the size of @data * @size: the size of @data
* @id: (out) (allow-none): location for a message ID or %NULL * @id: (out) (allow-none): location for a message ID or %NULL
* *
@ -3278,7 +3338,12 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
* *
* This function will take ownership of @data and g_free() it after use. * This function will take ownership of @data and g_free() it after use.
* *
* Returns: #GST_RTSP_OK on success. * If the amount of queued data exceeds the limits set with
* gst_rtsp_watch_set_send_backlog(), this function will return
* #GST_RTSP_ENOMEM.
*
* Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
* are reached.
*/ */
GstRTSPResult GstRTSPResult
gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
@ -3308,6 +3373,12 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
} }
} }
/* check limits */
if ((watch->max_bytes != 0 && watch->messages_bytes >= watch->max_bytes) ||
(watch->max_messages != 0
&& watch->messages->length >= watch->max_messages))
goto too_much_backlog;
/* make a record with the data and id for sending async */ /* make a record with the data and id for sending async */
rec = g_slice_new (GstRTSPRec); rec = g_slice_new (GstRTSPRec);
if (off == 0) { if (off == 0) {
@ -3324,8 +3395,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
rec->id = ++watch->id; rec->id = ++watch->id;
} while (G_UNLIKELY (rec->id == 0)); } while (G_UNLIKELY (rec->id == 0));
/* add the record to a queue. FIXME we would like to have an upper limit here */ /* add the record to a queue. */
g_queue_push_head (watch->messages, rec); g_queue_push_head (watch->messages, rec);
watch->messages_bytes += rec->size;
/* make sure the main context will now also check for writability on the /* make sure the main context will now also check for writability on the
* socket */ * socket */
@ -3345,6 +3417,17 @@ done:
g_main_context_wakeup (context); g_main_context_wakeup (context);
return res; return res;
/* ERRORS */
too_much_backlog:
{
GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
watch->messages_bytes, watch->max_messages, watch->messages->length);
g_mutex_unlock (&watch->mutex);
g_free ((gpointer) data);
return GST_RTSP_ENOMEM;
}
} }
/** /**

View file

@ -187,6 +187,11 @@ void gst_rtsp_watch_unref (GstRTSPWatch *watch);
guint gst_rtsp_watch_attach (GstRTSPWatch *watch, guint gst_rtsp_watch_attach (GstRTSPWatch *watch,
GMainContext *context); GMainContext *context);
void gst_rtsp_watch_set_send_backlog (GstRTSPWatch *watch,
gsize bytes, guint messages);
void gst_rtsp_watch_get_send_backlog (GstRTSPWatch *watch,
gsize *bytes, guint *messages);
GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch *watch, GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch *watch,
const guint8 *data, const guint8 *data,
guint size, guint *id); guint size, guint *id);