stream: add locking

This commit is contained in:
Wim Taymans 2012-11-13 11:14:49 +01:00
parent c7d20e5603
commit 4753588b09
2 changed files with 66 additions and 10 deletions

View file

@ -57,8 +57,9 @@ gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
}
static void
gst_rtsp_stream_init (GstRTSPStream * media)
gst_rtsp_stream_init (GstRTSPStream * stream)
{
g_mutex_init (&stream->lock);
}
static void
@ -73,6 +74,7 @@ gst_rtsp_stream_finalize (GObject * obj)
gst_object_unref (stream->payloader);
gst_object_unref (stream->srcpad);
g_mutex_clear (&stream->lock);
G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
}
@ -140,6 +142,7 @@ gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
return mtu;
}
/* must be called with lock */
static gboolean
alloc_ports (GstRTSPStream * stream)
{
@ -336,14 +339,16 @@ caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
newcaps = gst_pad_get_current_caps (pad);
GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
newcaps);
g_mutex_lock (&stream->lock);
oldcaps = stream->caps;
stream->caps = newcaps;
g_mutex_unlock (&stream->lock);
if (oldcaps)
gst_caps_unref (oldcaps);
GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
newcaps);
}
static void
@ -375,6 +380,7 @@ find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
port = atoi (tmp + 1);
dest = g_strndup (rtcp_from, tmp - rtcp_from);
g_mutex_lock (&stream->lock);
GST_INFO ("finding %s:%d in %d transports", dest, port,
g_list_length (stream->transports));
@ -391,6 +397,8 @@ find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
break;
}
}
g_mutex_unlock (&stream->lock);
g_free (dest);
return result;
@ -518,6 +526,7 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
stream = (GstRTSPStream *) user_data;
buffer = gst_sample_get_buffer (sample);
g_mutex_lock (&stream->lock);
for (walk = stream->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
@ -527,6 +536,8 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
gst_rtsp_stream_transport_send_rtcp (tr, buffer);
}
}
g_mutex_unlock (&stream->lock);
gst_sample_unref (sample);
return GST_FLOW_OK;
@ -565,8 +576,9 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
g_mutex_lock (&stream->lock);
if (stream->is_joined)
return TRUE;
goto was_joined;
/* create a session with the same index as the stream */
idx = stream->idx;
@ -736,12 +748,19 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
(GCallback) caps_notify, stream);
stream->is_joined = TRUE;
g_mutex_unlock (&stream->lock);
return TRUE;
/* ERRORS */
was_joined:
{
g_mutex_unlock (&stream->lock);
return TRUE;
}
no_ports:
{
g_mutex_unlock (&stream->lock);
GST_WARNING ("failed to allocate ports %d", idx);
return FALSE;
}
@ -750,6 +769,7 @@ link_failed:
GST_WARNING ("failed to link stream %d", idx);
gst_object_unref (stream->send_rtp_sink);
stream->send_rtp_sink = NULL;
g_mutex_unlock (&stream->lock);
return FALSE;
}
}
@ -775,8 +795,9 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
g_mutex_lock (&stream->lock);
if (!stream->is_joined)
return TRUE;
goto was_not_joined;
/* all transports must be removed by now */
g_return_val_if_fail (stream->transports == NULL, FALSE);
@ -828,8 +849,14 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
gst_caps_unref (stream->caps);
stream->is_joined = FALSE;
g_mutex_unlock (&stream->lock);
return TRUE;
was_not_joined:
{
return TRUE;
}
}
/**
@ -876,12 +903,19 @@ GstFlowReturn
gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
{
GstFlowReturn ret;
GstElement *element;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_return_val_if_fail (stream->is_joined, FALSE);
ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
g_mutex_lock (&stream->lock);
element = gst_object_ref (stream->appsrc[0]);
g_mutex_unlock (&stream->lock);
ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
gst_object_unref (element);
return ret;
}
@ -902,16 +936,24 @@ GstFlowReturn
gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
{
GstFlowReturn ret;
GstElement *element;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_return_val_if_fail (stream->is_joined, FALSE);
ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
g_mutex_lock (&stream->lock);
element = gst_object_ref (stream->appsrc[1]);
g_mutex_unlock (&stream->lock);
ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
gst_object_unref (element);
return ret;
}
/* must be called with lock */
static gboolean
update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
gboolean add)
@ -1002,12 +1044,18 @@ gboolean
gst_rtsp_stream_add_transport (GstRTSPStream * stream,
GstRTSPStreamTransport * trans)
{
gboolean res;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
g_return_val_if_fail (stream->is_joined, FALSE);
g_return_val_if_fail (trans->transport != NULL, FALSE);
return update_transport (stream, trans, TRUE);
g_mutex_lock (&stream->lock);
res = update_transport (stream, trans, TRUE);
g_mutex_unlock (&stream->lock);
return res;
}
/**
@ -1028,10 +1076,16 @@ gboolean
gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
GstRTSPStreamTransport * trans)
{
gboolean res;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
g_return_val_if_fail (stream->is_joined, FALSE);
g_return_val_if_fail (trans->transport != NULL, FALSE);
return update_transport (stream, trans, FALSE);
g_mutex_lock (&stream->lock);
res = update_transport (stream, trans, FALSE);
g_mutex_unlock (&stream->lock);
return res;
}

View file

@ -44,6 +44,7 @@ typedef struct _GstRTSPStreamClass GstRTSPStreamClass;
/**
* GstRTSPStream:
* @parent: the parent instance
* @lock: mutex protecting the stream
* @idx: the stream index
* @srcpad: the srcpad of the stream
* @payloader: the payloader of the format
@ -72,6 +73,7 @@ typedef struct _GstRTSPStreamClass GstRTSPStreamClass;
struct _GstRTSPStream {
GObject parent;
GMutex lock;
guint idx;
GstPad *srcpad;
GstElement *payloader;