media: implement ssrc-multiplexed retransmission support

based off RFC 4588 and the server-rtpaux example in -good
This commit is contained in:
Matthew Waters 2014-11-27 01:12:36 +11:00 committed by Sebastian Dröge
parent 058698c9cf
commit 4f40781fff
8 changed files with 352 additions and 1 deletions

View file

@ -171,6 +171,9 @@ gst_rtsp_media_get_address_pool
gst_rtsp_media_set_buffer_size
gst_rtsp_media_get_buffer_size
gst_rtsp_media_set_retransmission_time
gst_rtsp_media_get_retransmission_time
gst_rtsp_media_setup_sdp
<SUBSECTION MediaPrepare>
@ -257,6 +260,9 @@ gst_rtsp_media_factory_set_buffer_size
gst_rtsp_media_factory_get_suspend_mode
gst_rtsp_media_factory_set_suspend_mode
gst_rtsp_media_factory_set_retransmission_time
gst_rtsp_media_factory_get_retransmission_time
gst_rtsp_media_factory_construct
gst_rtsp_media_factory_create_element
@ -541,6 +547,9 @@ gst_rtsp_stream_get_profiles
gst_rtsp_stream_get_protocols
gst_rtsp_stream_set_protocols
gst_rtsp_stream_get_retransmission_time
gst_rtsp_stream_set_retransmission_time
gst_rtsp_stream_is_transport_supported
gst_rtsp_stream_get_address_pool

View file

@ -58,6 +58,8 @@ struct _GstRTSPMediaFactoryPrivate
guint buffer_size;
GstRTSPAddressPool *pool;
GstClockTime rtx_time;
GMutex medias_lock;
GHashTable *medias; /* protected by medias_lock */
};
@ -822,6 +824,55 @@ gst_rtsp_media_factory_get_protocols (GstRTSPMediaFactory * factory)
return res;
}
/**
* gst_rtsp_media_factory_set_retransmission_time:
* @factory: a #GstRTSPMediaFactory
* @time: a #GstClockTime
*
* Configure the time to store for possible retransmission
*/
void
gst_rtsp_media_factory_set_retransmission_time (GstRTSPMediaFactory * factory,
GstClockTime time)
{
GstRTSPMediaFactoryPrivate *priv;
g_return_if_fail (GST_IS_RTSP_MEDIA_FACTORY (factory));
priv = factory->priv;
GST_DEBUG_OBJECT (factory, "retransmission time %" G_GUINT64_FORMAT, time);
GST_RTSP_MEDIA_FACTORY_LOCK (factory);
priv->rtx_time = time;
GST_RTSP_MEDIA_FACTORY_UNLOCK (factory);
}
/**
* gst_rtsp_media_factory_get_retransmission_time:
* @factory: a #GstRTSPMediaFactory
*
* Get the time that is stored for retransmission purposes
*
* Returns: a #GstClockTime
*/
GstClockTime
gst_rtsp_media_factory_get_retransmission_time (GstRTSPMediaFactory * factory)
{
GstRTSPMediaFactoryPrivate *priv;
GstClockTime res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA_FACTORY (factory), 0);
priv = factory->priv;
GST_RTSP_MEDIA_FACTORY_LOCK (factory);
res = priv->rtx_time;
GST_RTSP_MEDIA_FACTORY_UNLOCK (factory);
return res;
}
static gboolean
compare_media (gpointer key, GstRTSPMedia * media1, GstRTSPMedia * media2)
{
@ -1084,6 +1135,7 @@ default_configure (GstRTSPMediaFactory * factory, GstRTSPMedia * media)
GstRTSPLowerTrans protocols;
GstRTSPAddressPool *pool;
GstRTSPPermissions *perms;
GstClockTime rtx_time;
/* configure the sharedness */
GST_RTSP_MEDIA_FACTORY_LOCK (factory);
@ -1093,6 +1145,7 @@ default_configure (GstRTSPMediaFactory * factory, GstRTSPMedia * media)
size = priv->buffer_size;
profiles = priv->profiles;
protocols = priv->protocols;
rtx_time = priv->rtx_time;
GST_RTSP_MEDIA_FACTORY_UNLOCK (factory);
gst_rtsp_media_set_suspend_mode (media, suspend_mode);
@ -1101,6 +1154,7 @@ default_configure (GstRTSPMediaFactory * factory, GstRTSPMedia * media)
gst_rtsp_media_set_buffer_size (media, size);
gst_rtsp_media_set_profiles (media, profiles);
gst_rtsp_media_set_protocols (media, protocols);
gst_rtsp_media_set_retransmission_time (media, rtx_time);
if ((pool = gst_rtsp_media_factory_get_address_pool (factory))) {
gst_rtsp_media_set_address_pool (media, pool);

View file

@ -141,6 +141,9 @@ GstRTSPAddressPool * gst_rtsp_media_factory_get_address_pool (GstRTSPMediaFacto
void gst_rtsp_media_factory_set_buffer_size (GstRTSPMediaFactory * factory,
guint size);
guint gst_rtsp_media_factory_get_buffer_size (GstRTSPMediaFactory * factory);
void gst_rtsp_media_factory_set_retransmission_time (GstRTSPMediaFactory * factory,
GstClockTime time);
GstClockTime gst_rtsp_media_factory_get_retransmission_time (GstRTSPMediaFactory * factory);
/* creating the media from the factory and a url */
GstRTSPMedia * gst_rtsp_media_factory_construct (GstRTSPMediaFactory *factory,

View file

@ -121,6 +121,9 @@ struct _GstRTSPMediaPrivate
GstRTSPTimeRange range; /* protected by lock */
GstClockTime range_start;
GstClockTime range_stop;
GList *payloads; /* protected by lock */
GstClockTime rtx_time; /* protected by lock */
};
#define DEFAULT_SHARED FALSE
@ -366,6 +369,8 @@ gst_rtsp_media_finalize (GObject * obj)
gst_object_unref (priv->element);
if (priv->pool)
g_object_unref (priv->pool);
if (priv->payloads)
g_list_free (priv->payloads);
g_mutex_clear (&priv->lock);
g_cond_clear (&priv->cond);
g_rec_mutex_clear (&priv->state_lock);
@ -1090,6 +1095,63 @@ gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
return res;
}
/**
* gst_rtsp_media_set_retransmission_time:
* @media: a #GstRTSPMedia
* @time: the new value
*
* Set the amount of time to store retransmission packets.
*/
void
gst_rtsp_media_set_retransmission_time (GstRTSPMedia * media, GstClockTime time)
{
GstRTSPMediaPrivate *priv;
guint i;
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
GST_LOG_OBJECT (media, "set retransmission time %" G_GUINT64_FORMAT, time);
priv = media->priv;
g_mutex_lock (&priv->lock);
priv->rtx_time = time;
for (i = 0; i < priv->streams->len; i++) {
GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
gst_rtsp_stream_set_retransmission_time (stream, time);
}
if (priv->rtpbin)
g_object_set (priv->rtpbin, "do-retransmission", time > 0, NULL);
g_mutex_unlock (&priv->lock);
}
/**
* gst_rtsp_media_get_retransmission_time:
* @media: a #GstRTSPMedia
*
* Get the amount of time to store retransmission data.
*
* Returns: the amount of time to store retransmission data.
*/
GstClockTime
gst_rtsp_media_get_retransmission_time (GstRTSPMedia * media)
{
GstRTSPMediaPrivate *priv;
GstClockTime res;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
priv = media->priv;
g_mutex_unlock (&priv->lock);
res = priv->rtx_time;
g_mutex_unlock (&priv->lock);
return res;
}
/**
* gst_rtsp_media_use_time_provider:
* @media: a #GstRTSPMedia
@ -1198,6 +1260,37 @@ gst_rtsp_media_get_address_pool (GstRTSPMedia * media)
return result;
}
static GList *
_find_payload_types (GstRTSPMedia * media)
{
GList *ret = NULL;
gint i, n;
n = media->priv->streams->len;
for (i = 0; i < n; i++) {
GstRTSPStream *stream = g_ptr_array_index (media->priv->streams, i);
guint pt = gst_rtsp_stream_get_pt (stream);
ret = g_list_append (ret, GUINT_TO_POINTER (pt));
}
return ret;
}
static guint
_next_available_pt (GList * payloads)
{
guint i;
for (i = 96; i <= 127; i++) {
GList *iter = g_list_find (payloads, GINT_TO_POINTER (i));
if (!iter)
return GPOINTER_TO_UINT (i);
}
return 0;
}
/**
* gst_rtsp_media_collect_streams:
* @media: a #GstRTSPMedia
@ -1279,6 +1372,7 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
GstPad *srcpad;
gchar *name;
gint idx;
gint i, n;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
@ -1303,8 +1397,28 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
gst_rtsp_stream_set_address_pool (stream, priv->pool);
gst_rtsp_stream_set_profiles (stream, priv->profiles);
gst_rtsp_stream_set_protocols (stream, priv->protocols);
gst_rtsp_stream_set_retransmission_time (stream, priv->rtx_time);
g_ptr_array_add (priv->streams, stream);
if (priv->payloads)
g_list_free (priv->payloads);
priv->payloads = _find_payload_types (media);
n = priv->streams->len;
for (i = 0; i < n; i++) {
GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
guint rtx_pt = _next_available_pt (priv->payloads);
if (rtx_pt == 0) {
/* FIXME: ran out of space of dynamic payload types */
break;
}
gst_rtsp_stream_set_retransmission_pt (stream, rtx_pt);
priv->payloads = g_list_append (priv->payloads, GUINT_TO_POINTER (rtx_pt));
}
g_mutex_unlock (&priv->lock);
g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STREAM], 0, stream,
@ -2149,6 +2263,9 @@ start_prepare (GstRTSPMedia * media)
}
}
if (priv->rtpbin)
g_object_set (priv->rtpbin, "do-retransmission", priv->rtx_time > 0, NULL);
for (walk = priv->dynamic; walk; walk = g_list_next (walk)) {
GstElement *elem = walk->data;
DynPaySignalHandlers *handlers = g_slice_new (DynPaySignalHandlers);

View file

@ -188,6 +188,9 @@ GstRTSPAddressPool * gst_rtsp_media_get_address_pool (GstRTSPMedia *media);
void gst_rtsp_media_set_buffer_size (GstRTSPMedia *media, guint size);
guint gst_rtsp_media_get_buffer_size (GstRTSPMedia *media);
void gst_rtsp_media_set_retransmission_time (GstRTSPMedia *media, GstClockTime time);
GstClockTime gst_rtsp_media_get_retransmission_time (GstRTSPMedia *media);
void gst_rtsp_media_use_time_provider (GstRTSPMedia *media, gboolean time_provider);
gboolean gst_rtsp_media_is_time_provider (GstRTSPMedia *media);
GstNetTimeProvider * gst_rtsp_media_get_time_provider (GstRTSPMedia *media,

View file

@ -120,6 +120,7 @@ make_media (GstSDPMessage * sdp, GstSDPInfo * info, GstRTSPMedia * media,
const gchar *addrtype, *proto;
gchar *address;
guint ttl;
GstClockTime rtx_time;
gst_sdp_media_new (&smedia);
@ -340,6 +341,9 @@ make_media (GstSDPMessage * sdp, GstSDPInfo * info, GstRTSPMedia * media,
continue;
if (g_str_has_prefix (fname, "srtcp-"))
continue;
/* handled later */
if (g_str_has_prefix (fname, "x-gst-rtsp-server-rtx-time"))
continue;
if (g_str_has_prefix (fname, "a-")) {
/* attribute */
@ -359,6 +363,7 @@ make_media (GstSDPMessage * sdp, GstSDPInfo * info, GstRTSPMedia * media,
first = FALSE;
}
}
if (!first) {
tmp = g_string_free (fmtp, FALSE);
gst_sdp_media_add_attribute (smedia, "fmtp", tmp);
@ -369,6 +374,32 @@ make_media (GstSDPMessage * sdp, GstSDPInfo * info, GstRTSPMedia * media,
update_sdp_from_tags (stream, smedia);
if ((rtx_time = gst_rtsp_stream_get_retransmission_time (stream))) {
/* ssrc multiplexed retransmit functionality */
guint rtx_pt = gst_rtsp_stream_get_retransmission_pt (stream);
if (rtx_pt == 0) {
g_warning ("failed to find an available dynamic payload type. "
"Not adding retransmission");
} else {
gchar *tmp;
tmp = g_strdup_printf ("%d", rtx_pt);
gst_sdp_media_add_format (smedia, tmp);
g_free (tmp);
tmp = g_strdup_printf ("%d rtx/%d", rtx_pt, caps_rate);
gst_sdp_media_add_attribute (smedia, "rtpmap", tmp);
g_free (tmp);
tmp =
g_strdup_printf ("%d apt=%d;rtx-time=%" G_GINT64_FORMAT, rtx_pt,
caps_pt, GST_TIME_AS_MSECONDS (rtx_time));
gst_sdp_media_add_attribute (smedia, "fmtp", tmp);
g_free (tmp);
}
}
gst_sdp_message_add_media (sdp, smedia);
gst_sdp_media_free (smedia);

View file

@ -111,6 +111,11 @@ struct _GstRTSPStreamPrivate
GstElement *tee[2];
GstElement *funnel[2];
/* retransmission */
GstElement *rtxsend;
guint rtx_pt;
GstClockTime rtx_time;
/* server ports for sending/receiving over ipv4 */
GstRTSPRange server_port_v4;
GstRTSPAddress *server_addr_v4;
@ -271,6 +276,9 @@ gst_rtsp_stream_finalize (GObject * obj)
gst_rtsp_address_free (priv->server_addr_v6);
if (priv->pool)
g_object_unref (priv->pool);
if (priv->rtxsend)
g_object_unref (priv->rtxsend);
gst_object_unref (priv->payloader);
gst_object_unref (priv->srcpad);
g_free (priv->control);
@ -1306,6 +1314,82 @@ gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
g_mutex_unlock (&priv->lock);
}
/**
* gst_rtsp_stream_set_retransmission_time:
* @stream: a #GstRTSPStream
* @time: a #GstClockTime
*
* Set the amount of time to store retransmission packets.
*/
void
gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
GstClockTime time)
{
GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
g_mutex_lock (&stream->priv->lock);
stream->priv->rtx_time = time;
if (stream->priv->rtxsend)
g_object_set (stream->priv->rtxsend, "max-size-time",
GST_TIME_AS_MSECONDS (time), NULL);
g_mutex_unlock (&stream->priv->lock);
}
/**
* gst_rtsp_media_get_retransmission_time:
* @media: a #GstRTSPMedia
*
* Get the amount of time to store retransmission data.
*
* Returns: the amount of time to store retransmission data.
*/
GstClockTime
gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
{
GstClockTime ret;
g_return_if_fail (GST_IS_RTSP_STREAM (stream));
g_mutex_lock (&stream->priv->lock);
ret = stream->priv->rtx_time;
g_mutex_unlock (&stream->priv->lock);
return ret;
}
void
gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
{
g_return_if_fail (GST_IS_RTSP_STREAM (stream));
GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
g_mutex_lock (&stream->priv->lock);
stream->priv->rtx_pt = rtx_pt;
if (stream->priv->rtxsend) {
guint pt = gst_rtsp_stream_get_pt (stream);
gchar *pt_s = g_strdup_printf ("%d", pt);
GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
pt_s, G_TYPE_UINT, rtx_pt, NULL);
g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
}
g_mutex_unlock (&stream->priv->lock);
}
guint
gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
{
guint rtx_pt;
g_return_if_fail (GST_IS_RTSP_STREAM (stream));
g_mutex_lock (&stream->priv->lock);
rtx_pt = stream->priv->rtx_pt;
g_mutex_unlock (&stream->priv->lock);
return rtx_pt;
}
/* executed from streaming thread */
static void
caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
@ -1657,6 +1741,46 @@ request_rtcp_decoder (GstElement * rtpbin, guint session,
return gst_object_ref (priv->srtpdec);
}
static GstElement *
request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
{
GstElement *bin;
GstPad *pad;
GstStructure *pt_map;
gchar *name;
guint pt, rtx_pt;
gchar *pt_s;
pt = gst_rtsp_stream_get_pt (stream);
pt_s = g_strdup_printf ("%u", pt);
rtx_pt = stream->priv->rtx_pt;
GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
bin = gst_bin_new (NULL);
stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
pt_map = gst_structure_new ("application/x-rtp-pt-map",
pt_s, G_TYPE_UINT, rtx_pt, NULL);
g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
"max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
gst_structure_free (pt_map);
gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
name = g_strdup_printf ("src_%u", sessid);
gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
g_free (name);
gst_object_unref (pad);
pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
name = g_strdup_printf ("sink_%u", sessid);
gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
g_free (name);
gst_object_unref (pad);
return bin;
}
/**
* gst_rtsp_stream_join_bin:
* @stream: a #GstRTSPStream
@ -1714,6 +1838,12 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
(GCallback) request_rtcp_decoder, stream);
}
if (priv->rtx_time > 0) {
/* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
g_signal_connect (rtpbin, "request-aux-sender",
(GCallback) request_aux_sender, stream);
}
/* get a pad for sending RTP */
name = g_strdup_printf ("send_rtp_sink_%u", idx);
priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);

View file

@ -147,7 +147,6 @@ GSocket * gst_rtsp_stream_get_rtcp_socket (GstRTSPStream *stream,
gboolean gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
guint ssrc, GstCaps * crypto);
gboolean gst_rtsp_stream_query_position (GstRTSPStream * stream,
gint64 * position);
gboolean gst_rtsp_stream_query_stop (GstRTSPStream * stream,
@ -155,6 +154,11 @@ gboolean gst_rtsp_stream_query_stop (GstRTSPStream * stream,
void gst_rtsp_stream_set_seqnum_offset (GstRTSPStream *stream, guint16 seqnum);
guint16 gst_rtsp_stream_get_current_seqnum (GstRTSPStream *stream);
void gst_rtsp_stream_set_retransmission_time (GstRTSPStream *stream, GstClockTime time);
GstClockTime gst_rtsp_stream_get_retransmission_time (GstRTSPStream *stream);
guint gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream);
void gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream,
guint rtx_pt);
/**
* GstRTSPStreamTransportFilterFunc: