Change from pthreads to GThreads

Original commit message from CVS:
Change from pthreads to GThreads
This commit is contained in:
David Schleef 2002-11-01 21:38:39 +00:00
parent ec02fff666
commit e7db7a70ad
6 changed files with 93 additions and 125 deletions

View file

@ -20,7 +20,8 @@
* Boston, MA 02111-1307, USA.
*/
#include <pthread.h>
#include <glib.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
@ -59,12 +60,12 @@ struct _cothread_context
};
/* this _cothread_ctx_key is used as a pthread key to the thread's context
* a pthread key is a "pointer" to memory space that is/can be different
/* this _cothread_ctx_key is used as a GThread key to the thread's context
* a GThread key is a "pointer" to memory space that is/can be different
* (ie. private) for each thread. The key itself is shared among threads,
* so it only needs to be initialized once.
*/
static pthread_key_t _cothread_ctx_key = -1;
static GPrivate *_cothread_ctx_key;
/* Disabling this define allows you to shut off a few checks in
* cothread_switch. This likely will speed things up fractionally */
@ -83,7 +84,14 @@ cothread_context_init (void)
/*
* initalize the whole of the cothreads context
*/
cothread_context *ctx = (cothread_context *) g_malloc (sizeof (cothread_context));
cothread_context *ctx;
/* if there already is a cotread context for this thread,
* just return it */
ctx = g_private_get (_cothread_ctx_key);
if(ctx) return ctx;
ctx = (cothread_context *) g_malloc (sizeof (cothread_context));
/* we consider the initiating process to be cothread 0 */
ctx->ncothreads = 1;
@ -92,16 +100,18 @@ cothread_context_init (void)
GST_INFO (GST_CAT_COTHREADS, "initializing cothreads");
/* initialize the cothread key (for pthread space) if not done yet */
if (_cothread_ctx_key == (pthread_key_t) -1) {
if (pthread_key_create (&_cothread_ctx_key, NULL) != 0) {
perror ("pthread_key_create");
/* initialize the cothread key (for GThread space) if not done yet */
/* FIXME this should be done in cothread_init() */
if (_cothread_ctx_key == NULL) {
_cothread_ctx_key = g_private_new (NULL);
if (_cothread_ctx_key == NULL) {
perror ("g_private_new");
return NULL;
}
}
/* set this thread's context pointer */
pthread_setspecific (_cothread_ctx_key, ctx);
g_private_set (_cothread_ctx_key, ctx);
/* clear the cothread data */
@ -189,6 +199,7 @@ cothread_create (cothread_context *ctx)
GST_DEBUG (GST_CAT_COTHREADS, "Found free cothread slot %d", slot);
sp = CURRENT_STACK_FRAME;
printf("stack pointer %p\n",sp);
/* FIXME this may not be 64bit clean
* could use casts to uintptr_t from inttypes.h
* if only all platforms had inttypes.h
@ -392,14 +403,14 @@ cothread_main (cothread_context * ctx)
/**
* cothread_current_main:
*
* Get the main thread in the current pthread.
* Get the main thread in the current GThread.
*
* Returns: the #cothread_state of the main (0th) thread in the current pthread
* Returns: the #cothread_state of the main (0th) thread in the current GThread
*/
cothread_state *
cothread_current_main (void)
{
cothread_context *ctx = pthread_getspecific (_cothread_ctx_key);
cothread_context *ctx = g_private_get (_cothread_ctx_key);
return ctx->cothreads[0];
}
@ -414,7 +425,7 @@ cothread_current_main (void)
cothread_state *
cothread_current (void)
{
cothread_context *ctx = pthread_getspecific (_cothread_ctx_key);
cothread_context *ctx = g_private_get (_cothread_ctx_key);
return ctx->cothreads[ctx->current];
}
@ -422,7 +433,7 @@ cothread_current (void)
static void
cothread_stub (void)
{
cothread_context *ctx = pthread_getspecific (_cothread_ctx_key);
cothread_context *ctx = g_private_get (_cothread_ctx_key);
register cothread_state *thread = ctx->cothreads[ctx->current];
GST_DEBUG_ENTER ("");
@ -448,7 +459,7 @@ int cothread_getcurrent (void) __attribute__ ((no_instrument_function));
int
cothread_getcurrent (void)
{
cothread_context *ctx = pthread_getspecific (_cothread_ctx_key);
cothread_context *ctx = g_private_get (_cothread_ctx_key);
if (!ctx)
return -1;
@ -479,7 +490,7 @@ cothread_set_private (cothread_state *thread, gpointer data)
void
cothread_context_set_data (cothread_state *thread, gchar *key, gpointer data)
{
cothread_context *ctx = pthread_getspecific (_cothread_ctx_key);
cothread_context *ctx = g_private_get (_cothread_ctx_key);
g_hash_table_insert (ctx->data, key, data);
}
@ -510,7 +521,7 @@ cothread_get_private (cothread_state *thread)
gpointer
cothread_context_get_data (cothread_state * thread, gchar * key)
{
cothread_context *ctx = pthread_getspecific (_cothread_ctx_key);
cothread_context *ctx = g_private_get (_cothread_ctx_key);
return g_hash_table_lookup (ctx->data, key);
}

View file

@ -188,10 +188,10 @@ gst_default_debug_handler (gint category, gboolean incore,
{
gchar *empty = "";
gchar *elementname = empty,*location = empty;
int pthread_id = getpid();
int pid = getpid();
int cothread_id = 0; /*FIXME*/
#ifdef GST_DEBUG_COLOR
int pthread_color = pthread_id%6 + 31;
int pid_color = pid%6 + 31;
int cothread_color = (cothread_id < 0) ? 37 : (cothread_id%6 + 31);
#endif
@ -208,11 +208,11 @@ gst_default_debug_handler (gint category, gboolean incore,
#ifdef GST_DEBUG_COLOR
fprintf(stderr,"DEBUG(\033[00;%dm%5d\033[00m:\033[00;%dm%2d\033[00m)\033["
"%s;%sm%s%s\033[00m %s\n",
pthread_color,pthread_id,cothread_color,cothread_id,incore?"00":"01",
pid_color,pid,cothread_color,cothread_id,incore?"00":"01",
_gst_category_colors[category],location,elementname,string);
#else
fprintf(stderr,"DEBUG(%5d:%2d)%s%s %s\n",
pthread_id,cothread_id,location,elementname,string);
pid,cothread_id,location,elementname,string);
#endif /* GST_DEBUG_COLOR */
if (location != empty) g_free(location);
@ -300,10 +300,10 @@ gst_default_info_handler (gint category, gboolean incore,
{
gchar *empty = "";
gchar *elementname = empty,*location = empty;
int pthread_id = getpid();
int pid = getpid();
int cothread_id = 0; /*FIXME*/
#ifdef GST_DEBUG_COLOR
int pthread_color = pthread_id%6 + 31;
int pid_color = pid%6 + 31;
int cothread_color = (cothread_id < 0) ? 37 : (cothread_id%6 + 31);
#endif
@ -319,11 +319,11 @@ gst_default_info_handler (gint category, gboolean incore,
#ifdef GST_DEBUG_COLOR
fprintf(stderr,"\033[01mINFO\033[00m (\033[00;%dm%5d\033[00m:\033[00;%dm%2d\033[00m)\033["
GST_DEBUG_CHAR_MODE ";%sm%s%s\033[00m %s\n",
pthread_color,pthread_id,cothread_color,cothread_id,
pid_color,pid,cothread_color,cothread_id,
_gst_category_colors[category],location,elementname,string);
#else
fprintf(stderr,"INFO (%5d:%2d)%s%s %s\n",
pthread_id,cothread_id,location,elementname,string);
pid,cothread_id,location,elementname,string);
#endif /* GST_DEBUG_COLOR */
/*
#else

View file

@ -29,7 +29,6 @@
#define STATUS(A)
#endif
#include <pthread.h>
#include "config.h"
#include "gst_private.h"
@ -320,9 +319,9 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
restart:
/* we have to lock the queue since we span threads */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ());
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld", pthread_self ());
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
/* assume don't need to flush this buffer when the queue is filled */
queue->flush = FALSE;
@ -472,9 +471,9 @@ gst_queue_get (GstPad *pad)
restart:
/* have to lock for thread-safety */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ());
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p", pthread_self (), queue->not_empty);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
while (queue->level_buffers == 0) {

View file

@ -30,6 +30,10 @@
#include "gstscheduler.h"
#include "gstqueue.h"
#define STACK_SIZE 0x200000
#define g_thread_equal(a,b) ((a) == (b))
GstElementDetails gst_thread_details = {
"Threaded container",
"Generic/Bin",
@ -84,9 +88,10 @@ static GType
gst_thread_schedpolicy_get_type(void) {
static GType thread_schedpolicy_type = 0;
static GEnumValue thread_schedpolicy[] = {
{SCHED_OTHER, "SCHED_OTHER", "Normal Scheduling"},
{SCHED_FIFO, "SCHED_FIFO", "FIFO Scheduling (requires root)"},
{SCHED_RR, "SCHED_RR", "Round-Robin Scheduling (requires root)"},
{G_THREAD_PRIORITY_LOW, "LOW", "Low Priority Scheduling"},
{G_THREAD_PRIORITY_NORMAL, "NORMAL", "Normal Scheduling"},
{G_THREAD_PRIORITY_HIGH, "HIGH", "High Priority Scheduling"},
{G_THREAD_PRIORITY_URGENT, "URGENT", "Urgent Scheduling"},
{0, NULL, NULL},
};
if (!thread_schedpolicy_type) {
@ -137,7 +142,7 @@ gst_thread_class_init (GstThreadClass *klass)
g_object_class_install_property(G_OBJECT_CLASS (klass), ARG_SCHEDPOLICY,
g_param_spec_enum("schedpolicy", "Scheduling Policy", "The scheduling policy of the thread",
GST_TYPE_THREAD_SCHEDPOLICY, SCHED_OTHER, G_PARAM_READWRITE));
GST_TYPE_THREAD_SCHEDPOLICY, G_THREAD_PRIORITY_NORMAL, G_PARAM_READWRITE));
g_object_class_install_property(G_OBJECT_CLASS (klass), ARG_PRIORITY,
g_param_spec_int("priority", "Scheduling Priority", "The scheduling priority of the thread",
0, 99, 0, G_PARAM_READWRITE));
@ -179,8 +184,8 @@ gst_thread_init (GstThread *thread)
thread->cond = g_cond_new ();
thread->ppid = getpid ();
thread->thread_id = (pthread_t) -1;
thread->sched_policy = SCHED_OTHER;
thread->thread_id = (GThread *) NULL;
thread->sched_policy = G_THREAD_PRIORITY_NORMAL;
thread->priority = 0;
thread->stack = NULL;
}
@ -309,8 +314,8 @@ gst_thread_change_state (GstElement * element)
GstThread *thread;
gboolean stateset = GST_STATE_SUCCESS;
gint transition;
pthread_t self = pthread_self ();
glong stacksize;
GThread * self = g_thread_self ();
GError * error = NULL;
g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE);
g_return_val_if_fail (gst_has_threads (), GST_STATE_FAILURE);
@ -323,7 +328,7 @@ gst_thread_change_state (GstElement * element)
gst_element_state_get_name (GST_STATE (element)),
gst_element_state_get_name (GST_STATE_PENDING (element)));
if (pthread_equal (self, thread->thread_id)) {
if (g_thread_equal (self, thread->thread_id)) {
GST_DEBUG (GST_CAT_THREAD,
"no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to spinning",
GST_DEBUG_THREAD_ARGS (thread->pid));
@ -337,70 +342,29 @@ gst_thread_change_state (GstElement * element)
THR_DEBUG ("creating thread \"%s\"", GST_ELEMENT_NAME (element));
/* this bit of code handles creation of pthreads
/* this bit of code handles creation of GThreads
* this is therefor tricky code
* compare it with the block of code that handles the destruction
* in GST_STATE_READY_TO_NULL below
*/
g_mutex_lock (thread->lock);
/* create attribute struct for pthread
* and assign stack pointer and size to it
*
* the default state of a pthread is PTHREAD_CREATE_JOINABLE
* (see man pthread_attr_init)
* - other thread can sync on termination
* - thread resources are kept allocated until other thread performs
* pthread_join
*/
if (pthread_attr_init (&thread->attr) != 0)
g_warning ("pthread_attr_init returned an error !");
/* this function should return a newly allocated stack
* (using whatever method)
* which we can initiate the pthreads with
* the stack should be freed in
*/
if (gst_scheduler_get_preferred_stack (GST_ELEMENT_SCHED (element),
&thread->stack, &stacksize)) {
#ifdef HAVE_PTHREAD_ATTR_SETSTACK
if (pthread_attr_setstack (&thread->attr,
thread->stack, stacksize) != 0) {
g_warning ("pthread_attr_setstack failed\n");
return GST_STATE_FAILURE;
}
#else
if (pthread_attr_setstackaddr (&thread->attr, thread->stack) != 0) {
g_warning ("pthread_attr_setstackaddr failed\n");
return GST_STATE_FAILURE;
}
if (pthread_attr_setstacksize (&thread->attr, stacksize) != 0) {
g_warning ("pthread_attr_setstacksize failed\n");
return GST_STATE_FAILURE;
}
#endif
GST_DEBUG (GST_CAT_THREAD, "pthread attr set stack at %p of size %ld",
thread->stack, stacksize);
}
else {
g_warning ("scheduler did not return a preferred stack");
}
/* create a new pthread
/* create a new GThread
* use the specified attributes
* make it execute gst_thread_main_loop (thread)
*/
GST_DEBUG (GST_CAT_THREAD, "going to pthread_create...");
if (pthread_create (&thread->thread_id, &thread->attr,
gst_thread_main_loop, thread) != 0) {
GST_DEBUG (GST_CAT_THREAD, "pthread_create failed");
GST_DEBUG (GST_CAT_THREAD, "going to g_thread_create_full...");
thread->thread_id = g_thread_create_full(gst_thread_main_loop,
thread, STACK_SIZE, TRUE, TRUE, G_THREAD_PRIORITY_NORMAL,
&error);
if (!thread->thread_id){
GST_DEBUG (GST_CAT_THREAD, "g_thread_create_full failed");
g_mutex_unlock (thread->lock);
GST_DEBUG (GST_CAT_THREAD, "could not create thread \"%s\"",
GST_ELEMENT_NAME (element));
return GST_STATE_FAILURE;
}
GST_DEBUG (GST_CAT_THREAD, "pthread created");
GST_DEBUG (GST_CAT_THREAD, "GThread created");
/* wait for it to 'spin up' */
THR_DEBUG ("waiting for child thread spinup");
@ -530,17 +494,10 @@ gst_thread_change_state (GstElement * element)
* compare this block to the block
*/
/* in glibc 2.2.5, pthread_attr_destroy does nothing more
* than return 0 */
if (pthread_attr_destroy (&thread->attr) != 0)
g_warning ("pthread_attr_destroy has failed !");
GST_DEBUG (GST_CAT_THREAD, "joining GThread %p", thread->thread_id);
g_thread_join (thread->thread_id);
GST_DEBUG (GST_CAT_THREAD, "joining pthread %ld", thread->thread_id);
if (pthread_join (thread->thread_id, NULL) != 0)
g_warning ("pthread_join has failed !\n");
thread->thread_id = -1;
thread->thread_id = NULL;
/* the stack was allocated when we created the thread
* using scheduler->get_preferred_stack */
@ -590,26 +547,14 @@ gst_thread_main_loop (void *arg)
{
GstThread *thread = NULL;
gint stateset;
glong page_size;
gpointer stack_pointer;
gulong stack_offset;
GST_DEBUG (GST_CAT_THREAD, "gst_thread_main_loop started");
thread = GST_THREAD (arg);
g_mutex_lock (thread->lock);
/* handle scheduler policy; do stuff if not the normal scheduler */
if (thread->sched_policy != SCHED_OTHER) {
struct sched_param sched_param;
memset (&sched_param, 0, sizeof (sched_param));
if (thread->priority == 0) {
thread->priority = sched_get_priority_max (thread->sched_policy);
}
sched_param.sched_priority = thread->priority;
if (sched_setscheduler (0, thread->sched_policy, &sched_param) != 0) {
GST_DEBUG (GST_CAT_THREAD, "not running with real-time priority");
}
}
/* set up the element's scheduler */
gst_scheduler_setup (GST_ELEMENT_SCHED (thread));
GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING);
@ -617,6 +562,21 @@ gst_thread_main_loop (void *arg)
thread->pid = getpid();
THR_INFO_MAIN ("thread is running");
page_size = sysconf(_SC_PAGESIZE);
stack_pointer = (gpointer) &stack_pointer;
if(((gulong)stack_pointer & (page_size-1)) < (page_size>>1)){
/* stack grows up, I think */
/* FIXME this is probably not true for the main thread */
stack_offset = (gulong)stack_pointer & (page_size - 1);
}else{
/* stack grows down, I think */
stack_offset = STACK_SIZE - ((gulong)stack_pointer & (page_size - 1));
}
/* note the subtlety with pointer arithmetic */
thread->stack = stack_pointer - stack_offset;
thread->stack_size = STACK_SIZE;
/* first we need to change the state of all the children */
if (GST_ELEMENT_CLASS (parent_class)->change_state) {
stateset = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT(thread));

View file

@ -24,8 +24,7 @@
#ifndef __GST_THREAD_H__
#define __GST_THREAD_H__
#include <unistd.h>
#include <pthread.h>
#include <glib.h>
#include <gst/gstbin.h>
@ -62,11 +61,11 @@ typedef struct _GstThreadClass GstThreadClass;
struct _GstThread {
GstBin bin;
pthread_t thread_id; /* id of the thread, if any */
pthread_attr_t attr; /* attributes for the stack space */
GThread *thread_id; /* id of the thread, if any */
int sched_policy;
int priority;
void *stack; /* set with gst_scheduler_get_preferred_stack */
gpointer *stack; /* set with gst_scheduler_get_preferred_stack */
guint stack_size; /* stack size */
gint pid; /* the pid of the thread */
gint ppid; /* the pid of the thread's parent process */
GMutex *lock; /* thread lock/condititon pair ... */

View file

@ -29,7 +29,6 @@
#define STATUS(A)
#endif
#include <pthread.h>
#include "config.h"
#include "gst_private.h"
@ -320,9 +319,9 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
restart:
/* we have to lock the queue since we span threads */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ());
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld", pthread_self ());
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
/* assume don't need to flush this buffer when the queue is filled */
queue->flush = FALSE;
@ -472,9 +471,9 @@ gst_queue_get (GstPad *pad)
restart:
/* have to lock for thread-safety */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ());
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p", pthread_self (), queue->not_empty);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
while (queue->level_buffers == 0) {