Copy seqnums from events to messages so that they can all be related back to eachother.

Original commit message from CVS:
* gst/gstbin.c: (bin_handle_async_start),
(gst_bin_handle_message_func), (gst_bin_query):
* libs/gst/base/gstbasesink.c: (gst_base_sink_render_object),
(gst_base_sink_event), (gst_base_sink_change_state):
* libs/gst/base/gstbasesrc.c: (gst_base_src_perform_seek),
(gst_base_src_loop), (gst_base_src_change_state):
Copy seqnums from events to messages so that they can all be related
back to eachother.
This commit is contained in:
Wim Taymans 2008-11-04 15:56:55 +00:00
parent 57792e66ac
commit 0a71170006
4 changed files with 101 additions and 26 deletions

View file

@ -1,3 +1,14 @@
2008-11-04 Wim Taymans <wim.taymans@collabora.co.uk>
* gst/gstbin.c: (bin_handle_async_start),
(gst_bin_handle_message_func), (gst_bin_query):
* libs/gst/base/gstbasesink.c: (gst_base_sink_render_object),
(gst_base_sink_event), (gst_base_sink_change_state):
* libs/gst/base/gstbasesrc.c: (gst_base_src_perform_seek),
(gst_base_src_loop), (gst_base_src_change_state):
Copy seqnums from events to messages so that they can all be related
back to eachother.
2008-11-04 Wim Taymans <wim.taymans@collabora.co.uk>
* tools/gst-launch.c: (event_loop):

View file

@ -2519,7 +2519,6 @@ bin_handle_async_start (GstBin * bin, gboolean new_base_time)
if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_NO_PREROLL)
goto was_no_preroll;
old_state = GST_STATE (bin);
/* when we PLAYING we go back to PAUSED, when preroll happens, we go back to
@ -2763,6 +2762,8 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
{
GstObject *src;
GstMessageType type;
GstMessage *tmessage;
guint32 seqnum;
src = GST_MESSAGE_SRC (message);
type = GST_MESSAGE_TYPE (message);
@ -2784,9 +2785,13 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
/* if we are completely EOS, we forward an EOS message */
if (eos) {
GST_DEBUG_OBJECT (bin, "all sinks posted EOS");
gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_eos (GST_OBJECT_CAST (bin)));
seqnum = gst_message_get_seqnum (message);
tmessage = gst_message_new_eos (GST_OBJECT_CAST (bin));
gst_message_set_seqnum (tmessage, seqnum);
GST_DEBUG_OBJECT (bin,
"all sinks posted EOS, posting seqnum #%" G_GUINT32_FORMAT, seqnum);
gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage);
}
break;
}
@ -2827,10 +2832,13 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
}
GST_OBJECT_UNLOCK (bin);
if (post) {
seqnum = gst_message_get_seqnum (message);
tmessage = gst_message_new_segment_done (GST_OBJECT_CAST (bin),
format, position);
gst_message_set_seqnum (tmessage, seqnum);
/* post segment done with latest format and position. */
gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_segment_done (GST_OBJECT_CAST (bin),
format, position));
gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage);
}
break;
}
@ -3245,6 +3253,7 @@ gst_bin_query (GstElement * element, GstQuery * query)
fold_data.query = query;
/* set the result of the query to FALSE initially */
g_value_init (&ret, G_TYPE_BOOLEAN);
g_value_set_boolean (&ret, res);

View file

@ -215,9 +215,13 @@ struct _GstBaseSinkPrivate
/* caps for pull based scheduling */
GstCaps *pull_caps;
/* blocksize for pulling */
guint blocksize;
gboolean discont;
/* seqnum of the stream */
guint32 seqnum;
};
#define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
@ -2350,8 +2354,16 @@ gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
goto flushing;
if (G_LIKELY (event_res)) {
guint32 seqnum;
seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
switch (type) {
case GST_EVENT_EOS:
{
GstMessage *message;
/* the EOS event is completely handled so we mark
* ourselves as being in the EOS state. eos is also
* protected by the object lock so we can read it when
@ -2359,11 +2371,15 @@ gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
GST_OBJECT_LOCK (basesink);
basesink->eos = TRUE;
GST_OBJECT_UNLOCK (basesink);
/* ok, now we can post the message */
GST_DEBUG_OBJECT (basesink, "Now posting EOS");
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_eos (GST_OBJECT_CAST (basesink)));
message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
gst_message_set_seqnum (message, seqnum);
gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
break;
}
case GST_EVENT_NEWSEGMENT:
/* configure the segment */
gst_base_sink_configure_segment (basesink, pad, event,
@ -2665,7 +2681,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
gst_event_unref (event);
} else {
/* we set the received EOS flag here so that we can use it when testing if
* we are prerolled and to refure more buffers. */
* we are prerolled and to refuse more buffers. */
basesink->priv->received_eos = TRUE;
/* EOS is a prerollable object, we call the unlocked version because it
@ -3998,10 +4014,13 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
basesink->playing_async = FALSE;
basesink->need_preroll = FALSE;
if (basesink->eos) {
GstMessage *message;
/* need to post EOS message here */
GST_DEBUG_OBJECT (basesink, "Now posting EOS");
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_eos (GST_OBJECT_CAST (basesink)));
message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
gst_message_set_seqnum (message, basesink->priv->seqnum);
gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
} else {
GST_DEBUG_OBJECT (basesink, "signal preroll");
GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);

View file

@ -229,6 +229,9 @@ struct _GstBaseSrcPrivate
GstClockTimeDiff ts_offset;
gboolean do_timestamp;
/* stream sequence number */
guint32 seqnum;
};
static GstElementClass *parent_class = NULL;
@ -1154,6 +1157,8 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
gboolean relative_seek = FALSE;
gboolean seekseg_configured = FALSE;
GstSegment seeksegment;
guint32 seqnum;
GstEvent *tevent;
GST_DEBUG_OBJECT (src, "doing seek");
@ -1180,14 +1185,19 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
}
flush = flags & GST_SEEK_FLAG_FLUSH;
seqnum = gst_event_get_seqnum (event);
} else {
flush = FALSE;
/* get next seqnum */
seqnum = gst_util_seqnum_next ();
}
/* send flush start */
if (flush)
gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
else
if (flush) {
tevent = gst_event_new_flush_start ();
gst_event_set_seqnum (tevent, seqnum);
gst_pad_push_event (src->srcpad, tevent);
} else
gst_pad_pause_task (src->srcpad);
/* unblock streaming thread. */
@ -1197,6 +1207,14 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
* because the task is paused, our streaming thread stopped
* or because our peer is flushing. */
GST_PAD_STREAM_LOCK (src->srcpad);
if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
/* we have seen this event before, issue a warning for now */
GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
seqnum);
} else {
src->priv->seqnum = seqnum;
GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
}
gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
@ -1239,9 +1257,11 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
/* and prepare to continue streaming */
if (flush) {
tevent = gst_event_new_flush_stop ();
gst_event_set_seqnum (tevent, seqnum);
/* send flush stop, peer will accept data and events again. We
* are not yet providing data as we still have the STREAM_LOCK. */
gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
gst_pad_push_event (src->srcpad, tevent);
} else if (res && src->data.ABI.running) {
/* we are running the current segment and doing a non-flushing seek,
* close the segment first based on the last_stop. */
@ -1255,6 +1275,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
gst_event_new_new_segment_full (TRUE,
src->segment.rate, src->segment.applied_rate, src->segment.format,
src->segment.start, src->segment.last_stop, src->segment.time);
gst_event_set_seqnum (src->priv->close_segment, seqnum);
}
/* The subclass must have converted the segment to the processing format
@ -1271,9 +1292,13 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gst_element_post_message (GST_ELEMENT (src),
gst_message_new_segment_start (GST_OBJECT (src),
src->segment.format, src->segment.last_stop));
GstMessage *message;
message = gst_message_new_segment_start (GST_OBJECT (src),
src->segment.format, src->segment.last_stop);
gst_message_set_seqnum (message, seqnum);
gst_element_post_message (GST_ELEMENT (src), message);
}
/* for deriving a stop position for the playback segment from the seek
@ -1301,6 +1326,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
src->segment.rate, src->segment.applied_rate, src->segment.format,
src->segment.start, src->segment.last_stop, src->segment.time);
}
gst_event_set_seqnum (src->priv->start_segment, seqnum);
}
src->priv->discont = TRUE;
@ -2273,6 +2299,7 @@ flushing:
pause:
{
const gchar *reason = gst_flow_get_name (ret);
GstEvent *event;
GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
src->data.ABI.running = FALSE;
@ -2281,20 +2308,27 @@ pause:
if (ret == GST_FLOW_UNEXPECTED) {
/* perform EOS logic */
if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gst_element_post_message (GST_ELEMENT_CAST (src),
gst_message_new_segment_done (GST_OBJECT_CAST (src),
src->segment.format, src->segment.last_stop));
GstMessage *message;
message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
src->segment.format, src->segment.last_stop);
gst_message_set_seqnum (message, src->priv->seqnum);
gst_element_post_message (GST_ELEMENT_CAST (src), message);
} else {
gst_pad_push_event (pad, gst_event_new_eos ());
event = gst_event_new_eos ();
gst_event_set_seqnum (event, src->priv->seqnum);
gst_pad_push_event (pad, event);
src->priv->last_sent_eos = TRUE;
}
} else {
event = gst_event_new_eos ();
gst_event_set_seqnum (event, src->priv->seqnum);
/* for fatal errors we post an error message, post the error
* first so the app knows about the error first. */
GST_ELEMENT_ERROR (src, STREAM, FAILED,
(_("Internal data flow error.")),
("streaming task paused, reason %s (%d)", reason, ret));
gst_pad_push_event (pad, gst_event_new_eos ());
gst_pad_push_event (pad, event);
src->priv->last_sent_eos = TRUE;
}
}
@ -2793,7 +2827,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
{
GstEvent **event_p;
GstEvent **event_p, *event;
/* we don't need to unblock anything here, the pad deactivation code
* already did this */
@ -2803,7 +2837,9 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
* the EOS event to the element */
if (!basesrc->priv->last_sent_eos) {
GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
event = gst_event_new_eos ();
gst_event_set_seqnum (event, basesrc->priv->seqnum);
gst_pad_push_event (basesrc->srcpad, event);
basesrc->priv->last_sent_eos = TRUE;
}
g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);