ulpfecdec: output perfect seqnums

ULP FEC, as defined in RFC 5109, has the protected and protection
packets sharing the same ssrc, and a different payload type, and
implies rewriting the seqnums of the protected stream when encoding
the protection packets. This has the unfortunate drawback of not
being able to tell whether a lost packet was a protection packet.

rtpbasedepayload relies on gaps in the seqnums to set the DISCONT
flag on buffers it outputs. Before that commit, this created two
problems:

* The protection packets don't make it as far as the depayloader,
  which means it will mark buffers as DISCONT every time the previous
  packets were protected

* While we could work around the previous issue by looking at
  the protection packets ignored and dropped in rtpptdemux, we
  would still mark buffers as DISCONT when a FEC packet was lost,
  as we cannot know that it was indeed a FEC packet, even though
  this should have no impact on the decoding of the stream

With this commit, we consider that when using ULPFEC, gaps in
the seqnums are not a reliable indicator of whether buffers should
be marked as DISCONT or not, and thus rewrite the seqnums on
the decoding side as well to form a perfect sequence, this
obviously doesn't prevent the jitterbuffer from doing its job
as the ulpfec decoder is downstream from it.

https://bugzilla.gnome.org/show_bug.cgi?id=794909
This commit is contained in:
Mathieu Duponchelle 2018-04-02 16:06:35 +02:00
parent 6d92fcd043
commit 90f5ae8f45
3 changed files with 63 additions and 10 deletions

View file

@ -363,11 +363,18 @@ gst_rtp_ulpfec_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
GstRtpUlpFecDec *self = GST_RTP_ULPFEC_DEC (parent);
if (G_LIKELY (GST_FLOW_OK == self->chain_return_val)) {
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
buf = gst_buffer_make_writable (buf);
if (G_UNLIKELY (self->unset_discont_flag)) {
self->unset_discont_flag = FALSE;
buf = gst_buffer_make_writable (buf);
GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT);
}
gst_rtp_buffer_map (buf, GST_MAP_WRITE, &rtp);
gst_rtp_buffer_set_seq (&rtp, self->next_seqnum++);
gst_rtp_buffer_unmap (&rtp);
return gst_pad_push (self->srcpad, buf);
}
@ -396,6 +403,9 @@ gst_rtp_ulpfec_dec_handle_packet_loss (GstRtpUlpFecDec * self, guint16 seqnum,
gst_rtp_ulpfec_dec_recover (self, self->caps_ssrc, caps_pt,
&recovered_pt, &recovered_seq))) {
if (seqnum == recovered_seq) {
GstBuffer *sent_buffer;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
recovered_buffer = gst_buffer_make_writable (recovered_buffer);
GST_BUFFER_PTS (recovered_buffer) = timestamp;
/* GST_BUFFER_DURATION (recovered_buffer) = duration;
@ -403,16 +413,21 @@ gst_rtp_ulpfec_dec_handle_packet_loss (GstRtpUlpFecDec * self, guint16 seqnum,
if (!self->lost_packet_from_storage)
rtp_storage_put_recovered_packet (self->storage,
gst_buffer_ref (recovered_buffer), recovered_pt, self->caps_ssrc,
recovered_seq);
recovered_buffer, recovered_pt, self->caps_ssrc, recovered_seq);
GST_DEBUG_OBJECT (self,
"Pushing recovered packet ssrc=0x%08x seq=%u %" GST_PTR_FORMAT,
self->caps_ssrc, seqnum, recovered_buffer);
sent_buffer = gst_buffer_copy_deep (recovered_buffer);
gst_rtp_buffer_map (sent_buffer, GST_MAP_WRITE, &rtp);
gst_rtp_buffer_set_seq (&rtp, self->next_seqnum++);
gst_rtp_buffer_unmap (&rtp);
ret = FALSE;
self->unset_discont_flag = TRUE;
self->chain_return_val = gst_pad_push (self->srcpad, recovered_buffer);
self->chain_return_val = gst_pad_push (self->srcpad, sent_buffer);
break;
}
@ -444,11 +459,15 @@ gst_rtp_ulpfec_dec_handle_sink_event (GstPad * pad, GstObject * parent,
gst_event_has_name (event, "GstRTPPacketLost")) {
guint seqnum;
GstClockTime timestamp, duration;
GstStructure *s;
event = gst_event_make_writable (event);
s = gst_event_writable_structure (event);
g_assert (self->have_caps_ssrc);
g_assert (self->storage);
if (!gst_structure_get (gst_event_get_structure (event),
if (!gst_structure_get (s,
"seqnum", G_TYPE_UINT, &seqnum,
"timestamp", G_TYPE_UINT64, &timestamp,
"duration", G_TYPE_UINT64, &duration, NULL))
@ -457,10 +476,15 @@ gst_rtp_ulpfec_dec_handle_sink_event (GstPad * pad, GstObject * parent,
forward =
gst_rtp_ulpfec_dec_handle_packet_loss (self, seqnum, timestamp,
duration);
if (forward)
if (forward) {
gst_structure_remove_field (s, "seqnum");
gst_structure_set (s, "might-have-been-fec", G_TYPE_BOOLEAN, TRUE, NULL);
++self->packets_unrecovered;
else
} else {
++self->packets_recovered;
}
GST_DEBUG_OBJECT (self, "Unrecovered / Recovered: %lu / %lu",
(gulong) self->packets_unrecovered, (gulong) self->packets_recovered);
} else if (GST_EVENT_CAPS == GST_EVENT_TYPE (event)) {
@ -514,6 +538,8 @@ gst_rtp_ulpfec_dec_init (GstRtpUlpFecDec * self)
self->fec_pt = DEFAULT_FEC_PT;
self->next_seqnum = g_random_int_range (0, G_MAXINT16);
self->chain_return_val = GST_FLOW_OK;
self->have_caps_ssrc = FALSE;
self->caps_ssrc = 0;

View file

@ -69,6 +69,7 @@ struct _GstRtpUlpFecDec {
GArray *scratch_buf;
gboolean lost_packet_from_storage;
gboolean lost_packet_returned;
guint16 next_seqnum;
/* stats */
gsize fec_packets_received;

View file

@ -74,7 +74,20 @@ push_lost_event (GstHarness * h, guint32 seqnum,
}
if (event_goes_through) {
fail_unless (packet_loss_in == packet_loss_out);
const GstStructure *s = gst_event_get_structure (packet_loss_out);
guint64 tscopy, durcopy;
gboolean might_have_been_fec;
fail_unless (gst_structure_has_name (s, "GstRTPPacketLost"));
fail_if (gst_structure_has_field (s, "seqnum"));
fail_unless (gst_structure_get_uint64 (s, "timestamp", &tscopy));
fail_unless (gst_structure_get_uint64 (s, "duration", &durcopy));
fail_unless (gst_structure_get_boolean (s, "might-have-been-fec",
&might_have_been_fec));
fail_unless_equals_uint64 (timestamp, tscopy);
fail_unless_equals_uint64 (duration, durcopy);
fail_unless (might_have_been_fec == TRUE);
gst_event_unref (packet_loss_out);
} else {
fail_unless (NULL == packet_loss_out);
@ -88,18 +101,31 @@ lose_and_recover_test (GstHarness * h, guint16 lost_seq,
guint64 duration = 222222;
guint64 timestamp = 111111;
GstBuffer *bufout;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
GstRTPBuffer rtpout = GST_RTP_BUFFER_INIT;
GstBuffer *wrap;
gpointer reccopy = g_malloc (recbuf_size);
memcpy (reccopy, recbuf, recbuf_size);
push_lost_event (h, lost_seq, timestamp, duration, FALSE);
bufout = gst_harness_pull (h);
fail_unless_equals_int (gst_buffer_get_size (bufout), recbuf_size);
fail_unless_equals_int (GST_BUFFER_PTS (bufout), timestamp);
fail_unless (gst_buffer_memcmp (bufout, 0, recbuf, recbuf_size) == 0);
wrap = gst_buffer_new_wrapped (reccopy, recbuf_size);
gst_rtp_buffer_map (wrap, GST_MAP_WRITE, &rtp);
gst_rtp_buffer_map (bufout, GST_MAP_READ, &rtpout);
gst_rtp_buffer_set_seq (&rtp, gst_rtp_buffer_get_seq (&rtpout));
fail_unless (gst_buffer_memcmp (bufout, 0, reccopy, recbuf_size) == 0);
gst_rtp_buffer_unmap (&rtp);
gst_rtp_buffer_unmap (&rtpout);
fail_unless (!GST_BUFFER_FLAG_IS_SET (bufout, GST_RTP_BUFFER_FLAG_REDUNDANT));
gst_buffer_unref (bufout);
g_free (reccopy);
/* Pushing the next buffer with discont flag set */
bufout = gst_buffer_new ();
bufout = gst_rtp_buffer_new_allocate (0, 0, 0);
GST_BUFFER_FLAG_SET (bufout, GST_BUFFER_FLAG_DISCONT);
bufout = gst_harness_push_and_pull (h, bufout);
/* Checking the flag was unset */