diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am index 15f1abe56c..fdc0309562 100644 --- a/gst/rtpmanager/Makefile.am +++ b/gst/rtpmanager/Makefile.am @@ -10,6 +10,8 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \ gstrtpmux.c \ gstrtpptdemux.c \ gstrtprtxqueue.c \ + gstrtprtxreceive.c \ + gstrtprtxsend.c \ gstrtpssrcdemux.c \ rtpjitterbuffer.c \ rtpsession.c \ @@ -24,6 +26,8 @@ noinst_HEADERS = gstrtpbin.h \ gstrtpptdemux.h \ gstrtpssrcdemux.h \ gstrtprtxqueue.h \ + gstrtprtxreceive.h \ + gstrtprtxsend.h \ rtpjitterbuffer.h \ rtpsession.h \ rtpsource.h \ diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c index 9a28399970..426b0b79bf 100644 --- a/gst/rtpmanager/gstrtpmanager.c +++ b/gst/rtpmanager/gstrtpmanager.c @@ -26,6 +26,8 @@ #include "gstrtpptdemux.h" #include "gstrtpsession.h" #include "gstrtprtxqueue.h" +#include "gstrtprtxreceive.h" +#include "gstrtprtxsend.h" #include "gstrtpssrcdemux.h" #include "gstrtpdtmfmux.h" #include "gstrtpmux.h" @@ -51,6 +53,12 @@ plugin_init (GstPlugin * plugin) if (!gst_rtp_rtx_queue_plugin_init (plugin)) return FALSE; + if (!gst_rtp_rtx_receive_plugin_init (plugin)) + return FALSE; + + if (!gst_rtp_rtx_send_plugin_init (plugin)) + return FALSE; + if (!gst_element_register (plugin, "rtpssrcdemux", GST_RANK_NONE, GST_TYPE_RTP_SSRC_DEMUX)) return FALSE; diff --git a/gst/rtpmanager/gstrtprtxreceive.c b/gst/rtpmanager/gstrtprtxreceive.c new file mode 100644 index 0000000000..f69a7d1341 --- /dev/null +++ b/gst/rtpmanager/gstrtprtxreceive.c @@ -0,0 +1,734 @@ +/* RTP Retransmission receiver element for GStreamer + * + * gstrtprtxreceive.c: + * + * Copyright (C) 2013 Collabora Ltd. + * @author Julien Isorce + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-rtprtxreceive + * @see_also: rtprtxsend, rtpsession, rtpjitterbuffer + * + * The receiver will listen to the custom retransmission events from the + * downstream jitterbuffer and will remember the SSRC1 of the stream and + * seqnum that was requested. When it sees a packet with one of the stored + * seqnum, it associates the SSRC2 of the stream with the SSRC1 of the + * master stream. From then it knows that SSRC2 is the retransmission + * stream of SSRC1. This algorithm is stated in RFC 4588. For this + * algorithm to work, RFC4588 also states that no two pending retransmission + * requests can exist for the same seqnum and different SSRCs or else it + * would be impossible to associate the retransmission with the original + * requester SSRC. + * When the RTX receiver has associated the retransmission packets, + * it can depayload and forward them to the source pad of the element. + * RTX is SSRC-multiplexed. See #GstRtpRtxSend + * + * + * Example pipelines + * |[ + * gst-launch-1.0 rtpsession name=rtpsession \ + * audiotestsrc ! speexenc ! rtpspeexpay pt=97 ! rtprtxsend rtx-payload-type=99 ! \ + * identity drop-probability=0.1 ! rtpsession.send_rtp_sink \ + * rtpsession.send_rtp_src ! udpsink host="127.0.0.1" port=5000 \ + * udpsrc port=5001 ! rtpsession.recv_rtcp_sink \ + * rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 sync=false async=false + * ]| Send audio stream through port 5000. (5001 and 5002 are just the rtcp link with the receiver) + * |[ + * gst-launch-1.0 rtpsession name=rtpsession \ + * udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)44100,encoding-name=(string)SPEEX,encoding-params=(string)1,octet-align=(string)1" ! \ + * rtpsession.recv_rtp_sink \ + * rtpsession.recv_rtp_src ! rtprtxreceive rtx-payload-types="99" ! rtpjitterbuffer do-retransmission=true ! rtpspeexdepay ! \ + * speexdec ! audioconvert ! autoaudiosink \ + * rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5001 \ + * udpsrc port=5002 ! rtpsession.recv_rtcp_sink sync=fakse async=false + * ]| Receive audio stream from port 5000. (5001 and 5002 are just the rtcp link with the sender) + * On sender side make sure to use a different payload type for the stream and + * its associated retransmission stream (see #GstRtpRtxSend). Note that several retransmission streams can + * have the same payload type so this is not deterministic. Actually the + * rtprtxreceiver element does the association using seqnum values. + * On receiver side set all the retransmission payload types (Those informations are retrieve + * through SDP). + * You should still hear a clear sound when setting drop-probability to something greater than 0. + * The rtpjitterbuffer will generate a custom upstream event GstRTPRetransmissionRequest when + * it assumes that one packet is missing. Then this request is translated to a FB NACK in the rtcp link + * Finally the rtpsession of the sender side re-convert it in a GstRTPRetransmissionRequest that will + * be handle by rtprtxsend. + * When increasing this value it may be possible that even the retransmission stream would be dropped + * so the receiver will ask to resend the packets again and again until it actually receive them. + * If the value is too high the rtprtxsend will not be able to retrieve the packet in its list of + * stored packets. For learning purpose you could try to increase the max-size-packets or max-size-time + * rtprtxsender's properties. + * Also note that you should use rtprtxsend through rtpbin and its set-aux-send property. See #GstRtpBin. + * |[ + * gst-launch-1.0 rtpsession name=rtpsession0 \ + * audiotestsrc wave=0 ! speexenc ! rtpspeexpay pt=97 ! rtprtxsend rtx-payload-type=99 seqnum-offset=1 ! \ + * identity drop-probability=0.1 ! rtpsession0.send_rtp_sink \ + * rtpsession0.send_rtp_src ! udpsink host="127.0.0.1" port=5000 \ + * udpsrc port=5001 ! rtpsession0.recv_rtcp_sink \ + * rtpsession0.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 sync=false async=false \ + * rtpsession name=rtpsession1 \ + * audiotestsrc wave=0 ! speexenc ! rtpspeexpay pt=97 ! rtprtxsend rtx-payload-type=99 seqnum-offset=10 ! \ + * identity drop-probability=0.1 ! rtpsession1.send_rtp_sink \ + * rtpsession1.send_rtp_src ! udpsink host="127.0.0.1" port=5000 \ + * udpsrc port=5004 ! rtpsession1.recv_rtcp_sink \ + * rtpsession1.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 sync=false async=false + * ]| Send two audio streams to port 5000. + * |[ + * gst-launch-1.0 rtpsession name=rtpsession + * udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)44100,encoding-name=(string)SPEEX,encoding-params=(string)1,octet-align=(string)1" ! \ + * rtpsession.recv_rtp_sink \ + * rtpsession.recv_rtp_src ! rtprtxreceive rtx-payload-types="99" ! rtpssrcdemux name=demux \ + * demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpspeexdepay ! speexdec ! audioconvert ! autoaudiosink \ + * demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpspeexdepay ! speexdec ! audioconvert ! autoaudiosink \ + * rtpsession.send_rtcp_src ! ! tee name=t ! queue ! udpsink host="127.0.0.1" port=5001 t. ! queue ! udpsink host="127.0.0.1" port=5004 \ + * udpsrc port=5002 ! rtpsession.recv_rtcp_sink sync=fakse async=false + * ]| Receive audio stream from port 5000. + * On sender side the two streams have the same payload type for master streams, Same about retransmission streams. + * The streams are sent to the network through two distincts sessions. + * But we need to set a different seqnum-offset to make sure their seqnum navigate at a different rate like in concrete cases. + * We could also choose the same seqnum offset but we would require to set a different initial seqnum value. + * This is also why the rtprtxreceive can succeed to do the association between master and retransmission stream. + * On receiver side the same session is used to receive the two streams. So the rtpssrcdemux is here to demultiplex + * those two streams. The rtprtxreceive is responsible for reconstructing the original packets from the two retransmission streams. + * You can play with the drop-probability value for one or both streams. + * You should hear a clear sound. (after a few seconds the two streams wave feel synchronized) + * + * + * Last reviewed on 2013-11-08 (1.x) + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include + +#include "gstrtprtxreceive.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_receive_debug); +#define GST_CAT_DEFAULT gst_rtp_rtx_receive_debug + +#define DEFAULT_RTX_PAYLOAD_TYPES "" + +enum +{ + PROP_0, + PROP_RTX_PAYLOAD_TYPES, + PROP_NUM_RTX_REQUESTS, + PROP_NUM_RTX_PACKETS, + PROP_NUM_RTX_ASSOC_PACKETS, + PROP_LAST +}; + +static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static gboolean gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static GstFlowReturn gst_rtp_rtx_receive_chain (GstPad * pad, + GstObject * parent, GstBuffer * buffer); + +static GstStateChangeReturn gst_rtp_rtx_receive_change_state (GstElement * + element, GstStateChange transition); + +static void gst_rtp_rtx_receive_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_rtp_rtx_receive_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_rtp_rtx_receive_finalize (GObject * object); + +G_DEFINE_TYPE (GstRtpRtxReceive, gst_rtp_rtx_receive, GST_TYPE_ELEMENT); + +static void +gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + gobject_class->get_property = gst_rtp_rtx_receive_get_property; + gobject_class->set_property = gst_rtp_rtx_receive_set_property; + gobject_class->finalize = gst_rtp_rtx_receive_finalize; + + g_object_class_install_property (gobject_class, PROP_RTX_PAYLOAD_TYPES, + g_param_spec_string ("rtx-payload-types", + "Colon separated list of payload format type", + "Set through SDP (fmtp), it helps to detect restransmission streams " + "eg 97:101:127", DEFAULT_RTX_PAYLOAD_TYPES, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS, + g_param_spec_uint ("num-rtx-requests", "Num RTX Requests", + "Number of retransmission events received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS, + g_param_spec_uint ("num-rtx-packets", "Num RTX Packets", + " Number of retransmission packets received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_ASSOC_PACKETS, + g_param_spec_uint ("num-rtx-assoc-packets", + "Num RTX Associated Packets", "Number of retransmission packets " + "correctly associated with retransmission requests", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&src_factory)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&sink_factory)); + + gst_element_class_set_static_metadata (gstelement_class, + "RTP Retransmission receiver", "Codec", + "Receive retransmitted RTP packets according to RFC4588", + "Julien Isorce "); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_change_state); +} + +static void +gst_rtp_rtx_receive_reset (GstRtpRtxReceive * rtx) +{ + g_mutex_lock (&rtx->lock); + g_hash_table_remove_all (rtx->ssrc2_ssrc1_map); + g_hash_table_remove_all (rtx->ssrc1_payload_type_map); + g_hash_table_remove_all (rtx->seqnum_ssrc1_map); + g_hash_table_remove_all (rtx->rtx_payload_type_set); + rtx->num_rtx_requests = 0; + rtx->num_rtx_packets = 0; + rtx->num_rtx_assoc_packets = 0; + g_mutex_unlock (&rtx->lock); +} + +static void +gst_rtp_rtx_receive_finalize (GObject * object) +{ + GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object); + + gst_rtp_rtx_receive_reset (rtx); + + if (rtx->ssrc2_ssrc1_map) { + g_hash_table_destroy (rtx->ssrc2_ssrc1_map); + rtx->ssrc2_ssrc1_map = NULL; + } + + if (rtx->ssrc1_payload_type_map) { + g_hash_table_destroy (rtx->ssrc1_payload_type_map); + rtx->ssrc1_payload_type_map = NULL; + } + + if (rtx->seqnum_ssrc1_map) { + g_hash_table_destroy (rtx->seqnum_ssrc1_map); + rtx->seqnum_ssrc1_map = NULL; + } + + if (rtx->rtx_payload_type_set) { + g_hash_table_destroy (rtx->rtx_payload_type_set); + rtx->rtx_payload_type_set = NULL; + } + + g_mutex_clear (&rtx->lock); + + G_OBJECT_CLASS (gst_rtp_rtx_receive_parent_class)->finalize (object); +} + +static void +gst_rtp_rtx_receive_init (GstRtpRtxReceive * rtx) +{ + GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx); + + rtx->srcpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "src"), "src"); + GST_PAD_SET_PROXY_CAPS (rtx->srcpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad); + gst_pad_set_event_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_src_event)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad); + + rtx->sinkpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "sink"), "sink"); + GST_PAD_SET_PROXY_CAPS (rtx->sinkpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad); + gst_pad_set_chain_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_chain)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); + + rtx->ssrc2_ssrc1_map = g_hash_table_new (g_direct_hash, g_direct_equal); + rtx->ssrc1_payload_type_map = + g_hash_table_new (g_direct_hash, g_direct_equal); + rtx->seqnum_ssrc1_map = g_hash_table_new (g_direct_hash, g_direct_equal); + rtx->rtx_payload_type_set = g_hash_table_new (g_direct_hash, g_direct_equal); + + g_mutex_init (&rtx->lock); +} + +static gboolean +gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (parent); + gboolean res; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + { + const GstStructure *s = gst_event_get_structure (event); + + /* This event usually comes from the downstream gstrtpjitterbuffer */ + if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { + guint seqnum = 0; + guint ssrc = 0; + gpointer ssrc1 = 0; + gpointer ssrc2 = 0; + + /* retrieve seqnum of the packet that need to be restransmisted */ + if (!gst_structure_get_uint (s, "seqnum", &seqnum)) + seqnum = -1; + + /* retrieve ssrc of the packet that need to be restransmisted + * it's usefull when reconstructing the original packet from the rtx packet */ + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + + GST_DEBUG_OBJECT (rtx, + "request seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, + seqnum, ssrc); + + g_mutex_lock (&rtx->lock); + + /* increase number of seen requests for our statistics */ + ++rtx->num_rtx_requests; + + /* First, we lookup in our map to see if we have already associate this + * master stream ssrc with its retransmisted stream. + * Every ssrc are unique so we can use the same hash table + * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1 + */ + if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map, + GUINT_TO_POINTER (ssrc), NULL, &ssrc2) + && GPOINTER_TO_UINT (ssrc2) != GPOINTER_TO_UINT (ssrc)) { + GST_DEBUG ("Retransmited stream %" G_GUINT32_FORMAT + " already associated to its master", GPOINTER_TO_UINT (ssrc2)); + } else { + /* not already associated but also we have to check that we have not + * already considered this request. + */ + if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map, + GUINT_TO_POINTER (seqnum), NULL, &ssrc1)) { + if (GPOINTER_TO_UINT (ssrc1) == ssrc) { + /* do nothing because we have already considered this request + * The jitter may be too impatient of the rtx packet has been + * lost too. + * It does not mean we reject the event, we still want to forward + * the request to the gstrtpsession to be translater into a FB NACK + */ + GST_DEBUG ("Duplicated request seqnum: %" G_GUINT16_FORMAT + ", ssrc1: %" G_GUINT32_FORMAT, seqnum, ssrc); + } else { + /* From RFC 4588: + * the receiver MUST NOT have two outstanding requests for the + * same packet sequence number in two different original streams + * before the association is resolved. Otherwise it's impossible + * to associate a rtx stream and its master stream + */ + GST_DEBUG ("reject request for seqnum %" G_GUINT16_FORMAT + "of master stream %" G_GUINT32_FORMAT, seqnum, ssrc); + res = TRUE; + + /* remove seqnum in order to reuse the spot */ + g_hash_table_remove (rtx->seqnum_ssrc1_map, + GUINT_TO_POINTER (seqnum)); + + /* do not forward the event as we are rejecting this request */ + g_mutex_unlock (&rtx->lock); + gst_event_unref (event); + return res; + } + } else { + /* the request has not been already considered + * insert it for the first time */ + GST_DEBUG + ("packet number %" G_GUINT16_FORMAT " of master stream %" + G_GUINT32_FORMAT " needs to be retransmited", seqnum, ssrc); + g_hash_table_insert (rtx->seqnum_ssrc1_map, + GUINT_TO_POINTER (seqnum), GUINT_TO_POINTER (ssrc)); + } + } + + g_mutex_unlock (&rtx->lock); + } + /* Transfer event upstream so that the request can acutally by translated + * through gstrtpsession through the network */ + res = gst_pad_event_default (pad, parent, event); + break; + } + default: + res = gst_pad_event_default (pad, parent, event); + break; + } + return res; +} + +/* Copy fixed header and extension. Replace current ssrc by ssrc1, + * remove OSN and replace current seq num by OSN. + * Copy memory to avoid to manually copy each rtp buffer field. + */ +static GstBuffer * +_gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1, + guint16 orign_seqnum, guint8 origin_payload_type) +{ + GstMemory *mem = NULL; + GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT; + GstBuffer *new_buffer = gst_buffer_new (); + GstMapInfo map; + guint payload_len = 0; + + /* copy fixed header */ + mem = gst_memory_copy (rtp->map[0].memory, 0, rtp->size[0]); + gst_buffer_append_memory (new_buffer, mem); + + /* copy extension if any */ + if (rtp->size[1]) { + mem = gst_memory_copy (rtp->map[1].memory, 0, rtp->size[1]); + gst_buffer_append_memory (new_buffer, mem); + } + + /* copy payload and remove OSN */ + payload_len = rtp->size[2] - 2; + mem = gst_allocator_alloc (NULL, payload_len, NULL); + + gst_memory_map (mem, &map, GST_MAP_WRITE); + if (rtp->size[2]) + memcpy (map.data, (guint8 *) rtp->data[2] + 2, payload_len); + gst_memory_unmap (mem, &map); + gst_buffer_append_memory (new_buffer, mem); + + /* the sender always constructs rtx packets without padding, + * But the receiver can still receive rtx packets with padding. + * So just copy it. + */ + if (rtp->size[3]) { + guint pad_len = rtp->size[3]; + + mem = gst_allocator_alloc (NULL, pad_len, NULL); + + gst_memory_map (mem, &map, GST_MAP_WRITE); + map.data[pad_len - 1] = pad_len; + gst_memory_unmap (mem, &map); + + gst_buffer_append_memory (new_buffer, mem); + } + + /* set ssrc and seq num */ + gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp); + gst_rtp_buffer_set_ssrc (&new_rtp, ssrc1); + gst_rtp_buffer_set_seq (&new_rtp, orign_seqnum); + gst_rtp_buffer_set_payload_type (&new_rtp, origin_payload_type); + gst_rtp_buffer_unmap (&new_rtp); + + return new_buffer; +} + +static GstFlowReturn +gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (parent); + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *new_buffer = NULL; + guint32 ssrc = 0; + gpointer ssrc1 = 0; + guint32 ssrc2 = 0; + guint16 seqnum = 0; + guint16 orign_seqnum = 0; + guint8 payload_type = 0; + guint8 origin_payload_type = 0; + gboolean is_rtx = FALSE; + gboolean drop = FALSE; + + /* map current rtp packet to parse its header */ + gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); + ssrc = gst_rtp_buffer_get_ssrc (&rtp); + seqnum = gst_rtp_buffer_get_seq (&rtp); + payload_type = gst_rtp_buffer_get_payload_type (&rtp); + + /* check if we have a retransmission packet (this information comes from SDP) */ + g_mutex_lock (&rtx->lock); + is_rtx = + g_hash_table_lookup_extended (rtx->rtx_payload_type_set, + GUINT_TO_POINTER (payload_type), NULL, NULL); + g_mutex_unlock (&rtx->lock); + + if (is_rtx) { + /* read OSN in the rtx payload */ + orign_seqnum = GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp)); + } + + g_mutex_lock (&rtx->lock); + + /* if the current packet is from a retransmission stream */ + if (is_rtx) { + /* increase our statistic */ + ++rtx->num_rtx_packets; + + /* first we check if we already have associated this retransmission stream + * to a master stream */ + if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map, + GUINT_TO_POINTER (ssrc), NULL, &ssrc1)) { + GST_DEBUG + ("packet is from retransmission stream %" G_GUINT32_FORMAT + " already associated to master stream %" G_GUINT32_FORMAT, ssrc, + GPOINTER_TO_UINT (ssrc1)); + ssrc2 = ssrc; + + /* also retrieve the payload type of the original stream in order to + * reconstruct the packet */ + origin_payload_type = + GPOINTER_TO_UINT (g_hash_table_lookup (rtx->ssrc1_payload_type_map, + ssrc1)); + } else { + /* the current retransmisted packet has its rtx stream not already + * associated to a master stream, so retrieve it from our request + * history */ + if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map, + GUINT_TO_POINTER (orign_seqnum), NULL, &ssrc1)) { + GST_DEBUG + ("associate retransmisted stream %" G_GUINT32_FORMAT + " to master stream %" G_GUINT32_FORMAT " thanks to packet %" + G_GUINT16_FORMAT "", ssrc, GPOINTER_TO_UINT (ssrc1), orign_seqnum); + ssrc2 = ssrc; + + /* free the spot so that this seqnum can be used to do another + * association */ + g_hash_table_remove (rtx->seqnum_ssrc1_map, + GUINT_TO_POINTER (orign_seqnum)); + + /* actually do the association between rtx stream and master stream */ + g_hash_table_insert (rtx->ssrc2_ssrc1_map, GUINT_TO_POINTER (ssrc2), + ssrc1); + + /* just put a guard */ + if (GPOINTER_TO_UINT (ssrc1) == ssrc2) + g_warning + ("RTX receiver ssrc2_ssrc1_map bad state, ssrc %" G_GUINT32_FORMAT + " are the same\n", ssrc); + + /* also do the association between master stream and rtx stream + * every ssrc are unique so we can use the same hash table + * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1 + */ + g_hash_table_insert (rtx->ssrc2_ssrc1_map, ssrc1, + GUINT_TO_POINTER (ssrc2)); + + /* retrieve the original payload type */ + origin_payload_type = + GPOINTER_TO_UINT (g_hash_table_lookup (rtx->ssrc1_payload_type_map, + ssrc1)); + } else { + /* we are not able to associate this rtx packet with a master stream */ + GST_DEBUG + ("drop rtx packet because its orign_seqnum %" G_GUINT16_FORMAT + " is not in pending retransmission requests", orign_seqnum); + drop = TRUE; + } + } + } else { /* not rtx */ + /* store ssrc -> pt association */ + g_hash_table_insert (rtx->ssrc1_payload_type_map, GUINT_TO_POINTER (ssrc), + GUINT_TO_POINTER (payload_type)); + } + + /* if not dropped the packet was successfully associated */ + if (is_rtx && !drop) + ++rtx->num_rtx_assoc_packets; + + g_mutex_unlock (&rtx->lock); + + /* just drop the packet if the association could not have been made */ + if (drop) { + gst_rtp_buffer_unmap (&rtp); + gst_buffer_unref (buffer); + return GST_FLOW_OK; + } + + /* create the retransmission packet */ + if (is_rtx) + new_buffer = + _gst_rtp_buffer_new_from_rtx (&rtp, GPOINTER_TO_UINT (ssrc1), + orign_seqnum, origin_payload_type); + + gst_rtp_buffer_unmap (&rtp); + + /* push the packet */ + if (is_rtx) { + gst_buffer_unref (buffer); + GST_LOG_OBJECT (rtx, "push packet seqnum:%" G_GUINT16_FORMAT + " from a restransmission stream ssrc2:%" G_GUINT32_FORMAT " (src %" + G_GUINT32_FORMAT ")", orign_seqnum, ssrc2, GPOINTER_TO_UINT (ssrc1)); + ret = gst_pad_push (rtx->srcpad, new_buffer); + } else { + GST_LOG_OBJECT (rtx, "push packet seqnum:%" G_GUINT16_FORMAT + " from a master stream ssrc: %" G_GUINT32_FORMAT, seqnum, ssrc); + ret = gst_pad_push (rtx->srcpad, buffer); + } + + return ret; +} + +static void +construct_pt_string (gpointer key, gpointer value, gpointer user_data) +{ + GString **str = (GString **) user_data; + if (!(*str)) { + *str = g_string_new (NULL); + g_string_printf (*str, "%d", GPOINTER_TO_UINT (key)); + } else { + g_string_append_printf (*str, ":%d", GPOINTER_TO_UINT (key)); + } +} + +static void +gst_rtp_rtx_receive_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object); + + switch (prop_id) { + case PROP_RTX_PAYLOAD_TYPES:{ + GString *str = NULL; + g_mutex_lock (&rtx->lock); + g_hash_table_foreach (rtx->rtx_payload_type_set, + (GHFunc) construct_pt_string, &str); + if (str) + g_value_take_string (value, g_string_free (str, FALSE)); + g_mutex_unlock (&rtx->lock); + break; + } + case PROP_NUM_RTX_REQUESTS: + g_mutex_lock (&rtx->lock); + g_value_set_uint (value, rtx->num_rtx_requests); + g_mutex_unlock (&rtx->lock); + break; + case PROP_NUM_RTX_PACKETS: + g_mutex_lock (&rtx->lock); + g_value_set_uint (value, rtx->num_rtx_packets); + g_mutex_unlock (&rtx->lock); + break; + case PROP_NUM_RTX_ASSOC_PACKETS: + g_mutex_lock (&rtx->lock); + g_value_set_uint (value, rtx->num_rtx_assoc_packets); + g_mutex_unlock (&rtx->lock); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_rtx_receive_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object); + gchar **str_fmtp = NULL; + guint nb_fmtp = 0; + gint i = 0; + + switch (prop_id) { + case PROP_RTX_PAYLOAD_TYPES: + g_mutex_lock (&rtx->lock); + /* parses string ex: 97:101:122 */ + str_fmtp = g_strsplit (g_value_get_string (value), ":", -1); + nb_fmtp = g_strv_length (str_fmtp); + if (nb_fmtp > 0) { + for (i = 0; i < nb_fmtp; ++i) { + gdouble fmtpd = g_strtod (str_fmtp[i], NULL); + /* dynamic range is in [95, 127] */ + if (fmtpd > 95 && fmtpd < 128) { + guint8 fmtp = fmtpd; + g_hash_table_add (rtx->rtx_payload_type_set, + GUINT_TO_POINTER (fmtp)); + GST_INFO ("add rtx payload type %" G_GUINT16_FORMAT, fmtp); + } + } + } + if (str_fmtp) + g_strfreev (str_fmtp); + g_mutex_unlock (&rtx->lock); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rtp_rtx_receive_change_state (GstElement * element, + GstStateChange transition) +{ + GstStateChangeReturn ret; + GstRtpRtxReceive *rtx; + + rtx = GST_RTP_RTX_RECEIVE (element); + + switch (transition) { + default: + break; + } + + ret = + GST_ELEMENT_CLASS (gst_rtp_rtx_receive_parent_class)->change_state + (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rtp_rtx_receive_reset (rtx); + break; + default: + break; + } + + return ret; +} + +gboolean +gst_rtp_rtx_receive_plugin_init (GstPlugin * plugin) +{ + GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_receive_debug, "rtprtxreceive", 0, + "rtp retransmission receiver"); + + return gst_element_register (plugin, "rtprtxreceive", GST_RANK_NONE, + GST_TYPE_RTP_RTX_RECEIVE); +} diff --git a/gst/rtpmanager/gstrtprtxreceive.h b/gst/rtpmanager/gstrtprtxreceive.h new file mode 100644 index 0000000000..7c42fa9024 --- /dev/null +++ b/gst/rtpmanager/gstrtprtxreceive.h @@ -0,0 +1,86 @@ +/* RTP Retransmission receiver element for GStreamer + * + * gstrtprtxreceive.h: + * + * Copyright (C) 2013 Collabora Ltd. + * @author Julien Isorce + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_RTP_RTX_RECEIVE_H__ +#define __GST_RTP_RTX_RECEIVE_H__ + +#include +#include + +G_BEGIN_DECLS +#define GST_TYPE_RTP_RTX_RECEIVE (gst_rtp_rtx_receive_get_type()) +#define GST_RTP_RTX_RECEIVE(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_RTX_RECEIVE, GstRtpRtxReceive)) +#define GST_RTP_RTX_RECEIVE_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_RTX_RECEIVE, GstRtpRtxReceiveClass)) +#define GST_RTP_RTX_RECEIVE_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_RTP_RTX_RECEIVE, GstRtpRtxReceiveClass)) +#define GST_IS_RTP_RTX_RECEIVE(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_RTX_RECEIVE)) +#define GST_IS_RTP_RTX_RECEIVE_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_RTX_RECEIVE)) +typedef struct _GstRtpRtxReceive GstRtpRtxReceive; +typedef struct _GstRtpRtxReceiveClass GstRtpRtxReceiveClass; + +struct _GstRtpRtxReceive +{ + GstElement element; + + /* pad */ + GstPad *sinkpad; + GstPad *srcpad; + + GMutex lock; + + /* retrieve associated master stream from rtx stream + * it also works to retrieve rtx stream from master stream + * as we make sure all ssrc are unique */ + GHashTable *ssrc2_ssrc1_map; + + /* retrieve master payload type from master stream ssrc */ + GHashTable *ssrc1_payload_type_map; + + /* contains seqnum of request packets of whom their ssrc have + * not been associated to a rtx stream yet */ + GHashTable *seqnum_ssrc1_map; + + /* contains a set of payload type for all retranmission stream + * that this element should handle (usually using SDP) + * it allow to recognize if the current packet is from a rtx stream + * or not. It's not deterministic because several rtx streams can use + * the same payload type + */ + GHashTable *rtx_payload_type_set; + + /* statistics */ + guint num_rtx_requests; + guint num_rtx_packets; + guint num_rtx_assoc_packets; +}; + +struct _GstRtpRtxReceiveClass +{ + GstElementClass parent_class; +}; + + +GType gst_rtp_rtx_receive_get_type (void); +gboolean gst_rtp_rtx_receive_plugin_init (GstPlugin * plugin); + +G_END_DECLS +#endif /* __GST_RTP_RTX_RECEIVE_H__ */ diff --git a/gst/rtpmanager/gstrtprtxsend.c b/gst/rtpmanager/gstrtprtxsend.c new file mode 100644 index 0000000000..3da8d7690a --- /dev/null +++ b/gst/rtpmanager/gstrtprtxsend.c @@ -0,0 +1,586 @@ +/* RTP Retransmission sender element for GStreamer + * + * gstrtprtxsend.c: + * + * Copyright (C) 2013 Collabora Ltd. + * @author Julien Isorce + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-rtprtxsend + * + * See #GstRtpRtxReceive for examples + * + * The purpose of the sender RTX object is to keep a history of RTP packets up + * to a configurable limit (max-size-time or max-size-packets). It will listen + * for upstream custom retransmission events (GstRTPRetransmissionRequest) that + * comes from downstream (#GstRtpSession). When receiving a request it will + * look up the requested seqnum in its list of stored packets. If the packet + * is available, it will create a RTX packet according to RFC 4588 and send + * this as an auxiliary stream. RTX is SSRC-multiplexed + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include + +#include "gstrtprtxsend.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug); +#define GST_CAT_DEFAULT gst_rtp_rtx_send_debug + +#define DEFAULT_RTX_PAYLOAD_TYPE 0 +#define DEFAULT_MAX_SIZE_TIME 0 +#define DEFAULT_MAX_SIZE_PACKETS 100 + +enum +{ + PROP_0, + PROP_RTX_PAYLOAD_TYPE, + PROP_MAX_SIZE_TIME, + PROP_MAX_SIZE_PACKETS, + PROP_NUM_RTX_REQUESTS, + PROP_NUM_RTX_PACKETS, + PROP_LAST +}; + +static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer); + +static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement * + element, GstStateChange transition); + +static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_rtp_rtx_send_finalize (GObject * object); + +G_DEFINE_TYPE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT); + +static void +gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + gobject_class->get_property = gst_rtp_rtx_send_get_property; + gobject_class->set_property = gst_rtp_rtx_send_set_property; + gobject_class->finalize = gst_rtp_rtx_send_finalize; + + g_object_class_install_property (gobject_class, PROP_RTX_PAYLOAD_TYPE, + g_param_spec_uint ("rtx-payload-type", "RTX Payload Type", + "Payload type of the retransmission stream (fmtp in SDP)", 0, + G_MAXUINT, DEFAULT_RTX_PAYLOAD_TYPE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, + g_param_spec_uint ("max-size-time", "Max Size Times", + "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT, + DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS, + g_param_spec_uint ("max-size-packets", "Max Size Packets", + "Amount of packets to queue (0 = unlimited)", 0, G_MAXUINT, + DEFAULT_MAX_SIZE_PACKETS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS, + g_param_spec_uint ("num-rtx-requests", "Num RTX Requests", + "Number of retransmission events received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS, + g_param_spec_uint ("num-rtx-packets", "Num RTX Packets", + " Number of retransmission packets sent", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&src_factory)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&sink_factory)); + + gst_element_class_set_static_metadata (gstelement_class, + "RTP Retransmission Sender", "Codec", + "Retransmit RTP packets when needed, according to RFC4588", + "Julien Isorce "); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state); +} + +static void +gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx, gboolean full) +{ + g_mutex_lock (&rtx->lock); + g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (rtx->queue); + g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL); + g_list_free (rtx->pending); + rtx->pending = NULL; + rtx->master_ssrc = 0; + rtx->next_seqnum = g_random_int_range (0, G_MAXUINT16); + rtx->rtx_ssrc = g_random_int (); + rtx->num_rtx_requests = 0; + rtx->num_rtx_packets = 0; + g_mutex_unlock (&rtx->lock); +} + +static void +gst_rtp_rtx_send_finalize (GObject * object) +{ + GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object); + + gst_rtp_rtx_send_reset (rtx, TRUE); + g_queue_free (rtx->queue); + g_mutex_clear (&rtx->lock); + + G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object); +} + +static void +gst_rtp_rtx_send_init (GstRtpRtxSend * rtx) +{ + GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx); + + rtx->srcpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "src"), "src"); + GST_PAD_SET_PROXY_CAPS (rtx->srcpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad); + gst_pad_set_event_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad); + + rtx->sinkpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "sink"), "sink"); + GST_PAD_SET_PROXY_CAPS (rtx->sinkpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad); + gst_pad_set_chain_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); + + rtx->queue = g_queue_new (); + rtx->pending = NULL; + g_mutex_init (&rtx->lock); + + rtx->next_seqnum = g_random_int_range (0, G_MAXUINT16); + rtx->rtx_ssrc = g_random_int (); + + rtx->max_size_time = DEFAULT_MAX_SIZE_TIME; + rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS; +} + +static guint32 +choose_ssrc (GstRtpRtxSend * rtx) +{ + guint32 ssrc; + + while (TRUE) { + ssrc = g_random_int (); + + /* make sure to be different than master */ + if (ssrc != rtx->master_ssrc) + break; + } + return ssrc; +} + +typedef struct +{ + GstRtpRtxSend *rtx; + guint seqnum; + gboolean found; +} RTXData; + +/* traverse queue history and try to find the buffer that the + * requested seqnum */ +static void +push_seqnum (GstBuffer * buffer, RTXData * data) +{ + GstRtpRtxSend *rtx = data->rtx; + GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT; + guint16 seqnum; + + if (data->found) + return; + + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer)) + return; + + seqnum = gst_rtp_buffer_get_seq (&rtpbuffer); + gst_rtp_buffer_unmap (&rtpbuffer); + + /* data->seqnum comes from the request */ + if (seqnum == data->seqnum) { + data->found = TRUE; + GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, seqnum); + rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer)); + } +} + +static gboolean +gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent); + gboolean res; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + { + const GstStructure *s = gst_event_get_structure (event); + + /* This event usually comes from the downstream gstrtpsession */ + if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { + guint32 seqnum = 0; + guint ssrc = 0; + RTXData data; + + /* retrieve seqnum of the packet that need to be restransmisted */ + if (!gst_structure_get_uint (s, "seqnum", &seqnum)) + seqnum = -1; + + /* retrieve ssrc of the packet that need to be restransmisted */ + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + + GST_DEBUG_OBJECT (rtx, + "request seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, + seqnum, ssrc); + + g_mutex_lock (&rtx->lock); + /* check if request is for us */ + if (rtx->master_ssrc == ssrc) { + ++rtx->num_rtx_requests; + data.rtx = rtx; + data.seqnum = seqnum; + data.found = FALSE; + /* TODO do a binary search because rtx->queue is sorted by seq num */ + g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data); + } + g_mutex_unlock (&rtx->lock); + + gst_event_unref (event); + res = TRUE; + + /* This event usually comes from the downstream gstrtpsession */ + } else if (gst_structure_has_name (s, "GstRTPCollision")) { + guint ssrc = 0; + + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + + GST_DEBUG_OBJECT (rtx, "collision ssrc: %" G_GUINT32_FORMAT, ssrc); + + g_mutex_lock (&rtx->lock); + + /* choose another ssrc for our retransmited stream */ + if (ssrc == rtx->rtx_ssrc) { + rtx->rtx_ssrc = choose_ssrc (rtx); + + /* clear buffers we already saved */ + g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (rtx->queue); + + /* clear buffers that are about to be retransmited */ + g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL); + g_list_free (rtx->pending); + rtx->pending = NULL; + + g_mutex_unlock (&rtx->lock); + + /* no need to forward to payloader because we make sure to have + * a different ssrc + */ + gst_event_unref (event); + res = TRUE; + } else { + g_mutex_unlock (&rtx->lock); + + /* forward event to payloader in case collided ssrc is + * master stream */ + res = gst_pad_event_default (pad, parent, event); + } + } else { + res = gst_pad_event_default (pad, parent, event); + } + break; + } + default: + res = gst_pad_event_default (pad, parent, event); + break; + } + return res; +} + +/* Copy fixed header and extension. Add OSN before to copy payload + * Copy memory to avoid to manually copy each rtp buffer field. + */ +static GstBuffer * +_gst_rtp_rtx_buffer_new (GstBuffer * buffer, guint32 ssrc, guint16 seqnum, + guint8 fmtp) +{ + GstMemory *mem = NULL; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT; + GstBuffer *new_buffer = gst_buffer_new (); + GstMapInfo map; + guint payload_len = 0; + + gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); + + /* gst_rtp_buffer_map does not map the payload so do it now */ + gst_rtp_buffer_get_payload (&rtp); + + /* If payload type is not set through SDP/property then + * just bump the value */ + if (fmtp < 96) + fmtp = gst_rtp_buffer_get_payload_type (&rtp) + 1; + + /* copy fixed header */ + mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]); + gst_buffer_append_memory (new_buffer, mem); + + /* copy extension if any */ + if (rtp.size[1]) { + mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]); + gst_buffer_append_memory (new_buffer, mem); + } + + /* copy payload and add OSN just before */ + payload_len = 2 + rtp.size[2]; + mem = gst_allocator_alloc (NULL, payload_len, NULL); + + gst_memory_map (mem, &map, GST_MAP_WRITE); + GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp)); + if (rtp.size[2]) + memcpy (map.data + 2, rtp.data[2], rtp.size[2]); + gst_memory_unmap (mem, &map); + gst_buffer_append_memory (new_buffer, mem); + + /* everything needed is copied */ + gst_rtp_buffer_unmap (&rtp); + + /* set ssrc, seqnum and fmtp */ + gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp); + gst_rtp_buffer_set_ssrc (&new_rtp, ssrc); + gst_rtp_buffer_set_seq (&new_rtp, seqnum); + gst_rtp_buffer_set_payload_type (&new_rtp, fmtp); + /* RFC 4588: let other elements do the padding, as normal */ + gst_rtp_buffer_set_padding (&new_rtp, FALSE); + gst_rtp_buffer_unmap (&new_rtp); + + return new_buffer; +} + +/* psuh pending retransmission packet. + * it constructs rtx packet from original paclets */ +static void +do_push (GstBuffer * buffer, GstRtpRtxSend * rtx) +{ + /* RFC4588 two streams multiplexed by sending them in the same session using + * different SSRC values, i.e., SSRC-multiplexing. */ + GST_DEBUG_OBJECT (rtx, + "retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, + rtx->next_seqnum, rtx->rtx_ssrc); + gst_pad_push (rtx->srcpad, _gst_rtp_rtx_buffer_new (buffer, rtx->rtx_ssrc, + rtx->next_seqnum++, rtx->rtx_payload_type)); +} + +static GstFlowReturn +gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent); + GstFlowReturn ret = GST_FLOW_ERROR; + GList *pending = NULL; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + guint seqnum = 0; + + g_mutex_lock (&rtx->lock); + + /* retrievemaster stream ssrc */ + gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); + rtx->master_ssrc = gst_rtp_buffer_get_ssrc (&rtp); + seqnum = gst_rtp_buffer_get_seq (&rtp); + gst_rtp_buffer_unmap (&rtp); + + /* check if our initial aux ssrc is equal to master */ + if (rtx->rtx_ssrc == rtx->master_ssrc) + choose_ssrc (rtx); + + /* add current rtp buffer to queue history */ + g_queue_push_head (rtx->queue, gst_buffer_ref (buffer)); + + /* remove oldest packets from history if they are too many */ + if (rtx->max_size_packets) { + while (g_queue_get_length (rtx->queue) > rtx->max_size_packets) + gst_buffer_unref (g_queue_pop_tail (rtx->queue)); + } + + /* within lock, get packets that have to be retransmited */ + pending = rtx->pending; + rtx->pending = NULL; + + /* assume we will succeed to retransmit those packets */ + rtx->num_rtx_packets += g_list_length (pending); + + /* transfer payload type while holding the lock */ + rtx->rtx_payload_type = rtx->rtx_payload_type_pending; + + g_mutex_unlock (&rtx->lock); + + /* no need to hold the lock to push rtx packets */ + g_list_foreach (pending, (GFunc) do_push, rtx); + g_list_foreach (pending, (GFunc) gst_buffer_unref, NULL); + g_list_free (pending); + + GST_LOG_OBJECT (rtx, + "push seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, seqnum, + rtx->master_ssrc); + + /* push current rtp packet */ + ret = gst_pad_push (rtx->srcpad, buffer); + + return ret; +} + +static void +gst_rtp_rtx_send_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object); + + switch (prop_id) { + case PROP_RTX_PAYLOAD_TYPE: + g_mutex_lock (&rtx->lock); + g_value_set_uint (value, rtx->rtx_payload_type_pending); + g_mutex_unlock (&rtx->lock); + break; + case PROP_MAX_SIZE_TIME: + g_mutex_lock (&rtx->lock); + g_value_set_uint (value, rtx->max_size_time); + g_mutex_unlock (&rtx->lock); + break; + case PROP_MAX_SIZE_PACKETS: + g_mutex_lock (&rtx->lock); + g_value_set_uint (value, rtx->max_size_packets); + g_mutex_unlock (&rtx->lock); + break; + case PROP_NUM_RTX_REQUESTS: + g_mutex_lock (&rtx->lock); + g_value_set_uint (value, rtx->num_rtx_requests); + g_mutex_unlock (&rtx->lock); + break; + case PROP_NUM_RTX_PACKETS: + g_mutex_lock (&rtx->lock); + g_value_set_uint (value, rtx->num_rtx_packets); + g_mutex_unlock (&rtx->lock); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_rtx_send_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object); + + switch (prop_id) { + case PROP_RTX_PAYLOAD_TYPE: + g_mutex_lock (&rtx->lock); + rtx->rtx_payload_type_pending = g_value_get_uint (value); + g_mutex_unlock (&rtx->lock); + break; + case PROP_MAX_SIZE_TIME: + g_mutex_lock (&rtx->lock); + rtx->max_size_time = g_value_get_uint (value); + g_mutex_unlock (&rtx->lock); + break; + case PROP_MAX_SIZE_PACKETS: + g_mutex_lock (&rtx->lock); + rtx->max_size_packets = g_value_get_uint (value); + g_mutex_unlock (&rtx->lock); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstRtpRtxSend *rtx; + + rtx = GST_RTP_RTX_SEND (element); + + switch (transition) { + default: + break; + } + + ret = + GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element, + transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rtp_rtx_send_reset (rtx, TRUE); + break; + default: + break; + } + + return ret; +} + +gboolean +gst_rtp_rtx_send_plugin_init (GstPlugin * plugin) +{ + GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0, + "rtp retransmission sender"); + + return gst_element_register (plugin, "rtprtxsend", GST_RANK_NONE, + GST_TYPE_RTP_RTX_SEND); +} diff --git a/gst/rtpmanager/gstrtprtxsend.h b/gst/rtpmanager/gstrtprtxsend.h new file mode 100644 index 0000000000..aa387cdfeb --- /dev/null +++ b/gst/rtpmanager/gstrtprtxsend.h @@ -0,0 +1,80 @@ +/* RTP Retransmission sender element for GStreamer + * + * gstrtprtxsend.h: + * + * Copyright (C) 2013 Collabora Ltd. + * @author Julien Isorce + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_RTP_RTX_SEND_H__ +#define __GST_RTP_RTX_SEND_H__ + +#include +#include + +G_BEGIN_DECLS +#define GST_TYPE_RTP_RTX_SEND (gst_rtp_rtx_send_get_type()) +#define GST_RTP_RTX_SEND(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_RTX_SEND, GstRtpRtxSend)) +#define GST_RTP_RTX_SEND_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_RTX_SEND, GstRtpRtxSendClass)) +#define GST_RTP_RTX_SEND_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_RTP_RTX_SEND, GstRtpRtxSendClass)) +#define GST_IS_RTP_RTX_SEND(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_RTX_SEND)) +#define GST_IS_RTP_RTX_SEND_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_RTX_SEND)) +typedef struct _GstRtpRtxSend GstRtpRtxSend; +typedef struct _GstRtpRtxSendClass GstRtpRtxSendClass; + +struct _GstRtpRtxSend +{ + GstElement element; + + /* pad */ + GstPad *sinkpad; + GstPad *srcpad; + + GMutex lock; + + /* history of rtp packets */ + GQueue *queue; + /* rtp packets that will be pushed upon next buffer */ + GList *pending; + + guint32 master_ssrc; + guint32 rtx_ssrc; + guint16 next_seqnum; + guint8 rtx_payload_type; + + /* retreived from SDP */ + guint rtx_payload_type_pending; + guint max_size_time; + guint max_size_packets; + + /* statistics */ + guint num_rtx_requests; + guint num_rtx_packets; +}; + +struct _GstRtpRtxSendClass +{ + GstElementClass parent_class; +}; + + +GType gst_rtp_rtx_send_get_type (void); +gboolean gst_rtp_rtx_send_plugin_init (GstPlugin * plugin); + +G_END_DECLS +#endif /* __GST_RTP_RTX_SEND_H__ */