decodebin2: refactoring and race fixes

Refactor some code so that we can take the right locks and in the right order.
Fixes quite a bit of races already.
This commit is contained in:
Wim Taymans 2009-03-19 19:19:38 +01:00
parent 2f39597919
commit 17e7948325

View file

@ -284,7 +284,7 @@ static GstPad *gst_decode_group_control_demuxer_pad (GstDecodeGroup * group,
static gboolean gst_decode_group_control_source_pad (GstDecodeGroup * group,
GstDecodePad * pad);
static gboolean gst_decode_group_expose (GstDecodeGroup * group);
static void gst_decode_group_check_if_blocked (GstDecodeGroup * group);
static gboolean gst_decode_group_check_if_blocked (GstDecodeGroup * group);
static void gst_decode_group_set_complete (GstDecodeGroup * group);
static void gst_decode_group_hide (GstDecodeGroup * group);
static void gst_decode_group_free (GstDecodeGroup * group);
@ -1478,27 +1478,32 @@ pad_added_group_cb (GstElement * element, GstPad * pad, GstDecodeGroup * group)
{
GstCaps *caps;
gboolean expose = FALSE;
GstDecodeBin *dbin;
dbin = group->dbin;
GST_DEBUG_OBJECT (pad, "pad added, group:%p", group);
caps = gst_pad_get_caps (pad);
analyze_new_pad (group->dbin, element, pad, caps, group);
analyze_new_pad (dbin, element, pad, caps, group);
if (caps)
gst_caps_unref (caps);
GROUP_MUTEX_LOCK (group);
group->nbdynamic--;
if (group->nbdynamic > 0)
group->nbdynamic--;
GST_LOG ("Group %p has now %d dynamic objects", group, group->nbdynamic);
if (group->nbdynamic == 0)
expose = TRUE;
GROUP_MUTEX_UNLOCK (group);
if (expose) {
GST_LOG
("That was the last dynamic object, now attempting to expose the group");
DECODE_BIN_LOCK (group->dbin);
gst_decode_group_expose (group);
DECODE_BIN_UNLOCK (group->dbin);
GST_LOG_OBJECT (dbin,
"That was the last dynamic object, now attempting to expose the group");
DECODE_BIN_LOCK (dbin);
if (!gst_decode_group_expose (group))
GST_WARNING_OBJECT (dbin, "Couldn't expose group");
DECODE_BIN_UNLOCK (dbin);
}
}
@ -1517,7 +1522,7 @@ no_more_pads_group_cb (GstElement * element, GstDecodeGroup * group)
{
GST_LOG_OBJECT (element, "no more pads, setting group %p to complete", group);
/* FIXME : FILLME */
/* when we received no_more_pads, we can complete the pads of the group */
gst_decode_group_set_complete (group);
}
@ -1681,19 +1686,42 @@ are_raw_caps (GstDecodeBin * dbin, GstCaps * caps)
* GstDecodeGroup functions
****/
/* The overrun callback is used to expose groups that have not yet had their
* no_more_pads called while the (large) multiqueue overflowed. When this
* happens we must assume that the no_more_pads will not arrive anymore and we
* must expose the pads that we have.
*/
static void
multi_queue_overrun_cb (GstElement * queue, GstDecodeGroup * group)
{
GST_LOG_OBJECT (group->dbin, "multiqueue is full");
GstDecodeBin *dbin;
gboolean expose;
/* if we haven't exposed the group, do it */
DECODE_BIN_LOCK (group->dbin);
/* decrease the number of dynamic elements, we don't expect anything anymore
* and we need the groups to be 0 for the expose to work */
if (group->nbdynamic > 0)
group->nbdynamic--;
gst_decode_group_expose (group);
DECODE_BIN_UNLOCK (group->dbin);
dbin = group->dbin;
GST_LOG_OBJECT (dbin, "multiqueue %p is full", queue);
GROUP_MUTEX_LOCK (group);
if (group->complete) {
/* the group was already complete (had the no_more_pads called), we
* can ignore the overrun signal, the last remaining dynamic element
* will expose the group eventually. */
GST_LOG_OBJECT (dbin, "group %p was already complete", group);
expose = FALSE;
} else {
/* set number of dynamic element to 0, we don't expect anything anymore
* and we need the groups to be 0 for the expose to work */
group->nbdynamic = 0;
expose = TRUE;
}
GROUP_MUTEX_UNLOCK (group);
if (expose) {
DECODE_BIN_LOCK (dbin);
if (!gst_decode_group_expose (group))
GST_WARNING_OBJECT (dbin, "Couldn't expose group");
DECODE_BIN_UNLOCK (group->dbin);
}
}
/* gst_decode_group_new:
@ -1875,8 +1903,10 @@ gst_decode_group_control_source_pad (GstDecodeGroup * group,
* and will ghost/expose all pads on decodebin if the group is the current one.
*
* Call with the group lock taken ! MT safe
*
* Returns: TRUE when the group is completely blocked and ready to be exposed.
*/
static void
static gboolean
gst_decode_group_check_if_blocked (GstDecodeGroup * group)
{
GList *tmp;
@ -1885,10 +1915,10 @@ gst_decode_group_check_if_blocked (GstDecodeGroup * group)
GST_LOG ("group : %p , ->complete:%d , ->nbdynamic:%d",
group, group->complete, group->nbdynamic);
/* 1. don't do anything if group is not complete */
/* don't do anything if group is not complete */
if (!group->complete || group->nbdynamic) {
GST_DEBUG_OBJECT (group->dbin, "Group isn't complete yet");
return;
return FALSE;
}
for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) {
@ -1900,57 +1930,29 @@ gst_decode_group_check_if_blocked (GstDecodeGroup * group)
}
}
/* 2. Update status of group */
/* Update status of group */
group->blocked = blocked;
GST_LOG ("group is blocked:%d", blocked);
/* 3. don't do anything if not blocked completely */
if (!blocked)
return;
/* 4. if we're the current group, expose pads */
DECODE_BIN_LOCK (group->dbin);
if (!gst_decode_group_expose (group))
GST_WARNING_OBJECT (group->dbin, "Couldn't expose group");
DECODE_BIN_UNLOCK (group->dbin);
return blocked;
}
static void
gst_decode_group_check_if_drained (GstDecodeGroup * group)
/* activate the next group when there is one
*
* Returns: TRUE when group was the active group and there was a
* next group to activate.
*/
static gboolean
gst_decode_bin_activate_next_group (GstDecodeBin * dbin, GstDecodeGroup * group)
{
GList *tmp;
GstDecodeBin *dbin = group->dbin;
gboolean drained = TRUE;
GST_LOG ("group : %p", group);
gboolean have_next = FALSE;
DECODE_BIN_LOCK (dbin);
/* Ensure we only emit the drained signal once, for this group */
if (group->drained) {
drained = FALSE;
goto done;
}
for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) {
GstDecodePad *dpad = (GstDecodePad *) tmp->data;
GST_LOG ("testing dpad %p", dpad);
if (!dpad->drained) {
drained = FALSE;
break;
}
}
group->drained = drained;
if (!drained)
goto done;
/* we are drained. Check if there is a next group to activate */
/* Check if there is a next group to activate */
if ((group == dbin->activegroup) && dbin->groups) {
GstDecodeGroup *newgroup;
/* get the next group */
newgroup = (GstDecodeGroup *) dbin->groups->data;
GST_DEBUG_OBJECT (dbin, "Switching to new group");
@ -1959,18 +1961,69 @@ gst_decode_group_check_if_drained (GstDecodeGroup * group)
gst_decode_group_hide (group);
/* expose next group */
gst_decode_group_expose (newgroup);
/* we're not yet drained now */
drained = FALSE;
}
done:
/* we have a next group */
have_next = TRUE;
}
DECODE_BIN_UNLOCK (dbin);
return have_next;
}
/* check if the group is drained, meaning all pads have seen an EOS
* event. */
static void
gst_decode_pad_handle_eos (GstDecodePad * pad)
{
GList *tmp;
GstDecodeBin *dbin;
GstDecodeGroup *group;
gboolean drained = TRUE;
group = pad->group;
dbin = group->dbin;
GST_LOG_OBJECT (dbin, "group : %p, pad %p", group, pad);
GROUP_MUTEX_LOCK (group);
/* mark pad as drained */
pad->drained = TRUE;
/* Ensure we only emit the drained signal once, for this group */
if (group->drained)
goto was_drained;
for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) {
GstDecodePad *dpad = (GstDecodePad *) tmp->data;
GST_LOG_OBJECT (dbin, "testing dpad %p", dpad);
if (!dpad->drained) {
drained = FALSE;
break;
}
}
group->drained = drained;
GROUP_MUTEX_UNLOCK (group);
if (drained) {
/* no more groups to activate, we're completely drained now */
GST_LOG ("all groups drained, fire signal");
g_signal_emit (G_OBJECT (dbin), gst_decode_bin_signals[SIGNAL_DRAINED], 0,
NULL);
/* the current group is completely drained, try to activate the next
* group. this function returns FALSE if there was no next group activated
* and so we are really drained. */
if (!gst_decode_bin_activate_next_group (dbin, group)) {
/* no more groups to activate, we're completely drained now */
GST_LOG_OBJECT (dbin, "all groups drained, fire signal");
g_signal_emit (G_OBJECT (dbin), gst_decode_bin_signals[SIGNAL_DRAINED], 0,
NULL);
}
}
return;
was_drained:
{
GST_LOG_OBJECT (dbin, "group was already drained");
GROUP_MUTEX_UNLOCK (group);
return;
}
}
@ -2161,6 +2214,7 @@ name_problem:
}
}
/* must be called with the decodebin lock */
static void
gst_decode_group_hide (GstDecodeGroup * group)
{
@ -2174,7 +2228,6 @@ gst_decode_group_hide (GstDecodeGroup * group)
}
GROUP_MUTEX_LOCK (group);
/* Remove ghost pads */
for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) {
GstDecodePad *dpad = (GstDecodePad *) tmp->data;
@ -2183,9 +2236,7 @@ gst_decode_group_hide (GstDecodeGroup * group)
gst_element_remove_pad (GST_ELEMENT (group->dbin), GST_PAD (dpad));
dpad->added = FALSE;
}
group->exposed = FALSE;
GROUP_MUTEX_UNLOCK (group);
group->dbin->activegroup = NULL;
@ -2311,21 +2362,37 @@ gst_decode_group_free (GstDecodeGroup * group)
/* gst_decode_group_set_complete:
*
* Mark the group as complete. This means no more streams will be controlled
* through this group.
* through this group. This method is usually called when we got no_more_pads or
* when we added the last pad not from a demuxer.
*
* When this method is called, it is possible that some dynamic plugging is
* going on in streaming threads. We decrement the dynamic counter and when it
* reaches zero, we check if all of our pads are blocked before we finally
* expose the group.
*
* MT safe
*/
static void
gst_decode_group_set_complete (GstDecodeGroup * group)
{
gboolean expose = FALSE;
GST_LOG_OBJECT (group->dbin, "Setting group %p to COMPLETE", group);
GROUP_MUTEX_LOCK (group);
group->complete = TRUE;
if (group->nbdynamic > 0)
group->nbdynamic--;
gst_decode_group_check_if_blocked (group);
expose = gst_decode_group_check_if_blocked (group);
GROUP_MUTEX_UNLOCK (group);
/* don't do anything if not blocked completely */
if (expose) {
DECODE_BIN_LOCK (group->dbin);
if (!gst_decode_group_expose (group))
GST_WARNING_OBJECT (group->dbin, "Couldn't expose group");
DECODE_BIN_UNLOCK (group->dbin);
}
}
/*************************
@ -2350,15 +2417,27 @@ gst_decode_pad_init (GstDecodePad * pad)
static void
source_pad_blocked_cb (GstDecodePad * dpad, gboolean blocked, gpointer unused)
{
GST_LOG_OBJECT (dpad, "blocked:%d, dpad->group:%p", blocked, dpad->group);
GstDecodeGroup *group;
GstDecodeBin *dbin;
gboolean expose = FALSE;
group = dpad->group;
dbin = group->dbin;
GST_LOG_OBJECT (dpad, "blocked:%d, dpad->group:%p", blocked, group);
GROUP_MUTEX_LOCK (group);
/* Update this GstDecodePad status */
dpad->blocked = blocked;
if (blocked)
expose = gst_decode_group_check_if_blocked (group);
GROUP_MUTEX_UNLOCK (group);
if (blocked) {
GROUP_MUTEX_LOCK (dpad->group);
gst_decode_group_check_if_blocked (dpad->group);
GROUP_MUTEX_UNLOCK (dpad->group);
if (expose) {
DECODE_BIN_LOCK (dbin);
if (!gst_decode_group_expose (group))
GST_WARNING_OBJECT (dbin, "Couldn't expose group");
DECODE_BIN_UNLOCK (dbin);
}
}
@ -2368,15 +2447,12 @@ source_pad_event_probe (GstPad * pad, GstEvent * event, GstDecodePad * dpad)
GST_LOG_OBJECT (pad, "%s dpad:%p", GST_EVENT_TYPE_NAME (event), dpad);
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
/* Set our pad as drained */
dpad->drained = TRUE;
GST_DEBUG_OBJECT (pad, "we received EOS");
/* Check if all pads are drained. If there is a next group to expose, we
* will remove the ghostpad of the current group first, which unlinks the
* peer and so drops the EOS. */
gst_decode_group_check_if_drained (dpad->group);
gst_decode_pad_handle_eos (dpad);
}
/* never drop events */
return TRUE;