This is a follow-up to commit 7ee4afac.
This commit cleans up the `Pad{Sink,Src}Handler` by
- Keeping arguments which are strictly necessary.
- Passing arguments by value for the trait functions which return
a `Future`. The arguments which were previously passed by reference
were `clone`d internally and then `clone`d again in most
implementations.
There are unfortunate differences in trait function signatures
between those which return a `Future` and the sync functions. This
is due to the requirement for the arguments to be moved to the
resulting `Future`, whereas sync functions can rely on references.
One particular notable difference is the use of the `imp` in sync
functions instead of the `elem` in functions returning a `Future`.
Because the `imp` is not guaranteed to implement `Clone`, we can't
move it to the resulting `Future`, so the `elem` is used.
This is no longer available as this could lead to building a defined
value in Rust which could be interpreted as undefined in C due to
the sentinel `u64::MAX` for `None`.
Use the constants (e.g. `ONE`, `K`, `M`, ...) and operations to build
a value and deref (`*`) to get the quantity as an integer.
It is now guaranteed that each fragment is at most fragment-duration
long unless the one and only GOP of the fragment is longer than that.
The first (non-EOS) stream determines the duration of each fragment and
all other streams are drained to at most the fragment end timestamp
determined this way.
In addition the next fragment's target time is now at the end of the
previous fragment plus fragment-duration instead of using
first-fragment + N*fragment-duration
regardless of where fragments were split before.
That is, fmp4mux now uses the same strategy as used by splitmuxsink and
as is required e.g. by HLS with regards to the target duration.
A strong handle reference was held in the `block_on_priv` `Result`
handler in the thread for the `Scheduler::start` code path, which
lead to the `Handler` strong count not dropping to 0 when it
should, leading to the shutdown request not being triggered.
Use an Arc<AtomicBool> instead of a oneshot channel for shutdown.
The main Future is always polled and never relies on a waker, a
`poll_fn` is cheap and does the job.
Unpark the scheduler after posting a request to shutdown.
Subtasks are used when current async processing needs to execute
a `Future` via a sync function (eg. a call to a C function).
In this case `Context::block_on` would block the whole `Context`,
leading to a deadlock.
The main use case for this is the `Pad{Src,Sink}` functions:
when we `PadSrc::push` and the peer pad is a `PadSink`, we want
`PadSrc::push` to complete after the async function on the
`PadSink` completes. In this case the `PadSink` async function
is added as a subtask of current scheduler task and
`PadSrc::push` only returns when the subtask is executed.
In `runtime::Task` (`Task` here is the execution Task with a
state machine, not a scheduler task), we used to spawn state
transition actions and iteration loop (leading to a new
scheduler Task). At the time, it seemed convenient for the user
to automatically drain sub tasks after a state transition action
or an iteration. User wouldn't have to worry about this, similarly
to the `Pad{Src,Sink}` case.
In current implementation, the `Task` state machine now operates
directly on the target `Context`. State transtions actions and
the iteration loop are no longer spawned. It seems now useless to
abstract the subtasks draining from the user. Either they
transitively use a mechanism such as `Pad{Src,Sink}` which already
handles this automatically, or they add substasks on purpose, in
which case they know better when subtasks must be drained.
... so that it can be reused on current thread for subsequent
Scheduler instantiations (e.g. block_on) without the need to
reallocate internal data structures.
This commit improves threadshare timers predictability
by better making use of current time slice.
Added a dedicate timer BTreeMap for after timers (those
that are guaranteed to fire no sooner than the expected
instant) so as to avoid previous workaround which added
half the max throttling duration. These timers can now
be checked against the reactor processing instant.
Oneshot timers only need to be polled as `Future`s when
intervals are `Stream`s. This also reduces the size for
oneshot timers and make user call `next` on intervals.
Intervals can also implement `FusedStream`, which can help
when used in features such as `select!`.
Also drop the `time` module, which was kepts for
compatibility when the `executor` was migrated from tokio
based to smol-like.
Add a `tuning` feature which adds counters that help with performance
evaluation. The only counter added so far accumulates the duration a
Scheduler has been parked, which is pretty accurate an indication of
CPU usage of the Scheduler.
- Reworked buffer push.
- Reworked stats.
- Make first elements logs stand out. This make it possible to
follow what's going on with pipelines containing 1000s of
elements.
- Actually handle EOS.
- Use more significant defaults.
- Allow building without `clap` feature.
jitterbuffer tests crash on Windows CI sometimes. Activating logs
showed time values which are probably not expected in a regular
environment, but which can happen there. Adding extra robustness
to `next_wakeup` computation seems to fix the problem judging by
the few runs I triggered.
The encoding of ONVIF metadata is always UTF-8. ONVIF metadata may
or may not be encoded with gzip, but we don't see a use case for
transporting compressed ONVIF metadata between elements for now.
The code tried to do this before but didn't consider the case where the
first pad has a valid running time, in which case the buffer with the
invalid running time would never be dequeued and the muxer would never
output anything.
Using callgrind with the standalone test showed opportunities for
improvements for sub tasks addition and drain.
All sub task additions were performed after making sure we were
operating on a Context Task. The Context and Task were checked
again when adding the sub task.
Draining sub tasks was perfomed in a loop on every call places,
checking whether there were remaining sub tasks first. This
commit implements the loop and checks directly in
`executor::Task::drain_subtasks`, saving one `Mutex` lock and
one `thread_local` access per iteration when there are sub
tasks to drain.
The `PadSink` functions wrapper were performing redundant checks
on the `Context` presence and were adding the delayed Future only
when there were already sub tasks.
Implement a test that initializes pipelines with minimalistic
theadshare src and sink. This can help with the evaluation of
changes to the threadshare runtime or with element
implementation details. It makes it easy to run flamegraph or
callgrind and to focus on the threadshare runtime overhead.
By moving sync on buffer ts to `try_next`, the resulting delay
can be cancelled when a state transition occurs.
To prevent item loss, this requires first peeking the incoming
item from the channel without popping it. After the delay has
elasped, we can pop the item as the last await point in
`try_next`: either it will be cancelled before popping or the
popped item will be passed on to `handle_item`.
Also add `flush` which was missing from `stop` and `flush_start`
transition actions.
Previous Task iteration model suffered from the following
shortcomings:
- When an iteration was engaged it could be cancelled at
await points by Stop or Flush state transitions,
which could lead to inconsistent states.
- When an iteration was engaged it could not be cancelled
by a Pause state transition so as to prevent data loss.
This meant we couldn't block on the Pause request because
the mechanism couldn't guarantee Paused would be reached
in a timely manner.
This commit split the Task iteration into:
- `try_next`: this function returns a future that awaits
for a new iteration to begin. The regular use case is
to return an item to process. The item can be left to
`()` if `try_next` acts as a tick generator. It can
also return an error. This function can be cancelled at
await points when a state transition request occurs.
- `handle_item`: this function is called with the item
returned by `try_next` and is guaranteed to run to
completion even if a transition request is received.
Note that this model plays well with the common Future
cancellation pitfalls in Rust.
warning: this expression borrows a value the compiler would automatically borrow
--> generic/threadshare/src/runtime/executor/async_wrapper.rs:402:19
|
402 | match (&mut *self).get_mut().read(buf) {
| ^^^^^^^^^^^^ help: change this to: `(*self)`
|
= note: `#[warn(clippy::needless_borrow)]` on by default
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
State transitions request functions hid the synchronization
details to the caller:
- If the transition was requested from a Context, a subtask was
added or the transition ack was not awaited if the new transition
was requested from a running transition or an iteration function.
- If the transition was not requested from a Context, current
thread was blocked until the ack was received.
This strategy facilitated code in elements, but it suffered from
the following shortcomings:
- The `prepare` transition request didn't comply with the above
strategy and would always return an `Async` form. This was
designed to accomodate the `Prepare` function for elements
such as `ts-tcpclientsrc` which takes times due to the
TCP socket connection delays. The idea was that the actual
transition result would be available after calling `start`.
This was a disadvantage for elements which would prefer to
error immediately in the event of a preparation failure.
- Hidding the transition request synchronization to the caller
meant that they had no options but relying on the internal
mechanism. E.g.: it was not possible to `start` from another
async runtime without blocking. Also it was not possible
to request a transition and choose not to await for the
ack.
This commit introduces a more flexible API for state
transitions requests:
- The transition request function now return a `TransitionStatus`,
which is a Future.
- When an error occurs immediately (e.g. the transition
request is not autorized due to current state of the Task),
the `TransitionStatus` is resolved immediately and can be
`check`ed for errors. This is useful for functions such as
`pepare` in the case of `ts-tcpclientsrc` (see above).
This is also useful for `pause`, because in current design,
the transition is always async. Note however, that `pause` is
forseen to adhere to the same behaviour as the other transition
requests in the near future [1].
- If the caller chooses to await for the ack and they don't know
if they are running on a ts Context (e.g. in `Pad{Src,Sink}`
handlers), they can call `await_maybe_on_context`. This is mostly
the same behaviour as the one that used to be performed internaly.
- If the caller knows for sure they can't possibly block an async
executor, they can call `block_on` which is more explicite, but
will nonetheless make sure no ts Context is being blocked. This
last check was introduced as it was considered low overhead
while it should ease preventing missues in cases where the above
functions should be used.
[1]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/793#note_1464400
Task state machines used to execute in an executor from the Futures
crate. State transitions actions and iteration functions were then
spawned on the target threadshare Context.
This commit directly spawns the task state machine on the threadshare
Context. This simplifies code a bit and paves the way for the changes
described in [1].
Also introduces struct `StateMachineHandle`, which gather together
fields to communicate and synchronize with the StateMachine. Renamed
`StateMachine::run` as `spawn` and return `StateMachineHandle`.
[1]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/793#note_1464400