rtprecv: ensure that stopping the rtp src task does not critical

When pad a released, then we were removing the pad from an internal
list. If the pad was not already deactivated, the deactiviation would
attempt to look for the pad in that list and panic if it was not there.

Fix by delaying removal of the pad from the list until after pad
deactivation occurs.

Also includes test.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1618>
This commit is contained in:
Matthew Waters 2024-06-14 17:17:04 +10:00 committed by GStreamer Marge Bot
parent 10a31a397e
commit 39b61195ad
2 changed files with 96 additions and 18 deletions

View file

@ -119,12 +119,14 @@ impl futures::stream::Stream for JitterBufferStream {
if pending_item.is_some() { if pending_item.is_some() {
// but only after sending the previous pending item // but only after sending the previous pending item
next_pending_item = Some(item); next_pending_item = Some(item);
break break;
} }
} }
JitterBufferItem::Packet(ref packet) => { JitterBufferItem::Packet(ref packet) => {
match pending_item { match pending_item {
Some(JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _)) => unreachable!(), Some(
JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _),
) => unreachable!(),
Some(JitterBufferItem::Packet(pending_buffer)) => { Some(JitterBufferItem::Packet(pending_buffer)) => {
let mut list = gst::BufferList::new(); let mut list = gst::BufferList::new();
let list_mut = list.make_mut(); let list_mut = list.make_mut();
@ -1862,6 +1864,7 @@ impl ElementImpl for RtpRecv {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let mut removed_pads = vec![]; let mut removed_pads = vec![];
let mut removed_session_ids = vec![]; let mut removed_session_ids = vec![];
let mut removed_srcpads_session_ids = vec![];
if let Some(&id) = state.pads_session_id_map.get(pad) { if let Some(&id) = state.pads_session_id_map.get(pad) {
removed_pads.push(pad.clone()); removed_pads.push(pad.clone());
if let Some(session) = state.mut_session_by_id(id) { if let Some(session) = state.mut_session_by_id(id) {
@ -1869,7 +1872,7 @@ impl ElementImpl for RtpRecv {
session.rtp_recv_sinkpad = None; session.rtp_recv_sinkpad = None;
removed_pads.extend(session.rtp_recv_srcpads.iter().map(|r| r.pad.clone())); removed_pads.extend(session.rtp_recv_srcpads.iter().map(|r| r.pad.clone()));
session.recv_flow_combiner.lock().unwrap().clear(); session.recv_flow_combiner.lock().unwrap().clear();
session.rtp_recv_srcpads.clear(); removed_srcpads_session_ids.push(id);
session.recv_store.clear(); session.recv_store.clear();
} }
@ -1882,6 +1885,10 @@ impl ElementImpl for RtpRecv {
} }
} }
} }
for pad in removed_pads.iter() {
state.pads_session_id_map.remove(pad);
}
drop(state); drop(state);
for pad in removed_pads.iter() { for pad in removed_pads.iter() {
@ -1894,9 +1901,10 @@ impl ElementImpl for RtpRecv {
{ {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
for id in removed_srcpads_session_ids {
for pad in removed_pads.iter() { if let Some(session) = state.mut_session_by_id(id) {
state.pads_session_id_map.remove(pad); session.rtp_recv_srcpads.clear();
}
} }
for id in removed_session_ids { for id in removed_session_ids {
if let Some(session) = state.mut_session_by_id(id) { if let Some(session) = state.mut_session_by_id(id) {

View file

@ -55,8 +55,11 @@ struct PacketInfo {
} }
impl PacketInfo { impl PacketInfo {
fn generate_buffer(&self) -> gst::Buffer { fn generate_buffer(&self, dts: Option<gst::ClockTime>) -> gst::Buffer {
generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len) let mut buf = generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len);
let buf_mut = buf.make_mut();
buf_mut.set_dts(dts);
buf
} }
} }
@ -91,7 +94,7 @@ where
let mut list = gst::BufferList::new(); let mut list = gst::BufferList::new();
let list_mut = list.make_mut(); let list_mut = list.make_mut();
for packet in packets { for packet in packets {
list_mut.add(packet.generate_buffer()); list_mut.add(packet.generate_buffer(None));
} }
let push_pad = h let push_pad = h
.element() .element()
@ -103,7 +106,7 @@ where
push_pad.push_list(list).unwrap(); push_pad.push_list(list).unwrap();
} else { } else {
for packet in packets { for packet in packets {
h.push(packet.generate_buffer()).unwrap(); h.push(packet.generate_buffer(None)).unwrap();
} }
} }
} }
@ -183,7 +186,7 @@ fn test_send_benchmark() {
rtp_ts: i as u32, rtp_ts: i as u32,
payload_len: 8, payload_len: 8,
} }
.generate_buffer(), .generate_buffer(None),
) )
} }
@ -231,7 +234,7 @@ fn test_send_list_benchmark() {
rtp_ts: i as u32, rtp_ts: i as u32,
payload_len: 8, payload_len: 8,
} }
.generate_buffer(), .generate_buffer(None),
); );
} }
let mut h = send_init(); let mut h = send_init();
@ -330,12 +333,12 @@ where
let mut list = gst::BufferList::new(); let mut list = gst::BufferList::new();
let list_mut = list.make_mut(); let list_mut = list.make_mut();
for packet in packets { for packet in packets {
list_mut.add(packet.generate_buffer()); list_mut.add(packet.generate_buffer(None));
} }
push_pad.push_list(list).unwrap(); push_pad.push_list(list).unwrap();
} else { } else {
for packet in packets { for packet in packets {
push_pad.push(packet.generate_buffer()).unwrap(); push_pad.push(packet.generate_buffer(None)).unwrap();
} }
} }
} }
@ -471,7 +474,8 @@ fn test_receive_flush() {
fn test_receive_benchmark() { fn test_receive_benchmark() {
init(); init();
let clock = gst::SystemClock::obtain(); let clock = gst::SystemClock::obtain();
const N_PACKETS: usize = 1024 * 1024; //const N_PACKETS: usize = 1024 * 1024;
const N_PACKETS: usize = 1024;
let mut packets = Vec::with_capacity(N_PACKETS); let mut packets = Vec::with_capacity(N_PACKETS);
for i in 0..N_PACKETS { for i in 0..N_PACKETS {
packets.push( packets.push(
@ -480,7 +484,7 @@ fn test_receive_benchmark() {
rtp_ts: i as u32, rtp_ts: i as u32,
payload_len: 8, payload_len: 8,
} }
.generate_buffer(), .generate_buffer(None),
) )
} }
@ -554,7 +558,7 @@ fn test_receive_list_benchmark() {
rtp_ts: (p * N_PACKETS + i) as u32, rtp_ts: (p * N_PACKETS + i) as u32,
payload_len: 8, payload_len: 8,
} }
.generate_buffer(), .generate_buffer(None),
); );
} }
lists.push(list); lists.push(list);
@ -574,7 +578,10 @@ fn test_receive_list_benchmark() {
let buffer = inner.pull().unwrap(); let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap(); let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), ((p * N_PACKETS + i) % u16::MAX as usize) as u16); assert_eq!(
rtp.sequence_number(),
((p * N_PACKETS + i) % u16::MAX as usize) as u16
);
} }
} }
@ -591,3 +598,66 @@ fn test_receive_list_benchmark() {
pull_time.display() pull_time.display()
); );
} }
#[test]
fn recv_release_sink_pad() {
init();
let id = next_element_counter();
let elem = gst::ElementFactory::make("rtprecv")
.property("rtp-id", id.to_string())
.build()
.unwrap();
elem.set_state(gst::State::Playing).unwrap();
let sinkpad = elem.request_pad_simple("rtp_sink_0").unwrap();
let stream_start = gst::event::StreamStart::new("random");
sinkpad.send_event(stream_start);
let caps = Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", TEST_PT as i32)
.field("clock-rate", TEST_CLOCK_RATE as i32)
.field("encoding-name", "custom-test")
.build();
sinkpad.send_event(gst::event::Caps::new(&caps));
let segment = gst::FormattedSegment::<gst::ClockTime>::new();
sinkpad.send_event(gst::event::Segment::new(&segment));
let (sender, recv) = std::sync::mpsc::sync_channel(1);
elem.connect_pad_added({
let sender = sender.clone();
move |_elem, pad| {
let other_pad = gst::Pad::builder(gst::PadDirection::Sink)
.chain_function(|_pad, _parent, _buffer| Ok(gst::FlowSuccess::Ok))
.build();
other_pad.set_active(true).unwrap();
pad.link(&other_pad).unwrap();
sender.send(other_pad).unwrap();
}
});
// push two buffers to get past the rtpsource validation
sinkpad
.chain(
PacketInfo {
seq_no: 30,
rtp_ts: 10,
payload_len: 4,
}
.generate_buffer(Some(gst::ClockTime::from_mseconds(50))),
)
.unwrap();
sinkpad
.chain(
PacketInfo {
seq_no: 31,
rtp_ts: 10,
payload_len: 4,
}
.generate_buffer(Some(gst::ClockTime::from_mseconds(100))),
)
.unwrap();
let _other_pad = recv.recv().unwrap();
elem.release_request_pad(&sinkpad);
elem.set_state(gst::State::Null).unwrap();
}