diff --git a/gst/rtsp-server/Makefile.am b/gst/rtsp-server/Makefile.am index 170fce55fb..725537ec2a 100644 --- a/gst/rtsp-server/Makefile.am +++ b/gst/rtsp-server/Makefile.am @@ -3,6 +3,7 @@ public_headers = \ rtsp-address-pool.h \ rtsp-params.h \ rtsp-sdp.h \ + rtsp-thread-pool.h \ rtsp-media.h \ rtsp-media-factory.h \ rtsp-media-factory-uri.h \ @@ -22,6 +23,7 @@ c_sources = \ rtsp-address-pool.c \ rtsp-params.c \ rtsp-sdp.c \ + rtsp-thread-pool.c \ rtsp-media.c \ rtsp-media-factory.c \ rtsp-media-factory-uri.c \ diff --git a/gst/rtsp-server/rtsp-thread-pool.c b/gst/rtsp-server/rtsp-thread-pool.c new file mode 100644 index 0000000000..5f8121340c --- /dev/null +++ b/gst/rtsp-server/rtsp-thread-pool.c @@ -0,0 +1,461 @@ +/* GStreamer + * Copyright (C) 2013 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include + +#include "rtsp-thread-pool.h" + +typedef struct _GstRTSPThreadImpl +{ + GstRTSPThread thread; + + gint reused; +} GstRTSPThreadImpl; + +GST_DEFINE_MINI_OBJECT_TYPE (GstRTSPThread, gst_rtsp_thread); + +static void gst_rtsp_thread_init (GstRTSPThreadImpl * impl); + +static void +_gst_rtsp_thread_free (GstRTSPThreadImpl * impl) +{ + GST_DEBUG ("free thread %p", impl); + + g_main_loop_unref (impl->thread.loop); + g_main_context_unref (impl->thread.context); + g_slice_free1 (sizeof (GstRTSPThreadImpl), impl); +} + +static GstRTSPThread * +_gst_rtsp_thread_copy (GstRTSPThreadImpl * impl) +{ + GstRTSPThreadImpl *copy; + + GST_DEBUG ("copy thread %p", impl); + + copy = g_slice_new0 (GstRTSPThreadImpl); + gst_rtsp_thread_init (copy); + copy->thread.context = g_main_context_ref (impl->thread.context); + copy->thread.loop = g_main_loop_ref (impl->thread.loop); + + return GST_RTSP_THREAD (copy); +} + +static void +gst_rtsp_thread_init (GstRTSPThreadImpl * impl) +{ + gst_mini_object_init (GST_MINI_OBJECT_CAST (impl), 0, + GST_TYPE_RTSP_THREAD, + (GstMiniObjectCopyFunction) _gst_rtsp_thread_copy, NULL, + (GstMiniObjectFreeFunction) _gst_rtsp_thread_free); + + g_atomic_int_set (&impl->reused, 1); +} + +/** + * gst_rtsp_thread_new: + * + * Create a new thread object that can run a mainloop. + * + * Returns: a #GstRTSPThread. + */ +GstRTSPThread * +gst_rtsp_thread_new (void) +{ + GstRTSPThreadImpl *impl; + + impl = g_slice_new0 (GstRTSPThreadImpl); + + gst_rtsp_thread_init (impl); + impl->thread.context = g_main_context_new (); + impl->thread.loop = g_main_loop_new (impl->thread.context, TRUE); + + return GST_RTSP_THREAD (impl); +} + +/** + * gst_rtsp_thread_reuse: + * @thread: a #GstRTSPThread + * + * Reuse the mainloop of @thread + */ +void +gst_rtsp_thread_reuse (GstRTSPThread * thread) +{ + GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread; + + g_return_if_fail (GST_IS_RTSP_THREAD (thread)); + + GST_DEBUG ("reuse thread %p", thread); + g_atomic_int_inc (&impl->reused); +} + +/** + * gst_rtsp_thread_stop: + * @thread: a #GstRTSPThread + * + * Stop @thread. When no threads are using the mainloop, the thread will be + * stopped and the final ref to @thread will be released. + */ +void +gst_rtsp_thread_stop (GstRTSPThread * thread) +{ + GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread; + + g_return_if_fail (GST_IS_RTSP_THREAD (thread)); + + GST_DEBUG ("stop thread %p", thread); + + if (g_atomic_int_dec_and_test (&impl->reused)) { + GST_DEBUG ("stop mainloop of thread %p", thread); + g_main_loop_quit (thread->loop); + } +} + +#define GST_RTSP_THREAD_POOL_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_THREAD_POOL, GstRTSPThreadPoolPrivate)) + +struct _GstRTSPThreadPoolPrivate +{ + GMutex lock; + + gint max_threads; + /* currently used mainloops */ + GQueue threads; +}; + +#define DEFAULT_MAX_THREADS 1 + +enum +{ + PROP_0, + PROP_MAX_THREADS, + PROP_LAST +}; + +GST_DEBUG_CATEGORY_STATIC (rtsp_thread_pool_debug); +#define GST_CAT_DEFAULT rtsp_thread_pool_debug + +static GQuark thread_pool; + +static void gst_rtsp_thread_pool_get_property (GObject * object, guint propid, + GValue * value, GParamSpec * pspec); +static void gst_rtsp_thread_pool_set_property (GObject * object, guint propid, + const GValue * value, GParamSpec * pspec); +static void gst_rtsp_thread_pool_finalize (GObject * obj); + +static gpointer do_loop (GstRTSPThread * thread, + GstRTSPThreadPoolClass * klass); +static GstRTSPThread *default_get_thread (GstRTSPThreadPool * pool, + GstRTSPThreadType type, GstRTSPClientState * state); + +G_DEFINE_TYPE (GstRTSPThreadPool, gst_rtsp_thread_pool, G_TYPE_OBJECT); + +static void +gst_rtsp_thread_pool_class_init (GstRTSPThreadPoolClass * klass) +{ + GObjectClass *gobject_class; + + g_type_class_add_private (klass, sizeof (GstRTSPThreadPoolPrivate)); + + gobject_class = G_OBJECT_CLASS (klass); + + gobject_class->get_property = gst_rtsp_thread_pool_get_property; + gobject_class->set_property = gst_rtsp_thread_pool_set_property; + gobject_class->finalize = gst_rtsp_thread_pool_finalize; + + /** + * GstRTSPThreadPool::max-threads: + * + * The maximum amount of threads to use for client connections. A value of + * 0 means to use only the mainloop, -1 means an unlimited amount of + * threads. + */ + g_object_class_install_property (gobject_class, PROP_MAX_THREADS, + g_param_spec_int ("max-threads", "Max Threads", + "The maximum amount of threads to use for client connections " + "(0 = only mainloop, -1 = unlimited)", -1, G_MAXINT, + DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + klass->get_thread = default_get_thread; + + klass->pool = g_thread_pool_new ((GFunc) do_loop, klass, -1, FALSE, NULL); + + GST_DEBUG_CATEGORY_INIT (rtsp_thread_pool_debug, "rtspthreadpool", 0, + "GstRTSPThreadPool"); + + thread_pool = g_quark_from_string ("gst.rtsp.thread.pool"); +} + +static void +gst_rtsp_thread_pool_init (GstRTSPThreadPool * pool) +{ + GstRTSPThreadPoolPrivate *priv; + + pool->priv = priv = GST_RTSP_THREAD_POOL_GET_PRIVATE (pool); + + g_mutex_init (&priv->lock); + priv->max_threads = DEFAULT_MAX_THREADS; + g_queue_init (&priv->threads); +} + +static void +gst_rtsp_thread_pool_finalize (GObject * obj) +{ + GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (obj); + GstRTSPThreadPoolPrivate *priv = pool->priv; + + GST_INFO ("finalize pool %p", pool); + + g_queue_clear (&priv->threads); + g_mutex_clear (&priv->lock); + + G_OBJECT_CLASS (gst_rtsp_thread_pool_parent_class)->finalize (obj); +} + +static void +gst_rtsp_thread_pool_get_property (GObject * object, guint propid, + GValue * value, GParamSpec * pspec) +{ + GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object); + + switch (propid) { + case PROP_MAX_THREADS: + g_value_set_int (value, gst_rtsp_thread_pool_get_max_threads (pool)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec); + } +} + +static void +gst_rtsp_thread_pool_set_property (GObject * object, guint propid, + const GValue * value, GParamSpec * pspec) +{ + GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object); + + switch (propid) { + case PROP_MAX_THREADS: + gst_rtsp_thread_pool_set_max_threads (pool, g_value_get_int (value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec); + } +} + +static gpointer +do_loop (GstRTSPThread * thread, GstRTSPThreadPoolClass * klass) +{ + GstRTSPThreadPoolPrivate *priv; + GstRTSPThreadPool *pool; + + pool = gst_mini_object_get_qdata (GST_MINI_OBJECT (thread), thread_pool); + priv = pool->priv; + + if (klass->thread_enter) + klass->thread_enter (pool, thread); + + GST_INFO ("enter mainloop of thread %p", thread); + g_main_loop_run (thread->loop); + GST_INFO ("exit mainloop of thread %p", thread); + + if (klass->thread_leave) + klass->thread_leave (pool, thread); + + g_mutex_lock (&priv->lock); + g_queue_remove (&priv->threads, thread); + g_mutex_unlock (&priv->lock); + + gst_rtsp_thread_unref (thread); + + return NULL; +} + +/** + * gst_rtsp_thread_pool_new: + * + * Create a new #GstRTSPThreadPool instance. + * + * Returns: a new #GstRTSPThreadPool + */ +GstRTSPThreadPool * +gst_rtsp_thread_pool_new (void) +{ + GstRTSPThreadPool *result; + + result = g_object_new (GST_TYPE_RTSP_THREAD_POOL, NULL); + + return result; +} + +/** + * gst_rtsp_thread_pool_set_max_threads: + * @pool: a #GstRTSPThreadPool + * @max_threads: maximum threads + * + * Set the maximum threads used by the pool to handle client requests. + * A value of 0 will use the pool mainloop, a value of -1 will use an + * unlimited number of threads. + */ +void +gst_rtsp_thread_pool_set_max_threads (GstRTSPThreadPool * pool, + gint max_threads) +{ + GstRTSPThreadPoolPrivate *priv; + + g_return_if_fail (GST_IS_RTSP_THREAD_POOL (pool)); + + priv = pool->priv; + + g_mutex_lock (&priv->lock); + priv->max_threads = max_threads; + g_mutex_unlock (&priv->lock); +} + +/** + * gst_rtsp_thread_pool_get_max_threads: + * @pool: a #GstRTSPThreadPool + * + * Get the maximum number of threads used for client connections. + * See gst_rtsp_thread_pool_set_max_threads(). + * + * Returns: the maximum number of threads. + */ +gint +gst_rtsp_thread_pool_get_max_threads (GstRTSPThreadPool * pool) +{ + GstRTSPThreadPoolPrivate *priv; + gint res; + + g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), -1); + + priv = pool->priv; + + g_mutex_lock (&priv->lock); + res = priv->max_threads; + g_mutex_unlock (&priv->lock); + + return res; +} + +static GstRTSPThread * +make_thread (GstRTSPThreadPool * pool, GstRTSPClientState * state) +{ + GstRTSPThreadPoolClass *klass; + GstRTSPThread *thread; + + klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool); + + thread = gst_rtsp_thread_new (); + gst_mini_object_set_qdata (GST_MINI_OBJECT (thread), thread_pool, + g_object_ref (pool), g_object_unref); + + GST_DEBUG_OBJECT (pool, "new thread %p", thread); + + if (klass->configure_thread) + klass->configure_thread (pool, thread, state); + + return thread; +} + +static GstRTSPThread * +default_get_thread (GstRTSPThreadPool * pool, + GstRTSPThreadType type, GstRTSPClientState * state) +{ + GstRTSPThreadPoolPrivate *priv = pool->priv; + GstRTSPThreadPoolClass *klass; + GstRTSPThread *thread; + GError *error = NULL; + + klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool); + + switch (type) { + case GST_RTSP_THREAD_TYPE_CLIENT: + if (priv->max_threads == 0) { + /* no threads allowed */ + GST_DEBUG_OBJECT (pool, "no client threads allowed"); + thread = NULL; + } else { + if (priv->max_threads > 0 && + g_queue_get_length (&priv->threads) >= priv->max_threads) { + /* max threads reached, recycle from queue */ + GST_DEBUG_OBJECT (pool, "recycle client thread"); + thread = g_queue_pop_head (&priv->threads); + gst_rtsp_thread_ref (thread); + } else { + /* make more threads */ + GST_DEBUG_OBJECT (pool, "make new client thread"); + thread = make_thread (pool, state); + + if (!g_thread_pool_push (klass->pool, thread, &error)) + goto thread_error; + } + g_queue_push_tail (&priv->threads, thread); + } + break; + case GST_RTSP_THREAD_TYPE_MEDIA: + GST_DEBUG_OBJECT (pool, "make new media thread"); + thread = make_thread (pool, state); + + if (!g_thread_pool_push (klass->pool, thread, &error)) + goto thread_error; + break; + default: + thread = NULL; + break; + } + return thread; + + /* ERRORS */ +thread_error: + { + GST_ERROR_OBJECT (pool, "failed to push thread %s", error->message); + gst_rtsp_thread_unref (thread); + g_clear_error (&error); + return NULL; + } +} + +/** + * gst_rtsp_thread_pool_get_thread: + * @pool: a #GstRTSPThreadPool + * @type: the #GstRTSPThreadType + * @state: a #GstRTSPClientState + * + * Get a new #GstRTSPThread for @type and @state. + * + * Returns: a new #GstRTSPThread, gst_rtsp_thread_stop() after usage + */ +GstRTSPThread * +gst_rtsp_thread_pool_get_thread (GstRTSPThreadPool * pool, + GstRTSPThreadType type, GstRTSPClientState * state) +{ + GstRTSPThreadPoolClass *klass; + GstRTSPThread *result = NULL; + + g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), NULL); + g_return_val_if_fail (state != NULL, NULL); + + klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool); + + if (klass->get_thread) + result = klass->get_thread (pool, type, state); + + return result; +} diff --git a/gst/rtsp-server/rtsp-thread-pool.h b/gst/rtsp-server/rtsp-thread-pool.h new file mode 100644 index 0000000000..301b8a1d9d --- /dev/null +++ b/gst/rtsp-server/rtsp-thread-pool.h @@ -0,0 +1,165 @@ +/* GStreamer + * Copyright (C) 2010 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include + +#ifndef __GST_RTSP_THREAD_POOL_H__ +#define __GST_RTSP_THREAD_POOL_H__ + +typedef struct _GstRTSPThread GstRTSPThread; +typedef struct _GstRTSPThreadPool GstRTSPThreadPool; +typedef struct _GstRTSPThreadPoolClass GstRTSPThreadPoolClass; +typedef struct _GstRTSPThreadPoolPrivate GstRTSPThreadPoolPrivate; + +#include "rtsp-client.h" + +G_BEGIN_DECLS + +#define GST_TYPE_RTSP_THREAD_POOL (gst_rtsp_thread_pool_get_type ()) +#define GST_IS_RTSP_THREAD_POOL(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_RTSP_THREAD_POOL)) +#define GST_IS_RTSP_THREAD_POOL_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_RTSP_THREAD_POOL)) +#define GST_RTSP_THREAD_POOL_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_RTSP_THREAD_POOL, GstRTSPThreadPoolClass)) +#define GST_RTSP_THREAD_POOL(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_RTSP_THREAD_POOL, GstRTSPThreadPool)) +#define GST_RTSP_THREAD_POOL_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_RTSP_THREAD_POOL, GstRTSPThreadPoolClass)) +#define GST_RTSP_THREAD_POOL_CAST(obj) ((GstRTSPThreadPool*)(obj)) +#define GST_RTSP_THREAD_POOL_CLASS_CAST(klass) ((GstRTSPThreadPoolClass*)(klass)) + +GType gst_rtsp_thread_get_type (void); + +#define GST_TYPE_RTSP_THREAD (gst_rtsp_thread_get_type ()) +#define GST_IS_RTSP_THREAD(obj) (GST_IS_MINI_OBJECT_TYPE (obj, GST_TYPE_RTSP_THREAD)) +#define GST_RTSP_THREAD_CAST(obj) ((GstRTSPThread*)(obj)) +#define GST_RTSP_THREAD(obj) (GST_RTSP_THREAD_CAST(obj)) + +/** + * GstRTSPThread: + * + * Structure holding info about a mainloop running in a thread + */ +struct _GstRTSPThread { + GstMiniObject mini_object; + + GMainContext *context; + GMainLoop *loop; +}; + +GstRTSPThread * gst_rtsp_thread_new (void); + +void gst_rtsp_thread_reuse (GstRTSPThread * thread); +void gst_rtsp_thread_stop (GstRTSPThread * thread); + +/** + * gst_rtsp_thread_ref: + * @thread: The thread to refcount + * + * Increase the refcount of this thread. + * + * Returns: (transfer full): @thread (for convenience when doing assignments) + */ +#ifdef _FOOL_GTK_DOC_ +G_INLINE_FUNC GstRTSPThread * gst_rtsp_thread_ref (GstRTSPThread * thread); +#endif + +static inline GstRTSPThread * +gst_rtsp_thread_ref (GstRTSPThread * thread) +{ + return (GstRTSPThread *) gst_mini_object_ref (GST_MINI_OBJECT_CAST (thread)); +} + +/** + * gst_rtsp_thread_unref: + * @thread: (transfer full): the thread to refcount + * + * Decrease the refcount of an thread, freeing it if the refcount reaches 0. + */ +#ifdef _FOOL_GTK_DOC_ +G_INLINE_FUNC void gst_rtsp_thread_unref (GstRTSPPermissions * thread); +#endif + + +static inline void +gst_rtsp_thread_unref (GstRTSPThread * thread) +{ + gst_mini_object_unref (GST_MINI_OBJECT_CAST (thread)); +} + +/** + * GstRTSPThreadType: + * @GST_RTSP_THREAD_TYPE_CLIENT: a thread to handle the client communication + * @GST_RTSP_THREAD_TYPE_MEDIA: a thread to handle media + * + * Different thread types + */ +typedef enum +{ + GST_RTSP_THREAD_TYPE_CLIENT, + GST_RTSP_THREAD_TYPE_MEDIA +} GstRTSPThreadType; + +/** + * GstRTSPThreadPool: + * + * The thread pool structure. + */ +struct _GstRTSPThreadPool { + GObject parent; + + GstRTSPThreadPoolPrivate *priv; +}; + +/** + * GstRTSPThreadPoolClass: + * @get_thread: get or reuse a thread object + * @configure_thread: configure a thread object + * @thread_enter: called from the thread when it is entered + * @thread_leave: called from the thread when it is left + * + * Class for managing threads. + */ +struct _GstRTSPThreadPoolClass { + GObjectClass parent_class; + + GThreadPool *pool; + + GstRTSPThread * (*get_thread) (GstRTSPThreadPool *pool, + GstRTSPThreadType type, + GstRTSPClientState *state); + void (*configure_thread) (GstRTSPThreadPool *pool, + GstRTSPThread * thread, + GstRTSPClientState *state); + + void (*thread_enter) (GstRTSPThreadPool *pool, + GstRTSPThread *thread); + void (*thread_leave) (GstRTSPThreadPool *pool, + GstRTSPThread *thread); +}; + +GType gst_rtsp_thread_pool_get_type (void); + +GstRTSPThreadPool * gst_rtsp_thread_pool_new (void); + +void gst_rtsp_thread_pool_set_max_threads (GstRTSPThreadPool * pool, gint max_threads); +gint gst_rtsp_thread_pool_get_max_threads (GstRTSPThreadPool * pool); + +GstRTSPThread * gst_rtsp_thread_pool_get_thread (GstRTSPThreadPool *pool, + GstRTSPThreadType type, + GstRTSPClientState *state); +G_END_DECLS + +#endif /* __GST_RTSP_THREAD_POOL_H__ */