mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-26 11:41:09 +00:00
ges: discoverer-manager: Support for delayed clean-up of discoverers
The manager keeps track of one discoverer per thread and in large applications with hundreds of threads this can significantly increase memory pressure. So we need to periodically clean-up the unused discoverers. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5608>
This commit is contained in:
parent
101205d429
commit
b8a7b2c9e2
3 changed files with 139 additions and 22 deletions
|
@ -1,12 +1,44 @@
|
||||||
#include "ges-internal.h"
|
#include "ges-internal.h"
|
||||||
#include "ges-discoverer-manager.h"
|
#include "ges-discoverer-manager.h"
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
GWeakRef /*<GESDiscovererManager> */ manager;
|
||||||
|
GstDiscoverer *discoverer;
|
||||||
|
GThread *thread;
|
||||||
|
gint n_uri;
|
||||||
|
gulong load_serialized_info_id;
|
||||||
|
gulong source_setup_id;
|
||||||
|
gulong discovered_id;
|
||||||
|
} GESDiscovererData;
|
||||||
|
|
||||||
|
static void
|
||||||
|
ges_discoverer_data_free (GESDiscovererData * data)
|
||||||
|
{
|
||||||
|
GST_LOG ("Freeing discoverer %" GST_PTR_FORMAT, data->discoverer);
|
||||||
|
g_assert (data->n_uri == 0);
|
||||||
|
gst_discoverer_stop (data->discoverer);
|
||||||
|
g_signal_handler_disconnect (data->discoverer, data->load_serialized_info_id);
|
||||||
|
g_signal_handler_disconnect (data->discoverer, data->source_setup_id);
|
||||||
|
g_signal_handler_disconnect (data->discoverer, data->discovered_id);
|
||||||
|
g_weak_ref_clear (&data->manager);
|
||||||
|
|
||||||
|
g_object_unref (data->discoverer);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
ges_discoverer_data_unref (GESDiscovererData * data)
|
||||||
|
{
|
||||||
|
g_atomic_rc_box_release_full (data,
|
||||||
|
(GDestroyNotify) ges_discoverer_data_free);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* GESDiscovererManager:
|
* GESDiscovererManager:
|
||||||
*
|
*
|
||||||
* Since: 1.24
|
* Since: 1.24
|
||||||
*/
|
*/
|
||||||
|
|
||||||
struct _GESDiscovererManager
|
struct _GESDiscovererManager
|
||||||
{
|
{
|
||||||
GObject parent;
|
GObject parent;
|
||||||
|
@ -86,8 +118,28 @@ static void
|
||||||
ges_discoverer_manager_finalize (GObject * object)
|
ges_discoverer_manager_finalize (GObject * object)
|
||||||
{
|
{
|
||||||
GESDiscovererManager *self = GES_DISCOVERER_MANAGER (object);
|
GESDiscovererManager *self = GES_DISCOVERER_MANAGER (object);
|
||||||
|
GHashTableIter iter;
|
||||||
|
GESDiscovererData *discoverer_data;
|
||||||
|
GMainContext *context = g_main_context_get_thread_default ();
|
||||||
|
|
||||||
|
if (!context)
|
||||||
|
context = g_main_context_default ();
|
||||||
|
|
||||||
|
|
||||||
|
g_mutex_lock (&self->lock);
|
||||||
|
g_hash_table_iter_init (&iter, self->discoverers);
|
||||||
|
while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & discoverer_data)) {
|
||||||
|
GSource *source;
|
||||||
|
|
||||||
|
while ((source =
|
||||||
|
g_main_context_find_source_by_user_data (context,
|
||||||
|
discoverer_data))) {
|
||||||
|
g_source_destroy (source);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
g_hash_table_unref (self->discoverers);
|
g_hash_table_unref (self->discoverers);
|
||||||
|
g_mutex_unlock (&self->lock);
|
||||||
|
|
||||||
G_OBJECT_CLASS (ges_discoverer_manager_parent_class)->finalize (object);
|
G_OBJECT_CLASS (ges_discoverer_manager_parent_class)->finalize (object);
|
||||||
}
|
}
|
||||||
|
@ -187,7 +239,7 @@ void
|
||||||
ges_discoverer_manager_init (GESDiscovererManager * self)
|
ges_discoverer_manager_init (GESDiscovererManager * self)
|
||||||
{
|
{
|
||||||
self->discoverers = g_hash_table_new_full (g_direct_hash, g_str_equal,
|
self->discoverers = g_hash_table_new_full (g_direct_hash, g_str_equal,
|
||||||
NULL, g_object_unref);
|
NULL, (GDestroyNotify) ges_discoverer_data_unref);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -276,7 +328,7 @@ ges_discoverer_manager_set_timeout (GESDiscovererManager * self,
|
||||||
GstClockTime timeout)
|
GstClockTime timeout)
|
||||||
{
|
{
|
||||||
GHashTableIter iter;
|
GHashTableIter iter;
|
||||||
GstDiscoverer *discoverer;
|
GESDiscovererData *discoverer_data;
|
||||||
|
|
||||||
g_return_if_fail (GES_IS_DISCOVERER_MANAGER (self));
|
g_return_if_fail (GES_IS_DISCOVERER_MANAGER (self));
|
||||||
|
|
||||||
|
@ -284,8 +336,8 @@ ges_discoverer_manager_set_timeout (GESDiscovererManager * self,
|
||||||
|
|
||||||
g_mutex_lock (&self->lock);
|
g_mutex_lock (&self->lock);
|
||||||
g_hash_table_iter_init (&iter, self->discoverers);
|
g_hash_table_iter_init (&iter, self->discoverers);
|
||||||
while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & discoverer))
|
while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & discoverer_data))
|
||||||
g_object_set (discoverer, "timeout", timeout, NULL);
|
g_object_set (discoverer_data->discoverer, "timeout", timeout, NULL);
|
||||||
g_mutex_unlock (&self->lock);
|
g_mutex_unlock (&self->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -305,37 +357,86 @@ source_setup_cb (GESDiscovererManager * self, GstElement * source)
|
||||||
g_signal_emit (self, signals[DISCOVERER_SOURCE_SETUP], 0, source);
|
g_signal_emit (self, signals[DISCOVERER_SOURCE_SETUP], 0, source);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static gboolean
|
||||||
proxy_discovered_cb (GESDiscovererManager * self,
|
cleanup_discoverer_cb (GESDiscovererData * discoverer_data)
|
||||||
GstDiscovererInfo * info, GError * err, gpointer user_data)
|
|
||||||
{
|
{
|
||||||
g_signal_emit (self, signals[DISCOVERER_SIGNAL], 0, info, err);
|
GESDiscovererManager *self = g_weak_ref_get (&discoverer_data->manager);
|
||||||
|
gint res = G_SOURCE_CONTINUE;
|
||||||
|
|
||||||
|
if (!self) {
|
||||||
|
return G_SOURCE_REMOVE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (discoverer_data->n_uri > 0) {
|
||||||
|
GST_DEBUG_OBJECT (self, "Discoverer still has %d uris to discover",
|
||||||
|
discoverer_data->n_uri);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
|
||||||
|
g_mutex_lock (&self->lock);
|
||||||
|
res = G_SOURCE_REMOVE;
|
||||||
|
GST_DEBUG_OBJECT (self, "Removing unused discoverer");
|
||||||
|
g_hash_table_remove (self->discoverers, discoverer_data->thread);
|
||||||
|
|
||||||
|
done:
|
||||||
|
g_mutex_unlock (&self->lock);
|
||||||
|
g_object_unref (self);
|
||||||
|
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
proxy_discovered_cb (GESDiscovererManager * self,
|
||||||
|
GstDiscovererInfo * info, GError * err, GstDiscoverer * discoverer)
|
||||||
|
{
|
||||||
|
g_signal_emit (self, signals[DISCOVERER_SIGNAL], 0, info, err);
|
||||||
|
|
||||||
static GstDiscoverer *
|
g_mutex_lock (&self->lock);
|
||||||
|
GESDiscovererData *data =
|
||||||
|
g_hash_table_lookup (self->discoverers, g_thread_self ());
|
||||||
|
if (data) {
|
||||||
|
data->n_uri--;
|
||||||
|
data = g_atomic_rc_box_acquire (data);
|
||||||
|
}
|
||||||
|
g_mutex_unlock (&self->lock);
|
||||||
|
|
||||||
|
if (data) {
|
||||||
|
ges_timeout_add (1000, (GSourceFunc) cleanup_discoverer_cb, data,
|
||||||
|
(GDestroyNotify) ges_discoverer_data_unref);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static GESDiscovererData *
|
||||||
create_discoverer (GESDiscovererManager * self)
|
create_discoverer (GESDiscovererManager * self)
|
||||||
{
|
{
|
||||||
GstDiscoverer *discoverer;
|
GstDiscoverer *discoverer;
|
||||||
|
|
||||||
|
GESDiscovererData *data = g_atomic_rc_box_new0 (GESDiscovererData);
|
||||||
discoverer = gst_discoverer_new (self->timeout, NULL);
|
discoverer = gst_discoverer_new (self->timeout, NULL);
|
||||||
g_signal_connect_swapped (discoverer, "load-serialized-info",
|
data->thread = g_thread_self ();
|
||||||
|
g_weak_ref_set (&data->manager, self);
|
||||||
|
data->load_serialized_info_id =
|
||||||
|
g_signal_connect_swapped (discoverer, "load-serialized-info",
|
||||||
G_CALLBACK (proxy_load_serialized_info_cb), self);
|
G_CALLBACK (proxy_load_serialized_info_cb), self);
|
||||||
g_signal_connect_swapped (discoverer, "source-setup",
|
data->source_setup_id =
|
||||||
|
g_signal_connect_swapped (discoverer, "source-setup",
|
||||||
G_CALLBACK (source_setup_cb), self);
|
G_CALLBACK (source_setup_cb), self);
|
||||||
g_signal_connect_swapped (discoverer, "discovered",
|
data->discovered_id =
|
||||||
|
g_signal_connect_swapped (discoverer, "discovered",
|
||||||
G_CALLBACK (proxy_discovered_cb), self);
|
G_CALLBACK (proxy_discovered_cb), self);
|
||||||
g_object_set (discoverer, "use-cache", self->use_cache, NULL);
|
g_object_set (discoverer, "use-cache", self->use_cache, NULL);
|
||||||
|
|
||||||
gst_discoverer_start (discoverer);
|
gst_discoverer_start (discoverer);
|
||||||
|
|
||||||
return discoverer;
|
data->discoverer = discoverer;
|
||||||
|
|
||||||
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
static GstDiscoverer *
|
static GESDiscovererData *
|
||||||
ges_discoverer_manager_get_discoverer (GESDiscovererManager * self)
|
ges_discoverer_manager_get_discoverer (GESDiscovererManager * self)
|
||||||
{
|
{
|
||||||
GstDiscoverer *ret;
|
GESDiscovererData *ret;
|
||||||
|
|
||||||
g_return_val_if_fail (GES_IS_DISCOVERER_MANAGER (self), NULL);
|
g_return_val_if_fail (GES_IS_DISCOVERER_MANAGER (self), NULL);
|
||||||
|
|
||||||
|
@ -355,19 +456,19 @@ gboolean
|
||||||
ges_discoverer_manager_start_discovery (GESDiscovererManager * self,
|
ges_discoverer_manager_start_discovery (GESDiscovererManager * self,
|
||||||
const gchar * uri)
|
const gchar * uri)
|
||||||
{
|
{
|
||||||
GstDiscoverer *discoverer;
|
GESDiscovererData *disco_data;
|
||||||
|
|
||||||
g_return_val_if_fail (uri != NULL, FALSE);
|
g_return_val_if_fail (uri != NULL, FALSE);
|
||||||
|
|
||||||
discoverer = ges_discoverer_manager_get_discoverer (self);
|
disco_data = ges_discoverer_manager_get_discoverer (self);
|
||||||
|
|
||||||
gboolean res = gst_discoverer_discover_uri_async (discoverer, uri);
|
|
||||||
|
|
||||||
g_mutex_lock (&self->lock);
|
g_mutex_lock (&self->lock);
|
||||||
g_hash_table_insert (self->discoverers, g_thread_self (), discoverer);
|
gboolean res =
|
||||||
|
gst_discoverer_discover_uri_async (disco_data->discoverer, uri);
|
||||||
|
disco_data->n_uri++;
|
||||||
|
g_hash_table_insert (self->discoverers, g_thread_self (), disco_data);
|
||||||
g_mutex_unlock (&self->lock);
|
g_mutex_unlock (&self->lock);
|
||||||
|
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -423,6 +423,8 @@ ges_get_compositor_factory (void);
|
||||||
|
|
||||||
G_GNUC_INTERNAL void
|
G_GNUC_INTERNAL void
|
||||||
ges_idle_add (GSourceFunc func, gpointer udata, GDestroyNotify notify);
|
ges_idle_add (GSourceFunc func, gpointer udata, GDestroyNotify notify);
|
||||||
|
G_GNUC_INTERNAL void
|
||||||
|
ges_timeout_add (guint interval, GSourceFunc func, gpointer udata, GDestroyNotify notify);
|
||||||
|
|
||||||
G_GNUC_INTERNAL gboolean
|
G_GNUC_INTERNAL gboolean
|
||||||
ges_util_structure_get_clocktime (GstStructure *structure, const gchar *name,
|
ges_util_structure_get_clocktime (GstStructure *structure, const gchar *name,
|
||||||
|
|
|
@ -280,6 +280,20 @@ ges_get_compositor_factory (void)
|
||||||
return compositor_factory;
|
return compositor_factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ges_timeout_add (guint interval, GSourceFunc func, gpointer udata,
|
||||||
|
GDestroyNotify notify)
|
||||||
|
{
|
||||||
|
GMainContext *context = g_main_context_get_thread_default ();
|
||||||
|
GSource *source = g_timeout_source_new (interval);
|
||||||
|
|
||||||
|
if (!context)
|
||||||
|
context = g_main_context_default ();
|
||||||
|
|
||||||
|
g_source_set_callback (source, func, udata, notify);
|
||||||
|
g_source_attach (source, context);
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
ges_idle_add (GSourceFunc func, gpointer udata, GDestroyNotify notify)
|
ges_idle_add (GSourceFunc func, gpointer udata, GDestroyNotify notify)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in a new issue