From ca855abae1a38c4278f85b93f84bf2b0c6a8a4d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 5 Sep 2016 18:04:50 +0300 Subject: [PATCH] rtsp-stream: Always create multicast UDP elements if the protocol flag is set Adding them later will cause deadlocks due to 1) pre-rolling and staying in PAUSED with the unicast/TCP sinks 2) adding the multicast sink 3) waiting for it to get data to preroll again 3) never happens because the queues after the tee are full. --- gst/rtsp-server/rtsp-stream.c | 278 ++++++++++++++-------------------- 1 file changed, 115 insertions(+), 163 deletions(-) diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 1d5f58e947..5773fa60c9 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -124,8 +124,6 @@ struct _GstRTSPStreamPrivate GstRTSPAddressPool *pool; /* unicast server addr/port */ - GstRTSPRange server_port_v4; - GstRTSPRange server_port_v6; GstRTSPAddress *server_addr_v4; GstRTSPAddress *server_addr_v6; @@ -915,21 +913,9 @@ gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream) return result; } -/** - * gst_rtsp_stream_get_multicast_address: - * @stream: a #GstRTSPStream - * @family: the #GSocketFamily - * - * Get the multicast address of @stream for @family. The original - * #GstRTSPAddress is cached and copy is returned, so freeing the return value - * won't release the address from the pool. - * - * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream - * or %NULL when no address could be allocated. gst_rtsp_address_free() - * after usage. - */ -GstRTSPAddress * -gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, + +static GstRTSPAddress * +gst_rtsp_stream_get_multicast_address_locked (GstRTSPStream * stream, GSocketFamily family) { GstRTSPStreamPrivate *priv; @@ -937,8 +923,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, GstRTSPAddress **addrp; GstRTSPAddressFlags flags; - g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); - priv = stream->priv; if (family == G_SOCKET_FAMILY_IPV6) { @@ -949,7 +933,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, addrp = &priv->mcast_addr_v4; } - g_mutex_lock (&priv->lock); if (*addrp == NULL) { if (priv->pool == NULL) goto no_pool; @@ -967,7 +950,6 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */ } result = gst_rtsp_address_copy (*addrp); - g_mutex_unlock (&priv->lock); return result; @@ -975,17 +957,43 @@ gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, no_pool: { GST_ERROR_OBJECT (stream, "no address pool specified"); - g_mutex_unlock (&priv->lock); return NULL; } no_address: { GST_ERROR_OBJECT (stream, "failed to acquire address from pool"); - g_mutex_unlock (&priv->lock); return NULL; } } +/** + * gst_rtsp_stream_get_multicast_address: + * @stream: a #GstRTSPStream + * @family: the #GSocketFamily + * + * Get the multicast address of @stream for @family. The original + * #GstRTSPAddress is cached and copy is returned, so freeing the return value + * won't release the address from the pool. + * + * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream + * or %NULL when no address could be allocated. gst_rtsp_address_free() + * after usage. + */ +GstRTSPAddress * +gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, + GSocketFamily family) +{ + GstRTSPAddress *result; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); + + g_mutex_lock (&stream->priv->lock); + result = gst_rtsp_stream_get_multicast_address_locked (stream, family); + g_mutex_unlock (&stream->priv->lock); + + return result; +} + /** * gst_rtsp_stream_reserve_address: * @stream: a #GstRTSPStream @@ -1202,7 +1210,7 @@ error: static gboolean alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, GstElement * udpsrc_out[2], GstElement * udpsink_out[2], - GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out) + GstRTSPAddress ** server_addr_out, gboolean multicast) { GstRTSPStreamPrivate *priv = stream->priv; GSocket *rtp_socket = NULL; @@ -1248,13 +1256,21 @@ again: g_socket_set_multicast_loopback (rtp_socket, FALSE); } - if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) { + if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) { GstRTSPAddressFlags flags; if (addr) rejected_addresses = g_list_prepend (rejected_addresses, addr); - flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST; + if (!pool) + goto no_ports; + + flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT; + if (multicast) + flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST; + else + flags |= GST_RTSP_ADDRESS_FLAG_UNICAST; + if (family == G_SOCKET_FAMILY_IPV6) flags |= GST_RTSP_ADDRESS_FLAG_IPV6; else @@ -1316,21 +1332,20 @@ again: } g_object_unref (rtcp_sockaddr); - if (addr == NULL) - addr_str = g_inet_address_to_string (inetaddr); - else - addr_str = addr->address; + if (!addr) { + addr = g_slice_new0 (GstRTSPAddress); + addr->address = g_inet_address_to_string (inetaddr); + addr->port = tmp_rtp; + addr->n_ports = 2; + } + + addr_str = addr->address; g_clear_object (&inetaddr); if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) { - if (addr == NULL) - g_free (addr_str); goto no_udp_protocol; } - if (addr == NULL) - g_free (addr_str); - g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL); g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL); @@ -1338,18 +1353,26 @@ again: if (rtpport != tmp_rtp || rtcpport != tmp_rtcp) goto port_error; - server_port_out->min = rtpport; - server_port_out->max = rtcpport; - /* This function is called twice (for v4 and v6) but we create only one pair * of udpsinks. */ if (!udpsink_out[0] && !create_and_configure_udpsinks (stream, udpsink_out)) goto no_udp_protocol; + if (multicast) { + g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface", + priv->multicast_iface, NULL); + g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface", + priv->multicast_iface, NULL); + + g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL); + g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL); + } + set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family); *server_addr_out = addr; + g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); g_object_unref (rtp_socket); @@ -1464,15 +1487,21 @@ alloc_ports (GstRTSPStream * stream) GstRTSPStreamPrivate *priv = stream->priv; gboolean ret = TRUE; - if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || - (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) { + if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) { ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, - priv->udpsrc_v4, priv->udpsink, - &priv->server_port_v4, &priv->server_addr_v4); + priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE); ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, - priv->udpsrc_v6, priv->udpsink, - &priv->server_port_v6, &priv->server_addr_v6); + priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE); + } + + /* FIXME: Maybe actually consider the return values? */ + if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) { + ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE); + + ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE); } return ret; @@ -1499,11 +1528,17 @@ gst_rtsp_stream_get_server_port (GstRTSPStream * stream, g_mutex_lock (&priv->lock); if (family == G_SOCKET_FAMILY_IPV4) { - if (server_port) - *server_port = priv->server_port_v4; + if (server_port) { + server_port->min = priv->server_addr_v4->port; + server_port->max = + priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1; + } } else { - if (server_port) - *server_port = priv->server_port_v6; + if (server_port) { + server_port->min = priv->server_addr_v6->port; + server_port->max = + priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1; + } } g_mutex_unlock (&priv->lock); } @@ -2357,7 +2392,7 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) { GstRTSPStreamPrivate *priv; GstPad *pad; - gboolean is_tcp = FALSE, is_udp = FALSE; + gboolean is_tcp, is_udp; gint i; priv = stream->priv; @@ -2414,7 +2449,12 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) gst_pad_link (priv->send_src[i], pad); gst_object_unref (pad); - plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); + if (priv->udpsink[i]) + plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); + + if (priv->mcast_udpsink[i]) + plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i], + &priv->mcast_udpqueue[i]); if (is_tcp) { g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); @@ -2437,12 +2477,16 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) if (state != GST_STATE_NULL) { if (priv->udpsink[i]) gst_element_set_state (priv->udpsink[i], state); + if (priv->mcast_udpsink[i]) + gst_element_set_state (priv->mcast_udpsink[i], state); if (priv->appsink[i]) gst_element_set_state (priv->appsink[i], state); if (priv->appqueue[i]) gst_element_set_state (priv->appqueue[i], state); if (priv->udpqueue[i]) gst_element_set_state (priv->udpqueue[i], state); + if (priv->mcast_udpqueue[i]) + gst_element_set_state (priv->mcast_udpqueue[i], state); if (priv->tee[i]) gst_element_set_state (priv->tee[i], state); } @@ -2525,6 +2569,12 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) if (priv->udpsrc_v6[i]) plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]); + if (priv->mcast_udpsrc_v4[i]) + plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]); + + if (priv->mcast_udpsrc_v6[i]) + plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]); + if (is_tcp) { /* make and add appsrc */ priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); @@ -2542,20 +2592,13 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) } static gboolean -create_mcast_part_for_transport (GstRTSPStream * stream, +check_mcast_part_for_transport (GstRTSPStream * stream, const GstRTSPTransport * tr) { GstRTSPStreamPrivate *priv = stream->priv; GInetAddress *inetaddr; GSocketFamily family; GstRTSPAddress *mcast_addr; - GstElement **mcast_udpsrc; - GSocket *rtp_socket = NULL; - GSocket *rtcp_socket = NULL; - GSocketAddress *rtp_sockaddr = NULL; - GSocketAddress *rtcp_sockaddr = NULL; - GError *error = NULL; - const gchar *multicast_iface = priv->multicast_iface; /* Check if it's a ipv4 or ipv6 transport */ inetaddr = g_inet_address_new_from_string (tr->destination); @@ -2565,10 +2608,8 @@ create_mcast_part_for_transport (GstRTSPStream * stream, /* Select fields corresponding to the family */ if (family == G_SOCKET_FAMILY_IPV4) { mcast_addr = priv->mcast_addr_v4; - mcast_udpsrc = priv->mcast_udpsrc_v4; } else { mcast_addr = priv->mcast_addr_v6; - mcast_udpsrc = priv->mcast_udpsrc_v6; } /* We support only one mcast group per family, make sure this transport @@ -2582,95 +2623,6 @@ create_mcast_part_for_transport (GstRTSPStream * stream, tr->ttl != mcast_addr->ttl) goto wrong_addr; - if (mcast_udpsrc[0]) { - /* We already created elements for this family. Since we support only one - * mcast group per family, there is nothing more to do here. */ - g_assert (mcast_udpsrc[1]); - g_assert (priv->mcast_udpqueue[0]); - g_assert (priv->mcast_udpqueue[1]); - g_assert (priv->mcast_udpsink[0]); - g_assert (priv->mcast_udpsink[1]); - return TRUE; - } - - g_assert (!mcast_udpsrc[1]); - - /* Create RTP/RTCP sockets and bind them on ANY with mcast ports */ - rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, - G_SOCKET_PROTOCOL_UDP, &error); - if (!rtp_socket) - goto socket_error; - g_socket_set_multicast_loopback (rtp_socket, FALSE); - - rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, - G_SOCKET_PROTOCOL_UDP, &error); - if (!rtcp_socket) - goto socket_error; - g_socket_set_multicast_loopback (rtcp_socket, FALSE); - - inetaddr = g_inet_address_new_any (family); - rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port); - rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1); - g_object_unref (inetaddr); - - if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error)) - goto socket_error; - - if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error)) - goto socket_error; - - g_object_unref (rtp_sockaddr); - g_object_unref (rtcp_sockaddr); - - /* Add receiver part */ - create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket); - if (priv->sinkpad) { - plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]); - gst_element_sync_state_with_parent (mcast_udpsrc[0]); - } - plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]); - gst_element_sync_state_with_parent (mcast_udpsrc[1]); - - /* Add sender part, could already have been created for the other family. */ - if (!priv->mcast_udpsink[0]) { - g_assert (!priv->mcast_udpsink[1]); - g_assert (!priv->mcast_udpqueue[0]); - g_assert (!priv->mcast_udpqueue[1]); - - create_and_configure_udpsinks (stream, priv->mcast_udpsink); - - g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "multicast-iface", - multicast_iface, NULL); - g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "multicast-iface", - multicast_iface, NULL); - - g_signal_emit_by_name (priv->mcast_udpsink[0], "add", mcast_addr->address, - mcast_addr->port, NULL); - g_signal_emit_by_name (priv->mcast_udpsink[1], "add", mcast_addr->address, - mcast_addr->port + 1, NULL); - - set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket, - family); - - if (priv->srcpad) { - plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0], - &priv->mcast_udpqueue[0]); - gst_element_sync_state_with_parent (priv->mcast_udpsink[0]); - gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]); - } - plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1], - &priv->mcast_udpqueue[1]); - gst_element_sync_state_with_parent (priv->mcast_udpsink[1]); - gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]); - } else { - g_assert (priv->mcast_udpsink[1]); - g_assert (priv->mcast_udpqueue[0]); - g_assert (priv->mcast_udpqueue[1]); - - set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket, - family); - } - return TRUE; no_addr: @@ -2685,17 +2637,6 @@ wrong_addr: "the reserved address"); return FALSE; } -socket_error: - { - GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s", - error->message); - g_clear_object (&rtp_socket); - g_clear_object (&rtcp_socket); - g_clear_object (&rtp_sockaddr); - g_clear_object (&rtcp_sockaddr); - g_clear_error (&error); - return FALSE; - } } /** @@ -2962,6 +2903,19 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, if (priv->srtpdec) gst_object_unref (priv->srtpdec); + if (priv->mcast_addr_v4) + gst_rtsp_address_free (priv->mcast_addr_v4); + priv->mcast_addr_v4 = NULL; + if (priv->mcast_addr_v6) + gst_rtsp_address_free (priv->mcast_addr_v6); + priv->mcast_addr_v6 = NULL; + if (priv->server_addr_v4) + gst_rtsp_address_free (priv->server_addr_v4); + priv->server_addr_v4 = NULL; + if (priv->server_addr_v6) + gst_rtsp_address_free (priv->server_addr_v6); + priv->server_addr_v6 = NULL; + g_clear_object (&priv->joined_bin); g_mutex_unlock (&priv->lock); @@ -3321,13 +3275,11 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, case GST_RTSP_LOWER_TRANS_UDP_MCAST: { if (add) { - if (!create_mcast_part_for_transport (stream, tr)) + if (!check_mcast_part_for_transport (stream, tr)) goto mcast_error; priv->transports = g_list_prepend (priv->transports, trans); } else { priv->transports = g_list_remove (priv->transports, trans); - /* FIXME: Check if there are remaining mcast transports, and destroy - * mcast part if its now unused */ } break; }