// Copyright (C) 2018 Sebastian Dröge // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public // License as published by the Free Software Foundation; either // version 2 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Library General Public License for more details. // // You should have received a copy of the GNU Library General Public // License along with this library; if not, write to the // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. // // SPDX-License-Identifier: LGPL-2.1-or-later use futures::future::BoxFuture; use futures::prelude::*; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use gst_net::*; use once_cell::sync::Lazy; use std::i32; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::sync::Mutex; use std::time::Duration; use std::u16; use crate::runtime::prelude::*; use crate::runtime::{Async, Context, PadSrc, PadSrcRef, Task}; use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead}; const DEFAULT_ADDRESS: Option<&str> = Some("0.0.0.0"); const DEFAULT_PORT: i32 = 5000; const DEFAULT_REUSE: bool = true; const DEFAULT_CAPS: Option = None; const DEFAULT_MTU: u32 = 1492; const DEFAULT_SOCKET: Option = None; const DEFAULT_USED_SOCKET: Option = None; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; #[derive(Debug, Clone)] struct Settings { address: Option, port: i32, // for conformity with C based udpsrc reuse: bool, caps: Option, mtu: u32, socket: Option, used_socket: Option, context: String, context_wait: Duration, retrieve_sender_address: bool, } impl Default for Settings { fn default() -> Self { Settings { address: DEFAULT_ADDRESS.map(Into::into), port: DEFAULT_PORT, reuse: DEFAULT_REUSE, caps: DEFAULT_CAPS, mtu: DEFAULT_MTU, socket: DEFAULT_SOCKET, used_socket: DEFAULT_USED_SOCKET, context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS, } } } #[derive(Debug)] struct UdpReader(Async); impl UdpReader { fn new(socket: Async) -> Self { UdpReader(socket) } } impl SocketRead for UdpReader { const DO_TIMESTAMP: bool = true; fn read<'buf>( &'buf mut self, buffer: &'buf mut [u8], ) -> BoxFuture<'buf, io::Result<(usize, Option)>> { async move { self.0 .recv_from(buffer) .await .map(|(read_size, saddr)| (read_size, Some(saddr))) } .boxed() } } #[derive(Clone, Debug)] struct UdpSrcPadHandler; impl PadSrcHandler for UdpSrcPadHandler { type ElementImpl = UdpSrc; fn src_event( &self, pad: &PadSrcRef, udpsrc: &UdpSrc, _element: &gst::Element, event: gst::Event, ) -> bool { use gst::EventView; gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { EventView::FlushStart(..) => udpsrc.task.flush_start().is_ok(), EventView::FlushStop(..) => udpsrc.task.flush_stop().is_ok(), EventView::Reconfigure(..) => true, EventView::Latency(..) => true, _ => false, }; if ret { gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event); } else { gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); } ret } fn src_query( &self, pad: &PadSrcRef, udpsrc: &UdpSrc, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { use gst::QueryViewMut; gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); let ret = match query.view_mut() { QueryViewMut::Latency(q) => { q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); true } QueryViewMut::Scheduling(q) => { q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); q.add_scheduling_modes(&[gst::PadMode::Push]); true } QueryViewMut::Caps(q) => { let caps = if let Some(caps) = udpsrc.configured_caps.lock().unwrap().as_ref() { q.filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) } else { q.filter() .map(|f| f.to_owned()) .unwrap_or_else(gst::Caps::new_any) }; q.set_result(&caps); true } _ => false, }; if ret { gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query); } else { gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); } ret } } struct UdpSrcTask { element: super::UdpSrc, socket: Option>, retrieve_sender_address: bool, need_initial_events: bool, need_segment: bool, } impl UdpSrcTask { fn new(element: super::UdpSrc) -> Self { UdpSrcTask { element, socket: None, retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS, need_initial_events: true, need_segment: true, } } async fn push_buffer( &mut self, buffer: gst::Buffer, ) -> Result { gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer); let udpsrc = self.element.imp(); if self.need_initial_events { gst::debug!(CAT, obj: &self.element, "Pushing initial events"); let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); let stream_start_evt = gst::event::StreamStart::builder(&stream_id) .group_id(gst::GroupId::next()) .build(); udpsrc.src_pad.push_event(stream_start_evt).await; let caps = udpsrc.settings.lock().unwrap().caps.clone(); if let Some(caps) = caps { udpsrc .src_pad .push_event(gst::event::Caps::new(&caps)) .await; *udpsrc.configured_caps.lock().unwrap() = Some(caps); } self.need_initial_events = false; } if self.need_segment { let segment_evt = gst::event::Segment::new(&gst::FormattedSegment::::new()); udpsrc.src_pad.push_event(segment_evt).await; self.need_segment = false; } let res = udpsrc.src_pad.push(buffer).await; match res { Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"), Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"), Err(gst::FlowError::Eos) => { gst::debug!(CAT, obj: &self.element, "EOS"); udpsrc.src_pad.push_event(gst::event::Eos::new()).await; } Err(err) => { gst::error!(CAT, obj: &self.element, "Got error {}", err); gst::element_error!( self.element, gst::StreamError::Failed, ("Internal data stream error"), ["streaming stopped, reason {}", err] ); } } res } } impl TaskImpl for UdpSrcTask { fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { let udpsrc = self.element.imp(); let mut settings = udpsrc.settings.lock().unwrap(); gst::debug!(CAT, obj: &self.element, "Preparing Task"); self.retrieve_sender_address = settings.retrieve_sender_address; let socket = if let Some(ref wrapped_socket) = settings.socket { let socket: UdpSocket; #[cfg(unix)] { socket = wrapped_socket.get() } #[cfg(windows)] { socket = wrapped_socket.get() } let socket = Async::::try_from(socket).map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to setup Async socket: {}", err] ) })?; settings.used_socket = Some(wrapped_socket.clone()); socket } else { let addr: IpAddr = match settings.address { None => { return Err(gst::error_msg!( gst::ResourceError::Settings, ["No address set"] )); } Some(ref addr) => match addr.parse() { Err(err) => { return Err(gst::error_msg!( gst::ResourceError::Settings, ["Invalid address '{}' set: {}", addr, err] )); } Ok(addr) => addr, }, }; let port = settings.port; // TODO: TTL, multicast loopback, etc let saddr = if addr.is_multicast() { let bind_addr = if addr.is_ipv4() { IpAddr::V4(Ipv4Addr::UNSPECIFIED) } else { IpAddr::V6(Ipv6Addr::UNSPECIFIED) }; let saddr = SocketAddr::new(bind_addr, port as u16); gst::debug!( CAT, obj: &self.element, "Binding to {:?} for multicast group {:?}", saddr, addr ); saddr } else { let saddr = SocketAddr::new(addr, port as u16); gst::debug!(CAT, obj: &self.element, "Binding to {:?}", saddr); saddr }; let socket = if addr.is_ipv4() { socket2::Socket::new( socket2::Domain::IPV4, socket2::Type::DGRAM, Some(socket2::Protocol::UDP), ) } else { socket2::Socket::new( socket2::Domain::IPV6, socket2::Type::DGRAM, Some(socket2::Protocol::UDP), ) } .map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to create socket: {}", err] ) })?; socket.set_reuse_address(settings.reuse).map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to set reuse_address: {}", err] ) })?; #[cfg(unix)] { socket.set_reuse_port(settings.reuse).map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to set reuse_port: {}", err] ) })?; } socket.bind(&saddr.into()).map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to bind socket: {}", err] ) })?; let socket = Async::::try_from(socket).map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to setup Async socket: {}", err] ) })?; if addr.is_multicast() { // TODO: Multicast interface configuration, going to be tricky match addr { IpAddr::V4(addr) => { socket .as_ref() .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) .map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to join multicast group: {}", err] ) })?; } IpAddr::V6(addr) => { socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to join multicast group: {}", err] ) })?; } } } settings.used_socket = Some(wrap_socket(&socket)?); socket }; let port: i32 = socket.as_ref().local_addr().unwrap().port().into(); if settings.port != port { settings.port = port; drop(settings); self.element.notify("port"); settings = udpsrc.settings.lock().unwrap(); }; let buffer_pool = gst::BufferPool::new(); let mut config = buffer_pool.config(); config.set_params(None, settings.mtu, 0, 0); buffer_pool.set_config(config).map_err(|err| { gst::error_msg!( gst::ResourceError::Settings, ["Failed to configure buffer pool {:?}", err] ) })?; self.socket = Some( Socket::try_new( self.element.clone().upcast(), buffer_pool, UdpReader::new(socket), ) .map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to prepare socket {:?}", err] ) })?, ); self.element.notify("used-socket"); Ok(()) } .boxed() } fn unprepare(&mut self) -> BoxFuture<'_, ()> { async move { gst::debug!(CAT, obj: &self.element, "Unpreparing Task"); let udpsrc = self.element.imp(); udpsrc.settings.lock().unwrap().used_socket = None; self.element.notify("used-socket"); } .boxed() } fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Starting task"); self.socket .as_mut() .unwrap() .set_clock(self.element.clock(), self.element.base_time()); gst::log!(CAT, obj: &self.element, "Task started"); Ok(()) } .boxed() } fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| { gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); gst::FlowError::Flushing })?; let (mut buffer, saddr) = item.map_err(|err| { gst::error!(CAT, obj: &self.element, "Got error {:?}", err); match err { SocketError::Gst(err) => { gst::element_error!( self.element, gst::StreamError::Failed, ("Internal data stream error"), ["streaming stopped, reason {}", err] ); } SocketError::Io(err) => { gst::element_error!( self.element, gst::StreamError::Failed, ("I/O error"), ["streaming stopped, I/O error {}", err] ); } } gst::FlowError::Error })?; if let Some(saddr) = saddr { if self.retrieve_sender_address { NetAddressMeta::add( buffer.get_mut().unwrap(), &gio::InetSocketAddress::from(saddr), ); } } self.push_buffer(buffer).await.map(drop) } .boxed() } fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task"); self.need_initial_events = true; self.need_segment = true; gst::log!(CAT, obj: &self.element, "Task stopped"); Ok(()) } .boxed() } fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task flush"); self.need_segment = true; gst::log!(CAT, obj: &self.element, "Stopped task flush"); Ok(()) } .boxed() } } pub struct UdpSrc { src_pad: PadSrc, task: Task, configured_caps: Mutex>, settings: Mutex, } static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "ts-udpsrc", gst::DebugColorFlags::empty(), Some("Thread-sharing UDP source"), ) }); impl UdpSrc { fn prepare(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Preparing"); let settings = self.settings.lock().unwrap(); let context = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to acquire Context: {}", err] ) })?; drop(settings); *self.configured_caps.lock().unwrap() = None; self.task .prepare(UdpSrcTask::new(element.clone()), context) .map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Error preparing Task: {:?}", err] ) })?; gst::debug!(CAT, obj: element, "Prepared"); Ok(()) } fn unprepare(&self, element: &super::UdpSrc) { gst::debug!(CAT, obj: element, "Unpreparing"); self.task.unprepare().unwrap(); gst::debug!(CAT, obj: element, "Unprepared"); } fn stop(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Stopping"); self.task.stop()?; gst::debug!(CAT, obj: element, "Stopped"); Ok(()) } fn start(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Starting"); self.task.start()?; gst::debug!(CAT, obj: element, "Started"); Ok(()) } fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pausing"); self.task.pause()?; gst::debug!(CAT, obj: element, "Paused"); Ok(()) } } #[glib::object_subclass] impl ObjectSubclass for UdpSrc { const NAME: &'static str = "RsTsUdpSrc"; type Type = super::UdpSrc; type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { Self { src_pad: PadSrc::new( gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")), UdpSrcPadHandler, ), task: Task::default(), configured_caps: Default::default(), settings: Default::default(), } } } impl ObjectImpl for UdpSrc { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { let mut properties = vec![ glib::ParamSpecString::new( "context", "Context", "Context name to share threads with", Some(DEFAULT_CONTEXT), glib::ParamFlags::READWRITE, ), glib::ParamSpecUInt::new( "context-wait", "Context Wait", "Throttle poll loop to run at most once every this many ms", 0, 1000, DEFAULT_CONTEXT_WAIT.as_millis() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpecString::new( "address", "Address", "Address/multicast group to listen on", DEFAULT_ADDRESS, glib::ParamFlags::READWRITE, ), glib::ParamSpecInt::new( "port", "Port", "Port to listen on", 0, u16::MAX as i32, DEFAULT_PORT, glib::ParamFlags::READWRITE, ), glib::ParamSpecBoolean::new( "reuse", "Reuse", "Allow reuse of the port", DEFAULT_REUSE, glib::ParamFlags::READWRITE, ), glib::ParamSpecBoxed::new( "caps", "Caps", "Caps to use", gst::Caps::static_type(), glib::ParamFlags::READWRITE, ), glib::ParamSpecUInt::new( "mtu", "MTU", "Maximum expected packet size. This directly defines the allocation size of the receive buffer pool", 0, i32::MAX as u32, DEFAULT_MTU, glib::ParamFlags::READWRITE, ), glib::ParamSpecBoolean::new( "retrieve-sender-address", "Retrieve sender address", "Whether to retrieve the sender address and add it to buffers as meta. Disabling this might result in minor performance improvements in certain scenarios", DEFAULT_RETRIEVE_SENDER_ADDRESS, glib::ParamFlags::READWRITE, ), ]; #[cfg(not(windows))] { properties.push(glib::ParamSpecObject::new( "socket", "Socket", "Socket to use for UDP reception. (None == allocate)", gio::Socket::static_type(), glib::ParamFlags::READWRITE, )); properties.push(glib::ParamSpecObject::new( "used-socket", "Used Socket", "Socket currently in use for UDP reception. (None = no socket)", gio::Socket::static_type(), glib::ParamFlags::READABLE, )); } properties }); PROPERTIES.as_ref() } fn set_property( &self, _obj: &Self::Type, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec, ) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "address" => { settings.address = value.get().expect("type checked upstream"); } "port" => { settings.port = value.get().expect("type checked upstream"); } "reuse" => { settings.reuse = value.get().expect("type checked upstream"); } "caps" => { settings.caps = value.get().expect("type checked upstream"); } "mtu" => { settings.mtu = value.get().expect("type checked upstream"); } "socket" => { settings.socket = value .get::>() .expect("type checked upstream") .map(|socket| GioSocketWrapper::new(&socket)); } "used-socket" => { unreachable!(); } "context" => { settings.context = value .get::>() .expect("type checked upstream") .unwrap_or_else(|| DEFAULT_CONTEXT.into()); } "context-wait" => { settings.context_wait = Duration::from_millis( value.get::().expect("type checked upstream").into(), ); } "retrieve-sender-address" => { settings.retrieve_sender_address = value.get().expect("type checked upstream"); } _ => unimplemented!(), } } fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "address" => settings.address.to_value(), "port" => settings.port.to_value(), "reuse" => settings.reuse.to_value(), "caps" => settings.caps.to_value(), "mtu" => settings.mtu.to_value(), "socket" => settings .socket .as_ref() .map(GioSocketWrapper::as_socket) .to_value(), "used-socket" => settings .used_socket .as_ref() .map(GioSocketWrapper::as_socket) .to_value(), "context" => settings.context.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), "retrieve-sender-address" => settings.retrieve_sender_address.to_value(), _ => unimplemented!(), } } fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); obj.add_pad(self.src_pad.gst_pad()).unwrap(); crate::set_element_flags(obj, gst::ElementFlags::SOURCE); } } impl GstObjectImpl for UdpSrc {} impl ElementImpl for UdpSrc { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( "Thread-sharing UDP source", "Source/Network", "Receives data over the network via UDP", "Sebastian Dröge ", ) }); Some(&*ELEMENT_METADATA) } fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { let caps = gst::Caps::new_any(); let src_pad_template = gst::PadTemplate::new( "src", gst::PadDirection::Src, gst::PadPresence::Always, &caps, ) .unwrap(); vec![src_pad_template] }); PAD_TEMPLATES.as_ref() } fn change_state( &self, element: &Self::Type, transition: gst::StateChange, ) -> Result { gst::trace!(CAT, obj: element, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { self.prepare(element).map_err(|err| { element.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { self.pause(element).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(element); } _ => (), } let mut success = self.parent_change_state(element, transition)?; match transition { gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { self.start(element).map_err(|_| gst::StateChangeError)?; } gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { self.stop(element).map_err(|_| gst::StateChangeError)?; } _ => (), } Ok(success) } }