gst/rtpmanager/: Protect lists and structures with locks.

Original commit message from CVS:
* gst/rtpmanager/gstrtpbin.c: (create_session), (get_pt_map),
(gst_rtp_bin_init), (gst_rtp_bin_finalize), (new_ssrc_pad_found),
(create_recv_rtp), (gst_rtp_bin_request_new_pad):
* gst/rtpmanager/gstrtpbin.h:
* gst/rtpmanager/gstrtpclient.c:
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init),
(gst_rtp_session_init), (gst_rtp_session_finalize),
(gst_rtp_session_event_recv_rtp_sink),
(gst_rtp_session_event_recv_rtcp_sink),
(gst_rtp_session_chain_recv_rtcp),
(gst_rtp_session_request_new_pad):
Protect lists and structures with locks.
Return FLOW_OK from RTCP messages for now.
This commit is contained in:
Wim Taymans 2007-04-13 09:20:55 +00:00 committed by Tim-Philipp Müller
parent 8bbea77a41
commit 490113d40d
4 changed files with 71 additions and 12 deletions

View file

@ -101,9 +101,12 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
#define GST_RTP_BIN_GET_PRIVATE(obj) \ #define GST_RTP_BIN_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRTPBinPrivate)) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRTPBinPrivate))
#define GST_RTP_BIN_LOCK(bin) g_mutex_lock ((bin)->priv->bin_lock)
#define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock ((bin)->priv->bin_lock)
struct _GstRTPBinPrivate struct _GstRTPBinPrivate
{ {
guint foo; GMutex *bin_lock;
}; };
/* signals and args */ /* signals and args */
@ -151,6 +154,9 @@ struct _GstRTPBinStream
gulong demux_ptreq_sig; gulong demux_ptreq_sig;
}; };
#define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->lock)
#define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->lock)
/* Manages the receiving end of the packets. /* Manages the receiving end of the packets.
* *
* There is one such structure for each RTP session (audio/video/...). * There is one such structure for each RTP session (audio/video/...).
@ -171,6 +177,8 @@ struct _GstRTPBinSession
GstElement *demux; GstElement *demux;
gulong demux_newpad_sig; gulong demux_newpad_sig;
GMutex *lock;
/* list of GstRTPBinStream */ /* list of GstRTPBinStream */
GSList *streams; GSList *streams;
@ -187,7 +195,7 @@ struct _GstRTPBinSession
GstPad *rtcp_src; GstPad *rtcp_src;
}; };
/* find a session with the given id */ /* find a session with the given id. Must be called with RTP_BIN_LOCK */
static GstRTPBinSession * static GstRTPBinSession *
find_session_by_id (GstRTPBin * rtpbin, gint id) find_session_by_id (GstRTPBin * rtpbin, gint id)
{ {
@ -202,7 +210,7 @@ find_session_by_id (GstRTPBin * rtpbin, gint id)
return NULL; return NULL;
} }
/* create a session with the given id */ /* create a session with the given id. Must be called with RTP_BIN_LOCK */
static GstRTPBinSession * static GstRTPBinSession *
create_session (GstRTPBin * rtpbin, gint id) create_session (GstRTPBin * rtpbin, gint id)
{ {
@ -216,6 +224,7 @@ create_session (GstRTPBin * rtpbin, gint id)
goto no_demux; goto no_demux;
sess = g_new0 (GstRTPBinSession, 1); sess = g_new0 (GstRTPBinSession, 1);
sess->lock = g_mutex_new ();
sess->id = id; sess->id = id;
sess->bin = rtpbin; sess->bin = rtpbin;
sess->session = elem; sess->session = elem;
@ -271,11 +280,12 @@ get_pt_map (GstRTPBinSession * session, guint pt)
GST_DEBUG ("searching pt %d in cache", pt); GST_DEBUG ("searching pt %d in cache", pt);
GST_RTP_SESSION_LOCK (session);
/* first look in the cache */ /* first look in the cache */
caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt)); caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
if (caps) { if (caps)
goto done; goto done;
}
bin = session->bin; bin = session->bin;
@ -304,16 +314,21 @@ get_pt_map (GstRTPBinSession * session, guint pt)
g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt), caps); g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt), caps);
done: done:
GST_RTP_SESSION_UNLOCK (session);
return caps; return caps;
/* ERRORS */ /* ERRORS */
no_caps: no_caps:
{ {
GST_RTP_SESSION_UNLOCK (session);
GST_DEBUG ("no pt map could be obtained"); GST_DEBUG ("no pt map could be obtained");
return NULL; return NULL;
} }
} }
/* create a new stream with @ssrc in @session. Must be called with
* RTP_SESSION_LOCK. */
static GstRTPBinStream * static GstRTPBinStream *
create_stream (GstRTPBinSession * session, guint32 ssrc) create_stream (GstRTPBinSession * session, guint32 ssrc)
{ {
@ -457,6 +472,7 @@ static void
gst_rtp_bin_init (GstRTPBin * rtpbin, GstRTPBinClass * klass) gst_rtp_bin_init (GstRTPBin * rtpbin, GstRTPBinClass * klass)
{ {
rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin); rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
rtpbin->priv->bin_lock = g_mutex_new ();
rtpbin->provided_clock = gst_system_clock_obtain (); rtpbin->provided_clock = gst_system_clock_obtain ();
} }
@ -467,6 +483,8 @@ gst_rtp_bin_finalize (GObject * object)
rtpbin = GST_RTP_BIN (object); rtpbin = GST_RTP_BIN (object);
g_mutex_free (rtpbin->priv->bin_lock);
G_OBJECT_CLASS (parent_class)->finalize (object); G_OBJECT_CLASS (parent_class)->finalize (object);
} }
@ -608,6 +626,8 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc); GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc);
GST_RTP_SESSION_LOCK (session);
/* create new stream */ /* create new stream */
stream = create_stream (session, ssrc); stream = create_stream (session, ssrc);
if (!stream) if (!stream)
@ -629,17 +649,21 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
stream->demux_ptreq_sig = g_signal_connect (stream->demux, stream->demux_ptreq_sig = g_signal_connect (stream->demux,
"request-pt-map", (GCallback) pt_map_requested, stream); "request-pt-map", (GCallback) pt_map_requested, stream);
GST_RTP_SESSION_UNLOCK (session);
return; return;
/* ERRORS */ /* ERRORS */
no_stream: no_stream:
{ {
GST_RTP_SESSION_UNLOCK (session);
GST_DEBUG ("could not create stream"); GST_DEBUG ("could not create stream");
return; return;
} }
} }
/* Create a pad for receiving RTP for the session in @name /* Create a pad for receiving RTP for the session in @name. Must be called with
* RTP_BIN_LOCK.
*/ */
static GstPad * static GstPad *
create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
@ -664,6 +688,7 @@ create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session == NULL) if (session == NULL)
goto create_error; goto create_error;
} }
/* check if pad was requested */ /* check if pad was requested */
if (session->recv_rtp_sink != NULL) if (session->recv_rtp_sink != NULL)
goto existed; goto existed;
@ -729,7 +754,8 @@ link_failed:
} }
} }
/* Create a pad for receiving RTCP for the session in @name /* Create a pad for receiving RTCP for the session in @name. Must be called with
* RTP_BIN_LOCK.
*/ */
static GstPad * static GstPad *
create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ,
@ -821,7 +847,8 @@ link_failed:
#endif #endif
} }
/* Create a pad for sending RTP for the session in @name /* Create a pad for sending RTP for the session in @name. Must be called with
* RTP_BIN_LOCK.
*/ */
static GstPad * static GstPad *
create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
@ -905,7 +932,8 @@ no_srcpad:
} }
} }
/* Create a pad for sending RTCP for the session in @name /* Create a pad for sending RTCP for the session in @name. Must be called with
* RTP_BIN_LOCK.
*/ */
static GstPad * static GstPad *
create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
@ -978,6 +1006,8 @@ gst_rtp_bin_request_new_pad (GstElement * element,
rtpbin = GST_RTP_BIN (element); rtpbin = GST_RTP_BIN (element);
klass = GST_ELEMENT_GET_CLASS (element); klass = GST_ELEMENT_GET_CLASS (element);
GST_RTP_BIN_LOCK (rtpbin);
/* figure out the template */ /* figure out the template */
if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%d")) { if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%d")) {
result = create_recv_rtp (rtpbin, templ, name); result = create_recv_rtp (rtpbin, templ, name);
@ -992,11 +1022,14 @@ gst_rtp_bin_request_new_pad (GstElement * element,
} else } else
goto wrong_template; goto wrong_template;
GST_RTP_BIN_UNLOCK (rtpbin);
return result; return result;
/* ERRORS */ /* ERRORS */
wrong_template: wrong_template:
{ {
GST_RTP_BIN_UNLOCK (rtpbin);
g_warning ("rtpbin: this is not our template"); g_warning ("rtpbin: this is not our template");
return NULL; return NULL;
} }

View file

@ -42,6 +42,7 @@ struct _GstRTPBin {
/* a list of session */ /* a list of session */
GSList *sessions; GSList *sessions;
/* clock we provide */
GstClock *provided_clock; GstClock *provided_clock;
/*< private >*/ /*< private >*/

View file

@ -83,6 +83,7 @@ GST_STATIC_PAD_TEMPLATE ("rtp_src_%d_%d",
struct _GstRTPClientPrivate struct _GstRTPClientPrivate
{ {
gint foo;
}; };
/* all the info needed to handle the stream with SSRC */ /* all the info needed to handle the stream with SSRC */

View file

@ -114,6 +114,17 @@ enum
PROP_0 PROP_0
}; };
#define GST_RTP_SESSION_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRTPSessionPrivate))
#define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->priv->lock)
#define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->priv->lock)
struct _GstRTPSessionPrivate
{
GMutex *lock;
};
/* GObject vmethods */ /* GObject vmethods */
static void gst_rtp_session_finalize (GObject * object); static void gst_rtp_session_finalize (GObject * object);
static void gst_rtp_session_set_property (GObject * object, guint prop_id, static void gst_rtp_session_set_property (GObject * object, guint prop_id,
@ -167,6 +178,8 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
gobject_class = (GObjectClass *) klass; gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass; gstelement_class = (GstElementClass *) klass;
g_type_class_add_private (klass, sizeof (GstRTPSessionPrivate));
gobject_class->finalize = gst_rtp_session_finalize; gobject_class->finalize = gst_rtp_session_finalize;
gobject_class->set_property = gst_rtp_session_set_property; gobject_class->set_property = gst_rtp_session_set_property;
gobject_class->get_property = gst_rtp_session_get_property; gobject_class->get_property = gst_rtp_session_get_property;
@ -185,6 +198,8 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
static void static void
gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass) gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
{ {
rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
rtpsession->priv->lock = g_mutex_new ();
} }
static void static void
@ -193,6 +208,7 @@ gst_rtp_session_finalize (GObject * object)
GstRTPSession *rtpsession; GstRTPSession *rtpsession;
rtpsession = GST_RTP_SESSION (object); rtpsession = GST_RTP_SESSION (object);
g_mutex_free (rtpsession->priv->lock);
G_OBJECT_CLASS (parent_class)->finalize (object); G_OBJECT_CLASS (parent_class)->finalize (object);
} }
@ -269,7 +285,8 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
GST_DEBUG_OBJECT (rtpsession, "received event"); GST_DEBUG_OBJECT (rtpsession, "received event %s",
GST_EVENT_TYPE_NAME (event));
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
default: default:
@ -310,7 +327,8 @@ gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
GST_DEBUG_OBJECT (rtpsession, "received event"); GST_DEBUG_OBJECT (rtpsession, "received event %s",
GST_EVENT_TYPE_NAME (event));
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
default: default:
@ -340,7 +358,7 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
gst_object_unref (rtpsession); gst_object_unref (rtpsession);
return ret; return GST_FLOW_OK;
} }
static GstFlowReturn static GstFlowReturn
@ -504,6 +522,8 @@ gst_rtp_session_request_new_pad (GstElement * element,
GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name)); GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
GST_RTP_SESSION_LOCK (rtpsession);
/* figure out the template */ /* figure out the template */
if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) { if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
if (rtpsession->recv_rtp_sink != NULL) if (rtpsession->recv_rtp_sink != NULL)
@ -530,16 +550,20 @@ gst_rtp_session_request_new_pad (GstElement * element,
} else } else
goto wrong_template; goto wrong_template;
GST_RTP_SESSION_UNLOCK (rtpsession);
return result; return result;
/* ERRORS */ /* ERRORS */
wrong_template: wrong_template:
{ {
GST_RTP_SESSION_UNLOCK (rtpsession);
g_warning ("rtpsession: this is not our template"); g_warning ("rtpsession: this is not our template");
return NULL; return NULL;
} }
exists: exists:
{ {
GST_RTP_SESSION_UNLOCK (rtpsession);
g_warning ("rtpsession: pad already requested"); g_warning ("rtpsession: pad already requested");
return NULL; return NULL;
} }