diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 76ae7d439e..419d543649 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -63,6 +63,7 @@ /* we include this here to get the G_OS_* defines */ #include #include +#include /* necessary for IP_TOS define */ #include @@ -3161,7 +3162,7 @@ struct _GstRTSPWatch /* queued message for transmission */ guint id; GMutex mutex; - GQueue *messages; + GstQueueArray *messages; gsize messages_bytes; guint8 *write_data; guint write_off; @@ -3179,7 +3180,7 @@ struct _GstRTSPWatch }; #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \ - ((w)->max_messages != 0 && (w)->messages->length >= (w)->max_messages)) + ((w)->max_messages != 0 && gst_queue_array_get_length((w)->messages) >= (w)->max_messages)) static gboolean gst_rtsp_source_prepare (GSource * source, gint * timeout) @@ -3431,11 +3432,11 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream, g_mutex_lock (&watch->mutex); do { if (watch->write_data == NULL) { - GstRTSPRec *rec; + GstRTSPRec *rec_ptr, rec; /* get a new message from the queue */ - rec = g_queue_pop_tail (watch->messages); - if (rec == NULL) { + rec_ptr = gst_queue_array_pop_head_struct (watch->messages); + if (rec_ptr == NULL) { if (watch->writesrc) { if (!g_source_is_destroyed ((GSource *) watch)) g_source_remove_child_source ((GSource *) watch, watch->writesrc); @@ -3462,14 +3463,13 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream, break; } - watch->messages_bytes -= rec->size; + rec = *rec_ptr; + watch->messages_bytes -= rec.size; watch->write_off = 0; - watch->write_data = rec->data; - watch->write_size = rec->size; - watch->write_id = rec->id; - - g_slice_free (GstRTSPRec, rec); + watch->write_data = rec.data; + watch->write_size = rec.size; + watch->write_id = rec.id; } res = write_bytes (conn->output_stream, watch->write_data, @@ -3515,18 +3515,18 @@ write_error: } static void -gst_rtsp_rec_free (gpointer data) +gst_rtsp_rec_clear (gpointer data) { GstRTSPRec *rec = data; g_free (rec->data); - g_slice_free (GstRTSPRec, rec); } static void gst_rtsp_source_finalize (GSource * source) { GstRTSPWatch *watch = (GstRTSPWatch *) source; + GstRTSPRec *rec; if (watch->notify) watch->notify (watch->user_data); @@ -3534,8 +3534,9 @@ gst_rtsp_source_finalize (GSource * source) build_reset (&watch->builder); gst_rtsp_message_unset (&watch->message); - g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); - g_queue_free (watch->messages); + while ((rec = gst_queue_array_pop_head_struct (watch->messages))) + gst_rtsp_rec_clear (rec); + gst_queue_array_free (watch->messages); watch->messages = NULL; watch->messages_bytes = 0; @@ -3597,7 +3598,7 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->builder.state = STATE_START; g_mutex_init (&result->mutex); - result->messages = g_queue_new (); + result->messages = gst_queue_array_new_for_struct (sizeof (GstRTSPRec), 10); g_cond_init (&result->queue_not_full); gst_rtsp_watch_reset (result); @@ -3783,7 +3784,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, guint size, guint * id) { GstRTSPResult res; - GstRTSPRec *rec; + GstRTSPRec rec; guint off = 0; GMainContext *context = NULL; @@ -3796,10 +3797,11 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, goto flushing; /* try to send the message synchronously first */ - if (watch->messages->length == 0 && watch->write_data == NULL) { + if (gst_queue_array_get_length (watch->messages) == 0 + && watch->write_data == NULL) { res = - write_bytes (watch->conn->output_stream, data, &off, size, - FALSE, watch->conn->cancellable); + write_bytes (watch->conn->output_stream, data, &off, size, FALSE, + watch->conn->cancellable); if (res != GST_RTSP_EINTR) { if (id != NULL) *id = 0; @@ -3813,24 +3815,24 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, goto too_much_backlog; /* make a record with the data and id for sending async */ - rec = g_slice_new (GstRTSPRec); + memset (&rec, 0, sizeof (rec)); if (off == 0) { - rec->data = (guint8 *) data; - rec->size = size; + rec.data = (guint8 *) data; + rec.size = size; } else { - rec->data = g_memdup (data + off, size - off); - rec->size = size - off; + rec.data = g_memdup (data + off, size - off); + rec.size = size - off; g_free ((gpointer) data); } do { /* make sure rec->id is never 0 */ - rec->id = ++watch->id; - } while (G_UNLIKELY (rec->id == 0)); + rec.id = ++watch->id; + } while (G_UNLIKELY (rec.id == 0)); /* add the record to a queue. */ - g_queue_push_head (watch->messages, rec); - watch->messages_bytes += rec->size; + gst_queue_array_push_tail_struct (watch->messages, &rec); + watch->messages_bytes += rec.size; /* make sure the main context will now also check for writability on the * socket */ @@ -3853,7 +3855,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, } if (id != NULL) - *id = rec->id; + *id = rec.id; res = GST_RTSP_OK; done: @@ -3876,7 +3878,8 @@ too_much_backlog: { GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %" G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes, - watch->messages_bytes, watch->max_messages, watch->messages->length); + watch->messages_bytes, watch->max_messages, + gst_queue_array_get_length (watch->messages)); g_mutex_unlock (&watch->mutex); g_free ((gpointer) data); return GST_RTSP_ENOMEM; @@ -4002,8 +4005,10 @@ gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing) watch->flushing = flushing; g_cond_signal (&watch->queue_not_full); if (flushing) { - g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); - g_queue_clear (watch->messages); + GstRTSPRec *rec; + + while ((rec = gst_queue_array_pop_head_struct (watch->messages))) + gst_rtsp_rec_clear (rec); } g_mutex_unlock (&watch->mutex); }