basesink: clean up event handling

Add new wait_eos vmethod to wait for the eos timeout before posting the EOS
message on the bus.
Add default event handler. Move the default event actions in there. Call the
event vmethod from the pad event handler. Subclasses are now supposed to chain
up to the parent event handler or unref the event and do their own thing.
Avoid passing unused parameters to functions.
This commit is contained in:
Wim Taymans 2011-12-02 22:20:08 +01:00
parent b6335d9505
commit 0e38f0dad0
5 changed files with 145 additions and 180 deletions

View file

@ -389,6 +389,10 @@ static void gst_base_sink_loop (GstPad * pad);
static gboolean gst_base_sink_pad_activate (GstPad * pad, GstObject * parent);
static gboolean gst_base_sink_pad_activate_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active);
static gboolean gst_base_sink_default_event (GstBaseSink * basesink,
GstEvent * event);
static GstFlowReturn gst_base_sink_default_wait_eos (GstBaseSink * basesink,
GstEvent * event);
static gboolean gst_base_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
@ -546,6 +550,8 @@ gst_base_sink_class_init (GstBaseSinkClass * klass)
GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
klass->query = GST_DEBUG_FUNCPTR (default_sink_query);
klass->event = GST_DEBUG_FUNCPTR (gst_base_sink_default_event);
klass->wait_eos = GST_DEBUG_FUNCPTR (gst_base_sink_default_wait_eos);
/* Registering debug symbols for function pointers */
GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_fixate);
@ -1777,19 +1783,19 @@ gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
/* EOS event needs syncing */
case GST_EVENT_EOS:
{
if (basesink->segment.rate >= 0.0) {
if (segment->rate >= 0.0) {
sstart = sstop = priv->current_sstop;
if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
/* we have not seen a buffer yet, use the segment values */
sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
basesink->segment.format, basesink->segment.stop);
sstart = sstop = gst_segment_to_stream_time (segment,
segment->format, segment->stop);
}
} else {
sstart = sstop = priv->current_sstart;
if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
/* we have not seen a buffer yet, use the segment values */
sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
basesink->segment.format, basesink->segment.start);
sstart = sstop = gst_segment_to_stream_time (segment,
segment->format, segment->start);
}
}
@ -2276,7 +2282,7 @@ flushing:
* does not take ownership of obj.
*/
static GstFlowReturn
gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
gst_base_sink_do_sync (GstBaseSink * basesink,
GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
{
GstClockTimeDiff jitter = 0;
@ -2764,7 +2770,7 @@ gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
* takes ownership of obj.
*/
static GstFlowReturn
gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
gst_base_sink_render_object (GstBaseSink * basesink,
guint8 obj_type, gpointer obj)
{
GstFlowReturn ret;
@ -2776,10 +2782,8 @@ gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
priv = basesink->priv;
if (OBJ_IS_BUFFERLIST (obj_type)) {
/*
* If buffer list, use the first group buffer within the list
* for syncing
*/
/* If buffer list, use the first group buffer within the list
* for syncing */
sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
g_assert (NULL != sync_obj);
} else {
@ -2792,9 +2796,7 @@ again:
/* synchronize this object, non syncable objects return OK
* immediately. */
ret =
gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
obj_type);
ret = gst_base_sink_do_sync (basesink, sync_obj, &late, &step_end, obj_type);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto sync_failed;
@ -2848,91 +2850,6 @@ again:
priv->rendered++;
}
} else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
GstEvent *event = GST_EVENT_CAST (obj);
gboolean event_res = TRUE;
GstEventType type;
bclass = GST_BASE_SINK_GET_CLASS (basesink);
type = GST_EVENT_TYPE (event);
GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
gst_event_type_get_name (type));
if (bclass->event)
event_res = bclass->event (basesink, event);
/* when we get here we could be flushing again when the event handler calls
* _wait_eos(). We have to ignore this object in that case. */
if (G_UNLIKELY (basesink->flushing))
goto flushing;
if (G_LIKELY (event_res)) {
guint32 seqnum;
seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
switch (type) {
case GST_EVENT_EOS:
{
GstMessage *message;
/* the EOS event is completely handled so we mark
* ourselves as being in the EOS state. eos is also
* protected by the object lock so we can read it when
* answering the POSITION query. */
GST_OBJECT_LOCK (basesink);
basesink->eos = TRUE;
GST_OBJECT_UNLOCK (basesink);
/* ok, now we can post the message */
GST_DEBUG_OBJECT (basesink, "Now posting EOS");
message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
gst_message_set_seqnum (message, seqnum);
gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
break;
}
case GST_EVENT_SEGMENT:
/* configure the segment */
/* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
* We protect with the OBJECT_LOCK so that we can use the values to
* safely answer a POSITION query. */
GST_OBJECT_LOCK (basesink);
/* the newsegment event is needed to bring the buffer timestamps to the
* stream time and to drop samples outside of the playback segment. */
gst_event_copy_segment (event, &basesink->segment);
GST_DEBUG_OBJECT (basesink, "configured SEGMENT %" GST_SEGMENT_FORMAT,
&basesink->segment);
basesink->have_newsegment = TRUE;
GST_OBJECT_UNLOCK (basesink);
break;
case GST_EVENT_TAG:
{
GstTagList *taglist;
gst_event_parse_tag (event, &taglist);
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_tag (GST_OBJECT_CAST (basesink),
gst_tag_list_copy (taglist)));
break;
}
case GST_EVENT_SINK_MESSAGE:
{
GstMessage *msg = NULL;
gst_event_parse_sink_message (event, &msg);
if (msg)
gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
}
default:
break;
}
}
} else {
g_return_val_if_reached (GST_FLOW_ERROR);
}
@ -3123,43 +3040,80 @@ gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad,
GST_OBJECT_UNLOCK (basesink);
}
static gboolean
gst_base_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
static GstFlowReturn
gst_base_sink_default_wait_eos (GstBaseSink * basesink, GstEvent * event)
{
GstFlowReturn ret;
gboolean late, step_end;
ret = gst_base_sink_do_sync (basesink, GST_MINI_OBJECT_CAST (event),
&late, &step_end, _PR_IS_EVENT);
return ret;
}
static gboolean
gst_base_sink_default_event (GstBaseSink * basesink, GstEvent * event)
{
GstBaseSink *basesink;
gboolean result = TRUE;
GstBaseSinkClass *bclass;
basesink = GST_BASE_SINK (parent);
bclass = GST_BASE_SINK_GET_CLASS (basesink);
GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
gst_base_sink_flush_start (basesink, basesink->sinkpad);
break;
}
case GST_EVENT_FLUSH_STOP:
{
gboolean reset_time;
gst_event_parse_flush_stop (event, &reset_time);
GST_DEBUG_OBJECT (basesink, "flush-stop %p, reset_time: %d", event,
reset_time);
gst_base_sink_flush_stop (basesink, basesink->sinkpad, reset_time);
break;
}
case GST_EVENT_EOS:
{
GstFlowReturn ret;
GST_BASE_SINK_PREROLL_LOCK (basesink);
if (G_UNLIKELY (basesink->flushing))
goto flushing;
if (G_UNLIKELY (basesink->priv->received_eos))
goto after_eos;
GstMessage *message;
guint32 seqnum;
/* we set the received EOS flag here so that we can use it when testing if
* we are prerolled and to refuse more buffers. */
basesink->priv->received_eos = TRUE;
/* EOS is a prerollable object, we call the unlocked version because it
* does not check the received_eos flag. */
ret = gst_base_sink_render_object (basesink, pad,
_PR_IS_EVENT, GST_MINI_OBJECT_CAST (event));
if (G_UNLIKELY (ret != GST_FLOW_OK))
result = FALSE;
/* wait for EOS */
if (G_LIKELY (bclass->wait_eos)) {
GstFlowReturn ret;
GST_BASE_SINK_PREROLL_UNLOCK (basesink);
ret = bclass->wait_eos (basesink, event);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
result = FALSE;
goto done;
}
}
/* the EOS event is completely handled so we mark
* ourselves as being in the EOS state. eos is also
* protected by the object lock so we can read it when
* answering the POSITION query. */
GST_OBJECT_LOCK (basesink);
basesink->eos = TRUE;
GST_OBJECT_UNLOCK (basesink);
/* ok, now we can post the message */
GST_DEBUG_OBJECT (basesink, "Now posting EOS");
seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
gst_message_set_seqnum (message, seqnum);
gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
break;
}
case GST_EVENT_CAPS:
@ -3177,63 +3131,73 @@ gst_base_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_caps_replace (&basesink->priv->caps, caps);
GST_OBJECT_UNLOCK (basesink);
}
gst_event_unref (event);
break;
}
case GST_EVENT_SEGMENT:
/* configure the segment */
/* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
* We protect with the OBJECT_LOCK so that we can use the values to
* safely answer a POSITION query. */
GST_OBJECT_LOCK (basesink);
/* the newsegment event is needed to bring the buffer timestamps to the
* stream time and to drop samples outside of the playback segment. */
gst_event_copy_segment (event, &basesink->segment);
GST_DEBUG_OBJECT (basesink, "configured SEGMENT %" GST_SEGMENT_FORMAT,
&basesink->segment);
basesink->have_newsegment = TRUE;
GST_OBJECT_UNLOCK (basesink);
break;
case GST_EVENT_TAG:
{
GstFlowReturn ret;
GstTagList *taglist;
GST_DEBUG_OBJECT (basesink, "segment %p", event);
gst_event_parse_tag (event, &taglist);
GST_BASE_SINK_PREROLL_LOCK (basesink);
if (G_UNLIKELY (basesink->flushing))
goto flushing;
if (G_UNLIKELY (basesink->priv->received_eos))
goto after_eos;
ret =
gst_base_sink_render_object (basesink, pad,
_PR_IS_EVENT, GST_MINI_OBJECT_CAST (event));
if (G_UNLIKELY (ret != GST_FLOW_OK))
result = FALSE;
GST_BASE_SINK_PREROLL_UNLOCK (basesink);
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_tag (GST_OBJECT_CAST (basesink),
gst_tag_list_copy (taglist)));
break;
}
case GST_EVENT_FLUSH_START:
if (bclass->event)
bclass->event (basesink, event);
GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
gst_base_sink_flush_start (basesink, pad);
gst_event_unref (event);
break;
case GST_EVENT_FLUSH_STOP:
case GST_EVENT_SINK_MESSAGE:
{
gboolean reset_time;
GstMessage *msg = NULL;
if (bclass->event)
bclass->event (basesink, event);
gst_event_parse_flush_stop (event, &reset_time);
GST_DEBUG_OBJECT (basesink, "flush-stop %p, reset_time: %d", event,
reset_time);
gst_base_sink_flush_stop (basesink, pad, reset_time);
gst_event_unref (event);
gst_event_parse_sink_message (event, &msg);
if (msg)
gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
break;
}
default:
/* other events are sent to queue or subclass depending on if they
* are serialized. */
if (GST_EVENT_IS_SERIALIZED (event)) {
GstFlowReturn ret;
break;
}
done:
gst_event_unref (event);
return result;
}
static gboolean
gst_base_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstBaseSink *basesink;
gboolean result = TRUE;
GstBaseSinkClass *bclass;
basesink = GST_BASE_SINK_CAST (parent);
bclass = GST_BASE_SINK_GET_CLASS (basesink);
GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
/* special case for this serialized event because we don't want to grab
* the PREROLL lock or check if we were flushing */
if (bclass->event)
result = bclass->event (basesink, event);
break;
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
GST_BASE_SINK_PREROLL_LOCK (basesink);
if (G_UNLIKELY (basesink->flushing))
goto flushing;
@ -3241,16 +3205,13 @@ gst_base_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
if (G_UNLIKELY (basesink->priv->received_eos))
goto after_eos;
ret = gst_base_sink_render_object (basesink, pad, _PR_IS_EVENT,
GST_MINI_OBJECT_CAST (event));
if (G_UNLIKELY (ret != GST_FLOW_OK))
result = FALSE;
if (bclass->event)
result = bclass->event (basesink, event);
GST_BASE_SINK_PREROLL_UNLOCK (basesink);
} else {
if (bclass->event)
bclass->event (basesink, event);
gst_event_unref (event);
result = bclass->event (basesink, event);
}
break;
}
@ -3262,8 +3223,8 @@ flushing:
{
GST_DEBUG_OBJECT (basesink, "we are flushing");
GST_BASE_SINK_PREROLL_UNLOCK (basesink);
result = FALSE;
gst_event_unref (event);
result = FALSE;
goto done;
}
@ -3271,8 +3232,8 @@ after_eos:
{
GST_DEBUG_OBJECT (basesink, "Event received after EOS, dropping");
GST_BASE_SINK_PREROLL_UNLOCK (basesink);
result = FALSE;
gst_event_unref (event);
result = FALSE;
goto done;
}
}
@ -3396,7 +3357,7 @@ gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
/* now we can process the buffer in the queue, this function takes ownership
* of the buffer */
result = gst_base_sink_render_object (basesink, pad, obj_type, obj);
result = gst_base_sink_render_object (basesink, obj_type, obj);
return result;
/* ERRORS */

View file

@ -127,6 +127,8 @@ struct _GstBaseSink {
* @unlock_stop: Clear the previous unlock request. Subclasses should clear
* any state they set during unlock(), such as clearing command queues.
* @event: Override this to handle events arriving on the sink pad
* @wait_eos: Override this to implement custom logic to wait for the EOS time.
* subclasses should always first chain up to the default implementation.
* @preroll: Called to present the preroll buffer if desired
* @render: Called when a buffer should be presented or output, at the
* correct moment if the #GstBaseSink has been set to sync to the clock.
@ -175,6 +177,9 @@ struct _GstBaseSinkClass {
/* notify subclass of event */
gboolean (*event) (GstBaseSink *sink, GstEvent *event);
/* wait for eos, subclasses should chain up to parent first */
GstFlowReturn (*wait_eos) (GstBaseSink *sink, GstEvent *event);
/* notify subclass of preroll buffer or real buffer */
GstFlowReturn (*preroll) (GstBaseSink *sink, GstBuffer *buffer);
GstFlowReturn (*render) (GstBaseSink *sink, GstBuffer *buffer);

View file

@ -425,11 +425,7 @@ gst_fake_sink_event (GstBaseSink * bsink, GstEvent * event)
gst_fake_sink_notify_last_message (sink);
}
if (GST_BASE_SINK_CLASS (parent_class)->event) {
return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
} else {
return TRUE;
}
return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
}
static GstFlowReturn

View file

@ -576,13 +576,14 @@ gst_fd_sink_event (GstBaseSink * sink, GstEvent * event)
break;
}
return TRUE;
return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
seek_failed:
{
GST_ELEMENT_ERROR (fdsink, RESOURCE, SEEK, (NULL),
("Error while seeking on file descriptor %d: %s",
fdsink->fd, g_strerror (errno)));
gst_event_unref (event);
return FALSE;
}

View file

@ -583,7 +583,7 @@ gst_file_sink_event (GstBaseSink * sink, GstEvent * event)
break;
}
return TRUE;
return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
/* ERRORS */
seek_failed:
@ -591,6 +591,7 @@ seek_failed:
GST_ELEMENT_ERROR (filesink, RESOURCE, SEEK,
(_("Error while seeking in file \"%s\"."), filesink->filename),
GST_ERROR_SYSTEM);
gst_event_unref (event);
return FALSE;
}
flush_failed:
@ -598,6 +599,7 @@ flush_failed:
GST_ELEMENT_ERROR (filesink, RESOURCE, WRITE,
(_("Error while writing to file \"%s\"."), filesink->filename),
GST_ERROR_SYSTEM);
gst_event_unref (event);
return FALSE;
}
}