diff --git a/net/rtpav1/src/pay/imp.rs b/net/rtpav1/src/pay/imp.rs index 230f0d4c..0841b897 100644 --- a/net/rtpav1/src/pay/imp.rs +++ b/net/rtpav1/src/pay/imp.rs @@ -17,7 +17,7 @@ use gst::{ use gst_rtp::{prelude::*, rtp_buffer::RTPBuffer, subclass::prelude::*, RTPBasePayload}; use std::{ io::{Cursor, Read, Seek, SeekFrom, Write}, - sync::{Mutex, MutexGuard}, + sync::Mutex, }; use bitstream_io::{BitReader, BitWriter}; @@ -109,14 +109,14 @@ impl RTPAv1Pay { /// Parses new OBUs, stores them in the state, /// and constructs and sends new RTP packets when appropriate. - fn handle_new_obus<'s>( - &'s self, + fn handle_new_obus( + &self, element: &::Type, - state: &mut MutexGuard<'s, State>, + state: &mut State, data: &[u8], dts: Option, pts: Option, - ) -> Result { + ) -> Result { let mut reader = Cursor::new(data); while reader.position() < data.len() as u64 { @@ -188,11 +188,16 @@ impl RTPAv1Pay { } } - while let Some(packet_data) = self.consider_new_packet(element, state, false) { - self.push_new_packet(element, state, packet_data)?; + let mut list = gst::BufferList::new(); + { + let list = list.get_mut().unwrap(); + while let Some(packet_data) = self.consider_new_packet(element, state, false) { + let buffer = self.generate_new_packet(element, state, packet_data)?; + list.add(buffer); + } } - Ok(FlowSuccess::Ok) + Ok(list) } /// Look at the size the currently stored OBUs would require, @@ -329,14 +334,14 @@ impl RTPAv1Pay { } } - /// Given the information returned by consider_new_packet(), construct and push + /// Given the information returned by consider_new_packet(), construct and return /// new RTP packet, filled with those OBUs. - fn push_new_packet<'s>( - &'s self, + fn generate_new_packet( + &self, element: &::Type, - state: &mut MutexGuard<'s, State>, + state: &mut State, packet: PacketOBUData, - ) -> Result { + ) -> Result { gst::log!( CAT, obj: element, @@ -461,10 +466,11 @@ impl RTPAv1Pay { gst::log!( CAT, obj: element, - "pushing RTP packet of size {}", + "generated RTP packet of size {}", outbuf.size() ); - element.push(outbuf) + + Ok(outbuf) } } @@ -590,24 +596,36 @@ impl RTPBasePayloadImpl for RTPAv1Pay { FlowError::Error })?; - self.handle_new_obus(element, &mut state, buffer.as_slice(), dts, pts) + let list = self.handle_new_obus(element, &mut state, buffer.as_slice(), dts, pts)?; + drop(state); + + if !list.is_empty() { + element.push_list(list) + } else { + Ok(gst::FlowSuccess::Ok) + } } fn sink_event(&self, element: &Self::Type, event: Event) -> bool { gst::log!(CAT, obj: element, "sink event: {}", event.type_()); if matches!(event.type_(), EventType::Eos) { - let mut state = self.state.lock().unwrap(); - // flush all remaining OBUs - while let Some(packet_data) = self.consider_new_packet(element, &mut state, true) { - if self - .push_new_packet(element, &mut state, packet_data) - .is_err() - { - break; + let mut list = gst::BufferList::new(); + { + let mut state = self.state.lock().unwrap(); + let list = list.get_mut().unwrap(); + + while let Some(packet_data) = self.consider_new_packet(element, &mut state, true) { + match self.generate_new_packet(element, &mut state, packet_data) { + Ok(buffer) => list.add(buffer), + Err(_) => break, + } } } + if !list.is_empty() { + let _ = element.push_list(list); + } } self.parent_sink_event(element, event)