From fe5f8077c1523206147c746cc40364ea16da669f Mon Sep 17 00:00:00 2001 From: Jake Foytik Date: Mon, 25 Apr 2016 08:55:25 -0400 Subject: [PATCH] rtsp-stream: Fix crash on cleanup with shared media and multiple udpsrc - Unicast udpsrcs are now managed in a hash table. This allows for proper cleanup in with shared streams and fixes a memory leak. - Unicast udpsrcs are now properly cleaned up when shared connections exit. See the update_transport() function. - Create unit test for shared media. https://bugzilla.gnome.org/show_bug.cgi?id=764744 --- gst/rtsp-server/rtsp-stream.c | 301 +++++++++++++++++++--------------- tests/check/gst/rtspserver.c | 120 +++++++++++--- tests/check/gst/stream.c | 40 ++--- 3 files changed, 288 insertions(+), 173 deletions(-) diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 810ee1c7cd..27ad5acd0d 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -62,6 +62,18 @@ #define GST_RTSP_STREAM_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate)) +/* Container for udpsrc elements created for a specific RTSPTransport. */ +typedef struct +{ + GstElement *udpsrc[2]; +} GstRTSPStreamUDPSrcs; + +static void +destroy_udp_srcs_func (gpointer data) +{ + g_slice_free (GstRTSPStreamUDPSrcs, (GstRTSPStreamUDPSrcs *) data); +} + struct _GstRTSPStreamPrivate { GMutex lock; @@ -95,16 +107,11 @@ struct _GstRTSPStreamPrivate GstElement *srtpdec; GHashTable *keys; - /* sinks used for sending and receiving RTP and RTCP over ipv4, they share - * sockets */ - GstElement *udpsrc_v4[2]; - /* UDP sources for UDP multicast transports */ - GstElement *udpsrc_mcast_v4[2]; + /* Unicast UDP sources associated with RTSPTransports */ + GHashTable *udpsrcs; - /* sinks used for sending and receiving RTP and RTCP over ipv6, they share - * sockets */ - GstElement *udpsrc_v6[2]; - /* UDP sources for UDP multicast transports */ + /* Only allow one set of IPV4 and IPV6 multicast udpsrcs */ + GstElement *udpsrc_mcast_v4[2]; GstElement *udpsrc_mcast_v6[2]; GstElement *udpqueue[2]; @@ -127,12 +134,10 @@ struct _GstRTSPStreamPrivate /* server ports for sending/receiving over ipv4 */ GstRTSPRange server_port_v4; GstRTSPAddress *server_addr_v4; - gboolean have_ipv4; /* server ports for sending/receiving over ipv6 */ GstRTSPRange server_port_v6; GstRTSPAddress *server_addr_v6; - gboolean have_ipv6; /* multicast addresses */ GstRTSPAddressPool *pool; @@ -270,6 +275,8 @@ gst_rtsp_stream_init (GstRTSPStream * stream) NULL, (GDestroyNotify) gst_caps_unref); priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); + priv->udpsrcs = g_hash_table_new_full (g_direct_hash, g_direct_equal, + NULL, (GDestroyNotify) destroy_udp_srcs_func); } static void @@ -312,6 +319,11 @@ gst_rtsp_stream_finalize (GObject * obj) g_hash_table_unref (priv->keys); g_hash_table_destroy (priv->ptmap); + /* We expect all udpsrcs to be cleaned up by this point. */ + if (g_hash_table_size (priv->udpsrcs) > 0) + g_critical ("Unreffing udpsrcs hash table that contains elements."); + g_hash_table_unref (priv->udpsrcs); + G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); } @@ -1493,42 +1505,63 @@ gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, g_mutex_lock (&priv->lock); - if (family == G_SOCKET_FAMILY_IPV4) { - if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { - if (priv->have_ipv4_mcast) + if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { + if (family == G_SOCKET_FAMILY_IPV4) { + /* Multicast IPV4 */ + if (priv->have_ipv4_mcast) { + result = TRUE; goto done; + } + priv->have_ipv4_mcast = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4, use_client_settings); + result = priv->have_ipv4_mcast; + } else { - priv->have_ipv4 = - alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, - &priv->server_port_v4, ct, &priv->server_addr_v4, - use_client_settings); - } - } else { - if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { - if (priv->have_ipv6_mcast) + /* Multicast IPV6 */ + if (priv->have_ipv6_mcast) { + result = TRUE; goto done; + } + priv->have_ipv6_mcast = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6, use_client_settings); - } else { - if (priv->have_ipv6) - goto done; - priv->have_ipv6 = - alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, - &priv->server_port_v6, ct, &priv->server_addr_v6, - use_client_settings); + result = priv->have_ipv6_mcast; } + } else { + /* We allow multiple unicast transports, so we must maintain a table of the + * udpsrcs created for them. */ + GstRTSPStreamUDPSrcs *transport_udpsrcs = + g_slice_new0 (GstRTSPStreamUDPSrcs); + + if (family == G_SOCKET_FAMILY_IPV4) { + /* Unicast IPV4 */ + result = + alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + transport_udpsrcs->udpsrc, &priv->server_port_v4, ct, + &priv->server_addr_v4, use_client_settings); + } else { + /* Unicast IPV6 */ + result = + alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + transport_udpsrcs->udpsrc, &priv->server_port_v6, ct, + &priv->server_addr_v6, use_client_settings); + } + + /* If we didn't create any unicast udpsrcs, free the transport_udpsrcs struct. + * Otherwise, add it to the hash table */ + if (transport_udpsrcs->udpsrc[0] == NULL + && transport_udpsrcs->udpsrc[1] == NULL) + g_slice_free (GstRTSPStreamUDPSrcs, transport_udpsrcs); + else + g_hash_table_insert (priv->udpsrcs, ct, transport_udpsrcs); } done: - result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 || - priv->have_ipv6_mcast; - g_mutex_unlock (&priv->lock); return result; @@ -2586,39 +2619,6 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) gst_pad_link (pad, priv->recv_sink[i]); gst_object_unref (pad); - if (priv->udpsrc_v4[i]) { - if (priv->srcpad) { - /* we set and keep these to playing so that they don't cause NO_PREROLL return - * values. This is only relevant for PLAY pipelines */ - gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING); - gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE); - } - /* add udpsrc */ - gst_bin_add (bin, priv->udpsrc_v4[i]); - - /* and link to the funnel v4 */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - } - - if (priv->udpsrc_v6[i]) { - if (priv->srcpad) { - gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING); - gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE); - } - gst_bin_add (bin, priv->udpsrc_v6[i]); - - /* and link to the funnel v6 */ - selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u"); - pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - } - if (is_tcp) { /* make and add appsrc */ priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); @@ -2808,51 +2808,50 @@ no_udp_protocol: gst_element_set_state (priv->udpsink[0], GST_STATE_NULL); if (priv->udpsink[1]) gst_element_set_state (priv->udpsink[1], GST_STATE_NULL); - if (priv->udpsrc_v4[0]) { - gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v4[0]); - priv->udpsrc_v4[0] = NULL; - } - if (priv->udpsrc_v4[1]) { - gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v4[1]); - priv->udpsrc_v4[1] = NULL; - } - if (priv->udpsrc_mcast_v4[0]) { - gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_mcast_v4[0]); - priv->udpsrc_mcast_v4[0] = NULL; - } - if (priv->udpsrc_mcast_v4[1]) { - gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_mcast_v4[1]); - priv->udpsrc_mcast_v4[1] = NULL; - } - if (priv->udpsrc_v6[0]) { - gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v6[0]); - priv->udpsrc_v6[0] = NULL; - } - if (priv->udpsrc_v6[1]) { - gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v6[1]); - priv->udpsrc_v6[1] = NULL; - } - if (priv->udpsrc_mcast_v6[0]) { - gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_mcast_v6[0]); - priv->udpsrc_mcast_v6[0] = NULL; - } - if (priv->udpsrc_mcast_v6[1]) { - gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_mcast_v6[1]); - priv->udpsrc_mcast_v6[1] = NULL; - } + g_mutex_unlock (&priv->lock); return FALSE; } } +/* Must be called with priv->lock. */ +static void +remove_all_unicast_udpsrcs (GstRTSPStream * stream, GstBin * bin) +{ + GstRTSPStreamPrivate *priv; + GHashTableIter iter; + gpointer iter_key, iter_value; + + priv = stream->priv; + + /* Remove all of the unicast udpsrcs */ + g_hash_table_iter_init (&iter, priv->udpsrcs); + while (g_hash_table_iter_next (&iter, &iter_key, &iter_value)) { + GstRTSPStreamUDPSrcs *transport_udpsrcs = + (GstRTSPStreamUDPSrcs *) iter_value; + + for (int i = 0; i < 2; i++) { + if (transport_udpsrcs->udpsrc[i]) { + if (priv->sinkpad || i == 1) { + /* Set udpsrc to NULL now before removing */ + gst_element_set_locked_state (transport_udpsrcs->udpsrc[i], FALSE); + gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL); + + /* removing them should also nicely release the request + * pads when they finalize */ + gst_bin_remove (bin, transport_udpsrcs->udpsrc[i]); + } else { + /* we need to set the state to NULL before unref */ + gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL); + gst_object_unref (transport_udpsrcs->udpsrc[i]); + } + } + } + } + + g_hash_table_remove_all (priv->udpsrcs); +} + /** * gst_rtsp_stream_leave_bin: * @stream: a #GstRTSPStream @@ -2910,6 +2909,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)); + remove_all_unicast_udpsrcs (stream, bin); for (i = 0; i < 2; i++) { if (priv->udpsink[i]) @@ -2927,21 +2927,6 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, if (priv->appsrc[i]) gst_element_set_state (priv->appsrc[i], GST_STATE_NULL); - if (priv->udpsrc_v4[i]) { - if (priv->sinkpad || i == 1) { - /* and set udpsrc to NULL now before removing */ - gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE); - gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL); - /* removing them should also nicely release the request - * pads when they finalize */ - gst_bin_remove (bin, priv->udpsrc_v4[i]); - } else { - /* we need to set the state to NULL before unref */ - gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v4[i]); - } - } - if (priv->udpsrc_mcast_v4[i]) { if (priv->sinkpad || i == 1) { /* and set udpsrc to NULL now before removing */ @@ -2956,16 +2941,6 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, } } - if (priv->udpsrc_v6[i]) { - if (priv->sinkpad || i == 1) { - gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE); - gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL); - gst_bin_remove (bin, priv->udpsrc_v6[i]); - } else { - gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL); - gst_object_unref (priv->udpsrc_v6[i]); - } - } if (priv->udpsrc_mcast_v6[i]) { if (priv->sinkpad || i == 1) { gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE); @@ -3006,8 +2981,6 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, priv->recv_sink[i] = NULL; } - priv->udpsrc_v4[i] = NULL; - priv->udpsrc_v6[i] = NULL; priv->udpsrc_mcast_v4[i] = NULL; priv->udpsrc_mcast_v6[i] = NULL; priv->udpsink[i] = NULL; @@ -3378,6 +3351,68 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer) return ret; } +/* Properly dispose udpsrcs that were created for a given transport. */ +/* Must be called with priv->lock. */ +static void +remove_transport_udpsrcs (GstRTSPStreamPrivate * priv, + const GstRTSPTransport * tr) +{ + /* Remove the udpsrcs associated with this transport. */ + GstRTSPStreamUDPSrcs *transport_udpsrcs = + g_hash_table_lookup (priv->udpsrcs, tr); + if (transport_udpsrcs != NULL) { + for (int i = 0; i < 2; i++) { + if (transport_udpsrcs->udpsrc[i]) { + if (priv->sinkpad || i == 1) { + GstBin *bin; + GstPad *udpsrc_srcpad, *funnel_sinkpad; + + /* We know these udpsrcs are all linked to funnels. Explicitely + * get the funnel src pads so we can properly release them. */ + udpsrc_srcpad = + gst_element_get_static_pad (transport_udpsrcs->udpsrc[i], "src"); + funnel_sinkpad = gst_pad_get_peer (udpsrc_srcpad); + + if (funnel_sinkpad != NULL) { + /* Unlink pads and release funnel's request pad. */ + gst_pad_unlink (udpsrc_srcpad, funnel_sinkpad); + gst_element_release_request_pad (priv->funnel[i], funnel_sinkpad); + gst_object_unref (funnel_sinkpad); + } + gst_object_unref (udpsrc_srcpad); + + /* Set udpsrc to NULL now before removing */ + gst_element_set_locked_state (transport_udpsrcs->udpsrc[i], FALSE); + gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL); + + /* This udpsrc is expected to be owned by a bin. Get the bin and + * remove our element. */ + bin = GST_BIN (gst_element_get_parent (transport_udpsrcs->udpsrc[i])); + if (bin != NULL) { + gst_bin_remove (bin, transport_udpsrcs->udpsrc[i]); + gst_object_unref (bin); + } else { + GST_ERROR ("Expected this udpsrc element to be part of a bin."); + gst_object_unref (transport_udpsrcs->udpsrc[i]); + } + + } else { + /* we need to set the state to NULL before unref */ + gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL); + gst_object_unref (transport_udpsrcs->udpsrc[i]); + } + } + } + + /* The udpsrcs are now properly cleaned up. Remove them from the table */ + g_hash_table_remove (priv->udpsrcs, tr); + + } else { + /* This can happen if we're dealing with a multicast transport. */ + GST_INFO ("Could not find udpsrcs associated with this transport."); + } +} + /* must be called with lock */ static gboolean update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, @@ -3426,6 +3461,8 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL); g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL); priv->transports = g_list_remove (priv->transports, trans); + + remove_transport_udpsrcs (priv, tr); } priv->transports_cookie++; break; diff --git a/tests/check/gst/rtspserver.c b/tests/check/gst/rtspserver.c index d21bf040db..805b177d0e 100644 --- a/tests/check/gst/rtspserver.c +++ b/tests/check/gst/rtspserver.c @@ -149,7 +149,7 @@ get_client_ports (GstRTSPRange * range) /* start the tested rtsp server */ static void -start_server (void) +start_server (gboolean set_shared_factory) { GstRTSPMountPoints *mounts; gchar *service; @@ -172,6 +172,7 @@ start_server (void) gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV4, GST_RTSP_ADDRESS_POOL_ANY_IPV4, 6000, 6010, 0); gst_rtsp_media_factory_set_address_pool (factory, pool); + gst_rtsp_media_factory_set_shared (factory, set_shared_factory); gst_object_unref (pool); /* set port to any */ @@ -571,7 +572,7 @@ GST_START_TEST (test_connect) { GstRTSPConnection *conn; - start_server (); + start_server (FALSE); /* connect to server */ conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -597,7 +598,7 @@ GST_START_TEST (test_describe) const gchar *control_video; const gchar *control_audio; - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -668,7 +669,7 @@ GST_START_TEST (test_describe_non_existing_mount_point) { GstRTSPConnection *conn; - start_server (); + start_server (FALSE); /* send DESCRIBE request for a non-existing mount point * and check that we get a 404 Not Found */ @@ -697,7 +698,7 @@ do_test_setup (GstRTSPLowerTrans lower_transport) GstRTSPTransport *video_transport = NULL; GstRTSPTransport *audio_transport = NULL; - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -782,7 +783,7 @@ GST_START_TEST (test_setup_twice) gchar *session1 = NULL; gchar *session2 = NULL; - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -854,7 +855,7 @@ GST_START_TEST (test_setup_with_require_header) gchar *unsupported = NULL; GstRTSPTransport *video_transport = NULL; - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -916,7 +917,7 @@ GST_START_TEST (test_setup_non_existing_stream) GstRTSPConnection *conn; GstRTSPRange client_ports; - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -1009,7 +1010,8 @@ done: } static void -do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport) +do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport, + GMutex * lock) { GstRTSPConnection *conn; GstSDPMessage *sdp_message = NULL; @@ -1051,8 +1053,20 @@ do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport) fail_unless_equals_string (range, range_out); g_free (range_out); - receive_rtp (rtp_socket, NULL); - receive_rtcp (rtcp_socket, NULL, 0); + for (;;) { + receive_rtp (rtp_socket, NULL); + receive_rtcp (rtcp_socket, NULL, 0); + + if (lock != NULL) { + if (g_mutex_trylock (lock) == TRUE) { + g_mutex_unlock (lock); + break; + } + } else { + break; + } + + } /* send TEARDOWN request and check that we get 200 OK */ fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN, @@ -1076,12 +1090,12 @@ do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport) static void do_test_play (const gchar * range) { - do_test_play_full (range, GST_RTSP_LOWER_TRANS_UDP); + do_test_play_full (range, GST_RTSP_LOWER_TRANS_UDP, NULL); } GST_START_TEST (test_play) { - start_server (); + start_server (FALSE); do_test_play (NULL); @@ -1095,7 +1109,7 @@ GST_START_TEST (test_play_without_session) { GstRTSPConnection *conn; - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -1155,7 +1169,7 @@ GST_START_TEST (test_play_multithreaded) gst_rtsp_thread_pool_set_max_threads (pool, 2); g_object_unref (pool); - start_server (); + start_server (FALSE); do_test_play (NULL); @@ -1215,7 +1229,7 @@ GST_START_TEST (test_play_multithreaded_block_in_describe) gst_rtsp_mount_points_add_factory (mounts, TEST_MOUNT_POINT "2", factory); g_object_unref (mounts); - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT "2"); iterate (); @@ -1294,7 +1308,7 @@ GST_START_TEST (test_play_multithreaded_timeout_client) g_signal_connect (server, "client-connected", G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one); - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -1367,7 +1381,7 @@ GST_START_TEST (test_play_multithreaded_timeout_session) g_signal_connect (server, "client-connected", G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one); - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -1443,7 +1457,7 @@ GST_START_TEST (test_play_disconnect) g_signal_connect (server, "client-connected", G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one); - start_server (); + start_server (FALSE); conn = connect_to_server (test_port, TEST_MOUNT_POINT); @@ -1515,7 +1529,8 @@ GST_START_TEST (test_play_specific_server_port) factory = gst_rtsp_media_factory_new (); /* we have to suspend media after SDP in order to make sure that * we can reconfigure UDP sink with new UDP ports */ - gst_rtsp_media_factory_set_suspend_mode (factory, GST_RTSP_SUSPEND_MODE_RESET); + gst_rtsp_media_factory_set_suspend_mode (factory, + GST_RTSP_SUSPEND_MODE_RESET); pool = gst_rtsp_address_pool_new (); gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV4, GST_RTSP_ADDRESS_POOL_ANY_IPV4, 7770, 7780, 0); @@ -1603,7 +1618,7 @@ GST_END_TEST; GST_START_TEST (test_play_smpte_range) { - start_server (); + start_server (FALSE); do_test_play ("npt=5-"); do_test_play ("smpte=0:00:00-"); @@ -1617,6 +1632,65 @@ GST_START_TEST (test_play_smpte_range) GST_END_TEST; +static gpointer +thread_func (gpointer data) +{ + do_test_play_full (NULL, GST_RTSP_LOWER_TRANS_UDP, (GMutex *) data); + return NULL; +} + +/* Test adding and removing clients to a 'Shared' media. */ +GST_START_TEST (test_shared) +{ + GMutex lock1, lock2, lock3, lock4; + GThread *thread1, *thread2, *thread3, *thread4; + + /* Locks for each thread. Each thread will keep reading data as long as the + * thread is locked. */ + g_mutex_init (&lock1); + g_mutex_init (&lock2); + g_mutex_init (&lock3); + g_mutex_init (&lock4); + + start_server (TRUE); + + /* Start the first receiver thread. */ + g_mutex_lock (&lock1); + thread1 = g_thread_new ("thread1", thread_func, &lock1); + + /* Connect and disconnect another client. */ + g_mutex_lock (&lock2); + thread2 = g_thread_new ("thread2", thread_func, &lock2); + g_mutex_unlock (&lock2); + g_mutex_clear (&lock2); + g_thread_join (thread2); + + /* Do it again. */ + g_mutex_lock (&lock3); + thread3 = g_thread_new ("thread3", thread_func, &lock3); + g_mutex_unlock (&lock3); + g_mutex_clear (&lock3); + g_thread_join (thread3); + + /* Disconnect the last client. This will clean up the media. */ + g_mutex_unlock (&lock1); + g_mutex_clear (&lock1); + g_thread_join (thread1); + + /* Connect and disconnect another client. This will create and clean up the + * media. */ + g_mutex_lock (&lock4); + thread4 = g_thread_new ("thread4", thread_func, &lock4); + g_mutex_unlock (&lock4); + g_mutex_clear (&lock4); + g_thread_join (thread4); + + stop_server (); + iterate (); +} + +GST_END_TEST; + GST_START_TEST (test_announce_without_sdp) { GstRTSPConnection *conn; @@ -1749,7 +1823,8 @@ GST_START_TEST (test_record_tcp) gint i; mfactory = - start_record_server ("( rtppcmadepay name=depay0 ! appsink name=sink async=false )"); + start_record_server + ("( rtppcmadepay name=depay0 ! appsink name=sink async=false )"); g_signal_connect (mfactory, "media-constructed", G_CALLBACK (media_constructed_cb), &server_sink); @@ -1926,6 +2001,7 @@ rtspserver_suite (void) tcase_add_test (tc, test_play_disconnect); tcase_add_test (tc, test_play_specific_server_port); tcase_add_test (tc, test_play_smpte_range); + tcase_add_test (tc, test_shared); tcase_add_test (tc, test_announce_without_sdp); tcase_add_test (tc, test_record_tcp); return s; diff --git a/tests/check/gst/stream.c b/tests/check/gst/stream.c index 0fb7e59c4f..26c291582c 100644 --- a/tests/check/gst/stream.c +++ b/tests/check/gst/stream.c @@ -52,18 +52,20 @@ GST_START_TEST (test_get_sockets) /* configure address pool for IPv4 and IPv6 unicast addresses */ pool = gst_rtsp_address_pool_new (); - fail_unless (gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV4, - GST_RTSP_ADDRESS_POOL_ANY_IPV4, 50000, 60000, 0)); - fail_unless (gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV6, - GST_RTSP_ADDRESS_POOL_ANY_IPV6, 50000, 60000, 0)); + fail_unless (gst_rtsp_address_pool_add_range (pool, + GST_RTSP_ADDRESS_POOL_ANY_IPV4, GST_RTSP_ADDRESS_POOL_ANY_IPV4, 50000, + 60000, 0)); + fail_unless (gst_rtsp_address_pool_add_range (pool, + GST_RTSP_ADDRESS_POOL_ANY_IPV6, GST_RTSP_ADDRESS_POOL_ANY_IPV6, 50000, + 60000, 0)); gst_rtsp_stream_set_address_pool (stream, pool); fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); gst_rtsp_transport_new (&tr); tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4, - tr, FALSE)); + fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, + G_SOCKET_FAMILY_IPV4, tr, FALSE)); socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4); have_ipv4 = (socket != NULL); @@ -138,7 +140,7 @@ GST_START_TEST (test_allocate_udp_ports_fail) pool = gst_rtsp_address_pool_new (); fail_unless (gst_rtsp_address_pool_add_range (pool, "192.168.1.1", - "192.168.1.1", 6000, 6001, 0)); + "192.168.1.1", 6000, 6001, 0)); gst_rtsp_stream_set_address_pool (stream, pool); fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); @@ -146,7 +148,7 @@ GST_START_TEST (test_allocate_udp_ports_fail) gst_rtsp_transport_new (&tr); tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP; fail_if (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4, - tr, FALSE)); + tr, FALSE)); gst_rtsp_transport_free (tr); g_object_unref (pool); @@ -258,8 +260,8 @@ GST_START_TEST (test_multicast_address_and_unicast_udp) gst_rtsp_transport_new (&tr); /* unicast udp */ tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4, - tr, FALSE)); + fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, + G_SOCKET_FAMILY_IPV4, tr, FALSE)); gst_rtsp_transport_free (tr); g_object_unref (pool); @@ -309,8 +311,8 @@ GST_START_TEST (test_allocate_udp_ports_multicast) /* allocate udp multicast ports for IPv4 */ gst_rtsp_transport_new (&tr); tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4, - tr, FALSE)); + fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, + G_SOCKET_FAMILY_IPV4, tr, FALSE)); /* check the multicast address and ports for IPv4 */ addr = gst_rtsp_stream_get_multicast_address (stream, G_SOCKET_FAMILY_IPV4); @@ -320,9 +322,9 @@ GST_START_TEST (test_allocate_udp_ports_multicast) fail_unless_equals_int (addr->n_ports, 2); gst_rtsp_address_free (addr); - /* allocate upd multicast ports for IPv6 */ - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV6, - tr, FALSE)); + /* allocate udp multicast ports for IPv6 */ + fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, + G_SOCKET_FAMILY_IPV6, tr, FALSE)); /* check the multicast address and ports for IPv6 */ addr = gst_rtsp_stream_get_multicast_address (stream, G_SOCKET_FAMILY_IPV6); @@ -388,8 +390,8 @@ GST_START_TEST (test_allocate_udp_ports_client_settings) tr->port.min = 6002; tr->port.max = 6003; tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4, - tr, FALSE)); + fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, + G_SOCKET_FAMILY_IPV4, tr, FALSE)); /* verify that the multicast address and ports correspond to the requested client * transport information for IPv4 */ @@ -405,8 +407,8 @@ GST_START_TEST (test_allocate_udp_ports_client_settings) tr->destination = g_strdup ("FF11:DB8::1"); tr->port.min = 6006; tr->port.max = 6007; - fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV6, - tr, FALSE)); + fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, + G_SOCKET_FAMILY_IPV6, tr, FALSE)); /* verify that the multicast address and ports correspond to the requested client * transport information for IPv6 */