diff --git a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs index f3026949..d582af44 100644 --- a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs @@ -15,11 +15,8 @@ // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. -use either::Either; - use futures::future::BoxFuture; use futures::future::{abortable, AbortHandle, Aborted}; -use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; use glib; @@ -31,23 +28,21 @@ use glib::{glib_object_impl, glib_object_subclass}; use gst; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{gst_debug, gst_error_msg, gst_info, gst_log, gst_trace}; +use gst::{gst_debug, gst_error, gst_error_msg, gst_info, gst_log, gst_trace}; use gst_rtp::RTPBuffer; use lazy_static::lazy_static; use std::cmp::{max, min, Ordering}; use std::collections::{BTreeSet, VecDeque}; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering::Relaxed; +use std::mem; use std::sync::Arc; +use std::sync::Mutex as StdMutex; use std::time::Duration; use crate::get_current_running_time; use crate::runtime::prelude::*; -use crate::runtime::{ - self, Context, JoinHandle, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, -}; +use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; @@ -155,192 +150,27 @@ static PROPERTIES: [subclass::Property; 7] = [ }), ]; -#[derive(Clone, Debug)] -struct JitterBufferPadSinkHandler; - -impl PadSinkHandler for JitterBufferPadSinkHandler { - type ElementImpl = JitterBuffer; - - fn sink_chain( - &self, - pad: &PadSinkRef, - _jitterbuffer: &JitterBuffer, - element: &gst::Element, - buffer: gst::Buffer, - ) -> BoxFuture<'static, Result> { - let pad_weak = pad.downgrade(); - let element = element.clone(); - async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - - gst_debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); - let jitterbuffer = JitterBuffer::from_instance(&element); - jitterbuffer - .enqueue_item(pad.gst_pad(), &element, Some(buffer)) - .await - } - .boxed() - } - - fn sink_event( - &self, - pad: &PadSinkRef, - jitterbuffer: &JitterBuffer, - element: &gst::Element, - event: gst::Event, - ) -> Either> { - use gst::EventView; - - if event.is_serialized() { - let pad_weak = pad.downgrade(); - let element = element.clone(); - Either::Right( - async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - - let mut forward = true; - - gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); - - let jitterbuffer = JitterBuffer::from_instance(&element); - match event.view() { - EventView::FlushStop(..) => { - jitterbuffer.flush(&element).await; - } - EventView::Segment(e) => { - let mut state = jitterbuffer.state.lock().await; - state.segment = e - .get_segment() - .clone() - .downcast::() - .unwrap(); - } - EventView::Eos(..) => { - let mut state = jitterbuffer.state.lock().await; - jitterbuffer.drain(&mut state, &element).await; - } - EventView::CustomDownstreamSticky(e) => { - if PadContext::is_pad_context_sticky_event(&e) { - forward = false; - } - } - _ => (), - }; - - if forward { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event); - jitterbuffer.src_pad.push_event(event).await - } else { - true - } - } - .boxed(), - ) - } else { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); - Either::Left(jitterbuffer.src_pad.gst_pad().push_event(event)) - } - } - - fn sink_query( - &self, - pad: &PadSinkRef, - jitterbuffer: &JitterBuffer, - element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryView; - - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); - - match query.view_mut() { - QueryView::Drain(..) => { - gst_info!(CAT, obj: pad.gst_pad(), "Draining"); - runtime::executor::block_on(jitterbuffer.enqueue_item(pad.gst_pad(), element, None)) - .is_ok() - } - _ => jitterbuffer.src_pad.gst_pad().peer_query(query), - } - } -} - -#[derive(Debug)] -struct JitterBufferPadSrcHandlerInner { - latency: gst::ClockTime, - position: AtomicU64, -} - -#[derive(Clone, Debug)] -struct JitterBufferPadSrcHandler(Arc); - -impl JitterBufferPadSrcHandler { - fn new(latency: gst::ClockTime) -> Self { - JitterBufferPadSrcHandler(Arc::new(JitterBufferPadSrcHandlerInner { - latency, - position: AtomicU64::new(std::u64::MAX), - })) - } -} - -impl PadSrcHandler for JitterBufferPadSrcHandler { - type ElementImpl = JitterBuffer; - - fn src_query( - &self, - pad: &PadSrcRef, - jitterbuffer: &JitterBuffer, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryView; - - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); - - match query.view_mut() { - QueryView::Latency(ref mut q) => { - let mut peer_query = gst::query::Query::new_latency(); - - let ret = jitterbuffer.sink_pad.gst_pad().peer_query(&mut peer_query); - - if ret { - let (_, mut min_latency, _) = peer_query.get_result(); - min_latency += self.0.latency; - let max_latency = gst::CLOCK_TIME_NONE; - - q.set(true, min_latency, max_latency); - } - - ret - } - QueryView::Position(ref mut q) => { - if q.get_format() != gst::Format::Time { - jitterbuffer.sink_pad.gst_pad().peer_query(query) - } else { - let position = self.0.position.load(Relaxed); - q.set(gst::ClockTime(Some(position))); - true - } - } - _ => jitterbuffer.sink_pad.gst_pad().peer_query(query), - } - } -} - #[derive(Eq)] -struct GapPacket(gst::Buffer); +struct GapPacket { + buffer: gst::Buffer, + seq: u16, + pt: u8, +} + +impl GapPacket { + fn new(buffer: gst::Buffer) -> Self { + let mut rtp_buffer = RTPBuffer::from_buffer_readable(&buffer).unwrap(); + let seq = rtp_buffer.get_seq(); + let pt = rtp_buffer.get_payload_type(); + drop(rtp_buffer); + + Self { buffer, seq, pt } + } +} impl Ord for GapPacket { fn cmp(&self, other: &Self) -> Ordering { - let mut rtp_buffer = RTPBuffer::from_buffer_readable(&self.0).unwrap(); - let mut other_rtp_buffer = RTPBuffer::from_buffer_readable(&other.0).unwrap(); - - let seq = rtp_buffer.get_seq(); - let other_seq = other_rtp_buffer.get_seq(); - - drop(rtp_buffer); - drop(other_rtp_buffer); - - 0.cmp(&gst_rtp::compare_seqnum(seq, other_seq)) + 0.cmp(&gst_rtp::compare_seqnum(self.seq, other.seq)) } } @@ -356,87 +186,78 @@ impl PartialEq for GapPacket { } } -struct State { - jbuf: glib::SendUniqueCell, +struct SinkHandlerInner { packet_rate_ctx: RTPPacketRateCtx, - clock_rate: i32, - segment: gst::FormattedSegment, - ips_rtptime: u32, + ips_rtptime: Option, ips_pts: gst::ClockTime, - last_pt: u32, - last_in_seqnum: u32, - packet_spacing: gst::ClockTime, - gap_packets: Option>, - last_popped_seqnum: u32, - num_pushed: u64, - num_lost: u64, - num_late: u64, - last_rtptime: u32, - equidistant: i32, - earliest_pts: gst::ClockTime, - earliest_seqnum: u16, - last_popped_pts: gst::ClockTime, - discont: bool, - last_res: Result, - task_queue_abort_handle: Option, - wakeup_abort_handle: Option, - wakeup_join_handle: Option>>, - src_pad_handler: JitterBufferPadSrcHandler, + + gap_packets: BTreeSet, + + last_pt: Option, + + last_in_seqnum: Option, + last_rtptime: Option, } -impl Default for State { - fn default() -> State { - State { - jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(), +impl Default for SinkHandlerInner { + fn default() -> Self { + SinkHandlerInner { packet_rate_ctx: RTPPacketRateCtx::new(), - clock_rate: -1, - segment: gst::FormattedSegment::::new(), - ips_rtptime: 0, + ips_rtptime: None, ips_pts: gst::CLOCK_TIME_NONE, - last_pt: std::u32::MAX, - last_in_seqnum: std::u32::MAX, - packet_spacing: gst::ClockTime(Some(0)), - gap_packets: Some(BTreeSet::new()), - last_popped_seqnum: std::u32::MAX, - num_pushed: 0, - num_lost: 0, - num_late: 0, - last_rtptime: std::u32::MAX, - equidistant: 0, - earliest_pts: gst::CLOCK_TIME_NONE, - earliest_seqnum: 0, - last_popped_pts: gst::CLOCK_TIME_NONE, - discont: false, - last_res: Ok(gst::FlowSuccess::Ok), - task_queue_abort_handle: None, - wakeup_abort_handle: None, - wakeup_join_handle: None, - src_pad_handler: JitterBufferPadSrcHandler::new( - DEFAULT_LATENCY_MS as u64 * gst::MSECOND, - ), + gap_packets: BTreeSet::new(), + last_pt: None, + last_in_seqnum: None, + last_rtptime: None, } } } -struct JitterBuffer { - sink_pad: PadSink, - src_pad: PadSrc, - state: Mutex, - settings: Mutex, -} +#[derive(Clone)] +struct SinkHandler(Arc>); -lazy_static! { - static ref CAT: gst::DebugCategory = gst::DebugCategory::new( - "ts-jitterbuffer", - gst::DebugColorFlags::empty(), - Some("Thread-sharing jitterbuffer"), - ); -} +impl SinkHandler { + fn new() -> Self { + SinkHandler(Arc::new(StdMutex::new(SinkHandlerInner::default()))) + } + + fn clear(&self) { + let mut inner = self.0.lock().unwrap(); + *inner = SinkHandlerInner::default(); + } + + // For resetting if seqnum discontinuities + fn reset( + &self, + inner: &mut SinkHandlerInner, + state: &mut State, + element: &gst::Element, + ) -> BTreeSet { + gst_info!(CAT, obj: element, "Resetting"); + + state.jbuf.borrow().flush(); + state.jbuf.borrow().reset_skew(); + state.discont = true; + + state.last_popped_seqnum = None; + state.last_popped_pts = gst::CLOCK_TIME_NONE; + + inner.last_in_seqnum = None; + inner.last_rtptime = None; + + state.earliest_pts = gst::CLOCK_TIME_NONE; + state.earliest_seqnum = None; + + inner.ips_rtptime = None; + inner.ips_pts = gst::CLOCK_TIME_NONE; + + mem::replace(&mut inner.gap_packets, BTreeSet::new()) + } -impl JitterBuffer { fn parse_caps( &self, - state: &mut MutexGuard, + inner: &mut SinkHandlerInner, + state: &mut State, element: &gst::Element, caps: &gst::Caps, pt: u8, @@ -453,18 +274,17 @@ impl JitterBuffer { return Err(gst::FlowError::Error); } - state.last_pt = pt as u32; - state.clock_rate = s + inner.last_pt = Some(pt); + let clock_rate = s .get_some::("clock-rate") .map_err(|_| gst::FlowError::Error)?; - if state.clock_rate <= 0 { + if clock_rate <= 0 { return Err(gst::FlowError::Error); } + state.clock_rate = Some(clock_rate as u32); - let clock_rate = state.clock_rate; - - state.packet_rate_ctx.reset(clock_rate); + inner.packet_rate_ctx.reset(clock_rate); state.jbuf.borrow().set_clock_rate(clock_rate as u32); Ok(gst::FlowSuccess::Ok) @@ -472,13 +292,14 @@ impl JitterBuffer { fn calculate_packet_spacing( &self, - state: &mut MutexGuard, + inner: &mut SinkHandlerInner, + state: &mut State, rtptime: u32, pts: gst::ClockTime, ) { - if state.ips_rtptime != rtptime { - if state.ips_pts.is_some() && pts.is_some() { - let new_packet_spacing = pts - state.ips_pts; + if inner.ips_rtptime != Some(rtptime) { + if inner.ips_pts.is_some() && pts.is_some() { + let new_packet_spacing = pts - inner.ips_pts; let old_packet_spacing = state.packet_spacing; if old_packet_spacing > new_packet_spacing { @@ -497,21 +318,19 @@ impl JitterBuffer { state.packet_spacing ); } - state.ips_rtptime = rtptime; - state.ips_pts = pts; + inner.ips_rtptime = Some(rtptime); + inner.ips_pts = pts; } } fn handle_big_gap_buffer( &self, - state: &mut MutexGuard, + inner: &mut SinkHandlerInner, element: &gst::Element, buffer: gst::Buffer, pt: u8, ) -> bool { - let gap_packets = state.gap_packets.as_mut().unwrap(); - - let gap_packets_length = gap_packets.len(); + let gap_packets_length = inner.gap_packets.len(); let mut reset = false; gst_debug!( @@ -521,35 +340,28 @@ impl JitterBuffer { gap_packets_length ); - gap_packets.insert(GapPacket(buffer)); + inner.gap_packets.insert(GapPacket::new(buffer)); if gap_packets_length > 0 { let mut prev_gap_seq = std::u32::MAX; let mut all_consecutive = true; - for gap_packet in gap_packets.iter() { - let mut rtp_buffer = RTPBuffer::from_buffer_readable(&gap_packet.0).unwrap(); - - let gap_pt = rtp_buffer.get_payload_type(); - let gap_seq = rtp_buffer.get_seq(); - + for gap_packet in inner.gap_packets.iter() { gst_log!( CAT, obj: element, "Looking at gap packet with seq {}", - gap_seq + gap_packet.seq, ); - drop(rtp_buffer); - - all_consecutive = gap_pt == pt; + all_consecutive = gap_packet.pt == pt; if prev_gap_seq == std::u32::MAX { - prev_gap_seq = gap_seq as u32; - } else if gst_rtp::compare_seqnum(gap_seq, prev_gap_seq as u16) != -1 { + prev_gap_seq = gap_packet.seq as u32; + } else if gst_rtp::compare_seqnum(gap_packet.seq, prev_gap_seq as u16) != -1 { all_consecutive = false; } else { - prev_gap_seq = gap_seq as u32; + prev_gap_seq = gap_packet.seq as u32; } if !all_consecutive { @@ -562,45 +374,25 @@ impl JitterBuffer { if all_consecutive && gap_packets_length > 3 { reset = true; } else if !all_consecutive { - gap_packets.clear(); + inner.gap_packets.clear(); } } reset } - fn reset( + fn store( &self, - state: &mut MutexGuard<'_, State>, - element: &gst::Element, - ) -> BTreeSet { - gst_info!(CAT, obj: element, "Resetting"); - - state.jbuf.borrow().flush(); - state.jbuf.borrow().reset_skew(); - state.discont = true; - state.last_popped_seqnum = std::u32::MAX; - state.last_in_seqnum = std::u32::MAX; - state.ips_rtptime = 0; - state.ips_pts = gst::CLOCK_TIME_NONE; - state.earliest_pts = gst::CLOCK_TIME_NONE; - - let gap_packets = state.gap_packets.take(); - state.gap_packets = Some(BTreeSet::new()); - - // Handle gap_packets in caller to avoid recursion - gap_packets.unwrap() - } - - async fn store( - &self, - state: &mut MutexGuard<'_, State>, + inner: &mut SinkHandlerInner, pad: &gst::Pad, element: &gst::Element, buffer: gst::Buffer, ) -> Result { + let jb = JitterBuffer::from_instance(element); + let mut state = jb.state.lock().unwrap(); + let (max_misorder_time, max_dropout_time) = { - let settings = self.settings.lock().await; + let settings = jb.settings.lock().unwrap(); (settings.max_misorder_time, settings.max_dropout_time) }; @@ -637,29 +429,29 @@ impl JitterBuffer { dts = get_current_running_time(element); pts = dts; - estimated_dts = state.clock_rate != -1; + estimated_dts = state.clock_rate.is_some(); } else { dts = state.segment.to_running_time(dts); } - if state.clock_rate == -1 { - state.ips_rtptime = rtptime; - state.ips_pts = pts; + if state.clock_rate.is_none() { + inner.ips_rtptime = Some(rtptime); + inner.ips_pts = pts; } - if state.last_pt != pt as u32 { - state.last_pt = pt as u32; - state.clock_rate = -1; + if inner.last_pt != Some(pt) { + inner.last_pt = Some(pt); + state.clock_rate = None; gst_debug!(CAT, obj: pad, "New payload type: {}", pt); if let Some(caps) = pad.get_current_caps() { /* Ignore errors at this point, as we want to emit request-pt-map */ - let _ = self.parse_caps(state, element, &caps, pt); + let _ = self.parse_caps(inner, &mut state, element, &caps, pt); } } - if state.clock_rate == -1 { + if state.clock_rate.is_none() { let caps = element .emit("request-pt-map", &[&(pt as u32)]) .map_err(|_| gst::FlowError::Error)? @@ -667,15 +459,15 @@ impl JitterBuffer { .get::() .map_err(|_| gst::FlowError::Error)? .ok_or(gst::FlowError::Error)?; - self.parse_caps(state, element, &caps, pt)?; + self.parse_caps(inner, &mut state, element, &caps, pt)?; } - state.packet_rate_ctx.update(seq, rtptime); + inner.packet_rate_ctx.update(seq, rtptime); - let max_dropout = state + let max_dropout = inner .packet_rate_ctx .get_max_dropout(max_dropout_time as i32); - let max_misorder = state + let max_misorder = inner .packet_rate_ctx .get_max_dropout(max_misorder_time as i32); @@ -698,13 +490,13 @@ impl JitterBuffer { return Ok(gst::FlowSuccess::Ok); } - if state.last_in_seqnum != std::u32::MAX { - let gap = gst_rtp::compare_seqnum(state.last_in_seqnum as u16, seq); + if let Some(last_in_seqnum) = inner.last_in_seqnum { + let gap = gst_rtp::compare_seqnum(last_in_seqnum as u16, seq); if gap == 1 { - self.calculate_packet_spacing(state, rtptime, pts); + self.calculate_packet_spacing(inner, &mut state, rtptime, pts); } else { if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) { - let reset = self.handle_big_gap_buffer(state, element, buffer, pt); + let reset = self.handle_big_gap_buffer(inner, element, buffer, pt); if reset { // Handle reset in `enqueue_item` to avoid recursion return Err(gst::FlowError::CustomError); @@ -712,29 +504,29 @@ impl JitterBuffer { return Ok(gst::FlowSuccess::Ok); } } - state.ips_pts = gst::CLOCK_TIME_NONE; - state.ips_rtptime = 0; + inner.ips_pts = gst::CLOCK_TIME_NONE; + inner.ips_rtptime = None; } - state.gap_packets.as_mut().unwrap().clear(); + inner.gap_packets.clear(); } - if state.last_popped_seqnum != std::u32::MAX { - let gap = gst_rtp::compare_seqnum(state.last_popped_seqnum as u16, seq); + if let Some(last_popped_seqnum) = state.last_popped_seqnum { + let gap = gst_rtp::compare_seqnum(last_popped_seqnum, seq); if gap <= 0 { - state.num_late += 1; + state.stats.num_late += 1; gst_debug!(CAT, obj: element, "Dropping late {}", seq); return Ok(gst::FlowSuccess::Ok); } } - state.last_in_seqnum = seq as u32; + inner.last_in_seqnum = Some(seq); let jb_item = if estimated_dts { - RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, seq as u32, rtptime) + RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime) } else { - RTPJitterBufferItem::new(buffer, dts, pts, seq as u32, rtptime) + RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime) }; let (success, _, _) = state.jbuf.borrow().insert(jb_item); @@ -744,7 +536,7 @@ impl JitterBuffer { return Ok(gst::FlowSuccess::Ok); } - if rtptime == state.last_rtptime { + if Some(rtptime) == inner.last_rtptime { state.equidistant -= 2; } else { state.equidistant += 1; @@ -752,15 +544,19 @@ impl JitterBuffer { state.equidistant = min(max(state.equidistant, -7), 7); - state.last_rtptime = rtptime; + inner.last_rtptime = Some(rtptime); if state.earliest_pts.is_none() || (pts.is_some() && (pts < state.earliest_pts - || (pts == state.earliest_pts && seq > state.earliest_seqnum))) + || (pts == state.earliest_pts + && state + .earliest_seqnum + .map(|earliest_seqnum| seq > earliest_seqnum) + .unwrap_or(false)))) { state.earliest_pts = pts; - state.earliest_seqnum = seq; + state.earliest_seqnum = Some(seq); } gst_log!(CAT, obj: pad, "Stored buffer"); @@ -768,191 +564,387 @@ impl JitterBuffer { Ok(gst::FlowSuccess::Ok) } - async fn push_lost_events( + fn enqueue_item( &self, - state: &mut MutexGuard<'_, State>, + pad: &gst::Pad, element: &gst::Element, - seqnum: u32, - pts: gst::ClockTime, - discont: &mut bool, + buffer: Option, ) -> Result { - let (latency_ns, do_lost) = { - let settings = self.settings.lock().await; - ( - settings.latency_ms as i64 * gst::MSECOND.nseconds().unwrap() as i64, - settings.do_lost, - ) - }; + let mut inner = self.0.lock().unwrap(); - let mut ret = true; + let mut buffers = VecDeque::new(); + if let Some(buf) = buffer { + buffers.push_back(buf); + } - gst_debug!( - CAT, - obj: element, - "Pushing lost events seq: {}, last popped seq: {}", - seqnum, - state.last_popped_seqnum - ); - - if state.last_popped_seqnum != std::u32::MAX { - let mut lost_seqnum = ((state.last_popped_seqnum + 1) & 0xffff) as i64; - let gap = gst_rtp::compare_seqnum(lost_seqnum as u16, seqnum as u16) as i64; - - if gap > 0 { - let interval = pts.nseconds().unwrap() as i64 - - state.last_popped_pts.nseconds().unwrap() as i64; - let spacing = if interval >= 0 { - interval / (gap as i64 + 1) - } else { - 0 - }; - - *discont = true; - - if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns { - let n_packets = gap - latency_ns / spacing; - - if do_lost { - let s = gst::Structure::new( - "GstRTPPacketLost", - &[ - ("seqnum", &(lost_seqnum as u32)), - ( - "timestamp", - &(state.last_popped_pts + gst::ClockTime(Some(spacing as u64))), - ), - ("duration", &((n_packets * spacing) as u64)), - ("retry", &0), - ], - ); - - let event = gst::Event::new_custom_downstream(s).build(); - - ret = self.src_pad.push_event(event).await; + // This is to avoid recursion with `store`, `reset` and `enqueue_item` + while let Some(buf) = buffers.pop_front() { + if let Err(err) = self.store(&mut inner, pad, element, buf) { + match err { + gst::FlowError::CustomError => { + let jb = JitterBuffer::from_instance(element); + let mut state = jb.state.lock().unwrap(); + for gap_packet in self.reset(&mut inner, &mut state, element) { + buffers.push_back(gap_packet.buffer); + } } - - lost_seqnum = (lost_seqnum + n_packets) & 0xffff; - state.last_popped_pts += gst::ClockTime(Some((n_packets * spacing) as u64)); - state.num_lost += n_packets as u64; - - if !ret { - return Err(gst::FlowError::Error); - } - } - - while lost_seqnum != seqnum as i64 { - let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing as u64)); - let duration = if state.equidistant > 0 { spacing } else { 0 }; - - state.last_popped_pts = timestamp; - - if do_lost { - let s = gst::Structure::new( - "GstRTPPacketLost", - &[ - ("seqnum", &(lost_seqnum as u32)), - ("timestamp", ×tamp), - ("duration", &(duration as u64)), - ("retry", &0), - ], - ); - - let event = gst::Event::new_custom_downstream(s).build(); - - ret = self.src_pad.push_event(event).await; - } - - state.num_lost += 1; - - if !ret { - break; - } - - lost_seqnum = (lost_seqnum + 1) & 0xffff; + other => return Err(other), } } } - if ret { - Ok(gst::FlowSuccess::Ok) - } else { - Err(gst::FlowError::Error) - } - } + let jb = JitterBuffer::from_instance(element); + let mut state = jb.state.lock().unwrap(); - async fn pop_and_push( - &self, - state: &mut MutexGuard<'_, State>, - element: &gst::Element, - ) -> Result { - let mut discont = false; - let (jb_item, _) = state.jbuf.borrow().pop(); - - let dts = jb_item.get_dts(); - let pts = jb_item.get_pts(); - let seq = jb_item.get_seqnum(); - let mut buffer = jb_item.get_buffer(); - - let buffer = buffer.make_mut(); - - buffer.set_dts(state.segment.to_running_time(dts)); - buffer.set_pts(state.segment.to_running_time(pts)); - - if state.last_popped_pts.is_some() && buffer.get_pts() < state.last_popped_pts { - buffer.set_pts(state.last_popped_pts) - } - - self.push_lost_events(state, element, seq, pts, &mut discont) - .await?; - - if state.discont { - discont = true; - state.discont = false; - } - - state.last_popped_pts = buffer.get_pts(); - if let Some(pts) = state.last_popped_pts.nseconds() { - state.src_pad_handler.0.position.store(pts, Relaxed); - } - state.last_popped_seqnum = seq; - - if discont { - buffer.set_flags(gst::BufferFlags::DISCONT); - } - - state.num_pushed += 1; - - gst_debug!(CAT, obj: self.src_pad.gst_pad(), "Pushing {:?} with seq {}", buffer, seq); - - self.src_pad.push(buffer.to_owned()).await - } - - async fn schedule(&self, state: &mut MutexGuard<'_, State>, element: &gst::Element) { - let (latency_ns, context_wait_ns) = { - let settings = self.settings.lock().await; + let (latency, context_wait) = { + let settings = jb.settings.lock().unwrap(); ( settings.latency_ms as u64 * gst::MSECOND, settings.context_wait as u64 * gst::MSECOND, ) }; + // Reschedule if needed + let (_, next_wakeup) = + jb.src_pad_handler + .get_next_wakeup(&element, &state, latency, context_wait); + if let Some((next_wakeup, _)) = next_wakeup { + if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle { + if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup { + gst_debug!( + CAT, + obj: pad, + "Rescheduling for new item {} < {}", + next_wakeup, + previous_next_wakeup + ); + abort_handle.abort(); + state.wait_handle = None; + } + } + } + state.last_res + } +} + +impl PadSinkHandler for SinkHandler { + type ElementImpl = JitterBuffer; + + fn sink_chain( + &self, + pad: &PadSinkRef, + _jb: &JitterBuffer, + element: &gst::Element, + buffer: gst::Buffer, + ) -> BoxFuture<'static, Result> { + let pad_weak = pad.downgrade(); + let element = element.clone(); + let this = self.clone(); + + async move { + let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + + gst_debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); + this.enqueue_item(pad.gst_pad(), &element, Some(buffer)) + } + .boxed() + } + + fn sink_event( + &self, + pad: &PadSinkRef, + jb: &JitterBuffer, + _element: &gst::Element, + event: gst::Event, + ) -> bool { + use gst::EventView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + match event.view() { + EventView::FlushStart(..) => { + jb.src_pad.cancel_task(); + } + _ => (), + } + + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); + jb.src_pad.gst_pad().push_event(event) + } + + fn sink_event_serialized( + &self, + pad: &PadSinkRef, + _jb: &JitterBuffer, + element: &gst::Element, + event: gst::Event, + ) -> BoxFuture<'static, bool> { + use gst::EventView; + + let pad_weak = pad.downgrade(); + let element = element.clone(); + + async move { + let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + let jb = JitterBuffer::from_instance(&element); + + let mut forward = true; + match event.view() { + EventView::Segment(e) => { + let mut state = jb.state.lock().unwrap(); + state.segment = e + .get_segment() + .clone() + .downcast::() + .unwrap(); + } + EventView::FlushStop(..) => { + jb.flush_stop(&element); + } + EventView::Eos(..) => { + let mut state = jb.state.lock().unwrap(); + state.eos = true; + if let Some((_, abort_handle)) = state.wait_handle.take() { + abort_handle.abort(); + } + forward = false; + } + _ => (), + }; + + if forward { + // FIXME: These events should really be queued up and stay in order + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event); + jb.src_pad.push_event(event).await + } else { + true + } + } + .boxed() + } +} + +#[derive(Clone)] +struct SrcHandler; + +impl SrcHandler { + fn new() -> Self { + SrcHandler + } + + fn clear(&self) {} + + fn generate_lost_events( + &self, + state: &mut State, + element: &gst::Element, + seqnum: u16, + pts: gst::ClockTime, + discont: &mut bool, + ) -> Vec { + let (latency_ns, do_lost) = { + let jb = JitterBuffer::from_instance(element); + let settings = jb.settings.lock().unwrap(); + ( + settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(), + settings.do_lost, + ) + }; + + let mut events = vec![]; + + let last_popped_seqnum = match state.last_popped_seqnum { + None => return events, + Some(seq) => seq, + }; + + gst_debug!( + CAT, + obj: element, + "Generating lost events seq: {}, last popped seq: {:?}", + seqnum, + last_popped_seqnum, + ); + + let mut lost_seqnum = last_popped_seqnum.wrapping_add(1); + let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64; + + if gap > 0 { + let interval = + pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64; + let gap = gap as u64; + let spacing = if interval >= 0 { + interval as u64 / (gap + 1) + } else { + 0 + }; + + *discont = true; + + if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns { + let n_packets = gap - latency_ns / spacing; + + if do_lost { + let s = gst::Structure::new( + "GstRTPPacketLost", + &[ + ("seqnum", &(lost_seqnum as u32)), + ( + "timestamp", + &(state.last_popped_pts + gst::ClockTime(Some(spacing))), + ), + ("duration", &(n_packets * spacing)), + ("retry", &0), + ], + ); + + events.push(gst::Event::new_custom_downstream(s).build()); + } + + lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16); + state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing)); + state.stats.num_lost += n_packets; + } + + while lost_seqnum != seqnum { + let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing)); + let duration = if state.equidistant > 0 { spacing } else { 0 }; + + state.last_popped_pts = timestamp; + + if do_lost { + let s = gst::Structure::new( + "GstRTPPacketLost", + &[ + ("seqnum", &(lost_seqnum as u32)), + ("timestamp", ×tamp), + ("duration", &duration), + ("retry", &0), + ], + ); + + events.push(gst::Event::new_custom_downstream(s).build()); + } + + state.stats.num_lost += 1; + + lost_seqnum = lost_seqnum.wrapping_add(1); + } + } + + events + } + + async fn pop_and_push( + &self, + element: &gst::Element, + ) -> Result { + let jb = JitterBuffer::from_instance(element); + + let (lost_events, buffer, seq) = { + let mut state = jb.state.lock().unwrap(); + + let mut discont = false; + let (jb_item, _) = state.jbuf.borrow().pop(); + + let jb_item = match jb_item { + None => { + if state.eos { + return Err(gst::FlowError::Eos); + } else { + return Ok(gst::FlowSuccess::Ok); + } + } + Some(item) => item, + }; + + let dts = jb_item.get_dts(); + let pts = jb_item.get_pts(); + let seq = jb_item.get_seqnum(); + let mut buffer = jb_item.into_buffer(); + + let lost_events = { + let buffer = buffer.make_mut(); + + buffer.set_dts(state.segment.to_running_time(dts)); + buffer.set_pts(state.segment.to_running_time(pts)); + + if state.last_popped_pts.is_some() && buffer.get_pts() < state.last_popped_pts { + buffer.set_pts(state.last_popped_pts) + } + + let lost_events = if let Some(seq) = seq { + self.generate_lost_events(&mut state, element, seq, pts, &mut discont) + } else { + vec![] + }; + + if state.discont { + discont = true; + state.discont = false; + } + + if discont { + buffer.set_flags(gst::BufferFlags::DISCONT); + } + + lost_events + }; + + state.last_popped_pts = buffer.get_pts(); + if let Some(pts) = state.last_popped_pts.nseconds() { + state.position = pts.into(); + } + state.last_popped_seqnum = seq; + + state.stats.num_pushed += 1; + + (lost_events, buffer, seq) + }; + + for event in lost_events { + gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing lost event {:?}", event); + let _ = jb.src_pad.push_event(event).await; + } + + gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing {:?} with seq {:?}", buffer, seq); + + jb.src_pad.push(buffer).await + } + + fn get_next_wakeup( + &self, + element: &gst::Element, + state: &State, + latency: gst::ClockTime, + context_wait: gst::ClockTime, + ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) { let now = get_current_running_time(element); gst_debug!( CAT, obj: element, - "now is {}, earliest pts is {}, packet_spacing {} and latency {}", + "Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}", now, + state.eos, state.earliest_pts, state.packet_spacing, - latency_ns + latency ); - if state.earliest_pts.is_none() { - return; + if state.eos { + gst_debug!(CAT, obj: element, "EOS, not waiting"); + return (now, Some((now, Duration::from_nanos(0)))); } - let next_wakeup = state.earliest_pts + latency_ns - state.packet_spacing; + if state.earliest_pts.is_none() { + return (now, None); + } + + let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2; let delay = { if next_wakeup > now { @@ -962,233 +954,397 @@ impl JitterBuffer { } }; - if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() { - wakeup_abort_handle.abort(); - } - if let Some(wakeup_join_handle) = state.wakeup_join_handle.take() { - let _ = wakeup_join_handle.await; - } + gst_debug!( + CAT, + obj: element, + "Next wakeup at {} with delay {}", + next_wakeup, + delay + ); - gst_debug!(CAT, obj: element, "Scheduling wakeup in {}", delay); - - let (wakeup_fut, abort_handle) = abortable(Self::wakeup_fut( - Duration::from_nanos(delay), - latency_ns, - context_wait_ns, - &element, - self.src_pad.downgrade(), - )); - state.wakeup_join_handle = Some(self.src_pad.spawn(wakeup_fut)); - state.wakeup_abort_handle = Some(abort_handle); + (now, Some((next_wakeup, Duration::from_nanos(delay)))) } - fn wakeup_fut( - delay: Duration, - latency_ns: gst::ClockTime, - context_wait_ns: gst::ClockTime, - element: &gst::Element, - pad_src_weak: PadSrcWeak, - ) -> BoxFuture<'static, ()> { + fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { + let this = self.clone(); let element = element.clone(); - async move { - runtime::time::delay_for(delay).await; - let jb = Self::from_instance(&element); - let mut state = jb.state.lock().await; + pad.start_task(move || { + let this = this.clone(); + let element = element.clone(); - let pad_src = match pad_src_weak.upgrade() { - Some(pad_src) => pad_src, - None => return, - }; + async move { + let jb = JitterBuffer::from_instance(&element); + let (latency, context_wait) = { + let settings = jb.settings.lock().unwrap(); + ( + settings.latency_ms as u64 * gst::MSECOND, + settings.context_wait as u64 * gst::MSECOND, + ) + }; - let pad_ctx = pad_src.pad_context(); - let pad_ctx = match pad_ctx.upgrade() { - Some(pad_ctx) => pad_ctx, - None => return, - }; - - let now = get_current_running_time(&element); - - gst_debug!( - CAT, - obj: &element, - "Woke back up, earliest_pts {}", - state.earliest_pts - ); - - /* Check earliest PTS as we have just taken the lock */ - if state.earliest_pts.is_some() - && state.earliest_pts + latency_ns - state.packet_spacing - context_wait_ns / 2 - < now - { loop { - let (head_pts, head_seq) = state.jbuf.borrow().peek(); + let delay_fut = { + let mut state = jb.state.lock().unwrap(); + let (_, next_wakeup) = + this.get_next_wakeup(&element, &state, latency, context_wait); - state.last_res = jb.pop_and_push(&mut state, &element).await; + let (delay_fut, abort_handle) = match next_wakeup { + Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None), + _ => { + let (delay_fut, abort_handle) = abortable(async move { + match next_wakeup { + Some((_, delay)) => { + runtime::time::delay_for(delay).await; + } + None => { + future::pending::<()>().await; + } + }; + }); - if let Some(drain_fut) = pad_ctx.drain_pending_tasks() { - let (abortable_drain, abort_handle) = abortable(drain_fut); - state.task_queue_abort_handle = Some(abort_handle); + let next_wakeup = + next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE); + (Some(delay_fut), Some((next_wakeup, abort_handle))) + } + }; - pad_src.spawn(abortable_drain.map(drop)); - } else { - state.task_queue_abort_handle = None; - } + state.wait_handle = abort_handle; - let has_pending_tasks = state.task_queue_abort_handle.is_some(); + delay_fut + }; - if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum as u32 { - let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest(); - state.earliest_pts = earliest_pts; - state.earliest_seqnum = earliest_seqnum as u16; - } - - if has_pending_tasks - || state.earliest_pts.is_none() - || state.earliest_pts + latency_ns - state.packet_spacing >= now - { - break; - } - } - } - - jb.schedule(&mut state, &element).await; - } - .boxed() - } - - async fn enqueue_item( - &self, - pad: &gst::Pad, - element: &gst::Element, - buffer: Option, - ) -> Result { - let mut state = self.state.lock().await; - - let mut buffers = VecDeque::new(); - if let Some(buf) = buffer { - buffers.push_back(buf); - } - - // This is to avoid recursion with `store`, `reset` and `enqueue_item` - while let Some(buf) = buffers.pop_front() { - if let Err(err) = self.store(&mut state, pad, element, buf).await { - match err { - gst::FlowError::CustomError => { - for gap_packet in &self.reset(&mut state, element) { - buffers.push_back(gap_packet.0.to_owned()); + // Got aborted, reschedule if needed + if let Some(delay_fut) = delay_fut { + gst_debug!(CAT, obj: &element, "Waiting"); + if let Err(Aborted) = delay_fut.await { + gst_debug!(CAT, obj: &element, "Waiting aborted"); + return glib::Continue(true); + } + } + + let (head_pts, head_seq) = { + let state = jb.state.lock().unwrap(); + // + // Check earliest PTS as we have just taken the lock + let (now, next_wakeup) = + this.get_next_wakeup(&element, &state, latency, context_wait); + + gst_debug!( + CAT, + obj: &element, + "Woke up at {}, earliest_pts {}", + now, + state.earliest_pts + ); + + if let Some((next_wakeup, _)) = next_wakeup { + if next_wakeup > now { + // Reschedule and wait a bit longer in the next iteration + return glib::Continue(true); + } + } else { + return glib::Continue(true); + } + + let (head_pts, head_seq) = state.jbuf.borrow().peek(); + + (head_pts, head_seq) + }; + + let res = this.pop_and_push(&element).await; + + { + let mut state = jb.state.lock().unwrap(); + + state.last_res = res; + + if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum { + let (earliest_pts, earliest_seqnum) = + state.jbuf.borrow().find_earliest(); + state.earliest_pts = earliest_pts; + state.earliest_seqnum = earliest_seqnum; + } + + if res.is_ok() { + // Return and reschedule if the next packet would be in the future + // Check earliest PTS as we have just taken the lock + let (now, next_wakeup) = + this.get_next_wakeup(&element, &state, latency, context_wait); + if let Some((next_wakeup, _)) = next_wakeup { + if next_wakeup > now { + // Reschedule and wait a bit longer in the next iteration + return glib::Continue(true); + } + } else { + return glib::Continue(true); + } + } + } + + match res { + Ok(_) => (), + Err(gst::FlowError::Eos) => { + gst_debug!(CAT, obj: &element, "Pushing EOS event",); + let event = gst::Event::new_eos().build(); + let _ = jb.src_pad.push_event(event).await; + return glib::Continue(false); + } + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: &element, "Flushing",); + return glib::Continue(false); + } + Err(err) => { + gst_error!(CAT, obj: &element, "Error {}", err,); + return glib::Continue(false); } } - other => return Err(other), } } + }); + } +} + +impl PadSrcHandler for SrcHandler { + type ElementImpl = JitterBuffer; + + fn src_event( + &self, + pad: &PadSrcRef, + jb: &JitterBuffer, + element: &gst::Element, + event: gst::Event, + ) -> bool { + use gst::EventView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + match event.view() { + EventView::FlushStart(..) => { + jb.src_pad.cancel_task(); + } + EventView::FlushStop(..) => { + jb.flush_stop(element); + } + _ => (), } - self.schedule(&mut state, element).await; - - state.last_res + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); + jb.sink_pad.gst_pad().push_event(event) } - async fn drain(&self, state: &mut MutexGuard<'_, State>, element: &gst::Element) -> bool { - let mut ret = true; + fn src_query( + &self, + pad: &PadSrcRef, + jb: &JitterBuffer, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryView; - loop { - let (head_pts, _) = state.jbuf.borrow().peek(); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); - if head_pts == gst::CLOCK_TIME_NONE { - break; + match query.view_mut() { + QueryView::Latency(ref mut q) => { + let mut peer_query = gst::query::Query::new_latency(); + + let ret = jb.sink_pad.gst_pad().peer_query(&mut peer_query); + + if ret { + let settings = jb.settings.lock().unwrap(); + let (_, mut min_latency, _) = peer_query.get_result(); + min_latency += (settings.latency_ms as u64) * gst::SECOND; + let max_latency = gst::CLOCK_TIME_NONE; + + q.set(true, min_latency, max_latency); + } + + ret } - - if self.pop_and_push(state, element).await.is_err() { - ret = false; - break; + QueryView::Position(ref mut q) => { + if q.get_format() != gst::Format::Time { + jb.sink_pad.gst_pad().peer_query(query) + } else { + let state = jb.state.lock().unwrap(); + let position = state.position; + q.set(position); + true + } } + _ => jb.sink_pad.gst_pad().peer_query(query), } - - ret } +} - async fn flush(&self, element: &gst::Element) { - let mut state = self.state.lock().await; +#[derive(Debug)] +struct Stats { + num_pushed: u64, + num_lost: u64, + num_late: u64, +} - gst_info!(CAT, obj: element, "Flushing"); - - *state = State::default(); +impl Default for Stats { + fn default() -> Self { + Self { + num_pushed: 0, + num_lost: 0, + num_late: 0, + } } +} - async fn clear_pt_map(&self, element: &gst::Element) { +// Shared state between element, sink and source pad +struct State { + jbuf: glib::SendUniqueCell, + + last_res: Result, + position: gst::ClockTime, + + segment: gst::FormattedSegment, + clock_rate: Option, + + packet_spacing: gst::ClockTime, + equidistant: i32, + + discont: bool, + eos: bool, + + last_popped_seqnum: Option, + last_popped_pts: gst::ClockTime, + + stats: Stats, + + earliest_pts: gst::ClockTime, + earliest_seqnum: Option, + + wait_handle: Option<(gst::ClockTime, AbortHandle)>, +} + +impl Default for State { + fn default() -> State { + State { + jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(), + + last_res: Ok(gst::FlowSuccess::Ok), + position: gst::CLOCK_TIME_NONE, + + segment: gst::FormattedSegment::::new(), + clock_rate: None, + + packet_spacing: gst::ClockTime(Some(0)), + equidistant: 0, + + discont: true, + eos: false, + + last_popped_seqnum: None, + last_popped_pts: gst::CLOCK_TIME_NONE, + + stats: Stats::default(), + + earliest_pts: gst::CLOCK_TIME_NONE, + earliest_seqnum: None, + + wait_handle: None, + } + } +} + +struct JitterBuffer { + sink_pad: PadSink, + src_pad: PadSrc, + sink_pad_handler: SinkHandler, + src_pad_handler: SrcHandler, + state: StdMutex, + settings: StdMutex, +} + +lazy_static! { + static ref CAT: gst::DebugCategory = gst::DebugCategory::new( + "ts-jitterbuffer", + gst::DebugColorFlags::empty(), + Some("Thread-sharing jitterbuffer"), + ); +} + +impl JitterBuffer { + fn clear_pt_map(&self, element: &gst::Element) { gst_info!(CAT, obj: element, "Clearing PT map"); - let mut state = self.state.lock().await; - state.clock_rate = -1; + let mut state = self.state.lock().unwrap(); + state.clock_rate = None; state.jbuf.borrow().reset_skew(); } - async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().await; + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { gst_info!(CAT, obj: element, "Preparing"); - let (context, latency) = { - let settings = self.settings.lock().await; + let context = { + let settings = self.settings.lock().unwrap(); let context = Context::acquire(&settings.context, settings.context_wait).unwrap(); - let latency = settings.latency_ms as u64 * gst::MSECOND; - (context, latency) + context }; - state.src_pad_handler = JitterBufferPadSrcHandler::new(latency); - - let _ = self - .src_pad - .prepare(context, &state.src_pad_handler) - .await + self.src_pad + .prepare(context, &self.src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, ["Error preparing src_pad: {:?}", err] - ); - gst::StateChangeError - }); + ) + })?; - self.sink_pad.prepare(&JitterBufferPadSinkHandler).await; + self.sink_pad.prepare(&self.sink_pad_handler); gst_info!(CAT, obj: element, "Prepared"); Ok(()) } - async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { - let mut state = self.state.lock().await; + fn unprepare(&self, element: &gst::Element) { gst_debug!(CAT, obj: element, "Unpreparing"); - self.sink_pad.unprepare().await; - let _ = self.src_pad.unprepare().await; - - self.reset(&mut state, element); - - if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() { - wakeup_abort_handle.abort(); - } + self.sink_pad.unprepare(); + let _ = self.src_pad.unprepare(); gst_debug!(CAT, obj: element, "Unprepared"); - - Ok(()) } - async fn stop(&self, element: &gst::Element) -> Result<(), ()> { - let mut state = self.state.lock().await; + fn start(&self, element: &gst::Element) { + let mut state = self.state.lock().unwrap(); + + gst_debug!(CAT, obj: element, "Starting"); + + *state = State::default(); + self.sink_pad_handler.clear(); + self.src_pad_handler.clear(); + + self.src_pad_handler + .start_task(self.src_pad.as_ref(), element); + + gst_debug!(CAT, obj: element, "Started"); + } + + fn stop(&self, element: &gst::Element) { + let mut state = self.state.lock().unwrap(); gst_debug!(CAT, obj: element, "Stopping"); - if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() { - wakeup_abort_handle.abort(); - } - - if let Some(abort_handle) = state.task_queue_abort_handle.take() { + if let Some((_, abort_handle)) = state.wait_handle.take() { abort_handle.abort(); } - gst_debug!(CAT, obj: element, "Stopped"); + self.src_pad.stop_task(); - Ok(()) + self.src_pad_handler.clear(); + self.sink_pad_handler.clear(); + *state = State::default(); + + gst_debug!(CAT, obj: element, "Stopped"); + } + + fn flush_stop(&self, element: &gst::Element) { + self.src_pad.stop_task(); + self.start(element); } } @@ -1235,8 +1391,8 @@ impl ObjectSubclass for JitterBuffer { .get::() .expect("signal arg") .expect("missing signal arg"); - let jitterbuffer = Self::from_instance(&element); - runtime::executor::block_on(jitterbuffer.clear_pt_map(&element)); + let jb = Self::from_instance(&element); + jb.clear_pt_map(&element); None }, ); @@ -1262,8 +1418,10 @@ impl ObjectSubclass for JitterBuffer { Self { sink_pad, src_pad, - state: Mutex::new(State::default()), - settings: Mutex::new(Settings::default()), + sink_pad_handler: SinkHandler::new(), + src_pad_handler: SrcHandler::new(), + state: StdMutex::new(State::default()), + settings: StdMutex::new(Settings::default()), } } } @@ -1271,45 +1429,45 @@ impl ObjectSubclass for JitterBuffer { impl ObjectImpl for JitterBuffer { glib_object_impl!(); - fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; match *prop { subclass::Property("latency", ..) => { let latency_ms = { - let mut settings = runtime::executor::block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); settings.latency_ms = value.get_some().expect("type checked upstream"); settings.latency_ms as u64 }; - runtime::executor::block_on(self.state.lock()) - .jbuf - .borrow() - .set_delay(latency_ms * gst::MSECOND); + let state = self.state.lock().unwrap(); + state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND); - /* TODO: post message */ + let element = obj.downcast_ref::().unwrap(); + let _ = + element.post_message(&gst::Message::new_latency().src(Some(element)).build()); } subclass::Property("do-lost", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); settings.do_lost = value.get_some().expect("type checked upstream"); } subclass::Property("max-dropout-time", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); settings.max_dropout_time = value.get_some().expect("type checked upstream"); } subclass::Property("max-misorder-time", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); settings.max_misorder_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -1321,39 +1479,39 @@ impl ObjectImpl for JitterBuffer { match *prop { subclass::Property("latency", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); Ok(settings.latency_ms.to_value()) } subclass::Property("do-lost", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); Ok(settings.do_lost.to_value()) } subclass::Property("max-dropout-time", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); Ok(settings.max_dropout_time.to_value()) } subclass::Property("max-misorder-time", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); Ok(settings.max_misorder_time.to_value()) } subclass::Property("stats", ..) => { - let state = runtime::executor::block_on(self.state.lock()); + let state = self.state.lock().unwrap(); let s = gst::Structure::new( "application/x-rtp-jitterbuffer-stats", &[ - ("num-pushed", &state.num_pushed), - ("num-lost", &state.num_lost), - ("num-late", &state.num_late), + ("num-pushed", &state.stats.num_pushed), + ("num-lost", &state.stats.num_lost), + ("num-late", &state.stats.num_late), ], ); Ok(s.to_value()) } subclass::Property("context", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), @@ -1379,18 +1537,16 @@ impl ElementImpl for JitterBuffer { match transition { gst::StateChange::NullToReady => { - runtime::executor::block_on(self.prepare(element)).map_err(|err| { + self.prepare(element).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PausedToReady => { - runtime::executor::block_on(self.stop(element)) - .map_err(|_| gst::StateChangeError)?; + self.stop(element); } gst::StateChange::ReadyToNull => { - runtime::executor::block_on(self.unprepare(element)) - .map_err(|_| gst::StateChangeError)?; + self.unprepare(element); } _ => (), } @@ -1399,6 +1555,7 @@ impl ElementImpl for JitterBuffer { match transition { gst::StateChange::ReadyToPaused => { + self.start(element); success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PlayingToPaused => { diff --git a/gst-plugin-threadshare/src/jitterbuffer/mod.rs b/gst-plugin-threadshare/src/jitterbuffer/mod.rs index 2007445a..79148f16 100644 --- a/gst-plugin-threadshare/src/jitterbuffer/mod.rs +++ b/gst-plugin-threadshare/src/jitterbuffer/mod.rs @@ -18,6 +18,8 @@ use glib_sys as glib_ffi; use gstreamer_sys as gst_ffi; +use std::u32; + #[allow(clippy::module_inception)] pub mod jitterbuffer; @@ -178,7 +180,7 @@ impl RTPJitterBufferItem { buffer: gst::Buffer, dts: gst::ClockTime, pts: gst::ClockTime, - seqnum: u32, + seqnum: Option, rtptime: u32, ) -> RTPJitterBufferItem { unsafe { @@ -189,7 +191,7 @@ impl RTPJitterBufferItem { r#type: 0, dts: dts.to_glib(), pts: pts.to_glib(), - seqnum, + seqnum: seqnum.map(|s| s as u32).unwrap_or(u32::MAX), count: 1, rtptime, }))) @@ -222,9 +224,13 @@ impl RTPJitterBufferItem { } } - pub fn get_seqnum(&self) -> u32 { + pub fn get_seqnum(&self) -> Option { let item = self.0.as_ref().expect("Invalid wrapper"); - item.seqnum + if item.seqnum == u32::MAX { + None + } else { + Some(item.seqnum as u16) + } } #[allow(dead_code)] @@ -370,7 +376,7 @@ impl RTPJitterBuffer { } } - pub fn find_earliest(&self) -> (gst::ClockTime, u32) { + pub fn find_earliest(&self) -> (gst::ClockTime, Option) { unsafe { let mut pts = mem::MaybeUninit::uninit(); let mut seqnum = mem::MaybeUninit::uninit(); @@ -383,6 +389,12 @@ impl RTPJitterBuffer { let pts = pts.assume_init(); let seqnum = seqnum.assume_init(); + let seqnum = if seqnum == u32::MAX { + None + } else { + Some(seqnum as u16) + }; + if pts == gst_ffi::GST_CLOCK_TIME_NONE { (gst::CLOCK_TIME_NONE, seqnum) } else { @@ -391,25 +403,35 @@ impl RTPJitterBuffer { } } - pub fn pop(&self) -> (RTPJitterBufferItem, i32) { + pub fn pop(&self) -> (Option, i32) { unsafe { let mut percent = mem::MaybeUninit::uninit(); let item = ffi::rtp_jitter_buffer_pop(self.to_glib_none().0, percent.as_mut_ptr()); ( - RTPJitterBufferItem(Some(Box::from_raw(item))), + if item.is_null() { + None + } else { + Some(RTPJitterBufferItem(Some(Box::from_raw(item)))) + }, percent.assume_init(), ) } } - pub fn peek(&self) -> (gst::ClockTime, u32) { + pub fn peek(&self) -> (gst::ClockTime, Option) { unsafe { let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0); if item.is_null() { - (gst::CLOCK_TIME_NONE, std::u32::MAX) + (gst::CLOCK_TIME_NONE, None) } else { - ((*item).pts.into(), (*item).seqnum) + let seqnum = (*item).seqnum; + let seqnum = if seqnum == u32::MAX { + None + } else { + Some(seqnum as u16) + }; + ((*item).pts.into(), seqnum) } } } diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index 3b6f286e..a2d0fad9 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -36,7 +36,7 @@ mod udpsrc; mod appsrc; pub mod dataqueue; -//mod jitterbuffer; +mod jitterbuffer; //mod proxy; mod queue; @@ -55,7 +55,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { queue::register(plugin)?; //proxy::register(plugin)?; appsrc::register(plugin)?; - //jitterbuffer::jitterbuffer::register(plugin)?; + jitterbuffer::jitterbuffer::register(plugin)?; Ok(()) }