stream: revert back to create udpsrc/udpsink on DESCRIBE for unicast

This is basically reverting changes introduced in commit f62a9a7,
because it was introducing various regressions:

- It introduces a leak of udpsrc elements that got wrongly fixed by adding
  an hash table in commit cba045e. We should have at most 4 udpsrc for unicast:
  ipv4/ipv6, rtp/rtcp. They can be reused for all unicast clients.
- If a mcast client connects, it creates a new socket in SETUP to try to respect
  the destination/port given by the client in the transport, and overrides the
  socket already set on the udpsink element. That means that if we already had a
  client connected, the source address on the udp packets it receives suddenly
  changes.
- If a 2nd mcast client connects, the destination/port in its transport is
  ignored but its transport wasn't updated.

What this patch does:

- Revert back to create udpsrc/udpsink for unicast clients on DESCRIBE.
- Always have a tee+queue when udp is enabled. This could be optimized
  again in a later patch, but is more complicated. If no unicast clients
  connects then those elements are useless, this could be also optimized
  in a later patch.
- When mcast transport is added, it creates a new set of udpsrc/udpsink,
  seperated from those for unicast clients. Since we already support only
  one mcast address, we also create only one set of elements.

https://bugzilla.gnome.org/show_bug.cgi?id=766612
This commit is contained in:
Xavier Claessens 2016-07-28 15:33:05 -04:00 committed by Sebastian Dröge
parent aa0e60445d
commit 8495c47a9d
3 changed files with 319 additions and 376 deletions

View file

@ -1421,27 +1421,26 @@ default_configure_client_transport (GstRTSPClient * client,
/* we have a valid transport now, set the destination of the client. */
if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
gboolean use_client_settings;
GSocketFamily family;
use_client_settings =
gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS);
if (ct->destination && use_client_settings) {
family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4;
if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, TRUE))
goto no_udp_protocol;
} else {
GstRTSPAddress *addr;
addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination,
ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl);
if (addr == NULL)
goto no_address;
gst_rtsp_address_free (addr);
} else {
GstRTSPAddress *addr;
GSocketFamily family;
family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4;
if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct,
FALSE))
goto no_udp_protocol;
gst_rtsp_stream_get_server_port (ctx->stream, &ct->port, family);
addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family);
if (addr == NULL)
goto no_address;
@ -1494,12 +1493,6 @@ default_configure_client_transport (GstRTSPClient * client,
gst_rtsp_session_media_alloc_channels (ctx->sessmedia,
&ct->interleaved);
}
} else if (ct->lower_transport & GST_RTSP_LOWER_TRANS_UDP) {
GSocketFamily family;
family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4;
if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct,
FALSE))
goto no_udp_protocol;
}
}
return TRUE;
@ -1510,11 +1503,6 @@ no_address:
GST_ERROR_OBJECT (client, "failed to acquire address for stream");
return FALSE;
}
no_udp_protocol:
{
GST_ERROR_OBJECT (client, "failed to allocate udp ports");
return FALSE;
}
}
static GstRTSPTransport *

View file

@ -94,21 +94,18 @@ struct _GstRTSPStreamPrivate
GstElement *srtpdec;
GHashTable *keys;
/* sinks used for sending and receiving RTP and RTCP over ipv4, they share
* sockets */
/* for UDP unicast */
GstElement *udpsrc_v4[2];
/* UDP sources for UDP multicast transports */
GstElement *udpsrc_mcast_v4[2];
/* sinks used for sending and receiving RTP and RTCP over ipv6, they share
* sockets */
GstElement *udpsrc_v6[2];
/* UDP sources for UDP multicast transports */
GstElement *udpsrc_mcast_v6[2];
GstElement *udpqueue[2];
GstElement *udpsink[2];
/* for UDP multicast */
GstElement *mcast_udpsrc_v4[2];
GstElement *mcast_udpsrc_v6[2];
GstElement *mcast_udpqueue[2];
GstElement *mcast_udpsink[2];
/* for TCP transport */
GstElement *appsrc[2];
GstClockTime appsrc_base_time[2];
@ -123,22 +120,18 @@ struct _GstRTSPStreamPrivate
guint rtx_pt;
GstClockTime rtx_time;
/* server ports for sending/receiving over ipv4 */
GstRTSPRange server_port_v4;
GstRTSPAddress *server_addr_v4;
gboolean have_ipv4;
/* pool used to manage unicast and multicast addresses */
GstRTSPAddressPool *pool;
/* server ports for sending/receiving over ipv6 */
/* unicast server addr/port */
GstRTSPRange server_port_v4;
GstRTSPRange server_port_v6;
GstRTSPAddress *server_addr_v4;
GstRTSPAddress *server_addr_v6;
gboolean have_ipv6;
/* multicast addresses */
GstRTSPAddressPool *pool;
GstRTSPAddress *mcast_addr_v4;
GstRTSPAddress *mcast_addr_v6;
gboolean have_ipv4_mcast;
gboolean have_ipv6_mcast;
gchar *multicast_iface;
@ -155,7 +148,6 @@ struct _GstRTSPStreamPrivate
guint tr_cache_cookie_rtp;
guint tr_cache_cookie_rtcp;
gint dscp_qos;
/* stream blocking */
@ -595,22 +587,18 @@ gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
/* Update the dscp qos property on the udp sinks */
static void
update_dscp_qos (GstRTSPStream * stream)
update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2])
{
GstRTSPStreamPrivate *priv;
g_return_if_fail (GST_IS_RTSP_STREAM (stream));
priv = stream->priv;
if (priv->udpsink[0]) {
g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
NULL);
if (udpsink[0]) {
g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL);
}
if (priv->udpsink[1]) {
g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
NULL);
if (udpsink[1]) {
g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL);
}
}
@ -639,7 +627,7 @@ gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
priv->dscp_qos = dscp_qos;
update_dscp_qos (stream);
update_dscp_qos (stream, priv->udpsink);
}
/**
@ -971,6 +959,12 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
*addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
if (*addrp == NULL)
goto no_address;
/* FIXME: Also reserve the same port with unicast ANY address, since that's
* where we are going to bind our socket. Probably loop until we find a port
* available in both mcast and unicast pools. Maybe GstRTSPAddressPool
* should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
* GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
}
result = gst_rtsp_address_copy (*addrp);
g_mutex_unlock (&priv->lock);
@ -1050,6 +1044,9 @@ gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
port, n_ports, ttl, addrp);
if (res != GST_RTSP_ADDRESS_POOL_OK)
goto no_address;
/* FIXME: Also reserve the same port with unicast ANY address, since that's
* where we are going to bind our socket. */
} else {
if (strcmp ((*addrp)->address, address) ||
(*addrp)->port != port || (*addrp)->n_ports != n_ports ||
@ -1086,10 +1083,9 @@ different_address:
/* must be called with lock */
static void
set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket,
GSocket * rtcp_socket, GSocketFamily family)
{
GstRTSPStreamPrivate *priv = stream->priv;
const gchar *multisink_socket;
if (family == G_SOCKET_FAMILY_IPV6)
@ -1097,36 +1093,21 @@ set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
else
multisink_socket = "socket";
g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
NULL);
g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
NULL);
g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL);
g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL);
}
/* must be called with lock */
static gboolean
create_and_configure_udpsinks (GstRTSPStream * stream)
create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2],
const gchar * multicast_iface)
{
GstRTSPStreamPrivate *priv = stream->priv;
GstElement *udpsink0, *udpsink1;
udpsink0 = NULL;
udpsink1 = NULL;
udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
if (priv->udpsink[0])
udpsink0 = priv->udpsink[0];
else
udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
if (!udpsink0)
goto no_udp_protocol;
if (priv->udpsink[1])
udpsink1 = priv->udpsink[1];
else
udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
if (!udpsink1)
if (!udpsink0 || !udpsink1)
goto no_udp_protocol;
/* configure sinks */
@ -1147,17 +1128,23 @@ create_and_configure_udpsinks (GstRTSPStream * stream)
g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
/* join multicast group when adding clients, so we'll start receiving from it.
* We cannot rely on the udpsrc to join the group since its socket is always a
* local unicast one. */
g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL);
g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL);
g_object_set (G_OBJECT (udpsink0), "multicast-iface", multicast_iface, NULL);
g_object_set (G_OBJECT (udpsink1), "multicast-iface", multicast_iface, NULL);
g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
/* update the dscp qos field in the sinks */
update_dscp_qos (stream);
udpsink[0] = udpsink0;
udpsink[1] = udpsink1;
priv->udpsink[0] = udpsink0;
priv->udpsink[1] = udpsink1;
/* update the dscp qos field in the sinks */
update_dscp_qos (stream, udpsink);
return TRUE;
@ -1168,55 +1155,10 @@ no_udp_protocol:
}
}
/* must be called with lock */
static void
play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
GSocketFamily family)
{
GstRTSPStreamPrivate *priv;
GstPad *pad, *selpad;
guint i;
priv = stream->priv;
for (i = 0; i < 2; i++) {
if (!priv->sinkpad && i == 0) {
/* Only connect recv RTP sink if we expect to receive RTP. Connect recv
* RTCP sink always */
continue;
}
if (priv->srcpad) {
/* we set and keep these to playing so that they don't cause NO_PREROLL return
* values. This is only relevant for PLAY pipelines */
gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
gst_element_set_locked_state (udpsrc_out[i], TRUE);
}
/* add udpsrc */
gst_bin_add (priv->joined_bin, udpsrc_out[i]);
/* and link to the funnel */
selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
pad = gst_element_get_static_pad (udpsrc_out[i], "src");
gst_pad_link (pad, selpad);
gst_object_unref (pad);
gst_object_unref (selpad);
/* otherwise sync state with parent in case it's running already
* at this point */
if (!priv->srcpad) {
gst_element_sync_state_with_parent (udpsrc_out[i]);
}
}
}
/* must be called with lock */
static gboolean
create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
const gchar * address, gint rtpport, gint rtcpport,
const gchar * multicast_iface, GstRTSPLowerTrans transport)
create_and_configure_udpsources (GstElement * udpsrc_out[2],
GSocket * rtp_socket, GSocket * rtcp_socket)
{
GstStateChangeReturn ret;
@ -1226,22 +1168,17 @@ create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
goto error;
if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
NULL);
g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
}
g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
/* The udpsrc cannot do the join because its socket is always a local unicast
* one. The udpsink sharing the same socket will do it for us. */
g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL);
g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
if (ret == GST_STATE_CHANGE_FAILURE)
goto error;
@ -1268,9 +1205,8 @@ error:
static gboolean
alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
gboolean use_client_settings)
GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out)
{
GstRTSPStreamPrivate *priv = stream->priv;
GSocket *rtp_socket = NULL;
@ -1285,12 +1221,15 @@ alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
GSocketAddress *rtp_sockaddr = NULL;
GSocketAddress *rtcp_sockaddr = NULL;
GstRTSPAddressPool *pool;
GstRTSPLowerTrans transport;
const gchar *multicast_iface = priv->multicast_iface;
g_assert (!udpsrc_out[0]);
g_assert (!udpsrc_out[1]);
g_assert ((!udpsink_out[0] && !udpsink_out[1]) ||
(udpsink_out[0] && udpsink_out[1]));
g_assert (*server_addr_out == NULL);
pool = priv->pool;
count = 0;
transport = ct->lower_transport;
/* Start with random port */
tmp_rtp = 0;
@ -1301,9 +1240,6 @@ alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
goto no_udp_protocol;
g_socket_set_multicast_loopback (rtcp_socket, FALSE);
if (*server_addr_out)
gst_rtsp_address_free (*server_addr_out);
/* try to allocate 2 UDP ports, the RTP port should be an even
* number and the RTCP port should be the next (uneven) port */
again:
@ -1316,30 +1252,19 @@ again:
g_socket_set_multicast_loopback (rtp_socket, FALSE);
}
if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
gst_rtsp_address_pool_has_unicast_addresses (pool))
|| transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
if (transport == GST_RTSP_LOWER_TRANS_UDP)
flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
else
flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
GstRTSPAddressFlags flags;
if (addr)
rejected_addresses = g_list_prepend (rejected_addresses, addr);
flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
if (family == G_SOCKET_FAMILY_IPV6)
flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
else
flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
&& use_client_settings)
gst_rtsp_address_pool_reserve_address (pool, ct->destination,
ct->port.min, 2, ct->ttl, &addr);
else
addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
if (addr == NULL)
goto no_ports;
@ -1348,14 +1273,6 @@ again:
g_clear_object (&inetaddr);
inetaddr = g_inet_address_new_from_string (addr->address);
/* If we're supposed to bind to a multicast address, instead bind
* to ANY and let udpsrc later join the relevant multicast group
*/
if (g_inet_address_get_is_multicast (inetaddr)) {
g_object_unref (inetaddr);
inetaddr = g_inet_address_new_any (family);
}
} else {
if (tmp_rtp != 0) {
tmp_rtp += 2;
@ -1409,9 +1326,7 @@ again:
addr_str = addr->address;
g_clear_object (&inetaddr);
if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
transport)) {
if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
if (addr == NULL)
g_free (addr_str);
goto no_udp_protocol;
@ -1420,8 +1335,6 @@ again:
if (addr == NULL)
g_free (addr_str);
play_udpsources_one_family (stream, udpsrc_out, family);
g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
@ -1429,12 +1342,17 @@ again:
if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
goto port_error;
/* set RTP and RTCP sockets */
set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
server_port_out->min = rtpport;
server_port_out->max = rtcpport;
/* This function is called twice (for v4 and v6) but we create only one pair
* of udpsinks. */
if (!udpsink_out[0]
&& !create_and_configure_udpsinks (stream, udpsink_out, NULL))
goto no_udp_protocol;
set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
*server_addr_out = addr;
g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
@ -1485,60 +1403,14 @@ cleanup:
* Allocates RTP and RTCP ports.
*
* Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
* Deprecated: This function shouldn't have been made public
*/
gboolean
gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
{
GstRTSPStreamPrivate *priv;
gboolean result = FALSE;
GstRTSPLowerTrans transport = ct->lower_transport;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
g_mutex_lock (&priv->lock);
if (family == G_SOCKET_FAMILY_IPV4) {
if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
if (priv->have_ipv4_mcast)
goto done;
priv->have_ipv4_mcast =
alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
priv->udpsrc_mcast_v4, &priv->server_port_v4, ct,
&priv->mcast_addr_v4, use_client_settings);
} else {
priv->have_ipv4 =
alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
&priv->server_port_v4, ct, &priv->server_addr_v4,
use_client_settings);
}
} else {
if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
if (priv->have_ipv6_mcast)
goto done;
priv->have_ipv6_mcast =
alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
priv->udpsrc_mcast_v6, &priv->server_port_v6, ct,
&priv->mcast_addr_v6, use_client_settings);
} else {
if (priv->have_ipv6)
goto done;
priv->have_ipv6 =
alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
&priv->server_port_v6, ct, &priv->server_addr_v6,
use_client_settings);
}
}
done:
result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
priv->have_ipv6_mcast;
g_mutex_unlock (&priv->lock);
return result;
g_warn_if_reached ();
return FALSE;
}
/**
@ -1589,6 +1461,27 @@ gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
return ret;
}
/* must be called with lock */
static gboolean
alloc_ports (GstRTSPStream * stream)
{
GstRTSPStreamPrivate *priv = stream->priv;
gboolean ret = TRUE;
if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
(priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
priv->udpsrc_v4, priv->udpsink,
&priv->server_port_v4, &priv->server_addr_v4);
ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
priv->udpsrc_v6, priv->udpsink,
&priv->server_port_v6, &priv->server_addr_v6);
}
return ret;
}
/**
* gst_rtsp_stream_get_server_port:
* @stream: a #GstRTSPStream
@ -2463,7 +2356,7 @@ plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
}
/* must be called with lock */
static gboolean
static void
create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
{
GstRTSPStreamPrivate *priv;
@ -2477,9 +2370,6 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
(priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
if (is_udp && !create_and_configure_udpsinks (stream))
goto no_udp_protocol;
for (i = 0; i < 2; i++) {
/* For the sender we create this bit of pipeline for both
* RTP and RTCP. Sync and preroll are enabled on udpsink so
@ -2515,9 +2405,10 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
&sink_cb, stream, NULL);
}
if (is_udp && is_tcp) {
g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
/* If we have udp always use a tee because we could have mcast clients
* requesting different ports, in which case we'll have to plug more
* udpsinks. */
if (is_udp) {
/* make tee for RTP/RTCP */
priv->tee[i] = gst_element_factory_make ("tee", NULL);
gst_bin_add (bin, priv->tee[i]);
@ -2528,7 +2419,11 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
gst_object_unref (pad);
plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
if (is_tcp) {
g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
}
} else if (is_tcp) {
/* only appsink needed, link it to the session */
pad = gst_element_get_static_pad (priv->appsink[i], "sink");
@ -2540,11 +2435,6 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
* sink used for RTCP data, not the RTP data. */
if (i == 1)
g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
} else {
/* else only udpsink needed, link it to the session */
pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
gst_pad_link (priv->send_src[i], pad);
gst_object_unref (pad);
}
/* check if we need to set to a special state */
@ -2561,14 +2451,6 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
gst_element_set_state (priv->tee[i], state);
}
}
return TRUE;
/* ERRORS */
no_udp_protocol:
{
return FALSE;
}
}
/* must be called with lock */
@ -2663,6 +2545,153 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
}
}
static gboolean
create_mcast_part_for_transport (GstRTSPStream * stream,
const GstRTSPTransport * tr)
{
GstRTSPStreamPrivate *priv = stream->priv;
GInetAddress *inetaddr;
GSocketFamily family;
GstRTSPAddress *mcast_addr;
GstElement **mcast_udpsrc;
GSocket *rtp_socket = NULL;
GSocket *rtcp_socket = NULL;
GSocketAddress *rtp_sockaddr = NULL;
GSocketAddress *rtcp_sockaddr = NULL;
GError *error = NULL;
const gchar *multicast_iface = priv->multicast_iface;
/* Check if it's a ipv4 or ipv6 transport */
inetaddr = g_inet_address_new_from_string (tr->destination);
family = g_inet_address_get_family (inetaddr);
g_object_unref (inetaddr);
/* Select fields corresponding to the family */
if (family == G_SOCKET_FAMILY_IPV4) {
mcast_addr = priv->mcast_addr_v4;
mcast_udpsrc = priv->mcast_udpsrc_v4;
} else {
mcast_addr = priv->mcast_addr_v6;
mcast_udpsrc = priv->mcast_udpsrc_v6;
}
/* We support only one mcast group per family, make sure this transport
* matches it. */
if (!mcast_addr)
goto no_addr;
if (!g_str_equal (tr->destination, mcast_addr->address) ||
tr->port.min != mcast_addr->port ||
tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
tr->ttl != mcast_addr->ttl)
goto wrong_addr;
if (mcast_udpsrc[0]) {
/* We already created elements for this family. Since we support only one
* mcast group per family, there is nothing more to do here. */
g_assert (mcast_udpsrc[1]);
g_assert (priv->mcast_udpqueue[0]);
g_assert (priv->mcast_udpqueue[1]);
g_assert (priv->mcast_udpsink[0]);
g_assert (priv->mcast_udpsink[1]);
return TRUE;
}
g_assert (!mcast_udpsrc[1]);
/* Create RTP/RTCP sockets and bind them on ANY with mcast ports */
rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
G_SOCKET_PROTOCOL_UDP, &error);
if (!rtp_socket)
goto socket_error;
g_socket_set_multicast_loopback (rtp_socket, FALSE);
rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
G_SOCKET_PROTOCOL_UDP, &error);
if (!rtcp_socket)
goto socket_error;
g_socket_set_multicast_loopback (rtcp_socket, FALSE);
inetaddr = g_inet_address_new_any (family);
rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port);
rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1);
g_object_unref (inetaddr);
if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error))
goto socket_error;
if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error))
goto socket_error;
g_object_unref (rtp_sockaddr);
g_object_unref (rtcp_sockaddr);
/* Add receiver part */
create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket);
if (priv->srcpad) {
plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]);
gst_element_sync_state_with_parent (mcast_udpsrc[0]);
}
plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]);
gst_element_sync_state_with_parent (mcast_udpsrc[1]);
/* Add sender part, could already have been created for the other family. */
if (!priv->mcast_udpsink[0]) {
g_assert (!priv->mcast_udpsink[1]);
g_assert (!priv->mcast_udpqueue[0]);
g_assert (!priv->mcast_udpqueue[1]);
create_and_configure_udpsinks (stream, priv->mcast_udpsink,
multicast_iface);
set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
family);
if (priv->sinkpad) {
plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0],
&priv->mcast_udpqueue[0]);
gst_element_sync_state_with_parent (priv->mcast_udpsink[0]);
gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]);
}
plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1],
&priv->mcast_udpqueue[1]);
gst_element_sync_state_with_parent (priv->mcast_udpsink[1]);
gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]);
} else {
g_assert (priv->mcast_udpsink[1]);
g_assert (priv->mcast_udpqueue[0]);
g_assert (priv->mcast_udpqueue[1]);
set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
family);
}
return TRUE;
no_addr:
{
GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
"has been reserved");
return FALSE;
}
wrong_addr:
{
GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
"the reserved address");
return FALSE;
}
socket_error:
{
GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s",
error->message);
g_clear_object (&rtp_socket);
g_clear_object (&rtcp_socket);
g_clear_object (&rtp_sockaddr);
g_clear_object (&rtcp_sockaddr);
g_clear_error (&error);
return FALSE;
}
}
/**
* gst_rtsp_stream_join_bin:
* @stream: a #GstRTSPStream
@ -2701,6 +2730,9 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
GST_INFO ("stream %p joining bin as session %u", stream, idx);
if (!alloc_ports (stream))
goto no_ports;
if (priv->profiles & GST_RTSP_PROFILE_SAVP
|| priv->profiles & GST_RTSP_PROFILE_SAVPF) {
/* For SRTP */
@ -2776,9 +2808,7 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
g_signal_connect (priv->session, "on-sender-ssrc-active",
(GCallback) on_sender_ssrc_active, stream);
if (!create_sender_part (stream, bin, state))
goto no_udp_protocol;
create_sender_part (stream, bin, state);
create_receiver_part (stream, bin, state);
if (priv->srcpad) {
@ -2798,6 +2828,12 @@ was_joined:
g_mutex_unlock (&priv->lock);
return TRUE;
}
no_ports:
{
g_mutex_unlock (&priv->lock);
GST_WARNING ("failed to allocate ports %u", idx);
return FALSE;
}
link_failed:
{
GST_WARNING ("failed to link stream %u", idx);
@ -2806,66 +2842,6 @@ link_failed:
g_mutex_unlock (&priv->lock);
return FALSE;
}
no_udp_protocol:
{
GST_WARNING ("failed to allocate ports %u", idx);
gst_object_unref (priv->send_rtp_sink);
priv->send_rtp_sink = NULL;
gst_object_unref (priv->send_src[0]);
priv->send_src[0] = NULL;
gst_object_unref (priv->send_src[1]);
priv->send_src[1] = NULL;
gst_object_unref (priv->recv_sink[0]);
priv->recv_sink[0] = NULL;
gst_object_unref (priv->recv_sink[1]);
priv->recv_sink[1] = NULL;
if (priv->udpsink[0])
gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
if (priv->udpsink[1])
gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
if (priv->udpsrc_v4[0]) {
gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_v4[0]);
priv->udpsrc_v4[0] = NULL;
}
if (priv->udpsrc_v4[1]) {
gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_v4[1]);
priv->udpsrc_v4[1] = NULL;
}
if (priv->udpsrc_mcast_v4[0]) {
gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v4[0]);
priv->udpsrc_mcast_v4[0] = NULL;
}
if (priv->udpsrc_mcast_v4[1]) {
gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v4[1]);
priv->udpsrc_mcast_v4[1] = NULL;
}
if (priv->udpsrc_v6[0]) {
gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_v6[0]);
priv->udpsrc_v6[0] = NULL;
}
if (priv->udpsrc_v6[1]) {
gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_v6[1]);
priv->udpsrc_v6[1] = NULL;
}
if (priv->udpsrc_mcast_v6[0]) {
gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v6[0]);
priv->udpsrc_mcast_v6[0] = NULL;
}
if (priv->udpsrc_mcast_v6[1]) {
gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
gst_object_unref (priv->udpsrc_mcast_v6[1]);
priv->udpsrc_mcast_v6[1] = NULL;
}
g_mutex_unlock (&priv->lock);
return FALSE;
}
}
static void
@ -2936,17 +2912,22 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
}
for (i = 0; i < 2; i++) {
clear_element (bin, &priv->udpsink[i]);
clear_element (bin, &priv->appsink[i]);
clear_element (bin, &priv->appqueue[i]);
clear_element (bin, &priv->udpqueue[i]);
clear_element (bin, &priv->tee[i]);
clear_element (bin, &priv->funnel[i]);
clear_element (bin, &priv->appsrc[i]);
clear_element (bin, &priv->udpsrc_v4[i]);
clear_element (bin, &priv->udpsrc_v6[i]);
clear_element (bin, &priv->udpsrc_mcast_v4[i]);
clear_element (bin, &priv->udpsrc_mcast_v6[i]);
clear_element (bin, &priv->udpqueue[i]);
clear_element (bin, &priv->udpsink[i]);
clear_element (bin, &priv->mcast_udpsrc_v4[i]);
clear_element (bin, &priv->mcast_udpsrc_v6[i]);
clear_element (bin, &priv->mcast_udpqueue[i]);
clear_element (bin, &priv->mcast_udpsink[i]);
clear_element (bin, &priv->appsrc[i]);
clear_element (bin, &priv->appqueue[i]);
clear_element (bin, &priv->appsink[i]);
clear_element (bin, &priv->tee[i]);
clear_element (bin, &priv->funnel[i]);
if (priv->sinkpad || i == 1) {
gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
@ -3332,6 +3313,18 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
switch (tr->lower_transport) {
case GST_RTSP_LOWER_TRANS_UDP_MCAST:
{
if (add) {
if (!create_mcast_part_for_transport (stream, tr))
goto mcast_error;
priv->transports = g_list_prepend (priv->transports, trans);
} else {
priv->transports = g_list_remove (priv->transports, trans);
/* FIXME: Check if there are remaining mcast transports, and destroy
* mcast part if its now unused */
}
break;
}
case GST_RTSP_LOWER_TRANS_UDP:
{
gchar *dest;
@ -3393,6 +3386,10 @@ unknown_transport:
GST_INFO ("Unknown transport %d", tr->lower_transport);
return FALSE;
}
mcast_error:
{
return FALSE;
}
}

View file

@ -33,7 +33,6 @@ GST_START_TEST (test_get_sockets)
GSocket *socket;
gboolean have_ipv4;
gboolean have_ipv6;
GstRTSPTransport *tr;
srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC);
fail_unless (srcpad != NULL);
@ -62,11 +61,6 @@ GST_START_TEST (test_get_sockets)
fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
gst_rtsp_transport_new (&tr);
tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP;
fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
G_SOCKET_FAMILY_IPV4, tr, FALSE));
socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4);
have_ipv4 = (socket != NULL);
if (have_ipv4) {
@ -102,7 +96,6 @@ GST_START_TEST (test_get_sockets)
/* check that at least one family is available */
fail_unless (have_ipv4 || have_ipv6);
gst_rtsp_transport_free (tr);
g_object_unref (pool);
fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin));
@ -121,7 +114,6 @@ GST_START_TEST (test_allocate_udp_ports_fail)
GstBin *bin;
GstElement *rtpbin;
GstRTSPAddressPool *pool;
GstRTSPTransport *tr;
srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC);
fail_unless (srcpad != NULL);
@ -143,14 +135,8 @@ GST_START_TEST (test_allocate_udp_ports_fail)
"192.168.1.1", 6000, 6001, 0));
gst_rtsp_stream_set_address_pool (stream, pool);
fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
fail_if (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
gst_rtsp_transport_new (&tr);
tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP;
fail_if (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4,
tr, FALSE));
gst_rtsp_transport_free (tr);
g_object_unref (pool);
fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin));
gst_object_unref (bin);
@ -257,13 +243,6 @@ GST_START_TEST (test_multicast_address_and_unicast_udp)
fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
gst_rtsp_transport_new (&tr);
/* unicast udp */
tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP;
fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
G_SOCKET_FAMILY_IPV4, tr, FALSE));
gst_rtsp_transport_free (tr);
g_object_unref (pool);
fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin));
gst_object_unref (bin);
@ -280,7 +259,6 @@ GST_START_TEST (test_allocate_udp_ports_multicast)
GstBin *bin;
GstElement *rtpbin;
GstRTSPAddressPool *pool;
GstRTSPTransport *tr;
GstRTSPAddress *addr;
srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC);
@ -308,12 +286,6 @@ GST_START_TEST (test_allocate_udp_ports_multicast)
fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
/* allocate udp multicast ports for IPv4 */
gst_rtsp_transport_new (&tr);
tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST;
fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
G_SOCKET_FAMILY_IPV4, tr, FALSE));
/* check the multicast address and ports for IPv4 */
addr = gst_rtsp_stream_get_multicast_address (stream, G_SOCKET_FAMILY_IPV4);
fail_unless (addr != NULL);
@ -322,10 +294,6 @@ GST_START_TEST (test_allocate_udp_ports_multicast)
fail_unless_equals_int (addr->n_ports, 2);
gst_rtsp_address_free (addr);
/* allocate udp multicast ports for IPv6 */
fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
G_SOCKET_FAMILY_IPV6, tr, FALSE));
/* check the multicast address and ports for IPv6 */
addr = gst_rtsp_stream_get_multicast_address (stream, G_SOCKET_FAMILY_IPV6);
fail_unless (addr != NULL);
@ -334,7 +302,6 @@ GST_START_TEST (test_allocate_udp_ports_multicast)
fail_unless_equals_int (addr->n_ports, 2);
gst_rtsp_address_free (addr);
gst_rtsp_transport_free (tr);
g_object_unref (pool);
fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin));
gst_object_unref (bin);
@ -351,7 +318,6 @@ GST_START_TEST (test_allocate_udp_ports_client_settings)
GstBin *bin;
GstElement *rtpbin;
GstRTSPAddressPool *pool;
GstRTSPTransport *tr;
GstRTSPAddress *addr;
srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC);
@ -384,14 +350,10 @@ GST_START_TEST (test_allocate_udp_ports_client_settings)
fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
/* client transport settings for IPv4 */
gst_rtsp_transport_new (&tr);
tr->destination = g_strdup ("233.252.0.2");
tr->port.min = 6002;
tr->port.max = 6003;
tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST;
fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
G_SOCKET_FAMILY_IPV4, tr, FALSE));
/* Reserve IPV4 mcast address */
addr = gst_rtsp_stream_reserve_address (stream, "233.252.0.2", 6002, 2, 1);
fail_unless (addr != NULL);
gst_rtsp_address_free (addr);
/* verify that the multicast address and ports correspond to the requested client
* transport information for IPv4 */
@ -402,13 +364,10 @@ GST_START_TEST (test_allocate_udp_ports_client_settings)
fail_unless_equals_int (addr->n_ports, 2);
gst_rtsp_address_free (addr);
/* client transport settings for IPv6 */
g_free (tr->destination);
tr->destination = g_strdup ("FF11:DB8::1");
tr->port.min = 6006;
tr->port.max = 6007;
fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
G_SOCKET_FAMILY_IPV6, tr, FALSE));
/* Reserve IPV6 mcast address */
addr = gst_rtsp_stream_reserve_address (stream, "FF11:DB8::1", 6006, 2, 1);
fail_unless (addr != NULL);
gst_rtsp_address_free (addr);
/* verify that the multicast address and ports correspond to the requested client
* transport information for IPv6 */
@ -419,7 +378,6 @@ GST_START_TEST (test_allocate_udp_ports_client_settings)
fail_unless_equals_int (addr->n_ports, 2);
gst_rtsp_address_free (addr);
gst_rtsp_transport_free (tr);
g_object_unref (pool);
fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin));
gst_object_unref (bin);