diff --git a/ChangeLog b/ChangeLog index 2b47fa74a7..1c05230e3b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2005-10-11 Julien MOUTTE + + * gst/base/gstcollectpads.c: (gst_collectpads_init), + (gst_collectpads_add_pad), (gst_collectpads_pop), + (gst_collectpads_event), (gst_collectpads_chain): + * gst/base/gstcollectpads.h: Handle EOS correctly. + 2005-10-11 Thomas Vander Stichele * tools/gst-launch.c: (main): diff --git a/gst/base/gstcollectpads.c b/gst/base/gstcollectpads.c index 89cb7b7519..497118524a 100644 --- a/gst/base/gstcollectpads.c +++ b/gst/base/gstcollectpads.c @@ -52,6 +52,7 @@ #include "gstcollectpads.h" static GstFlowReturn gst_collectpads_chain (GstPad * pad, GstBuffer * buffer); +static gboolean gst_collectpads_event (GstPad * pad, GstEvent * event); static void gst_collectpads_class_init (GstCollectPadsClass * klass); static void gst_collectpads_init (GstCollectPads * pads); @@ -106,6 +107,7 @@ gst_collectpads_init (GstCollectPads * pads) pads->cookie = 0; pads->numpads = 0; pads->queuedpads = 0; + pads->eospads = 0; pads->started = FALSE; } @@ -198,6 +200,7 @@ gst_collectpads_add_pad (GstCollectPads * pads, GstPad * pad, guint size) GST_LOCK (pads); pads->data = g_slist_append (pads->data, data); gst_pad_set_chain_function (pad, gst_collectpads_chain); + gst_pad_set_event_function (pad, gst_collectpads_event); gst_pad_set_element_private (pad, data); pads->numpads++; pads->cookie++; @@ -390,9 +393,12 @@ gst_collectpads_pop (GstCollectPads * pads, GstCollectData * data) GstBuffer *result; result = data->buffer; - gst_buffer_replace (&data->buffer, NULL); - data->pos = 0; - pads->queuedpads--; + if (result) { + gst_buffer_replace (&data->buffer, NULL); + data->pos = 0; + pads->queuedpads--; + + } GST_COLLECTPADS_SIGNAL (pads); @@ -510,6 +516,56 @@ gst_collectpads_flush (GstCollectPads * pads, GstCollectData * data, guint size) return flushsize; } +static gboolean +gst_collectpads_event (GstPad * pad, GstEvent * event) +{ + GstCollectData *data; + GstCollectPads *pads; + + /* some magic to get the managing collectpads */ + data = (GstCollectData *) gst_pad_get_element_private (pad); + if (data == NULL) + goto not_ours; + + pads = data->collect; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + { + GstFlowReturn ret = GST_FLOW_OK; + + GST_LOCK (pads); + + pads->eospads++; + + /* if all pads are EOS and we have a function, call it */ + if ((pads->eospads == pads->numpads) && pads->func) { + ret = pads->func (pads, pads->user_data); + } + + GST_UNLOCK (pads); + + /* We eat this event */ + gst_event_unref (event); + return TRUE; + break; + } + default: + goto beach; + } + +beach: + return gst_pad_event_default (pad, event); + + /* ERRORS */ +not_ours: + { + GST_DEBUG ("collectpads not ours"); + return FALSE; + } +} + + static GstFlowReturn gst_collectpads_chain (GstPad * pad, GstBuffer * buffer) { @@ -534,6 +590,12 @@ gst_collectpads_chain (GstPad * pad, GstBuffer * buffer) if (!pads->started) goto not_started; + /* Call the collected callback until a pad with a buffer is popped. */ + while (((pads->queuedpads + pads->eospads) == pads->numpads) && + pads->func != NULL) { + ret = pads->func (pads, pads->user_data); + } + /* queue buffer on this pad, block if filled */ while (data->buffer != NULL) { GST_COLLECTPADS_WAIT (pads); @@ -545,7 +607,7 @@ gst_collectpads_chain (GstPad * pad, GstBuffer * buffer) gst_buffer_replace (&data->buffer, buffer); /* if all pads have data and we have a function, call it */ - if ((pads->queuedpads == pads->numpads) && pads->func) { + if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) { ret = pads->func (pads, pads->user_data); } else { ret = GST_FLOW_OK; diff --git a/gst/base/gstcollectpads.h b/gst/base/gstcollectpads.h index 4f6c5db051..0449f33836 100644 --- a/gst/base/gstcollectpads.h +++ b/gst/base/gstcollectpads.h @@ -42,7 +42,6 @@ typedef struct _GstCollectData GstPad *pad; GstBuffer *buffer; guint pos; - gboolean eos; } GstCollectData; /* function will be called when all pads have data */ @@ -68,6 +67,7 @@ struct _GstCollectPads { guint numpads; /* number of pads */ guint queuedpads; /* number of pads with a buffer */ + guint eospads; /* number of pads that are EOS */ gboolean started; diff --git a/libs/gst/base/gstcollectpads.c b/libs/gst/base/gstcollectpads.c index 89cb7b7519..497118524a 100644 --- a/libs/gst/base/gstcollectpads.c +++ b/libs/gst/base/gstcollectpads.c @@ -52,6 +52,7 @@ #include "gstcollectpads.h" static GstFlowReturn gst_collectpads_chain (GstPad * pad, GstBuffer * buffer); +static gboolean gst_collectpads_event (GstPad * pad, GstEvent * event); static void gst_collectpads_class_init (GstCollectPadsClass * klass); static void gst_collectpads_init (GstCollectPads * pads); @@ -106,6 +107,7 @@ gst_collectpads_init (GstCollectPads * pads) pads->cookie = 0; pads->numpads = 0; pads->queuedpads = 0; + pads->eospads = 0; pads->started = FALSE; } @@ -198,6 +200,7 @@ gst_collectpads_add_pad (GstCollectPads * pads, GstPad * pad, guint size) GST_LOCK (pads); pads->data = g_slist_append (pads->data, data); gst_pad_set_chain_function (pad, gst_collectpads_chain); + gst_pad_set_event_function (pad, gst_collectpads_event); gst_pad_set_element_private (pad, data); pads->numpads++; pads->cookie++; @@ -390,9 +393,12 @@ gst_collectpads_pop (GstCollectPads * pads, GstCollectData * data) GstBuffer *result; result = data->buffer; - gst_buffer_replace (&data->buffer, NULL); - data->pos = 0; - pads->queuedpads--; + if (result) { + gst_buffer_replace (&data->buffer, NULL); + data->pos = 0; + pads->queuedpads--; + + } GST_COLLECTPADS_SIGNAL (pads); @@ -510,6 +516,56 @@ gst_collectpads_flush (GstCollectPads * pads, GstCollectData * data, guint size) return flushsize; } +static gboolean +gst_collectpads_event (GstPad * pad, GstEvent * event) +{ + GstCollectData *data; + GstCollectPads *pads; + + /* some magic to get the managing collectpads */ + data = (GstCollectData *) gst_pad_get_element_private (pad); + if (data == NULL) + goto not_ours; + + pads = data->collect; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + { + GstFlowReturn ret = GST_FLOW_OK; + + GST_LOCK (pads); + + pads->eospads++; + + /* if all pads are EOS and we have a function, call it */ + if ((pads->eospads == pads->numpads) && pads->func) { + ret = pads->func (pads, pads->user_data); + } + + GST_UNLOCK (pads); + + /* We eat this event */ + gst_event_unref (event); + return TRUE; + break; + } + default: + goto beach; + } + +beach: + return gst_pad_event_default (pad, event); + + /* ERRORS */ +not_ours: + { + GST_DEBUG ("collectpads not ours"); + return FALSE; + } +} + + static GstFlowReturn gst_collectpads_chain (GstPad * pad, GstBuffer * buffer) { @@ -534,6 +590,12 @@ gst_collectpads_chain (GstPad * pad, GstBuffer * buffer) if (!pads->started) goto not_started; + /* Call the collected callback until a pad with a buffer is popped. */ + while (((pads->queuedpads + pads->eospads) == pads->numpads) && + pads->func != NULL) { + ret = pads->func (pads, pads->user_data); + } + /* queue buffer on this pad, block if filled */ while (data->buffer != NULL) { GST_COLLECTPADS_WAIT (pads); @@ -545,7 +607,7 @@ gst_collectpads_chain (GstPad * pad, GstBuffer * buffer) gst_buffer_replace (&data->buffer, buffer); /* if all pads have data and we have a function, call it */ - if ((pads->queuedpads == pads->numpads) && pads->func) { + if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) { ret = pads->func (pads, pads->user_data); } else { ret = GST_FLOW_OK; diff --git a/libs/gst/base/gstcollectpads.h b/libs/gst/base/gstcollectpads.h index 4f6c5db051..0449f33836 100644 --- a/libs/gst/base/gstcollectpads.h +++ b/libs/gst/base/gstcollectpads.h @@ -42,7 +42,6 @@ typedef struct _GstCollectData GstPad *pad; GstBuffer *buffer; guint pos; - gboolean eos; } GstCollectData; /* function will be called when all pads have data */ @@ -68,6 +67,7 @@ struct _GstCollectPads { guint numpads; /* number of pads */ guint queuedpads; /* number of pads with a buffer */ + guint eospads; /* number of pads that are EOS */ gboolean started;