rtsp: Made gst_rtsp_watch_queue_data() thread safe.

This commit is contained in:
Peter Kjellerstedt 2009-08-17 11:46:32 +02:00
parent fb3b761af5
commit 0af04aa4a8

View file

@ -2963,7 +2963,8 @@ struct _GstRTSPWatch
/* queued message for transmission */ /* queued message for transmission */
guint id; guint id;
GAsyncQueue *messages; GMutex *mutex;
GQueue *messages;
guint8 *write_data; guint8 *write_data;
guint write_off; guint write_off;
guint write_size; guint write_size;
@ -3087,12 +3088,13 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
} }
if (watch->writefd.revents & WRITE_COND) { if (watch->writefd.revents & WRITE_COND) {
g_mutex_lock (watch->mutex);
do { do {
if (watch->write_data == NULL) { if (watch->write_data == NULL) {
GstRTSPRec *rec; GstRTSPRec *rec;
/* get a new message from the queue */ /* get a new message from the queue */
rec = g_async_queue_try_pop (watch->messages); rec = g_queue_pop_tail (watch->messages);
if (rec == NULL) if (rec == NULL)
goto done; goto done;
@ -3106,6 +3108,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
res = write_bytes (watch->writefd.fd, watch->write_data, res = write_bytes (watch->writefd.fd, watch->write_data,
&watch->write_off, watch->write_size); &watch->write_off, watch->write_size);
g_mutex_unlock (watch->mutex);
if (res == GST_RTSP_EINTR) if (res == GST_RTSP_EINTR)
goto write_blocked; goto write_blocked;
else if (G_LIKELY (res == GST_RTSP_OK)) { else if (G_LIKELY (res == GST_RTSP_OK)) {
@ -3118,6 +3121,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
else else
goto error; goto error;
} }
g_mutex_lock (watch->mutex);
g_free (watch->write_data); g_free (watch->write_data);
watch->write_data = NULL; watch->write_data = NULL;
@ -3129,6 +3133,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
watch->write_added = FALSE; watch->write_added = FALSE;
watch->writefd.revents = 0; watch->writefd.revents = 0;
} }
g_mutex_unlock (watch->mutex);
} }
write_blocked: write_blocked:
@ -3166,9 +3172,12 @@ gst_rtsp_source_finalize (GSource * source)
build_reset (&watch->builder); build_reset (&watch->builder);
gst_rtsp_message_unset (&watch->message); gst_rtsp_message_unset (&watch->message);
g_async_queue_unref (watch->messages); g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
g_queue_free (watch->messages);
watch->messages = NULL; watch->messages = NULL;
g_mutex_free (watch->mutex);
g_free (watch->write_data); g_free (watch->write_data);
if (watch->notify) if (watch->notify)
@ -3221,7 +3230,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
result->conn = conn; result->conn = conn;
result->builder.state = STATE_START; result->builder.state = STATE_START;
result->messages = g_async_queue_new_full (gst_rtsp_rec_free); result->mutex = g_mutex_new ();
result->messages = g_queue_new ();
result->readfd.fd = -1; result->readfd.fd = -1;
result->writefd.fd = -1; result->writefd.fd = -1;
@ -3332,6 +3342,8 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (size != 0, GST_RTSP_EINVAL); g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
g_mutex_lock (watch->mutex);
/* make a record with the data and id */ /* make a record with the data and id */
rec = g_slice_new (GstRTSPRec); rec = g_slice_new (GstRTSPRec);
rec->data = (guint8 *) data; rec->data = (guint8 *) data;
@ -3342,10 +3354,8 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
} while (G_UNLIKELY (rec->id == 0)); } while (G_UNLIKELY (rec->id == 0));
/* add the record to a queue. FIXME we would like to have an upper limit here */ /* add the record to a queue. FIXME we would like to have an upper limit here */
g_async_queue_push (watch->messages, rec); g_queue_push_head (watch->messages, rec);
/* FIXME: does the following need to be made thread-safe? (this might be
* called from a streaming thread, like appsink's render function) */
/* make sure the main context will now also check for writability on the /* make sure the main context will now also check for writability on the
* socket */ * socket */
if (!watch->write_added) { if (!watch->write_added) {
@ -3353,6 +3363,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
watch->write_added = TRUE; watch->write_added = TRUE;
} }
g_mutex_unlock (watch->mutex);
return rec->id; return rec->id;
} }