mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-09 02:35:29 +00:00
rtpav1pay: Consider the marker flag to output packets immediately at the end of a frame
Otherwise it is necessary to wait for the beginning of the following frame, which unnecessarily increases the latency. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/255 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1086>
This commit is contained in:
parent
af0e6281d2
commit
ba0904630d
2 changed files with 67 additions and 21 deletions
|
@ -114,6 +114,7 @@ impl RTPAv1Pay {
|
|||
&self,
|
||||
state: &mut State,
|
||||
data: &[u8],
|
||||
marker: bool,
|
||||
dts: Option<gst::ClockTime>,
|
||||
pts: Option<gst::ClockTime>,
|
||||
) -> Result<gst::BufferList, gst::FlowError> {
|
||||
|
@ -193,7 +194,7 @@ impl RTPAv1Pay {
|
|||
let mut list = gst::BufferList::new();
|
||||
{
|
||||
let list = list.get_mut().unwrap();
|
||||
while let Some(packet_data) = self.consider_new_packet(state, false) {
|
||||
while let Some(packet_data) = self.consider_new_packet(state, false, marker) {
|
||||
let buffer = self.generate_new_packet(state, packet_data)?;
|
||||
list.add(buffer);
|
||||
}
|
||||
|
@ -208,13 +209,21 @@ impl RTPAv1Pay {
|
|||
///
|
||||
/// If `true` is passed for `force`, packets of any size will be accepted,
|
||||
/// which is used in flushing the last OBUs after receiving an EOS for example.
|
||||
fn consider_new_packet(&self, state: &mut State, force: bool) -> Option<PacketOBUData> {
|
||||
///
|
||||
/// If `true` is passed for `marker` then all queued OBUs are considered to finish this TU.
|
||||
fn consider_new_packet(
|
||||
&self,
|
||||
state: &mut State,
|
||||
force: bool,
|
||||
marker: bool,
|
||||
) -> Option<PacketOBUData> {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"{} new packet, currently storing {} OBUs",
|
||||
"{} new packet, currently storing {} OBUs (marker {})",
|
||||
if force { "forcing" } else { "considering" },
|
||||
state.obus.len()
|
||||
state.obus.len(),
|
||||
marker,
|
||||
);
|
||||
|
||||
let payload_limit = gst_rtp::calc_payload_len(self.obj().mtu(), 0, 0);
|
||||
|
@ -226,7 +235,7 @@ impl RTPAv1Pay {
|
|||
let mut required_ids = None::<(u8, u8)>;
|
||||
|
||||
// figure out how many OBUs we can fit into this packet
|
||||
for obu in &state.obus {
|
||||
for (idx, obu) in state.obus.iter().enumerate() {
|
||||
// for OBUs with extension headers, spatial and temporal IDs must be equal
|
||||
// to all other such OBUs in the packet
|
||||
let matching_obu_ids = |obu: &SizedObu, required_ids: &mut Option<(u8, u8)>| -> bool {
|
||||
|
@ -247,11 +256,20 @@ impl RTPAv1Pay {
|
|||
gst::log!(CAT, imp: self, "ignoring temporal delimiter OBU");
|
||||
|
||||
if packet.obu_count > 0 {
|
||||
if marker {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Temporal delimited in the middle of a frame"
|
||||
);
|
||||
}
|
||||
|
||||
packet.ends_temporal_unit = true;
|
||||
if packet.obu_count > 3 {
|
||||
packet.payload_size += pending_bytes;
|
||||
packet.omit_last_size_field = false;
|
||||
}
|
||||
|
||||
return Some(packet);
|
||||
}
|
||||
|
||||
|
@ -264,6 +282,7 @@ impl RTPAv1Pay {
|
|||
packet.payload_size += pending_bytes;
|
||||
packet.omit_last_size_field = false;
|
||||
}
|
||||
packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
|
||||
return Some(packet);
|
||||
}
|
||||
|
||||
|
@ -279,6 +298,7 @@ impl RTPAv1Pay {
|
|||
{
|
||||
packet.obu_count += 1;
|
||||
packet.payload_size += current.partial_size() + pending_bytes;
|
||||
packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
|
||||
|
||||
return Some(packet);
|
||||
}
|
||||
|
@ -302,6 +322,7 @@ impl RTPAv1Pay {
|
|||
packet.payload_size = payload_limit;
|
||||
packet.omit_last_size_field = leb_size == 0;
|
||||
} else if packet.obu_count > 3 {
|
||||
packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
|
||||
packet.payload_size += pending_bytes;
|
||||
}
|
||||
|
||||
|
@ -309,11 +330,13 @@ impl RTPAv1Pay {
|
|||
}
|
||||
}
|
||||
|
||||
if force && packet.obu_count > 0 {
|
||||
if (force || marker) && packet.obu_count > 0 {
|
||||
if packet.obu_count > 3 {
|
||||
packet.payload_size += pending_bytes;
|
||||
packet.omit_last_size_field = false;
|
||||
}
|
||||
packet.ends_temporal_unit = true;
|
||||
|
||||
Some(packet)
|
||||
} else {
|
||||
// if we ran out of OBUs with space in the packet to spare, wait a bit longer
|
||||
|
@ -621,7 +644,7 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
|
|||
let dts = buffer.dts();
|
||||
let pts = buffer.pts();
|
||||
|
||||
let buffer = buffer.into_mapped_buffer_readable().map_err(|_| {
|
||||
let map = buffer.map_readable().map_err(|_| {
|
||||
gst::element_imp_error!(
|
||||
self,
|
||||
gst::ResourceError::Read,
|
||||
|
@ -631,7 +654,10 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
|
|||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
let list = self.handle_new_obus(&mut state, buffer.as_slice(), dts, pts)?;
|
||||
// Does the buffer finished a full TU?
|
||||
let marker = buffer.flags().contains(gst::BufferFlags::MARKER);
|
||||
let list = self.handle_new_obus(&mut state, map.as_slice(), marker, dts, pts)?;
|
||||
drop(map);
|
||||
drop(state);
|
||||
|
||||
if !list.is_empty() {
|
||||
|
@ -652,7 +678,7 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
|
|||
let mut state = self.state.lock().unwrap();
|
||||
let list = list.get_mut().unwrap();
|
||||
|
||||
while let Some(packet_data) = self.consider_new_packet(&mut state, true) {
|
||||
while let Some(packet_data) = self.consider_new_packet(&mut state, true, true) {
|
||||
match self.generate_new_packet(&mut state, packet_data) {
|
||||
Ok(buffer) => list.add(buffer),
|
||||
Err(_) => break,
|
||||
|
@ -804,15 +830,25 @@ mod tests {
|
|||
(
|
||||
false,
|
||||
State {
|
||||
obus: VecDeque::from(vec![ObuData {
|
||||
info: SizedObu {
|
||||
obu_type: ObuType::Frame,
|
||||
size: 4,
|
||||
..base_obu
|
||||
obus: VecDeque::from(vec![
|
||||
ObuData {
|
||||
info: SizedObu {
|
||||
obu_type: ObuType::TemporalDelimiter,
|
||||
size: 0,
|
||||
..base_obu
|
||||
},
|
||||
..ObuData::default()
|
||||
},
|
||||
bytes: vec![1, 2, 3, 4],
|
||||
..ObuData::default()
|
||||
}]),
|
||||
ObuData {
|
||||
info: SizedObu {
|
||||
obu_type: ObuType::Frame,
|
||||
size: 4,
|
||||
..base_obu
|
||||
},
|
||||
bytes: vec![1, 2, 3, 4],
|
||||
..ObuData::default()
|
||||
},
|
||||
]),
|
||||
..State::default()
|
||||
},
|
||||
),
|
||||
|
@ -843,7 +879,7 @@ mod tests {
|
|||
payload_size: 34,
|
||||
last_obu_fragment_size: None,
|
||||
omit_last_size_field: false,
|
||||
ends_temporal_unit: false,
|
||||
ends_temporal_unit: true,
|
||||
}),
|
||||
State {
|
||||
obus: {
|
||||
|
@ -854,7 +890,17 @@ mod tests {
|
|||
..input_data[1].1
|
||||
},
|
||||
),
|
||||
(None, input_data[2].1.clone()),
|
||||
(
|
||||
None,
|
||||
State {
|
||||
obus: {
|
||||
let mut copy = input_data[2].1.obus.clone();
|
||||
copy.pop_front().unwrap();
|
||||
copy
|
||||
},
|
||||
..input_data[2].1
|
||||
},
|
||||
),
|
||||
];
|
||||
|
||||
let element = <RTPAv1Pay as ObjectSubclass>::Type::new();
|
||||
|
@ -866,7 +912,7 @@ mod tests {
|
|||
*state = input_data[idx].1.clone();
|
||||
|
||||
assert_eq!(
|
||||
pay.consider_new_packet(&mut state, input_data[idx].0),
|
||||
pay.consider_new_packet(&mut state, input_data[idx].0, false),
|
||||
results[idx].0,
|
||||
);
|
||||
assert_eq!(
|
||||
|
|
|
@ -167,7 +167,7 @@ fn test_payloader() {
|
|||
0b0011_0100, 0b0100_1000, 1,
|
||||
]
|
||||
), (
|
||||
false,
|
||||
true, // because of EOS
|
||||
90_000,
|
||||
vec![
|
||||
0b0001_0000,
|
||||
|
|
Loading…
Reference in a new issue