From ff4522fe665f6663d55e9c821648b843b36889e5 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 9 Oct 2012 16:12:01 +0200 Subject: [PATCH] manual: Talk about threading Rework the threading chapter. Talk about stream-status and give some examples on how to change the thread priorities. --- docs/manual/advanced-threads.xml | 425 ++++++++++++++++++++++++++++-- tests/examples/manual/.gitignore | 2 + tests/examples/manual/Makefile.am | 5 + 3 files changed, 411 insertions(+), 21 deletions(-) diff --git a/docs/manual/advanced-threads.xml b/docs/manual/advanced-threads.xml index cabdcf5479..5925640310 100644 --- a/docs/manual/advanced-threads.xml +++ b/docs/manual/advanced-threads.xml @@ -7,10 +7,411 @@ may want to have influence on some parts of those. &GStreamer; allows applications to force the use of multiple threads over some parts of a pipeline. + See . + + &GStreamer; can also notify you when threads are created so that you can + configure things such as the thread priority or the threadpool to use. + See . + + + + Scheduling in &GStreamer; + + Each element in the &GStreamer; pipeline decides how it is going to + be scheduled. Elements can choose if their pads are to be scheduled + push-based or pull-based. An element can, for example, choose to start + a thread to start pulling from the sink pad or/and start pushing on + the source pad. An element can also choose to use the upstream or + downstream thread for its data processing in push and pull mode + respectively. &GStreamer; does not pose any restrictions on how the + element chooses to be scheduled. See the Plugin Writer Guide for more + details. + + + What will happen in any case is that some elements will start a thread + for their data processing, called the streaming threads. + The streaming threads, or GstTask objects, are + created from a GstTaskPool when the element + needs to make a streaming thread. In the next section we see how we + can receive notifications of the tasks and pools. + + + + + Configuring Threads in &GStreamer; + + A STREAM_STATUS message is posted on the bus to inform you about the + status of the streaming threads. You will get the following information + from the message: + + + + When a new thread is about to be created, you will be notified + of this with a GST_STREAM_STATUS_TYPE_CREATE type. It is then + possible to configure a GstTaskPool in + the GstTask. The custom taskpool will + provide custom threads for the task to implement the streaming + threads. + + + This message needs to be handled synchronously if you want to + configure a custom taskpool. If you don't configure the taskpool + on the task when this message returns, the task will use its + default pool. + + + + + When a thread is entered or left. This is the moment where you + could configure thread priorities. You also get a notification + when a thread is destroyed. + + + + + You get messages when the thread starts, pauses and stops. This + could be used to visualize the status of streaming threads in + a gui application. + + + + + + + + We will now look at some examples in the next sections. + + + + Boost priority of a thread + + .----------. .----------. + | faksesrc | | fakesink | + | src->sink | + '----------' '----------' + + + Let's look at the simple pipeline above. We would like to boost + the priority of the streaming thread. + It will be the fakesrc element that starts the streaming thread for + generating the fake data pushing them to the peer fakesink. + The flow for changing the priority would go like this: + + + + + When going from READY to PAUSED state, udpsrc will require a + streaming thread for pushing data into the depayloader. It will + post a STREAM_STATUS message indicating its requirement for a + streaming thread. + + + + + The application will react to the STREAM_STATUS messages with a + sync bus handler. It will then configure a custom + GstTaskPool on the + GstTask inside the message. The custom + taskpool is responsible for creating the threads. In this + example we will make a thread with a higher priority. + + + + + Alternatively, since the sync message is called in the thread + context, you can use thread ENTER/LEAVE notifications to + change the priority or scheduling pollicy of the current thread. + + + + + In a first step we need to implement a custom + GstTaskPool that we can configure on the task. + Below is the implementation of a GstTaskPool + subclass that uses pthreads to create a SCHED_RR real-time thread. + Note that creating real-time threads might require extra priveleges. + + + + + + + + +typedef struct +{ + pthread_t thread; +} TestRTId; + +G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL); + +static void +default_prepare (GstTaskPool * pool, GError ** error) +{ + /* we don't do anything here. We could construct a pool of threads here that + * we could reuse later but we don't */ +} + +static void +default_cleanup (GstTaskPool * pool) +{ +} + +static gpointer +default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data, + GError ** error) +{ + TestRTId *tid; + gint res; + pthread_attr_t attr; + struct sched_param param; + + tid = g_slice_new0 (TestRTId); + + pthread_attr_init (&attr); + if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0) + g_warning ("setschedpolicy: failure: %p", g_strerror (res)); + + param.sched_priority = 50; + if ((res = pthread_attr_setschedparam (&attr, ¶m)) != 0) + g_warning ("setschedparam: failure: %p", g_strerror (res)); + + if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0) + g_warning ("setinheritsched: failure: %p", g_strerror (res)); + + res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data); + + if (res != 0) { + g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN, + "Error creating thread: %s", g_strerror (res)); + g_slice_free (TestRTId, tid); + tid = NULL; + } + + return tid; +} + +static void +default_join (GstTaskPool * pool, gpointer id) +{ + TestRTId *tid = (TestRTId *) id; + + pthread_join (tid->thread, NULL); + + g_slice_free (TestRTId, tid); +} + +static void +test_rt_pool_class_init (TestRTPoolClass * klass) +{ + GstTaskPoolClass *gsttaskpool_class; + + gsttaskpool_class = (GstTaskPoolClass *) klass; + + gsttaskpool_class->prepare = default_prepare; + gsttaskpool_class->cleanup = default_cleanup; + gsttaskpool_class->push = default_push; + gsttaskpool_class->join = default_join; +} + +static void +test_rt_pool_init (TestRTPool * pool) +{ +} + +GstTaskPool * +test_rt_pool_new (void) +{ + GstTaskPool *pool; + + pool = g_object_new (TEST_TYPE_RT_POOL, NULL); + + return pool; +} +]]> + + + + The important function to implement when writing an taskpool is the + push function. The implementation should start a thread + that calls the given function. More involved implementations might + want to keep some threads around in a pool because creating and + destroying threads is not always the fastest operation. + + + In a next step we need to actually configure the custom taskpool when + the fakesrc needs it. For this we intercept the STREAM_STATUS messages + with a sync handler. + + + + + + + + Note that this program likely needs root permissions in order to + create real-time threads. When the thread can't be created, the + state change function will fail, which we catch in the application + above. + + + When there are multiple threads in the pipeline, you will receive + multiple STREAM_STATUS messages. You should use the owner of the + message, which is likely the pad or the element that starts the + thread, to figure out what the function of this thread is in the + context of the application. + + + When would you want to force a thread? + + We have seen that threads are created by elements but it is also + possible to insert elements in the pipeline for the sole purpose of + forcing a new thread in the pipeline. + There are several reasons to force the use of threads. However, for performance reasons, you never want to use one thread for every @@ -24,7 +425,8 @@ Data buffering, for example when dealing with network streams or when recording data from a live stream such as a video or audio card. Short hickups elsewhere in the pipeline will not cause data - loss. + loss. See also about network + buffering with queue2.
Data buffering, from a networked source @@ -58,7 +460,7 @@ Above, we've mentioned the queue element several times now. A queue is the thread boundary element through which you can force the use of threads. It does so by using a classic - provider/receiver model as learned in threading classes at + provider/consumer model as learned in threading classes at universities all around the world. By doing this, it acts both as a means to make data throughput between threads threadsafe, and it can also act as a buffer. Queues have several GObject @@ -76,23 +478,4 @@ - - Scheduling in &GStreamer; - - - Each element in the &GStreamer; pipeline decides how it is going to - be scheduled. Elements can choose to be scheduled push-based or - pull-based. - If elements support random access to data, such as file sources, - then elements downstream in the pipeline can ask to schedule the random - access elements in pull-based mode. Data is pulled from upstream - and pushed downstream. If pull-mode is not supported, the element can - decide to operate in push-mode. - - - In practice, most elements in &GStreamer;, such as decoders, encoders, - etc. only support push-based scheduling, which means that in practice, - &GStreamer; uses a push-based scheduling model. - - diff --git a/tests/examples/manual/.gitignore b/tests/examples/manual/.gitignore index 8c15399e68..49574d9a99 100644 --- a/tests/examples/manual/.gitignore +++ b/tests/examples/manual/.gitignore @@ -33,6 +33,8 @@ probe query fakesrc typefind +effectswitch +testrtpool xml-mp3 xml diff --git a/tests/examples/manual/Makefile.am b/tests/examples/manual/Makefile.am index a2db29bcd5..dafec2193b 100644 --- a/tests/examples/manual/Makefile.am +++ b/tests/examples/manual/Makefile.am @@ -41,6 +41,7 @@ EXAMPLES = \ appsink \ dynformat \ effectswitch \ + testrtpool \ playbin \ decodebin @@ -59,6 +60,7 @@ BUILT_SOURCES = \ appsink.c \ dynformat.c \ effectswitch.c \ + testrtpool.c \ playbin.c decodebin.c CLEANFILES = core core.* test-registry.* *.gcno *.gcda $(BUILT_SOURCES) @@ -112,6 +114,9 @@ effectswitch.c: $(top_srcdir)/docs/manual/advanced-dataaccess.xml playbin.c decodebin.c: $(top_srcdir)/docs/manual/highlevel-components.xml $(PERL_PATH) $(srcdir)/extract.pl $@ $< +testrtpool.c: $(top_srcdir)/docs/manual/advanced-threads.xml + $(PERL_PATH) $(srcdir)/extract.pl $@ $< + TESTS = bin \ elementcreate elementfactory elementget elementlink elementmake \ ghostpad init