rtsp-stream: Always create multicast UDP elements if the protocol flag is set

Adding them later will cause deadlocks due to
1) pre-rolling and staying in PAUSED with the unicast/TCP sinks
2) adding the multicast sink
3) waiting for it to get data to preroll again

3) never happens because the queues after the tee are full.
This commit is contained in:
Sebastian Dröge 2016-09-05 18:04:50 +03:00
parent be4b9718e3
commit ca855abae1

View file

@ -124,8 +124,6 @@ struct _GstRTSPStreamPrivate
GstRTSPAddressPool *pool; GstRTSPAddressPool *pool;
/* unicast server addr/port */ /* unicast server addr/port */
GstRTSPRange server_port_v4;
GstRTSPRange server_port_v6;
GstRTSPAddress *server_addr_v4; GstRTSPAddress *server_addr_v4;
GstRTSPAddress *server_addr_v6; GstRTSPAddress *server_addr_v6;
@ -915,21 +913,9 @@ gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
return result; return result;
} }
/**
* gst_rtsp_stream_get_multicast_address: static GstRTSPAddress *
* @stream: a #GstRTSPStream gst_rtsp_stream_get_multicast_address_locked (GstRTSPStream * stream,
* @family: the #GSocketFamily
*
* Get the multicast address of @stream for @family. The original
* #GstRTSPAddress is cached and copy is returned, so freeing the return value
* won't release the address from the pool.
*
* Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
* or %NULL when no address could be allocated. gst_rtsp_address_free()
* after usage.
*/
GstRTSPAddress *
gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
GSocketFamily family) GSocketFamily family)
{ {
GstRTSPStreamPrivate *priv; GstRTSPStreamPrivate *priv;
@ -937,8 +923,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
GstRTSPAddress **addrp; GstRTSPAddress **addrp;
GstRTSPAddressFlags flags; GstRTSPAddressFlags flags;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
priv = stream->priv; priv = stream->priv;
if (family == G_SOCKET_FAMILY_IPV6) { if (family == G_SOCKET_FAMILY_IPV6) {
@ -949,7 +933,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
addrp = &priv->mcast_addr_v4; addrp = &priv->mcast_addr_v4;
} }
g_mutex_lock (&priv->lock);
if (*addrp == NULL) { if (*addrp == NULL) {
if (priv->pool == NULL) if (priv->pool == NULL)
goto no_pool; goto no_pool;
@ -967,7 +950,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
* GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */ * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
} }
result = gst_rtsp_address_copy (*addrp); result = gst_rtsp_address_copy (*addrp);
g_mutex_unlock (&priv->lock);
return result; return result;
@ -975,17 +957,43 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
no_pool: no_pool:
{ {
GST_ERROR_OBJECT (stream, "no address pool specified"); GST_ERROR_OBJECT (stream, "no address pool specified");
g_mutex_unlock (&priv->lock);
return NULL; return NULL;
} }
no_address: no_address:
{ {
GST_ERROR_OBJECT (stream, "failed to acquire address from pool"); GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
g_mutex_unlock (&priv->lock);
return NULL; return NULL;
} }
} }
/**
* gst_rtsp_stream_get_multicast_address:
* @stream: a #GstRTSPStream
* @family: the #GSocketFamily
*
* Get the multicast address of @stream for @family. The original
* #GstRTSPAddress is cached and copy is returned, so freeing the return value
* won't release the address from the pool.
*
* Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
* or %NULL when no address could be allocated. gst_rtsp_address_free()
* after usage.
*/
GstRTSPAddress *
gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
GSocketFamily family)
{
GstRTSPAddress *result;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
g_mutex_lock (&stream->priv->lock);
result = gst_rtsp_stream_get_multicast_address_locked (stream, family);
g_mutex_unlock (&stream->priv->lock);
return result;
}
/** /**
* gst_rtsp_stream_reserve_address: * gst_rtsp_stream_reserve_address:
* @stream: a #GstRTSPStream * @stream: a #GstRTSPStream
@ -1202,7 +1210,7 @@ error:
static gboolean static gboolean
alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
GstElement * udpsrc_out[2], GstElement * udpsink_out[2], GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out) GstRTSPAddress ** server_addr_out, gboolean multicast)
{ {
GstRTSPStreamPrivate *priv = stream->priv; GstRTSPStreamPrivate *priv = stream->priv;
GSocket *rtp_socket = NULL; GSocket *rtp_socket = NULL;
@ -1248,13 +1256,21 @@ again:
g_socket_set_multicast_loopback (rtp_socket, FALSE); g_socket_set_multicast_loopback (rtp_socket, FALSE);
} }
if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) { if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
GstRTSPAddressFlags flags; GstRTSPAddressFlags flags;
if (addr) if (addr)
rejected_addresses = g_list_prepend (rejected_addresses, addr); rejected_addresses = g_list_prepend (rejected_addresses, addr);
flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST; if (!pool)
goto no_ports;
flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
if (multicast)
flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
else
flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
if (family == G_SOCKET_FAMILY_IPV6) if (family == G_SOCKET_FAMILY_IPV6)
flags |= GST_RTSP_ADDRESS_FLAG_IPV6; flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
else else
@ -1316,21 +1332,20 @@ again:
} }
g_object_unref (rtcp_sockaddr); g_object_unref (rtcp_sockaddr);
if (addr == NULL) if (!addr) {
addr_str = g_inet_address_to_string (inetaddr); addr = g_slice_new0 (GstRTSPAddress);
else addr->address = g_inet_address_to_string (inetaddr);
addr_str = addr->address; addr->port = tmp_rtp;
addr->n_ports = 2;
}
addr_str = addr->address;
g_clear_object (&inetaddr); g_clear_object (&inetaddr);
if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) { if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
if (addr == NULL)
g_free (addr_str);
goto no_udp_protocol; goto no_udp_protocol;
} }
if (addr == NULL)
g_free (addr_str);
g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL); g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL); g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
@ -1338,18 +1353,26 @@ again:
if (rtpport != tmp_rtp || rtcpport != tmp_rtcp) if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
goto port_error; goto port_error;
server_port_out->min = rtpport;
server_port_out->max = rtcpport;
/* This function is called twice (for v4 and v6) but we create only one pair /* This function is called twice (for v4 and v6) but we create only one pair
* of udpsinks. */ * of udpsinks. */
if (!udpsink_out[0] if (!udpsink_out[0]
&& !create_and_configure_udpsinks (stream, udpsink_out)) && !create_and_configure_udpsinks (stream, udpsink_out))
goto no_udp_protocol; goto no_udp_protocol;
if (multicast) {
g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface",
priv->multicast_iface, NULL);
g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface",
priv->multicast_iface, NULL);
g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL);
g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL);
}
set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family); set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
*server_addr_out = addr; *server_addr_out = addr;
g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
g_object_unref (rtp_socket); g_object_unref (rtp_socket);
@ -1464,15 +1487,21 @@ alloc_ports (GstRTSPStream * stream)
GstRTSPStreamPrivate *priv = stream->priv; GstRTSPStreamPrivate *priv = stream->priv;
gboolean ret = TRUE; gboolean ret = TRUE;
if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) {
(priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
priv->udpsrc_v4, priv->udpsink, priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE);
&priv->server_port_v4, &priv->server_addr_v4);
ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
priv->udpsrc_v6, priv->udpsink, priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE);
&priv->server_port_v6, &priv->server_addr_v6); }
/* FIXME: Maybe actually consider the return values? */
if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE);
ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE);
} }
return ret; return ret;
@ -1499,11 +1528,17 @@ gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
g_mutex_lock (&priv->lock); g_mutex_lock (&priv->lock);
if (family == G_SOCKET_FAMILY_IPV4) { if (family == G_SOCKET_FAMILY_IPV4) {
if (server_port) if (server_port) {
*server_port = priv->server_port_v4; server_port->min = priv->server_addr_v4->port;
server_port->max =
priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
}
} else { } else {
if (server_port) if (server_port) {
*server_port = priv->server_port_v6; server_port->min = priv->server_addr_v6->port;
server_port->max =
priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
}
} }
g_mutex_unlock (&priv->lock); g_mutex_unlock (&priv->lock);
} }
@ -2357,7 +2392,7 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
{ {
GstRTSPStreamPrivate *priv; GstRTSPStreamPrivate *priv;
GstPad *pad; GstPad *pad;
gboolean is_tcp = FALSE, is_udp = FALSE; gboolean is_tcp, is_udp;
gint i; gint i;
priv = stream->priv; priv = stream->priv;
@ -2414,7 +2449,12 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
gst_pad_link (priv->send_src[i], pad); gst_pad_link (priv->send_src[i], pad);
gst_object_unref (pad); gst_object_unref (pad);
plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); if (priv->udpsink[i])
plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
if (priv->mcast_udpsink[i])
plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
&priv->mcast_udpqueue[i]);
if (is_tcp) { if (is_tcp) {
g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
@ -2437,12 +2477,16 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
if (state != GST_STATE_NULL) { if (state != GST_STATE_NULL) {
if (priv->udpsink[i]) if (priv->udpsink[i])
gst_element_set_state (priv->udpsink[i], state); gst_element_set_state (priv->udpsink[i], state);
if (priv->mcast_udpsink[i])
gst_element_set_state (priv->mcast_udpsink[i], state);
if (priv->appsink[i]) if (priv->appsink[i])
gst_element_set_state (priv->appsink[i], state); gst_element_set_state (priv->appsink[i], state);
if (priv->appqueue[i]) if (priv->appqueue[i])
gst_element_set_state (priv->appqueue[i], state); gst_element_set_state (priv->appqueue[i], state);
if (priv->udpqueue[i]) if (priv->udpqueue[i])
gst_element_set_state (priv->udpqueue[i], state); gst_element_set_state (priv->udpqueue[i], state);
if (priv->mcast_udpqueue[i])
gst_element_set_state (priv->mcast_udpqueue[i], state);
if (priv->tee[i]) if (priv->tee[i])
gst_element_set_state (priv->tee[i], state); gst_element_set_state (priv->tee[i], state);
} }
@ -2525,6 +2569,12 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
if (priv->udpsrc_v6[i]) if (priv->udpsrc_v6[i])
plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]); plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
if (priv->mcast_udpsrc_v4[i])
plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
if (priv->mcast_udpsrc_v6[i])
plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
if (is_tcp) { if (is_tcp) {
/* make and add appsrc */ /* make and add appsrc */
priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
@ -2542,20 +2592,13 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
} }
static gboolean static gboolean
create_mcast_part_for_transport (GstRTSPStream * stream, check_mcast_part_for_transport (GstRTSPStream * stream,
const GstRTSPTransport * tr) const GstRTSPTransport * tr)
{ {
GstRTSPStreamPrivate *priv = stream->priv; GstRTSPStreamPrivate *priv = stream->priv;
GInetAddress *inetaddr; GInetAddress *inetaddr;
GSocketFamily family; GSocketFamily family;
GstRTSPAddress *mcast_addr; GstRTSPAddress *mcast_addr;
GstElement **mcast_udpsrc;
GSocket *rtp_socket = NULL;
GSocket *rtcp_socket = NULL;
GSocketAddress *rtp_sockaddr = NULL;
GSocketAddress *rtcp_sockaddr = NULL;
GError *error = NULL;
const gchar *multicast_iface = priv->multicast_iface;
/* Check if it's a ipv4 or ipv6 transport */ /* Check if it's a ipv4 or ipv6 transport */
inetaddr = g_inet_address_new_from_string (tr->destination); inetaddr = g_inet_address_new_from_string (tr->destination);
@ -2565,10 +2608,8 @@ create_mcast_part_for_transport (GstRTSPStream * stream,
/* Select fields corresponding to the family */ /* Select fields corresponding to the family */
if (family == G_SOCKET_FAMILY_IPV4) { if (family == G_SOCKET_FAMILY_IPV4) {
mcast_addr = priv->mcast_addr_v4; mcast_addr = priv->mcast_addr_v4;
mcast_udpsrc = priv->mcast_udpsrc_v4;
} else { } else {
mcast_addr = priv->mcast_addr_v6; mcast_addr = priv->mcast_addr_v6;
mcast_udpsrc = priv->mcast_udpsrc_v6;
} }
/* We support only one mcast group per family, make sure this transport /* We support only one mcast group per family, make sure this transport
@ -2582,95 +2623,6 @@ create_mcast_part_for_transport (GstRTSPStream * stream,
tr->ttl != mcast_addr->ttl) tr->ttl != mcast_addr->ttl)
goto wrong_addr; goto wrong_addr;
if (mcast_udpsrc[0]) {
/* We already created elements for this family. Since we support only one
* mcast group per family, there is nothing more to do here. */
g_assert (mcast_udpsrc[1]);
g_assert (priv->mcast_udpqueue[0]);
g_assert (priv->mcast_udpqueue[1]);
g_assert (priv->mcast_udpsink[0]);
g_assert (priv->mcast_udpsink[1]);
return TRUE;
}
g_assert (!mcast_udpsrc[1]);
/* Create RTP/RTCP sockets and bind them on ANY with mcast ports */
rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
G_SOCKET_PROTOCOL_UDP, &error);
if (!rtp_socket)
goto socket_error;
g_socket_set_multicast_loopback (rtp_socket, FALSE);
rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
G_SOCKET_PROTOCOL_UDP, &error);
if (!rtcp_socket)
goto socket_error;
g_socket_set_multicast_loopback (rtcp_socket, FALSE);
inetaddr = g_inet_address_new_any (family);
rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port);
rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1);
g_object_unref (inetaddr);
if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error))
goto socket_error;
if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error))
goto socket_error;
g_object_unref (rtp_sockaddr);
g_object_unref (rtcp_sockaddr);
/* Add receiver part */
create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket);
if (priv->sinkpad) {
plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]);
gst_element_sync_state_with_parent (mcast_udpsrc[0]);
}
plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]);
gst_element_sync_state_with_parent (mcast_udpsrc[1]);
/* Add sender part, could already have been created for the other family. */
if (!priv->mcast_udpsink[0]) {
g_assert (!priv->mcast_udpsink[1]);
g_assert (!priv->mcast_udpqueue[0]);
g_assert (!priv->mcast_udpqueue[1]);
create_and_configure_udpsinks (stream, priv->mcast_udpsink);
g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "multicast-iface",
multicast_iface, NULL);
g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "multicast-iface",
multicast_iface, NULL);
g_signal_emit_by_name (priv->mcast_udpsink[0], "add", mcast_addr->address,
mcast_addr->port, NULL);
g_signal_emit_by_name (priv->mcast_udpsink[1], "add", mcast_addr->address,
mcast_addr->port + 1, NULL);
set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
family);
if (priv->srcpad) {
plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0],
&priv->mcast_udpqueue[0]);
gst_element_sync_state_with_parent (priv->mcast_udpsink[0]);
gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]);
}
plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1],
&priv->mcast_udpqueue[1]);
gst_element_sync_state_with_parent (priv->mcast_udpsink[1]);
gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]);
} else {
g_assert (priv->mcast_udpsink[1]);
g_assert (priv->mcast_udpqueue[0]);
g_assert (priv->mcast_udpqueue[1]);
set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
family);
}
return TRUE; return TRUE;
no_addr: no_addr:
@ -2685,17 +2637,6 @@ wrong_addr:
"the reserved address"); "the reserved address");
return FALSE; return FALSE;
} }
socket_error:
{
GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s",
error->message);
g_clear_object (&rtp_socket);
g_clear_object (&rtcp_socket);
g_clear_object (&rtp_sockaddr);
g_clear_object (&rtcp_sockaddr);
g_clear_error (&error);
return FALSE;
}
} }
/** /**
@ -2962,6 +2903,19 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
if (priv->srtpdec) if (priv->srtpdec)
gst_object_unref (priv->srtpdec); gst_object_unref (priv->srtpdec);
if (priv->mcast_addr_v4)
gst_rtsp_address_free (priv->mcast_addr_v4);
priv->mcast_addr_v4 = NULL;
if (priv->mcast_addr_v6)
gst_rtsp_address_free (priv->mcast_addr_v6);
priv->mcast_addr_v6 = NULL;
if (priv->server_addr_v4)
gst_rtsp_address_free (priv->server_addr_v4);
priv->server_addr_v4 = NULL;
if (priv->server_addr_v6)
gst_rtsp_address_free (priv->server_addr_v6);
priv->server_addr_v6 = NULL;
g_clear_object (&priv->joined_bin); g_clear_object (&priv->joined_bin);
g_mutex_unlock (&priv->lock); g_mutex_unlock (&priv->lock);
@ -3321,13 +3275,11 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
case GST_RTSP_LOWER_TRANS_UDP_MCAST: case GST_RTSP_LOWER_TRANS_UDP_MCAST:
{ {
if (add) { if (add) {
if (!create_mcast_part_for_transport (stream, tr)) if (!check_mcast_part_for_transport (stream, tr))
goto mcast_error; goto mcast_error;
priv->transports = g_list_prepend (priv->transports, trans); priv->transports = g_list_prepend (priv->transports, trans);
} else { } else {
priv->transports = g_list_remove (priv->transports, trans); priv->transports = g_list_remove (priv->transports, trans);
/* FIXME: Check if there are remaining mcast transports, and destroy
* mcast part if its now unused */
} }
break; break;
} }