rtsp-stream: postpone UDP socket allocation until SETUP

Postpone the allocation of the UDP sockets until we know
what transport has been chosen by the client.
Both unicast and multicast UDP sources are created in one
function.

https://bugzilla.gnome.org/show_bug.cgi?id=757488
This commit is contained in:
Patricia Muscalu 2016-02-23 14:59:32 +01:00 committed by Sebastian Dröge
parent d10ba734cd
commit f62a9a7eb9
3 changed files with 186 additions and 180 deletions

View file

@ -1403,26 +1403,26 @@ default_configure_client_transport (GstRTSPClient * client,
/* we have a valid transport now, set the destination of the client. */
if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
gboolean use_client_settings;
GSocketFamily family;
use_client_settings =
gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS);
if (ct->destination && use_client_settings) {
GstRTSPAddress *addr;
family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4;
addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination,
ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl);
if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, TRUE))
goto no_udp_protocol;
if (addr == NULL)
goto no_address;
gst_rtsp_address_free (addr);
} else {
GstRTSPAddress *addr;
GSocketFamily family;
family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4;
if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, FALSE))
goto no_udp_protocol;
gst_rtsp_stream_get_server_port (ctx->stream, &ct->port, family);
addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family);
if (addr == NULL)
goto no_address;
@ -1475,6 +1475,11 @@ default_configure_client_transport (GstRTSPClient * client,
gst_rtsp_session_media_alloc_channels (ctx->sessmedia,
&ct->interleaved);
}
} else if (ct->lower_transport & GST_RTSP_LOWER_TRANS_UDP) {
GSocketFamily family;
family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4;
if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, FALSE))
goto no_udp_protocol;
}
}
return TRUE;
@ -1485,6 +1490,11 @@ no_address:
GST_ERROR_OBJECT (client, "failed to acquire address for stream");
return FALSE;
}
no_udp_protocol:
{
GST_ERROR_OBJECT (client, "failed to allocate udp ports");
return FALSE;
}
}
static GstRTSPTransport *

View file

@ -62,15 +62,6 @@
#define GST_RTSP_STREAM_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
typedef struct
{
GstRTSPStreamTransport *transport;
/* RTP and RTCP source */
GstElement *udpsrc[2];
GstPad *selpad[2];
} GstRTSPMulticastTransportSource;
struct _GstRTSPStreamPrivate
{
GMutex lock;
@ -106,10 +97,14 @@ struct _GstRTSPStreamPrivate
/* sinks used for sending and receiving RTP and RTCP over ipv4, they share
* sockets */
GstElement *udpsrc_v4[2];
/* UDP sources for UDP multicast transports */
GstElement *udpsrc_mcast_v4[2];
/* sinks used for sending and receiving RTP and RTCP over ipv6, they share
* sockets */
GstElement *udpsrc_v6[2];
/* UDP sources for UDP multicast transports */
GstElement *udpsrc_mcast_v6[2];
GstElement *udpqueue[2];
GstElement *udpsink[2];
@ -142,6 +137,8 @@ struct _GstRTSPStreamPrivate
GstRTSPAddressPool *pool;
GstRTSPAddress *addr_v4;
GstRTSPAddress *addr_v6;
gboolean have_ipv4_mcast;
gboolean have_ipv6_mcast;
/* the caps of the stream */
gulong caps_sig;
@ -157,9 +154,6 @@ struct _GstRTSPStreamPrivate
guint tr_cache_cookie_rtcp;
/* UDP sources for UDP multicast transports */
GList *transport_sources;
gint dscp_qos;
/* stream blocking */
@ -1106,7 +1100,8 @@ no_udp_protocol:
/* must be called with lock */
static void
play_udpsources_one_family (GstRTSPStream * stream, GSocketFamily family)
play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
GSocketFamily family)
{
GstRTSPStreamPrivate *priv;
GstPad *pad, *selpad;
@ -1118,39 +1113,22 @@ play_udpsources_one_family (GstRTSPStream * stream, GSocketFamily family)
for (i = 0; i < 2; i++) {
if (priv->sinkpad || i == 1) {
if (family == G_SOCKET_FAMILY_IPV4 && priv->udpsrc_v4[i]) {
if (priv->srcpad) {
/* we set and keep these to playing so that they don't cause NO_PREROLL return
* values. This is only relevant for PLAY pipelines */
gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
gst_element_set_locked_state (udpsrc_out[i], TRUE);
}
/* add udpsrc */
gst_bin_add (bin, priv->udpsrc_v4[i]);
gst_bin_add (bin, udpsrc_out[i]);
/* and link to the funnel v4 */
/* and link to the funnel */
selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
pad = gst_element_get_static_pad (udpsrc_out[i], "src");
gst_pad_link (pad, selpad);
gst_object_unref (pad);
gst_object_unref (selpad);
}
if (family == G_SOCKET_FAMILY_IPV6 && priv->udpsrc_v6[i]) {
if (priv->srcpad) {
gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
}
gst_bin_add (bin, priv->udpsrc_v6[i]);
/* and link to the funnel v6 */
selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
gst_pad_link (pad, selpad);
gst_object_unref (pad);
gst_object_unref (selpad);
}
}
}
gst_object_unref (bin);
@ -1159,18 +1137,27 @@ play_udpsources_one_family (GstRTSPStream * stream, GSocketFamily family)
/* must be called with lock */
static gboolean
create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family)
GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
const gchar * address, gint rtpport, gint rtcpport,
GstRTSPLowerTrans transport)
{
GstStateChangeReturn ret;
/* we keep these elements, we will further configure them when the
* client told us to really use the UDP ports. */
udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
goto error;
if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
}
g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
@ -1198,7 +1185,8 @@ error:
static gboolean
alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
GstRTSPAddress ** server_addr_out)
GstRTSPTransport *ct, GstRTSPAddress ** server_addr_out,
gboolean use_client_settings)
{
GstRTSPStreamPrivate *priv = stream->priv;
GSocket *rtp_socket = NULL;
@ -1209,12 +1197,15 @@ alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
GList *rejected_addresses = NULL;
GstRTSPAddress *addr = NULL;
GInetAddress *inetaddr = NULL;
gchar *addr_str;
GSocketAddress *rtp_sockaddr = NULL;
GSocketAddress *rtcp_sockaddr = NULL;
GstRTSPAddressPool * pool;
GstRTSPLowerTrans transport;
pool = priv->pool;
count = 0;
transport = ct->lower_transport;
/* Start with random port */
tmp_rtp = 0;
@ -1238,18 +1229,29 @@ again:
goto no_udp_protocol;
}
if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
if (pool) {
GstRTSPAddressFlags flags;
if (transport == GST_RTSP_LOWER_TRANS_UDP &&
gst_rtsp_address_pool_has_unicast_addresses (pool))
flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
else if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)
flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
else
goto no_ports;
if (addr)
rejected_addresses = g_list_prepend (rejected_addresses, addr);
flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
if (family == G_SOCKET_FAMILY_IPV6)
flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
else
flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST && use_client_settings)
gst_rtsp_address_pool_reserve_address (pool, ct->destination,
ct->port.min, 2, ct->ttl, &addr);
else
addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
if (addr == NULL)
@ -1306,12 +1308,23 @@ again:
}
g_object_unref (rtcp_sockaddr);
if (addr == NULL)
addr_str = g_inet_address_to_string (inetaddr);
else
addr_str = addr->address;
g_clear_object (&inetaddr);
if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
rtcp_socket, family))
rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, transport)) {
if (addr == NULL)
g_free (addr_str);
goto no_udp_protocol;
play_udpsources_one_family (stream, family);
}
if (addr == NULL)
g_free (addr_str);
play_udpsources_one_family (stream, udpsrc_out, family);
g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
@ -1320,7 +1333,6 @@ again:
if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
goto port_error;
/* set RTP and RTCP sockets */
set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
@ -1368,21 +1380,65 @@ cleanup:
}
}
/* must be called with lock */
static gboolean
alloc_ports (GstRTSPStream * stream)
/**
* gst_rtsp_stream_allocate_udp_sockets:
* @stream: a #GstRTSPStream
* @family: protocol family
* @transport_method: transport method
*
* Allocates RTP and RTCP ports.
*
* Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
*/
gboolean
gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
GSocketFamily family, GstRTSPTransport *ct, gboolean use_client_settings)
{
GstRTSPStreamPrivate *priv = stream->priv;
GstRTSPStreamPrivate *priv;
gboolean result = FALSE;
GstRTSPLowerTrans transport = ct->lower_transport;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->is_joined, FALSE);
g_mutex_lock (&priv->lock);
if (family == G_SOCKET_FAMILY_IPV4) {
if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
if (priv->have_ipv4_mcast)
goto done;
priv->have_ipv4_mcast =
alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_mcast_v4,
&priv->server_port_v4, ct, &priv->addr_v4, use_client_settings);
} else {
priv->have_ipv4 =
alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
&priv->server_port_v4, &priv->server_addr_v4);
&priv->server_port_v4, ct, &priv->server_addr_v4, use_client_settings);
}
} else {
if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
if (priv->have_ipv6_mcast)
goto done;
priv->have_ipv6_mcast =
alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_mcast_v6,
&priv->server_port_v6, ct, &priv->addr_v6, use_client_settings);
} else {
if (priv->have_ipv6)
goto done;
priv->have_ipv6 =
alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
&priv->server_port_v6, &priv->server_addr_v6);
&priv->server_port_v6, ct, &priv->server_addr_v6, use_client_settings);
}
}
return priv->have_ipv4 || priv->have_ipv6;
done:
result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
priv->have_ipv6_mcast;
g_mutex_unlock (&priv->lock);
return result;
}
/**
@ -2474,7 +2530,6 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
guint idx;
gchar *name;
GstPadLinkReturn ret;
gboolean is_udp;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
@ -2571,12 +2626,6 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
create_receiver_part (stream, bin, state);
is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
(priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
if (is_udp && !alloc_ports (stream))
goto no_udp_protocol;
if (priv->srcpad) {
/* be notified of caps changes */
priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
@ -2629,6 +2678,16 @@ no_udp_protocol:
gst_object_unref (priv->udpsrc_v4[1]);
priv->udpsrc_v4[1] = NULL;
}
if (priv->udpsrc_mcast_v4[0]) {
gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v4[0]);
priv->udpsrc_mcast_v4[0] = NULL;
}
if (priv->udpsrc_mcast_v4[1]) {
gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v4[1]);
priv->udpsrc_mcast_v4[1] = NULL;
}
if (priv->udpsrc_v6[0]) {
gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_v6[0]);
@ -2639,6 +2698,16 @@ no_udp_protocol:
gst_object_unref (priv->udpsrc_v6[1]);
priv->udpsrc_v6[1] = NULL;
}
if (priv->udpsrc_mcast_v6[0]) {
gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v6[0]);
priv->udpsrc_mcast_v6[0] = NULL;
}
if (priv->udpsrc_mcast_v6[1]) {
gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v6[1]);
priv->udpsrc_mcast_v6[1] = NULL;
}
g_mutex_unlock (&priv->lock);
return FALSE;
}
@ -2660,7 +2729,6 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
{
GstRTSPStreamPrivate *priv;
gint i;
GList *l;
gboolean is_tcp, is_udp;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
@ -2732,6 +2800,20 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
}
}
if (priv->udpsrc_mcast_v4[i]) {
if (priv->sinkpad || i == 1) {
/* and set udpsrc to NULL now before removing */
gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
/* removing them should also nicely release the request
* pads when they finalize */
gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
} else {
gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v4[i]);
}
}
if (priv->udpsrc_v6[i]) {
if (priv->sinkpad || i == 1) {
gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
@ -2742,16 +2824,15 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
gst_object_unref (priv->udpsrc_v6[i]);
}
}
for (l = priv->transport_sources; l; l = l->next) {
GstRTSPMulticastTransportSource *s = l->data;
if (!s->udpsrc[i])
continue;
gst_element_set_locked_state (s->udpsrc[i], FALSE);
gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
gst_bin_remove (bin, s->udpsrc[i]);
if (priv->udpsrc_mcast_v6[i]) {
if (priv->sinkpad || i == 1) {
gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
} else {
gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v6[i]);
}
}
if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
@ -2777,6 +2858,8 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
priv->udpsrc_v4[i] = NULL;
priv->udpsrc_v6[i] = NULL;
priv->udpsrc_mcast_v4[i] = NULL;
priv->udpsrc_mcast_v6[i] = NULL;
priv->udpsink[i] = NULL;
priv->appsrc[i] = NULL;
priv->appsink[i] = NULL;
@ -2786,13 +2869,6 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
priv->funnel[i] = NULL;
}
for (l = priv->transport_sources; l; l = l->next) {
GstRTSPMulticastTransportSource *s = l->data;
g_slice_free (GstRTSPMulticastTransportSource, s);
}
g_list_free (priv->transport_sources);
priv->transport_sources = NULL;
if (priv->srcpad) {
gst_object_unref (priv->send_src[0]);
priv->send_src[0] = NULL;
@ -3139,89 +3215,6 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
switch (tr->lower_transport) {
case GST_RTSP_LOWER_TRANS_UDP_MCAST:
{
GstRTSPMulticastTransportSource *source;
GstBin *bin;
bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
if (add) {
gchar *host;
gint i;
GstPad *selpad, *pad;
source = g_slice_new0 (GstRTSPMulticastTransportSource);
source->transport = trans;
for (i = 0; i < 2; i++) {
host =
g_strdup_printf ("udp://%s:%d", tr->destination,
(i == 0) ? tr->port.min : tr->port.max);
source->udpsrc[i] =
gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
g_free (host);
g_object_set (source->udpsrc[i], "loop", FALSE, NULL);
if (priv->srcpad) {
/* we set and keep these to playing so that they don't cause NO_PREROLL return
* values. This is only relevant for PLAY pipelines */
gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
gst_element_set_locked_state (source->udpsrc[i], TRUE);
}
/* add udpsrc */
gst_bin_add (bin, source->udpsrc[i]);
/* and link to the funnel v4 */
if (priv->sinkpad || i == 1) {
source->selpad[i] = selpad =
gst_element_get_request_pad (priv->funnel[i], "sink_%u");
pad = gst_element_get_static_pad (source->udpsrc[i], "src");
gst_pad_link (pad, selpad);
gst_object_unref (pad);
gst_object_unref (selpad);
}
}
priv->transport_sources =
g_list_prepend (priv->transport_sources, source);
} else {
GList *l;
for (l = priv->transport_sources; l; l = l->next) {
source = l->data;
if (source->transport == trans) {
priv->transport_sources =
g_list_delete_link (priv->transport_sources, l);
break;
}
}
if (l != NULL) {
gint i;
for (i = 0; i < 2; i++) {
/* Will automatically unlink everything */
gst_bin_remove (bin,
GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
gst_object_unref (source->udpsrc[i]);
if (priv->sinkpad || i == 1) {
gst_element_release_request_pad (priv->funnel[i],
source->selpad[i]);
}
}
g_slice_free (GstRTSPMulticastTransportSource, source);
}
}
gst_object_unref (bin);
/* fall through for the generic case */
}
case GST_RTSP_LOWER_TRANS_UDP:
{
gchar *dest;

View file

@ -168,6 +168,9 @@ guint gst_rtsp_stream_get_buffer_size (GstRTSPStream *stream);
void gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps);
GstElement * gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid);
gboolean gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, GSocketFamily family,
GstRTSPTransport *transport, gboolean use_client_setttings);
/**
* GstRTSPStreamTransportFilterFunc:
* @stream: a #GstRTSPStream object