rtspsrc: Consistently set seqnums on events

Set udpsrc seqnums on all events sent to the udpsrc's, and before
forwarding events out of rtspsrc set the latest seek seqnum on them if
any.

Also produce a consistent seqnum in rtspsrc from the very beginning.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3409>
This commit is contained in:
Sebastian Dröge 2022-11-14 21:19:12 +02:00 committed by GStreamer Marge Bot
parent e6efd288c2
commit 424e208170

View file

@ -2779,8 +2779,7 @@ gst_rtspsrc_set_state (GstRTSPSrc * src, GstState state)
} }
static void static void
gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing, gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
guint32 seqnum)
{ {
GstEvent *event; GstEvent *event;
gint cmd; gint cmd;
@ -2788,13 +2787,11 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing,
if (flush) { if (flush) {
event = gst_event_new_flush_start (); event = gst_event_new_flush_start ();
gst_event_set_seqnum (event, seqnum);
GST_DEBUG_OBJECT (src, "start flush"); GST_DEBUG_OBJECT (src, "start flush");
cmd = CMD_WAIT; cmd = CMD_WAIT;
state = GST_STATE_PAUSED; state = GST_STATE_PAUSED;
} else { } else {
event = gst_event_new_flush_stop (TRUE); event = gst_event_new_flush_stop (TRUE);
gst_event_set_seqnum (event, seqnum);
GST_DEBUG_OBJECT (src, "stop flush; playing %d", playing); GST_DEBUG_OBJECT (src, "stop flush; playing %d", playing);
cmd = CMD_LOOP; cmd = CMD_LOOP;
if (playing) if (playing)
@ -2927,7 +2924,7 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event)
* blocking in preroll). */ * blocking in preroll). */
if (flush) { if (flush) {
GST_DEBUG_OBJECT (src, "starting flush"); GST_DEBUG_OBJECT (src, "starting flush");
gst_rtspsrc_flush (src, TRUE, FALSE, gst_event_get_seqnum (event)); gst_rtspsrc_flush (src, TRUE, FALSE);
} else { } else {
if (src->task) { if (src->task) {
gst_task_pause (src->task); gst_task_pause (src->task);
@ -2977,7 +2974,7 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event)
if (flush) { if (flush) {
/* if we started flush, we stop now */ /* if we started flush, we stop now */
GST_DEBUG_OBJECT (src, "stopping flush"); GST_DEBUG_OBJECT (src, "stopping flush");
gst_rtspsrc_flush (src, FALSE, playing, gst_event_get_seqnum (event)); gst_rtspsrc_flush (src, FALSE, playing);
} }
/* now we did the seek and can activate the new segment values */ /* now we did the seek and can activate the new segment values */
@ -3131,15 +3128,13 @@ gst_rtspsrc_update_src_event (GstRTSPSrc * self, GstRTSPStream * stream,
event = gst_event_new_stream_start (stream_id); event = gst_event_new_stream_start (stream_id);
gst_rtspsrc_stream_start_event_add_group_id (self, event); gst_rtspsrc_stream_start_event_add_group_id (self, event);
g_free (stream_id); g_free (stream_id);
gst_event_set_seqnum (event, self->seek_seqnum);
break; break;
} }
case GST_EVENT_SEGMENT: default:
if (self->seek_seqnum != GST_SEQNUM_INVALID) {
event = gst_event_make_writable (event); event = gst_event_make_writable (event);
gst_event_set_seqnum (event, self->seek_seqnum); gst_event_set_seqnum (event, self->seek_seqnum);
}
break;
default:
break; break;
} }
@ -5196,8 +5191,8 @@ gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
if (stream->udpsrc[0]) { if (stream->udpsrc[0]) {
GstEvent *sent_event; GstEvent *sent_event;
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { if (stream->segment_seqnum[0] != GST_SEQNUM_INVALID) {
sent_event = gst_event_new_eos (); sent_event = gst_event_copy (event);
gst_event_set_seqnum (sent_event, stream->segment_seqnum[0]); gst_event_set_seqnum (sent_event, stream->segment_seqnum[0]);
} else { } else {
sent_event = gst_event_ref (event); sent_event = gst_event_ref (event);
@ -5205,32 +5200,38 @@ gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
res = gst_element_send_event (stream->udpsrc[0], sent_event); res = gst_element_send_event (stream->udpsrc[0], sent_event);
} else if (stream->channelpad[0]) { } else if (stream->channelpad[0]) {
gst_event_ref (event); GstEvent *sent_event;
sent_event = gst_event_copy (event);
gst_event_set_seqnum (sent_event, src->seek_seqnum);
if (GST_PAD_IS_SRC (stream->channelpad[0])) if (GST_PAD_IS_SRC (stream->channelpad[0]))
res = gst_pad_push_event (stream->channelpad[0], event); res = gst_pad_push_event (stream->channelpad[0], sent_event);
else else
res = gst_pad_send_event (stream->channelpad[0], event); res = gst_pad_send_event (stream->channelpad[0], sent_event);
} }
if (stream->udpsrc[1]) { if (stream->udpsrc[1]) {
GstEvent *sent_event; GstEvent *sent_event;
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
sent_event = gst_event_new_eos ();
if (stream->segment_seqnum[1] != GST_SEQNUM_INVALID) { if (stream->segment_seqnum[1] != GST_SEQNUM_INVALID) {
sent_event = gst_event_copy (event);
gst_event_set_seqnum (sent_event, stream->segment_seqnum[1]); gst_event_set_seqnum (sent_event, stream->segment_seqnum[1]);
}
} else { } else {
sent_event = gst_event_ref (event); sent_event = gst_event_ref (event);
} }
res &= gst_element_send_event (stream->udpsrc[1], sent_event); res &= gst_element_send_event (stream->udpsrc[1], sent_event);
} else if (stream->channelpad[1]) { } else if (stream->channelpad[1]) {
gst_event_ref (event); GstEvent *sent_event;
sent_event = gst_event_copy (event);
gst_event_set_seqnum (sent_event, src->seek_seqnum);
if (GST_PAD_IS_SRC (stream->channelpad[1])) if (GST_PAD_IS_SRC (stream->channelpad[1]))
res &= gst_pad_push_event (stream->channelpad[1], event); res &= gst_pad_push_event (stream->channelpad[1], sent_event);
else else
res &= gst_pad_send_event (stream->channelpad[1], event); res &= gst_pad_send_event (stream->channelpad[1], sent_event);
} }
done: done:
@ -9460,6 +9461,7 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition)
goto start_failed; goto start_failed;
break; break;
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
rtspsrc->seek_seqnum = gst_util_seqnum_next ();
/* init some state */ /* init some state */
rtspsrc->cur_protocols = rtspsrc->protocols; rtspsrc->cur_protocols = rtspsrc->protocols;
/* first attempt, don't ignore timeouts */ /* first attempt, don't ignore timeouts */