media: add lock to protect state changes

This commit is contained in:
Wim Taymans 2012-11-13 11:15:35 +01:00
parent 4753588b09
commit 9a97de88ea
2 changed files with 138 additions and 27 deletions

View file

@ -156,6 +156,7 @@ gst_rtsp_media_init (GstRTSPMedia * media)
media->streams = g_ptr_array_new_with_free_func (g_object_unref);
g_mutex_init (&media->lock);
g_cond_init (&media->cond);
g_rec_mutex_init (&media->state_lock);
media->shared = DEFAULT_SHARED;
media->reusable = DEFAULT_REUSABLE;
@ -187,6 +188,7 @@ gst_rtsp_media_finalize (GObject * obj)
g_free (media->multicast_group);
g_mutex_clear (&media->lock);
g_cond_clear (&media->cond);
g_rec_mutex_clear (&media->state_lock);
G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
}
@ -267,6 +269,7 @@ do_loop (GstRTSPMediaClass * klass)
return NULL;
}
/* must be called with state lock */
static void
collect_media_stats (GstRTSPMedia * media)
{
@ -349,7 +352,9 @@ gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
{
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_mutex_lock (&media->lock);
media->shared = shared;
g_mutex_unlock (&media->lock);
}
/**
@ -363,9 +368,15 @@ gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
gboolean
gst_rtsp_media_is_shared (GstRTSPMedia * media)
{
gboolean res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
return media->shared;
g_mutex_lock (&media->lock);
res = media->shared;
g_mutex_unlock (&media->lock);
return res;
}
/**
@ -381,7 +392,9 @@ gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
{
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_mutex_lock (&media->lock);
media->reusable = reusable;
g_mutex_unlock (&media->lock);
}
/**
@ -395,9 +408,15 @@ gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
gboolean
gst_rtsp_media_is_reusable (GstRTSPMedia * media)
{
gboolean res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
return media->reusable;
g_mutex_lock (&media->lock);
res = media->reusable;
g_mutex_unlock (&media->lock);
return res;
}
/**
@ -412,7 +431,9 @@ gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
{
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_mutex_lock (&media->lock);
media->protocols = protocols;
g_mutex_unlock (&media->lock);
}
/**
@ -426,10 +447,16 @@ gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
GstRTSPLowerTrans
gst_rtsp_media_get_protocols (GstRTSPMedia * media)
{
GstRTSPLowerTrans res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
GST_RTSP_LOWER_TRANS_UNKNOWN);
return media->protocols;
g_mutex_lock (&media->lock);
res = media->protocols;
g_mutex_unlock (&media->lock);
return res;
}
/**
@ -445,7 +472,9 @@ gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
{
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_mutex_lock (&media->lock);
media->eos_shutdown = eos_shutdown;
g_mutex_unlock (&media->lock);
}
/**
@ -460,9 +489,15 @@ gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
gboolean
gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
{
gboolean res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
return media->eos_shutdown;
g_mutex_lock (&media->lock);
res = media->eos_shutdown;
g_mutex_unlock (&media->lock);
return res;
}
/**
@ -477,7 +512,9 @@ gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
{
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_mutex_lock (&media->lock);
media->buffer_size = size;
g_mutex_unlock (&media->lock);
}
/**
@ -491,9 +528,15 @@ gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
guint
gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
{
guint res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
return media->buffer_size;
g_mutex_unlock (&media->lock);
res = media->buffer_size;
g_mutex_unlock (&media->lock);
return res;
}
/**
@ -516,11 +559,11 @@ gst_rtsp_media_set_multicast_group (GstRTSPMedia * media, const gchar * mc)
/**
* gst_rtsp_media_get_multicast_group:
* @media: a #GstRTSPMedia
* @media: (transfer full): a #GstRTSPMedia
*
* Get the multicast group that media from @media will be streamed to.
*
* Returns: the multicast group
* Returns: the multicast group, g_free after usage.
*/
gchar *
gst_rtsp_media_get_multicast_group (GstRTSPMedia * media)
@ -550,15 +593,15 @@ gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
old = media->auth;
g_mutex_lock (&media->lock);
if ((old = media->auth) != auth)
media->auth = auth ? g_object_ref (auth) : NULL;
else
old = NULL;
g_mutex_unlock (&media->lock);
if (old != auth) {
if (auth)
g_object_ref (auth);
media->auth = auth;
if (old)
g_object_unref (old);
}
if (old)
g_object_unref (old);
}
/**
@ -577,8 +620,10 @@ gst_rtsp_media_get_auth (GstRTSPMedia * media)
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
g_mutex_lock (&media->lock);
if ((result = media->auth))
g_object_ref (result);
g_mutex_unlock (&media->lock);
return result;
}
@ -598,6 +643,7 @@ gst_rtsp_media_set_mtu (GstRTSPMedia * media, guint mtu)
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_mutex_lock (&media->lock);
media->mtu = mtu;
for (i = 0; i < media->streams->len; i++) {
GstRTSPStream *stream;
@ -607,6 +653,7 @@ gst_rtsp_media_set_mtu (GstRTSPMedia * media, guint mtu)
stream = g_ptr_array_index (media->streams, i);
gst_rtsp_stream_set_mtu (stream, mtu);
}
g_mutex_unlock (&media->lock);
}
/**
@ -618,9 +665,15 @@ gst_rtsp_media_set_mtu (GstRTSPMedia * media, guint mtu)
guint
gst_rtsp_media_get_mtu (GstRTSPMedia * media)
{
guint res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
return media->mtu;
g_mutex_lock (&media->lock);
res = media->mtu;
g_mutex_unlock (&media->lock);
return res;
}
/**
@ -673,7 +726,9 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
GST_INFO ("found dynamic element %d, %p", i, elem);
g_mutex_lock (&media->lock);
media->dynamic = g_list_prepend (media->dynamic, elem);
g_mutex_unlock (&media->lock);
have_elem = TRUE;
}
@ -707,6 +762,7 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
g_return_val_if_fail (GST_PAD_IS_SRC (pad), NULL);
g_mutex_lock (&media->lock);
idx = media->streams->len;
name = g_strdup_printf ("src_%u", idx);
@ -720,6 +776,7 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
gst_rtsp_stream_set_mtu (stream, media->mtu);
g_ptr_array_add (media->streams, stream);
g_mutex_unlock (&media->lock);
return stream;
}
@ -735,9 +792,15 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
guint
gst_rtsp_media_n_streams (GstRTSPMedia * media)
{
guint res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
return media->streams->len;
g_mutex_lock (&media->lock);
res = media->streams->len;
g_mutex_unlock (&media->lock);
return res;
}
/**
@ -757,10 +820,12 @@ gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
g_mutex_lock (&media->lock);
if (idx < media->streams->len)
res = g_ptr_array_index (media->streams, idx);
else
res = NULL;
g_mutex_unlock (&media->lock);
return res;
}
@ -780,6 +845,7 @@ gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
gchar *result;
GstRTSPTimeRange range;
g_mutex_lock (&media->lock);
/* make copy */
range = media->range;
@ -787,6 +853,7 @@ gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
range.min.type = GST_RTSP_TIME_NOW;
range.min.seconds = -1;
}
g_mutex_unlock (&media->lock);
result = gst_rtsp_range_to_string (&range);
@ -813,10 +880,9 @@ gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
g_return_val_if_fail (range != NULL, FALSE);
if (!media->seekable) {
GST_INFO ("pipeline is not seekable");
return TRUE;
}
g_rec_mutex_lock (&media->state_lock);
if (!media->seekable)
goto not_seekable;
if (range->unit != GST_RTSP_RANGE_NPT)
goto not_supported;
@ -880,17 +946,26 @@ gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
GST_INFO ("no seek needed");
res = TRUE;
}
g_rec_mutex_unlock (&media->state_lock);
return res;
/* ERRORS */
not_seekable:
{
g_rec_mutex_unlock (&media->state_lock);
GST_INFO ("pipeline is not seekable");
return TRUE;
}
not_supported:
{
g_rec_mutex_unlock (&media->state_lock);
GST_WARNING ("seek unit %d not supported", range->unit);
return FALSE;
}
weird_type:
{
g_rec_mutex_unlock (&media->state_lock);
GST_WARNING ("weird range type %d not supported", range->min.type);
return FALSE;
}
@ -952,6 +1027,7 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
if (media->is_live)
break;
g_rec_mutex_lock (&media->state_lock);
if (percent == 100) {
/* a 100% message means buffering is done */
media->buffering = FALSE;
@ -974,6 +1050,7 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
}
}
media->buffering = TRUE;
g_rec_mutex_unlock (&media->state_lock);
}
break;
}
@ -1011,6 +1088,7 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
case GST_MESSAGE_STREAM_STATUS:
break;
case GST_MESSAGE_ASYNC_DONE:
g_rec_mutex_lock (&media->state_lock);
if (!media->adding) {
/* when we are dynamically adding pads, the addition of the udpsrc will
* temporarily produce ASYNC_DONE messages. We have to ignore them and
@ -1022,14 +1100,18 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
} else {
GST_INFO ("%p: ignoring ASYNC_DONE", media);
}
g_rec_mutex_unlock (&media->state_lock);
break;
case GST_MESSAGE_EOS:
GST_INFO ("%p: got EOS", media);
g_rec_mutex_lock (&media->state_lock);
if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARING) {
GST_DEBUG ("shutting down after EOS");
finish_unprepare (media);
g_object_unref (media);
}
g_rec_mutex_unlock (&media->state_lock);
break;
default:
GST_INFO ("%p: got message type %s", media,
@ -1067,6 +1149,7 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad),
stream->idx);
g_rec_mutex_lock (&media->state_lock);
/* we will be adding elements below that will cause ASYNC_DONE to be
* posted in the bus. We want to ignore those messages until the
* pipeline really prerolled. */
@ -1078,18 +1161,24 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
media->rtpbin, GST_STATE_PAUSED);
media->adding = FALSE;
g_rec_mutex_unlock (&media->state_lock);
}
static void
no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
{
GstElement *fakesink;
g_mutex_lock (&media->lock);
GST_INFO ("no more pads");
if (media->fakesink) {
gst_object_ref (media->fakesink);
gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
gst_element_set_state (media->fakesink, GST_STATE_NULL);
gst_object_unref (media->fakesink);
if ((fakesink = media->fakesink)) {
gst_object_ref (fakesink);
media->fakesink = NULL;
g_mutex_unlock (&media->lock);
gst_bin_remove (GST_BIN (media->pipeline), fakesink);
gst_element_set_state (fakesink, GST_STATE_NULL);
gst_object_unref (fakesink);
GST_INFO ("removed fakesink");
}
}
@ -1116,6 +1205,7 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
GstBus *bus;
GList *walk;
g_rec_mutex_lock (&media->state_lock);
if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
goto was_prepared;
@ -1202,6 +1292,7 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
case GST_STATE_CHANGE_FAILURE:
goto state_failed;
}
g_rec_mutex_unlock (&media->state_lock);
/* now wait for all pads to be prerolled, FIXME, we should somehow be
* able to do this async so that we don't block the server thread. */
@ -1218,16 +1309,19 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
/* OK */
was_prepared:
{
g_rec_mutex_unlock (&media->state_lock);
return TRUE;
}
/* ERRORS */
is_reused:
{
g_rec_mutex_unlock (&media->state_lock);
GST_WARNING ("can not reuse media %p", media);
return FALSE;
}
no_rtpbin:
{
g_rec_mutex_unlock (&media->state_lock);
GST_WARNING ("no rtpbin element");
g_warning ("failed to create element 'rtpbin', check your installation");
return FALSE;
@ -1236,10 +1330,12 @@ state_failed:
{
GST_WARNING ("failed to preroll pipeline");
gst_rtsp_media_unprepare (media);
g_rec_mutex_unlock (&media->state_lock);
return FALSE;
}
}
/* must be called with state-lock */
static void
finish_unprepare (GstRTSPMedia * media)
{
@ -1275,6 +1371,7 @@ finish_unprepare (GstRTSPMedia * media)
g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
}
/* called with state-lock */
static gboolean
default_unprepare (GstRTSPMedia * media)
{
@ -1308,8 +1405,9 @@ gst_rtsp_media_unprepare (GstRTSPMedia * media)
{
gboolean success;
g_rec_mutex_lock (&media->state_lock);
if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
return TRUE;
goto was_unprepared;
GST_INFO ("unprepare media %p", media);
media->target_state = GST_STATE_NULL;
@ -1324,8 +1422,16 @@ gst_rtsp_media_unprepare (GstRTSPMedia * media)
} else {
finish_unprepare (media);
}
g_rec_mutex_unlock (&media->state_lock);
return success;
was_unprepared:
{
g_rec_mutex_unlock (&media->state_lock);
GST_INFO ("media %p was already unprepared", media);
return TRUE;
}
}
/**
@ -1349,6 +1455,8 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
g_return_val_if_fail (transports != NULL, FALSE);
g_rec_mutex_lock (&media->state_lock);
/* NULL and READY are the same */
if (state == GST_STATE_READY)
state = GST_STATE_NULL;
@ -1427,5 +1535,7 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
old_active != media->n_active))
collect_media_stats (media);
g_rec_mutex_unlock (&media->state_lock);
return TRUE;
}

View file

@ -116,6 +116,7 @@ struct _GstRTSPMedia {
guint mtu;
GstElement *element;
GRecMutex state_lock;
GPtrArray *streams;
GList *dynamic;
GstRTSPMediaStatus status;