gst-libs/gst/rtp/gstbasertpdepayload.c: Check sequence numbers, mark input buffers with a discont flag for the subcla...

Original commit message from CVS:
* gst-libs/gst/rtp/gstbasertpdepayload.c:
(gst_base_rtp_depayload_chain),
(gst_base_rtp_depayload_handle_sink_event),
(gst_base_rtp_depayload_push_full),
(gst_base_rtp_depayload_change_state):
Check sequence numbers, mark input buffers with a discont flag for the
subclass when we detected a gap, drop duplicate buffers. We do this
because one can use the element without a jitterbuffer in front and we
don't want to feed the subclasses invalid or reordered data.
Do an error when the subclass did not provide a process function instead
of crashing.
Some other small cleanups.
This commit is contained in:
Wim Taymans 2008-05-23 14:14:28 +00:00
parent 747d52adb3
commit 79a725148d
2 changed files with 92 additions and 4 deletions

View file

@ -1,3 +1,18 @@
2008-05-23 Wim Taymans <wim.taymans@collabora.co.uk>
* gst-libs/gst/rtp/gstbasertpdepayload.c:
(gst_base_rtp_depayload_chain),
(gst_base_rtp_depayload_handle_sink_event),
(gst_base_rtp_depayload_push_full),
(gst_base_rtp_depayload_change_state):
Check sequence numbers, mark input buffers with a discont flag for the
subclass when we detected a gap, drop duplicate buffers. We do this
because one can use the element without a jitterbuffer in front and we
don't want to feed the subclasses invalid or reordered data.
Do an error when the subclass did not provide a process function instead
of crashing.
Some other small cleanups.
2008-05-22 Tim-Philipp Müller <tim.muller at collabora co uk>
* gst/videotestsrc/videotestsrc.c: (paint_hline_NV12_NV21):

View file

@ -56,6 +56,8 @@ struct _GstBaseRTPDepayloadPrivate
gboolean discont;
GstClockTime timestamp;
GstClockTime duration;
guint32 next_seqnum;
};
/* Filter signals and args */
@ -255,25 +257,79 @@ gst_base_rtp_depayload_chain (GstPad * pad, GstBuffer * in)
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *out_buf;
GstClockTime timestamp;
guint16 seqnum;
gboolean reset_seq, discont;
gint gap;
filter = GST_BASE_RTP_DEPAYLOAD (GST_OBJECT_PARENT (pad));
/* we must validate, it's possible that this element is plugged right after a
* network receiver and we don't want to operate on invalid data */
if (!gst_rtp_buffer_validate (in))
goto invalid_buffer;
priv = filter->priv;
priv->discont = GST_BUFFER_IS_DISCONT (in);
timestamp = GST_BUFFER_TIMESTAMP (in);
/* convert to running_time and save the timestamp, this is the timestamp
* we put on outgoing buffers. */
timestamp = GST_BUFFER_TIMESTAMP (in);
timestamp = gst_segment_to_running_time (&filter->segment, GST_FORMAT_TIME,
timestamp);
priv->timestamp = timestamp;
priv->duration = GST_BUFFER_DURATION (in);
seqnum = gst_rtp_buffer_get_seq (in);
reset_seq = TRUE;
discont = FALSE;
GST_LOG_OBJECT (filter, "discont %d, seqnum %u, timestamp %"
GST_TIME_FORMAT, priv->discont, seqnum, GST_TIME_ARGS (timestamp));
/* Check seqnum. This is a very simple check that makes sure that the seqnums
* are striclty increasing, dropping anything that is out of the ordinary. We
* can only do this when the next_seqnum is known. */
if (priv->next_seqnum != -1) {
gap = gst_rtp_buffer_compare_seqnum (seqnum, priv->next_seqnum);
/* if we have no gap, all is fine */
if (G_UNLIKELY (gap != 0)) {
GST_LOG_OBJECT (filter, "got packet %u, expected %u, gap %d", seqnum,
priv->next_seqnum, gap);
if (gap < 0) {
/* seqnum > next_seqnum, we are missing some packets, this is always a
* DISCONT. */
GST_LOG_OBJECT (filter, "%d missing packets", gap);
discont = TRUE;
} else {
/* seqnum < next_seqnum, we have seen this packet before or the sender
* could be restarted. If the packet is not too old, we throw it away as
* a duplicate, otherwise we mark discont and continue. 100 misordered
* packets is a good threshold. See also RFC 4737. */
if (gap < 100)
goto dropping;
GST_LOG_OBJECT (filter,
"%d > 100, packet too old, sender likely restarted", gap);
discont = TRUE;
}
}
}
priv->next_seqnum = (seqnum + 1) & 0xffff;
if (discont && !priv->discont) {
GST_LOG_OBJECT (filter, "mark DISCONT on input buffer");
/* we detected a seqnum discont but the buffer was not flagged with a discont,
* set the discont flag so that the subclass can throw away old data. */
priv->discont = TRUE;
GST_BUFFER_FLAG_SET (in, GST_BUFFER_FLAG_DISCONT);
}
bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
if (G_UNLIKELY (bclass->process == NULL))
goto no_process;
/* let's send it out to processing */
out_buf = bclass->process (filter, in);
if (out_buf) {
@ -298,6 +354,20 @@ invalid_buffer:
gst_buffer_unref (in);
return GST_FLOW_OK;
}
dropping:
{
GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap);
gst_buffer_unref (in);
return GST_FLOW_OK;
}
no_process:
{
/* this is not fatal but should be filtered earlier */
GST_ELEMENT_ERROR (filter, STREAM, NOT_IMPLEMENTED, (NULL),
("The subclass does not have a process method"));
gst_buffer_unref (in);
return GST_FLOW_ERROR;
}
}
static gboolean
@ -314,6 +384,7 @@ gst_base_rtp_depayload_handle_sink_event (GstPad * pad, GstEvent * event)
gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
filter->need_newsegment = TRUE;
filter->priv->next_seqnum = -1;
break;
case GST_EVENT_NEWSEGMENT:
{
@ -377,7 +448,7 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter,
/* set the caps if any */
srccaps = GST_PAD_CAPS (filter->srcpad);
if (srccaps)
if (G_LIKELY (srccaps))
gst_buffer_set_caps (out_buf, srccaps);
bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
@ -386,7 +457,8 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter,
if (bclass->set_gst_timestamp && do_ts)
bclass->set_gst_timestamp (filter, rtptime, out_buf);
if (priv->discont) {
if (G_UNLIKELY (priv->discont)) {
GST_LOG_OBJECT (filter, "Marking DISCONT on output buffer");
GST_BUFFER_FLAG_SET (out_buf, GST_BUFFER_FLAG_DISCONT);
priv->discont = FALSE;
}
@ -395,8 +467,8 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter,
GST_LOG_OBJECT (filter, "Pushing buffer size %d, timestamp %" GST_TIME_FORMAT,
GST_BUFFER_SIZE (out_buf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (out_buf)));
ret = gst_pad_push (filter->srcpad, out_buf);
GST_LOG_OBJECT (filter, "Pushed buffer: %s", gst_flow_get_name (ret));
return ret;
}
@ -548,6 +620,7 @@ gst_base_rtp_depayload_change_state (GstElement * element,
priv->npt_stop = -1;
priv->play_speed = 1.0;
priv->play_scale = 1.0;
filter->priv->next_seqnum = -1;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;