rtpav1pay: Don't push buffers downstream while holding mutexes

And also push all packets that can be generated as a time as a single
buffer list instead of one by one.
This commit is contained in:
Sebastian Dröge 2022-09-12 17:50:07 +03:00
parent f9a8e121e1
commit 7edc9e656f

View file

@ -17,7 +17,7 @@ use gst::{
use gst_rtp::{prelude::*, rtp_buffer::RTPBuffer, subclass::prelude::*, RTPBasePayload};
use std::{
io::{Cursor, Read, Seek, SeekFrom, Write},
sync::{Mutex, MutexGuard},
sync::Mutex,
};
use bitstream_io::{BitReader, BitWriter};
@ -109,14 +109,14 @@ impl RTPAv1Pay {
/// Parses new OBUs, stores them in the state,
/// and constructs and sends new RTP packets when appropriate.
fn handle_new_obus<'s>(
&'s self,
fn handle_new_obus(
&self,
element: &<Self as ObjectSubclass>::Type,
state: &mut MutexGuard<'s, State>,
state: &mut State,
data: &[u8],
dts: Option<ClockTime>,
pts: Option<ClockTime>,
) -> Result<FlowSuccess, FlowError> {
) -> Result<gst::BufferList, FlowError> {
let mut reader = Cursor::new(data);
while reader.position() < data.len() as u64 {
@ -188,11 +188,16 @@ impl RTPAv1Pay {
}
}
while let Some(packet_data) = self.consider_new_packet(element, state, false) {
self.push_new_packet(element, state, packet_data)?;
let mut list = gst::BufferList::new();
{
let list = list.get_mut().unwrap();
while let Some(packet_data) = self.consider_new_packet(element, state, false) {
let buffer = self.generate_new_packet(element, state, packet_data)?;
list.add(buffer);
}
}
Ok(FlowSuccess::Ok)
Ok(list)
}
/// Look at the size the currently stored OBUs would require,
@ -329,14 +334,14 @@ impl RTPAv1Pay {
}
}
/// Given the information returned by consider_new_packet(), construct and push
/// Given the information returned by consider_new_packet(), construct and return
/// new RTP packet, filled with those OBUs.
fn push_new_packet<'s>(
&'s self,
fn generate_new_packet(
&self,
element: &<Self as ObjectSubclass>::Type,
state: &mut MutexGuard<'s, State>,
state: &mut State,
packet: PacketOBUData,
) -> Result<FlowSuccess, FlowError> {
) -> Result<gst::Buffer, FlowError> {
gst::log!(
CAT,
obj: element,
@ -461,10 +466,11 @@ impl RTPAv1Pay {
gst::log!(
CAT,
obj: element,
"pushing RTP packet of size {}",
"generated RTP packet of size {}",
outbuf.size()
);
element.push(outbuf)
Ok(outbuf)
}
}
@ -590,24 +596,36 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
FlowError::Error
})?;
self.handle_new_obus(element, &mut state, buffer.as_slice(), dts, pts)
let list = self.handle_new_obus(element, &mut state, buffer.as_slice(), dts, pts)?;
drop(state);
if !list.is_empty() {
element.push_list(list)
} else {
Ok(gst::FlowSuccess::Ok)
}
}
fn sink_event(&self, element: &Self::Type, event: Event) -> bool {
gst::log!(CAT, obj: element, "sink event: {}", event.type_());
if matches!(event.type_(), EventType::Eos) {
let mut state = self.state.lock().unwrap();
// flush all remaining OBUs
while let Some(packet_data) = self.consider_new_packet(element, &mut state, true) {
if self
.push_new_packet(element, &mut state, packet_data)
.is_err()
{
break;
let mut list = gst::BufferList::new();
{
let mut state = self.state.lock().unwrap();
let list = list.get_mut().unwrap();
while let Some(packet_data) = self.consider_new_packet(element, &mut state, true) {
match self.generate_new_packet(element, &mut state, packet_data) {
Ok(buffer) => list.add(buffer),
Err(_) => break,
}
}
}
if !list.is_empty() {
let _ = element.push_list(list);
}
}
self.parent_sink_event(element, event)