libs/gst/base/gstcollectpads.c: Refactoring of collectpads. This version removes a lot of races without touching API/...

Original commit message from CVS:
2005-12-14  Julien MOUTTE  <julien@moutte.net>

* libs/gst/base/gstcollectpads.c: (gst_collect_pads_base_init),
(gst_collect_pads_remove_pad), (gst_collect_pads_is_collected),
(gst_collect_pads_event), (gst_collect_pads_chain): Refactoring
of collectpads. This version removes a lot of races without
touching API/ABI. Yay !
This commit is contained in:
Julien Moutte 2005-12-14 17:08:36 +00:00
parent bcdfacaf5d
commit ed8fcd46b2
2 changed files with 68 additions and 28 deletions

View file

@ -1,3 +1,11 @@
2005-12-14 Julien MOUTTE <julien@moutte.net>
* libs/gst/base/gstcollectpads.c: (gst_collect_pads_base_init),
(gst_collect_pads_remove_pad), (gst_collect_pads_is_collected),
(gst_collect_pads_event), (gst_collect_pads_chain): Refactoring
of collectpads. This version removes a lot of races without
touching API/ABI. Yay !
2005-12-14 Jan Schmidt <thaytan@mad.scientist.com>
* gst/gstpad.c: (gst_pad_activate_pull), (gst_pad_link_prepare):

View file

@ -246,6 +246,8 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
pads->data = g_slist_delete_link (pads->data, list);
}
pads->numpads--;
/* FIXME : if the pad has data queued we should decrease the number of
queuedpads */
pads->cookie++;
GST_OBJECT_UNLOCK (pads);
@ -561,6 +563,46 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
return flushsize;
}
static gboolean
gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
{
GstFlowReturn flow_ret = GST_FLOW_OK;
gboolean res = FALSE;
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
/* If all our pads are EOS just collect once */
if (pads->eospads == pads->numpads) {
GST_DEBUG ("All active pads are EOS, calling %s",
GST_DEBUG_FUNCPTR_NAME (pads->func));
flow_ret = pads->func (pads, pads->user_data);
res = TRUE;
goto beach;
}
/* We call the collected function as long as our condition matches.
FIXME: should we error out if the collect function did not pop anything ?
we can get a busy loop here if the element does not pop from the collect
function */
while (((pads->queuedpads + pads->eospads) >= pads->numpads) && pads->func) {
GST_DEBUG ("All active pads have data, calling %s",
GST_DEBUG_FUNCPTR_NAME (pads->func));
flow_ret = pads->func (pads, pads->user_data);
res = TRUE;
}
beach:
if (!res) {
GST_DEBUG ("Not all active pads have data, continuing");
}
if (ret) {
*ret = flow_ret;
}
return res;
}
static gboolean
gst_collect_pads_event (GstPad * pad, GstEvent * event)
{
@ -580,23 +622,17 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
{
GstFlowReturn ret = GST_FLOW_OK;
GST_OBJECT_LOCK (pads);
pads->eospads++;
/* if all pads are EOS and we have a function, call it */
if ((pads->eospads == pads->numpads) && pads->func) {
ret = pads->func (pads, pads->user_data);
}
gst_collect_pads_is_collected (pads, NULL);
GST_OBJECT_UNLOCK (pads);
/* We eat this event */
gst_event_unref (event);
return TRUE;
break;
}
case GST_EVENT_NEWSEGMENT:
{
@ -627,7 +663,11 @@ not_ours:
}
}
/* For each buffer we receive we check if our collected condition is reached
and if so we call the collected function. When this is done we check if
data has been unqueued. If data is still queued we wait holding the stream
lock to make sure no EOS event can happen while we are ready to be
collected */
static GstFlowReturn
gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
{
@ -652,13 +692,20 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
if (!pads->started)
goto not_started;
/* Call the collected callback until a pad with a buffer is popped. */
while (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func)
ret = pads->func (pads, pads->user_data);
GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
GST_DEBUG_PAD_NAME (pad));
/* queue buffer on this pad, block if filled */
/* One more pad has data queued */
pads->queuedpads++;
gst_buffer_replace (&data->buffer, buffer);
/* Check if our collected condition is matched and call the collected function
if it is */
gst_collect_pads_is_collected (pads, &ret);
/* We still have data queued on this pad, wait for something to happen */
while (data->buffer != NULL) {
GST_DEBUG ("Pad %s:%s already has a buffer queued, waiting",
GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
GST_DEBUG_PAD_NAME (pad));
GST_COLLECT_PADS_WAIT (pads);
GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
@ -667,21 +714,6 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
goto not_started;
}
GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
GST_DEBUG_PAD_NAME (pad));
pads->queuedpads++;
gst_buffer_replace (&data->buffer, buffer);
/* if all pads have data and we have a function, call it */
if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) {
GST_DEBUG ("All active pads have data, calling %s",
GST_DEBUG_FUNCPTR_NAME (pads->func));
ret = pads->func (pads, pads->user_data);
} else {
GST_DEBUG ("Not all active pads have data, continuing");
ret = GST_FLOW_OK;
}
GST_OBJECT_UNLOCK (pads);
return ret;