diff --git a/docs/libs/gst-plugins-base-libs-sections.txt b/docs/libs/gst-plugins-base-libs-sections.txt index 4ddd44bf8d..e1df111901 100644 --- a/docs/libs/gst-plugins-base-libs-sections.txt +++ b/docs/libs/gst-plugins-base-libs-sections.txt @@ -1490,6 +1490,7 @@ gst_rtsp_watch_send_message gst_rtsp_watch_write_data gst_rtsp_watch_get_send_backlog gst_rtsp_watch_set_send_backlog +gst_rtsp_watch_wait_backlog
diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index f988c0d9a1..241e7818d4 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -2999,6 +2999,7 @@ struct _GstRTSPWatch guint write_id; gsize max_bytes; guint max_messages; + GCond queue_not_full; GstRTSPWatchFuncs funcs; @@ -3006,6 +3007,9 @@ struct _GstRTSPWatch GDestroyNotify notify; }; +#define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \ + ((w)->max_messages != 0 && (w)->messages->length >= (w)->max_messages)) + static gboolean gst_rtsp_source_prepare (GSource * source, gint * timeout) { @@ -3283,6 +3287,9 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream, res = write_bytes (conn->output_stream, watch->write_data, &watch->write_off, watch->write_size, FALSE, conn->cancellable); + + if (!IS_BACKLOG_FULL (watch)) + g_cond_signal (&watch->queue_not_full); g_mutex_unlock (&watch->mutex); if (res == GST_RTSP_EINTR) @@ -3341,7 +3348,9 @@ gst_rtsp_source_finalize (GSource * source) g_queue_free (watch->messages); watch->messages = NULL; watch->messages_bytes = 0; + g_free (watch->write_data); + g_cond_clear (&watch->queue_not_full); if (watch->readsrc) g_source_unref (watch->readsrc); @@ -3402,6 +3411,7 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, g_mutex_init (&result->mutex); result->messages = g_queue_new (); + g_cond_init (&result->queue_not_full); gst_rtsp_watch_reset (result); result->keep_running = TRUE; @@ -3522,6 +3532,8 @@ gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch, g_mutex_lock (&watch->mutex); watch->max_bytes = bytes; watch->max_messages = messages; + if (!IS_BACKLOG_FULL (watch)) + g_cond_signal (&watch->queue_not_full); g_mutex_unlock (&watch->mutex); GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u", @@ -3605,9 +3617,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, } /* check limits */ - if ((watch->max_bytes != 0 && watch->messages_bytes >= watch->max_bytes) || - (watch->max_messages != 0 - && watch->messages->length >= watch->max_messages)) + if (IS_BACKLOG_FULL (watch)) goto too_much_backlog; /* make a record with the data and id for sending async */ @@ -3705,3 +3715,47 @@ gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message, return gst_rtsp_watch_write_data (watch, (guint8 *) g_string_free (str, FALSE), size, id); } + +/** + * gst_rtsp_watch_wait_backlog: + * @watch: a #GstRTSPWatch + * @timeout: a #GTimeVal timeout + * + * Wait until there is place in the backlog queue or @timeout is reached. + * + * If @timeout is #NULL this function can block forever. If @timeout + * contains a valid timeout, this function will return #GST_RTSP_ETIMEOUT + * after the timeout expired. + * + * The typically use of this function is when gst_rtsp_watch_write_data + * returns GST_RTSP_ENOMEM. The caller then calls this function to wait for + * free space in the backlog queue and try again. + * + * Returns: #GST_RTSP_OK when if there is room in queue. + * #GST_RTSP_ETIMEOUT when @timeout was reached. + * #GST_RTSP_EINVAL when called with invalid parameters. + * + * Since: 1.4 + */ +GstRTSPResult +gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal * timeout) +{ + gint64 end_time; + GstClockTime to; + + g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); + + to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0; + end_time = g_get_monotonic_time () + GST_TIME_AS_USECONDS (to); + + g_mutex_lock (&watch->mutex); + while (IS_BACKLOG_FULL (watch)) { + if (!g_cond_wait_until (&watch->queue_not_full, &watch->mutex, end_time)) { + g_mutex_unlock (&watch->mutex); + return GST_RTSP_ETIMEOUT; + } + } + g_mutex_unlock (&watch->mutex); + + return GST_RTSP_OK; +} diff --git a/gst-libs/gst/rtsp/gstrtspconnection.h b/gst-libs/gst/rtsp/gstrtspconnection.h index 71686e0f44..2348da134c 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.h +++ b/gst-libs/gst/rtsp/gstrtspconnection.h @@ -214,6 +214,8 @@ GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch *watch, GstRTSPResult gst_rtsp_watch_send_message (GstRTSPWatch *watch, GstRTSPMessage *message, guint *id); +GstRTSPResult gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, + GTimeVal *timeout); G_END_DECLS diff --git a/win32/common/libgstrtsp.def b/win32/common/libgstrtsp.def index 3cd3a36c93..9a0218de5c 100644 --- a/win32/common/libgstrtsp.def +++ b/win32/common/libgstrtsp.def @@ -119,4 +119,5 @@ EXPORTS gst_rtsp_watch_send_message gst_rtsp_watch_set_send_backlog gst_rtsp_watch_unref + gst_rtsp_watch_wait_backlog gst_rtsp_watch_write_data