rtsp-server: Limit the number of threads per server instance

If we exceed the maximum, just round robin the clients over the existing
threads.
This commit is contained in:
Olivier Crête 2013-02-19 13:19:41 -05:00 committed by Wim Taymans
parent 4071e1b999
commit 4c61c6d308
3 changed files with 92 additions and 26 deletions

View file

@ -106,7 +106,7 @@ dnl *** checks for library functions ***
dnl *** checks for dependancy libraries ***
dnl GLib is required (GStreamer is ok with GLib-2.8, but we want at least 2.10)
GLIB_REQ=2.10.0
GLIB_REQ=2.32.0
AC_SUBST([GLIB_REQ])
AG_GST_GLIB_CHECK([$GLIB_REQ])

View file

@ -53,6 +53,7 @@ struct _GstRTSPServerPrivate
/* the clients that are connected */
GList *clients;
GQueue loops; /* the main loops used in the threads */
};
#define DEFAULT_ADDRESS "0.0.0.0"
@ -93,6 +94,7 @@ GST_DEBUG_CATEGORY_STATIC (rtsp_server_debug);
#define GST_CAT_DEFAULT rtsp_server_debug
typedef struct _ClientContext ClientContext;
typedef struct _Loop Loop;
static guint gst_rtsp_server_signals[SIGNAL_LAST] = { 0 };
@ -102,7 +104,7 @@ static void gst_rtsp_server_set_property (GObject * object, guint propid,
const GValue * value, GParamSpec * pspec);
static void gst_rtsp_server_finalize (GObject * object);
static gpointer do_loop (ClientContext * ctx);
static gpointer do_loop (Loop * loop);
static GstRTSPClient *default_create_client (GstRTSPServer * server);
static gboolean default_accept_client (GstRTSPServer * server,
GstRTSPClient * client, GSocket * socket, GError ** error);
@ -231,6 +233,7 @@ gst_rtsp_server_init (GstRTSPServer * server)
priv->session_pool = gst_rtsp_session_pool_new ();
priv->mount_points = gst_rtsp_mount_points_new ();
priv->max_threads = DEFAULT_MAX_THREADS;
g_queue_init (&priv->loops);
}
static void
@ -912,34 +915,95 @@ close_error:
}
}
struct _Loop
{
gint refcnt;
GstRTSPServer *server;
GMainLoop *mainloop;
GMainContext *mainctx;
};
/* must be called with the lock held */
static void
loop_unref (Loop * loop)
{
GstRTSPServer *server = loop->server;
GstRTSPServerPrivate *priv = server->priv;
loop->refcnt--;
if (loop->refcnt <= 0) {
g_queue_remove (&priv->loops, loop);
g_main_loop_quit (loop->mainloop);
}
}
struct _ClientContext
{
GstRTSPServer *server;
GMainLoop *loop;
Loop *loop;
GstRTSPClient *client;
};
static void
static gboolean
free_client_context (ClientContext * ctx)
{
GST_RTSP_SERVER_LOCK (ctx->server);
if (ctx->loop)
g_main_loop_unref (ctx->loop);
loop_unref (ctx->loop);
GST_RTSP_SERVER_UNLOCK (ctx->server);
g_object_unref (ctx->client);
g_slice_free (ClientContext, ctx);
return G_SOURCE_REMOVE;
}
static gpointer
do_loop (ClientContext * ctx)
do_loop (Loop * loop)
{
GST_INFO ("enter mainloop");
g_main_loop_run (ctx->loop);
g_main_loop_run (loop->mainloop);
GST_INFO ("exit mainloop");
free_client_context (ctx);
g_main_context_unref (loop->mainctx);
g_main_loop_unref (loop->mainloop);
g_object_unref (loop->server);
g_slice_free (Loop, loop);
return NULL;
}
/* Must be called with lock held */
static Loop *
gst_rtsp_server_get_main_loop (GstRTSPServer * server)
{
GstRTSPServerPrivate *priv = server->priv;
Loop *loop;
if (priv->max_threads > 0 &&
g_queue_get_length (&priv->loops) >= priv->max_threads) {
loop = g_queue_pop_head (&priv->loops);
loop->refcnt++;
} else {
GstRTSPServerClass *klass = GST_RTSP_SERVER_GET_CLASS (server);
loop = g_slice_new0 (Loop);
loop->refcnt = 1;
loop->server = g_object_ref (server);
loop->mainctx = g_main_context_new ();
loop->mainloop = g_main_loop_new (loop->mainctx, FALSE);
g_thread_pool_push (klass->pool, loop, NULL);
}
g_queue_push_tail (&priv->loops, loop);
return loop;
}
static void
unmanage_client (GstRTSPClient * client, ClientContext * ctx)
{
@ -954,10 +1018,16 @@ unmanage_client (GstRTSPClient * client, ClientContext * ctx)
priv->clients = g_list_remove (priv->clients, ctx);
GST_RTSP_SERVER_UNLOCK (server);
if (ctx->loop)
g_main_loop_quit (ctx->loop);
else
if (ctx->loop) {
GSource *src;
src = g_idle_source_new ();
g_source_set_callback (src, (GSourceFunc) free_client_context, ctx, NULL);
g_source_attach (src, ctx->loop->mainctx);
g_source_unref (src);
} else {
free_client_context (ctx);
}
g_object_unref (server);
}
@ -976,6 +1046,8 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
ctx = g_slice_new0 (ClientContext);
ctx->server = server;
ctx->client = client;
GST_RTSP_SERVER_LOCK (server);
if (priv->max_threads == 0) {
GSource *source;
@ -985,22 +1057,16 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
else
mainctx = NULL;
} else {
mainctx = g_main_context_new ();
ctx->loop = g_main_loop_new (mainctx, TRUE);
g_main_context_unref (mainctx);
ctx->loop = gst_rtsp_server_get_main_loop (server);
mainctx = ctx->loop->mainctx;
}
gst_rtsp_client_attach (client, mainctx);
GST_RTSP_SERVER_LOCK (server);
g_signal_connect (client, "closed", (GCallback) unmanage_client, ctx);
priv->clients = g_list_prepend (priv->clients, ctx);
gst_rtsp_client_attach (client, mainctx);
GST_RTSP_SERVER_UNLOCK (server);
if (ctx->loop) {
GstRTSPServerClass *klass = GST_RTSP_SERVER_GET_CLASS (server);
g_thread_pool_push (klass->pool, ctx, NULL);
}
}
static GstRTSPClient *

View file

@ -742,7 +742,7 @@ GST_END_TEST;
GST_START_TEST (test_play_multithreaded)
{
gst_rtsp_server_set_max_threads (server, -1);
gst_rtsp_server_set_max_threads (server, 2);
start_server ();
@ -789,7 +789,7 @@ GST_START_TEST (test_play_multithreaded_block_in_describe)
GstRTSPMessage *response;
GstRTSPStatusCode code;
gst_rtsp_server_set_max_threads (server, 1);
gst_rtsp_server_set_max_threads (server, 2);
mounts = gst_rtsp_server_get_mount_points (server);
fail_unless (mounts != NULL);
@ -873,7 +873,7 @@ GST_START_TEST (test_play_multithreaded_timeout_client)
GstRTSPMessage *request;
GstRTSPMessage *response;
gst_rtsp_server_set_max_threads (server, -1);
gst_rtsp_server_set_max_threads (server, 2);
pool = gst_rtsp_server_get_session_pool (server);
g_signal_connect (server, "client-connected",
G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);
@ -952,7 +952,7 @@ GST_START_TEST (test_play_multithreaded_timeout_session)
GstRTSPTransport *audio_transport = NULL;
GstRTSPSessionPool *pool;
gst_rtsp_server_set_max_threads (server, -1);
gst_rtsp_server_set_max_threads (server, 2);
pool = gst_rtsp_server_get_session_pool (server);
g_signal_connect (server, "client-connected",
G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);