mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-09-24 13:00:36 +00:00
libs/gst/base/gstcollectpads.*: Clean up the mess that is collectpads, add comments and
Original commit message from CVS: * libs/gst/base/gstcollectpads.c: (gst_collect_pads_init), (gst_collect_pads_finalize), (gst_collect_pads_add_pad), (gst_collect_pads_remove_pad), (gst_collect_pads_set_flushing), (gst_collect_pads_start), (gst_collect_pads_stop), (gst_collect_pads_peek), (gst_collect_pads_pop), (gst_collect_pads_available), (gst_collect_pads_read), (gst_collect_pads_flush), (gst_collect_pads_check_pads), (gst_collect_pads_is_collected), (gst_collect_pads_event), (gst_collect_pads_chain): * libs/gst/base/gstcollectpads.h: Clean up the mess that is collectpads, add comments and FIXMEs where needed. Maintain a separate pad list so we can add pads while collecting the other ones. For this we need a new separate lock (see comics). Fix memory leak in finalize. Refactor some weird code to set/unset pad flushing flags, mark with comments. Don't crash in _available, _read, _flush when we're EOS. * tests/check/libs/.cvsignore: Ignore adapter check binary.
This commit is contained in:
parent
b38fcba51b
commit
ce4d0980f1
4 changed files with 262 additions and 96 deletions
25
ChangeLog
25
ChangeLog
|
@ -1,3 +1,28 @@
|
|||
2006-05-09 Wim Taymans <wim@fluendo.com>
|
||||
|
||||
* libs/gst/base/gstcollectpads.c: (gst_collect_pads_init),
|
||||
(gst_collect_pads_finalize), (gst_collect_pads_add_pad),
|
||||
(gst_collect_pads_remove_pad), (gst_collect_pads_set_flushing),
|
||||
(gst_collect_pads_start), (gst_collect_pads_stop),
|
||||
(gst_collect_pads_peek), (gst_collect_pads_pop),
|
||||
(gst_collect_pads_available), (gst_collect_pads_read),
|
||||
(gst_collect_pads_flush), (gst_collect_pads_check_pads),
|
||||
(gst_collect_pads_is_collected), (gst_collect_pads_event),
|
||||
(gst_collect_pads_chain):
|
||||
* libs/gst/base/gstcollectpads.h:
|
||||
Clean up the mess that is collectpads, add comments and
|
||||
FIXMEs where needed.
|
||||
Maintain a separate pad list so we can add pads while
|
||||
collecting the other ones. For this we need a new separate
|
||||
lock (see comics).
|
||||
Fix memory leak in finalize.
|
||||
Refactor some weird code to set/unset pad flushing flags, mark
|
||||
with comments.
|
||||
Don't crash in _available, _read, _flush when we're EOS.
|
||||
|
||||
* tests/check/libs/.cvsignore:
|
||||
Ignore adapter check binary.
|
||||
|
||||
2006-05-09 Tim-Philipp Müller <tim at centricular dot net>
|
||||
|
||||
* gst/gstindex.c: (gst_index_resolver_get_type):
|
||||
|
|
|
@ -107,6 +107,11 @@ gst_collect_pads_init (GstCollectPads * pads, GstCollectPadsClass * g_class)
|
|||
pads->queuedpads = 0;
|
||||
pads->eospads = 0;
|
||||
pads->started = FALSE;
|
||||
|
||||
/* members to manage the pad list */
|
||||
pads->abidata.ABI.pad_lock = g_mutex_new ();
|
||||
pads->abidata.ABI.pad_cookie = 0;
|
||||
pads->abidata.ABI.pad_list = NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -117,20 +122,20 @@ gst_collect_pads_finalize (GObject * object)
|
|||
|
||||
gst_collect_pads_stop (pads);
|
||||
g_cond_free (pads->cond);
|
||||
g_mutex_free (pads->abidata.ABI.pad_lock);
|
||||
|
||||
/* Remove pads */
|
||||
for (collected = pads->data; collected; collected = g_slist_next (collected)) {
|
||||
GstCollectData *pdata = (GstCollectData *) collected->data;
|
||||
|
||||
if (pdata->pad) {
|
||||
if (pdata->pad)
|
||||
gst_object_unref (pdata->pad);
|
||||
}
|
||||
|
||||
g_free (pdata);
|
||||
}
|
||||
/* Free pads list */
|
||||
g_slist_free (pads->data);
|
||||
|
||||
/* FIXME, free data */
|
||||
|
||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||
}
|
||||
|
||||
|
@ -209,18 +214,20 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
|
|||
data->collect = pads;
|
||||
data->pad = gst_object_ref (pad);
|
||||
data->buffer = NULL;
|
||||
data->pos = 0;
|
||||
gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
|
||||
data->abidata.ABI.flushing = FALSE;
|
||||
data->abidata.ABI.new_segment = FALSE;
|
||||
data->abidata.ABI.eos = FALSE;
|
||||
|
||||
GST_OBJECT_LOCK (pads);
|
||||
pads->data = g_slist_append (pads->data, data);
|
||||
GST_COLLECT_PADS_PAD_LOCK (pads);
|
||||
pads->abidata.ABI.pad_list =
|
||||
g_slist_append (pads->abidata.ABI.pad_list, data);
|
||||
gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
|
||||
gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
|
||||
gst_pad_set_element_private (pad, data);
|
||||
pads->numpads++;
|
||||
pads->cookie++;
|
||||
GST_OBJECT_UNLOCK (pads);
|
||||
pads->abidata.ABI.pad_cookie++;
|
||||
GST_COLLECT_PADS_PAD_UNLOCK (pads);
|
||||
|
||||
return data;
|
||||
}
|
||||
|
@ -254,20 +261,26 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
|
|||
g_return_val_if_fail (pad != NULL, FALSE);
|
||||
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
|
||||
|
||||
GST_OBJECT_LOCK (pads);
|
||||
list = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
|
||||
GST_COLLECT_PADS_PAD_LOCK (pads);
|
||||
list =
|
||||
g_slist_find_custom (pads->abidata.ABI.pad_list, pad,
|
||||
(GCompareFunc) find_pad);
|
||||
if (list) {
|
||||
pads->abidata.ABI.pad_list =
|
||||
g_slist_delete_link (pads->abidata.ABI.pad_list, list);
|
||||
/* clear the stuff we configured */
|
||||
gst_pad_set_chain_function (pad, NULL);
|
||||
gst_pad_set_event_function (pad, NULL);
|
||||
/* FIXME, check that freeing the private data does not causes
|
||||
* crashes in the streaming thread */
|
||||
gst_pad_set_element_private (pad, NULL);
|
||||
g_free (list->data);
|
||||
pads->data = g_slist_delete_link (pads->data, list);
|
||||
gst_object_unref (pad);
|
||||
pads->abidata.ABI.pad_cookie++;
|
||||
}
|
||||
pads->numpads--;
|
||||
/* FIXME : if the pad has data queued we should decrease the number of
|
||||
queuedpads */
|
||||
pads->cookie++;
|
||||
GST_OBJECT_UNLOCK (pads);
|
||||
GST_COLLECT_PADS_PAD_UNLOCK (pads);
|
||||
|
||||
return list != NULL;
|
||||
return (list != NULL);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -277,6 +290,8 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
|
|||
*
|
||||
* Check if a pad is active.
|
||||
*
|
||||
* This function is currently not implemented.
|
||||
*
|
||||
* Returns: TRUE if the pad is active.
|
||||
*
|
||||
* MT safe.
|
||||
|
@ -299,8 +314,9 @@ gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
|
|||
* @pads: the collectspads to use
|
||||
*
|
||||
* Collect data on all pads. This function is usually called
|
||||
* from a GstTask function in an element. This function is
|
||||
* currently not implemented.
|
||||
* from a GstTask function in an element.
|
||||
*
|
||||
* This function is currently not implemented.
|
||||
*
|
||||
* Returns: GstFlowReturn of the operation.
|
||||
*
|
||||
|
@ -324,8 +340,9 @@ gst_collect_pads_collect (GstCollectPads * pads)
|
|||
* @length: the length to collect
|
||||
*
|
||||
* Collect data with @offset and @length on all pads. This function
|
||||
* is typically called in the getrange function of an element. This
|
||||
* function is currently not implemented.
|
||||
* is typically called in the getrange function of an element.
|
||||
*
|
||||
* This function is currently not implemented.
|
||||
*
|
||||
* Returns: GstFlowReturn of the operation.
|
||||
*
|
||||
|
@ -343,6 +360,31 @@ gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
|
|||
return GST_FLOW_ERROR;
|
||||
}
|
||||
|
||||
/* FIXME, I think this function is used to work around bad behaviour
|
||||
* of elements that add pads to themselves without activating them.
|
||||
*/
|
||||
static void
|
||||
gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
|
||||
{
|
||||
GSList *walk = NULL;
|
||||
|
||||
GST_COLLECT_PADS_PAD_LOCK (pads);
|
||||
/* Update the pads flushing flag */
|
||||
for (walk = pads->data; walk; walk = g_slist_next (walk)) {
|
||||
GstCollectData *cdata = walk->data;
|
||||
|
||||
if (GST_IS_PAD (cdata->pad)) {
|
||||
GST_OBJECT_LOCK (cdata->pad);
|
||||
if (flushing)
|
||||
GST_PAD_SET_FLUSHING (cdata->pad);
|
||||
else
|
||||
GST_PAD_UNSET_FLUSHING (cdata->pad);
|
||||
GST_OBJECT_UNLOCK (cdata->pad);
|
||||
}
|
||||
}
|
||||
GST_COLLECT_PADS_PAD_UNLOCK (pads);
|
||||
}
|
||||
|
||||
/**
|
||||
* gst_collect_pads_start:
|
||||
* @pads: the collectspads to use
|
||||
|
@ -354,27 +396,17 @@ gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
|
|||
void
|
||||
gst_collect_pads_start (GstCollectPads * pads)
|
||||
{
|
||||
GSList *walk = NULL;
|
||||
|
||||
g_return_if_fail (pads != NULL);
|
||||
g_return_if_fail (GST_IS_COLLECT_PADS (pads));
|
||||
|
||||
GST_DEBUG_OBJECT (pads, "starting collect pads");
|
||||
|
||||
/* make sure stop and collect cannot be called anymore */
|
||||
GST_OBJECT_LOCK (pads);
|
||||
/* Set our pads as non flushing */
|
||||
walk = pads->data;
|
||||
while (walk) {
|
||||
GstCollectData *cdata = walk->data;
|
||||
|
||||
if (GST_IS_PAD (cdata->pad)) {
|
||||
GST_OBJECT_LOCK (cdata->pad);
|
||||
GST_PAD_UNSET_FLUSHING (cdata->pad);
|
||||
GST_OBJECT_UNLOCK (cdata->pad);
|
||||
}
|
||||
/* make pads streamable */
|
||||
gst_collect_pads_set_flushing (pads, FALSE);
|
||||
|
||||
walk = g_slist_next (walk);
|
||||
}
|
||||
/* Start collect pads */
|
||||
pads->started = TRUE;
|
||||
GST_OBJECT_UNLOCK (pads);
|
||||
|
@ -392,31 +424,22 @@ gst_collect_pads_start (GstCollectPads * pads)
|
|||
void
|
||||
gst_collect_pads_stop (GstCollectPads * pads)
|
||||
{
|
||||
GSList *walk = NULL;
|
||||
|
||||
g_return_if_fail (pads != NULL);
|
||||
g_return_if_fail (GST_IS_COLLECT_PADS (pads));
|
||||
|
||||
GST_DEBUG_OBJECT (pads, "stopping collect pads");
|
||||
|
||||
/* make sure collect and start cannot be called anymore */
|
||||
GST_OBJECT_LOCK (pads);
|
||||
/* Set our pads as flushing */
|
||||
walk = pads->data;
|
||||
while (walk) {
|
||||
GstCollectData *cdata = walk->data;
|
||||
|
||||
if (GST_IS_PAD (cdata->pad)) {
|
||||
GST_OBJECT_LOCK (cdata->pad);
|
||||
GST_PAD_SET_FLUSHING (cdata->pad);
|
||||
GST_OBJECT_UNLOCK (cdata->pad);
|
||||
}
|
||||
/* make pads not accept data anymore */
|
||||
gst_collect_pads_set_flushing (pads, TRUE);
|
||||
|
||||
walk = g_slist_next (walk);
|
||||
}
|
||||
/* Stop collect pads */
|
||||
pads->started = FALSE;
|
||||
/* Wake them up */
|
||||
/* Wake them up so then can end the chain functions. */
|
||||
GST_COLLECT_PADS_BROADCAST (pads);
|
||||
|
||||
GST_OBJECT_UNLOCK (pads);
|
||||
}
|
||||
|
||||
|
@ -443,9 +466,7 @@ gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
|
|||
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
|
||||
g_return_val_if_fail (data != NULL, NULL);
|
||||
|
||||
result = data->buffer;
|
||||
|
||||
if (result)
|
||||
if ((result = data->buffer))
|
||||
gst_buffer_ref (result);
|
||||
|
||||
GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
|
||||
|
@ -478,11 +499,11 @@ gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
|
|||
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
|
||||
g_return_val_if_fail (data != NULL, NULL);
|
||||
|
||||
result = data->buffer;
|
||||
if (result) {
|
||||
if ((result = data->buffer)) {
|
||||
buffer_p = &data->buffer;
|
||||
gst_buffer_replace (buffer_p, NULL);
|
||||
data->pos = 0;
|
||||
/* one less pad with queued data now */
|
||||
pads->queuedpads--;
|
||||
}
|
||||
|
||||
|
@ -521,18 +542,30 @@ gst_collect_pads_available (GstCollectPads * pads)
|
|||
|
||||
for (collected = pads->data; collected; collected = g_slist_next (collected)) {
|
||||
GstCollectData *pdata;
|
||||
GstBuffer *buffer;
|
||||
gint size;
|
||||
|
||||
pdata = (GstCollectData *) collected->data;
|
||||
|
||||
if (pdata->buffer == NULL)
|
||||
/* ignore pad with EOS */
|
||||
if (pdata->abidata.ABI.eos)
|
||||
continue;
|
||||
|
||||
/* an empty buffer not EOS is weird */
|
||||
if ((buffer = pdata->buffer) == NULL)
|
||||
goto not_filled;
|
||||
|
||||
size = GST_BUFFER_SIZE (pdata->buffer) - pdata->pos;
|
||||
/* this is the size left of the buffer */
|
||||
size = GST_BUFFER_SIZE (buffer) - pdata->pos;
|
||||
|
||||
/* need to return the min of all available data */
|
||||
if (size < result)
|
||||
result = size;
|
||||
}
|
||||
/* nothing changed, all must be EOS then, return 0 */
|
||||
if (result == G_MAXUINT)
|
||||
result = 0;
|
||||
|
||||
return result;
|
||||
|
||||
not_filled:
|
||||
|
@ -565,15 +598,20 @@ gst_collect_pads_read (GstCollectPads * pads, GstCollectData * data,
|
|||
guint8 ** bytes, guint size)
|
||||
{
|
||||
guint readsize;
|
||||
GstBuffer *buffer;
|
||||
|
||||
g_return_val_if_fail (pads != NULL, 0);
|
||||
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
|
||||
g_return_val_if_fail (data != NULL, 0);
|
||||
g_return_val_if_fail (bytes != NULL, 0);
|
||||
|
||||
readsize = MIN (size, GST_BUFFER_SIZE (data->buffer) - data->pos);
|
||||
/* no buffer, must be EOS */
|
||||
if ((buffer = data->buffer) == NULL)
|
||||
return 0;
|
||||
|
||||
*bytes = GST_BUFFER_DATA (data->buffer) + data->pos;
|
||||
readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
|
||||
|
||||
*bytes = GST_BUFFER_DATA (buffer) + data->pos;
|
||||
|
||||
return readsize;
|
||||
}
|
||||
|
@ -599,18 +637,25 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
|
|||
guint size)
|
||||
{
|
||||
guint flushsize;
|
||||
GstBuffer *buffer;
|
||||
|
||||
g_return_val_if_fail (pads != NULL, 0);
|
||||
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
|
||||
g_return_val_if_fail (data != NULL, 0);
|
||||
|
||||
flushsize = MIN (size, GST_BUFFER_SIZE (data->buffer) - data->pos);
|
||||
/* no buffer, must be EOS */
|
||||
if ((buffer = data->buffer) == NULL)
|
||||
return 0;
|
||||
|
||||
/* this is what we can flush at max */
|
||||
flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
|
||||
|
||||
data->pos += size;
|
||||
|
||||
if (data->pos >= GST_BUFFER_SIZE (data->buffer)) {
|
||||
if (data->pos >= GST_BUFFER_SIZE (buffer)) {
|
||||
GstBuffer *buf;
|
||||
|
||||
/* _pop will also reset data->pos to 0 */
|
||||
buf = gst_collect_pads_pop (pads, data);
|
||||
gst_buffer_unref (buf);
|
||||
}
|
||||
|
@ -618,6 +663,57 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
|
|||
return flushsize;
|
||||
}
|
||||
|
||||
/* see if pads were added or removed and update our stats. Any pad
|
||||
* added after releasing the PAD_LOCK will get collected in the next
|
||||
* round.
|
||||
*
|
||||
* We can do a quick check by checking the cookies, that get changed
|
||||
* whenever the pad list is updated.
|
||||
*
|
||||
* Must be called with LOCK.
|
||||
*/
|
||||
static void
|
||||
gst_collect_pads_check_pads (GstCollectPads * pads)
|
||||
{
|
||||
/* the master list and cookie are protected with the PAD_LOCK */
|
||||
GST_COLLECT_PADS_PAD_LOCK (pads);
|
||||
if (pads->abidata.ABI.pad_cookie != pads->cookie) {
|
||||
GSList *collected;
|
||||
|
||||
/* clear list and stats */
|
||||
pads->data = NULL;
|
||||
pads->numpads = 0;
|
||||
pads->queuedpads = 0;
|
||||
pads->eospads = 0;
|
||||
|
||||
/* loop over the master pad list */
|
||||
collected = pads->abidata.ABI.pad_list;
|
||||
for (; collected; collected = g_slist_next (collected)) {
|
||||
GstCollectData *data;
|
||||
|
||||
/* update the stats */
|
||||
pads->numpads++;
|
||||
data = collected->data;
|
||||
if (data->buffer)
|
||||
pads->queuedpads++;
|
||||
if (data->abidata.ABI.eos)
|
||||
pads->eospads++;
|
||||
|
||||
/* add to the list of pads to collect */
|
||||
pads->data = g_slist_prepend (pads->data, data);
|
||||
}
|
||||
/* and update the cookie */
|
||||
pads->cookie = pads->abidata.ABI.pad_cookie;
|
||||
}
|
||||
GST_COLLECT_PADS_PAD_UNLOCK (pads);
|
||||
}
|
||||
|
||||
/* checks if all the pads are collected and call the collectfunction
|
||||
*
|
||||
* Should be called with LOCK.
|
||||
*
|
||||
* Returns: TRUE if the collectfunction was called, FALSE otherwise.
|
||||
*/
|
||||
static gboolean
|
||||
gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
|
||||
{
|
||||
|
@ -625,8 +721,13 @@ gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
|
|||
gboolean res = FALSE;
|
||||
|
||||
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
|
||||
g_return_val_if_fail (pads->func != NULL, FALSE);
|
||||
|
||||
/* If all our pads are EOS just collect once */
|
||||
/* check for new pads, update stats etc.. */
|
||||
gst_collect_pads_check_pads (pads);
|
||||
|
||||
/* If all our pads are EOS just collect once to let the element
|
||||
* do its final EOS handling. */
|
||||
if (pads->eospads == pads->numpads) {
|
||||
GST_DEBUG ("All active pads (%d) are EOS, calling %s",
|
||||
pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
|
||||
|
@ -641,7 +742,7 @@ gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
|
|||
function
|
||||
FIXME: Shouldn't we also check gst_pad_is_blocked () somewhere
|
||||
*/
|
||||
while (((pads->queuedpads + pads->eospads) >= pads->numpads) && pads->func) {
|
||||
while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
|
||||
GST_DEBUG ("All active pads (%d) have data, calling %s",
|
||||
pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
|
||||
flow_ret = pads->func (pads, pads->user_data);
|
||||
|
@ -657,9 +758,8 @@ beach:
|
|||
GST_DEBUG ("Not all active pads (%d) have data, continuing", pads->numpads);
|
||||
}
|
||||
|
||||
if (ret) {
|
||||
if (ret)
|
||||
*ret = flow_ret;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -667,6 +767,7 @@ beach:
|
|||
static gboolean
|
||||
gst_collect_pads_event (GstPad * pad, GstEvent * event)
|
||||
{
|
||||
gboolean res;
|
||||
GstCollectData *data;
|
||||
GstCollectPads *pads;
|
||||
|
||||
|
@ -675,6 +776,8 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
|
|||
if (data == NULL)
|
||||
goto not_ours;
|
||||
|
||||
res = TRUE;
|
||||
|
||||
pads = data->collect;
|
||||
|
||||
GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
|
||||
|
@ -686,15 +789,16 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
|
|||
/* forward event to unblock is_collected */
|
||||
gst_pad_event_default (pad, event);
|
||||
|
||||
/* now unblock the chain function
|
||||
no cond per pad, so they all unblock, non-flushing block again */
|
||||
/* now unblock the chain function.
|
||||
* no cond per pad, so they all unblock,
|
||||
* non-flushing block again */
|
||||
GST_OBJECT_LOCK (pads);
|
||||
data->abidata.ABI.flushing = TRUE;
|
||||
GST_COLLECT_PADS_BROADCAST (pads);
|
||||
GST_OBJECT_UNLOCK (pads);
|
||||
|
||||
/* event already cleaned up by forwarding */
|
||||
return TRUE;
|
||||
goto done;
|
||||
}
|
||||
case GST_EVENT_FLUSH_STOP:
|
||||
{
|
||||
|
@ -702,31 +806,43 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
|
|||
GST_OBJECT_LOCK (pads);
|
||||
data->abidata.ABI.flushing = FALSE;
|
||||
gst_collect_pads_pop (pads, data);
|
||||
/* if the pad was EOS, remove the EOS flag and
|
||||
* decrement the number of eospads */
|
||||
if (data->abidata.ABI.eos == TRUE) {
|
||||
pads->eospads--;
|
||||
data->abidata.ABI.eos = FALSE;
|
||||
}
|
||||
GST_OBJECT_UNLOCK (pads);
|
||||
goto beach;
|
||||
|
||||
/* forward event */
|
||||
goto forward;
|
||||
}
|
||||
case GST_EVENT_EOS:
|
||||
{
|
||||
GST_OBJECT_LOCK (pads);
|
||||
|
||||
pads->eospads++;
|
||||
|
||||
/* if the pad was not EOS, make it EOS and so we
|
||||
* have one more eospad */
|
||||
if (data->abidata.ABI.eos == FALSE) {
|
||||
data->abidata.ABI.eos = TRUE;
|
||||
pads->eospads++;
|
||||
}
|
||||
/* check if we need collecting anything */
|
||||
gst_collect_pads_is_collected (pads, NULL);
|
||||
|
||||
GST_OBJECT_UNLOCK (pads);
|
||||
|
||||
/* We eat this event */
|
||||
/* We eat this event, element should do something
|
||||
* in the collected callback. */
|
||||
gst_event_unref (event);
|
||||
return TRUE;
|
||||
goto done;
|
||||
}
|
||||
case GST_EVENT_NEWSEGMENT:
|
||||
{
|
||||
gint64 start, stop, time;
|
||||
gdouble rate;
|
||||
gdouble rate, arate;
|
||||
GstFormat format;
|
||||
gboolean update;
|
||||
|
||||
gst_event_parse_new_segment (event, &update, &rate, &format,
|
||||
gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
|
||||
&start, &stop, &time);
|
||||
|
||||
GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
|
||||
|
@ -736,12 +852,13 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
|
|||
if (data->segment.format != format)
|
||||
gst_segment_init (&data->segment, format);
|
||||
|
||||
gst_segment_set_newsegment (&data->segment, update, rate, format,
|
||||
start, stop, time);
|
||||
gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
|
||||
format, start, stop, time);
|
||||
|
||||
data->abidata.ABI.new_segment = TRUE;
|
||||
|
||||
/* For now we eat this event */
|
||||
/* we must not forward this event since multiple segments will be
|
||||
* accumulated and this is certainly not what we want. */
|
||||
gst_event_unref (event);
|
||||
/* FIXME: collect-pads based elements need to create their own newsegment
|
||||
event (and only one really)
|
||||
|
@ -753,14 +870,18 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
|
|||
(that's what avimux does for something IIRC)
|
||||
see #340060
|
||||
*/
|
||||
return TRUE;
|
||||
goto done;
|
||||
}
|
||||
default:
|
||||
goto beach;
|
||||
/* forward other events */
|
||||
goto forward;
|
||||
}
|
||||
|
||||
beach:
|
||||
return gst_pad_event_default (pad, event);
|
||||
forward:
|
||||
res = gst_pad_event_default (pad, event);
|
||||
|
||||
done:
|
||||
return res;
|
||||
|
||||
/* ERRORS */
|
||||
not_ours:
|
||||
|
@ -771,10 +892,11 @@ not_ours:
|
|||
}
|
||||
|
||||
/* For each buffer we receive we check if our collected condition is reached
|
||||
and if so we call the collected function. When this is done we check if
|
||||
data has been unqueued. If data is still queued we wait holding the stream
|
||||
lock to make sure no EOS event can happen while we are ready to be
|
||||
collected */
|
||||
* and if so we call the collected function. When this is done we check if
|
||||
* data has been unqueued. If data is still queued we wait holding the stream
|
||||
* lock to make sure no EOS event can happen while we are ready to be
|
||||
* collected
|
||||
*/
|
||||
static GstFlowReturn
|
||||
gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
|
||||
{
|
||||
|
@ -811,13 +933,16 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
|
|||
buffer_p = &data->buffer;
|
||||
gst_buffer_replace (buffer_p, buffer);
|
||||
|
||||
if (data->segment.format == GST_FORMAT_TIME
|
||||
&& GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
|
||||
gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME,
|
||||
GST_BUFFER_TIMESTAMP (buffer));
|
||||
/* update segment last position if in TIME */
|
||||
if (data->segment.format == GST_FORMAT_TIME) {
|
||||
GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
|
||||
|
||||
if (GST_CLOCK_TIME_IS_VALID (timestamp))
|
||||
gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
|
||||
}
|
||||
|
||||
/* Check if our collected condition is matched and call the collected function
|
||||
if it is */
|
||||
* if it is */
|
||||
gst_collect_pads_is_collected (pads, &ret);
|
||||
|
||||
/* We still have data queued on this pad, wait for something to happen */
|
||||
|
@ -832,7 +957,6 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
|
|||
if (data->abidata.ABI.flushing)
|
||||
goto flushing;
|
||||
}
|
||||
|
||||
GST_OBJECT_UNLOCK (pads);
|
||||
|
||||
return ret;
|
||||
|
|
|
@ -49,6 +49,7 @@ typedef struct _GstCollectPadsClass GstCollectPadsClass;
|
|||
*/
|
||||
struct _GstCollectData
|
||||
{
|
||||
/* with LOCK of @collect */
|
||||
GstCollectPads *collect;
|
||||
GstPad *pad;
|
||||
GstBuffer *buffer;
|
||||
|
@ -60,6 +61,7 @@ struct _GstCollectData
|
|||
struct {
|
||||
gboolean flushing;
|
||||
gboolean new_segment;
|
||||
gboolean eos;
|
||||
} ABI;
|
||||
/* adding + 0 to mark ABI change to be undone later */
|
||||
gpointer _gst_reserved[GST_PADDING + 0];
|
||||
|
@ -77,6 +79,10 @@ struct _GstCollectData
|
|||
*/
|
||||
typedef GstFlowReturn (*GstCollectPadsFunction) (GstCollectPads *pads, gpointer user_data);
|
||||
|
||||
#define GST_COLLECT_PADS_GET_PAD_LOCK(pads) (((GstCollectPads *)pads)->abidata.ABI.pad_lock)
|
||||
#define GST_COLLECT_PADS_PAD_LOCK(pads) (g_mutex_lock(GST_COLLECT_PADS_GET_PAD_LOCK (pads)))
|
||||
#define GST_COLLECT_PADS_PAD_UNLOCK(pads) (g_mutex_unlock(GST_COLLECT_PADS_GET_PAD_LOCK (pads)))
|
||||
|
||||
#define GST_COLLECT_PADS_GET_COND(pads) (((GstCollectPads *)pads)->cond)
|
||||
#define GST_COLLECT_PADS_WAIT(pads) (g_cond_wait (GST_COLLECT_PADS_GET_COND (pads), GST_OBJECT_GET_LOCK (pads)))
|
||||
#define GST_COLLECT_PADS_SIGNAL(pads) (g_cond_signal (GST_COLLECT_PADS_GET_COND (pads)))
|
||||
|
@ -96,21 +102,31 @@ struct _GstCollectPads {
|
|||
GSList *data; /* list of CollectData items */
|
||||
|
||||
/*< private >*/
|
||||
guint32 cookie;
|
||||
guint32 cookie; /* @data list cookie */
|
||||
|
||||
/* with LOCK */
|
||||
GCond *cond; /* to signal removal of data */
|
||||
|
||||
GstCollectPadsFunction func; /* function and user_data for callback */
|
||||
gpointer user_data;
|
||||
|
||||
guint numpads; /* number of pads */
|
||||
guint numpads; /* number of pads in @data */
|
||||
guint queuedpads; /* number of pads with a buffer */
|
||||
guint eospads; /* number of pads that are EOS */
|
||||
|
||||
gboolean started;
|
||||
|
||||
/*< private >*/
|
||||
gpointer _gst_reserved[GST_PADDING];
|
||||
union {
|
||||
struct {
|
||||
/* since 0.10.6 */ /* with PAD_LOCK */
|
||||
GMutex *pad_lock; /* used to serialize add/remove */
|
||||
GSList *pad_list; /* updated pad list */
|
||||
guint32 pad_cookie; /* updated cookie */
|
||||
} ABI;
|
||||
/* adding + 0 to mark ABI change to be undone later */
|
||||
gpointer _gst_reserved[GST_PADDING + 0];
|
||||
} abidata;
|
||||
};
|
||||
|
||||
struct _GstCollectPadsClass {
|
||||
|
|
1
tests/check/libs/.gitignore
vendored
1
tests/check/libs/.gitignore
vendored
|
@ -1,4 +1,5 @@
|
|||
.dirstamp
|
||||
adapter
|
||||
gdp
|
||||
controller
|
||||
gstnetclientclock
|
||||
|
|
Loading…
Reference in a new issue