rusage: implement windowing of cpuload

Add a local help to the rusage plugin that supports windowing of values. We want
to generalize this for use in other plugins.
This commit is contained in:
Stefan Sauer 2014-09-10 07:55:33 +02:00
parent 018858980b
commit dbb1897e9b
2 changed files with 140 additions and 23 deletions

View file

@ -24,10 +24,6 @@
* *
* A tracing module that take rusage() snapshots and logs them. * A tracing module that take rusage() snapshots and logs them.
*/ */
/* TODO: log more items, cpuload is calculated as an aggregated value
* - in many cases a windowed value would be more interesting to see local
* cpu-load spikes
*/
#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
# include "config.h" # include "config.h"
@ -48,24 +44,119 @@ GST_DEBUG_CATEGORY_STATIC (gst_rusage_debug);
G_DEFINE_TYPE_WITH_CODE (GstRUsageTracer, gst_rusage_tracer, GST_TYPE_TRACER, G_DEFINE_TYPE_WITH_CODE (GstRUsageTracer, gst_rusage_tracer, GST_TYPE_TRACER,
_do_init); _do_init);
/* we remember x measurements per self->window */
#define WINDOW_SUBDIV 100
/* for ts calibration */ /* for ts calibration */
static gpointer main_thread_id = NULL; static gpointer main_thread_id = NULL;
static guint64 tproc_base = G_GINT64_CONSTANT (0); static guint64 tproc_base = G_GINT64_CONSTANT (0);
typedef struct
{
GstClockTime ts;
GstClockTime val;
} GstTraceValue;
typedef struct
{
GstClockTime window;
GMutex lock;
GQueue values; /* GstTraceValue* */
} GstTraceValues;
typedef struct typedef struct
{ {
/* time spend in this thread */ /* time spend in this thread */
GstClockTime tthread; GstClockTime tthread;
GstTraceValues *tvs_thread;
} GstThreadStats; } GstThreadStats;
static GstTraceValues *tvs_proc;
static void gst_rusage_tracer_invoke (GstTracer * self, GstTracerHookId id, static void gst_rusage_tracer_invoke (GstTracer * self, GstTracerHookId id,
GstTracerMessageId mid, va_list var_args); GstTracerMessageId mid, va_list var_args);
/* data helper */ /* data helper */
static void
free_trace_value (gpointer data)
{
g_slice_free (GstTraceValue, data);
}
static GstTraceValues *
make_trace_values (GstClockTime window)
{
GstTraceValues *self = g_slice_new0 (GstTraceValues);
self->window = window;
g_mutex_init (&self->lock);
g_queue_init (&self->values);
return self;
}
static void
free_trace_values (GstTraceValues * self)
{
g_queue_free_full (&self->values, free_trace_value);
g_mutex_clear (&self->lock);
g_slice_free (GstTraceValues, self);
}
static gboolean
update_trace_value (GstTraceValues * self, GstClockTime nts,
GstClockTime nval, GstClockTime * dts, GstClockTime * dval)
{
GstTraceValue *lv;
GstClockTimeDiff dt;
GstClockTime window = self->window;
GQueue *q = &self->values;
GList *node = q->tail;
gboolean ret = FALSE;
/* search from the tail of the queue for a good GstTraceValue */
while (node) {
lv = node->data;
dt = GST_CLOCK_DIFF (lv->ts, nts);
if (dt < window) {
break;
} else {
node = g_list_previous (node);
}
}
if (node) {
/* calculate the windowed value */
*dts = dt;
*dval = GST_CLOCK_DIFF (lv->val, nval);
/* drop all older measurements */
while (q->tail != node) {
free_trace_value (g_queue_pop_tail (q));
}
ret = TRUE;
} else {
*dts = nts;
*dval = nval;
}
/* don't push too many data items */
lv = q->head ? q->head->data : NULL;
if (!lv || (GST_CLOCK_DIFF (lv->ts, nts) > (window / WINDOW_SUBDIV))) {
/* push the new measurement */
lv = g_slice_new0 (GstTraceValue);
lv->ts = nts;
lv->val = nval;
g_queue_push_head (q, lv);
}
return ret;
}
static void static void
free_thread_stats (gpointer data) free_thread_stats (gpointer data)
{ {
free_trace_values (((GstThreadStats *) data)->tvs_thread);
g_slice_free (GstThreadStats, data); g_slice_free (GstThreadStats, data);
} }
@ -84,6 +175,7 @@ gst_rusage_tracer_init (GstRUsageTracer * self)
self->threads = g_hash_table_new_full (NULL, NULL, NULL, free_thread_stats); self->threads = g_hash_table_new_full (NULL, NULL, NULL, free_thread_stats);
main_thread_id = g_thread_self (); main_thread_id = g_thread_self ();
tvs_proc = make_trace_values (GST_SECOND);
GST_DEBUG ("rusage: main thread=%p", main_thread_id); GST_DEBUG ("rusage: main thread=%p", main_thread_id);
/* announce trace formats */ /* announce trace formats */
@ -92,11 +184,19 @@ gst_rusage_tracer_init (GstRUsageTracer * self)
"thread-id", GST_TYPE_STRUCTURE, gst_structure_new ("scope", "thread-id", GST_TYPE_STRUCTURE, gst_structure_new ("scope",
"related-to", G_TYPE_STRING, "thread", // use genum "related-to", G_TYPE_STRING, "thread", // use genum
NULL), NULL),
"cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value", "average-cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
"type", G_TYPE_GTYPE, G_TYPE_UINT, "type", G_TYPE_GTYPE, G_TYPE_UINT,
"description", G_TYPE_STRING, "cpu usage per thread", "description", G_TYPE_STRING, "average cpu usage per thread",
"flags", G_TYPE_STRING, "aggregated", // use gflags "flags", G_TYPE_STRING, "aggregated", // use gflags
"min", G_TYPE_UINT, 0, "max", G_TYPE_UINT, 100, "min", G_TYPE_UINT, 0,
"max", G_TYPE_UINT, 100,
NULL),
"current-cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
"type", G_TYPE_GTYPE, G_TYPE_UINT,
"description", G_TYPE_STRING, "current cpu usage per thread",
"flags", G_TYPE_STRING, "windowed", // use gflags
"min", G_TYPE_UINT, 0,
"max", G_TYPE_UINT, 100,
NULL), NULL),
"time", GST_TYPE_STRUCTURE, gst_structure_new ("value", "time", GST_TYPE_STRUCTURE, gst_structure_new ("value",
"type", G_TYPE_GTYPE, G_TYPE_UINT64, "type", G_TYPE_GTYPE, G_TYPE_UINT64,
@ -110,11 +210,19 @@ gst_rusage_tracer_init (GstRUsageTracer * self)
"thread-id", GST_TYPE_STRUCTURE, gst_structure_new ("scope", "thread-id", GST_TYPE_STRUCTURE, gst_structure_new ("scope",
"related-to", G_TYPE_STRING, "process", // use genum "related-to", G_TYPE_STRING, "process", // use genum
NULL), NULL),
"cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value", "average-cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
"type", G_TYPE_GTYPE, G_TYPE_UINT, "type", G_TYPE_GTYPE, G_TYPE_UINT,
"description", G_TYPE_STRING, "cpu usage per process", "description", G_TYPE_STRING, "average cpu usage per process",
"flags", G_TYPE_STRING, "aggregated", // use gflags "flags", G_TYPE_STRING, "aggregated", // use gflags
"min", G_TYPE_UINT, 0, "max", G_TYPE_UINT, 100, "min", G_TYPE_UINT, 0,
"max", G_TYPE_UINT, 100,
NULL),
"current-cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
"type", G_TYPE_GTYPE, G_TYPE_UINT,
"description", G_TYPE_STRING, "current cpu usage per process",
"flags", G_TYPE_STRING, "windowed", // use gflags
"min", G_TYPE_UINT, 0,
"max", G_TYPE_UINT, 100,
NULL), NULL),
"time", GST_TYPE_STRUCTURE, gst_structure_new ("value", "time", GST_TYPE_STRUCTURE, gst_structure_new ("value",
"type", G_TYPE_GTYPE, G_TYPE_UINT64, "type", G_TYPE_GTYPE, G_TYPE_UINT64,
@ -135,11 +243,10 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
guint64 ts = va_arg (var_args, guint64); guint64 ts = va_arg (var_args, guint64);
GstThreadStats *stats; GstThreadStats *stats;
gpointer thread_id = g_thread_self (); gpointer thread_id = g_thread_self ();
guint cpuload = 0; guint avg_cpuload, cur_cpuload;
struct rusage ru; struct rusage ru;
GstClockTime tproc = G_GUINT64_CONSTANT (0); GstClockTime tproc = G_GUINT64_CONSTANT (0);
GstClockTime dts, dtproc;
// FIXME(ensonic): not threadsafe
static GstClockTime last_ts = G_GUINT64_CONSTANT (0); static GstClockTime last_ts = G_GUINT64_CONSTANT (0);
getrusage (RUSAGE_SELF, &ru); getrusage (RUSAGE_SELF, &ru);
@ -147,6 +254,7 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
/* get stats record for current thread */ /* get stats record for current thread */
if (!(stats = g_hash_table_lookup (self->threads, thread_id))) { if (!(stats = g_hash_table_lookup (self->threads, thread_id))) {
stats = g_slice_new0 (GstThreadStats); stats = g_slice_new0 (GstThreadStats);
stats->tvs_thread = make_trace_values (GST_SECOND);
g_hash_table_insert (self->threads, thread_id, stats); g_hash_table_insert (self->threads, thread_id, stats);
} }
#ifdef HAVE_CLOCK_GETTIME #ifdef HAVE_CLOCK_GETTIME
@ -175,9 +283,6 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
stats->tthread += GST_CLOCK_DIFF (last_ts, ts); stats->tthread += GST_CLOCK_DIFF (last_ts, ts);
#endif #endif
/* remember last timestamp for fallback calculations */
last_ts = ts;
/* Calibrate ts for the process and main thread. For tthread[main] and tproc /* Calibrate ts for the process and main thread. For tthread[main] and tproc
* the time is larger than ts, as our base-ts is taken after the process has * the time is larger than ts, as our base-ts is taken after the process has
* started. * started.
@ -195,7 +300,7 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
stats->tthread -= tproc_base; stats->tthread -= tproc_base;
} }
} }
/* we always need to corect proc time */ /* we always need to correct proc time */
tproc -= tproc_base; tproc -= tproc_base;
/* FIXME: how can we take cpu-frequency scaling into account? /* FIXME: how can we take cpu-frequency scaling into account?
@ -206,18 +311,29 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
* cpufreq-selector -g ondemand * cpufreq-selector -g ondemand
*/ */
/* *INDENT-OFF* */ /* *INDENT-OFF* */
cpuload = (guint) gst_util_uint64_scale (stats->tthread, avg_cpuload = (guint) gst_util_uint64_scale (stats->tthread,
G_GINT64_CONSTANT (100), ts); G_GINT64_CONSTANT (100), ts);
update_trace_value (stats->tvs_thread, ts, stats->tthread, &dts, &dtproc);
cur_cpuload = (guint) gst_util_uint64_scale (dtproc, G_GINT64_CONSTANT (100), dts);
gst_tracer_log_trace (gst_structure_new ("thread-rusage", gst_tracer_log_trace (gst_structure_new ("thread-rusage",
"ts", G_TYPE_UINT64, ts, "ts", G_TYPE_UINT64, ts,
"thread-id", G_TYPE_UINT, GPOINTER_TO_UINT (thread_id), "thread-id", G_TYPE_UINT, GPOINTER_TO_UINT (thread_id),
"cpuload", G_TYPE_UINT, cpuload, "average-cpuload", G_TYPE_UINT, MIN (avg_cpuload, 1000),
"current-cpuload", G_TYPE_UINT, MIN (cur_cpuload, 1000),
"time", G_TYPE_UINT64, stats->tthread, "time", G_TYPE_UINT64, stats->tthread,
NULL)); NULL));
cpuload = (guint) gst_util_uint64_scale (tproc, G_GINT64_CONSTANT (100), ts);
avg_cpuload = (guint) gst_util_uint64_scale (tproc, G_GINT64_CONSTANT (100), ts);
g_mutex_lock (&tvs_proc->lock);
update_trace_value (tvs_proc, ts, tproc, &dts, &dtproc);
/* remember last timestamp for fallback calculations */
last_ts = ts;
g_mutex_unlock (&tvs_proc->lock);
cur_cpuload = (guint) gst_util_uint64_scale (dtproc, G_GINT64_CONSTANT (100), dts);
gst_tracer_log_trace (gst_structure_new ("proc-rusage", gst_tracer_log_trace (gst_structure_new ("proc-rusage",
"ts", G_TYPE_UINT64, ts, "ts", G_TYPE_UINT64, ts,
"cpuload", G_TYPE_UINT, cpuload, "average-cpuload", G_TYPE_UINT, MIN (avg_cpuload, 1000),
"current-cpuload", G_TYPE_UINT, MIN (cur_cpuload, 1000),
"time", G_TYPE_UINT64, tproc, "time", G_TYPE_UINT64, tproc,
NULL)); NULL));
/* *INDENT-ON* */ /* *INDENT-ON* */

View file

@ -385,7 +385,8 @@ do_thread_rusage_stats (GstStructure * s)
gst_structure_get (s, "ts", G_TYPE_UINT64, &ts, gst_structure_get (s, "ts", G_TYPE_UINT64, &ts,
"thread-id", G_TYPE_UINT, &thread_id, "thread-id", G_TYPE_UINT, &thread_id,
"cpuload", G_TYPE_UINT, &cpuload, "time", G_TYPE_UINT64, &tthread, NULL); "average-cpuload", G_TYPE_UINT, &cpuload, "time", G_TYPE_UINT64, &tthread,
NULL);
thread_stats = get_thread_stats (thread_id); thread_stats = get_thread_stats (thread_id);
thread_stats->cpuload = cpuload; thread_stats->cpuload = cpuload;
thread_stats->tthread = tthread; thread_stats->tthread = tthread;
@ -398,7 +399,7 @@ do_proc_rusage_stats (GstStructure * s)
guint64 ts; guint64 ts;
gst_structure_get (s, "ts", G_TYPE_UINT64, &ts, gst_structure_get (s, "ts", G_TYPE_UINT64, &ts,
"cpuload", G_TYPE_UINT, &total_cpuload, NULL); "average-cpuload", G_TYPE_UINT, &total_cpuload, NULL);
last_ts = MAX (last_ts, ts); last_ts = MAX (last_ts, ts);
} }