Dynamically reconfigure pipeline in PLAY based on transports

The initial pipeline does not contain specific transport
elements. The receiver and the sender parts are added
after PLAY.
If the media is shared, the streams are dynamically
reconfigured after each PLAY.

https://bugzilla.gnome.org/show_bug.cgi?id=788340
This commit is contained in:
Patricia Muscalu 2017-10-17 10:44:33 +02:00 committed by Sebastian Dröge
parent 930a602e17
commit a7732a68e8
9 changed files with 1341 additions and 344 deletions

View file

@ -1518,6 +1518,7 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx)
gint matched;
gchar *seek_style = NULL;
GstRTSPStatusCode sig_result;
GPtrArray *transports;
if (!(session = ctx->session))
goto no_session;
@ -1556,6 +1557,14 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx)
if (rtspstate != GST_RTSP_STATE_PLAYING && rtspstate != GST_RTSP_STATE_READY)
goto invalid_state;
/* update the pipeline */
transports = gst_rtsp_session_media_get_transports (sessmedia);
if (!gst_rtsp_media_complete_pipeline (media, transports)) {
g_ptr_array_unref (transports);
goto pipeline_error;
}
g_ptr_array_unref (transports);
/* in play we first unsuspend, media could be suspended from SDP or PAUSED */
if (!gst_rtsp_media_unsuspend (media))
goto unsuspend_failed;
@ -1666,6 +1675,13 @@ invalid_state:
ctx);
return FALSE;
}
pipeline_error:
{
GST_ERROR ("client %p: failed to configure the pipeline", client);
send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE,
ctx);
return FALSE;
}
unsuspend_failed:
{
GST_ERROR ("client %p: unsuspend failed", client);
@ -1784,39 +1800,52 @@ default_configure_client_transport (GstRTSPClient * client,
GstRTSPClientPrivate *priv = client->priv;
/* we have a valid transport now, set the destination of the client. */
if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
gboolean use_client_settings;
if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST ||
ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP) {
use_client_settings =
gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS);
/* allocate UDP ports */
GSocketFamily family;
gboolean use_client_settings = FALSE;
if (ct->destination && use_client_settings) {
GstRTSPAddress *addr;
family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4;
if ((ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) &&
gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS) &&
(ct->destination != NULL))
use_client_settings = TRUE;
addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination,
ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl);
if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct,
use_client_settings))
goto error_allocating_ports;
if (addr == NULL)
goto no_address;
if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
GstRTSPAddress *addr = NULL;
gst_rtsp_address_free (addr);
if (use_client_settings) {
/* the address has been successfully allocated, let's check if it's
* the one requested by the client */
addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination,
ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl);
if (addr == NULL)
goto no_address;
} else {
g_free (ct->destination);
addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family);
if (addr == NULL)
goto no_address;
ct->destination = g_strdup (addr->address);
ct->port.min = addr->port;
ct->port.max = addr->port + addr->n_ports - 1;
ct->ttl = addr->ttl;
gst_rtsp_address_free (addr);
}
} else {
GstRTSPAddress *addr;
GSocketFamily family;
family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4;
addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family);
if (addr == NULL)
goto no_address;
GstRTSPUrl *url;
url = gst_rtsp_connection_get_url (priv->connection);
g_free (ct->destination);
ct->destination = g_strdup (addr->address);
ct->port.min = addr->port;
ct->port.max = addr->port + addr->n_ports - 1;
ct->ttl = addr->ttl;
gst_rtsp_address_free (addr);
ct->destination = g_strdup (url->host);
}
} else {
GstRTSPUrl *url;
@ -1863,9 +1892,14 @@ default_configure_client_transport (GstRTSPClient * client,
return TRUE;
/* ERRORS */
error_allocating_ports:
{
GST_ERROR_OBJECT (client, "Failed to allocate UDP ports");
return FALSE;
}
no_address:
{
GST_ERROR_OBJECT (client, "failed to acquire address for stream");
GST_ERROR_OBJECT (client, "Failed to acquire address for stream");
return FALSE;
}
}
@ -3036,6 +3070,7 @@ handle_record_request (GstRTSPClient * client, GstRTSPContext * ctx)
gchar *path;
gint matched;
GstRTSPStatusCode sig_result;
GPtrArray *transports;
if (!(session = ctx->session))
goto no_session;
@ -3074,7 +3109,15 @@ handle_record_request (GstRTSPClient * client, GstRTSPContext * ctx)
if (rtspstate != GST_RTSP_STATE_PLAYING && rtspstate != GST_RTSP_STATE_READY)
goto invalid_state;
/* in play we first unsuspend, media could be suspended from SDP or PAUSED */
/* update the pipeline */
transports = gst_rtsp_session_media_get_transports (sessmedia);
if (!gst_rtsp_media_complete_pipeline (media, transports)) {
g_ptr_array_unref (transports);
goto pipeline_error;
}
g_ptr_array_unref (transports);
/* in record we first unsuspend, media could be suspended from SDP or PAUSED */
if (!gst_rtsp_media_unsuspend (media))
goto unsuspend_failed;
@ -3140,6 +3183,13 @@ invalid_state:
ctx);
return FALSE;
}
pipeline_error:
{
GST_ERROR ("client %p: failed to configure the pipeline", client);
send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE,
ctx);
return FALSE;
}
unsuspend_failed:
{
GST_ERROR ("client %p: unsuspend failed", client);

View file

@ -2114,6 +2114,21 @@ media_streams_set_blocked (GstRTSPMedia * media, gboolean blocked)
g_ptr_array_foreach (priv->streams, (GFunc) stream_update_blocked, media);
}
static void
stream_unblock (GstRTSPStream * stream, GstRTSPMedia * media)
{
gst_rtsp_stream_unblock_linked (stream);
}
static void
media_unblock_linked (GstRTSPMedia * media)
{
GstRTSPMediaPrivate *priv = media->priv;
GST_DEBUG ("media %p unblocking linked streams", media);
g_ptr_array_foreach (priv->streams, (GFunc) stream_unblock, media);
}
static void
gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
{
@ -2526,8 +2541,6 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
GST_INFO ("%p: ignoring ASYNC_DONE", media);
} else {
GST_INFO ("%p: got ASYNC_DONE", media);
collect_media_stats (media);
if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING)
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
}
@ -2642,6 +2655,9 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
GST_WARNING ("failed to join bin element");
}
if (priv->blocked)
gst_rtsp_stream_set_blocked (stream, TRUE);
priv->adding = FALSE;
g_rec_mutex_unlock (&priv->state_lock);
@ -2720,7 +2736,9 @@ start_preroll (GstRTSPMedia * media)
GstStateChangeReturn ret;
GST_INFO ("setting pipeline to PAUSED for media %p", media);
/* first go to PAUSED */
/* start blocked since it is possible that there are no sink elements yet */
media_streams_set_blocked (media, TRUE);
ret = set_target_state (media, GST_STATE_PAUSED, TRUE);
switch (ret) {
@ -2737,10 +2755,7 @@ start_preroll (GstRTSPMedia * media)
* seeking query in preroll instead */
priv->seekable = -1;
priv->is_live = TRUE;
if (!(priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) {
/* start blocked to make sure nothing goes to the sink */
media_streams_set_blocked (media, TRUE);
}
ret = set_state (media, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE)
goto state_failed;
@ -3100,6 +3115,8 @@ finish_unprepare (GstRTSPMedia * media)
set_state (media, GST_STATE_NULL);
g_rec_mutex_lock (&priv->state_lock);
media_streams_set_blocked (media, FALSE);
if (priv->status != GST_RTSP_MEDIA_STATUS_UNPREPARING)
return;
@ -3209,8 +3226,6 @@ gst_rtsp_media_unprepare (GstRTSPMedia * media)
goto is_busy;
GST_INFO ("unprepare media %p", media);
if (priv->blocked)
media_streams_set_blocked (media, FALSE);
set_target_state (media, GST_STATE_NULL, FALSE);
success = TRUE;
@ -3563,7 +3578,6 @@ default_suspend (GstRTSPMedia * media)
{
GstRTSPMediaPrivate *priv = media->priv;
GstStateChangeReturn ret;
gboolean unblock = FALSE;
switch (priv->suspend_mode) {
case GST_RTSP_SUSPEND_MODE_NONE:
@ -3574,7 +3588,6 @@ default_suspend (GstRTSPMedia * media)
ret = set_target_state (media, GST_STATE_PAUSED, TRUE);
if (ret == GST_STATE_CHANGE_FAILURE)
goto state_failed;
unblock = TRUE;
break;
case GST_RTSP_SUSPEND_MODE_RESET:
GST_DEBUG ("media %p suspend to NULL", media);
@ -3587,16 +3600,11 @@ default_suspend (GstRTSPMedia * media)
* is actually from NULL to PLAY will create a new sequence
* number. */
g_ptr_array_foreach (priv->streams, (GFunc) do_set_seqnum, NULL);
unblock = TRUE;
break;
default:
break;
}
/* let the streams do the state changes freely, if any */
if (unblock)
media_streams_set_blocked (media, FALSE);
return TRUE;
/* ERRORS */
@ -3674,7 +3682,19 @@ default_unsuspend (GstRTSPMedia * media)
switch (priv->suspend_mode) {
case GST_RTSP_SUSPEND_MODE_NONE:
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD))
break;
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
/* at this point the media pipeline has been updated and contain all
* specific transport parts: all active streams contain at least one sink
* element and it's safe to unblock any blocked streams that are active */
media_unblock_linked (media);
g_rec_mutex_unlock (&priv->state_lock);
if (gst_rtsp_media_get_status (media) == GST_RTSP_MEDIA_STATUS_ERROR) {
g_rec_mutex_lock (&priv->state_lock);
goto preroll_failed;
}
g_rec_mutex_lock (&priv->state_lock);
break;
case GST_RTSP_SUSPEND_MODE_PAUSE:
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
@ -3682,6 +3702,10 @@ default_unsuspend (GstRTSPMedia * media)
case GST_RTSP_SUSPEND_MODE_RESET:
{
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
/* at this point the media pipeline has been updated and contain all
* specific transport parts: all active streams contain at least one sink
* element and it's safe to unblock any blocked streams that are active */
media_unblock_linked (media);
if (!start_preroll (media))
goto start_failed;
@ -3771,7 +3795,7 @@ media_set_pipeline_state_locked (GstRTSPMedia * media, GstState state)
} else {
if (state == GST_STATE_PLAYING)
/* make sure pads are not blocking anymore when going to PLAYING */
media_streams_set_blocked (media, FALSE);
media_unblock_linked (media);
set_state (media, state);
@ -4007,3 +4031,52 @@ gst_rtsp_media_seekable (GstRTSPMedia * media)
* and no stream is seekable only to the beginning */
return media->priv->seekable;
}
/**
* gst_rtsp_media_complete_pipeline:
* @media: a #GstRTSPMedia
* @transports: a list of #GstRTSPTransport
*
* Add a receiver and sender parts to the pipeline based on the transport from
* SETUP.
*
* Returns: %TRUE if the media pipeline has been sucessfully updated.
*/
gboolean
gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports)
{
GstRTSPMediaPrivate *priv;
guint i;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
g_return_val_if_fail (transports, FALSE);
GST_DEBUG_OBJECT (media, "complete pipeline");
priv = media->priv;
g_mutex_lock (&priv->lock);
for (i = 0; i < priv->streams->len; i++) {
GstRTSPStreamTransport *transport;
GstRTSPStream *stream;
const GstRTSPTransport *rtsp_transport;
transport = g_ptr_array_index (transports, i);
if (!transport)
continue;
stream = gst_rtsp_stream_transport_get_stream (transport);
if (!stream)
continue;
rtsp_transport = gst_rtsp_stream_transport_get_transport (transport);
if (!gst_rtsp_stream_complete_stream (stream, rtsp_transport)) {
g_mutex_unlock (&priv->lock);
return FALSE;
}
}
g_mutex_unlock (&priv->lock);
return TRUE;
}

View file

@ -382,6 +382,9 @@ GST_EXPORT
void gst_rtsp_media_set_pipeline_state (GstRTSPMedia * media,
GstState state);
GST_EXPORT
gboolean gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports);
#ifdef G_DEFINE_AUTOPTR_CLEANUP_FUNC
G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstRTSPMedia, gst_object_unref)
#endif

File diff suppressed because it is too large Load diff

View file

@ -160,6 +160,10 @@ gboolean gst_rtsp_stream_set_blocked (GstRTSPStream * stream,
GST_EXPORT
gboolean gst_rtsp_stream_is_blocking (GstRTSPStream * stream);
GST_EXPORT
gboolean gst_rtsp_stream_unblock_linked (GstRTSPStream * stream);
GST_EXPORT
void gst_rtsp_stream_set_client_side (GstRTSPStream *stream, gboolean client_side);
@ -272,7 +276,7 @@ GstElement * gst_rtsp_stream_request_aux_sender (GstRTSPStream * st
GST_EXPORT
gboolean gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, GSocketFamily family,
GstRTSPTransport *transport, gboolean use_client_setttings);
GstRTSPTransport *transport, gboolean use_client_settings);
GST_EXPORT
void gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream, GstRTSPPublishClockMode mode);
@ -280,6 +284,9 @@ void gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream *
GST_EXPORT
GstRTSPPublishClockMode gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream);
GST_EXPORT
gboolean gst_rtsp_stream_complete_stream (GstRTSPStream * stream, const GstRTSPTransport * transport);
/**
* GstRTSPStreamTransportFilterFunc:
* @stream: a #GstRTSPStream object

View file

@ -793,7 +793,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific)
g_object_unref (session_pool);
/* simple SETUP with a valid URI and multicast, but an invalid prt */
fail_unless (gst_rtsp_message_init_request (&request, GST_RTSP_SETUP,
"rtsp://localhost/test/stream=0") == GST_RTSP_OK);
@ -813,7 +812,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific)
g_object_unref (session_pool);
/* simple SETUP with a valid URI and multicast, but an invalid ttl */
fail_unless (gst_rtsp_message_init_request (&request, GST_RTSP_SETUP,
"rtsp://localhost/test/stream=0") == GST_RTSP_OK);
@ -832,7 +830,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific)
fail_unless (gst_rtsp_session_pool_get_n_sessions (session_pool) == 0);
g_object_unref (session_pool);
teardown_client (client);
g_object_unref (ctx.auth);
gst_rtsp_token_unref (ctx.token);

View file

@ -21,7 +21,73 @@
#include <rtsp-media-factory.h>
GST_START_TEST (test_launch)
GST_START_TEST (test_media_seek)
{
GstRTSPMediaFactory *factory;
GstRTSPMedia *media;
GstRTSPUrl *url;
GstRTSPStream *stream;
GstRTSPTimeRange *range;
gchar *str;
GstRTSPThreadPool *pool;
GstRTSPThread *thread;
GstRTSPTransport *transport;
factory = gst_rtsp_media_factory_new ();
fail_if (gst_rtsp_media_factory_is_shared (factory));
fail_unless (gst_rtsp_url_parse ("rtsp://localhost:8554/test",
&url) == GST_RTSP_OK);
gst_rtsp_media_factory_set_launch (factory,
"( videotestsrc ! rtpvrawpay pt=96 name=pay0 )");
media = gst_rtsp_media_factory_construct (factory, url);
fail_unless (GST_IS_RTSP_MEDIA (media));
fail_unless (gst_rtsp_media_n_streams (media) == 1);
stream = gst_rtsp_media_get_stream (media, 0);
fail_unless (stream != NULL);
pool = gst_rtsp_thread_pool_new ();
thread = gst_rtsp_thread_pool_get_thread (pool,
GST_RTSP_THREAD_TYPE_MEDIA, NULL);
fail_unless (gst_rtsp_media_prepare (media, thread));
/* define transport */
fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK);
transport->lower_transport = GST_RTSP_LOWER_TRANS_TCP;
fail_unless (gst_rtsp_stream_complete_stream (stream, transport));
fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK);
fail_unless (gst_rtsp_range_parse ("npt=5.0-", &range) == GST_RTSP_OK);
/* the media is seekable now */
fail_unless (gst_rtsp_media_seek (media, range));
str = gst_rtsp_media_get_range_string (media, FALSE, GST_RTSP_RANGE_NPT);
fail_unless (g_str_equal (str, "npt=5-"));
gst_rtsp_range_free (range);
g_free (str);
fail_unless (gst_rtsp_media_unprepare (media));
g_object_unref (media);
gst_rtsp_url_free (url);
g_object_unref (factory);
g_object_unref (pool);
gst_rtsp_thread_pool_cleanup ();
}
GST_END_TEST;
GST_START_TEST (test_media_seek_no_sinks)
{
GstRTSPMediaFactory *factory;
GstRTSPMedia *media;
@ -70,15 +136,8 @@ GST_START_TEST (test_launch)
fail_unless (g_str_equal (str, "npt=0-"));
g_free (str);
fail_unless (gst_rtsp_media_seek (media, range));
str = gst_rtsp_media_get_range_string (media, FALSE, GST_RTSP_RANGE_NPT);
fail_unless (g_str_equal (str, "npt=5-"));
g_free (str);
str = gst_rtsp_media_get_range_string (media, TRUE, GST_RTSP_RANGE_NPT);
fail_unless (g_str_equal (str, "npt=5-"));
g_free (str);
/* fails, need to be prepared and contain sink elements */
fail_if (gst_rtsp_media_seek (media, range));
fail_unless (gst_rtsp_media_unprepare (media));
@ -126,12 +185,13 @@ GST_START_TEST (test_media)
GST_END_TEST;
static void
test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line)
test_prepare_reusable (const gchar * launch_line)
{
GstRTSPMediaFactory *factory;
GstRTSPMedia *media;
GstRTSPUrl *url;
GstRTSPThread *thread;
GstRTSPThreadPool *pool;
factory = gst_rtsp_media_factory_new ();
fail_if (gst_rtsp_media_factory_is_shared (factory));
@ -146,6 +206,7 @@ test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line)
g_object_set (G_OBJECT (media), "reusable", TRUE, NULL);
pool = gst_rtsp_thread_pool_new ();
thread = gst_rtsp_thread_pool_get_thread (pool,
GST_RTSP_THREAD_TYPE_MEDIA, NULL);
fail_unless (gst_rtsp_media_prepare (media, thread));
@ -162,6 +223,17 @@ test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line)
g_object_unref (factory);
}
GST_START_TEST (test_media_reusable)
{
/* test reusable media */
test_prepare_reusable ("( videotestsrc ! rtpvrawpay pt=96 name=pay0 )");
test_prepare_reusable (
"( videotestsrc is-live=true ! rtpvrawpay pt=96 name=pay0 )");
}
GST_END_TEST;
GST_START_TEST (test_media_prepare)
{
GstRTSPMediaFactory *factory;
@ -199,11 +271,6 @@ GST_START_TEST (test_media_prepare)
gst_rtsp_url_free (url);
g_object_unref (factory);
/* test reusable media */
test_prepare_reusable (pool, "( videotestsrc ! rtpvrawpay pt=96 name=pay0 )");
test_prepare_reusable (pool,
"( videotestsrc is-live=true ! rtpvrawpay pt=96 name=pay0 )");
g_object_unref (pool);
gst_rtsp_thread_pool_cleanup ();
}
@ -454,9 +521,11 @@ rtspmedia_suite (void)
suite_add_tcase (s, tc);
tcase_set_timeout (tc, 20);
tcase_add_test (tc, test_launch);
tcase_add_test (tc, test_media_seek);
tcase_add_test (tc, test_media_seek_no_sinks);
tcase_add_test (tc, test_media);
tcase_add_test (tc, test_media_prepare);
tcase_add_test (tc, test_media_reusable);
tcase_add_test (tc, test_media_dyn_prepare);
tcase_add_test (tc, test_media_take_pipeline);
tcase_add_test (tc, test_media_reset);

View file

@ -1058,6 +1058,74 @@ done:
gst_buffer_unref (buffer);
}
static void
do_test_play_tcp_full (const gchar * range)
{
GstRTSPConnection *conn;
GstSDPMessage *sdp_message = NULL;
const GstSDPMedia *sdp_media;
const gchar *video_control;
const gchar *audio_control;
GstRTSPRange client_port;
gchar *session = NULL;
GstRTSPTransport *video_transport = NULL;
GstRTSPTransport *audio_transport = NULL;
gchar *range_out = NULL;
GstRTSPLowerTrans lower_transport = GST_RTSP_LOWER_TRANS_TCP;
conn = connect_to_server (test_port, TEST_MOUNT_POINT);
sdp_message = do_describe (conn, TEST_MOUNT_POINT);
get_client_ports (&client_port);
/* get control strings from DESCRIBE response */
fail_unless (gst_sdp_message_medias_len (sdp_message) == 2);
sdp_media = gst_sdp_message_get_media (sdp_message, 0);
video_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
sdp_media = gst_sdp_message_get_media (sdp_message, 1);
audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
/* do SETUP for video and audio */
fail_unless (do_setup_full (conn, video_control, lower_transport,
&client_port, NULL, &session, &video_transport,
NULL) == GST_RTSP_STS_OK);
fail_unless (do_setup_full (conn, audio_control, lower_transport,
&client_port, NULL, &session, &audio_transport,
NULL) == GST_RTSP_STS_OK);
/* send PLAY request and check that we get 200 OK */
fail_unless (do_request (conn, GST_RTSP_PLAY, NULL, session, NULL, range,
NULL, NULL, NULL, NULL, NULL, &range_out) == GST_RTSP_STS_OK);
if (range)
fail_unless_equals_string (range, range_out);
g_free (range_out);
{
GstRTSPMessage *message;
fail_unless (gst_rtsp_message_new (&message) == GST_RTSP_OK);
fail_unless (gst_rtsp_connection_receive (conn, message, NULL) == GST_RTSP_OK);
fail_unless (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA);
gst_rtsp_message_free (message);
}
/* send TEARDOWN request and check that we get 200 OK */
fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN,
session) == GST_RTSP_STS_OK);
/* FIXME: The rtsp-server always disconnects the transport before
* sending the RTCP BYE
* receive_rtcp (rtcp_socket, NULL, GST_RTCP_TYPE_BYE);
*/
/* clean up and iterate so the clean-up can finish */
g_free (session);
gst_rtsp_transport_free (video_transport);
gst_rtsp_transport_free (audio_transport);
gst_sdp_message_free (sdp_message);
gst_rtsp_connection_free (conn);
}
static void
do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport,
GMutex * lock)
@ -1579,6 +1647,70 @@ GST_START_TEST (test_no_session_timeout)
GST_END_TEST;
/* media contains two streams: video and audio but only one
* stream is requested */
GST_START_TEST (test_play_one_active_stream)
{
GstRTSPConnection *conn;
GstSDPMessage *sdp_message = NULL;
const GstSDPMedia *sdp_media;
const gchar *video_control;
GstRTSPRange client_port;
gchar *session = NULL;
GstRTSPTransport *video_transport = NULL;
GstRTSPSessionPool *pool;
GstRTSPThreadPool *thread_pool;
thread_pool = gst_rtsp_server_get_thread_pool (server);
gst_rtsp_thread_pool_set_max_threads (thread_pool, 2);
g_object_unref (thread_pool);
pool = gst_rtsp_server_get_session_pool (server);
g_signal_connect (server, "client-connected",
G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);
start_server (FALSE);
conn = connect_to_server (test_port, TEST_MOUNT_POINT);
gst_rtsp_connection_set_remember_session_id (conn, FALSE);
sdp_message = do_describe (conn, TEST_MOUNT_POINT);
/* get control strings from DESCRIBE response */
fail_unless (gst_sdp_message_medias_len (sdp_message) == 2);
sdp_media = gst_sdp_message_get_media (sdp_message, 0);
video_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
get_client_ports (&client_port);
/* do SETUP for video only */
fail_unless (do_setup (conn, video_control, &client_port, &session,
&video_transport) == GST_RTSP_STS_OK);
fail_unless (gst_rtsp_session_pool_get_n_sessions (pool) == 1);
/* send PLAY request and check that we get 200 OK */
fail_unless (do_simple_request (conn, GST_RTSP_PLAY,
session) == GST_RTSP_STS_OK);
/* send TEARDOWN request */
fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN,
session) == GST_RTSP_STS_OK);
/* clean up and iterate so the clean-up can finish */
g_object_unref (pool);
g_free (session);
gst_rtsp_transport_free (video_transport);
gst_sdp_message_free (sdp_message);
gst_rtsp_connection_free (conn);
stop_server ();
iterate ();
}
GST_END_TEST;
GST_START_TEST (test_play_disconnect)
{
@ -1772,6 +1904,22 @@ GST_START_TEST (test_play_smpte_range)
GST_END_TEST;
GST_START_TEST (test_play_smpte_range_tcp)
{
start_tcp_server ();
do_test_play_tcp_full ("npt=5-");
do_test_play_tcp_full ("smpte=0:00:00-");
do_test_play_tcp_full ("smpte=1:00:00-");
do_test_play_tcp_full ("smpte=1:00:03-");
do_test_play_tcp_full ("clock=20120321T152256Z-");
stop_server ();
iterate ();
}
GST_END_TEST;
static gpointer
thread_func (gpointer data)
{
@ -2112,6 +2260,113 @@ GST_START_TEST (test_record_tcp)
GST_END_TEST;
static void
do_test_multiple_transports (GstRTSPLowerTrans trans1, GstRTSPLowerTrans trans2)
{
GstRTSPConnection *conn1;
GstRTSPConnection *conn2;
GstSDPMessage *sdp_message1 = NULL;
GstSDPMessage *sdp_message2 = NULL;
const GstSDPMedia *sdp_media;
const gchar *video_control;
const gchar *audio_control;
GstRTSPRange client_port1, client_port2;
gchar *session1 = NULL;
gchar *session2 = NULL;
GstRTSPTransport *video_transport = NULL;
GstRTSPTransport *audio_transport = NULL;
GSocket *rtp_socket, *rtcp_socket;
conn1 = connect_to_server (test_port, TEST_MOUNT_POINT);
conn2 = connect_to_server (test_port, TEST_MOUNT_POINT);
sdp_message1 = do_describe (conn1, TEST_MOUNT_POINT);
get_client_ports_full (&client_port1, &rtp_socket, &rtcp_socket);
/* get control strings from DESCRIBE response */
sdp_media = gst_sdp_message_get_media (sdp_message1, 0);
video_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
sdp_media = gst_sdp_message_get_media (sdp_message1, 1);
audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
/* do SETUP for video and audio */
fail_unless (do_setup_full (conn1, video_control, trans1,
&client_port1, NULL, &session1, &video_transport,
NULL) == GST_RTSP_STS_OK);
fail_unless (do_setup_full (conn1, audio_control, trans1,
&client_port1, NULL, &session1, &audio_transport,
NULL) == GST_RTSP_STS_OK);
gst_rtsp_transport_free (video_transport);
gst_rtsp_transport_free (audio_transport);
sdp_message2 = do_describe (conn2, TEST_MOUNT_POINT);
/* get control strings from DESCRIBE response */
sdp_media = gst_sdp_message_get_media (sdp_message2, 0);
video_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
sdp_media = gst_sdp_message_get_media (sdp_message2, 1);
audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
get_client_ports_full (&client_port2, NULL, NULL);
/* do SETUP for video and audio */
fail_unless (do_setup_full (conn2, video_control, trans2,
&client_port2, NULL, &session2, &video_transport,
NULL) == GST_RTSP_STS_OK);
fail_unless (do_setup_full (conn2, audio_control, trans2,
&client_port2, NULL, &session2, &audio_transport,
NULL) == GST_RTSP_STS_OK);
/* send PLAY request and check that we get 200 OK */
fail_unless (do_request (conn1, GST_RTSP_PLAY, NULL, session1, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL) == GST_RTSP_STS_OK);
/* send PLAY request and check that we get 200 OK */
fail_unless (do_request (conn2, GST_RTSP_PLAY, NULL, session2, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL) == GST_RTSP_STS_OK);
/* receive UDP data */
receive_rtp (rtp_socket, NULL);
receive_rtcp (rtcp_socket, NULL, 0);
/* receive TCP data */
{
GstRTSPMessage *message;
fail_unless (gst_rtsp_message_new (&message) == GST_RTSP_OK);
fail_unless (gst_rtsp_connection_receive (conn2, message, NULL) == GST_RTSP_OK);
fail_unless (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA);
gst_rtsp_message_free (message);
}
/* send TEARDOWN request and check that we get 200 OK */
fail_unless (do_simple_request (conn1, GST_RTSP_TEARDOWN,
session1) == GST_RTSP_STS_OK);
/* send TEARDOWN request and check that we get 200 OK */
fail_unless (do_simple_request (conn2, GST_RTSP_TEARDOWN,
session2) == GST_RTSP_STS_OK);
/* clean up and iterate so the clean-up can finish */
g_object_unref (rtp_socket);
g_object_unref (rtcp_socket);
g_free (session1);
g_free (session2);
gst_rtsp_transport_free (video_transport);
gst_rtsp_transport_free (audio_transport);
gst_sdp_message_free (sdp_message1);
gst_sdp_message_free (sdp_message2);
gst_rtsp_connection_free (conn1);
gst_rtsp_connection_free (conn2);
}
GST_START_TEST (test_multiple_transports)
{
start_server (TRUE);
do_test_multiple_transports (GST_RTSP_LOWER_TRANS_UDP, GST_RTSP_LOWER_TRANS_TCP);
stop_server ();
}
GST_END_TEST;
static Suite *
rtspserver_suite (void)
{
@ -2140,12 +2395,16 @@ rtspserver_suite (void)
tcase_add_test (tc, test_play_multithreaded_timeout_client);
tcase_add_test (tc, test_play_multithreaded_timeout_session);
tcase_add_test (tc, test_no_session_timeout);
tcase_add_test (tc, test_play_one_active_stream);
tcase_add_test (tc, test_play_disconnect);
tcase_add_test (tc, test_play_specific_server_port);
tcase_add_test (tc, test_play_smpte_range);
tcase_add_test (tc, test_play_smpte_range_tcp);
tcase_add_test (tc, test_shared);
tcase_add_test (tc, test_announce_without_sdp);
tcase_add_test (tc, test_record_tcp);
tcase_add_test (tc, test_multiple_transports);
return s;
}

View file

@ -22,7 +22,8 @@
#include <rtsp-stream.h>
#include <rtsp-address-pool.h>
GST_START_TEST (test_get_sockets)
static void
get_sockets (GstRTSPLowerTrans lower_transport, GSocketFamily socket_family)
{
GstPad *srcpad;
GstElement *pay;
@ -33,6 +34,7 @@ GST_START_TEST (test_get_sockets)
GSocket *socket;
gboolean have_ipv4;
gboolean have_ipv6;
GstRTSPTransport *transport;
srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC);
fail_unless (srcpad != NULL);
@ -57,18 +59,44 @@ GST_START_TEST (test_get_sockets)
fail_unless (gst_rtsp_address_pool_add_range (pool,
GST_RTSP_ADDRESS_POOL_ANY_IPV6, GST_RTSP_ADDRESS_POOL_ANY_IPV6, 50000,
60000, 0));
fail_unless (gst_rtsp_address_pool_add_range (pool, "233.252.0.0",
"233.252.0.0", 50000, 60000, 1));
fail_unless (gst_rtsp_address_pool_add_range (pool, "FF11:DB8::1",
"FF11:DB8::1", 50000, 60000, 1));
gst_rtsp_stream_set_address_pool (stream, pool);
fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4);
/* allocate udp ports first */
fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK);
transport->lower_transport = lower_transport;
/* no ports allocated, complete stream should fail */
fail_if (gst_rtsp_stream_complete_stream (stream, transport));
/* allocate ports */
fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
socket_family, transport, FALSE));
fail_unless (gst_rtsp_stream_complete_stream (stream, transport));
fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK);
if (lower_transport == GST_RTSP_LOWER_TRANS_UDP)
socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4);
else
socket = gst_rtsp_stream_get_rtp_multicast_socket (stream,
G_SOCKET_FAMILY_IPV4);
have_ipv4 = (socket != NULL);
if (have_ipv4) {
fail_unless (g_socket_get_fd (socket) >= 0);
g_object_unref (socket);
}
socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV4);
if (lower_transport == GST_RTSP_LOWER_TRANS_UDP)
socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV4);
else
socket = gst_rtsp_stream_get_rtcp_multicast_socket (stream,
G_SOCKET_FAMILY_IPV4);
if (have_ipv4) {
fail_unless (socket != NULL);
fail_unless (g_socket_get_fd (socket) >= 0);
@ -77,14 +105,22 @@ GST_START_TEST (test_get_sockets)
fail_unless (socket == NULL);
}
socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV6);
if (lower_transport == GST_RTSP_LOWER_TRANS_UDP)
socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV6);
else
socket = gst_rtsp_stream_get_rtp_multicast_socket (stream,
G_SOCKET_FAMILY_IPV6);
have_ipv6 = (socket != NULL);
if (have_ipv6) {
fail_unless (g_socket_get_fd (socket) >= 0);
g_object_unref (socket);
}
socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV6);
if (lower_transport == GST_RTSP_LOWER_TRANS_UDP)
socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV6);
else
socket = gst_rtsp_stream_get_rtcp_multicast_socket (stream,
G_SOCKET_FAMILY_IPV6);
if (have_ipv6) {
fail_unless (socket != NULL);
fail_unless (g_socket_get_fd (socket) >= 0);
@ -104,8 +140,25 @@ GST_START_TEST (test_get_sockets)
gst_object_unref (stream);
}
GST_START_TEST (test_get_sockets_udp)
{
get_sockets (GST_RTSP_LOWER_TRANS_UDP, G_SOCKET_FAMILY_IPV4);
get_sockets (GST_RTSP_LOWER_TRANS_UDP, G_SOCKET_FAMILY_IPV6);
}
GST_END_TEST;
GST_START_TEST (test_get_sockets_mcast)
{
get_sockets (GST_RTSP_LOWER_TRANS_UDP_MCAST, G_SOCKET_FAMILY_IPV4);
get_sockets (GST_RTSP_LOWER_TRANS_UDP_MCAST, G_SOCKET_FAMILY_IPV6);
}
GST_END_TEST;
/* The purpose of this test is to make sure that it's not possible to allocate
* multicast UDP ports if the address pool does not contain multicast UDP
* addresses. */
GST_START_TEST (test_allocate_udp_ports_fail)
{
GstPad *srcpad;
@ -114,6 +167,7 @@ GST_START_TEST (test_allocate_udp_ports_fail)
GstBin *bin;
GstElement *rtpbin;
GstRTSPAddressPool *pool;
GstRTSPTransport *transport;
srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC);
fail_unless (srcpad != NULL);
@ -135,7 +189,13 @@ GST_START_TEST (test_allocate_udp_ports_fail)
"192.168.1.1", 6000, 6001, 0));
gst_rtsp_stream_set_address_pool (stream, pool);
fail_if (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK);
transport->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST;
fail_if (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4,
transport, FALSE));
fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK);
g_object_unref (pool);
fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin));
@ -433,7 +493,8 @@ rtspstream_suite (void)
TCase *tc = tcase_create ("general");
suite_add_tcase (s, tc);
tcase_add_test (tc, test_get_sockets);
tcase_add_test (tc, test_get_sockets_udp);
tcase_add_test (tc, test_get_sockets_mcast);
tcase_add_test (tc, test_allocate_udp_ports_fail);
tcase_add_test (tc, test_get_multicast_address);
tcase_add_test (tc, test_multicast_address_and_unicast_udp);