rtspconnection: Use GstQueueArray instead of GQueue for the queued messages

This commit is contained in:
Sebastian Dröge 2018-06-28 10:19:19 +02:00
parent 1c8bf44dea
commit d4f607ef40

View file

@ -63,6 +63,7 @@
/* we include this here to get the G_OS_* defines */ /* we include this here to get the G_OS_* defines */
#include <glib.h> #include <glib.h>
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/base/base.h>
/* necessary for IP_TOS define */ /* necessary for IP_TOS define */
#include <gio/gnetworking.h> #include <gio/gnetworking.h>
@ -3161,7 +3162,7 @@ struct _GstRTSPWatch
/* queued message for transmission */ /* queued message for transmission */
guint id; guint id;
GMutex mutex; GMutex mutex;
GQueue *messages; GstQueueArray *messages;
gsize messages_bytes; gsize messages_bytes;
guint8 *write_data; guint8 *write_data;
guint write_off; guint write_off;
@ -3179,7 +3180,7 @@ struct _GstRTSPWatch
}; };
#define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \ #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
((w)->max_messages != 0 && (w)->messages->length >= (w)->max_messages)) ((w)->max_messages != 0 && gst_queue_array_get_length((w)->messages) >= (w)->max_messages))
static gboolean static gboolean
gst_rtsp_source_prepare (GSource * source, gint * timeout) gst_rtsp_source_prepare (GSource * source, gint * timeout)
@ -3431,11 +3432,11 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
g_mutex_lock (&watch->mutex); g_mutex_lock (&watch->mutex);
do { do {
if (watch->write_data == NULL) { if (watch->write_data == NULL) {
GstRTSPRec *rec; GstRTSPRec *rec_ptr, rec;
/* get a new message from the queue */ /* get a new message from the queue */
rec = g_queue_pop_tail (watch->messages); rec_ptr = gst_queue_array_pop_head_struct (watch->messages);
if (rec == NULL) { if (rec_ptr == NULL) {
if (watch->writesrc) { if (watch->writesrc) {
if (!g_source_is_destroyed ((GSource *) watch)) if (!g_source_is_destroyed ((GSource *) watch))
g_source_remove_child_source ((GSource *) watch, watch->writesrc); g_source_remove_child_source ((GSource *) watch, watch->writesrc);
@ -3462,14 +3463,13 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
break; break;
} }
watch->messages_bytes -= rec->size; rec = *rec_ptr;
watch->messages_bytes -= rec.size;
watch->write_off = 0; watch->write_off = 0;
watch->write_data = rec->data; watch->write_data = rec.data;
watch->write_size = rec->size; watch->write_size = rec.size;
watch->write_id = rec->id; watch->write_id = rec.id;
g_slice_free (GstRTSPRec, rec);
} }
res = write_bytes (conn->output_stream, watch->write_data, res = write_bytes (conn->output_stream, watch->write_data,
@ -3515,18 +3515,18 @@ write_error:
} }
static void static void
gst_rtsp_rec_free (gpointer data) gst_rtsp_rec_clear (gpointer data)
{ {
GstRTSPRec *rec = data; GstRTSPRec *rec = data;
g_free (rec->data); g_free (rec->data);
g_slice_free (GstRTSPRec, rec);
} }
static void static void
gst_rtsp_source_finalize (GSource * source) gst_rtsp_source_finalize (GSource * source)
{ {
GstRTSPWatch *watch = (GstRTSPWatch *) source; GstRTSPWatch *watch = (GstRTSPWatch *) source;
GstRTSPRec *rec;
if (watch->notify) if (watch->notify)
watch->notify (watch->user_data); watch->notify (watch->user_data);
@ -3534,8 +3534,9 @@ gst_rtsp_source_finalize (GSource * source)
build_reset (&watch->builder); build_reset (&watch->builder);
gst_rtsp_message_unset (&watch->message); gst_rtsp_message_unset (&watch->message);
g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); while ((rec = gst_queue_array_pop_head_struct (watch->messages)))
g_queue_free (watch->messages); gst_rtsp_rec_clear (rec);
gst_queue_array_free (watch->messages);
watch->messages = NULL; watch->messages = NULL;
watch->messages_bytes = 0; watch->messages_bytes = 0;
@ -3597,7 +3598,7 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
result->builder.state = STATE_START; result->builder.state = STATE_START;
g_mutex_init (&result->mutex); g_mutex_init (&result->mutex);
result->messages = g_queue_new (); result->messages = gst_queue_array_new_for_struct (sizeof (GstRTSPRec), 10);
g_cond_init (&result->queue_not_full); g_cond_init (&result->queue_not_full);
gst_rtsp_watch_reset (result); gst_rtsp_watch_reset (result);
@ -3783,7 +3784,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
guint size, guint * id) guint size, guint * id)
{ {
GstRTSPResult res; GstRTSPResult res;
GstRTSPRec *rec; GstRTSPRec rec;
guint off = 0; guint off = 0;
GMainContext *context = NULL; GMainContext *context = NULL;
@ -3796,10 +3797,11 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
goto flushing; goto flushing;
/* try to send the message synchronously first */ /* try to send the message synchronously first */
if (watch->messages->length == 0 && watch->write_data == NULL) { if (gst_queue_array_get_length (watch->messages) == 0
&& watch->write_data == NULL) {
res = res =
write_bytes (watch->conn->output_stream, data, &off, size, write_bytes (watch->conn->output_stream, data, &off, size, FALSE,
FALSE, watch->conn->cancellable); watch->conn->cancellable);
if (res != GST_RTSP_EINTR) { if (res != GST_RTSP_EINTR) {
if (id != NULL) if (id != NULL)
*id = 0; *id = 0;
@ -3813,24 +3815,24 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
goto too_much_backlog; goto too_much_backlog;
/* make a record with the data and id for sending async */ /* make a record with the data and id for sending async */
rec = g_slice_new (GstRTSPRec); memset (&rec, 0, sizeof (rec));
if (off == 0) { if (off == 0) {
rec->data = (guint8 *) data; rec.data = (guint8 *) data;
rec->size = size; rec.size = size;
} else { } else {
rec->data = g_memdup (data + off, size - off); rec.data = g_memdup (data + off, size - off);
rec->size = size - off; rec.size = size - off;
g_free ((gpointer) data); g_free ((gpointer) data);
} }
do { do {
/* make sure rec->id is never 0 */ /* make sure rec->id is never 0 */
rec->id = ++watch->id; rec.id = ++watch->id;
} while (G_UNLIKELY (rec->id == 0)); } while (G_UNLIKELY (rec.id == 0));
/* add the record to a queue. */ /* add the record to a queue. */
g_queue_push_head (watch->messages, rec); gst_queue_array_push_tail_struct (watch->messages, &rec);
watch->messages_bytes += rec->size; watch->messages_bytes += rec.size;
/* make sure the main context will now also check for writability on the /* make sure the main context will now also check for writability on the
* socket */ * socket */
@ -3853,7 +3855,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
} }
if (id != NULL) if (id != NULL)
*id = rec->id; *id = rec.id;
res = GST_RTSP_OK; res = GST_RTSP_OK;
done: done:
@ -3876,7 +3878,8 @@ too_much_backlog:
{ {
GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %" GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes, G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
watch->messages_bytes, watch->max_messages, watch->messages->length); watch->messages_bytes, watch->max_messages,
gst_queue_array_get_length (watch->messages));
g_mutex_unlock (&watch->mutex); g_mutex_unlock (&watch->mutex);
g_free ((gpointer) data); g_free ((gpointer) data);
return GST_RTSP_ENOMEM; return GST_RTSP_ENOMEM;
@ -4002,8 +4005,10 @@ gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing)
watch->flushing = flushing; watch->flushing = flushing;
g_cond_signal (&watch->queue_not_full); g_cond_signal (&watch->queue_not_full);
if (flushing) { if (flushing) {
g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); GstRTSPRec *rec;
g_queue_clear (watch->messages);
while ((rec = gst_queue_array_pop_head_struct (watch->messages)))
gst_rtsp_rec_clear (rec);
} }
g_mutex_unlock (&watch->mutex); g_mutex_unlock (&watch->mutex);
} }