From 39b61195ad73ef7dea3f4922ea6c133a1553e9da Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Fri, 14 Jun 2024 17:17:04 +1000 Subject: [PATCH] rtprecv: ensure that stopping the rtp src task does not critical When pad a released, then we were removing the pad from an internal list. If the pad was not already deactivated, the deactiviation would attempt to look for the pad in that list and panic if it was not there. Fix by delaying removal of the pad from the list until after pad deactivation occurs. Also includes test. Part-of: --- net/rtp/src/rtpbin2/rtprecv.rs | 20 +++++--- net/rtp/tests/rtpbin2.rs | 94 +++++++++++++++++++++++++++++----- 2 files changed, 96 insertions(+), 18 deletions(-) diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index 2642039f..75fd1f03 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -119,12 +119,14 @@ impl futures::stream::Stream for JitterBufferStream { if pending_item.is_some() { // but only after sending the previous pending item next_pending_item = Some(item); - break + break; } } JitterBufferItem::Packet(ref packet) => { match pending_item { - Some(JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _)) => unreachable!(), + Some( + JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _), + ) => unreachable!(), Some(JitterBufferItem::Packet(pending_buffer)) => { let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); @@ -1862,6 +1864,7 @@ impl ElementImpl for RtpRecv { let mut state = self.state.lock().unwrap(); let mut removed_pads = vec![]; let mut removed_session_ids = vec![]; + let mut removed_srcpads_session_ids = vec![]; if let Some(&id) = state.pads_session_id_map.get(pad) { removed_pads.push(pad.clone()); if let Some(session) = state.mut_session_by_id(id) { @@ -1869,7 +1872,7 @@ impl ElementImpl for RtpRecv { session.rtp_recv_sinkpad = None; removed_pads.extend(session.rtp_recv_srcpads.iter().map(|r| r.pad.clone())); session.recv_flow_combiner.lock().unwrap().clear(); - session.rtp_recv_srcpads.clear(); + removed_srcpads_session_ids.push(id); session.recv_store.clear(); } @@ -1882,6 +1885,10 @@ impl ElementImpl for RtpRecv { } } } + + for pad in removed_pads.iter() { + state.pads_session_id_map.remove(pad); + } drop(state); for pad in removed_pads.iter() { @@ -1894,9 +1901,10 @@ impl ElementImpl for RtpRecv { { let mut state = self.state.lock().unwrap(); - - for pad in removed_pads.iter() { - state.pads_session_id_map.remove(pad); + for id in removed_srcpads_session_ids { + if let Some(session) = state.mut_session_by_id(id) { + session.rtp_recv_srcpads.clear(); + } } for id in removed_session_ids { if let Some(session) = state.mut_session_by_id(id) { diff --git a/net/rtp/tests/rtpbin2.rs b/net/rtp/tests/rtpbin2.rs index bf4cc098..a056ffa5 100644 --- a/net/rtp/tests/rtpbin2.rs +++ b/net/rtp/tests/rtpbin2.rs @@ -55,8 +55,11 @@ struct PacketInfo { } impl PacketInfo { - fn generate_buffer(&self) -> gst::Buffer { - generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len) + fn generate_buffer(&self, dts: Option) -> gst::Buffer { + let mut buf = generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len); + let buf_mut = buf.make_mut(); + buf_mut.set_dts(dts); + buf } } @@ -91,7 +94,7 @@ where let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); for packet in packets { - list_mut.add(packet.generate_buffer()); + list_mut.add(packet.generate_buffer(None)); } let push_pad = h .element() @@ -103,7 +106,7 @@ where push_pad.push_list(list).unwrap(); } else { for packet in packets { - h.push(packet.generate_buffer()).unwrap(); + h.push(packet.generate_buffer(None)).unwrap(); } } } @@ -183,7 +186,7 @@ fn test_send_benchmark() { rtp_ts: i as u32, payload_len: 8, } - .generate_buffer(), + .generate_buffer(None), ) } @@ -231,7 +234,7 @@ fn test_send_list_benchmark() { rtp_ts: i as u32, payload_len: 8, } - .generate_buffer(), + .generate_buffer(None), ); } let mut h = send_init(); @@ -330,12 +333,12 @@ where let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); for packet in packets { - list_mut.add(packet.generate_buffer()); + list_mut.add(packet.generate_buffer(None)); } push_pad.push_list(list).unwrap(); } else { for packet in packets { - push_pad.push(packet.generate_buffer()).unwrap(); + push_pad.push(packet.generate_buffer(None)).unwrap(); } } } @@ -471,7 +474,8 @@ fn test_receive_flush() { fn test_receive_benchmark() { init(); let clock = gst::SystemClock::obtain(); - const N_PACKETS: usize = 1024 * 1024; + //const N_PACKETS: usize = 1024 * 1024; + const N_PACKETS: usize = 1024; let mut packets = Vec::with_capacity(N_PACKETS); for i in 0..N_PACKETS { packets.push( @@ -480,7 +484,7 @@ fn test_receive_benchmark() { rtp_ts: i as u32, payload_len: 8, } - .generate_buffer(), + .generate_buffer(None), ) } @@ -554,7 +558,7 @@ fn test_receive_list_benchmark() { rtp_ts: (p * N_PACKETS + i) as u32, payload_len: 8, } - .generate_buffer(), + .generate_buffer(None), ); } lists.push(list); @@ -574,7 +578,10 @@ fn test_receive_list_benchmark() { let buffer = inner.pull().unwrap(); let mapped = buffer.map_readable().unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); - assert_eq!(rtp.sequence_number(), ((p * N_PACKETS + i) % u16::MAX as usize) as u16); + assert_eq!( + rtp.sequence_number(), + ((p * N_PACKETS + i) % u16::MAX as usize) as u16 + ); } } @@ -591,3 +598,66 @@ fn test_receive_list_benchmark() { pull_time.display() ); } + +#[test] +fn recv_release_sink_pad() { + init(); + + let id = next_element_counter(); + + let elem = gst::ElementFactory::make("rtprecv") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + elem.set_state(gst::State::Playing).unwrap(); + let sinkpad = elem.request_pad_simple("rtp_sink_0").unwrap(); + let stream_start = gst::event::StreamStart::new("random"); + sinkpad.send_event(stream_start); + let caps = Caps::builder("application/x-rtp") + .field("media", "audio") + .field("payload", TEST_PT as i32) + .field("clock-rate", TEST_CLOCK_RATE as i32) + .field("encoding-name", "custom-test") + .build(); + sinkpad.send_event(gst::event::Caps::new(&caps)); + let segment = gst::FormattedSegment::::new(); + sinkpad.send_event(gst::event::Segment::new(&segment)); + + let (sender, recv) = std::sync::mpsc::sync_channel(1); + elem.connect_pad_added({ + let sender = sender.clone(); + move |_elem, pad| { + let other_pad = gst::Pad::builder(gst::PadDirection::Sink) + .chain_function(|_pad, _parent, _buffer| Ok(gst::FlowSuccess::Ok)) + .build(); + other_pad.set_active(true).unwrap(); + pad.link(&other_pad).unwrap(); + sender.send(other_pad).unwrap(); + } + }); + // push two buffers to get past the rtpsource validation + sinkpad + .chain( + PacketInfo { + seq_no: 30, + rtp_ts: 10, + payload_len: 4, + } + .generate_buffer(Some(gst::ClockTime::from_mseconds(50))), + ) + .unwrap(); + sinkpad + .chain( + PacketInfo { + seq_no: 31, + rtp_ts: 10, + payload_len: 4, + } + .generate_buffer(Some(gst::ClockTime::from_mseconds(100))), + ) + .unwrap(); + let _other_pad = recv.recv().unwrap(); + + elem.release_request_pad(&sinkpad); + elem.set_state(gst::State::Null).unwrap(); +}