rtsp-media: Handle set state when preparing.

Handle the situation when  a call to gst_rtsp_media_set_state is done
when media status is preparing.

Also add unit test for this scenario.

The unit test simulate on a media level when two clients share a (live)
media.
Both clients have done SETUP and got responses. Now client 1 is doing
play and client 2 is just closing the connection.

Then without patch there are a problem when
client1 is calling gst_rtsp_media_unsuspend in handle_play_request.
And client2 is doing closing connection we can end up in a call
to gst_rtsp_media_set_state when
priv->status == GST_RTSP_MEDIA_STATUS_PREPARING and all the logic for
shut down media is jumped over .

With this patch and this scenario we wait until
priv->status == GST_RTSP_MEDIA_STATUS_PREPARED and then continue to
execute after that and now we will execute the logic for
shut down media.
This commit is contained in:
Göran Jönsson 2019-03-14 07:37:26 +01:00
parent f0a9861186
commit 1fd49d36d1
2 changed files with 175 additions and 0 deletions

View file

@ -4376,6 +4376,13 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
priv = media->priv;
g_rec_mutex_lock (&priv->state_lock);
if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING
&& gst_rtsp_media_is_shared (media)) {
g_rec_mutex_unlock (&priv->state_lock);
gst_rtsp_media_get_status (media);
g_rec_mutex_lock (&priv->state_lock);
}
if (priv->status == GST_RTSP_MEDIA_STATUS_ERROR)
goto error_status;
if (priv->status != GST_RTSP_MEDIA_STATUS_PREPARED &&

View file

@ -402,6 +402,173 @@ GST_START_TEST (test_media_prepare)
GST_END_TEST;
enum _SyncState
{
SYNC_STATE_INIT,
SYNC_STATE_1,
SYNC_STATE_2,
SYNC_STATE_RACE
};
typedef enum _SyncState SyncState;
struct _help_thread_data
{
GstRTSPThreadPool *pool;
GstRTSPMedia *media;
GstRTSPTransport *transport;
GstRTSPStream *stream;
SyncState *state;
GMutex *sync_mutex;
GCond *sync_cond;
};
typedef struct _help_thread_data help_thread_data;
static gpointer
help_thread_main (gpointer user_data)
{
help_thread_data *data;
GstRTSPThread *thread;
GPtrArray *transports;
GstRTSPStreamTransport *stream_transport;
data = (help_thread_data *) user_data;
GST_INFO ("Another thread sharing media");
/* wait SYNC_STATE_1 */
g_mutex_lock (data->sync_mutex);
while (*data->state < SYNC_STATE_1)
g_cond_wait (data->sync_cond, data->sync_mutex);
g_mutex_unlock (data->sync_mutex);
/* prepare */
thread = gst_rtsp_thread_pool_get_thread (data->pool,
GST_RTSP_THREAD_TYPE_MEDIA, NULL);
fail_unless (gst_rtsp_media_prepare (data->media, thread));
/* set SYNC_STATE_2 */
g_mutex_lock (data->sync_mutex);
*data->state = SYNC_STATE_2;
g_cond_signal (data->sync_cond);
g_mutex_unlock (data->sync_mutex);
/* wait SYNC_STATE_RACE */
g_mutex_lock (data->sync_mutex);
while (*data->state < SYNC_STATE_RACE)
g_cond_wait (data->sync_cond, data->sync_mutex);
g_mutex_unlock (data->sync_mutex);
/* set state */
transports = g_ptr_array_new_with_free_func (g_object_unref);
fail_unless (transports != NULL);
stream_transport =
gst_rtsp_stream_transport_new (data->stream, data->transport);
fail_unless (stream_transport != NULL);
g_ptr_array_add (transports, stream_transport);
fail_unless (gst_rtsp_media_set_state (data->media, GST_STATE_NULL,
transports));
/* clean up */
GST_INFO ("Thread exit");
fail_unless (gst_rtsp_media_unprepare (data->media));
g_ptr_array_unref (transports);
return NULL;
}
GST_START_TEST (test_media_shared_race_test_unsuspend_vs_set_state_null)
{
help_thread_data data;
GstRTSPMediaFactory *factory;
GstRTSPMedia *media;
GstRTSPUrl *url;
GstRTSPThreadPool *pool;
GstRTSPThread *thread;
GThread *sharing_media_thread;
GstRTSPTransport *transport;
GstRTSPStream *stream;
SyncState state = SYNC_STATE_INIT;
GMutex sync_mutex;
GCond sync_cond;
g_mutex_init (&sync_mutex);
g_cond_init (&sync_cond);
pool = gst_rtsp_thread_pool_new ();
/* test non-reusable media first */
factory = gst_rtsp_media_factory_new ();
gst_rtsp_media_factory_set_shared (factory, TRUE);
fail_unless (gst_rtsp_url_parse ("rtsp://localhost:8554/test",
&url) == GST_RTSP_OK);
gst_rtsp_media_factory_set_launch (factory,
"( videotestsrc ! rtpvrawpay pt=96 name=pay0 )");
media = gst_rtsp_media_factory_construct (factory, url);
fail_unless (GST_IS_RTSP_MEDIA (media));
fail_unless (gst_rtsp_media_n_streams (media) == 1);
gst_rtsp_media_set_suspend_mode (media, GST_RTSP_SUSPEND_MODE_RESET);
stream = gst_rtsp_media_get_stream (media, 0);
fail_unless (stream != NULL);
thread = gst_rtsp_thread_pool_get_thread (pool,
GST_RTSP_THREAD_TYPE_MEDIA, NULL);
fail_unless (gst_rtsp_media_prepare (media, thread));
/* help thread */
data.pool = pool;
data.media = media;
data.stream = stream;
data.state = &state;
data.sync_mutex = &sync_mutex;
data.sync_cond = &sync_cond;
sharing_media_thread = g_thread_new ("new thread", help_thread_main, &data);
fail_unless (sharing_media_thread != NULL);
/* set state SYNC_STATE_1 */
g_mutex_lock (&sync_mutex);
state = SYNC_STATE_1;
g_cond_signal (&sync_cond);
g_mutex_unlock (&sync_mutex);
/* wait SYNC_STATE_2 */
g_mutex_lock (&sync_mutex);
while (state < SYNC_STATE_2)
g_cond_wait (&sync_cond, &sync_mutex);
g_mutex_unlock (&sync_mutex);
gst_rtsp_media_suspend (media);
fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK);
transport->lower_transport = GST_RTSP_LOWER_TRANS_TCP;
fail_unless (gst_rtsp_stream_complete_stream (stream, transport));
data.transport = transport;
/* set state SYNC_STATE_RACE let the race begin unsuspend <-> set state GST_STATE_NULL */
g_mutex_lock (&sync_mutex);
state = SYNC_STATE_RACE;
g_cond_signal (&sync_cond);
g_mutex_unlock (&sync_mutex);
fail_unless (gst_rtsp_media_unsuspend (media));
/* sync end of other thread */
g_thread_join (sharing_media_thread);
/* clean up */
g_cond_clear (&sync_cond);
g_mutex_clear (&sync_mutex);
fail_unless (gst_rtsp_media_unprepare (media));
g_object_unref (media);
gst_rtsp_url_free (url);
g_object_unref (factory);
g_object_unref (pool);
gst_rtsp_thread_pool_cleanup ();
}
GST_END_TEST;
#define FLAG_HAVE_CAPS GST_ELEMENT_FLAG_LAST
static void
on_notify_caps (GstPad * pad, GParamSpec * pspec, GstElement * pay)
@ -663,6 +830,7 @@ rtspmedia_suite (void)
tcase_add_test (tc, test_media_seek_one_active_stream);
tcase_add_test (tc, test_media);
tcase_add_test (tc, test_media_prepare);
tcase_add_test (tc, test_media_shared_race_test_unsuspend_vs_set_state_null);
tcase_add_test (tc, test_media_reusable);
tcase_add_test (tc, test_media_dyn_prepare);
tcase_add_test (tc, test_media_take_pipeline);