diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index f501974cdc..aed3b26706 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -2873,10 +2873,13 @@ struct _GstRTSPWatch guint id; GMutex mutex; GQueue *messages; + gsize messages_bytes; guint8 *write_data; guint write_off; guint write_size; guint write_id; + gsize max_bytes; + guint max_messages; GstRTSPWatchFuncs funcs; @@ -3029,6 +3032,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, if (rec == NULL) break; + watch->messages_bytes -= rec->size; + watch->write_off = 0; watch->write_data = rec->data; watch->write_size = rec->size; @@ -3134,6 +3139,7 @@ gst_rtsp_source_finalize (GSource * source) g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); g_queue_free (watch->messages); watch->messages = NULL; + watch->messages_bytes = 0; g_free (watch->write_data); g_mutex_clear (&watch->mutex); @@ -3262,10 +3268,64 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch) g_source_unref ((GSource *) watch); } +/** + * gst_rtsp_watch_set_send_backlog: + * @watch: a #GstRTSPWatch + * @bytes: maximum bytes + * @messages: maximum messages + * + * Set the maximum amount of bytes and messages that will be queued in @watch. + * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and + * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM. + * + * A value of 0 for @bytes or @messages means no limits. + * + * Since: 1.1.1 + */ +void +gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch, + gsize bytes, guint messages) +{ + g_return_if_fail (watch != NULL); + + g_mutex_lock (&watch->mutex); + watch->max_bytes = bytes; + watch->max_messages = messages; + g_mutex_unlock (&watch->mutex); + + GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u", + bytes, messages); +} + +/** + * gst_rtsp_watch_get_send_backlog: + * @watch: a #GstRTSPWatch + * @bytes: (out) (allow-none): maximum bytes + * @messages: (out) (allow-none): maximum messages + * + * Get the maximum amount of bytes and messages that will be queued in @watch. + * See gst_rtsp_watch_set_send_backlog(). + * + * Since: 1.1.1 + */ +void +gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch, + gsize * bytes, guint * messages) +{ + g_return_if_fail (watch != NULL); + + g_mutex_lock (&watch->mutex); + if (bytes) + *bytes = watch->max_bytes; + if (messages) + *messages = watch->max_messages; + g_mutex_unlock (&watch->mutex); +} + /** * gst_rtsp_watch_write_data: * @watch: a #GstRTSPWatch - * @data: the data to queue + * @data: (array length=size) (transfer full): the data to queue * @size: the size of @data * @id: (out) (allow-none): location for a message ID or %NULL * @@ -3278,7 +3338,12 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch) * * This function will take ownership of @data and g_free() it after use. * - * Returns: #GST_RTSP_OK on success. + * If the amount of queued data exceeds the limits set with + * gst_rtsp_watch_set_send_backlog(), this function will return + * #GST_RTSP_ENOMEM. + * + * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits + * are reached. */ GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, @@ -3308,6 +3373,12 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, } } + /* check limits */ + if ((watch->max_bytes != 0 && watch->messages_bytes >= watch->max_bytes) || + (watch->max_messages != 0 + && watch->messages->length >= watch->max_messages)) + goto too_much_backlog; + /* make a record with the data and id for sending async */ rec = g_slice_new (GstRTSPRec); if (off == 0) { @@ -3324,8 +3395,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, rec->id = ++watch->id; } while (G_UNLIKELY (rec->id == 0)); - /* add the record to a queue. FIXME we would like to have an upper limit here */ + /* add the record to a queue. */ g_queue_push_head (watch->messages, rec); + watch->messages_bytes += rec->size; /* make sure the main context will now also check for writability on the * socket */ @@ -3345,6 +3417,17 @@ done: g_main_context_wakeup (context); return res; + + /* ERRORS */ +too_much_backlog: + { + GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %" + G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes, + watch->messages_bytes, watch->max_messages, watch->messages->length); + g_mutex_unlock (&watch->mutex); + g_free ((gpointer) data); + return GST_RTSP_ENOMEM; + } } /** diff --git a/gst-libs/gst/rtsp/gstrtspconnection.h b/gst-libs/gst/rtsp/gstrtspconnection.h index 935e2d1998..ab81de9fc0 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.h +++ b/gst-libs/gst/rtsp/gstrtspconnection.h @@ -187,6 +187,11 @@ void gst_rtsp_watch_unref (GstRTSPWatch *watch); guint gst_rtsp_watch_attach (GstRTSPWatch *watch, GMainContext *context); +void gst_rtsp_watch_set_send_backlog (GstRTSPWatch *watch, + gsize bytes, guint messages); +void gst_rtsp_watch_get_send_backlog (GstRTSPWatch *watch, + gsize *bytes, guint *messages); + GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch *watch, const guint8 *data, guint size, guint *id);