rtp/recv: support buffers lists on rtp sink pad

In one case, improves throughput by 25% when buffer lists are used.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1618>
This commit is contained in:
Matthew Waters 2024-06-12 17:52:41 +10:00 committed by GStreamer Marge Bot
parent df4a4fb2ef
commit d036abb7d2
3 changed files with 631 additions and 225 deletions

View file

@ -2,6 +2,7 @@
use std::collections::{BTreeMap, HashMap};
use std::net::SocketAddr;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Poll, Waker};
@ -194,8 +195,30 @@ impl RtpRecvSrcPad {
struct HeldRecvBuffer {
hold_id: Option<usize>,
buffer: gst::Buffer,
pad: RtpRecvSrcPad,
new_pad: bool,
jb: Arc<Mutex<JitterBufferStore>>,
}
#[derive(Debug)]
struct HeldRecvBufferList {
list: gst::BufferList,
jb: Arc<Mutex<JitterBufferStore>>,
}
#[derive(Debug)]
enum HeldRecvItem {
NewPad(RtpRecvSrcPad),
Buffer(HeldRecvBuffer),
BufferList(HeldRecvBufferList),
}
impl HeldRecvItem {
fn hold_id(&self) -> Option<usize> {
match self {
Self::NewPad(_) => None,
Self::Buffer(buf) => buf.hold_id,
Self::BufferList(_list) => None,
}
}
}
#[derive(Debug)]
@ -209,7 +232,7 @@ struct RecvSession {
rtp_recv_sink_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
rtp_recv_sink_seqnum: Option<gst::Seqnum>,
recv_store: Vec<HeldRecvBuffer>,
recv_store: Vec<HeldRecvItem>,
rtp_recv_srcpads: Vec<RtpRecvSrcPad>,
recv_flow_combiner: Arc<Mutex<gst_base::UniqueFlowCombiner>>,
@ -239,7 +262,7 @@ impl RecvSession {
}
}
fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
fn start_rtp_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
gst::debug!(CAT, obj: pad, "Starting rtp recv src task");
let recv_pad = self
@ -301,7 +324,7 @@ impl RecvSession {
Ok(())
}
fn stop_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
fn stop_rtp_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
gst::debug!(CAT, obj: pad, "Stopping rtp recv src task");
let recv_pad = self
.rtp_recv_srcpads
@ -318,7 +341,7 @@ impl RecvSession {
Ok(())
}
fn get_or_create_rtp_recv_src(
fn get_or_create_rtp_src(
&mut self,
rtpbin: &RtpRecv,
pt: u8,
@ -352,7 +375,7 @@ impl RecvSession {
RtpRecv::catch_panic_pad_function(
parent,
|| false,
|this| this.rtp_recv_src_event(pad, event, id, pt, ssrc),
|this| this.rtp_src_event(pad, event, id, pt, ssrc),
)
})
.activatemode_function({
@ -364,7 +387,7 @@ impl RecvSession {
glib::bool_error!("rtprecv does not exist anymore"),
));
};
this.rtp_recv_src_activatemode(pad, mode, active, id)
this.rtp_src_activatemode(pad, mode, active, id)
}
})
.name(format!("rtp_src_{}_{}_{}", id, pt, ssrc))
@ -401,7 +424,13 @@ struct State {
sessions: Vec<RecvSession>,
max_session_id: usize,
pads_session_id_map: HashMap<gst::Pad, usize>,
sync_context: Option<sync::Context>,
}
enum RecvRtpBuffer {
IsRtcp(gst::Buffer),
SsrcCollision(u32),
Forward((gst::Buffer, Arc<Mutex<JitterBufferStore>>)),
Drop,
}
impl State {
@ -441,10 +470,11 @@ impl State {
pub struct RtpRecv {
settings: Mutex<Settings>,
state: Arc<Mutex<State>>,
sync_context: Arc<Mutex<Option<sync::Context>>>,
}
impl RtpRecv {
fn rtp_recv_src_activatemode(
fn rtp_src_activatemode(
&self,
pad: &gst::Pad,
mode: gst::PadMode,
@ -465,9 +495,9 @@ impl RtpRecv {
};
if active {
session.start_rtp_recv_task(pad)?;
session.start_rtp_task(pad)?;
} else {
session.stop_rtp_recv_task(pad)?;
session.stop_rtp_task(pad)?;
gst::debug!(CAT, obj: pad, "Stopping task");
@ -535,17 +565,15 @@ impl RtpRecv {
gst::Iterator::from_vec(vec![])
}
fn rtp_recv_sink_chain(
fn handle_buffer_locked<const H: usize, const P: usize>(
&self,
pad: &gst::Pad,
id: usize,
session: &mut RecvSession,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let Some(session) = state.mut_session_by_id(id) else {
return Err(gst::FlowError::Error);
};
now: Instant,
items_to_pre_push: &mut smallvec::SmallVec<[HeldRecvItem; P]>,
held_buffers: &mut smallvec::SmallVec<[HeldRecvBuffer; H]>,
) -> Result<RecvRtpBuffer, gst::FlowError> {
// TODO: this is different from the old C implementation, where we
// simply used the RTP timestamps as they were instead of doing any
// sort of skew calculations.
@ -573,8 +601,6 @@ impl RtpRecv {
},
};
gst::trace!(CAT, obj: pad, "using arrival time {}", arrival_time);
let addr: Option<SocketAddr> =
buffer
.meta::<gst_net::NetAddressMeta>()
@ -589,6 +615,7 @@ impl RtpRecv {
gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}");
gst::FlowError::Error
})?;
let rtp = match rtp_types::RtpPacket::parse(&mapped) {
Ok(rtp) => rtp,
Err(e) => {
@ -598,59 +625,47 @@ impl RtpRecv {
.is_ok_and(|mut rtcp| rtcp.next().is_some_and(|rtcp| rtcp.is_ok()))
{
drop(mapped);
return Self::rtcp_recv_sink_chain(self, id, buffer);
return Ok(RecvRtpBuffer::IsRtcp(buffer));
}
gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}");
return Ok(gst::FlowSuccess::Ok);
return Ok(RecvRtpBuffer::Drop);
}
};
let internal_session = session.internal_session.clone();
gst::trace!(CAT, obj: pad, "using arrival time {}", arrival_time);
let internal_session = session.internal_session.clone();
let mut session_inner = internal_session.inner.lock().unwrap();
if state
.sync_context
.as_ref()
.unwrap()
.clock_rate(rtp.ssrc())
.is_none()
{
let clock_rate = session_inner
.session
.clock_rate_from_pt(rtp.payload_type())
.unwrap();
state
.sync_context
.as_mut()
.unwrap()
.set_clock_rate(rtp.ssrc(), clock_rate);
}
let pts = {
let mut sync_context = self.sync_context.lock().unwrap();
let sync_context = sync_context.as_mut().unwrap();
if !sync_context.has_clock_rate(rtp.ssrc()) {
let clock_rate = session_inner
.session
.clock_rate_from_pt(rtp.payload_type())
.unwrap();
sync_context.set_clock_rate(rtp.ssrc(), clock_rate);
}
// TODO: Put NTP time as `gst::ReferenceTimeStampMeta` on the buffers if selected via property
let (pts, _ntp_time) =
sync_context.calculate_pts(rtp.ssrc(), rtp.timestamp(), arrival_time.nseconds());
pts
};
// TODO: Put NTP time as `gst::ReferenceTimeStampMeta` on the buffers if selected via property
let (pts, _ntp_time) = state.sync_context.as_mut().unwrap().calculate_pts(
rtp.ssrc(),
rtp.timestamp(),
arrival_time.nseconds(),
);
let session = state.mut_session_by_id(id).unwrap();
let segment = session.rtp_recv_sink_segment.as_ref().unwrap();
let pts = segment
.position_from_running_time(gst::ClockTime::from_nseconds(pts))
.unwrap();
gst::debug!(CAT, "Calculated PTS: {}", pts);
gst::debug!(CAT, obj: pad, "Calculated PTS: {}", pts);
let now = Instant::now();
let mut buffers_to_push = vec![];
let mut ssrc_collision = vec![];
loop {
match session_inner.session.handle_recv(&rtp, addr, now) {
RecvReply::SsrcCollision(ssrc) => {
if !ssrc_collision.iter().any(|&needle| needle == ssrc) {
ssrc_collision.push(ssrc);
}
}
let recv_ret = session_inner.session.handle_recv(&rtp, addr, now);
gst::trace!(CAT, obj: pad, "session handle_recv ret: {recv_ret:?}");
match recv_ret {
RecvReply::SsrcCollision(ssrc) => return Ok(RecvRtpBuffer::SsrcCollision(ssrc)),
RecvReply::NewSsrc(ssrc, _pt) => {
drop(session_inner);
internal_session
@ -666,36 +681,45 @@ impl RtpRecv {
let buf_mut = buffer.make_mut();
buf_mut.set_pts(pts);
}
let (pad, new_pad) = session.get_or_create_rtp_recv_src(self, pt, ssrc);
session.recv_store.push(HeldRecvBuffer {
let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc);
let jb = pad.jitter_buffer_store.clone();
if new_pad {
items_to_pre_push.push(HeldRecvItem::NewPad(pad));
}
held_buffers.push(HeldRecvBuffer {
hold_id: Some(hold_id),
buffer,
pad,
new_pad,
jb,
});
break;
}
RecvReply::Drop(hold_id) => {
if let Some(pos) = session
if let Some(pos) = held_buffers.iter().position(|b| b.hold_id == Some(hold_id))
{
held_buffers.remove(pos);
} else if let Some(pos) = session
.recv_store
.iter()
.position(|b| b.hold_id.unwrap() == hold_id)
.position(|b| b.hold_id() == Some(hold_id))
{
session.recv_store.remove(pos);
}
}
RecvReply::Forward(hold_id) => {
if let Some(pos) = session
if let Some(pos) = held_buffers.iter().position(|b| b.hold_id == Some(hold_id))
{
items_to_pre_push.push(HeldRecvItem::Buffer(held_buffers.remove(pos)));
} else if let Some(pos) = session
.recv_store
.iter()
.position(|b| b.hold_id.unwrap() == hold_id)
.position(|b| b.hold_id() == Some(hold_id))
{
buffers_to_push.push(session.recv_store.remove(pos));
items_to_pre_push.push(session.recv_store.remove(pos));
} else {
unreachable!();
}
}
RecvReply::Ignore => break,
RecvReply::Ignore => return Ok(RecvRtpBuffer::Drop),
RecvReply::Passthrough => {
let pt = rtp.payload_type();
let ssrc = rtp.ssrc();
@ -704,20 +728,26 @@ impl RtpRecv {
let buf_mut = buffer.make_mut();
buf_mut.set_pts(pts);
}
let (pad, new_pad) = session.get_or_create_rtp_recv_src(self, pt, ssrc);
buffers_to_push.push(HeldRecvBuffer {
hold_id: None,
buffer,
pad,
new_pad,
});
break;
let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc);
let jb = pad.jitter_buffer_store.clone();
if new_pad {
items_to_pre_push.push(HeldRecvItem::NewPad(pad));
}
return Ok(RecvRtpBuffer::Forward((buffer, jb)));
}
}
}
let send_rtp_sink = session_inner.rtp_send_sinkpad.clone();
Ok(RecvRtpBuffer::Drop)
}
fn handle_ssrc_collision(
&self,
session: &mut RecvSession,
ssrc_collision: impl IntoIterator<Item = u32>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let session_inner = session.internal_session.inner.lock().unwrap();
let send_rtp_sink = session_inner.rtp_send_sinkpad.clone();
drop(session_inner);
if let Some(pad) = send_rtp_sink {
@ -735,63 +765,290 @@ impl RtpRecv {
}
}
Ok(gst::FlowSuccess::Ok)
}
fn handle_push_jitterbuffer<'a>(
&'a self,
mut state: MutexGuard<'a, State>,
id: usize,
buffers_to_push: impl IntoIterator<Item = HeldRecvItem>,
now: Instant,
) -> Result<MutexGuard<'a, State>, gst::FlowError> {
for mut held in buffers_to_push {
// TODO: handle other processing
if held.new_pad {
state.pads_session_id_map.insert(held.pad.pad.clone(), id);
// drops the state lock
held.pad.activate(state, id);
self.obj().add_pad(&held.pad.pad).unwrap();
state = self.state.lock().unwrap();
}
let mapped = held.buffer.map_readable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}");
gst::FlowError::Error
})?;
let rtp = match rtp_types::RtpPacket::parse(&mapped) {
Ok(rtp) => rtp,
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}");
return Ok(gst::FlowSuccess::Ok);
match held {
HeldRecvItem::NewPad(ref mut pad) => {
// TODO: handle other processing
state.pads_session_id_map.insert(pad.pad.clone(), id);
// drops the state lock
pad.activate(state, id);
self.obj().add_pad(&pad.pad).unwrap();
state = self.state.lock().unwrap();
}
};
HeldRecvItem::Buffer(buffer) => {
let mapped = buffer.buffer.map_readable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}");
gst::FlowError::Error
})?;
let rtp = match rtp_types::RtpPacket::parse(&mapped) {
Ok(rtp) => rtp,
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}");
return Ok(state);
}
};
// FIXME: Should block if too many packets are stored here because the source pad task
// is blocked
let mut jitterbuffer_store = held.pad.jitter_buffer_store.lock().unwrap();
// FIXME: Should block if too many packets are stored here because the source pad task
// is blocked
let mut jitterbuffer_store = buffer.jb.lock().unwrap();
match jitterbuffer_store.jitterbuffer.queue_packet(
&rtp,
held.buffer.pts().unwrap().nseconds(),
now,
) {
jitterbuffer::QueueResult::Flushing => {
// TODO: return flushing result upstream
}
jitterbuffer::QueueResult::Queued(id) => {
drop(mapped);
let ret = jitterbuffer_store.jitterbuffer.queue_packet(
&rtp,
buffer.buffer.pts().unwrap().nseconds(),
now,
);
gst::trace!(CAT, "jb queue buffer: {ret:?}");
match ret {
jitterbuffer::QueueResult::Flushing => {
// TODO: return flushing result upstream
}
jitterbuffer::QueueResult::Queued(id) => {
drop(mapped);
jitterbuffer_store
.store
.insert(id, JitterBufferItem::Packet(held.buffer));
if let Some(waker) = jitterbuffer_store.waker.take() {
waker.wake()
jitterbuffer_store
.store
.insert(id, JitterBufferItem::Packet(buffer.buffer));
if let Some(waker) = jitterbuffer_store.waker.take() {
waker.wake()
}
}
jitterbuffer::QueueResult::Late => {
gst::warning!(CAT, "Late buffer was dropped");
}
jitterbuffer::QueueResult::Duplicate => {
gst::warning!(CAT, "Duplicate buffer was dropped");
}
}
}
jitterbuffer::QueueResult::Late => {
gst::warning!(CAT, "Late buffer was dropped");
}
jitterbuffer::QueueResult::Duplicate => {
gst::warning!(CAT, "Duplicate buffer was dropped");
HeldRecvItem::BufferList(list) => {
// FIXME: Should block if too many packets are stored here because the source pad task
// is blocked
let mut jitterbuffer_store = list.jb.lock().unwrap();
for buffer in list.list.iter_owned() {
let mapped = buffer.map_readable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}");
gst::FlowError::Error
})?;
let rtp = match rtp_types::RtpPacket::parse(&mapped) {
Ok(rtp) => rtp,
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}");
return Ok(state);
}
};
let ret = jitterbuffer_store.jitterbuffer.queue_packet(
&rtp,
buffer.pts().unwrap().nseconds(),
now,
);
gst::trace!(CAT, "jb queue buffer in list: {ret:?}");
match ret {
jitterbuffer::QueueResult::Flushing => {
return Err(gst::FlowError::Flushing);
}
jitterbuffer::QueueResult::Queued(id) => {
drop(mapped);
jitterbuffer_store
.store
.insert(id, JitterBufferItem::Packet(buffer));
if let Some(waker) = jitterbuffer_store.waker.take() {
waker.wake()
}
}
jitterbuffer::QueueResult::Late => {
gst::warning!(CAT, "Late buffer was dropped");
}
jitterbuffer::QueueResult::Duplicate => {
gst::warning!(CAT, "Duplicate buffer was dropped");
}
}
}
}
}
}
Ok(state)
}
fn rtp_sink_chain_list(
&self,
pad: &gst::Pad,
id: usize,
mut list: gst::BufferList,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let Some(session) = state.mut_session_by_id(id) else {
return Err(gst::FlowError::Error);
};
let now = Instant::now();
let mut ssrc_collision: smallvec::SmallVec<[u32; 4]> = Default::default();
let mut items_to_pre_push: smallvec::SmallVec<[HeldRecvItem; 4]> =
smallvec::SmallVec::with_capacity(list.len() + 2);
let mut held_buffers: smallvec::SmallVec<[HeldRecvBuffer; 4]> = Default::default();
let mut split_bufferlist = false;
let mut previous_jb = None;
let list_mut = list.make_mut();
let mut ret = Ok(());
list_mut.foreach_mut(|buffer, _i| {
match self.handle_buffer_locked(
pad,
session,
buffer,
now,
&mut items_to_pre_push,
&mut held_buffers,
) {
Ok(RecvRtpBuffer::SsrcCollision(ssrc)) => {
ssrc_collision.push(ssrc);
ControlFlow::Continue(None)
}
Ok(RecvRtpBuffer::IsRtcp(buffer)) => {
match Self::rtcp_sink_chain(self, id, buffer) {
Ok(_buf) => ControlFlow::Continue(None),
Err(e) => {
ret = Err(e);
ControlFlow::Break(None)
}
}
}
Ok(RecvRtpBuffer::Drop) => ControlFlow::Continue(None),
Ok(RecvRtpBuffer::Forward((buffer, jb))) => {
// if all the buffers do not end up in the same jitterbuffer, then we need to
// split
if !split_bufferlist
&& previous_jb
.as_ref()
.map_or(false, |previous| !Arc::ptr_eq(previous, &jb))
{
split_bufferlist = true;
}
previous_jb = Some(jb);
ControlFlow::Continue(Some(buffer))
}
Err(e) => {
ret = Err(e);
ControlFlow::Break(None)
}
}
});
ret?;
session
.recv_store
.extend(held_buffers.into_iter().map(HeldRecvItem::Buffer));
self.handle_ssrc_collision(session, ssrc_collision)?;
state = self.handle_push_jitterbuffer(state, id, items_to_pre_push, now)?;
if split_bufferlist {
// this abomination is to work around passing state through handle_push_jitterbuffer
// inside a closure
let mut maybe_state = Some(state);
list_mut.foreach_mut({
let maybe_state = &mut maybe_state;
|buffer, _i| match self.handle_push_jitterbuffer(
maybe_state.take().unwrap(),
id,
[HeldRecvItem::Buffer(HeldRecvBuffer {
hold_id: None,
buffer,
jb: previous_jb.clone().unwrap(),
})],
now,
) {
Ok(state) => {
*maybe_state = Some(state);
ControlFlow::Continue(None)
}
Err(e) => {
ret = Err(e);
ControlFlow::Break(None)
}
}
});
state = maybe_state.unwrap();
ret?;
} else {
state = self.handle_push_jitterbuffer(
state,
id,
[HeldRecvItem::BufferList(HeldRecvBufferList {
list,
jb: previous_jb.unwrap(),
})],
now,
)?;
}
drop(state);
Ok(gst::FlowSuccess::Ok)
}
fn rtcp_recv_sink_chain(
fn rtp_sink_chain(
&self,
pad: &gst::Pad,
id: usize,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let Some(session) = state.mut_session_by_id(id) else {
return Err(gst::FlowError::Error);
};
let now = Instant::now();
let mut items_to_pre_push: smallvec::SmallVec<[HeldRecvItem; 4]> = Default::default();
let mut held_buffers: smallvec::SmallVec<[HeldRecvBuffer; 4]> = Default::default();
let forward = match self.handle_buffer_locked(
pad,
session,
buffer,
now,
&mut items_to_pre_push,
&mut held_buffers,
)? {
RecvRtpBuffer::SsrcCollision(ssrc) => {
return self.handle_ssrc_collision(session, [ssrc])
}
RecvRtpBuffer::IsRtcp(buffer) => return Self::rtcp_sink_chain(self, id, buffer),
RecvRtpBuffer::Drop => None,
RecvRtpBuffer::Forward((buffer, jb)) => Some((buffer, jb)),
};
session
.recv_store
.extend(held_buffers.into_iter().map(HeldRecvItem::Buffer));
state = self.handle_push_jitterbuffer(state, id, items_to_pre_push, now)?;
if let Some((buffer, jb)) = forward {
state = self.handle_push_jitterbuffer(
state,
id,
[HeldRecvItem::Buffer(HeldRecvBuffer {
hold_id: None,
buffer,
jb,
})],
now,
)?;
}
drop(state);
Ok(gst::FlowSuccess::Ok)
}
fn rtcp_sink_chain(
&self,
id: usize,
buffer: gst::Buffer,
@ -881,15 +1138,14 @@ impl RtpRecv {
}
}
RtcpRecvReply::NewCName((cname, ssrc)) => {
let mut state = self.state.lock().unwrap();
let mut sync_context = self.sync_context.lock().unwrap();
state.sync_context.as_mut().unwrap().associate(ssrc, &cname);
sync_context.as_mut().unwrap().associate(ssrc, &cname);
}
RtcpRecvReply::NewRtpNtp((ssrc, rtp, ntp)) => {
let mut state = self.state.lock().unwrap();
let mut sync_context = self.sync_context.lock().unwrap();
state
.sync_context
sync_context
.as_mut()
.unwrap()
.add_sender_report(ssrc, rtp, ntp);
@ -904,12 +1160,7 @@ impl RtpRecv {
Ok(gst::FlowSuccess::Ok)
}
pub fn rtp_recv_sink_query(
&self,
pad: &gst::Pad,
query: &mut gst::QueryRef,
id: usize,
) -> bool {
pub fn rtp_sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef, id: usize) -> bool {
gst::log!(CAT, obj: pad, "Handling query {query:?}");
if query.is_serialized() {
@ -985,7 +1236,7 @@ impl RtpRecv {
// through the relevant jitterbuffers in order to remain (reasonably)
// consistently ordered with the RTP packets once output on our source
// pads
fn rtp_recv_sink_queue_serialized_event(&self, id: usize, event: gst::Event) -> bool {
fn rtp_sink_queue_serialized_event(&self, id: usize, event: gst::Event) -> bool {
let state = self.state.lock().unwrap();
if let Some(session) = state.session_by_id(id) {
for srcpad in session
@ -1013,7 +1264,7 @@ impl RtpRecv {
true
}
fn rtp_recv_sink_event(&self, pad: &gst::Pad, mut event: gst::Event, id: usize) -> bool {
fn rtp_sink_event(&self, pad: &gst::Pad, mut event: gst::Event, id: usize) -> bool {
match event.view() {
gst::EventView::StreamStart(stream_start) => {
let mut state = self.state.lock().unwrap();
@ -1070,7 +1321,7 @@ impl RtpRecv {
drop(state);
self.rtp_recv_sink_queue_serialized_event(id, event)
self.rtp_sink_queue_serialized_event(id, event)
}
gst::EventView::Eos(_eos) => {
let now = Instant::now();
@ -1105,7 +1356,7 @@ impl RtpRecv {
}
drop(state);
// FIXME: may need to delay sending eos under some circumstances
self.rtp_recv_sink_queue_serialized_event(id, event);
self.rtp_sink_queue_serialized_event(id, event);
true
}
gst::EventView::FlushStart(_fs) => {
@ -1137,15 +1388,15 @@ impl RtpRecv {
.collect::<Vec<_>>();
for pad in pads {
// Will reset flushing to false and ensure task is woken up
let _ = session.start_rtp_recv_task(&pad);
let _ = session.start_rtp_task(&pad);
}
}
drop(state);
self.rtp_recv_sink_queue_serialized_event(id, event)
self.rtp_sink_queue_serialized_event(id, event)
}
_ => {
if event.is_serialized() {
self.rtp_recv_sink_queue_serialized_event(id, event)
self.rtp_sink_queue_serialized_event(id, event)
} else {
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
@ -1153,7 +1404,7 @@ impl RtpRecv {
}
}
fn rtp_recv_src_event(
fn rtp_src_event(
&self,
pad: &gst::Pad,
event: gst::Event,
@ -1220,6 +1471,7 @@ impl ObjectSubclass for RtpRecv {
Self {
settings: Default::default(),
state: Default::default(),
sync_context: Default::default(),
}
}
}
@ -1417,11 +1669,18 @@ impl ElementImpl for RtpRecv {
Vec<gst::Event>,
)> {
let sinkpad = gst::Pad::builder_from_template(templ)
.chain_list_function(move |pad, parent, buffer_list| {
RtpRecv::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtp_sink_chain_list(pad, id, buffer_list),
)
})
.chain_function(move |pad, parent, buffer| {
RtpRecv::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtp_recv_sink_chain(pad, id, buffer),
|this| this.rtp_sink_chain(pad, id, buffer),
)
})
.iterate_internal_links_function(|pad, parent| {
@ -1435,14 +1694,14 @@ impl ElementImpl for RtpRecv {
RtpRecv::catch_panic_pad_function(
parent,
|| false,
|this| this.rtp_recv_sink_event(pad, event, id),
|this| this.rtp_sink_event(pad, event, id),
)
})
.query_function(move |pad, parent, query| {
RtpRecv::catch_panic_pad_function(
parent,
|| false,
|this| this.rtp_recv_sink_query(pad, query, id),
|this| this.rtp_sink_query(pad, query, id),
)
})
.name(format!("rtp_sink_{}", id))
@ -1480,7 +1739,7 @@ impl ElementImpl for RtpRecv {
RtpRecv::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtcp_recv_sink_chain(id, buffer),
|this| this.rtcp_sink_chain(id, buffer),
)
})
.iterate_internal_links_function(|pad, parent| {
@ -1619,9 +1878,9 @@ impl ElementImpl for RtpRecv {
}
gst::StateChange::ReadyToPaused => {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
let mut sync_context = self.sync_context.lock().unwrap();
state.sync_context = Some(sync::Context::new(settings.timestamping_mode));
*sync_context = Some(sync::Context::new(settings.timestamping_mode));
}
_ => (),
}
@ -1647,7 +1906,9 @@ impl ElementImpl for RtpRecv {
session.rtp_recv_sink_seqnum = None;
session.rtp_recv_sink_group_id = None;
}
state.sync_context = None;
let mut sync_context = self.sync_context.lock().unwrap();
*sync_context = None;
drop(sync_context);
drop(state);
for pad in removed_pads.iter() {

View file

@ -118,8 +118,8 @@ impl Context {
}
}
pub fn clock_rate(&self, ssrc_val: u32) -> Option<u32> {
self.ssrcs.get(&ssrc_val).and_then(|ssrc| ssrc.clock_rate)
pub fn has_clock_rate(&self, ssrc_val: u32) -> bool {
self.ssrcs.contains_key(&ssrc_val)
}
fn disassociate(&mut self, ssrc_val: u32, cname: &str) {

View file

@ -60,19 +60,6 @@ impl PacketInfo {
}
}
static PACKETS_TEST_1: [PacketInfo; 2] = [
PacketInfo {
seq_no: 500,
rtp_ts: 20,
payload_len: 7,
},
PacketInfo {
seq_no: 501,
rtp_ts: 30,
payload_len: 23,
},
];
fn send_init() -> gst_check::Harness {
init();
@ -283,9 +270,7 @@ fn test_send_list_benchmark() {
);
}
#[test]
fn test_receive() {
init();
fn receive_init() -> Arc<Mutex<gst_check::Harness>> {
let id = next_element_counter();
let elem = gst::ElementFactory::make("rtprecv")
@ -311,7 +296,6 @@ fn test_receive() {
.add_element_src_pad(pad)
});
inner.play();
let caps = Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", TEST_PT as i32)
@ -320,6 +304,16 @@ fn test_receive() {
.build();
inner.set_src_caps(caps);
drop(inner);
h
}
fn receive_push<I>(h: Arc<Mutex<gst_check::Harness>>, packets: I, buffer_list: bool)
where
I: IntoIterator<Item = PacketInfo>,
{
let inner = h.lock().unwrap();
// Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to
// the harness and needs to also take the harness lock. Workaround by pushing from the
// internal harness pad directly.
@ -331,21 +325,48 @@ fn test_receive() {
.peer()
.unwrap();
drop(inner);
push_pad.push(generate_rtp_buffer(500, 20, 9)).unwrap();
push_pad.push(generate_rtp_buffer(501, 30, 11)).unwrap();
if buffer_list {
let mut list = gst::BufferList::new();
let list_mut = list.make_mut();
for packet in packets {
list_mut.add(packet.generate_buffer());
}
push_pad.push_list(list).unwrap();
} else {
for packet in packets {
push_pad.push(packet.generate_buffer()).unwrap();
}
}
}
fn receive_pull<I>(h: Arc<Mutex<gst_check::Harness>>, packets: I)
where
I: IntoIterator<Item = PacketInfo>,
{
let mut inner = h.lock().unwrap();
for packet in packets {
let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), packet.seq_no);
}
}
let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), 500);
let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), 501);
fn receive_check_stats<I>(h: Arc<Mutex<gst_check::Harness>>, packets: I)
where
I: IntoIterator<Item = PacketInfo>,
{
let mut n_packets = 0;
let mut n_bytes = 0;
for packet in packets {
n_packets += 1;
n_bytes += packet.payload_len;
}
let inner = h.lock().unwrap();
let stats = inner.element().unwrap().property::<gst::Structure>("stats");
drop(inner);
let session_stats = stats.get::<gst::Structure>("0").unwrap();
let source_stats = session_stats
@ -367,12 +388,21 @@ fn test_receive() {
);
assert!(source_stats.get::<bool>("sender").unwrap());
assert!(!source_stats.get::<bool>("local").unwrap());
assert_eq!(source_stats.get::<u64>("packets-received").unwrap(), 2);
assert_eq!(source_stats.get::<u64>("octets-received").unwrap(), 20);
assert_eq!(
source_stats.get::<u64>("packets-received").unwrap(),
n_packets
);
assert_eq!(
source_stats.get::<u64>("octets-received").unwrap(),
n_bytes as u64
);
assert_eq!(jitterbuffer_stats.get::<u64>("num-late").unwrap(), 0);
assert_eq!(jitterbuffer_stats.get::<u64>("num-lost").unwrap(), 0);
assert_eq!(jitterbuffer_stats.get::<u64>("num-duplicates").unwrap(), 0);
assert_eq!(jitterbuffer_stats.get::<u64>("num-pushed").unwrap(), 2);
assert_eq!(
jitterbuffer_stats.get::<u64>("num-pushed").unwrap(),
n_packets
);
assert_eq!(jitterbuffer_stats.get::<i32>("pt").unwrap(), TEST_PT as i32);
assert_eq!(
jitterbuffer_stats.get::<i32>("ssrc").unwrap(),
@ -380,56 +410,46 @@ fn test_receive() {
);
}
static PACKETS_TEST_1: [PacketInfo; 2] = [
PacketInfo {
seq_no: 500,
rtp_ts: 20,
payload_len: 13,
},
PacketInfo {
seq_no: 501,
rtp_ts: 30,
payload_len: 7,
},
];
#[test]
fn test_receive() {
init();
let h = receive_init();
receive_push(h.clone(), PACKETS_TEST_1, false);
receive_pull(h.clone(), PACKETS_TEST_1);
receive_check_stats(h, PACKETS_TEST_1);
}
#[test]
fn test_receive_list() {
init();
let h = receive_init();
receive_push(h.clone(), PACKETS_TEST_1, true);
receive_pull(h.clone(), PACKETS_TEST_1);
receive_check_stats(h, PACKETS_TEST_1);
}
#[test]
fn test_receive_flush() {
init();
let id = next_element_counter();
let h = receive_init();
let elem = gst::ElementFactory::make("rtprecv")
.property("rtp-id", id.to_string())
.build()
.unwrap();
let h = Arc::new(Mutex::new(Harness::with_element(
&elem,
Some("rtp_sink_0"),
None,
)));
let weak_h = Arc::downgrade(&h);
let mut inner = h.lock().unwrap();
inner
.element()
.unwrap()
.connect_pad_added(move |_elem, pad| {
weak_h
.upgrade()
.unwrap()
.lock()
.unwrap()
.add_element_src_pad(pad)
});
inner.play();
receive_push(h.clone(), PACKETS_TEST_1, false);
let caps = Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", TEST_PT as i32)
.field("clock-rate", TEST_CLOCK_RATE as i32)
.field("encoding-name", "custom-test")
.build();
inner.set_src_caps(caps);
// Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to
// the harness and needs to also take the harness lock. Workaround by pushing from the
// internal harness pad directly.
let push_pad = inner
.element()
.unwrap()
.static_pad("rtp_sink_0")
.unwrap()
.peer()
.unwrap();
drop(inner);
push_pad.push(generate_rtp_buffer(500, 20, 9)).unwrap();
push_pad.push(generate_rtp_buffer(501, 30, 11)).unwrap();
let mut inner = h.lock().unwrap();
let seqnum = gst::Seqnum::next();
inner.push_event(gst::event::FlushStart::builder().seqnum(seqnum).build());
@ -446,3 +466,128 @@ fn test_receive_flush() {
};
assert_eq!(fs.seqnum(), seqnum);
}
#[test]
fn test_receive_benchmark() {
init();
let clock = gst::SystemClock::obtain();
const N_PACKETS: usize = 1024 * 1024;
let mut packets = Vec::with_capacity(N_PACKETS);
for i in 0..N_PACKETS {
packets.push(
PacketInfo {
seq_no: (i % u16::MAX as usize) as u16,
rtp_ts: i as u32,
payload_len: 8,
}
.generate_buffer(),
)
}
let h = receive_init();
let inner = h.lock().unwrap();
let push_pad = inner
.element()
.unwrap()
.static_pad("rtp_sink_0")
.unwrap()
.peer()
.unwrap();
drop(inner);
let start = clock.time();
for packet in packets {
push_pad.push(packet).unwrap();
}
let pushed = clock.time();
let mut inner = h.lock().unwrap();
for i in 0..N_PACKETS {
let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16);
}
let end = clock.time();
drop(inner);
let test_time = end.opt_sub(start);
let pull_time = end.opt_sub(pushed);
let push_time = pushed.opt_sub(start);
println!(
"test took {} (push {}, pull {})",
test_time.display(),
push_time.display(),
pull_time.display()
);
}
#[test]
fn test_receive_list_benchmark() {
init();
let clock = gst::SystemClock::obtain();
const N_PACKETS: usize = 32 * 1024;
const N_PUSHES: usize = 1024 / 32;
let h = receive_init();
let inner = h.lock().unwrap();
let push_pad = inner
.element()
.unwrap()
.static_pad("rtp_sink_0")
.unwrap()
.peer()
.unwrap();
drop(inner);
let mut lists = Vec::with_capacity(N_PUSHES);
for p in 0..N_PUSHES {
let mut list = gst::BufferList::new();
let list_mut = list.make_mut();
for i in 0..N_PACKETS {
list_mut.add(
PacketInfo {
seq_no: ((p * N_PACKETS + i) % u16::MAX as usize) as u16,
rtp_ts: (p * N_PACKETS + i) as u32,
payload_len: 8,
}
.generate_buffer(),
);
}
lists.push(list);
}
let start = clock.time();
for list in lists {
push_pad.push_list(list).unwrap();
}
let pushed = clock.time();
let mut inner = h.lock().unwrap();
for p in 0..N_PUSHES {
for i in 0..N_PACKETS {
let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), ((p * N_PACKETS + i) % u16::MAX as usize) as u16);
}
}
let end = clock.time();
drop(inner);
let test_time = end.opt_sub(start);
let pull_time = end.opt_sub(pushed);
let push_time = pushed.opt_sub(start);
println!(
"test took {} (push {}, pull {})",
test_time.display(),
push_time.display(),
pull_time.display()
);
}