gst/base/gstcollectpads.*: Handle EOS correctly.

Original commit message from CVS:
2005-10-11  Julien MOUTTE  <julien@moutte.net>

* gst/base/gstcollectpads.c: (gst_collectpads_init),
(gst_collectpads_add_pad), (gst_collectpads_pop),
(gst_collectpads_event), (gst_collectpads_chain):
* gst/base/gstcollectpads.h: Handle EOS correctly.
This commit is contained in:
Julien Moutte 2005-10-11 16:25:35 +00:00
parent 6e59ecdf7b
commit 5706cb69b4
5 changed files with 141 additions and 10 deletions

View file

@ -1,3 +1,10 @@
2005-10-11 Julien MOUTTE <julien@moutte.net>
* gst/base/gstcollectpads.c: (gst_collectpads_init),
(gst_collectpads_add_pad), (gst_collectpads_pop),
(gst_collectpads_event), (gst_collectpads_chain):
* gst/base/gstcollectpads.h: Handle EOS correctly.
2005-10-11 Thomas Vander Stichele <thomas at apestaart dot org>
* tools/gst-launch.c: (main):

View file

@ -52,6 +52,7 @@
#include "gstcollectpads.h"
static GstFlowReturn gst_collectpads_chain (GstPad * pad, GstBuffer * buffer);
static gboolean gst_collectpads_event (GstPad * pad, GstEvent * event);
static void gst_collectpads_class_init (GstCollectPadsClass * klass);
static void gst_collectpads_init (GstCollectPads * pads);
@ -106,6 +107,7 @@ gst_collectpads_init (GstCollectPads * pads)
pads->cookie = 0;
pads->numpads = 0;
pads->queuedpads = 0;
pads->eospads = 0;
pads->started = FALSE;
}
@ -198,6 +200,7 @@ gst_collectpads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
GST_LOCK (pads);
pads->data = g_slist_append (pads->data, data);
gst_pad_set_chain_function (pad, gst_collectpads_chain);
gst_pad_set_event_function (pad, gst_collectpads_event);
gst_pad_set_element_private (pad, data);
pads->numpads++;
pads->cookie++;
@ -390,9 +393,12 @@ gst_collectpads_pop (GstCollectPads * pads, GstCollectData * data)
GstBuffer *result;
result = data->buffer;
gst_buffer_replace (&data->buffer, NULL);
data->pos = 0;
pads->queuedpads--;
if (result) {
gst_buffer_replace (&data->buffer, NULL);
data->pos = 0;
pads->queuedpads--;
}
GST_COLLECTPADS_SIGNAL (pads);
@ -510,6 +516,56 @@ gst_collectpads_flush (GstCollectPads * pads, GstCollectData * data, guint size)
return flushsize;
}
static gboolean
gst_collectpads_event (GstPad * pad, GstEvent * event)
{
GstCollectData *data;
GstCollectPads *pads;
/* some magic to get the managing collectpads */
data = (GstCollectData *) gst_pad_get_element_private (pad);
if (data == NULL)
goto not_ours;
pads = data->collect;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
{
GstFlowReturn ret = GST_FLOW_OK;
GST_LOCK (pads);
pads->eospads++;
/* if all pads are EOS and we have a function, call it */
if ((pads->eospads == pads->numpads) && pads->func) {
ret = pads->func (pads, pads->user_data);
}
GST_UNLOCK (pads);
/* We eat this event */
gst_event_unref (event);
return TRUE;
break;
}
default:
goto beach;
}
beach:
return gst_pad_event_default (pad, event);
/* ERRORS */
not_ours:
{
GST_DEBUG ("collectpads not ours");
return FALSE;
}
}
static GstFlowReturn
gst_collectpads_chain (GstPad * pad, GstBuffer * buffer)
{
@ -534,6 +590,12 @@ gst_collectpads_chain (GstPad * pad, GstBuffer * buffer)
if (!pads->started)
goto not_started;
/* Call the collected callback until a pad with a buffer is popped. */
while (((pads->queuedpads + pads->eospads) == pads->numpads) &&
pads->func != NULL) {
ret = pads->func (pads, pads->user_data);
}
/* queue buffer on this pad, block if filled */
while (data->buffer != NULL) {
GST_COLLECTPADS_WAIT (pads);
@ -545,7 +607,7 @@ gst_collectpads_chain (GstPad * pad, GstBuffer * buffer)
gst_buffer_replace (&data->buffer, buffer);
/* if all pads have data and we have a function, call it */
if ((pads->queuedpads == pads->numpads) && pads->func) {
if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) {
ret = pads->func (pads, pads->user_data);
} else {
ret = GST_FLOW_OK;

View file

@ -42,7 +42,6 @@ typedef struct _GstCollectData
GstPad *pad;
GstBuffer *buffer;
guint pos;
gboolean eos;
} GstCollectData;
/* function will be called when all pads have data */
@ -68,6 +67,7 @@ struct _GstCollectPads {
guint numpads; /* number of pads */
guint queuedpads; /* number of pads with a buffer */
guint eospads; /* number of pads that are EOS */
gboolean started;

View file

@ -52,6 +52,7 @@
#include "gstcollectpads.h"
static GstFlowReturn gst_collectpads_chain (GstPad * pad, GstBuffer * buffer);
static gboolean gst_collectpads_event (GstPad * pad, GstEvent * event);
static void gst_collectpads_class_init (GstCollectPadsClass * klass);
static void gst_collectpads_init (GstCollectPads * pads);
@ -106,6 +107,7 @@ gst_collectpads_init (GstCollectPads * pads)
pads->cookie = 0;
pads->numpads = 0;
pads->queuedpads = 0;
pads->eospads = 0;
pads->started = FALSE;
}
@ -198,6 +200,7 @@ gst_collectpads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
GST_LOCK (pads);
pads->data = g_slist_append (pads->data, data);
gst_pad_set_chain_function (pad, gst_collectpads_chain);
gst_pad_set_event_function (pad, gst_collectpads_event);
gst_pad_set_element_private (pad, data);
pads->numpads++;
pads->cookie++;
@ -390,9 +393,12 @@ gst_collectpads_pop (GstCollectPads * pads, GstCollectData * data)
GstBuffer *result;
result = data->buffer;
gst_buffer_replace (&data->buffer, NULL);
data->pos = 0;
pads->queuedpads--;
if (result) {
gst_buffer_replace (&data->buffer, NULL);
data->pos = 0;
pads->queuedpads--;
}
GST_COLLECTPADS_SIGNAL (pads);
@ -510,6 +516,56 @@ gst_collectpads_flush (GstCollectPads * pads, GstCollectData * data, guint size)
return flushsize;
}
static gboolean
gst_collectpads_event (GstPad * pad, GstEvent * event)
{
GstCollectData *data;
GstCollectPads *pads;
/* some magic to get the managing collectpads */
data = (GstCollectData *) gst_pad_get_element_private (pad);
if (data == NULL)
goto not_ours;
pads = data->collect;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
{
GstFlowReturn ret = GST_FLOW_OK;
GST_LOCK (pads);
pads->eospads++;
/* if all pads are EOS and we have a function, call it */
if ((pads->eospads == pads->numpads) && pads->func) {
ret = pads->func (pads, pads->user_data);
}
GST_UNLOCK (pads);
/* We eat this event */
gst_event_unref (event);
return TRUE;
break;
}
default:
goto beach;
}
beach:
return gst_pad_event_default (pad, event);
/* ERRORS */
not_ours:
{
GST_DEBUG ("collectpads not ours");
return FALSE;
}
}
static GstFlowReturn
gst_collectpads_chain (GstPad * pad, GstBuffer * buffer)
{
@ -534,6 +590,12 @@ gst_collectpads_chain (GstPad * pad, GstBuffer * buffer)
if (!pads->started)
goto not_started;
/* Call the collected callback until a pad with a buffer is popped. */
while (((pads->queuedpads + pads->eospads) == pads->numpads) &&
pads->func != NULL) {
ret = pads->func (pads, pads->user_data);
}
/* queue buffer on this pad, block if filled */
while (data->buffer != NULL) {
GST_COLLECTPADS_WAIT (pads);
@ -545,7 +607,7 @@ gst_collectpads_chain (GstPad * pad, GstBuffer * buffer)
gst_buffer_replace (&data->buffer, buffer);
/* if all pads have data and we have a function, call it */
if ((pads->queuedpads == pads->numpads) && pads->func) {
if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) {
ret = pads->func (pads, pads->user_data);
} else {
ret = GST_FLOW_OK;

View file

@ -42,7 +42,6 @@ typedef struct _GstCollectData
GstPad *pad;
GstBuffer *buffer;
guint pos;
gboolean eos;
} GstCollectData;
/* function will be called when all pads have data */
@ -68,6 +67,7 @@ struct _GstCollectPads {
guint numpads; /* number of pads */
guint queuedpads; /* number of pads with a buffer */
guint eospads; /* number of pads that are EOS */
gboolean started;