threadshare: Move buffer handling into a separate function

This commit is contained in:
Sebastian Dröge 2018-03-15 21:21:42 +02:00
parent 613706d446
commit 212b00ef2f

View file

@ -287,6 +287,33 @@ impl UdpSrc {
ret ret
} }
fn push_buffer(&self, element: &Element, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
let mut events = Vec::new();
let mut state = self.state.lock().unwrap();
if state.need_initial_events {
gst_debug!(self.cat, obj: element, "Pushing initial events");
// TODO: Invent a stream id
events.push(gst::Event::new_stream_start("meh").build());
if let Some(ref caps) = self.settings.lock().unwrap().caps {
events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
);
state.need_initial_events = false;
}
drop(state);
for event in events {
self.src_pad.push_event(event);
}
// TODO: Error handling
self.src_pad.push(buffer).into_result().map(|_| ())
}
fn prepare(&self, element: &Element) -> Result<(), ()> { fn prepare(&self, element: &Element) -> Result<(), ()> {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
@ -363,34 +390,7 @@ impl UdpSrc {
let element_clone = element.clone(); let element_clone = element.clone();
socket.schedule(&io_context, move |buffer| { socket.schedule(&io_context, move |buffer| {
let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap(); let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap();
udpsrc.push_buffer(&element_clone, buffer)
let mut events = Vec::new();
let mut state = udpsrc.state.lock().unwrap();
if state.need_initial_events {
gst_debug!(udpsrc.cat, obj: &element_clone, "Pushing initial events");
// TODO: Invent a stream id
events.push(gst::Event::new_stream_start("meh").build());
if let Some(ref caps) = settings.caps {
events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
state.need_initial_events = false;
}
drop(state);
for event in events {
udpsrc.src_pad.push_event(event);
}
// TODO: Error handling
udpsrc.src_pad.push(buffer).into_result().unwrap();
Ok(())
}); });
state.socket = Some(socket); state.socket = Some(socket);