diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index 2642039f..75fd1f03 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -119,12 +119,14 @@ impl futures::stream::Stream for JitterBufferStream { if pending_item.is_some() { // but only after sending the previous pending item next_pending_item = Some(item); - break + break; } } JitterBufferItem::Packet(ref packet) => { match pending_item { - Some(JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _)) => unreachable!(), + Some( + JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _), + ) => unreachable!(), Some(JitterBufferItem::Packet(pending_buffer)) => { let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); @@ -1862,6 +1864,7 @@ impl ElementImpl for RtpRecv { let mut state = self.state.lock().unwrap(); let mut removed_pads = vec![]; let mut removed_session_ids = vec![]; + let mut removed_srcpads_session_ids = vec![]; if let Some(&id) = state.pads_session_id_map.get(pad) { removed_pads.push(pad.clone()); if let Some(session) = state.mut_session_by_id(id) { @@ -1869,7 +1872,7 @@ impl ElementImpl for RtpRecv { session.rtp_recv_sinkpad = None; removed_pads.extend(session.rtp_recv_srcpads.iter().map(|r| r.pad.clone())); session.recv_flow_combiner.lock().unwrap().clear(); - session.rtp_recv_srcpads.clear(); + removed_srcpads_session_ids.push(id); session.recv_store.clear(); } @@ -1882,6 +1885,10 @@ impl ElementImpl for RtpRecv { } } } + + for pad in removed_pads.iter() { + state.pads_session_id_map.remove(pad); + } drop(state); for pad in removed_pads.iter() { @@ -1894,9 +1901,10 @@ impl ElementImpl for RtpRecv { { let mut state = self.state.lock().unwrap(); - - for pad in removed_pads.iter() { - state.pads_session_id_map.remove(pad); + for id in removed_srcpads_session_ids { + if let Some(session) = state.mut_session_by_id(id) { + session.rtp_recv_srcpads.clear(); + } } for id in removed_session_ids { if let Some(session) = state.mut_session_by_id(id) { diff --git a/net/rtp/tests/rtpbin2.rs b/net/rtp/tests/rtpbin2.rs index bf4cc098..a056ffa5 100644 --- a/net/rtp/tests/rtpbin2.rs +++ b/net/rtp/tests/rtpbin2.rs @@ -55,8 +55,11 @@ struct PacketInfo { } impl PacketInfo { - fn generate_buffer(&self) -> gst::Buffer { - generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len) + fn generate_buffer(&self, dts: Option) -> gst::Buffer { + let mut buf = generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len); + let buf_mut = buf.make_mut(); + buf_mut.set_dts(dts); + buf } } @@ -91,7 +94,7 @@ where let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); for packet in packets { - list_mut.add(packet.generate_buffer()); + list_mut.add(packet.generate_buffer(None)); } let push_pad = h .element() @@ -103,7 +106,7 @@ where push_pad.push_list(list).unwrap(); } else { for packet in packets { - h.push(packet.generate_buffer()).unwrap(); + h.push(packet.generate_buffer(None)).unwrap(); } } } @@ -183,7 +186,7 @@ fn test_send_benchmark() { rtp_ts: i as u32, payload_len: 8, } - .generate_buffer(), + .generate_buffer(None), ) } @@ -231,7 +234,7 @@ fn test_send_list_benchmark() { rtp_ts: i as u32, payload_len: 8, } - .generate_buffer(), + .generate_buffer(None), ); } let mut h = send_init(); @@ -330,12 +333,12 @@ where let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); for packet in packets { - list_mut.add(packet.generate_buffer()); + list_mut.add(packet.generate_buffer(None)); } push_pad.push_list(list).unwrap(); } else { for packet in packets { - push_pad.push(packet.generate_buffer()).unwrap(); + push_pad.push(packet.generate_buffer(None)).unwrap(); } } } @@ -471,7 +474,8 @@ fn test_receive_flush() { fn test_receive_benchmark() { init(); let clock = gst::SystemClock::obtain(); - const N_PACKETS: usize = 1024 * 1024; + //const N_PACKETS: usize = 1024 * 1024; + const N_PACKETS: usize = 1024; let mut packets = Vec::with_capacity(N_PACKETS); for i in 0..N_PACKETS { packets.push( @@ -480,7 +484,7 @@ fn test_receive_benchmark() { rtp_ts: i as u32, payload_len: 8, } - .generate_buffer(), + .generate_buffer(None), ) } @@ -554,7 +558,7 @@ fn test_receive_list_benchmark() { rtp_ts: (p * N_PACKETS + i) as u32, payload_len: 8, } - .generate_buffer(), + .generate_buffer(None), ); } lists.push(list); @@ -574,7 +578,10 @@ fn test_receive_list_benchmark() { let buffer = inner.pull().unwrap(); let mapped = buffer.map_readable().unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); - assert_eq!(rtp.sequence_number(), ((p * N_PACKETS + i) % u16::MAX as usize) as u16); + assert_eq!( + rtp.sequence_number(), + ((p * N_PACKETS + i) % u16::MAX as usize) as u16 + ); } } @@ -591,3 +598,66 @@ fn test_receive_list_benchmark() { pull_time.display() ); } + +#[test] +fn recv_release_sink_pad() { + init(); + + let id = next_element_counter(); + + let elem = gst::ElementFactory::make("rtprecv") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + elem.set_state(gst::State::Playing).unwrap(); + let sinkpad = elem.request_pad_simple("rtp_sink_0").unwrap(); + let stream_start = gst::event::StreamStart::new("random"); + sinkpad.send_event(stream_start); + let caps = Caps::builder("application/x-rtp") + .field("media", "audio") + .field("payload", TEST_PT as i32) + .field("clock-rate", TEST_CLOCK_RATE as i32) + .field("encoding-name", "custom-test") + .build(); + sinkpad.send_event(gst::event::Caps::new(&caps)); + let segment = gst::FormattedSegment::::new(); + sinkpad.send_event(gst::event::Segment::new(&segment)); + + let (sender, recv) = std::sync::mpsc::sync_channel(1); + elem.connect_pad_added({ + let sender = sender.clone(); + move |_elem, pad| { + let other_pad = gst::Pad::builder(gst::PadDirection::Sink) + .chain_function(|_pad, _parent, _buffer| Ok(gst::FlowSuccess::Ok)) + .build(); + other_pad.set_active(true).unwrap(); + pad.link(&other_pad).unwrap(); + sender.send(other_pad).unwrap(); + } + }); + // push two buffers to get past the rtpsource validation + sinkpad + .chain( + PacketInfo { + seq_no: 30, + rtp_ts: 10, + payload_len: 4, + } + .generate_buffer(Some(gst::ClockTime::from_mseconds(50))), + ) + .unwrap(); + sinkpad + .chain( + PacketInfo { + seq_no: 31, + rtp_ts: 10, + payload_len: 4, + } + .generate_buffer(Some(gst::ClockTime::from_mseconds(100))), + ) + .unwrap(); + let _other_pad = recv.recv().unwrap(); + + elem.release_request_pad(&sinkpad); + elem.set_state(gst::State::Null).unwrap(); +}