threadshare/jitterbuffer: Use GSlice allocator for the jitter buffer items

They're actually used as a hackish intrusive list around GQueue so using
a different allocator is rather dangerous. Or rather, this is generally
dangerous and shouldn't be done but ...

Also make sure to free items manually in finalize() so that any
contained buffers can also be unreffed.
This commit is contained in:
Sebastian Dröge 2020-04-22 13:36:09 +03:00
parent 15d863a26c
commit d08268627e
2 changed files with 77 additions and 38 deletions

View file

@ -18,6 +18,7 @@
use glib_sys as glib_ffi;
use gstreamer_sys as gst_ffi;
use std::ptr;
use std::u32;
#[allow(clippy::module_inception)]
@ -129,7 +130,6 @@ use glib::translate::*;
use glib::{glib_object_wrapper, glib_wrapper};
use std::mem;
use std::ptr;
glib_wrapper! {
pub struct RTPJitterBuffer(Object<ffi::RTPJitterBuffer, RTPJitterBufferClass>);
@ -171,7 +171,7 @@ impl FromGlib<ffi::RTPJitterBufferMode> for RTPJitterBufferMode {
}
}
pub struct RTPJitterBufferItem(Option<Box<ffi::RTPJitterBufferItem>>);
pub struct RTPJitterBufferItem(Option<ptr::NonNull<ffi::RTPJitterBufferItem>>);
unsafe impl Send for RTPJitterBufferItem {}
@ -184,59 +184,80 @@ impl RTPJitterBufferItem {
rtptime: u32,
) -> RTPJitterBufferItem {
unsafe {
RTPJitterBufferItem(Some(Box::new(ffi::RTPJitterBufferItem {
data: buffer.into_ptr() as *mut _,
next: ptr::null_mut(),
prev: ptr::null_mut(),
r#type: 0,
dts: dts.to_glib(),
pts: pts.to_glib(),
seqnum: seqnum.map(|s| s as u32).unwrap_or(u32::MAX),
count: 1,
rtptime,
})))
let ptr = ptr::NonNull::new(glib_sys::g_slice_alloc0(mem::size_of::<
ffi::RTPJitterBufferItem,
>()) as *mut ffi::RTPJitterBufferItem)
.expect("Allocation failed");
ptr::write(
ptr.as_ptr(),
ffi::RTPJitterBufferItem {
data: buffer.into_ptr() as *mut _,
next: ptr::null_mut(),
prev: ptr::null_mut(),
r#type: 0,
dts: dts.to_glib(),
pts: pts.to_glib(),
seqnum: seqnum.map(|s| s as u32).unwrap_or(u32::MAX),
count: 1,
rtptime,
},
);
RTPJitterBufferItem(Some(ptr))
}
}
pub fn into_buffer(mut self) -> gst::Buffer {
unsafe {
let item = self.0.take().expect("Invalid wrapper");
let buf = item.data as *mut gst_ffi::GstBuffer;
let buf = item.as_ref().data as *mut gst_ffi::GstBuffer;
glib_sys::g_slice_free1(
mem::size_of::<ffi::RTPJitterBufferItem>(),
item.as_ptr() as *mut _,
);
from_glib_full(buf)
}
}
pub fn get_dts(&self) -> gst::ClockTime {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.dts == gst_ffi::GST_CLOCK_TIME_NONE {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime(Some(item.dts))
unsafe {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.as_ref().dts == gst_ffi::GST_CLOCK_TIME_NONE {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime(Some(item.as_ref().dts))
}
}
}
pub fn get_pts(&self) -> gst::ClockTime {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.pts == gst_ffi::GST_CLOCK_TIME_NONE {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime(Some(item.pts))
unsafe {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.as_ref().pts == gst_ffi::GST_CLOCK_TIME_NONE {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime(Some(item.as_ref().pts))
}
}
}
pub fn get_seqnum(&self) -> Option<u16> {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.seqnum == u32::MAX {
None
} else {
Some(item.seqnum as u16)
unsafe {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.as_ref().seqnum == u32::MAX {
None
} else {
Some(item.as_ref().seqnum as u16)
}
}
}
#[allow(dead_code)]
pub fn get_rtptime(&self) -> u32 {
let item = self.0.as_ref().expect("Invalid wrapper");
item.rtptime
unsafe {
let item = self.0.as_ref().expect("Invalid wrapper");
item.as_ref().rtptime
}
}
}
@ -244,7 +265,14 @@ impl Drop for RTPJitterBufferItem {
fn drop(&mut self) {
unsafe {
if let Some(ref item) = self.0 {
gst_ffi::gst_mini_object_unref(item.data as *mut _)
if !item.as_ref().data.is_null() {
gst_ffi::gst_mini_object_unref(item.as_ref().data as *mut _);
}
glib_sys::g_slice_free1(
mem::size_of::<ffi::RTPJitterBufferItem>(),
item.as_ptr() as *mut _,
);
}
}
}
@ -361,16 +389,15 @@ impl RTPJitterBuffer {
unsafe {
let mut head = mem::MaybeUninit::uninit();
let mut percent = mem::MaybeUninit::uninit();
let box_ = item.0.take().expect("Invalid wrapper");
let ptr = Box::into_raw(box_);
let ptr = item.0.take().expect("Invalid wrapper");
let ret: bool = from_glib(ffi::rtp_jitter_buffer_insert(
self.to_glib_none().0,
ptr,
ptr.as_ptr(),
head.as_mut_ptr(),
percent.as_mut_ptr(),
));
if !ret {
item.0 = Some(Box::from_raw(ptr));
item.0 = Some(ptr);
}
(ret, from_glib(head.assume_init()), percent.assume_init())
}
@ -412,7 +439,7 @@ impl RTPJitterBuffer {
if item.is_null() {
None
} else {
Some(RTPJitterBufferItem(Some(Box::from_raw(item))))
Some(RTPJitterBufferItem(Some(ptr::NonNull::new_unchecked(item))))
},
percent.assume_init(),
)
@ -438,7 +465,8 @@ impl RTPJitterBuffer {
pub fn flush(&self) {
unsafe extern "C" fn free_item(item: glib_ffi::gpointer, _: glib_ffi::gpointer) {
let _ = RTPJitterBufferItem(Some(Box::from_raw(item as *mut _)));
let _ =
RTPJitterBufferItem(Some(ptr::NonNull::new(item as *mut _).expect("NULL item")));
}
unsafe {

View file

@ -97,6 +97,7 @@ static void
rtp_jitter_buffer_finalize (GObject * object)
{
RTPJitterBuffer *jbuf;
RTPJitterBufferItem *item;
jbuf = RTP_JITTER_BUFFER_CAST (object);
@ -112,6 +113,16 @@ rtp_jitter_buffer_finalize (GObject * object)
if (jbuf->pipeline_clock)
gst_object_unref (jbuf->pipeline_clock);
/* Free any remaining items manually here. We can't let g_queue_free() take
* care of this because the items are not actually GList but
* RTPJitterBufferItem, and g_slice_free() explodes if the allocation is not
* from the expected block size.
*/
while ((item = (RTPJitterBufferItem *) g_queue_pop_head_link (jbuf->packets))) {
if (item->data)
gst_mini_object_unref (item->data);
g_slice_free (RTPJitterBufferItem, item);
}
g_queue_free (jbuf->packets);
g_mutex_clear (&jbuf->clock_lock);