rtprecv: optimize src Pad scheduling

This commit implements optimizations to reduce the number of threads and
context switches necessary to handle items on src Pads.

1. A single task for all src pads in a session

Instead of starting one pad task (thread) per src pad in a session, we can
spawn a single async task that dequeues all ready items out of the jitter
buffers and pushes them to their respective src pads.

2. Handle downstream-travelling items immediately when possible

It is possible to immediately handle downstream-travelling items without queuing
them in the jitter buffer:

* for an RTP packet, if the packet is the next packet that we were expecting
  and the src Pad's jitter buffer is empty.
* for a downstream query or event if the src Pad's jitter buffer is empty.

Otherwise, the item needs to be enqueued in the jitter buffer.

3. Making sure the above strategies play well together

If a jitter buffer is empty because the src pad task just dequeued items,
the sink pad might push incoming items before the src pad task had time to push
the dequeued items, which would break ordering.

In order to avoid this situation, each src pad uses a semaphore which is held
from the moment when the sink pad or the src pad makes decisions regarding the
jitter buffer and hold it as long as necessary so as to guarantee ordering.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2346>
This commit is contained in:
François Laignel 2025-07-15 16:15:24 +02:00 committed by GStreamer Marge Bot
parent 41ea936e51
commit 1ce1a41ca7
4 changed files with 892 additions and 211 deletions

1
Cargo.lock generated
View file

@ -3282,6 +3282,7 @@ dependencies = [
"thiserror 2.0.12",
"time",
"tokio",
"tokio-util",
]
[[package]]

View file

@ -33,6 +33,7 @@ thiserror = "2"
time = { version = "0.3", default-features = false, features = ["std"] }
# TODO: experiment with other async executors (mio, async-std, etc)
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "time", "sync"] }
tokio-util = "0.7.15"
[dev-dependencies]
gst-check = { workspace = true, features = ["v1_20"] }

View file

@ -40,6 +40,7 @@ pub struct JitterBuffer {
last_input_ts: Option<u64>,
stats: Stats,
flushing: bool,
can_forward_packets_when_empty: bool,
}
#[derive(Debug, PartialEq, Eq)]
@ -53,6 +54,7 @@ pub enum PollResult {
#[derive(Debug, PartialEq, Eq)]
pub enum QueueResult {
Forward(usize),
Queued(usize),
Late,
Duplicate,
@ -109,10 +111,17 @@ impl JitterBuffer {
num_pushed: 0,
},
flushing: true,
can_forward_packets_when_empty: false,
}
}
pub fn queue_serialized_item(&mut self) -> QueueResult {
if self.items.is_empty() {
let id = self.packet_counter;
self.packet_counter += 1;
return QueueResult::Forward(id);
}
let id = self.packet_counter;
self.packet_counter += 1;
let item = Item {
@ -130,6 +139,7 @@ impl JitterBuffer {
trace!("Flush changed from {} to {flushing}", self.flushing);
self.flushing = flushing;
self.last_output_seqnum = None;
self.can_forward_packets_when_empty = false;
}
pub fn queue_packet(&mut self, rtp: &RtpPacket, mut pts: u64, now: Instant) -> QueueResult {
@ -182,6 +192,25 @@ impl JitterBuffer {
}
}
if self.items.is_empty()
// can forward after the first packet's deadline has been reached
&& self.can_forward_packets_when_empty
&& self
.last_output_seqnum
.is_some_and(|last_output_seqnum| seqnum == last_output_seqnum.wrapping_add(1))
{
// No packets enqueued & seqnum is in order => can forward it immediately
self.last_output_seqnum = Some(seqnum);
let id = self.packet_counter;
self.packet_counter += 1;
self.stats.num_pushed += 1;
return QueueResult::Forward(id);
}
// TODO: if the segnum base is known (i.e. first seqnum in RTSP)
// we could also Forward the initial Packet.
let id = self.packet_counter;
self.packet_counter += 1;
let item = Item {
@ -258,6 +287,7 @@ impl JitterBuffer {
let packet = self.items.pop_first().unwrap();
self.stats.num_pushed += 1;
self.can_forward_packets_when_empty = true;
PollResult::Forward {
id: packet.id,
@ -584,33 +614,31 @@ mod tests {
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_first_serialized_item) = jb.queue_serialized_item() else {
let QueueResult::Forward(id_first_serialized_item) = jb.queue_serialized_item() else {
unreachable!()
};
assert_eq!(id_first_serialized_item, 0);
// query has been forwarded immediately
assert_eq!(jb.poll(now), PollResult::Empty);
let QueueResult::Queued(id_first_packet) = jb.queue_packet(&packet, 0, now) else {
unreachable!()
};
assert_eq!(id_first_packet, id_first_serialized_item + 1);
let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else {
unreachable!()
};
assert_eq!(id_second_serialized_item, id_first_packet + 1);
// query should be forwarded immediately
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_first_serialized_item,
discont: false
}
);
let QueueResult::Queued(id_first) = jb.queue_packet(&packet, 0, now) else {
unreachable!()
};
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_first,
id: id_first_packet,
discont: true
}
);
let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else {
unreachable!()
};
assert_eq!(
jb.poll(now),
PollResult::Forward {
@ -621,16 +649,11 @@ mod tests {
let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_second) = jb.queue_packet(&packet, 0, now) else {
let QueueResult::Forward(id_second_packet) = jb.queue_packet(&packet, 0, now) else {
unreachable!()
};
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_second,
discont: false
}
);
assert_eq!(id_second_packet, id_second_serialized_item + 1);
assert_eq!(jb.poll(now), PollResult::Empty);
}
#[test]
@ -643,7 +666,7 @@ mod tests {
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_first_serialized_item) = jb.queue_serialized_item() else {
let QueueResult::Forward(_id_first_serialized_item) = jb.queue_serialized_item() else {
unreachable!()
};
@ -651,13 +674,17 @@ mod tests {
unreachable!()
};
let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else {
unreachable!()
};
// Everything after this should eventually return flushing, poll() will instruct to drop
// everything stored and then return flushing indefinitely.
jb.set_flushing(true);
assert_eq!(jb.queue_packet(&packet, 0, now), QueueResult::Flushing);
assert_eq!(jb.poll(now), PollResult::Drop(id_first_serialized_item));
assert_eq!(jb.poll(now), PollResult::Drop(id_first));
assert_eq!(jb.poll(now), PollResult::Drop(id_second_serialized_item));
assert_eq!(jb.poll(now), PollResult::Flushing);
assert_eq!(jb.poll(now), PollResult::Flushing);

File diff suppressed because it is too large Load diff