/* GStreamer
 * Copyright (C) 2013 Wim Taymans <wim.taymans at gmail.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
/**
 * SECTION:rtsp-thread-pool
 * @short_description: A pool of threads
 * @see_also: #GstRTSPMedia, #GstRTSPClient
 *
 * A #GstRTSPThreadPool manages reusable threads for various server tasks.
 * Currently the defined thread types can be found in #GstRTSPThreadType.
 *
 * Threads of type #GST_RTSP_THREAD_TYPE_CLIENT are used to handle requests from
 * a connected client. With gst_rtsp_thread_pool_get_max_threads() a maximum
 * number of threads can be set after which the pool will start to reuse the
 * same thread for multiple clients.
 *
 * Threads of type #GST_RTSP_THREAD_TYPE_MEDIA will be used to perform the state
 * changes of the media pipelines and handle its bus messages.
 *
 * gst_rtsp_thread_pool_get_thread() can be used to create a #GstRTSPThread
 * object of the right type. The thread object contains a mainloop and context
 * that run in a seperate thread and can be used to attached sources to.
 *
 * gst_rtsp_thread_reuse() can be used to reuse a thread for multiple purposes.
 * If all gst_rtsp_thread_reuse() calls are matched with a
 * gst_rtsp_thread_stop() call, the mainloop will be quit and the thread will
 * stop.
 *
 * To configure the threads, a subclass of this object should be made and the
 * virtual methods should be overriden to implement the desired functionality.
 *
 * Last reviewed on 2013-07-11 (1.0.0)
 */
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <string.h>

#include "rtsp-thread-pool.h"

typedef struct _GstRTSPThreadImpl
{
  GstRTSPThread thread;

  gint reused;
  GSource *source;
  /* FIXME, the source has to be part of GstRTSPThreadImpl, due to a bug in GLib:
   * https://bugzilla.gnome.org/show_bug.cgi?id=720186 */
} GstRTSPThreadImpl;

GST_DEFINE_MINI_OBJECT_TYPE (GstRTSPThread, gst_rtsp_thread);

static void gst_rtsp_thread_init (GstRTSPThreadImpl * impl);

static void
_gst_rtsp_thread_free (GstRTSPThreadImpl * impl)
{
  GST_DEBUG ("free thread %p", impl);

  g_source_unref (impl->source);
  g_main_loop_unref (impl->thread.loop);
  g_main_context_unref (impl->thread.context);
  g_slice_free1 (sizeof (GstRTSPThreadImpl), impl);
}

static GstRTSPThread *
_gst_rtsp_thread_copy (GstRTSPThreadImpl * impl)
{
  GstRTSPThreadImpl *copy;

  GST_DEBUG ("copy thread %p", impl);

  copy = g_slice_new0 (GstRTSPThreadImpl);
  gst_rtsp_thread_init (copy);
  copy->thread.context = g_main_context_ref (impl->thread.context);
  copy->thread.loop = g_main_loop_ref (impl->thread.loop);

  return GST_RTSP_THREAD (copy);
}

static void
gst_rtsp_thread_init (GstRTSPThreadImpl * impl)
{
  gst_mini_object_init (GST_MINI_OBJECT_CAST (impl), 0,
      GST_TYPE_RTSP_THREAD,
      (GstMiniObjectCopyFunction) _gst_rtsp_thread_copy, NULL,
      (GstMiniObjectFreeFunction) _gst_rtsp_thread_free);

  g_atomic_int_set (&impl->reused, 1);
}

/**
 * gst_rtsp_thread_new:
 * @type: the thread type
 *
 * Create a new thread object that can run a mainloop.
 *
 * Returns: (transfer full): a #GstRTSPThread.
 */
GstRTSPThread *
gst_rtsp_thread_new (GstRTSPThreadType type)
{
  GstRTSPThreadImpl *impl;

  impl = g_slice_new0 (GstRTSPThreadImpl);

  gst_rtsp_thread_init (impl);
  impl->thread.type = type;
  impl->thread.context = g_main_context_new ();
  impl->thread.loop = g_main_loop_new (impl->thread.context, TRUE);

  return GST_RTSP_THREAD (impl);
}

/**
 * gst_rtsp_thread_reuse:
 * @thread: (transfer none): a #GstRTSPThread
 *
 * Reuse the mainloop of @thread
 *
 * Returns: %TRUE if the mainloop could be reused
 */
gboolean
gst_rtsp_thread_reuse (GstRTSPThread * thread)
{
  GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread;
  gboolean res;

  g_return_val_if_fail (GST_IS_RTSP_THREAD (thread), FALSE);

  GST_DEBUG ("reuse thread %p", thread);

  res = g_atomic_int_add (&impl->reused, 1) > 0;
  if (res)
    gst_rtsp_thread_ref (thread);

  return res;
}

static gboolean
do_quit (GstRTSPThread * thread)
{
  GST_DEBUG ("stop mainloop of thread %p", thread);
  g_main_loop_quit (thread->loop);
  return FALSE;
}

/**
 * gst_rtsp_thread_stop:
 * @thread: (transfer full): a #GstRTSPThread
 *
 * Stop and unref @thread. When no threads are using the mainloop, the thread
 * will be stopped and the final ref to @thread will be released.
 */
void
gst_rtsp_thread_stop (GstRTSPThread * thread)
{
  GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread;

  g_return_if_fail (GST_IS_RTSP_THREAD (thread));

  GST_DEBUG ("stop thread %p", thread);

  if (g_atomic_int_dec_and_test (&impl->reused)) {
    GST_DEBUG ("add idle source to quit mainloop of thread %p", thread);
    impl->source = g_idle_source_new ();
    g_source_set_callback (impl->source, (GSourceFunc) do_quit,
        thread, (GDestroyNotify) gst_rtsp_thread_unref);
    g_source_attach (impl->source, thread->context);
  } else
    gst_rtsp_thread_unref (thread);
}

struct _GstRTSPThreadPoolPrivate
{
  GMutex lock;

  gint max_threads;
  /* currently used mainloops */
  GQueue threads;
};

#define DEFAULT_MAX_THREADS 1

enum
{
  PROP_0,
  PROP_MAX_THREADS,
  PROP_LAST
};

GST_DEBUG_CATEGORY_STATIC (rtsp_thread_pool_debug);
#define GST_CAT_DEFAULT rtsp_thread_pool_debug

static GQuark thread_pool;

static void gst_rtsp_thread_pool_get_property (GObject * object, guint propid,
    GValue * value, GParamSpec * pspec);
static void gst_rtsp_thread_pool_set_property (GObject * object, guint propid,
    const GValue * value, GParamSpec * pspec);
static void gst_rtsp_thread_pool_finalize (GObject * obj);

static gpointer do_loop (GstRTSPThread * thread);
static GstRTSPThread *default_get_thread (GstRTSPThreadPool * pool,
    GstRTSPThreadType type, GstRTSPContext * ctx);

G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPThreadPool, gst_rtsp_thread_pool,
    G_TYPE_OBJECT);

static void
gst_rtsp_thread_pool_class_init (GstRTSPThreadPoolClass * klass)
{
  GObjectClass *gobject_class;

  gobject_class = G_OBJECT_CLASS (klass);

  gobject_class->get_property = gst_rtsp_thread_pool_get_property;
  gobject_class->set_property = gst_rtsp_thread_pool_set_property;
  gobject_class->finalize = gst_rtsp_thread_pool_finalize;

  /**
   * GstRTSPThreadPool::max-threads:
   *
   * The maximum amount of threads to use for client connections. A value of
   * 0 means to use only the mainloop, -1 means an unlimited amount of
   * threads.
   */
  g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
      g_param_spec_int ("max-threads", "Max Threads",
          "The maximum amount of threads to use for client connections "
          "(0 = only mainloop, -1 = unlimited)", -1, G_MAXINT,
          DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  klass->get_thread = default_get_thread;

  GST_DEBUG_CATEGORY_INIT (rtsp_thread_pool_debug, "rtspthreadpool", 0,
      "GstRTSPThreadPool");

  thread_pool = g_quark_from_string ("gst.rtsp.thread.pool");
}

static void
gst_rtsp_thread_pool_init (GstRTSPThreadPool * pool)
{
  GstRTSPThreadPoolPrivate *priv;

  pool->priv = priv = gst_rtsp_thread_pool_get_instance_private (pool);

  g_mutex_init (&priv->lock);
  priv->max_threads = DEFAULT_MAX_THREADS;
  g_queue_init (&priv->threads);
}

static void
gst_rtsp_thread_pool_finalize (GObject * obj)
{
  GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (obj);
  GstRTSPThreadPoolPrivate *priv = pool->priv;

  GST_INFO ("finalize pool %p", pool);

  g_queue_clear (&priv->threads);
  g_mutex_clear (&priv->lock);

  G_OBJECT_CLASS (gst_rtsp_thread_pool_parent_class)->finalize (obj);
}

static void
gst_rtsp_thread_pool_get_property (GObject * object, guint propid,
    GValue * value, GParamSpec * pspec)
{
  GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object);

  switch (propid) {
    case PROP_MAX_THREADS:
      g_value_set_int (value, gst_rtsp_thread_pool_get_max_threads (pool));
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
  }
}

static void
gst_rtsp_thread_pool_set_property (GObject * object, guint propid,
    const GValue * value, GParamSpec * pspec)
{
  GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object);

  switch (propid) {
    case PROP_MAX_THREADS:
      gst_rtsp_thread_pool_set_max_threads (pool, g_value_get_int (value));
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
  }
}

static gpointer
do_loop (GstRTSPThread * thread)
{
  GstRTSPThreadPoolPrivate *priv;
  GstRTSPThreadPoolClass *klass;
  GstRTSPThreadPool *pool;

  pool = gst_mini_object_get_qdata (GST_MINI_OBJECT (thread), thread_pool);
  priv = pool->priv;

  klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);

  if (klass->thread_enter)
    klass->thread_enter (pool, thread);

  GST_INFO ("enter mainloop of thread %p", thread);
  g_main_loop_run (thread->loop);
  GST_INFO ("exit mainloop of thread %p", thread);

  if (klass->thread_leave)
    klass->thread_leave (pool, thread);

  g_mutex_lock (&priv->lock);
  g_queue_remove (&priv->threads, thread);
  g_mutex_unlock (&priv->lock);

  gst_rtsp_thread_unref (thread);

  return NULL;
}

/**
 * gst_rtsp_thread_pool_new:
 *
 * Create a new #GstRTSPThreadPool instance.
 *
 * Returns: (transfer full): a new #GstRTSPThreadPool
 */
GstRTSPThreadPool *
gst_rtsp_thread_pool_new (void)
{
  GstRTSPThreadPool *result;

  result = g_object_new (GST_TYPE_RTSP_THREAD_POOL, NULL);

  return result;
}

/**
 * gst_rtsp_thread_pool_set_max_threads:
 * @pool: a #GstRTSPThreadPool
 * @max_threads: maximum threads
 *
 * Set the maximum threads used by the pool to handle client requests.
 * A value of 0 will use the pool mainloop, a value of -1 will use an
 * unlimited number of threads.
 */
void
gst_rtsp_thread_pool_set_max_threads (GstRTSPThreadPool * pool,
    gint max_threads)
{
  GstRTSPThreadPoolPrivate *priv;

  g_return_if_fail (GST_IS_RTSP_THREAD_POOL (pool));

  priv = pool->priv;

  g_mutex_lock (&priv->lock);
  priv->max_threads = max_threads;
  g_mutex_unlock (&priv->lock);
}

/**
 * gst_rtsp_thread_pool_get_max_threads:
 * @pool: a #GstRTSPThreadPool
 *
 * Get the maximum number of threads used for client connections.
 * See gst_rtsp_thread_pool_set_max_threads().
 *
 * Returns: the maximum number of threads.
 */
gint
gst_rtsp_thread_pool_get_max_threads (GstRTSPThreadPool * pool)
{
  GstRTSPThreadPoolPrivate *priv;
  gint res;

  g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), -1);

  priv = pool->priv;

  g_mutex_lock (&priv->lock);
  res = priv->max_threads;
  g_mutex_unlock (&priv->lock);

  return res;
}

static GstRTSPThread *
make_thread (GstRTSPThreadPool * pool, GstRTSPThreadType type,
    GstRTSPContext * ctx)
{
  GstRTSPThreadPoolClass *klass;
  GstRTSPThread *thread;

  klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);

  thread = gst_rtsp_thread_new (type);
  gst_mini_object_set_qdata (GST_MINI_OBJECT (thread), thread_pool,
      g_object_ref (pool), g_object_unref);

  GST_DEBUG_OBJECT (pool, "new thread %p", thread);

  if (klass->configure_thread)
    klass->configure_thread (pool, thread, ctx);

  return thread;
}

static GstRTSPThread *
default_get_thread (GstRTSPThreadPool * pool,
    GstRTSPThreadType type, GstRTSPContext * ctx)
{
  GstRTSPThreadPoolPrivate *priv = pool->priv;
  GstRTSPThreadPoolClass *klass;
  GstRTSPThread *thread;
  GError *error = NULL;

  klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);

  switch (type) {
    case GST_RTSP_THREAD_TYPE_CLIENT:
      if (priv->max_threads == 0) {
        /* no threads allowed */
        GST_DEBUG_OBJECT (pool, "no client threads allowed");
        thread = NULL;
      } else {
        g_mutex_lock (&priv->lock);
      retry:
        if (priv->max_threads > 0 &&
            g_queue_get_length (&priv->threads) >= priv->max_threads) {
          /* max threads reached, recycle from queue */
          thread = g_queue_pop_head (&priv->threads);
          GST_DEBUG_OBJECT (pool, "recycle client thread %p", thread);
          if (!gst_rtsp_thread_reuse (thread)) {
            GST_DEBUG_OBJECT (pool, "thread %p stopping, retry", thread);
            /* this can happen if we just decremented the reuse counter of the
             * thread and signaled the mainloop that it should stop. We leave
             * the thread out of the queue now, there is no point to add it
             * again, it will be removed from the mainloop otherwise after it
             * stops. */
            goto retry;
          }
        } else {
          /* make more threads */
          GST_DEBUG_OBJECT (pool, "make new client thread");
          thread = make_thread (pool, type, ctx);

          if (!g_thread_pool_push (klass->pool, gst_rtsp_thread_ref (thread),
                  &error))
            goto thread_error;
        }
        g_queue_push_tail (&priv->threads, thread);
        g_mutex_unlock (&priv->lock);
      }
      break;
    case GST_RTSP_THREAD_TYPE_MEDIA:
      GST_DEBUG_OBJECT (pool, "make new media thread");
      thread = make_thread (pool, type, ctx);

      if (!g_thread_pool_push (klass->pool, gst_rtsp_thread_ref (thread),
              &error))
        goto thread_error;
      break;
    default:
      thread = NULL;
      break;
  }
  return thread;

  /* ERRORS */
thread_error:
  {
    GST_ERROR_OBJECT (pool, "failed to push thread %s", error->message);
    gst_rtsp_thread_unref (thread);
    /* drop also the ref dedicated for the pool */
    gst_rtsp_thread_unref (thread);
    g_clear_error (&error);
    return NULL;
  }
}

/**
 * gst_rtsp_thread_pool_get_thread:
 * @pool: a #GstRTSPThreadPool
 * @type: the #GstRTSPThreadType
 * @ctx: (transfer none): a #GstRTSPContext
 *
 * Get a new #GstRTSPThread for @type and @ctx.
 *
 * Returns: (transfer full) (nullable): a new #GstRTSPThread,
 * gst_rtsp_thread_stop() after usage
 */
GstRTSPThread *
gst_rtsp_thread_pool_get_thread (GstRTSPThreadPool * pool,
    GstRTSPThreadType type, GstRTSPContext * ctx)
{
  GstRTSPThreadPoolClass *klass;
  GstRTSPThread *result = NULL;

  g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), NULL);

  klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);

  /* We want to be thread safe as there might be 2 threads wanting to get new
   * #GstRTSPThread at the same time
   */
  if (G_UNLIKELY (!g_atomic_pointer_get (&klass->pool))) {
    GThreadPool *t_pool;
    t_pool = g_thread_pool_new ((GFunc) do_loop, klass, -1, FALSE, NULL);
    if (!g_atomic_pointer_compare_and_exchange (&klass->pool,
            (GThreadPool *) NULL, t_pool))
      g_thread_pool_free (t_pool, FALSE, TRUE);
  }

  if (klass->get_thread)
    result = klass->get_thread (pool, type, ctx);

  return result;
}

/**
 * gst_rtsp_thread_pool_cleanup:
 *
 * Wait for all tasks to be stopped and free all allocated resources. This is
 * mainly used in test suites to ensure proper cleanup of internal data
 * structures.
 */
void
gst_rtsp_thread_pool_cleanup (void)
{
  GstRTSPThreadPoolClass *klass;

  klass =
      GST_RTSP_THREAD_POOL_CLASS (g_type_class_ref
      (gst_rtsp_thread_pool_get_type ()));
  if (klass->pool != NULL) {
    g_thread_pool_free (klass->pool, FALSE, TRUE);
    klass->pool = NULL;
  }
  g_type_class_unref (klass);
}