collectpads: implement flushing seek support

Implement common flushing seek logic in GstCollectPads. Add new
API so that elements can opt-in to using the new logic
(gst_collect_pads_src_event_default) and can extend it
(gst_collect_pads_set_flush_function) to flush any internal
state.

See https://bugzilla.gnome.org/show_bug.cgi?id=706779 and
https://bugzilla.gnome.org/show_bug.cgi?id=706441 for the
background discussion.

API: gst_collect_pads_set_flush_function()
API: gst_collect_pads_src_event_default()

https://bugzilla.gnome.org/show_bug.cgi?id=708416
This commit is contained in:
Alessandro Decina 2013-09-16 09:55:58 +02:00 committed by Sebastian Dröge
parent f52b5ddcd2
commit 7aec5739eb
2 changed files with 177 additions and 25 deletions

View file

@ -132,11 +132,17 @@ struct _GstCollectPadsPrivate
gpointer query_user_data; gpointer query_user_data;
GstCollectPadsClipFunction clip_func; GstCollectPadsClipFunction clip_func;
gpointer clip_user_data; gpointer clip_user_data;
GstCollectPadsFlushFunction flush_func;
gpointer flush_user_data;
/* no other lock needed */ /* no other lock needed */
GMutex evt_lock; /* these make up sort of poor man's event signaling */ GMutex evt_lock; /* these make up sort of poor man's event signaling */
GCond evt_cond; GCond evt_cond;
guint32 evt_cookie; guint32 evt_cookie;
gboolean seeking;
gboolean pending_flush_start;
gboolean pending_flush_stop;
}; };
static void gst_collect_pads_clear (GstCollectPads * pads, static void gst_collect_pads_clear (GstCollectPads * pads,
@ -260,6 +266,10 @@ gst_collect_pads_init (GstCollectPads * pads)
g_mutex_init (&pads->priv->evt_lock); g_mutex_init (&pads->priv->evt_lock);
g_cond_init (&pads->priv->evt_cond); g_cond_init (&pads->priv->evt_cond);
pads->priv->evt_cookie = 0; pads->priv->evt_cookie = 0;
pads->priv->seeking = FALSE;
pads->priv->pending_flush_start = FALSE;
pads->priv->pending_flush_stop = FALSE;
} }
static void static void
@ -541,6 +551,17 @@ gst_collect_pads_set_clip_function (GstCollectPads * pads,
pads->priv->clip_user_data = user_data; pads->priv->clip_user_data = user_data;
} }
void
gst_collect_pads_set_flush_function (GstCollectPads * pads,
GstCollectPadsFlushFunction func, gpointer user_data)
{
g_return_if_fail (pads != NULL);
g_return_if_fail (GST_IS_COLLECT_PADS (pads));
pads->priv->flush_func = func;
pads->priv->flush_user_data = user_data;
}
/** /**
* gst_collect_pads_add_pad: * gst_collect_pads_add_pad:
* @pads: the collectpads to use * @pads: the collectpads to use
@ -1179,7 +1200,7 @@ gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
/* Do something only on a change and if not locked */ /* Do something only on a change and if not locked */
if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) && if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
(GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) != (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
! !waiting)) { !!waiting)) {
/* Set waiting state for this pad */ /* Set waiting state for this pad */
if (waiting) if (waiting)
GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING); GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
@ -1284,6 +1305,10 @@ gst_collect_pads_check_collected (GstCollectPads * pads)
GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s", GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func)); pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
TRUE, FALSE) == TRUE)) {
GST_INFO_OBJECT (pads, "finished seeking");
}
do { do {
flow_ret = func (pads, user_data); flow_ret = func (pads, user_data);
} while (flow_ret == GST_FLOW_OK); } while (flow_ret == GST_FLOW_OK);
@ -1298,6 +1323,10 @@ gst_collect_pads_check_collected (GstCollectPads * pads)
pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads, pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
GST_DEBUG_FUNCPTR_NAME (func)); GST_DEBUG_FUNCPTR_NAME (func));
if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
TRUE, FALSE) == TRUE)) {
GST_INFO_OBJECT (pads, "finished seeking");
}
flow_ret = func (pads, user_data); flow_ret = func (pads, user_data);
collected = TRUE; collected = TRUE;
@ -1624,33 +1653,59 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
{ {
/* forward event to unblock check_collected */ if (g_atomic_int_get (&pads->priv->seeking)) {
GST_DEBUG_OBJECT (pad, "forwarding flush start"); /* drop all but the first FLUSH_STARTs when seeking */
res = gst_pad_event_default (pad, parent, event); if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_start,
event = NULL; TRUE, FALSE) == FALSE)
goto eat;
/* now unblock the chain function. /* unblock collect pads */
* no cond per pad, so they all unblock, gst_pad_event_default (pad, parent, event);
* non-flushing block again */ event = NULL;
GST_COLLECT_PADS_STREAM_LOCK (pads);
GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
gst_collect_pads_clear (pads, data);
/* cater for possible default muxing functionality */ GST_COLLECT_PADS_STREAM_LOCK (pads);
if (buffer_func) { /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
/* restore to initial state */ * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
gst_collect_pads_set_waiting (pads, data, TRUE); * non-flushing (which happens in gst_collect_pads_event_default).
/* if the current pad is affected, reset state, recalculate later */ */
if (pads->priv->earliest_data == data) { gst_collect_pads_set_flushing (pads, TRUE);
unref_data (data);
pads->priv->earliest_data = NULL; if (pads->priv->flush_func)
pads->priv->earliest_time = GST_CLOCK_TIME_NONE; pads->priv->flush_func (pads, pads->priv->flush_user_data);
g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
GST_COLLECT_PADS_STREAM_UNLOCK (pads);
goto eat;
} else {
/* forward event to unblock check_collected */
GST_DEBUG_OBJECT (pad, "forwarding flush start");
res = gst_pad_event_default (pad, parent, event);
event = NULL;
/* now unblock the chain function.
* no cond per pad, so they all unblock,
* non-flushing block again */
GST_COLLECT_PADS_STREAM_LOCK (pads);
GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
gst_collect_pads_clear (pads, data);
/* cater for possible default muxing functionality */
if (buffer_func) {
/* restore to initial state */
gst_collect_pads_set_waiting (pads, data, TRUE);
/* if the current pad is affected, reset state, recalculate later */
if (pads->priv->earliest_data == data) {
unref_data (data);
pads->priv->earliest_data = NULL;
pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
}
} }
GST_COLLECT_PADS_STREAM_UNLOCK (pads);
goto eat;
} }
GST_COLLECT_PADS_STREAM_UNLOCK (pads);
goto eat;
} }
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
{ {
@ -1673,7 +1728,15 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
} }
GST_COLLECT_PADS_STREAM_UNLOCK (pads); GST_COLLECT_PADS_STREAM_UNLOCK (pads);
goto forward; if (g_atomic_int_get (&pads->priv->seeking)) {
if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
TRUE, FALSE))
goto forward;
else
goto eat;
} else {
goto forward;
}
} }
case GST_EVENT_EOS: case GST_EVENT_EOS:
{ {
@ -1774,6 +1837,73 @@ forward:
return gst_pad_event_default (pad, parent, event); return gst_pad_event_default (pad, parent, event);
} }
typedef struct
{
GstEvent *event;
gboolean result;
} EventData;
static gboolean
event_forward_func (GstPad * pad, EventData * data)
{
data->result &= gst_pad_push_event (pad, gst_event_ref (data->event));
return !data->result;
}
static gboolean
forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
{
EventData data;
data.event = event;
data.result = TRUE;
gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
return data.result;
}
gboolean
gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
GstEvent * event)
{
GstObject *parent;
gboolean res = TRUE;
parent = GST_OBJECT_PARENT (pad);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:{
GstSeekFlags flags;
GST_INFO_OBJECT (pads, "starting seek");
gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
if (flags & GST_SEEK_FLAG_FLUSH) {
g_atomic_int_set (&pads->priv->seeking, TRUE);
g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
/* forward the seek upstream */
res = forward_event_to_all_sinkpads (pad, event);
event = NULL;
if (!res) {
g_atomic_int_set (&pads->priv->seeking, FALSE);
g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
}
}
GST_INFO_OBJECT (pads, "seek done, result: %d", res);
break;
}
default:
break;
}
if (event)
res = gst_pad_event_default (pad, parent, event);
return res;
}
static gboolean static gboolean
gst_collect_pads_event_default_internal (GstCollectPads * pads, gst_collect_pads_event_default_internal (GstCollectPads * pads,
GstCollectData * data, GstEvent * event, gpointer user_data) GstCollectData * data, GstEvent * event, gpointer user_data)

View file

@ -236,6 +236,23 @@ typedef GstFlowReturn (*GstCollectPadsClipFunction) (GstCollectPads *pads, GstCo
GstBuffer *inbuffer, GstBuffer **outbuffer, GstBuffer *inbuffer, GstBuffer **outbuffer,
gpointer user_data); gpointer user_data);
/**
* GstCollectPadsFlushFunction:
* @pads: a #GstCollectPads
* @user_data: user data
*
* A function that will be called while processing a flushing seek event.
*
* The function should flush any internal state of the element and the state of
* all the pads. It should clear only the state not directly managed by the
* @pads object. It is therefore not necessary to call
* gst_collect_pads_set_flushing nor gst_collect_pads_clear from this function.
*
* Since: FIXME
*/
typedef void (*GstCollectPadsFlushFunction) (GstCollectPads *pads, gpointer user_data);
/** /**
* GST_COLLECT_PADS_GET_STREAM_LOCK: * GST_COLLECT_PADS_GET_STREAM_LOCK:
* @pads: a #GstCollectPads * @pads: a #GstCollectPads
@ -311,6 +328,9 @@ void gst_collect_pads_set_compare_function (GstCollectPads *pads,
void gst_collect_pads_set_clip_function (GstCollectPads *pads, void gst_collect_pads_set_clip_function (GstCollectPads *pads,
GstCollectPadsClipFunction clipfunc, GstCollectPadsClipFunction clipfunc,
gpointer user_data); gpointer user_data);
void gst_collect_pads_set_flush_function (GstCollectPads *pads,
GstCollectPadsFlushFunction func,
gpointer user_data);
/* pad management */ /* pad management */
GstCollectData* gst_collect_pads_add_pad (GstCollectPads *pads, GstPad *pad, guint size, GstCollectData* gst_collect_pads_add_pad (GstCollectPads *pads, GstPad *pad, guint size,
@ -349,6 +369,8 @@ GstFlowReturn gst_collect_pads_clip_running_time (GstCollectPads * pads,
/* default handlers */ /* default handlers */
gboolean gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data, gboolean gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
GstEvent * event, gboolean discard); GstEvent * event, gboolean discard);
gboolean gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
GstEvent * event);
gboolean gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data, gboolean gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
GstQuery * query, gboolean discard); GstQuery * query, gboolean discard);