typefindelement: Protect internal fields from concurrent changes from different threads

Fixes bug #608877.
This commit is contained in:
Sebastian Dröge 2010-02-04 21:11:25 +01:00
parent 33a38153f4
commit 88736cd675

View file

@ -161,14 +161,22 @@ static void
gst_type_find_element_have_type (GstTypeFindElement * typefind,
guint probability, const GstCaps * caps)
{
GstCaps *copy;
g_assert (caps != NULL);
GST_INFO_OBJECT (typefind, "found caps %" GST_PTR_FORMAT ", probability=%u",
caps, probability);
GST_OBJECT_LOCK (typefind);
if (typefind->caps)
gst_caps_unref (typefind->caps);
typefind->caps = gst_caps_copy (caps);
gst_pad_set_caps (typefind->src, (GstCaps *) caps);
copy = gst_caps_ref (typefind->caps);
GST_OBJECT_UNLOCK (typefind);
gst_pad_set_caps (typefind->src, copy);
gst_caps_unref (copy);
}
static void
@ -290,6 +298,7 @@ gst_type_find_element_dispose (GObject * object)
gst_buffer_unref (typefind->store);
typefind->store = NULL;
}
if (typefind->force_caps) {
gst_caps_unref (typefind->force_caps);
typefind->force_caps = NULL;
@ -336,7 +345,9 @@ gst_type_find_element_get_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_CAPS:
GST_OBJECT_LOCK (typefind);
g_value_set_boxed (value, typefind->caps);
GST_OBJECT_UNLOCK (typefind);
break;
case PROP_MINIMUM:
g_value_set_uint (value, typefind->min_probability);
@ -378,8 +389,11 @@ gst_type_find_handle_src_query (GstPad * pad, GstQuery * query)
gint64 peer_pos;
GstFormat format;
if (typefind->store == NULL)
GST_OBJECT_LOCK (typefind);
if (typefind->store == NULL) {
GST_OBJECT_UNLOCK (typefind);
goto out;
}
gst_query_parse_position (query, &format, &peer_pos);
@ -392,6 +406,7 @@ gst_type_find_handle_src_query (GstPad * pad, GstQuery * query)
/* FIXME */
break;
}
GST_OBJECT_UNLOCK (typefind);
gst_query_set_position (query, format, peer_pos);
break;
}
@ -438,9 +453,12 @@ start_typefinding (GstTypeFindElement * typefind)
{
GST_DEBUG_OBJECT (typefind, "starting typefinding");
gst_pad_set_caps (typefind->src, NULL);
if (typefind->caps) {
GST_OBJECT_LOCK (typefind);
if (typefind->caps)
gst_caps_replace (&typefind->caps, NULL);
}
GST_OBJECT_UNLOCK (typefind);
typefind->mode = MODE_TYPEFIND;
}
@ -457,14 +475,20 @@ stop_typefinding (GstTypeFindElement * typefind)
GST_DEBUG_OBJECT (typefind, "stopping typefinding%s",
push_cached_buffers ? " and pushing cached buffers" : "");
GST_OBJECT_LOCK (typefind);
if (typefind->store) {
GstBuffer *store = typefind->store;
typefind->store = NULL;
gst_buffer_set_caps (store, typefind->caps);
GST_OBJECT_UNLOCK (typefind);
if (!push_cached_buffers) {
gst_buffer_unref (typefind->store);
gst_buffer_unref (store);
} else {
GstPad *peer = gst_pad_get_peer (typefind->src);
typefind->mode = MODE_NORMAL;
gst_buffer_set_caps (typefind->store, typefind->caps);
/* make sure the user gets a meaningful error message in this case,
* which is not a core bug or bug of any kind (as the default error
@ -480,15 +504,17 @@ stop_typefinding (GstTypeFindElement * typefind)
"element does not support pull mode",
GST_DEBUG_PAD_NAME (peer)));
typefind->mode = MODE_ERROR; /* make the chain function error out */
gst_buffer_unref (store);
} else {
gst_type_find_element_send_cached_events (typefind);
gst_pad_push (typefind->src, typefind->store);
gst_pad_push (typefind->src, store);
}
if (peer)
gst_object_unref (peer);
}
typefind->store = NULL;
} else {
GST_OBJECT_UNLOCK (typefind);
}
}
@ -512,9 +538,11 @@ gst_type_find_element_handle_event (GstPad * pad, GstEvent * event)
/* we might not have started typefinding yet because there was not
* enough data so far; just give it a shot now and see what we get */
GST_OBJECT_LOCK (typefind);
if (typefind->store) {
caps = gst_type_find_helper_for_buffer (GST_OBJECT (typefind),
typefind->store, &prob);
GST_OBJECT_UNLOCK (typefind);
if (caps && prob >= typefind->min_probability) {
g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
@ -525,6 +553,7 @@ gst_type_find_element_handle_event (GstPad * pad, GstEvent * event)
}
gst_caps_replace (&caps, NULL);
} else {
GST_OBJECT_UNLOCK (typefind);
/* keep message in sync with the one in the pad activate function */
GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND,
(_("Stream contains no data.")),
@ -536,11 +565,13 @@ gst_type_find_element_handle_event (GstPad * pad, GstEvent * event)
break;
}
case GST_EVENT_FLUSH_STOP:
GST_OBJECT_LOCK (typefind);
g_list_foreach (typefind->cached_events,
(GFunc) gst_mini_object_unref, NULL);
g_list_free (typefind->cached_events);
typefind->cached_events = NULL;
gst_buffer_replace (&typefind->store, NULL);
GST_OBJECT_UNLOCK (typefind);
/* fall through */
case GST_EVENT_FLUSH_START:
res = gst_pad_push_event (typefind->src, event);
@ -548,8 +579,10 @@ gst_type_find_element_handle_event (GstPad * pad, GstEvent * event)
default:
GST_DEBUG_OBJECT (typefind, "Saving %s event to send later",
GST_EVENT_TYPE_NAME (event));
GST_OBJECT_LOCK (typefind);
typefind->cached_events =
g_list_append (typefind->cached_events, event);
GST_OBJECT_UNLOCK (typefind);
res = TRUE;
break;
}
@ -568,17 +601,21 @@ gst_type_find_element_handle_event (GstPad * pad, GstEvent * event)
static void
gst_type_find_element_send_cached_events (GstTypeFindElement * typefind)
{
GList *l;
GList *l, *cached_events;
for (l = typefind->cached_events; l != NULL; l = l->next) {
GST_OBJECT_LOCK (typefind);
cached_events = typefind->cached_events;
typefind->cached_events = NULL;
GST_OBJECT_UNLOCK (typefind);
for (l = cached_events; l != NULL; l = l->next) {
GstEvent *event = GST_EVENT (l->data);
GST_DEBUG_OBJECT (typefind, "sending cached %s event",
GST_EVENT_TYPE_NAME (event));
gst_pad_push_event (typefind->src, event);
}
g_list_free (typefind->cached_events);
typefind->cached_events = NULL;
g_list_free (cached_events);
}
static gboolean
@ -602,12 +639,19 @@ gst_type_find_element_setcaps (GstPad * pad, GstCaps * caps)
typefind->mode = MODE_NORMAL;
gst_type_find_element_send_cached_events (typefind);
GST_OBJECT_LOCK (typefind);
if (typefind->store) {
GST_DEBUG_OBJECT (typefind, "Pushing store: %d",
GST_BUFFER_SIZE (typefind->store));
gst_buffer_set_caps (typefind->store, typefind->caps);
gst_pad_push (typefind->src, typefind->store);
GstBuffer *store = typefind->store;
typefind->store = NULL;
GST_DEBUG_OBJECT (typefind, "Pushing store: %d", GST_BUFFER_SIZE (store));
gst_buffer_set_caps (store, typefind->caps);
GST_OBJECT_UNLOCK (typefind);
gst_pad_push (typefind->src, store);
} else {
GST_OBJECT_UNLOCK (typefind);
}
}
@ -709,13 +753,17 @@ gst_type_find_element_chain (GstPad * pad, GstBuffer * buffer)
/* we should already have called GST_ELEMENT_ERROR */
return GST_FLOW_ERROR;
case MODE_NORMAL:
GST_OBJECT_LOCK (typefind);
gst_buffer_set_caps (buffer, typefind->caps);
GST_OBJECT_UNLOCK (typefind);
return gst_pad_push (typefind->src, buffer);
case MODE_TYPEFIND:{
GST_OBJECT_LOCK (typefind);
if (typefind->store)
typefind->store = gst_buffer_join (typefind->store, buffer);
else
typefind->store = buffer;
GST_OBJECT_UNLOCK (typefind);
res = gst_type_find_element_chain_do_typefinding (typefind);
@ -738,20 +786,23 @@ gst_type_find_element_chain_do_typefinding (GstTypeFindElement * typefind)
GstTypeFindProbability probability;
GstCaps *caps;
GST_OBJECT_LOCK (typefind);
if (GST_BUFFER_SIZE (typefind->store) < TYPE_FIND_MIN_SIZE) {
GST_DEBUG_OBJECT (typefind, "not enough data for typefinding yet "
"(%u bytes)", GST_BUFFER_SIZE (typefind->store));
GST_OBJECT_UNLOCK (typefind);
return GST_FLOW_OK;
}
caps = gst_type_find_helper_for_buffer (GST_OBJECT (typefind),
typefind->store, &probability);
if (caps == NULL && GST_BUFFER_SIZE (typefind->store) > TYPE_FIND_MAX_SIZE) {
GST_OBJECT_UNLOCK (typefind);
GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
stop_typefinding (typefind);
return GST_FLOW_ERROR;
} else if (caps == NULL) {
GST_OBJECT_UNLOCK (typefind);
GST_DEBUG_OBJECT (typefind, "no caps found with %u bytes of data, "
"waiting for more data", GST_BUFFER_SIZE (typefind->store));
return GST_FLOW_OK;
@ -766,14 +817,17 @@ gst_type_find_element_chain_do_typefinding (GstTypeFindElement * typefind)
gst_caps_replace (&caps, NULL);
if (GST_BUFFER_SIZE (typefind->store) >= TYPE_FIND_MAX_SIZE) {
GST_OBJECT_UNLOCK (typefind);
GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
stop_typefinding (typefind);
return GST_FLOW_ERROR;
}
GST_OBJECT_UNLOCK (typefind);
GST_DEBUG_OBJECT (typefind, "waiting for more data to try again");
return GST_FLOW_OK;
}
GST_OBJECT_UNLOCK (typefind);
/* probability is good enough too, so let's make it known ... */
g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE], 0,
@ -806,8 +860,11 @@ gst_type_find_element_getrange (GstPad * srcpad,
ret = gst_pad_pull_range (typefind->sink, offset, length, buffer);
if (ret == GST_FLOW_OK && buffer && *buffer)
if (ret == GST_FLOW_OK && buffer && *buffer) {
GST_OBJECT_LOCK (typefind);
gst_buffer_set_caps (*buffer, typefind->caps);
GST_OBJECT_UNLOCK (typefind);
}
return ret;
}
@ -832,11 +889,14 @@ gst_type_find_element_activate (GstPad * pad)
typefind = GST_TYPE_FIND_ELEMENT (GST_OBJECT_PARENT (pad));
/* if we have force caps, use those */
GST_OBJECT_LOCK (typefind);
if (typefind->force_caps) {
found_caps = gst_caps_ref (typefind->force_caps);
probability = GST_TYPE_FIND_MAXIMUM;
GST_OBJECT_UNLOCK (typefind);
goto done;
}
GST_OBJECT_UNLOCK (typefind);
/* 1. try to activate in pull mode. if not, switch to push and succeed.
2. try to pull type find.
@ -943,12 +1003,14 @@ gst_type_find_element_change_state (GstElement * element,
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
case GST_STATE_CHANGE_READY_TO_NULL:
GST_OBJECT_LOCK (typefind);
gst_caps_replace (&typefind->caps, NULL);
g_list_foreach (typefind->cached_events,
(GFunc) gst_mini_object_unref, NULL);
g_list_free (typefind->cached_events);
typefind->cached_events = NULL;
GST_OBJECT_UNLOCK (typefind);
break;
default:
break;