diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs index 0e655c4a..7355edaa 100644 --- a/utils/livesync/src/livesync/imp.rs +++ b/utils/livesync/src/livesync/imp.rs @@ -14,7 +14,7 @@ use gst::{ }; use once_cell::sync::Lazy; use parking_lot::{Condvar, Mutex, MutexGuard}; -use std::sync::mpsc; +use std::{collections::VecDeque, sync::mpsc}; /// Offset for the segment in single-segment mode, to handle negative DTS const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); @@ -67,19 +67,23 @@ struct State { upstream_latency: Option, fallback_duration: gst::ClockTime, + playing: bool, eos: bool, - segment: Option>, srcresult: Result, - playing: bool, - sent_segment: bool, clock_id: Option, + in_segment: Option>, + pending_segment: Option>, + out_segment: Option>, + in_caps: Option, + pending_caps: Option, in_audio_info: Option, out_audio_info: Option, - in_item: Option, + queue: VecDeque, + buffer_queued: bool, out_buffer: Option, in_timestamp: Option, @@ -113,16 +117,19 @@ impl Default for State { late_threshold: DEFAULT_LATE_THRESHOLD, upstream_latency: None, fallback_duration: DEFAULT_DURATION, - eos: false, - segment: None, - srcresult: Err(gst::FlowError::Flushing), playing: false, - sent_segment: false, + eos: false, + srcresult: Err(gst::FlowError::Flushing), clock_id: None, + in_segment: None, + pending_segment: None, + out_segment: None, in_caps: None, + pending_caps: None, in_audio_info: None, out_audio_info: None, - in_item: None, + queue: VecDeque::with_capacity(32), + buffer_queued: false, out_buffer: None, in_timestamp: None, out_timestamp: None, @@ -394,14 +401,15 @@ impl ElementImpl for LiveSync { impl State { /// Calculate the running time the buffer covers, including latency - fn ts_range(&self, buf: &gst::BufferRef) -> Option { + fn ts_range( + &self, + buf: &gst::BufferRef, + segment: &gst::FormattedSegment, + ) -> Option { let mut timestamp_start = buf.dts_or_pts()?; if !self.single_segment { - timestamp_start = self - .segment - .as_ref() - .unwrap() + timestamp_start = segment .to_running_time(timestamp_start) .unwrap_or(gst::ClockTime::ZERO); timestamp_start += self.latency + self.upstream_latency.unwrap(); @@ -456,16 +464,17 @@ impl LiveSync { state.in_timestamp = None; state.num_in = 0; state.num_drop = 0; - state.segment = None; + state.in_segment = None; } else { { let mut state = self.state.lock(); state.srcresult = Err(gst::FlowError::Flushing); - state.out_buffer = None; - state.out_audio_info = None; if let Some(clock_id) = state.clock_id.take() { clock_id.unschedule(); } + state.pending_caps = None; + state.out_audio_info = None; + state.out_buffer = None; self.cond.notify_all(); } @@ -474,7 +483,8 @@ impl LiveSync { let mut state = self.state.lock(); state.in_caps = None; state.in_audio_info = None; - state.in_item = None; + state.queue.clear(); + state.buffer_queued = false; state.update_fallback_duration(); } drop(lock); @@ -500,7 +510,8 @@ impl LiveSync { let mut state = self.state.lock(); state.srcresult = Ok(gst::FlowSuccess::Ok); - state.sent_segment = false; + state.pending_segment = None; + state.out_segment = None; state.out_timestamp = None; state.num_out = 0; state.num_duplicate = 0; @@ -513,11 +524,12 @@ impl LiveSync { { let mut state = self.state.lock(); state.srcresult = Err(gst::FlowError::Flushing); - state.out_buffer = None; - state.out_audio_info = None; if let Some(clock_id) = state.clock_id.take() { clock_id.unschedule(); } + state.pending_caps = None; + state.out_audio_info = None; + state.out_buffer = None; self.cond.notify_all(); } @@ -558,12 +570,15 @@ impl LiveSync { let mut state = self.state.lock(); state.srcresult = Ok(gst::FlowSuccess::Ok); state.eos = false; - state.sent_segment = false; - state.segment = None; + state.in_segment = None; + state.pending_segment = None; + state.out_segment = None; state.in_caps = None; + state.pending_caps = None; state.in_audio_info = None; state.out_audio_info = None; - state.in_item = None; + state.queue.clear(); + state.buffer_queued = false; state.out_buffer = None; state.update_fallback_duration(); @@ -587,11 +602,7 @@ impl LiveSync { }; let mut state = self.state.lock(); - state.segment = Some(segment.clone()); - if !state.single_segment { - state.sent_segment = false; - } - return true; + state.in_segment = Some(segment.clone()); } gst::EventView::Gap(_) => { @@ -638,30 +649,25 @@ impl LiveSync { state.in_caps = Some(caps); state.in_audio_info = audio_info; state.update_fallback_duration(); - return true; } _ => {} } - if event.is_serialized() { - let mut state = self.state.lock(); - while state.srcresult.is_ok() && state.in_item.is_some() { - self.cond.wait(&mut state); - } - - if state.srcresult.is_err() { - return false; - } - - gst::trace!(CAT, imp: self, "Queueing {:?}", event); - state.in_item = Some(Item::Event(event)); - self.cond.notify_all(); - - true - } else { - gst::Pad::event_default(pad, Some(&*self.obj()), event) + if !event.is_serialized() { + return gst::Pad::event_default(pad, Some(&*self.obj()), event); } + + let mut state = self.state.lock(); + if state.srcresult.is_err() { + return false; + } + + gst::trace!(CAT, imp: self, "Queueing {:?}", event); + state.queue.push_back(Item::Event(event)); + self.cond.notify_all(); + + true } fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool { @@ -695,16 +701,14 @@ impl LiveSync { let (sender, receiver) = mpsc::sync_channel(1); let mut state = self.state.lock(); - while state.srcresult.is_ok() && state.in_item.is_some() { - self.cond.wait(&mut state); - } - if state.srcresult.is_err() { return false; } gst::trace!(CAT, imp: self, "Queueing {:?}", query); - state.in_item = Some(Item::Query(std::ptr::NonNull::from(query), sender)); + state + .queue + .push_back(Item::Query(std::ptr::NonNull::from(query), sender)); self.cond.notify_all(); drop(state); @@ -774,7 +778,7 @@ impl LiveSync { } } - while state.srcresult.is_ok() && state.in_item.is_some() { + while state.srcresult.is_ok() && state.buffer_queued { self.cond.wait(&mut state); } state.srcresult?; @@ -819,9 +823,10 @@ impl LiveSync { } } + // At this stage we should really really have a segment + let segment = state.in_segment.as_ref().ok_or(gst::FlowError::Error)?; + if state.single_segment { - // At this stage we should really really have a segment - let segment = state.segment.as_ref().ok_or(gst::FlowError::Error)?; let dts = segment .to_running_time_full(buf_mut.dts()) .map(|r| r + SEGMENT_OFFSET) @@ -855,7 +860,7 @@ impl LiveSync { buf_mut.set_flags(gst::BufferFlags::DISCONT); } - let mut timestamp = state.ts_range(buf_mut); + let mut timestamp = state.ts_range(buf_mut, segment); let lateness = self.buffer_is_backwards(&state, timestamp); match lateness { BufferLateness::OnTime => {} @@ -889,12 +894,13 @@ impl LiveSync { buf_mut.set_pts(prev.pts().map(|t| t + prev_duration)); buf_mut.set_flags(gst::BufferFlags::GAP); - timestamp = state.ts_range(buf_mut); + timestamp = state.ts_range(buf_mut, state.out_segment.as_ref().unwrap()); } } gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness); - state.in_item = Some(Item::Buffer(buffer, lateness)); + state.queue.push_back(Item::Buffer(buffer, lateness)); + state.buffer_queued = true; state.in_timestamp = timestamp; state.num_in += 1; self.cond.notify_all(); @@ -958,31 +964,54 @@ impl LiveSync { fn src_loop_inner(&self) -> Result { let mut state = self.state.lock(); while state.srcresult.is_ok() - && (!state.playing || (state.in_item.is_none() && state.out_buffer.is_none())) + && (!state.playing || (state.queue.is_empty() && state.out_buffer.is_none())) { self.cond.wait(&mut state); } state.srcresult?; - gst::trace!(CAT, imp: self, "Unqueueing {:?}", state.in_item); - let in_buffer = match state.in_item.take() { + let in_item = state.queue.pop_front(); + gst::trace!(CAT, imp: self, "Unqueueing {:?}", in_item); + + let in_buffer = match in_item { None => None, Some(Item::Buffer(buffer, lateness)) => { if self.buffer_is_early(&state, state.in_timestamp) { // Try this buffer again on the next iteration - state.in_item = Some(Item::Buffer(buffer, lateness)); + state.queue.push_front(Item::Buffer(buffer, lateness)); None } else { + state.buffer_queued = false; Some((buffer, lateness)) } } Some(Item::Event(event)) => { + let mut push = true; + + match event.view() { + gst::EventView::Segment(e) => { + let segment = e.segment().downcast_ref().unwrap(); + state.pending_segment = Some(segment.clone()); + push = false; + } + + gst::EventView::Caps(e) => { + state.pending_caps = Some(e.caps_owned()); + state.update_fallback_duration(); + push = false; + } + + _ => {} + } + self.cond.notify_all(); drop(state); - self.srcpad.push_event(event); + if push { + self.srcpad.push_event(event); + } return Ok(gst::FlowSuccess::Ok); } @@ -999,19 +1028,18 @@ impl LiveSync { } }; - let (duplicate, caps) = if let Some((buffer, lateness)) = in_buffer { - let caps = state.in_caps.take(); - + let duplicate; + let mut caps = None; + let mut segment = None; + if let Some((buffer, lateness)) = in_buffer { state.out_buffer = Some(buffer); state.out_timestamp = state.in_timestamp; - if caps.is_some() { - state.out_audio_info = state.in_audio_info.clone(); - } + caps = state.pending_caps.take(); + segment = state.pending_segment.take(); + duplicate = lateness != BufferLateness::OnTime; self.cond.notify_all(); - - (lateness != BufferLateness::OnTime, caps) } else { // Work around borrow checker let State { @@ -1045,8 +1073,11 @@ impl LiveSync { buffer.set_flags(gst::BufferFlags::GAP); buffer.unset_flags(gst::BufferFlags::DISCONT); - state.out_timestamp = state.ts_range(state.out_buffer.as_ref().unwrap()); - (true, None) + state.out_timestamp = state.ts_range( + state.out_buffer.as_ref().unwrap(), + state.out_segment.as_ref().unwrap(), + ); + duplicate = true; }; let buffer = state.out_buffer.clone().unwrap(); @@ -1060,29 +1091,37 @@ impl LiveSync { let event = gst::event::Caps::new(&caps); MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event)); state.srcresult?; + + state.out_audio_info = caps + .structure(0) + .unwrap() + .has_name("audio/x-raw") + .then(|| gst_audio::AudioInfo::from_caps(&caps).unwrap()); } - if !state.sent_segment { - let event = if state.single_segment { - // Create live segment - let mut segment = gst::FormattedSegment::::new(); - segment.set_start(sync_ts + SEGMENT_OFFSET); - segment.set_base(sync_ts); - segment.set_time(sync_ts); - segment.set_position(sync_ts + SEGMENT_OFFSET); - - gst::debug!(CAT, imp: self, "Sending new segment: {:?}", segment); - gst::event::Segment::new(&segment) - } else { - let segment = state.segment.as_ref().unwrap(); - + if let Some(segment) = segment { + if !state.single_segment { gst::debug!(CAT, imp: self, "Forwarding segment: {:?}", segment); - gst::event::Segment::new(segment) - }; - MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event)); - state.srcresult?; - state.sent_segment = true; + let event = gst::event::Segment::new(&segment); + MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event)); + state.srcresult?; + } else if state.out_segment.is_none() { + // Create live segment + let mut live_segment = gst::FormattedSegment::::new(); + live_segment.set_start(sync_ts + SEGMENT_OFFSET); + live_segment.set_base(sync_ts); + live_segment.set_time(sync_ts); + live_segment.set_position(sync_ts + SEGMENT_OFFSET); + + gst::debug!(CAT, imp: self, "Sending new segment: {:?}", live_segment); + + let event = gst::event::Segment::new(&live_segment); + MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event)); + state.srcresult?; + } + + state.out_segment = Some(segment); } {