server: use thread pool

Use the thread pool instead of doing our own thing.
This commit is contained in:
Wim Taymans 2013-07-10 16:49:55 +02:00
parent 25269c7b1a
commit 27917f4ef3
3 changed files with 56 additions and 129 deletions

View file

@ -30,6 +30,7 @@ typedef struct _GstRTSPClientClass GstRTSPClientClass;
typedef struct _GstRTSPClientState GstRTSPClientState;
typedef struct _GstRTSPClientPrivate GstRTSPClientPrivate;
#include "rtsp-server.h"
#include "rtsp-media.h"
#include "rtsp-mount-points.h"
#include "rtsp-session-pool.h"
@ -49,6 +50,7 @@ typedef struct _GstRTSPClientPrivate GstRTSPClientPrivate;
/**
* GstRTSPClientState:
* @server: the server
* @client: the client
* @request: the complete request
* @uri: the complete url parsed from @request
@ -65,6 +67,7 @@ typedef struct _GstRTSPClientPrivate GstRTSPClientPrivate;
* Information passed around containing the client state of a request.
*/
struct _GstRTSPClientState {
GstRTSPServer *server;
GstRTSPClient *client;
GstRTSPMessage *request;
GstRTSPUrl *uri;

View file

@ -38,7 +38,6 @@ struct _GstRTSPServerPrivate
gchar *address;
gchar *service;
gint backlog;
gint max_threads;
gboolean use_client_settings;
GSocket *socket;
@ -52,12 +51,14 @@ struct _GstRTSPServerPrivate
/* authentication manager */
GstRTSPAuth *auth;
/* resource manager */
GstRTSPThreadPool *thread_pool;
/* the TLS certificate */
GTlsCertificate *certificate;
/* the clients that are connected */
GList *clients;
GQueue loops; /* the main loops used in the threads */
};
#define DEFAULT_ADDRESS "0.0.0.0"
@ -65,7 +66,6 @@ struct _GstRTSPServerPrivate
/* #define DEFAULT_ADDRESS "::0" */
#define DEFAULT_SERVICE "8554"
#define DEFAULT_BACKLOG 5
#define DEFAULT_MAX_THREADS 0
#define DEFAULT_USE_CLIENT_SETTINGS FALSE
/* Define to use the SO_LINGER option so that the server sockets can be resused
@ -83,7 +83,6 @@ enum
PROP_SESSION_POOL,
PROP_MOUNT_POINTS,
PROP_MAX_THREADS,
PROP_USE_CLIENT_SETTINGS,
PROP_LAST
};
@ -110,7 +109,6 @@ static void gst_rtsp_server_set_property (GObject * object, guint propid,
const GValue * value, GParamSpec * pspec);
static void gst_rtsp_server_finalize (GObject * object);
static gpointer do_loop (Loop * loop);
static GstRTSPClient *default_create_client (GstRTSPServer * server);
static gboolean default_setup_connection (GstRTSPServer * server,
GstRTSPClient * client, GstRTSPConnection * conn);
@ -197,18 +195,6 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass)
"The mount points to use for client session",
GST_TYPE_RTSP_MOUNT_POINTS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRTSPServer::max-threads:
*
* The maximum amount of threads to use for client connections. A value of
* 0 means to use only the mainloop, -1 means an unlimited amount of
* threads.
*/
g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
g_param_spec_int ("max-threads", "Max Threads",
"The maximum amount of threads to use for client connections "
"(0 = only mainloop, -1 = unlimited)", -1, G_MAXINT,
DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRTSPServer::use-client-settings:
*
@ -230,8 +216,6 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass)
klass->create_client = default_create_client;
klass->setup_connection = default_setup_connection;
klass->pool = g_thread_pool_new ((GFunc) do_loop, klass, -1, FALSE, NULL);
GST_DEBUG_CATEGORY_INIT (rtsp_server_debug, "rtspserver", 0, "GstRTSPServer");
}
@ -249,9 +233,8 @@ gst_rtsp_server_init (GstRTSPServer * server)
priv->backlog = DEFAULT_BACKLOG;
priv->session_pool = gst_rtsp_session_pool_new ();
priv->mount_points = gst_rtsp_mount_points_new ();
priv->max_threads = DEFAULT_MAX_THREADS;
priv->thread_pool = gst_rtsp_thread_pool_new ();
priv->use_client_settings = DEFAULT_USE_CLIENT_SETTINGS;
g_queue_init (&priv->loops);
}
static void
@ -268,8 +251,12 @@ gst_rtsp_server_finalize (GObject * object)
if (priv->socket)
g_object_unref (priv->socket);
g_object_unref (priv->session_pool);
g_object_unref (priv->mount_points);
if (priv->session_pool)
g_object_unref (priv->session_pool);
if (priv->mount_points)
g_object_unref (priv->mount_points);
if (priv->thread_pool)
g_object_unref (priv->thread_pool);
if (priv->auth)
g_object_unref (priv->auth);
@ -654,52 +641,60 @@ gst_rtsp_server_get_auth (GstRTSPServer * server)
}
/**
* gst_rtsp_server_set_max_threads:
* gst_rtsp_server_set_thread_pool:
* @server: a #GstRTSPServer
* @max_threads: maximum threads
* @pool: a #GstRTSPThreadPool
*
* Set the maximum threads used by the server to handle client requests.
* A value of 0 will use the server mainloop, a value of -1 will use an
* unlimited number of threads.
* configure @pool to be used as the thread pool of @server.
*/
void
gst_rtsp_server_set_max_threads (GstRTSPServer * server, gint max_threads)
gst_rtsp_server_set_thread_pool (GstRTSPServer * server,
GstRTSPThreadPool * pool)
{
GstRTSPServerPrivate *priv;
GstRTSPThreadPool *old;
g_return_if_fail (GST_IS_RTSP_SERVER (server));
priv = server->priv;
if (pool)
g_object_ref (pool);
GST_RTSP_SERVER_LOCK (server);
priv->max_threads = max_threads;
old = priv->thread_pool;
priv->thread_pool = pool;
GST_RTSP_SERVER_UNLOCK (server);
if (old)
g_object_unref (old);
}
/**
* gst_rtsp_server_get_max_threads:
* gst_rtsp_server_get_thread_pool:
* @server: a #GstRTSPServer
*
* Get the maximum number of threads used for client connections.
* See gst_rtsp_server_set_max_threads().
* Get the #GstRTSPThreadPool used as the thread pool of @server.
*
* Returns: the maximum number of threads.
* Returns: (transfer full): the #GstRTSPThreadPool of @server. g_object_unref() after
* usage.
*/
gint
gst_rtsp_server_get_max_threads (GstRTSPServer * server)
GstRTSPThreadPool *
gst_rtsp_server_get_thread_pool (GstRTSPServer * server)
{
GstRTSPServerPrivate *priv;
gint res;
GstRTSPThreadPool *result;
g_return_val_if_fail (GST_IS_RTSP_SERVER (server), -1);
g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL);
priv = server->priv;
GST_RTSP_SERVER_LOCK (server);
res = priv->max_threads;
if ((result = priv->thread_pool))
g_object_ref (result);
GST_RTSP_SERVER_UNLOCK (server);
return res;
return result;
}
/**
@ -834,9 +829,6 @@ gst_rtsp_server_get_property (GObject * object, guint propid,
case PROP_MOUNT_POINTS:
g_value_take_object (value, gst_rtsp_server_get_mount_points (server));
break;
case PROP_MAX_THREADS:
g_value_set_int (value, gst_rtsp_server_get_max_threads (server));
break;
case PROP_USE_CLIENT_SETTINGS:
g_value_set_boolean (value,
gst_rtsp_server_get_use_client_settings (server));
@ -868,9 +860,6 @@ gst_rtsp_server_set_property (GObject * object, guint propid,
case PROP_MOUNT_POINTS:
gst_rtsp_server_set_mount_points (server, g_value_get_object (value));
break;
case PROP_MAX_THREADS:
gst_rtsp_server_set_max_threads (server, g_value_get_int (value));
break;
case PROP_USE_CLIENT_SETTINGS:
gst_rtsp_server_set_use_client_settings (server,
g_value_get_boolean (value));
@ -1050,34 +1039,10 @@ close_error:
}
}
struct _Loop
{
gint refcnt;
GstRTSPServer *server;
GMainLoop *mainloop;
GMainContext *mainctx;
};
/* must be called with the lock held */
static void
loop_unref (Loop * loop)
{
GstRTSPServer *server = loop->server;
GstRTSPServerPrivate *priv = server->priv;
loop->refcnt--;
if (loop->refcnt <= 0) {
g_queue_remove (&priv->loops, loop);
g_main_loop_quit (loop->mainloop);
}
}
struct _ClientContext
{
GstRTSPServer *server;
Loop *loop;
GstRTSPThread *thread;
GstRTSPClient *client;
};
@ -1085,8 +1050,8 @@ static gboolean
free_client_context (ClientContext * ctx)
{
GST_RTSP_SERVER_LOCK (ctx->server);
if (ctx->loop)
loop_unref (ctx->loop);
if (ctx->thread)
gst_rtsp_thread_stop (ctx->thread);
GST_RTSP_SERVER_UNLOCK (ctx->server);
g_object_unref (ctx->client);
@ -1095,50 +1060,6 @@ free_client_context (ClientContext * ctx)
return G_SOURCE_REMOVE;
}
static gpointer
do_loop (Loop * loop)
{
GST_INFO ("enter mainloop");
g_main_loop_run (loop->mainloop);
GST_INFO ("exit mainloop");
g_main_context_unref (loop->mainctx);
g_main_loop_unref (loop->mainloop);
g_object_unref (loop->server);
g_slice_free (Loop, loop);
return NULL;
}
/* Must be called with lock held */
static Loop *
gst_rtsp_server_get_main_loop (GstRTSPServer * server)
{
GstRTSPServerPrivate *priv = server->priv;
Loop *loop;
if (priv->max_threads > 0 &&
g_queue_get_length (&priv->loops) >= priv->max_threads) {
loop = g_queue_pop_head (&priv->loops);
loop->refcnt++;
} else {
GstRTSPServerClass *klass = GST_RTSP_SERVER_GET_CLASS (server);
loop = g_slice_new0 (Loop);
loop->refcnt = 1;
loop->server = g_object_ref (server);
loop->mainctx = g_main_context_new ();
loop->mainloop = g_main_loop_new (loop->mainctx, FALSE);
g_thread_pool_push (klass->pool, loop, NULL);
}
g_queue_push_tail (&priv->loops, loop);
return loop;
}
static void
unmanage_client (GstRTSPClient * client, ClientContext * ctx)
{
@ -1153,12 +1074,12 @@ unmanage_client (GstRTSPClient * client, ClientContext * ctx)
priv->clients = g_list_remove (priv->clients, ctx);
GST_RTSP_SERVER_UNLOCK (server);
if (ctx->loop) {
if (ctx->thread) {
GSource *src;
src = g_idle_source_new ();
g_source_set_callback (src, (GSourceFunc) free_client_context, ctx, NULL);
g_source_attach (src, ctx->loop->mainctx);
g_source_attach (src, ctx->thread->context);
g_source_unref (src);
} else {
free_client_context (ctx);
@ -1174,7 +1095,8 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
{
ClientContext *ctx;
GstRTSPServerPrivate *priv = server->priv;
GMainContext *mainctx;
GMainContext *mainctx = NULL;
GstRTSPClientState state = { NULL };
GST_DEBUG_OBJECT (server, "manage client %p", client);
@ -1183,17 +1105,19 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
ctx->client = client;
GST_RTSP_SERVER_LOCK (server);
if (priv->max_threads == 0) {
GSource *source;
state.server = server;
state.client = client;
ctx->thread = gst_rtsp_thread_pool_get_thread (priv->thread_pool,
GST_RTSP_THREAD_TYPE_CLIENT, &state);
if (ctx->thread)
mainctx = ctx->thread->context;
else {
GSource *source;
/* find the context to add the watch */
if ((source = g_main_current_source ()))
mainctx = g_source_get_context (source);
else
mainctx = NULL;
} else {
ctx->loop = gst_rtsp_server_get_main_loop (server);
mainctx = ctx->loop->mainctx;
}
g_signal_connect (client, "closed", (GCallback) unmanage_client, ctx);

View file

@ -100,8 +100,8 @@ GstRTSPMountPoints * gst_rtsp_server_get_mount_points (GstRTSPServer *serve
void gst_rtsp_server_set_auth (GstRTSPServer *server, GstRTSPAuth *auth);
GstRTSPAuth * gst_rtsp_server_get_auth (GstRTSPServer *server);
void gst_rtsp_server_set_max_threads (GstRTSPServer *server, gint max_threads);
gint gst_rtsp_server_get_max_threads (GstRTSPServer *server);
void gst_rtsp_server_set_thread_pool (GstRTSPServer *server, GstRTSPThreadPool *pool);
GstRTSPThreadPool * gst_rtsp_server_get_thread_pool (GstRTSPServer *server);
void gst_rtsp_server_set_use_client_settings (GstRTSPServer *server,
gboolean use_client_settings);