/* GStreamer plugin for forward error correction * Copyright (C) 2017 Pexip * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * * Author: Mikhail Fludkov <misha@pexip.com> */ #include "rtpstoragestream.h" static RtpStorageItem * rtp_storage_item_new (GstBuffer * buffer, guint8 pt, guint16 seq) { RtpStorageItem *ret = g_slice_new0 (RtpStorageItem); ret->buffer = buffer; ret->pt = pt; ret->seq = seq; return ret; } static void rtp_storage_item_free (RtpStorageItem * item) { g_assert (item->buffer != NULL); gst_buffer_unref (item->buffer); g_slice_free (RtpStorageItem, item); } static gint rtp_storage_item_compare (gconstpointer a, gconstpointer b, gpointer userdata) { gint seq_diff = gst_rtp_buffer_compare_seqnum ( ((RtpStorageItem const *) a)->seq, ((RtpStorageItem const *) b)->seq); if (seq_diff >= 0) return 0; return 1; } static void rtp_storage_stream_resize (RtpStorageStream * stream, GstClockTime size_time) { GList *it; guint i, too_old_buffers_num = 0; g_assert (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time)); g_assert (GST_CLOCK_TIME_IS_VALID (size_time)); g_assert_cmpint (size_time, >, 0); /* Iterating from oldest sequence numbers to newest */ for (i = 0, it = stream->queue.tail; it; it = it->prev, ++i) { RtpStorageItem *item = it->data; GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (item->buffer); if (GST_CLOCK_TIME_IS_VALID (arrival_time)) { if (stream->max_arrival_time - arrival_time > size_time) { too_old_buffers_num = i + 1; } else break; } } for (i = 0; i < too_old_buffers_num; ++i) { RtpStorageItem *item = g_queue_pop_tail (&stream->queue); rtp_storage_item_free (item); } } void rtp_storage_stream_resize_and_add_item (RtpStorageStream * stream, GstClockTime size_time, GstBuffer * buffer, guint8 pt, guint16 seq) { GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (buffer); if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (arrival_time))) { if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time))) stream->max_arrival_time = MAX (stream->max_arrival_time, arrival_time); else stream->max_arrival_time = arrival_time; rtp_storage_stream_resize (stream, size_time); rtp_storage_stream_add_item (stream, buffer, pt, seq); } else { rtp_storage_stream_add_item (stream, buffer, pt, seq); } } RtpStorageStream * rtp_storage_stream_new (guint32 ssrc) { RtpStorageStream *ret = g_slice_new0 (RtpStorageStream); ret->max_arrival_time = GST_CLOCK_TIME_NONE; ret->ssrc = ssrc; g_mutex_init (&ret->stream_lock); return ret; } void rtp_storage_stream_free (RtpStorageStream * stream) { STREAM_LOCK (stream); while (stream->queue.length) rtp_storage_item_free (g_queue_pop_tail (&stream->queue)); STREAM_UNLOCK (stream); g_mutex_clear (&stream->stream_lock); g_slice_free (RtpStorageStream, stream); } void rtp_storage_stream_add_item (RtpStorageStream * stream, GstBuffer * buffer, guint8 pt, guint16 seq) { RtpStorageItem *item = rtp_storage_item_new (buffer, pt, seq); GList *sibling = g_queue_find_custom (&stream->queue, item, (GCompareFunc) rtp_storage_item_compare); g_queue_insert_before (&stream->queue, sibling, item); } GstBufferList * rtp_storage_stream_get_packets_for_recovery (RtpStorageStream * stream, guint8 pt_fec, guint16 lost_seq) { guint ret_length = 0; GList *end = NULL; GList *start = NULL; gboolean saw_fec = TRUE; /* To initialize the start pointer in the loop below */ GList *it; /* Looking for media stream chunk with FEC packets at the end, which could * can have the lost packet. For example: * * |#10 FEC| |#9 FEC| |#8| ... |#6| |#5 FEC| |#4 FEC| |#3 FEC| |#2| |#1| |#0| * * Say @lost_seq = 7. Want to return bufferlist with packets [#6 : #10]. Other * packets are not relevant for recovery of packet 7. * * Or the lost packet can be in the storage. In that case single packet is returned. * It can happen if: * - it could have arrived right after it was considered lost (more of a corner case) * - it was recovered together with the other lost packet (most likely) */ for (it = stream->queue.tail; it; it = it->prev) { RtpStorageItem *item = it->data; gboolean found_end = FALSE; /* Is the buffer we lost in the storage? */ if (item->seq == lost_seq) { start = it; end = it; ret_length = 1; break; } if (pt_fec == item->pt) { gint seq_diff = gst_rtp_buffer_compare_seqnum (lost_seq, item->seq); if (seq_diff >= 0) { if (it->prev) { gboolean media_next = pt_fec != ((RtpStorageItem *) it->prev->data)->pt; found_end = media_next; } else found_end = TRUE; } saw_fec = TRUE; } else if (saw_fec) { saw_fec = FALSE; start = it; ret_length = 0; } ++ret_length; if (found_end) { end = it; break; } } if (end && !start) start = end; if (start && end) { GstBufferList *ret = gst_buffer_list_new_sized (ret_length); GList *it; for (it = start; it != end->prev; it = it->prev) gst_buffer_list_add (ret, gst_buffer_ref (((RtpStorageItem *) it->data)->buffer)); return ret; } return NULL; } GstBuffer * rtp_storage_stream_get_redundant_packet (RtpStorageStream * stream, guint16 lost_seq) { GList *it; for (it = stream->queue.head; it; it = it->next) { RtpStorageItem *item = it->data; if (item->seq == lost_seq) return gst_buffer_ref (item->buffer); } return NULL; }