This is a follow-up to commit 7ee4afac.
This commit cleans up the `Pad{Sink,Src}Handler` by
- Keeping arguments which are strictly necessary.
- Passing arguments by value for the trait functions which return
a `Future`. The arguments which were previously passed by reference
were `clone`d internally and then `clone`d again in most
implementations.
There are unfortunate differences in trait function signatures
between those which return a `Future` and the sync functions. This
is due to the requirement for the arguments to be moved to the
resulting `Future`, whereas sync functions can rely on references.
One particular notable difference is the use of the `imp` in sync
functions instead of the `elem` in functions returning a `Future`.
Because the `imp` is not guaranteed to implement `Clone`, we can't
move it to the resulting `Future`, so the `elem` is used.
This is no longer available as this could lead to building a defined
value in Rust which could be interpreted as undefined in C due to
the sentinel `u64::MAX` for `None`.
Use the constants (e.g. `ONE`, `K`, `M`, ...) and operations to build
a value and deref (`*`) to get the quantity as an integer.
It is now guaranteed that each fragment is at most fragment-duration
long unless the one and only GOP of the fragment is longer than that.
The first (non-EOS) stream determines the duration of each fragment and
all other streams are drained to at most the fragment end timestamp
determined this way.
In addition the next fragment's target time is now at the end of the
previous fragment plus fragment-duration instead of using
first-fragment + N*fragment-duration
regardless of where fragments were split before.
That is, fmp4mux now uses the same strategy as used by splitmuxsink and
as is required e.g. by HLS with regards to the target duration.
A strong handle reference was held in the `block_on_priv` `Result`
handler in the thread for the `Scheduler::start` code path, which
lead to the `Handler` strong count not dropping to 0 when it
should, leading to the shutdown request not being triggered.
Use an Arc<AtomicBool> instead of a oneshot channel for shutdown.
The main Future is always polled and never relies on a waker, a
`poll_fn` is cheap and does the job.
Unpark the scheduler after posting a request to shutdown.
Subtasks are used when current async processing needs to execute
a `Future` via a sync function (eg. a call to a C function).
In this case `Context::block_on` would block the whole `Context`,
leading to a deadlock.
The main use case for this is the `Pad{Src,Sink}` functions:
when we `PadSrc::push` and the peer pad is a `PadSink`, we want
`PadSrc::push` to complete after the async function on the
`PadSink` completes. In this case the `PadSink` async function
is added as a subtask of current scheduler task and
`PadSrc::push` only returns when the subtask is executed.
In `runtime::Task` (`Task` here is the execution Task with a
state machine, not a scheduler task), we used to spawn state
transition actions and iteration loop (leading to a new
scheduler Task). At the time, it seemed convenient for the user
to automatically drain sub tasks after a state transition action
or an iteration. User wouldn't have to worry about this, similarly
to the `Pad{Src,Sink}` case.
In current implementation, the `Task` state machine now operates
directly on the target `Context`. State transtions actions and
the iteration loop are no longer spawned. It seems now useless to
abstract the subtasks draining from the user. Either they
transitively use a mechanism such as `Pad{Src,Sink}` which already
handles this automatically, or they add substasks on purpose, in
which case they know better when subtasks must be drained.
... so that it can be reused on current thread for subsequent
Scheduler instantiations (e.g. block_on) without the need to
reallocate internal data structures.
This commit improves threadshare timers predictability
by better making use of current time slice.
Added a dedicate timer BTreeMap for after timers (those
that are guaranteed to fire no sooner than the expected
instant) so as to avoid previous workaround which added
half the max throttling duration. These timers can now
be checked against the reactor processing instant.
Oneshot timers only need to be polled as `Future`s when
intervals are `Stream`s. This also reduces the size for
oneshot timers and make user call `next` on intervals.
Intervals can also implement `FusedStream`, which can help
when used in features such as `select!`.
Also drop the `time` module, which was kepts for
compatibility when the `executor` was migrated from tokio
based to smol-like.
Add a `tuning` feature which adds counters that help with performance
evaluation. The only counter added so far accumulates the duration a
Scheduler has been parked, which is pretty accurate an indication of
CPU usage of the Scheduler.
- Reworked buffer push.
- Reworked stats.
- Make first elements logs stand out. This make it possible to
follow what's going on with pipelines containing 1000s of
elements.
- Actually handle EOS.
- Use more significant defaults.
- Allow building without `clap` feature.
jitterbuffer tests crash on Windows CI sometimes. Activating logs
showed time values which are probably not expected in a regular
environment, but which can happen there. Adding extra robustness
to `next_wakeup` computation seems to fix the problem judging by
the few runs I triggered.
The encoding of ONVIF metadata is always UTF-8. ONVIF metadata may
or may not be encoded with gzip, but we don't see a use case for
transporting compressed ONVIF metadata between elements for now.
The code tried to do this before but didn't consider the case where the
first pad has a valid running time, in which case the buffer with the
invalid running time would never be dequeued and the muxer would never
output anything.
Using callgrind with the standalone test showed opportunities for
improvements for sub tasks addition and drain.
All sub task additions were performed after making sure we were
operating on a Context Task. The Context and Task were checked
again when adding the sub task.
Draining sub tasks was perfomed in a loop on every call places,
checking whether there were remaining sub tasks first. This
commit implements the loop and checks directly in
`executor::Task::drain_subtasks`, saving one `Mutex` lock and
one `thread_local` access per iteration when there are sub
tasks to drain.
The `PadSink` functions wrapper were performing redundant checks
on the `Context` presence and were adding the delayed Future only
when there were already sub tasks.
Implement a test that initializes pipelines with minimalistic
theadshare src and sink. This can help with the evaluation of
changes to the threadshare runtime or with element
implementation details. It makes it easy to run flamegraph or
callgrind and to focus on the threadshare runtime overhead.
By moving sync on buffer ts to `try_next`, the resulting delay
can be cancelled when a state transition occurs.
To prevent item loss, this requires first peeking the incoming
item from the channel without popping it. After the delay has
elasped, we can pop the item as the last await point in
`try_next`: either it will be cancelled before popping or the
popped item will be passed on to `handle_item`.
Also add `flush` which was missing from `stop` and `flush_start`
transition actions.
Previous Task iteration model suffered from the following
shortcomings:
- When an iteration was engaged it could be cancelled at
await points by Stop or Flush state transitions,
which could lead to inconsistent states.
- When an iteration was engaged it could not be cancelled
by a Pause state transition so as to prevent data loss.
This meant we couldn't block on the Pause request because
the mechanism couldn't guarantee Paused would be reached
in a timely manner.
This commit split the Task iteration into:
- `try_next`: this function returns a future that awaits
for a new iteration to begin. The regular use case is
to return an item to process. The item can be left to
`()` if `try_next` acts as a tick generator. It can
also return an error. This function can be cancelled at
await points when a state transition request occurs.
- `handle_item`: this function is called with the item
returned by `try_next` and is guaranteed to run to
completion even if a transition request is received.
Note that this model plays well with the common Future
cancellation pitfalls in Rust.
warning: this expression borrows a value the compiler would automatically borrow
--> generic/threadshare/src/runtime/executor/async_wrapper.rs:402:19
|
402 | match (&mut *self).get_mut().read(buf) {
| ^^^^^^^^^^^^ help: change this to: `(*self)`
|
= note: `#[warn(clippy::needless_borrow)]` on by default
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow