mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-02 07:18:43 +00:00
ts-queue: Make PendingQueue a proper struct
This commit is contained in:
parent
892c812669
commit
da2332d814
1 changed files with 37 additions and 11 deletions
|
@ -120,13 +120,19 @@ static PROPERTIES: [subclass::Property; 5] = [
|
|||
}),
|
||||
];
|
||||
|
||||
struct PendingQueue {
|
||||
task: Option<task::Task>,
|
||||
scheduled: bool,
|
||||
items: VecDeque<DataQueueItem>,
|
||||
}
|
||||
|
||||
struct State {
|
||||
io_context: Option<IOContext>,
|
||||
pending_future_id: Option<PendingFutureId>,
|
||||
io_context_in: Option<IOContext>,
|
||||
pending_future_id_in: Option<PendingFutureId>,
|
||||
queue: Option<DataQueue>,
|
||||
pending_queue: Option<(Option<task::Task>, bool, VecDeque<DataQueueItem>)>,
|
||||
pending_queue: Option<PendingQueue>,
|
||||
last_res: Result<gst::FlowSuccess, gst::FlowError>,
|
||||
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>,
|
||||
}
|
||||
|
@ -179,12 +185,16 @@ impl Queue {
|
|||
fn queue_until_full(
|
||||
&self,
|
||||
queue: &DataQueue,
|
||||
pending_queue: &mut Option<(Option<task::Task>, bool, VecDeque<DataQueueItem>)>,
|
||||
pending_queue: &mut Option<PendingQueue>,
|
||||
item: DataQueueItem,
|
||||
) -> Result<(), DataQueueItem> {
|
||||
match pending_queue {
|
||||
None => queue.push(item),
|
||||
Some((_, false, ref mut items)) => {
|
||||
Some(PendingQueue {
|
||||
task: _,
|
||||
scheduled: false,
|
||||
ref mut items,
|
||||
}) => {
|
||||
let mut failed_item = None;
|
||||
while let Some(item) = items.pop_front() {
|
||||
if let Err(item) = queue.push(item) {
|
||||
|
@ -222,7 +232,7 @@ impl Queue {
|
|||
..
|
||||
} = *state;
|
||||
|
||||
pending_queue.as_mut().unwrap().1 = true;
|
||||
pending_queue.as_mut().unwrap().scheduled = true;
|
||||
|
||||
let element_clone = element.clone();
|
||||
let future = future::poll_fn(move || {
|
||||
|
@ -245,7 +255,12 @@ impl Queue {
|
|||
"Trying to empty pending queue"
|
||||
);
|
||||
|
||||
let res = if let Some((ref mut task, _, ref mut items)) = *pending_queue {
|
||||
let res = if let Some(PendingQueue {
|
||||
ref mut task,
|
||||
scheduled: _,
|
||||
ref mut items,
|
||||
}) = *pending_queue
|
||||
{
|
||||
let mut failed_item = None;
|
||||
while let Some(item) = items.pop_front() {
|
||||
if let Err(item) = dq.as_ref().unwrap().push(item) {
|
||||
|
@ -312,11 +327,15 @@ impl Queue {
|
|||
if let Err(item) = self.queue_until_full(queue, pending_queue, item) {
|
||||
if pending_queue
|
||||
.as_ref()
|
||||
.map(|(_, scheduled, _)| !scheduled)
|
||||
.map(|pq| !pq.scheduled)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
if pending_queue.is_none() {
|
||||
*pending_queue = Some((None, false, VecDeque::new()));
|
||||
*pending_queue = Some(PendingQueue {
|
||||
task: None,
|
||||
scheduled: false,
|
||||
items: VecDeque::new(),
|
||||
});
|
||||
}
|
||||
|
||||
let schedule_now = match item {
|
||||
|
@ -326,7 +345,7 @@ impl Queue {
|
|||
_ => true,
|
||||
};
|
||||
|
||||
pending_queue.as_mut().unwrap().2.push_back(item);
|
||||
pending_queue.as_mut().unwrap().items.push_back(item);
|
||||
|
||||
gst_log!(
|
||||
self.cat,
|
||||
|
@ -343,7 +362,7 @@ impl Queue {
|
|||
}
|
||||
} else {
|
||||
assert!(io_context_in.is_some());
|
||||
pending_queue.as_mut().unwrap().2.push_back(item);
|
||||
pending_queue.as_mut().unwrap().items.push_back(item);
|
||||
|
||||
None
|
||||
}
|
||||
|
@ -540,7 +559,11 @@ impl Queue {
|
|||
> {
|
||||
let event = {
|
||||
let state = self.state.lock().unwrap();
|
||||
if let Some((Some(ref task), _, _)) = state.pending_queue {
|
||||
if let Some(PendingQueue {
|
||||
task: Some(ref task),
|
||||
..
|
||||
}) = state.pending_queue
|
||||
{
|
||||
task.notify();
|
||||
}
|
||||
|
||||
|
@ -768,7 +791,10 @@ impl Queue {
|
|||
queue.pause();
|
||||
queue.clear(&self.src_pad);
|
||||
}
|
||||
if let Some((Some(task), _, _)) = state.pending_queue.take() {
|
||||
if let Some(PendingQueue {
|
||||
task: Some(task), ..
|
||||
}) = state.pending_queue.take()
|
||||
{
|
||||
task.notify();
|
||||
}
|
||||
let _ = state.pending_future_cancel.take();
|
||||
|
|
Loading…
Reference in a new issue