rtprtxreceive: modify to use a payload-type map like rtprtxsend

This commit is contained in:
George Kiagiadakis 2013-11-29 19:35:44 +01:00 committed by Wim Taymans
parent c8a04bc7b2
commit 9226091235
5 changed files with 72 additions and 96 deletions

View file

@ -120,18 +120,17 @@
#include <gst/gst.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <string.h>
#include <stdlib.h>
#include "gstrtprtxreceive.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_receive_debug);
#define GST_CAT_DEFAULT gst_rtp_rtx_receive_debug
#define DEFAULT_RTX_PAYLOAD_TYPES ""
enum
{
PROP_0,
PROP_RTX_PAYLOAD_TYPES,
PROP_PAYLOAD_TYPE_MAP,
PROP_NUM_RTX_REQUESTS,
PROP_NUM_RTX_PACKETS,
PROP_NUM_RTX_ASSOC_PACKETS,
@ -179,12 +178,10 @@ gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass)
gobject_class->set_property = gst_rtp_rtx_receive_set_property;
gobject_class->finalize = gst_rtp_rtx_receive_finalize;
g_object_class_install_property (gobject_class, PROP_RTX_PAYLOAD_TYPES,
g_param_spec_string ("rtx-payload-types",
"Colon separated list of payload format type",
"Set through SDP (fmtp), it helps to detect restransmission streams "
"eg 97:101:127", DEFAULT_RTX_PAYLOAD_TYPES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
g_param_spec_boxed ("payload-type-map", "Payload Type Map",
"Map of original payload types to their retransmission payload types",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
@ -221,9 +218,7 @@ gst_rtp_rtx_receive_reset (GstRtpRtxReceive * rtx)
{
g_mutex_lock (&rtx->lock);
g_hash_table_remove_all (rtx->ssrc2_ssrc1_map);
g_hash_table_remove_all (rtx->ssrc1_payload_type_map);
g_hash_table_remove_all (rtx->seqnum_ssrc1_map);
g_hash_table_remove_all (rtx->rtx_payload_type_set);
rtx->num_rtx_requests = 0;
rtx->num_rtx_packets = 0;
rtx->num_rtx_assoc_packets = 0;
@ -242,20 +237,14 @@ gst_rtp_rtx_receive_finalize (GObject * object)
rtx->ssrc2_ssrc1_map = NULL;
}
if (rtx->ssrc1_payload_type_map) {
g_hash_table_destroy (rtx->ssrc1_payload_type_map);
rtx->ssrc1_payload_type_map = NULL;
}
if (rtx->seqnum_ssrc1_map) {
g_hash_table_destroy (rtx->seqnum_ssrc1_map);
rtx->seqnum_ssrc1_map = NULL;
}
if (rtx->rtx_payload_type_set) {
g_hash_table_destroy (rtx->rtx_payload_type_set);
rtx->rtx_payload_type_set = NULL;
}
g_hash_table_unref (rtx->rtx_pt_map);
if (rtx->pending_rtx_pt_map)
gst_structure_free (rtx->pending_rtx_pt_map);
g_mutex_clear (&rtx->lock);
@ -286,10 +275,10 @@ gst_rtp_rtx_receive_init (GstRtpRtxReceive * rtx)
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
rtx->ssrc2_ssrc1_map = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->ssrc1_payload_type_map =
g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->seqnum_ssrc1_map = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->rtx_payload_type_set = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->rtx_pt_map_changed = FALSE;
g_mutex_init (&rtx->lock);
}
@ -401,6 +390,23 @@ gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
return res;
}
static gboolean
structure_to_hash_table_inv (GQuark field_id, const GValue * value,
gpointer hash)
{
const gchar *field_str;
guint field_uint;
guint value_uint;
field_str = g_quark_to_string (field_id);
field_uint = atoi (field_str);
value_uint = g_value_get_uint (value);
g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (value_uint),
GUINT_TO_POINTER (field_uint));
return TRUE;
}
/* Copy fixed header and extension. Replace current ssrc by ssrc1,
* remove OSN and replace current seq num by OSN.
* Copy memory to avoid to manually copy each rtp buffer field.
@ -486,14 +492,27 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
/* check if we have a retransmission packet (this information comes from SDP) */
g_mutex_lock (&rtx->lock);
/* transfer payload type while holding the lock */
if (rtx->rtx_pt_map_changed) {
g_hash_table_remove_all (rtx->rtx_pt_map);
gst_structure_foreach (rtx->pending_rtx_pt_map, structure_to_hash_table_inv,
rtx->rtx_pt_map);
rtx->rtx_pt_map_changed = FALSE;
}
is_rtx =
g_hash_table_lookup_extended (rtx->rtx_payload_type_set,
g_hash_table_lookup_extended (rtx->rtx_pt_map,
GUINT_TO_POINTER (payload_type), NULL, NULL);
g_mutex_unlock (&rtx->lock);
if (is_rtx) {
/* read OSN in the rtx payload */
orign_seqnum = GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp));
origin_payload_type =
GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
GUINT_TO_POINTER (payload_type)));
}
g_mutex_lock (&rtx->lock);
@ -512,12 +531,6 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
" already associated to master stream %" G_GUINT32_FORMAT, ssrc,
GPOINTER_TO_UINT (ssrc1));
ssrc2 = ssrc;
/* also retrieve the payload type of the original stream in order to
* reconstruct the packet */
origin_payload_type =
GPOINTER_TO_UINT (g_hash_table_lookup (rtx->ssrc1_payload_type_map,
ssrc1));
} else {
/* the current retransmisted packet has its rtx stream not already
* associated to a master stream, so retrieve it from our request
@ -551,11 +564,6 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
*/
g_hash_table_insert (rtx->ssrc2_ssrc1_map, ssrc1,
GUINT_TO_POINTER (ssrc2));
/* retrieve the original payload type */
origin_payload_type =
GPOINTER_TO_UINT (g_hash_table_lookup (rtx->ssrc1_payload_type_map,
ssrc1));
} else {
/* we are not able to associate this rtx packet with a master stream */
GST_DEBUG
@ -564,10 +572,6 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
drop = TRUE;
}
}
} else { /* not rtx */
/* store ssrc -> pt association */
g_hash_table_insert (rtx->ssrc1_payload_type_map, GUINT_TO_POINTER (ssrc),
GUINT_TO_POINTER (payload_type));
}
/* if not dropped the packet was successfully associated */
@ -607,18 +611,6 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
return ret;
}
static void
construct_pt_string (gpointer key, gpointer value, gpointer user_data)
{
GString **str = (GString **) user_data;
if (!(*str)) {
*str = g_string_new (NULL);
g_string_printf (*str, "%d", GPOINTER_TO_UINT (key));
} else {
g_string_append_printf (*str, ":%d", GPOINTER_TO_UINT (key));
}
}
static void
gst_rtp_rtx_receive_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
@ -626,16 +618,11 @@ gst_rtp_rtx_receive_get_property (GObject * object,
GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);
switch (prop_id) {
case PROP_RTX_PAYLOAD_TYPES:{
GString *str = NULL;
case PROP_PAYLOAD_TYPE_MAP:
g_mutex_lock (&rtx->lock);
g_hash_table_foreach (rtx->rtx_payload_type_set,
(GHFunc) construct_pt_string, &str);
if (str)
g_value_take_string (value, g_string_free (str, FALSE));
g_value_set_boxed (value, rtx->pending_rtx_pt_map);
g_mutex_unlock (&rtx->lock);
break;
}
case PROP_NUM_RTX_REQUESTS:
g_mutex_lock (&rtx->lock);
g_value_set_uint (value, rtx->num_rtx_requests);
@ -662,30 +649,14 @@ gst_rtp_rtx_receive_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);
gchar **str_fmtp = NULL;
guint nb_fmtp = 0;
gint i = 0;
switch (prop_id) {
case PROP_RTX_PAYLOAD_TYPES:
case PROP_PAYLOAD_TYPE_MAP:
g_mutex_lock (&rtx->lock);
/* parses string ex: 97:101:122 */
str_fmtp = g_strsplit (g_value_get_string (value), ":", -1);
nb_fmtp = g_strv_length (str_fmtp);
if (nb_fmtp > 0) {
for (i = 0; i < nb_fmtp; ++i) {
gdouble fmtpd = g_strtod (str_fmtp[i], NULL);
/* dynamic range is in [95, 127] */
if (fmtpd > 95 && fmtpd < 128) {
guint8 fmtp = fmtpd;
g_hash_table_add (rtx->rtx_payload_type_set,
GUINT_TO_POINTER (fmtp));
GST_INFO ("add rtx payload type %" G_GUINT16_FORMAT, fmtp);
}
}
}
if (str_fmtp)
g_strfreev (str_fmtp);
if (rtx->pending_rtx_pt_map)
gst_structure_free (rtx->pending_rtx_pt_map);
rtx->pending_rtx_pt_map = g_value_dup_boxed (value);
rtx->rtx_pt_map_changed = TRUE;
g_mutex_unlock (&rtx->lock);
break;
default:

View file

@ -52,20 +52,15 @@ struct _GstRtpRtxReceive
* as we make sure all ssrc are unique */
GHashTable *ssrc2_ssrc1_map;
/* retrieve master payload type from master stream ssrc */
GHashTable *ssrc1_payload_type_map;
/* contains seqnum of request packets of whom their ssrc have
* not been associated to a rtx stream yet */
GHashTable *seqnum_ssrc1_map;
/* contains a set of payload type for all retranmission stream
* that this element should handle (usually using SDP)
* it allow to recognize if the current packet is from a rtx stream
* or not. It's not deterministic because several rtx streams can use
* the same payload type
*/
GHashTable *rtx_payload_type_set;
/* rtx pt (uint) -> origin pt (uint) */
GHashTable *rtx_pt_map;
/* origin pt (string) -> rtx pt (uint) */
GstStructure *pending_rtx_pt_map;
gboolean rtx_pt_map_changed;
/* statistics */
guint num_rtx_requests;

View file

@ -260,7 +260,7 @@ GST_START_TEST (test_simple_rtpbin_aux)
g_object_set (rtppayloader, "pt", 96, NULL);
g_object_set (rtppayloader, "seqnum-offset", 1, NULL);
g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
g_object_set (rtprtxreceive, "rtx-payload-types", "99:111:125", NULL);
g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
gst_structure_free (pt_map);
/* set rtp aux receive */

View file

@ -220,7 +220,7 @@ GST_START_TEST (test_push_forward_seq)
pt_map = gst_structure_new ("application/x-rtp-pt-map",
"0", G_TYPE_UINT, 97, NULL);
g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
g_object_set (rtprtxreceive, "rtx-payload-types", "97", NULL);
g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
gst_structure_free (pt_map);
/* push buffers: 0,1,2, */
@ -408,7 +408,7 @@ start_test_drop_and_check_results (GstElement * bin, GstElement * rtppayloader,
g_object_set (rtppayloader, "pt", 96, NULL);
g_object_set (rtppayloader, "seqnum-offset", 1, NULL);
g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
g_object_set (rtprtxreceive, "rtx-payload-types", "99:111:125", NULL);
g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
gst_structure_free (pt_map);
send_rtxdata->count = 1;
@ -994,6 +994,7 @@ GST_START_TEST (test_drop_multiple_sender)
guint drop_every_n_packets = 0;
GList *send_rtxdata_list = NULL;
RTXReceiveMultipleData receive_rtxdata;
GstStructure *pt_map;
GST_INFO ("preparing test");
@ -1031,6 +1032,12 @@ GST_START_TEST (test_drop_multiple_sender)
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
"rtpvrawpay", 99, 124));
pt_map = gst_structure_new ("application/x-rtp-pt-map",
"96", G_TYPE_UINT, 121, "97", G_TYPE_UINT, 122,
"98", G_TYPE_UINT, 123, "99", G_TYPE_UINT, 124, NULL);
g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
gst_structure_free (pt_map);
res = gst_element_link (funnel, rtprtxreceive);
fail_unless (res == TRUE, NULL);
res = gst_element_link (rtprtxreceive, sink);
@ -1053,7 +1060,6 @@ GST_START_TEST (test_drop_multiple_sender)
for (drop_every_n_packets = 2; drop_every_n_packets < 10;
drop_every_n_packets++) {
g_object_set (rtprtxreceive, "rtx-payload-types", "121:122:123:124", NULL);
nb_eos = 0;
start_test_drop_multiple_and_check_results (bin, send_rtxdata_list,
&receive_rtxdata, drop_every_n_packets);
@ -1402,7 +1408,7 @@ GST_START_TEST (test_rtxreceive_data_reconstruction)
pt_map = gst_structure_new ("application/x-rtp-pt-map",
"96", G_TYPE_UINT, 99, NULL);
g_object_set (rtxsend, "payload-type-map", pt_map, NULL);
g_object_set (rtxrecv, "rtx-payload-types", "99", NULL);
g_object_set (rtxrecv, "payload-type-map", pt_map, NULL);
gst_structure_free (pt_map);
fail_unless_equals_int (gst_element_link (rtxsend, rtxrecv), TRUE);

View file

@ -247,11 +247,15 @@ request_aux_receiver (GstElement * rtpbin, guint sessid, SessionData * session)
GstElement *rtx, *bin;
GstPad *pad;
gchar *name;
GstStructure *pt_map;
GST_INFO ("creating AUX receiver");
bin = gst_bin_new (NULL);
rtx = gst_element_factory_make ("rtprtxreceive", NULL);
g_object_set (rtx, "rtx-payload-types", "99", NULL);
pt_map = gst_structure_new ("application/x-rtp-pt-map",
"96", G_TYPE_UINT, 99, NULL);
g_object_set (rtx, "payload-type-map", pt_map, NULL);
gst_structure_free (pt_map);
gst_bin_add (GST_BIN (bin), rtx);
pad = gst_element_get_static_pad (rtx, "src");
@ -296,7 +300,7 @@ join_session (GstElement * pipeline, GstElement * rtpBin, SessionData * session)
/* enable RFC4588 retransmission handling by setting rtprtxreceive
* as the "aux" element of rtpbin */
g_signal_emit_by_name (rtpBin, "request-aux-receiver",
g_signal_connect (rtpBin, "request-aux-receiver",
(GCallback) request_aux_receiver, session);
gst_bin_add_many (GST_BIN (pipeline), rtpSrc, rtcpSrc, rtcpSink, NULL);