mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-06-13 11:49:21 +00:00
rtpav1pay: Use a VecDeque
instead of a Vec
for the queued OBUs
And use a `Vec` plus offset for consuming partial byte buffers. Removing the first element from a `Vec` repeatedly is not very cheap. Also simplify calculation of the current packet by removing a mostly unused type and keeping track of the calculations always locally instead of sometimes storing it in the element state.
This commit is contained in:
parent
e0437ae8f6
commit
36861edf9a
|
@ -10,6 +10,7 @@
|
|||
use gst::{glib, subclass::prelude::*};
|
||||
use gst_rtp::{prelude::*, subclass::prelude::*};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
io::{Cursor, Read, Seek, SeekFrom, Write},
|
||||
sync::Mutex,
|
||||
};
|
||||
|
@ -32,7 +33,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
|||
// TODO: properly handle `max_ptime` and `min_ptime`
|
||||
|
||||
/// Information about the OBUs intended to be grouped into one packet
|
||||
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
struct PacketOBUData {
|
||||
obu_count: usize,
|
||||
payload_size: u32,
|
||||
|
@ -41,21 +42,23 @@ struct PacketOBUData {
|
|||
ends_temporal_unit: bool,
|
||||
}
|
||||
|
||||
/// Temporary information held between invocations of `consider_new_packet()`
|
||||
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
|
||||
struct TempPacketData {
|
||||
payload_limit: u32,
|
||||
required_ids: Option<(u8, u8)>,
|
||||
/// bytes used for an OBUs size field will only be added to the total
|
||||
/// once its known for sure it will be placed in the packet
|
||||
pending_bytes: u32,
|
||||
packet: PacketOBUData,
|
||||
impl Default for PacketOBUData {
|
||||
fn default() -> Self {
|
||||
PacketOBUData {
|
||||
payload_size: 1, // 1 byte is used for the aggregation header
|
||||
omit_last_size_field: true,
|
||||
obu_count: 0,
|
||||
last_obu_fragment_size: None,
|
||||
ends_temporal_unit: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, PartialEq, Eq)]
|
||||
struct ObuData {
|
||||
info: SizedObu,
|
||||
bytes: Vec<u8>,
|
||||
offset: usize,
|
||||
dts: Option<gst::ClockTime>,
|
||||
pts: Option<gst::ClockTime>,
|
||||
}
|
||||
|
@ -64,8 +67,7 @@ struct ObuData {
|
|||
struct State {
|
||||
/// Holds header information and raw bytes for all received OBUs,
|
||||
/// as well as DTS and PTS
|
||||
//obus: Vec<(SizedObu, Vec<u8>, Option<ClockTime>, Option<ClockTime>)>,
|
||||
obus: Vec<ObuData>,
|
||||
obus: VecDeque<ObuData>,
|
||||
|
||||
/// Indicates that the first element in the Buffer is an OBU fragment,
|
||||
/// left over from the previous RTP packet
|
||||
|
@ -74,8 +76,6 @@ struct State {
|
|||
/// Indicates the next constructed packet will be the first in its sequence
|
||||
/// (Corresponds to `N` field in the aggregation header)
|
||||
first_packet_in_seq: bool,
|
||||
|
||||
temp_packet_data: Option<TempPacketData>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
@ -86,10 +86,9 @@ pub struct RTPAv1Pay {
|
|||
impl Default for State {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
obus: Vec::new(),
|
||||
obus: VecDeque::new(),
|
||||
open_obu_fragment: false,
|
||||
first_packet_in_seq: true,
|
||||
temp_packet_data: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -140,9 +139,10 @@ impl RTPAv1Pay {
|
|||
);
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
state.obus.push(ObuData {
|
||||
state.obus.push_back(ObuData {
|
||||
info: obu,
|
||||
bytes: Vec::new(),
|
||||
offset: 0,
|
||||
dts,
|
||||
pts,
|
||||
});
|
||||
|
@ -171,9 +171,10 @@ impl RTPAv1Pay {
|
|||
.read_exact(&mut bytes[(obu.header_len as usize)..bytes_total])
|
||||
.map_err(err_flow!(self, buf_read))?;
|
||||
|
||||
state.obus.push(ObuData {
|
||||
state.obus.push_back(ObuData {
|
||||
info: obu,
|
||||
bytes,
|
||||
offset: 0,
|
||||
dts,
|
||||
pts,
|
||||
});
|
||||
|
@ -208,75 +209,68 @@ impl RTPAv1Pay {
|
|||
state.obus.len()
|
||||
);
|
||||
|
||||
let mut data = state.temp_packet_data.take().unwrap_or_else(|| {
|
||||
TempPacketData {
|
||||
payload_limit: gst_rtp::calc_payload_len(self.instance().mtu(), 0, 0),
|
||||
packet: PacketOBUData {
|
||||
payload_size: 1, // 1 byte is used for the aggregation header
|
||||
omit_last_size_field: true,
|
||||
..PacketOBUData::default()
|
||||
},
|
||||
..TempPacketData::default()
|
||||
}
|
||||
});
|
||||
let mut packet = data.packet;
|
||||
let payload_limit = gst_rtp::calc_payload_len(self.instance().mtu(), 0, 0);
|
||||
|
||||
// Create information about the packet that can be created now while iterating over the
|
||||
// OBUs and return this if a full packet can indeed be created now.
|
||||
let mut packet = PacketOBUData::default();
|
||||
let mut pending_bytes = 0;
|
||||
let mut required_ids = None::<(u8, u8)>;
|
||||
|
||||
// figure out how many OBUs we can fit into this packet
|
||||
while packet.obu_count < state.obus.len() {
|
||||
for obu in &state.obus {
|
||||
// for OBUs with extension headers, spatial and temporal IDs must be equal
|
||||
// to all other such OBUs in the packet
|
||||
let matching_obu_ids = |obu: &SizedObu, data: &mut TempPacketData| -> bool {
|
||||
if let Some((sid, tid)) = data.required_ids {
|
||||
let matching_obu_ids = |obu: &SizedObu, required_ids: &mut Option<(u8, u8)>| -> bool {
|
||||
if let Some((sid, tid)) = *required_ids {
|
||||
sid == obu.spatial_id && tid == obu.temporal_id
|
||||
} else {
|
||||
data.required_ids = Some((obu.spatial_id, obu.temporal_id));
|
||||
*required_ids = Some((obu.spatial_id, obu.temporal_id));
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
let current = state.obus[packet.obu_count].info;
|
||||
let current = &obu.info;
|
||||
|
||||
// should this packet be finished here?
|
||||
if current.obu_type == ObuType::TemporalDelimiter {
|
||||
// remove the temporal delimiter, it is not supposed to be transmitted
|
||||
// ignore the temporal delimiter, it is not supposed to be transmitted,
|
||||
// it will be skipped later when building the packet
|
||||
gst::log!(CAT, imp: self, "ignoring temporal delimiter OBU");
|
||||
state.obus.remove(packet.obu_count);
|
||||
|
||||
if packet.obu_count > 0 {
|
||||
packet.ends_temporal_unit = true;
|
||||
if packet.obu_count > 3 {
|
||||
packet.payload_size += data.pending_bytes;
|
||||
packet.payload_size += pending_bytes;
|
||||
packet.omit_last_size_field = false;
|
||||
}
|
||||
return Some(packet);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else if packet.payload_size >= data.payload_limit
|
||||
|
||||
continue;
|
||||
} else if packet.payload_size >= payload_limit
|
||||
|| (packet.obu_count > 0 && current.obu_type == ObuType::SequenceHeader)
|
||||
|| !matching_obu_ids(&state.obus[packet.obu_count].info, &mut data)
|
||||
|| !matching_obu_ids(current, &mut required_ids)
|
||||
{
|
||||
if packet.obu_count > 3 {
|
||||
packet.payload_size += data.pending_bytes;
|
||||
packet.payload_size += pending_bytes;
|
||||
packet.omit_last_size_field = false;
|
||||
}
|
||||
return Some(packet);
|
||||
}
|
||||
|
||||
// would the full OBU fit?
|
||||
if packet.payload_size + data.pending_bytes + current.full_size() <= data.payload_limit
|
||||
{
|
||||
if packet.payload_size + pending_bytes + current.full_size() <= payload_limit {
|
||||
packet.obu_count += 1;
|
||||
packet.payload_size += current.partial_size() + data.pending_bytes;
|
||||
data.pending_bytes = current.leb_size;
|
||||
packet.payload_size += current.partial_size() + pending_bytes;
|
||||
pending_bytes = current.leb_size;
|
||||
}
|
||||
// would it fit without the size field?
|
||||
else if packet.obu_count < 3
|
||||
&& packet.payload_size + data.pending_bytes + current.partial_size()
|
||||
<= data.payload_limit
|
||||
&& packet.payload_size + pending_bytes + current.partial_size() <= payload_limit
|
||||
{
|
||||
packet.obu_count += 1;
|
||||
packet.payload_size += current.partial_size() + data.pending_bytes;
|
||||
packet.payload_size += current.partial_size() + pending_bytes;
|
||||
|
||||
return Some(packet);
|
||||
}
|
||||
|
@ -287,21 +281,20 @@ impl RTPAv1Pay {
|
|||
} else {
|
||||
// assume the biggest possible OBU fragment,
|
||||
// so if anything the size field will be smaller than expected
|
||||
leb128_size(data.payload_limit - packet.payload_size) as u32
|
||||
leb128_size(payload_limit - packet.payload_size) as u32
|
||||
};
|
||||
|
||||
// is there even enough space to bother?
|
||||
if packet.payload_size + data.pending_bytes + leb_size + current.header_len
|
||||
< data.payload_limit
|
||||
if packet.payload_size + pending_bytes + leb_size + current.header_len
|
||||
< payload_limit
|
||||
{
|
||||
packet.obu_count += 1;
|
||||
packet.last_obu_fragment_size = Some(
|
||||
data.payload_limit - packet.payload_size - data.pending_bytes - leb_size,
|
||||
);
|
||||
packet.payload_size = data.payload_limit;
|
||||
packet.last_obu_fragment_size =
|
||||
Some(payload_limit - packet.payload_size - pending_bytes - leb_size);
|
||||
packet.payload_size = payload_limit;
|
||||
packet.omit_last_size_field = leb_size == 0;
|
||||
} else if packet.obu_count > 3 {
|
||||
packet.payload_size += data.pending_bytes;
|
||||
packet.payload_size += pending_bytes;
|
||||
}
|
||||
|
||||
return Some(packet);
|
||||
|
@ -310,14 +303,12 @@ impl RTPAv1Pay {
|
|||
|
||||
if force && packet.obu_count > 0 {
|
||||
if packet.obu_count > 3 {
|
||||
packet.payload_size += data.pending_bytes;
|
||||
packet.payload_size += pending_bytes;
|
||||
packet.omit_last_size_field = false;
|
||||
}
|
||||
Some(packet)
|
||||
} else {
|
||||
// if we ran out of OBUs with space in the packet to spare, wait a bit longer
|
||||
data.packet = packet;
|
||||
state.temp_packet_data = Some(data);
|
||||
None
|
||||
}
|
||||
}
|
||||
|
@ -350,11 +341,12 @@ impl RTPAv1Pay {
|
|||
|
||||
{
|
||||
// this block enforces that outbuf_mut is dropped before pushing outbuf
|
||||
let first_obu = state.obus.front().unwrap();
|
||||
let outbuf_mut = outbuf
|
||||
.get_mut()
|
||||
.expect("Failed to get mutable reference to outbuf");
|
||||
outbuf_mut.set_dts(state.obus[0].dts);
|
||||
outbuf_mut.set_pts(state.obus[0].pts);
|
||||
outbuf_mut.set_dts(first_obu.dts);
|
||||
outbuf_mut.set_pts(first_obu.pts);
|
||||
|
||||
let mut rtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut)
|
||||
.expect("Failed to create RTPBuffer");
|
||||
|
@ -389,7 +381,13 @@ impl RTPAv1Pay {
|
|||
|
||||
// append OBUs to the buffer
|
||||
for _ in 1..packet.obu_count {
|
||||
let obu = &state.obus[0];
|
||||
let obu = loop {
|
||||
let obu = state.obus.pop_front().unwrap();
|
||||
// Drop temporal delimiter from here
|
||||
if obu.info.obu_type != ObuType::TemporalDelimiter {
|
||||
break obu;
|
||||
}
|
||||
};
|
||||
|
||||
write_leb128(
|
||||
&mut BitWriter::endian(&mut writer, ENDIANNESS),
|
||||
|
@ -397,21 +395,28 @@ impl RTPAv1Pay {
|
|||
)
|
||||
.map_err(err_flow!(self, leb_write))?;
|
||||
writer
|
||||
.write(&obu.bytes)
|
||||
.write(&obu.bytes[obu.offset..])
|
||||
.map_err(err_flow!(self, obu_write))?;
|
||||
|
||||
state.obus.remove(0);
|
||||
}
|
||||
state.open_obu_fragment = false;
|
||||
|
||||
{
|
||||
let last_obu = loop {
|
||||
let obu = state.obus.front_mut().unwrap();
|
||||
// Drop temporal delimiter from here
|
||||
if obu.info.obu_type != ObuType::TemporalDelimiter {
|
||||
break obu;
|
||||
}
|
||||
let _ = state.obus.pop_front().unwrap();
|
||||
};
|
||||
|
||||
// do the last OBU separately
|
||||
// in this instance `obu_size` includes the header length
|
||||
let obu_size = if let Some(size) = packet.last_obu_fragment_size {
|
||||
state.open_obu_fragment = true;
|
||||
size
|
||||
} else {
|
||||
state.obus[0].bytes.len() as u32
|
||||
last_obu.bytes.len() as u32 - last_obu.offset as usize as u32
|
||||
};
|
||||
|
||||
if !packet.omit_last_size_field {
|
||||
|
@ -420,33 +425,30 @@ impl RTPAv1Pay {
|
|||
}
|
||||
|
||||
// if this OBU is not a fragment, handle it as usual
|
||||
if packet.last_obu_fragment_size == None {
|
||||
if packet.last_obu_fragment_size.is_none() {
|
||||
writer
|
||||
.write(&state.obus[0].bytes)
|
||||
.write(&last_obu.bytes[last_obu.offset..])
|
||||
.map_err(err_flow!(self, obu_write))?;
|
||||
state.obus.remove(0);
|
||||
let _ = state.obus.pop_front().unwrap();
|
||||
}
|
||||
// otherwise write only a slice, and update the element
|
||||
// to only contain the unwritten bytes
|
||||
else {
|
||||
writer
|
||||
.write(&state.obus[0].bytes[0..obu_size as usize])
|
||||
.write(
|
||||
&last_obu.bytes[last_obu.offset..last_obu.offset + obu_size as usize],
|
||||
)
|
||||
.map_err(err_flow!(self, obu_write))?;
|
||||
|
||||
let new_size = state.obus[0].bytes.len() as u32 - obu_size;
|
||||
state.obus[0] = ObuData {
|
||||
info: SizedObu {
|
||||
size: new_size,
|
||||
header_len: 0,
|
||||
leb_size: leb128_size(new_size) as u32,
|
||||
is_fragment: true,
|
||||
..state.obus[0].info
|
||||
},
|
||||
bytes: Vec::from(
|
||||
&state.obus[0].bytes[obu_size as usize..state.obus[0].bytes.len()],
|
||||
),
|
||||
..state.obus[0]
|
||||
let new_size = last_obu.bytes.len() as u32 - last_obu.offset as u32 - obu_size;
|
||||
last_obu.info = SizedObu {
|
||||
size: new_size,
|
||||
header_len: 0,
|
||||
leb_size: leb128_size(new_size) as u32,
|
||||
is_fragment: true,
|
||||
..last_obu.info
|
||||
};
|
||||
last_obu.offset += obu_size as usize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -643,7 +645,7 @@ mod tests {
|
|||
false, // force argument
|
||||
State {
|
||||
// payloader state
|
||||
obus: vec![
|
||||
obus: VecDeque::from(vec![
|
||||
ObuData {
|
||||
info: SizedObu {
|
||||
obu_type: ObuType::Padding,
|
||||
|
@ -689,14 +691,14 @@ mod tests {
|
|||
bytes: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
|
||||
..ObuData::default()
|
||||
},
|
||||
],
|
||||
]),
|
||||
..State::default()
|
||||
},
|
||||
),
|
||||
(
|
||||
true,
|
||||
State {
|
||||
obus: vec![
|
||||
obus: VecDeque::from(vec![
|
||||
ObuData {
|
||||
info: SizedObu {
|
||||
obu_type: ObuType::TemporalDelimiter,
|
||||
|
@ -741,14 +743,14 @@ mod tests {
|
|||
bytes: vec![1, 2, 3],
|
||||
..ObuData::default()
|
||||
},
|
||||
],
|
||||
]),
|
||||
..State::default()
|
||||
},
|
||||
),
|
||||
(
|
||||
false,
|
||||
State {
|
||||
obus: vec![ObuData {
|
||||
obus: VecDeque::from(vec![ObuData {
|
||||
info: SizedObu {
|
||||
obu_type: ObuType::Frame,
|
||||
size: 4,
|
||||
|
@ -756,7 +758,7 @@ mod tests {
|
|||
},
|
||||
bytes: vec![1, 2, 3, 4],
|
||||
..ObuData::default()
|
||||
}],
|
||||
}]),
|
||||
..State::default()
|
||||
},
|
||||
),
|
||||
|
@ -772,12 +774,12 @@ mod tests {
|
|||
ends_temporal_unit: true,
|
||||
}),
|
||||
State {
|
||||
obus: vec![
|
||||
obus: VecDeque::from(vec![
|
||||
input_data[0].1.obus[0].clone(),
|
||||
input_data[0].1.obus[1].clone(),
|
||||
input_data[0].1.obus[2].clone(),
|
||||
input_data[0].1.obus[4].clone(),
|
||||
],
|
||||
]),
|
||||
..input_data[0].1
|
||||
},
|
||||
),
|
||||
|
@ -790,7 +792,11 @@ mod tests {
|
|||
ends_temporal_unit: false,
|
||||
}),
|
||||
State {
|
||||
obus: input_data[1].1.obus[1..].to_owned(),
|
||||
obus: {
|
||||
let mut copy = input_data[1].1.obus.clone();
|
||||
copy.pop_front().unwrap();
|
||||
copy
|
||||
},
|
||||
..input_data[1].1
|
||||
},
|
||||
),
|
||||
|
@ -809,7 +815,15 @@ mod tests {
|
|||
pay.consider_new_packet(&mut state, input_data[idx].0),
|
||||
results[idx].0,
|
||||
);
|
||||
assert_eq!(state.obus, results[idx].1.obus);
|
||||
assert_eq!(
|
||||
state
|
||||
.obus
|
||||
.iter()
|
||||
.filter(|o| o.info.obu_type != ObuType::TemporalDelimiter)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>(),
|
||||
results[idx].1.obus.iter().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
assert_eq!(state.open_obu_fragment, results[idx].1.open_obu_fragment);
|
||||
assert_eq!(
|
||||
state.first_packet_in_seq,
|
||||
|
|
Loading…
Reference in a new issue