Fixing Threading ---------------- 1) Observations The following observations are made when considering the current (17/11/2004) problems in gstreamer. - Bin state changes. Currently the state of a bin is determined by the highest state of the children, This is in particular a problem for GstThread because a thread should start/stop spinning at any time depending on the state of a child. ex 1: +-------------------------------------+ | GstThread | | +--------+ +---------+ +------+ | | | src | | decoder | | sink | | | | src-sink src-sink | | | +--------+ +---------+ +------+ | +-------------------------------------+ When performing the state change on the GstThread to PLAYING, one of the children (at random) will go to PLAYING first, this will trigger a method in GstThread that will start spinning the thread. Some elements are not yet in the PLAYING state when the scheduler starts iterating elements. This is not a clean way to start the data passing. State changes also trigger negotiation and scheduling (in the other thread) can do too. This creates races in negotiation. - ERROR and EOS conditions triggering a state change A typical problem is also that since scheduling starts while the state change happens, it is possible that the elements go to EOS or ERROR before the state change completes. Currently this makes the elements go to PAUSED again, creating races with the state change in progress. This also gives the impression to the core that the state change failed. - no locking whatsoever When an element does a state change, it is possible for another thread to perform a conflicting state change. - negotiation is not designed to work over multithread boundaries. negotiation over a queue is not possible. There is no method or policy of discovering a media type and then committing it. It is also not possible to tie the negotiated media to the relevant buffer. ex1: it Should be possible to queue the old and the new formats in a queue. The element connected to the sinkpad of the queue should be able to find out that the new format will be accepted by the element connected on the srcpad of the queue, even if that element is streaming the old format. +------------------------------+ | GstQueue | | +++++++++++++++++++++++++ | -sink |B|B|B|B|B|B|A|A|A|A|A|A| src- | +++++++++++++++++++++++++ | +------------------------------+ +----------+ +----------+ buffers in buffers in new format old format - element properties are not threadsafe When setting an element property while streaming, the element does no locking whatsoever to guarantee its internal consistency. - No control over streaming. When some GstThread is iterating and you want to reconnect a pad, there is no way to block the pad, perform the actions and then unblock it again. This leads to thread problems where a pad is negotiation at the same time that it is passing data. This is currently solved by PAUSING the pipeline or performing the actions in the same threadcontext as the iterate loop. - race conditions in synchronizing the clocks and spinning up the pipeline. Currently the clock is started as soon as the pipeline is set to playing. Because some time elaspes before the elements are negotiated, autoplugged and streaming, the first frame/sample almost always arrives late at the sinks. Hacks exist to adjust the element base time to compensate for the delay but this clearly is not clean. - race conditions when performing seeks in the pipeline. Since the elements have no control over the streaming threads, they cannot block them or resync them to the new seek position. It is also hard to synchronize them with the clock. - race conditions when sending tags and error messages up the pipeline hierarchy. These races are either caused by glib refcounting problems and by not properly locking. - more as changes are implemented and testcases are written 2) possible solutions - not allowing threading at all Run the complete pipeline in a single thread. Complications to solve include handling of blocking operations like source elements blocking in kernel space, sink elements blocking on the clock or kernel space, etc.. In practice, all operations should be made non-blocking because a blocking element can cause the rest of the pipeline to block as well and cause it to miss a deadline. A non-blocking model needs cooperation from the kernel (with callbacks) or requires the use of a polling mechanism, both of which are either impractical or too CPU intensive and in general not achievable for a general purpose Multimedia framework. For this reason we will not go further with this solution. - Allow threading. To make this work, We propose the following changes: - Remove GstThread, it does not add anything useful in a sense that you cannot arbitrarily place the thread element, it needs decoupled elements around the borders. - Simplify the state changes of bins elements. A bin or element never changes state automatically on EOS and ERROR. - Introduce the concept of the application and the streaming thread. All data passing is done in the streaming thread. This also means that all operations either are performed in the application thread or streaming thread and that they should be protected against competing operations in other threads. This would define a policy for adding appropriate locking. - Move the creation of threads into source and loop-based elements. This will make it possible for the elements in control of the threads to perform the locking when needed. One particular instance is for example the state changes, by creating the threads in the element, it is possible to sync the streaming and the application thread (which does the state change). - Remove negotiation from state changes. This will remove the conflict between streaming and negotiating elements. - add locks around pad operations like negotiation, streaming, linking, etc. This will remove races between these conflicting operations. This will also make it possible to un/block dataflow. - add locks around bin operations like add/removing elements. - add locks around element operations like state changes and property changes. - add a 2-phase directed negotiation process. The source pad queries and figures out what the sinkpad can take in the first phase. In the second phase it sends the new format change as an event to the peer element. This event can be interleaved with the buffers and can travel over queues inbetween the buffers. Need to rethink this wrt bufferpools (see DShow and old bufferpool implementation) - add a preroll phase that will be used to spin up the pipeline and align frames/samples in the sinks. This phase will happen in the PAUSED state. This also means that dataflow will happen in the PAUSED state. Sinks will not sink samples in the PAUSED state but will complete their state change asynchronously. This will allow us to have perfect synchronisation with the clock. - a two phase seek policy. First the event travels upstream, putting all elements in the seeking phase and making them synchronize to the new position. In the second phase the DISCONT event signals the end of the seek and all filters can continue with the new position. - Error messages, EOS, tags and other events in the pipeline should be sent to a mainloop. The app then has an in-thread mechanism for getting information about the pipeline. It should also be possible to get the messages directly from the elements itself, like signals. The application programmer has to know that working these events come from another thread and should handle them accordingly. - Add return values to push/pull so that errors upstream or downstream can be noted by other elements so that they can disable themselves or propagate the error. 3) detailed explanation a) Pipeline construction Pipeline construction includes: - adding/removing elements to the bin - finding elements in a bin by name and interface - setting the clock on a pipeline. - setting properties on objects - linking/unlinking pads These operations should take the object lock to make sure it can be executed from different threads. When connecting pads to other pads from elements inside another bin, we require that the bin has a ghostpad for the pad. This is needed so that the bin looks like a self-contained element. not allowed: +---------------------+ | GstBin | +---------+ | +--------+ | | element | | | src | | sink src------sink src- ... | +---------+ | +--------+ | +---------------------+ allowed: +-----------------------+ | GstBin | | +--------+ | +---------+ | | src | | | element | | sink src- ... | sink src---sink/ +--------+ | +---------+ +-----------------------+ This requirement is important when we need to sort the elements in the bin to perform the state change. testcases: - create a bin, add/remove elements from it - add/remove from different threads and check the bin integrity. b) state changes An element can be in one of the four states NULL, READY, PAUSED, PLAYING. NULL: starting state of the element READY: element is ready to start running. PAUSED: element is streaming data, has opened devices etc. PLAYING: element is streaming data and clock is running Note that data starts streaming even in the PAUSED state. The only difference between the PAUSED and PLAYING state is that the clock is running in the PLAYING state. This mostly has an effect on the renderers which will block on the first sample they receive when in PAUSED mode. The transition from READY->PAUSED is called the preroll state. During that transition, media is queued in the pipeline and autoplugging is done. Elements are put in a new state using the _set_state function. This function can return the following return values: typedef enum { GST_STATE_FAILURE = 0, GST_STATE_PARTIAL = 1, GST_STATE_SUCCESS = 2, GST_STATE_ASYNC = 3 } GstElementStateReturn; GST_STATE_FAILURE is returned when the element failed to go to the required state. When dealing with a bin, this is returned when one of the elements failed to go to the required state. The other elements in the bin might have changed their states successfully. This return value means that the element did _not_ change state, for bins this means that not all children have changed their state. GST_STATE_PARTIAL is returned when some elements in a bin where in the locked state and therefore did not change their state. Note that the state of the bin will be changed regardless of this PARTIAL return value. GST_STATE_SUCCES is returned when all the elements successfully changed their states. GST_STATE_ASYNC is returned when an element is going to report the success or failure of a state change later. The state of a bin is not related to the state of its children but only to the last state change directly performed on the bin or on a parent bin. This means that changing the state of an element inside the bin does not affect the state of the bin. Setting the state on a bin that is already in the correct state will perform the requested state change on the children. Elements are not allowed to change their own state. For bins, it is allowed to change the state of its children. This means that the application can only know about the states of the elements it has explicitly set. There is a difference in the way a pipeline and a bin handles the state change of its children: - a bin returns GST_STATE_ASYNC when one of its children returns an ASYNC reply. - a pipeline never returns GST_STATE_ASYNC but returns from the state change function after all ASYNC elements completed the state change. This is done by polling the ASYNC elements until they return their final state. The state change function must be fast an cannot block. If a blocking behaviour is unavoidable, the state change function must perform an async state change. Sink elements therefore always use async state changes since they need to wait before the first buffer arrives at the sink. A bin has to change the state of its children elements from the sink to the source elements. This makes sure that a sink element is always ready to receive data from a source element in the case of a READY->PAUSED state change. In the case of a PAUSED->READY state, the sink element will be set to READY first so that the source element will receive an error when it tries to push data to this element so that it will shut down as well. For loop based elements we have to be careful since they can pull a buffer from the peer element before it has been put in the right state. The state of a loop based element is therefore only changed after the source element has been put in the new state. c) Element state change functions The core will call the change_state function of an element with the element lock held. The element is responsible for starting any streaming tasks/threads and making sure that it synchronizes them to the state change function if needed. This means that no other thread is allowed to change the state of the element at that time and for bins, it is not possible to add/remove elements. When an element is busy doing the ASYNC state change, it is possible that another state change happens. The elements should be prepared for this. An element can receive a state change for the same state it is in. This is not a problem, some elements (like bins) use this to resynchronize their children. Other elements should ignore this state change and return SUCCESS. When performing a state change on an element that returns ASYNC on one of the state changes, ASYNC is returned and you can only proceed to the next state change when this ASYNC state change completed. Use the gst_element_get_state function to know when the state change completed. An example of this behaviour is setting a videosink to PLAYING, it will return ASYNC in the state change from READY->PAUSED. You can only set it to PLAYING when this state change completes. Bins will perform the state change code listed in d). For performing the state change, two variables are used: the current state of the element and the pending state. When the element is not performing a state change, the pending state == None. The state change variables are protected by the element lock. The pending state != None as long as the state change is performed or when an ASYNC state change is running. The core provides the following function for applications and bins to get the current state of an element: bool gst_element_get_state(&state, &pending, timeout); This function will block while the state change function is running inside the element because it grabs the element lock. When the element did not perform an async state change, this function returns TRUE immediately with the state updated to reflect the current state of the element and pending set to None. When the element performed an async state change, this function will block for the value of timeout and will return TRUE if the element completed the async state change within that timeout, otherwise it returns FALSE, with the current and pending state filled in. The algorithm is like this: bool gst_element_get_state(elem, &state, &pending, timeout) { g_mutex_lock (ELEMENT_LOCK); if (elem->pending != none) { if (!g_mutex_cond_wait(STATE, ELEMENT_LOCK, timeout) { /* timeout triggered */ *state = elem->state; *pending = elem->pending; ret = FALSE; } } if (elem->pending == none) { *state = elem->state; *pending = none; ret = TRUE; } g_mutex_unlock (ELEMENT_LOCK); return ret; } For plugins the following function is provided to commit the pending state, the ELEMENT_LOCK should be held when calling this function: gst_element_commit_state(element) { if (pending != none) { state = pending; pending = none; } g_cond_broadcast (STATE); } For bins the gst_element_get_state() works slightly different. It will run the function on all of its children, as soon as one of the children returns FALSE, the method returns FALSE with the state set to the current bin state and the pending set to pending state. For bins with elements that did an ASYNC state change, the _commit_state() is only executed when actively calling _get_state(). The reason for this is that when a child of the bin commits its state, this is not automatically reported to the bin. This is not a problem since the _get_state() function is the only way to get the current and pending state of the bin and is always consistent. d) bin state change algorithm In order to perform the sink to source state change a bin must be able to sort the elements. To make this easier we require that elements are connected to bins using ghostpads on the bin. The algorithm goes like this: d = [ ] # list of delayed elements p = [ ] # list of pending async elements q = [ elements without srcpads ] # sinks while q not empty do e = dequeue q s = [ all elements connected to e on the sinkpads ] q = q append s if e is entry point d = d append e else r = state change e if r is ASYNC p = p append e done while d not empty do e = dequeue d r = state change e if r is ASYNC p = p append e done # a bin would return ASYNC here if p is not empty # this last part is only performed by a pipeline while p not empty do e = peek p if state completed e dequeue e from p done The algorithm first tries to find the sink elements, ie. ones without sinkpads. Then it changes the state of each sink elements and queues the elements connected to the sinkpads. The entry points (loopbased and getbased elements) are delayed as we first need to change the state of the other elements before we can activate the entry points in the pipeline. The pipeline will poll the async children before returning. e) The GstTask infrastructure A new component: GstTask is added to the core. A task is created by an instance of the abstract GstScheduler class. Each schedulable element (when added to a pipeline) is handed a reference to a GstScheduler. It can use this object to create a GstTask, which is basically a managed wrapper around a threading library like GThread. It should be possible to write a GstScheduler instance that uses other means of scheduling, like one that does not use threads but implements task switching based on mutex locking. When source and loopbased elements want to create the streaming thread they create an instance of a GstTask, which they pass a pointer to a loop-function. This function will be called as soon as the element performs GstTask.start(). The element can stop and uses mutexes to pause the GstTask from, for example, the state change function or the event functions. The GstTasks implement the streaming threads. f) the preroll phase Element start the streaming threads in the READY->PAUSED state. Since the elements that start the threads are put in the PAUSED state last, after their connected elements, they will be able to deliver data to their peers without problems. Sink elements like audio and videosinks will return an async state change reply and will only commit the state change after receiving the first buffer. This will implement the preroll phase. The following pseudo code shows an algorithm for committing the state change in the streaming method. GST_OBJECT_LOCK (element); /* if we are going to PAUSED, we can commit the state change */ if (GST_STATE_TRANSITION (element) == GST_STATE_READY_TO_PAUSED) { gst_element_commit_state (element); } /* if we are paused we need to wait for playing to continue */ if (GST_STATE (element) == GST_STATE_PAUSED) { /* here we wait for the next state change */ do { g_cond_wait (element->state_cond, GST_OBJECT_GET_LOCK (element)); } while (GST_STATE (element) == GST_STATE_PAUSED); /* check if we got playing */ if (GST_STATE (element) != GST_STATE_PLAYING) { /* not playing, we can't accept the buffer */ GST_OBJECT_UNLOCK (element); gst_buffer_unref (buf); return GST_FLOW_WRONG_STATE; } } GST_OBJECT_UNLOCK (element); g) return values for push/pull To recover from pipeline errors in a more elegant manner than just shutting down the pipeline, we need more finegrained error messages in the data transport. The plugins should be able to know what goes wrong when interacting with their outside environment. This means that gst_pad_push/gst_pad_pull and gst_event_send should return a result code. Possible return values include: - GST_OK - GST_ERROR - GST_NOT_CONNECTED - GST_NOT_NEGOTIATED - GST_WRONG_STATE - GST_UNEXPECTED - GST_NOT_SUPPORTED GST_OK Data transport was successful GST_ERROR An error occurred during transport, such as a fatal decoding error, the pad should not be used again. GST_NOT_CONNECTED The pad was not connected GST_NOT_NEGOTIATED The peer does not know what datatype is going over the pipeline. GST_WRONG_STATE The peer pad is not in the correct state. GST_UNEXPECTED The peer pad did not expect the data because it was flushing or received an eos. GST_NOT_SUPPORTED The operation is not supported. The signatures of the functions will become: GstFlowReturn gst_pad_push (GstPad *pad, GstBuffer *buffer); GstFlowReturn gst_pad_pull (GstPad *pad, GstBuffer **buffer); GstResult gst_pad_push_event (GstPad *pad, GstEvent *event); - push_event will send the event to the connected pad. For sending events from the application: GstResult gst_pad_send_event (GstPad *pad, GstEvent *event); h) Negotiation Implement a simple two phase negotiation. First the source queries the sink if it accepts a certain format, then it sends the new format as an event. Sink pads can also trigger a state change by requesting a renegotiation. i) Mainloop integration/GstBus All error, warning and EOS messages from the plugins are sent to an event queue. The pipeline reads the messages from the queue and will either handle them or forward them to the main event queue that is read by the application. Specific pipelines can be written that deal with negotiation messages and errors in the pipeline intelligently. The basic pipeline will stop the pipeline when an error occurs. Whenever an element posts a message on the event queue, a signal is also fired that can be caught by the application. When dealing with those signals the application has to be aware that they come from the streaming threads and need to make sure they use proper locking to protect their own data structures. The messages will be implemented using a GstBus object that allows plugins to post messages and allows the application to read messages either synchronous or asynchronous. It is also possible to integrate the bus in the mainloop. The messages will derive from GstData to make them a lightweight refcounted object. Need to figure out how we can extend this method to encapsulate generic signals in messages too. This decouples the streaming thread from the application thread and should avoid race conditions and pipeline stalling due to application interaction. It is still possible to receive the messages in the streaming thread context if an application wants to. When doing this, special care has to be taken when performing state changes. j) EOS When an element goes to EOS, it sends the EOS event to the peer plugin and stops sending data on that pad. The peer element that received an EOS event on a pad can refuse any buffers on that pad. All elements without source pads must post the EOS message on the message queue. When the pipeline receives an EOS event from all sinks, it will post the EOS message on the application message queue so that the application knows the pipeline is in EOS. Elements without any connected sourcepads should also post the EOS message. This makes sure that all "dead-ends" signalled the EOS. No state change happens when elements go to EOS but the elements with the GstTask will stop their tasks and so stop producing data. An application can issue a seek operation which makes all tasks running again so that they can start streaming from the new location. A) threads and lowlatency People often think it is a sin to use threads in low latency applications. This is true when using the data has to pass thread boundaries but false when it doesn't. Since source and loop based elements create a thread, it is possible to construct a pipeline where data passing has to cross thread boundaries, consider this case: +-----------------------------------+ | +--------+ +--------+ | | |element1| |element2| | | .. -sink src-sink src- .. | | +--------+ +--------+ | +-----------------------------------+ The two elements are loop base and thus create a thread to drive the pipeline. At the border between the two elements there is a mutex to pass the data between the two threads. When using these kinds of element in a pipeline, low-latency will not be possible. For low-latency apps, don't use these constructs! Note that in a typical pipeline with one get-based element and two chain-based elements (decoder/sink) there is only one thread, no data is crossing thread boundaries and thus this pipeline can be low-latency. Also note that while this pipeline is streaming no interaction or locking is done between it and the main application. +-------------------------------------+ | +--------+ +---------+ +------+ | | | src | | decoder | | sink | | | | src-sink src-sink | | | +--------+ +---------+ +------+ | +-------------------------------------+ B) howto make non-threaded pipelines For low latency it is required to not have datapassing cross any thread borders. Here are some pointers for making sure this requirement is met: - never connect a loop or chain based element to a loop based element, this will create a new thread for the sink loop element. - do not use queues or any other decoupled element, as they implicitly create a thread boundary. - At least one thread will be created for any source element (either in the connected loop-based element or in the source itself) unless the source elements are connected to the same loop based element. - when designing sinks, make them non-blocking, use the async clock callbacks to schedule media rendering in the same thread (if any) as the clock. Sinks that provide the clock can be made blocking.