2018-03-06 09:38:27 +00:00
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use glib ;
use glib ::prelude ::* ;
2018-12-06 11:03:04 +00:00
use glib ::subclass ;
use glib ::subclass ::prelude ::* ;
2018-03-06 09:38:27 +00:00
use gst ;
use gst ::prelude ::* ;
2018-12-06 11:03:04 +00:00
use gst ::subclass ::prelude ::* ;
2019-03-13 16:07:53 +00:00
use gst_net ::* ;
2018-03-06 09:38:27 +00:00
2018-11-13 12:13:23 +00:00
use gio ;
use gio_ffi ;
use gobject_ffi ;
2018-07-06 07:19:59 +00:00
use std ::io ;
2018-03-15 18:52:38 +00:00
use std ::sync ::Mutex ;
2018-03-09 15:32:21 +00:00
use std ::u16 ;
2018-03-06 09:38:27 +00:00
2018-03-26 14:49:42 +00:00
use futures ;
use futures ::future ;
2019-03-13 16:07:53 +00:00
use futures ::{ Async , Future , Poll } ;
2018-03-08 20:26:34 +00:00
use tokio ::net ;
2018-03-16 17:11:53 +00:00
use either ::Either ;
use rand ;
2018-05-23 07:32:06 +00:00
use net2 ;
2018-11-13 12:13:23 +00:00
#[ cfg(unix) ]
use std ::os ::unix ::io ::{ AsRawFd , FromRawFd , IntoRawFd , RawFd } ;
#[ cfg(windows) ]
2018-12-19 17:15:06 +00:00
use std ::os ::windows ::io ::{ AsRawSocket , FromRawSocket , IntoRawSocket , RawSocket } ;
2018-11-13 12:13:23 +00:00
2018-03-15 18:52:38 +00:00
use iocontext ::* ;
2018-07-06 07:26:23 +00:00
use socket ::* ;
2018-03-08 20:26:34 +00:00
2019-02-21 18:12:09 +00:00
const DEFAULT_ADDRESS : Option < & str > = Some ( " 127.0.0.1 " ) ;
2018-03-08 20:26:34 +00:00
const DEFAULT_PORT : u32 = 5000 ;
2018-05-23 07:32:06 +00:00
const DEFAULT_REUSE : bool = true ;
2018-03-08 20:26:34 +00:00
const DEFAULT_CAPS : Option < gst ::Caps > = None ;
const DEFAULT_MTU : u32 = 1500 ;
2018-11-13 12:13:23 +00:00
const DEFAULT_SOCKET : Option < GioSocketWrapper > = None ;
const DEFAULT_USED_SOCKET : Option < GioSocketWrapper > = None ;
2019-02-21 18:12:09 +00:00
const DEFAULT_CONTEXT : & str = " " ;
2018-03-12 14:19:46 +00:00
const DEFAULT_CONTEXT_WAIT : u32 = 0 ;
2019-03-13 16:07:53 +00:00
const DEFAULT_RETRIEVE_SENDER_ADDRESS : bool = true ;
2018-03-08 20:26:34 +00:00
2018-11-13 12:13:23 +00:00
// Send/Sync struct for passing around a gio::Socket
// and getting the raw fd from it
//
// gio::Socket is not Send/Sync as it's generally unsafe
// to access it from multiple threads. Getting the underlying raw
// fd is safe though, as is receiving/sending from two different threads
#[ derive(Debug) ]
struct GioSocketWrapper {
socket : * mut gio_ffi ::GSocket ,
}
unsafe impl Send for GioSocketWrapper { }
unsafe impl Sync for GioSocketWrapper { }
impl GioSocketWrapper {
fn new ( socket : & gio ::Socket ) -> Self {
use glib ::translate ::* ;
Self {
socket : socket . to_glib_full ( ) ,
}
}
fn as_socket ( & self ) -> gio ::Socket {
unsafe {
use glib ::translate ::* ;
from_glib_none ( self . socket )
}
}
#[ cfg(unix) ]
fn get < T : FromRawFd > ( & self ) -> T {
2018-11-30 17:38:25 +00:00
unsafe { FromRawFd ::from_raw_fd ( libc ::dup ( gio_ffi ::g_socket_get_fd ( self . socket ) ) ) }
2018-11-13 12:13:23 +00:00
}
#[ cfg(windows) ]
fn get < T : FromRawSocket > ( & self ) -> T {
2018-11-30 17:38:25 +00:00
unsafe {
2018-12-19 17:15:06 +00:00
FromRawSocket ::from_raw_socket (
dup_socket ( gio_ffi ::g_socket_get_fd ( self . socket ) as _ ) as _
)
2018-11-30 17:38:25 +00:00
}
2018-11-13 12:13:23 +00:00
}
}
impl Clone for GioSocketWrapper {
fn clone ( & self ) -> Self {
Self {
socket : unsafe { gobject_ffi ::g_object_ref ( self . socket as * mut _ ) as * mut _ } ,
}
}
}
impl Drop for GioSocketWrapper {
fn drop ( & mut self ) {
unsafe {
gobject_ffi ::g_object_unref ( self . socket as * mut _ ) ;
}
}
}
2018-11-30 17:38:25 +00:00
#[ cfg(windows) ]
unsafe fn dup_socket ( socket : usize ) -> usize {
use std ::mem ;
use winapi ::shared ::ws2def ;
use winapi ::um ::processthreadsapi ;
use winapi ::um ::winsock2 ;
let mut proto_info = mem ::zeroed ( ) ;
let ret = winsock2 ::WSADuplicateSocketA (
socket ,
2018-12-19 17:15:06 +00:00
processthreadsapi ::GetCurrentProcessId ( ) ,
2018-11-30 17:38:25 +00:00
& mut proto_info ,
) ;
assert_eq! ( ret , 0 ) ;
2018-12-19 17:15:06 +00:00
let socket = winsock2 ::WSASocketA (
2018-11-30 17:38:25 +00:00
ws2def ::AF_INET ,
ws2def ::SOCK_DGRAM ,
2018-12-19 17:15:06 +00:00
ws2def ::IPPROTO_UDP as i32 ,
2018-11-30 17:38:25 +00:00
& mut proto_info ,
0 ,
0 ,
) ;
2018-12-19 17:15:06 +00:00
assert_ne! ( socket , winsock2 ::INVALID_SOCKET ) ;
socket
2018-11-30 17:38:25 +00:00
}
2018-03-08 20:26:34 +00:00
#[ derive(Debug, Clone) ]
struct Settings {
address : Option < String > ,
port : u32 ,
2018-05-23 07:32:06 +00:00
reuse : bool ,
2018-03-08 20:26:34 +00:00
caps : Option < gst ::Caps > ,
mtu : u32 ,
2018-11-13 12:13:23 +00:00
socket : Option < GioSocketWrapper > ,
used_socket : Option < GioSocketWrapper > ,
2018-03-08 20:26:34 +00:00
context : String ,
2018-03-12 14:19:46 +00:00
context_wait : u32 ,
2019-03-13 16:07:53 +00:00
retrieve_sender_address : bool ,
2018-03-08 20:26:34 +00:00
}
impl Default for Settings {
fn default ( ) -> Self {
Settings {
address : DEFAULT_ADDRESS . map ( Into ::into ) ,
port : DEFAULT_PORT ,
2018-05-23 07:32:06 +00:00
reuse : DEFAULT_REUSE ,
2018-03-08 20:26:34 +00:00
caps : DEFAULT_CAPS ,
mtu : DEFAULT_MTU ,
2018-11-13 12:13:23 +00:00
socket : DEFAULT_SOCKET ,
used_socket : DEFAULT_USED_SOCKET ,
2018-03-08 20:26:34 +00:00
context : DEFAULT_CONTEXT . into ( ) ,
2018-03-12 14:19:46 +00:00
context_wait : DEFAULT_CONTEXT_WAIT ,
2019-03-13 16:07:53 +00:00
retrieve_sender_address : DEFAULT_RETRIEVE_SENDER_ADDRESS ,
2018-03-08 20:26:34 +00:00
}
}
}
2019-03-13 16:07:53 +00:00
static PROPERTIES : [ subclass ::Property ; 10 ] = [
2018-12-18 09:23:45 +00:00
subclass ::Property ( " address " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::string (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Address " ,
" Address/multicast group to listen on " ,
DEFAULT_ADDRESS ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " port " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::uint (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Port " ,
" Port to listen on " ,
0 ,
u16 ::MAX as u32 ,
DEFAULT_PORT ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " reuse " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::boolean (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Reuse " ,
" Allow reuse of the port " ,
DEFAULT_REUSE ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " caps " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::boxed (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Caps " ,
" Caps to use " ,
gst ::Caps ::static_type ( ) ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " mtu " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::uint (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" MTU " ,
" MTU " ,
0 ,
u16 ::MAX as u32 ,
DEFAULT_MTU ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " socket " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::object (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Socket " ,
" Socket to use for UDP reception. (None == allocate) " ,
gio ::Socket ::static_type ( ) ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " used-socket " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::object (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Used Socket " ,
" Socket currently in use for UDP reception. (None = no socket) " ,
gio ::Socket ::static_type ( ) ,
glib ::ParamFlags ::READABLE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " context " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::string (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Context " ,
" Context name to share threads with " ,
Some ( DEFAULT_CONTEXT ) ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " context-wait " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::uint (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Context Wait " ,
" Throttle poll loop to run at most once every this many ms " ,
0 ,
1000 ,
DEFAULT_CONTEXT_WAIT ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2019-03-13 16:07:53 +00:00
subclass ::Property ( " retrieve-sender-address " , | name | {
glib ::ParamSpec ::boolean (
name ,
" Retrieve sender address " ,
" Whether to retrieve the sender address and add it to buffers as meta. Disabling this might result in minor performance improvements in certain scenarios " ,
DEFAULT_REUSE ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-03-08 20:26:34 +00:00
] ;
2018-07-06 07:19:59 +00:00
pub struct UdpReader {
socket : net ::UdpSocket ,
}
impl UdpReader {
2018-11-30 17:38:25 +00:00
fn new ( socket : net ::UdpSocket ) -> Self {
Self { socket }
2018-07-06 07:19:59 +00:00
}
}
impl SocketRead for UdpReader {
const DO_TIMESTAMP : bool = true ;
2019-03-13 16:07:53 +00:00
fn poll_read (
& mut self ,
buf : & mut [ u8 ] ,
) -> Poll < ( usize , Option < std ::net ::SocketAddr > ) , io ::Error > {
match self . socket . poll_recv_from ( buf ) {
2019-07-04 15:30:26 +00:00
Ok ( Async ::Ready ( result ) ) = > Ok ( Async ::Ready ( ( result . 0 , Some ( result . 1 ) ) ) ) ,
Ok ( Async ::NotReady ) = > Ok ( Async ::NotReady ) ,
Err ( result ) = > Err ( result ) ,
}
2018-07-06 07:19:59 +00:00
}
}
2018-03-06 09:38:27 +00:00
struct State {
2018-03-08 20:26:34 +00:00
io_context : Option < IOContext > ,
2018-03-26 14:49:42 +00:00
pending_future_id : Option < PendingFutureId > ,
2018-07-06 07:19:59 +00:00
socket : Option < Socket < UdpReader > > ,
2018-03-08 20:26:34 +00:00
need_initial_events : bool ,
2018-03-15 19:17:01 +00:00
configured_caps : Option < gst ::Caps > ,
2018-03-26 14:49:42 +00:00
pending_future_cancel : Option < futures ::sync ::oneshot ::Sender < ( ) > > ,
2018-03-06 09:38:27 +00:00
}
impl Default for State {
fn default ( ) -> State {
2018-03-08 20:26:34 +00:00
State {
io_context : None ,
2018-03-26 14:49:42 +00:00
pending_future_id : None ,
2018-03-08 20:26:34 +00:00
socket : None ,
need_initial_events : true ,
2018-03-15 19:17:01 +00:00
configured_caps : None ,
2018-03-26 14:49:42 +00:00
pending_future_cancel : None ,
2018-03-08 20:26:34 +00:00
}
2018-03-06 09:38:27 +00:00
}
}
struct UdpSrc {
cat : gst ::DebugCategory ,
src_pad : gst ::Pad ,
state : Mutex < State > ,
2018-03-08 20:26:34 +00:00
settings : Mutex < Settings > ,
2018-03-06 09:38:27 +00:00
}
impl UdpSrc {
2018-12-06 11:03:04 +00:00
fn src_event ( & self , pad : & gst ::Pad , element : & gst ::Element , event : gst ::Event ) -> bool {
2018-03-06 09:38:27 +00:00
use gst ::EventView ;
gst_log! ( self . cat , obj : pad , " Handling event {:?} " , event ) ;
2018-03-15 19:17:01 +00:00
let ret = match event . view ( ) {
EventView ::FlushStart ( .. ) = > {
let _ = self . stop ( element ) ;
true
}
EventView ::FlushStop ( .. ) = > {
2019-01-11 23:45:05 +00:00
let ( res , state , pending ) = element . get_state ( 0. into ( ) ) ;
if res = = Ok ( gst ::StateChangeSuccess ::Success ) & & state = = gst ::State ::Playing
| | res = = Ok ( gst ::StateChangeSuccess ::Async ) & & pending = = gst ::State ::Playing
2018-03-15 19:17:01 +00:00
{
let _ = self . start ( element ) ;
}
true
}
EventView ::Reconfigure ( .. ) = > true ,
EventView ::Latency ( .. ) = > true ,
_ = > false ,
} ;
2018-03-06 09:38:27 +00:00
2018-03-15 19:17:01 +00:00
if ret {
2018-03-06 09:38:27 +00:00
gst_log! ( self . cat , obj : pad , " Handled event {:?} " , event ) ;
} else {
gst_log! ( self . cat , obj : pad , " Didn't handle event {:?} " , event ) ;
}
2018-03-15 19:17:01 +00:00
ret
2018-03-06 09:38:27 +00:00
}
2018-12-06 11:03:04 +00:00
fn src_query (
& self ,
pad : & gst ::Pad ,
_element : & gst ::Element ,
query : & mut gst ::QueryRef ,
) -> bool {
2018-03-06 09:38:27 +00:00
use gst ::QueryView ;
gst_log! ( self . cat , obj : pad , " Handling query {:?} " , query ) ;
2018-03-15 19:17:01 +00:00
let ret = match query . view_mut ( ) {
QueryView ::Latency ( ref mut q ) = > {
q . set ( true , 0. into ( ) , 0. into ( ) ) ;
true
}
QueryView ::Scheduling ( ref mut q ) = > {
q . set ( gst ::SchedulingFlags ::SEQUENTIAL , 1 , - 1 , 0 ) ;
q . add_scheduling_modes ( & [ gst ::PadMode ::Push ] ) ;
true
}
QueryView ::Caps ( ref mut q ) = > {
let state = self . state . lock ( ) . unwrap ( ) ;
let caps = if let Some ( ref caps ) = state . configured_caps {
q . get_filter ( )
. map ( | f | f . intersect_with_mode ( caps , gst ::CapsIntersectMode ::First ) )
2019-02-21 18:12:09 +00:00
. unwrap_or_else ( | | caps . clone ( ) )
2018-03-15 19:17:01 +00:00
} else {
q . get_filter ( )
. map ( | f | f . to_owned ( ) )
2019-07-04 15:30:26 +00:00
. unwrap_or_else ( gst ::Caps ::new_any )
2018-03-15 19:17:01 +00:00
} ;
q . set_result ( & caps ) ;
true
}
_ = > false ,
2018-03-06 09:38:27 +00:00
} ;
2018-03-15 19:17:01 +00:00
if ret {
gst_log! ( self . cat , obj : pad , " Handled query {:?} " , query ) ;
} else {
gst_log! ( self . cat , obj : pad , " Didn't handle query {:?} " , query ) ;
}
ret
2018-03-06 09:38:27 +00:00
}
2018-03-08 20:26:34 +00:00
2018-03-26 14:49:42 +00:00
fn create_io_context_event ( state : & State ) -> Option < gst ::Event > {
if let ( & Some ( ref pending_future_id ) , & Some ( ref io_context ) ) =
( & state . pending_future_id , & state . io_context )
{
let s = gst ::Structure ::new (
" ts-io-context " ,
& [
2018-11-29 19:01:02 +00:00
( " io-context " , & io_context ) ,
( " pending-future-id " , & * pending_future_id ) ,
2018-03-26 14:49:42 +00:00
] ,
) ;
Some ( gst ::Event ::new_custom_downstream_sticky ( s ) . build ( ) )
} else {
None
}
}
fn push_buffer (
& self ,
2018-12-06 11:03:04 +00:00
element : & gst ::Element ,
2018-03-26 14:49:42 +00:00
buffer : gst ::Buffer ,
) -> future ::Either <
2019-06-06 06:26:02 +00:00
Box < dyn Future < Item = ( ) , Error = gst ::FlowError > + Send + 'static > ,
2018-03-26 14:49:42 +00:00
future ::FutureResult < ( ) , gst ::FlowError > ,
> {
2018-03-15 19:21:42 +00:00
let mut events = Vec ::new ( ) ;
let mut state = self . state . lock ( ) . unwrap ( ) ;
if state . need_initial_events {
gst_debug! ( self . cat , obj : element , " Pushing initial events " ) ;
2018-03-16 17:11:53 +00:00
let stream_id = format! ( " {:08x} {:08x} " , rand ::random ::< u32 > ( ) , rand ::random ::< u32 > ( ) ) ;
events . push ( gst ::Event ::new_stream_start ( & stream_id ) . build ( ) ) ;
2018-03-15 19:21:42 +00:00
if let Some ( ref caps ) = self . settings . lock ( ) . unwrap ( ) . caps {
events . push ( gst ::Event ::new_caps ( & caps ) . build ( ) ) ;
state . configured_caps = Some ( caps . clone ( ) ) ;
}
events . push (
gst ::Event ::new_segment ( & gst ::FormattedSegment ::< gst ::format ::Time > ::new ( ) ) . build ( ) ,
) ;
2018-03-26 14:49:42 +00:00
2018-05-16 14:34:06 +00:00
if let Some ( event ) = Self ::create_io_context_event ( & state ) {
events . push ( event ) ;
// Get rid of reconfigure flag
self . src_pad . check_reconfigure ( ) ;
}
2018-03-15 19:21:42 +00:00
state . need_initial_events = false ;
2018-03-26 14:49:42 +00:00
} else if self . src_pad . check_reconfigure ( ) {
if let Some ( event ) = Self ::create_io_context_event ( & state ) {
events . push ( event ) ;
}
2018-03-15 19:21:42 +00:00
}
drop ( state ) ;
for event in events {
self . src_pad . push_event ( event ) ;
}
2019-01-11 23:45:05 +00:00
let res = match self . src_pad . push ( buffer ) {
2018-03-16 17:11:53 +00:00
Ok ( _ ) = > {
gst_log! ( self . cat , obj : element , " Successfully pushed buffer " ) ;
Ok ( ( ) )
}
Err ( gst ::FlowError ::Flushing ) = > {
gst_debug! ( self . cat , obj : element , " Flushing " ) ;
let state = self . state . lock ( ) . unwrap ( ) ;
if let Some ( ref socket ) = state . socket {
socket . pause ( ) ;
}
Ok ( ( ) )
}
Err ( gst ::FlowError ::Eos ) = > {
gst_debug! ( self . cat , obj : element , " EOS " ) ;
let state = self . state . lock ( ) . unwrap ( ) ;
if let Some ( ref socket ) = state . socket {
socket . pause ( ) ;
}
Ok ( ( ) )
}
Err ( err ) = > {
gst_error! ( self . cat , obj : element , " Got error {} " , err ) ;
gst_element_error! (
element ,
gst ::StreamError ::Failed ,
( " Internal data stream error " ) ,
[ " streaming stopped, reason {} " , err ]
) ;
Err ( gst ::FlowError ::CustomError )
}
2018-03-26 14:49:42 +00:00
} ;
match res {
Ok ( ( ) ) = > {
let mut state = self . state . lock ( ) . unwrap ( ) ;
2018-04-05 09:49:12 +00:00
if let State {
io_context : Some ( ref io_context ) ,
pending_future_id : Some ( ref pending_future_id ) ,
2018-03-26 14:49:42 +00:00
ref mut pending_future_cancel ,
..
2018-04-05 09:49:12 +00:00
} = * state
2018-03-26 14:49:42 +00:00
{
2018-04-05 09:49:12 +00:00
let ( cancel , future ) = io_context . drain_pending_futures ( * pending_future_id ) ;
* pending_future_cancel = cancel ;
future
2018-03-26 14:49:42 +00:00
} else {
2018-04-05 09:49:12 +00:00
future ::Either ::B ( future ::ok ( ( ) ) )
2018-03-26 14:49:42 +00:00
}
}
2018-04-05 09:49:12 +00:00
Err ( err ) = > future ::Either ::B ( future ::err ( err ) ) ,
2018-03-16 17:11:53 +00:00
}
2018-03-15 19:21:42 +00:00
}
2018-12-06 11:03:04 +00:00
fn prepare ( & self , element : & gst ::Element ) -> Result < ( ) , gst ::ErrorMessage > {
2018-03-08 20:26:34 +00:00
use std ::net ::{ IpAddr , Ipv4Addr , Ipv6Addr , SocketAddr } ;
gst_debug! ( self . cat , obj : element , " Preparing " ) ;
let settings = self . settings . lock ( ) . unwrap ( ) . clone ( ) ;
let mut state = self . state . lock ( ) . unwrap ( ) ;
2018-11-05 11:43:38 +00:00
let io_context =
IOContext ::new ( & settings . context , settings . context_wait ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to create IO context: {} " , err ]
)
} ) ? ;
2018-03-08 20:26:34 +00:00
2018-11-13 12:13:23 +00:00
let socket = if let Some ( ref wrapped_socket ) = settings . socket {
use std ::net ::UdpSocket ;
2019-06-06 06:26:02 +00:00
let socket : UdpSocket ;
2018-11-13 12:13:23 +00:00
#[ cfg(unix) ]
{
socket = wrapped_socket . get ( )
}
#[ cfg(windows) ]
{
socket = wrapped_socket . get ( )
2018-03-16 18:24:36 +00:00
}
2018-11-13 12:13:23 +00:00
let socket =
net ::UdpSocket ::from_std ( socket , io_context . reactor_handle ( ) ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to setup socket for tokio: {} " , err ]
)
} ) ? ;
self . settings . lock ( ) . unwrap ( ) . used_socket = Some ( wrapped_socket . clone ( ) ) ;
socket
} else {
let addr : IpAddr = match settings . address {
None = > {
2018-03-16 18:24:36 +00:00
return Err ( gst_error_msg! (
gst ::ResourceError ::Settings ,
2018-11-13 12:13:23 +00:00
[ " No address set " ]
2018-12-04 17:01:40 +00:00
) ) ;
2018-03-16 18:24:36 +00:00
}
2018-11-13 12:13:23 +00:00
Some ( ref addr ) = > match addr . parse ( ) {
Err ( err ) = > {
return Err ( gst_error_msg! (
gst ::ResourceError ::Settings ,
[ " Invalid address '{}' set: {} " , addr , err ]
2018-12-04 17:01:40 +00:00
) ) ;
2018-11-13 12:13:23 +00:00
}
Ok ( addr ) = > addr ,
} ,
2018-03-08 20:26:34 +00:00
} ;
2018-11-13 12:13:23 +00:00
let port = settings . port ;
2018-03-08 20:26:34 +00:00
2018-11-13 12:13:23 +00:00
// TODO: TTL, multicast loopback, etc
let saddr = if addr . is_multicast ( ) {
// TODO: Use ::unspecified() constructor once stable
let bind_addr = if addr . is_ipv4 ( ) {
IpAddr ::V4 ( Ipv4Addr ::new ( 0 , 0 , 0 , 0 ) )
} else {
IpAddr ::V6 ( Ipv6Addr ::new ( 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ) )
} ;
2018-05-23 07:32:06 +00:00
2018-11-13 12:13:23 +00:00
let saddr = SocketAddr ::new ( bind_addr , port as u16 ) ;
gst_debug! (
self . cat ,
obj : element ,
" Binding to {:?} for multicast group {:?} " ,
saddr ,
addr
) ;
2018-05-23 07:32:06 +00:00
2018-11-13 12:13:23 +00:00
saddr
} else {
let saddr = SocketAddr ::new ( addr , port as u16 ) ;
gst_debug! ( self . cat , obj : element , " Binding to {:?} " , saddr ) ;
2018-05-23 07:32:06 +00:00
2018-11-13 12:13:23 +00:00
saddr
} ;
2018-05-23 07:32:06 +00:00
2018-11-13 12:13:23 +00:00
let builder = if addr . is_ipv4 ( ) {
net2 ::UdpBuilder ::new_v4 ( )
} else {
net2 ::UdpBuilder ::new_v6 ( )
}
. map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to create socket: {} " , err ]
)
} ) ? ;
2018-03-08 20:26:34 +00:00
2018-11-13 12:13:23 +00:00
builder . reuse_address ( settings . reuse ) . map_err ( | err | {
2018-05-23 07:32:06 +00:00
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
2018-11-13 12:13:23 +00:00
[ " Failed to set reuse_address: {} " , err ]
2018-05-23 07:32:06 +00:00
)
} ) ? ;
2018-11-13 12:13:23 +00:00
#[ cfg(unix) ]
{
use net2 ::unix ::UnixUdpBuilderExt ;
builder . reuse_port ( settings . reuse ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to set reuse_port: {} " , err ]
)
} ) ? ;
}
2018-05-23 17:35:12 +00:00
2018-11-13 12:13:23 +00:00
let socket = builder . bind ( & saddr ) . map_err ( | err | {
2018-07-24 10:40:58 +00:00
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
2018-11-13 12:13:23 +00:00
[ " Failed to bind socket: {} " , err ]
2018-07-24 10:40:58 +00:00
)
} ) ? ;
2018-05-23 17:35:12 +00:00
2018-11-13 12:13:23 +00:00
let socket =
net ::UdpSocket ::from_std ( socket , io_context . reactor_handle ( ) ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to setup socket for tokio: {} " , err ]
)
} ) ? ;
if addr . is_multicast ( ) {
// TODO: Multicast interface configuration, going to be tricky
match addr {
IpAddr ::V4 ( addr ) = > {
socket
. join_multicast_v4 ( & addr , & Ipv4Addr ::new ( 0 , 0 , 0 , 0 ) )
. map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to join multicast group: {} " , err ]
)
} ) ? ;
}
IpAddr ::V6 ( addr ) = > {
socket . join_multicast_v6 ( & addr , 0 ) . map_err ( | err | {
2018-03-16 18:24:36 +00:00
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to join multicast group: {} " , err ]
)
} ) ? ;
2018-11-13 12:13:23 +00:00
}
2018-03-08 20:26:34 +00:00
}
2018-11-13 12:13:23 +00:00
}
// Store the socket as used-socket in the settings
#[ cfg(unix) ]
2018-12-03 11:02:35 +00:00
unsafe {
let fd = libc ::dup ( socket . as_raw_fd ( ) ) ;
2018-11-13 12:13:23 +00:00
2018-12-03 11:02:35 +00:00
// This is unsafe because it allows us to share the fd between the socket and the
// GIO socket below, but safety of this is the job of the application
2018-11-13 12:13:23 +00:00
struct FdConverter ( RawFd ) ;
impl IntoRawFd for FdConverter {
fn into_raw_fd ( self ) -> RawFd {
self . 0
}
2018-03-08 20:26:34 +00:00
}
2018-11-13 12:13:23 +00:00
let fd = FdConverter ( fd ) ;
let gio_socket = gio ::Socket ::new_from_fd ( fd ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to create wrapped GIO socket: {} " , err ]
)
} ) ? ; ;
let wrapper = GioSocketWrapper ::new ( & gio_socket ) ;
self . settings . lock ( ) . unwrap ( ) . used_socket = Some ( wrapper ) ;
}
#[ cfg(windows) ]
2018-12-03 11:02:35 +00:00
unsafe {
2019-02-07 08:21:08 +00:00
// FIXME: Needs https://github.com/tokio-rs/tokio/pull/806
// and https://github.com/carllerche/mio/pull/859
let fd = unreachable! ( ) ; //dup_socket(socket.as_raw_socket() as _) as _;
2018-11-13 12:13:23 +00:00
2018-12-03 11:02:35 +00:00
// This is unsafe because it allows us to share the fd between the socket and the
// GIO socket below, but safety of this is the job of the application
2018-11-13 12:13:23 +00:00
struct SocketConverter ( RawSocket ) ;
impl IntoRawSocket for SocketConverter {
fn into_raw_socket ( self ) -> RawSocket {
self . 0
}
}
let fd = SocketConverter ( fd ) ;
let gio_socket = gio ::Socket ::new_from_socket ( fd ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to create wrapped GIO socket: {} " , err ]
)
} ) ? ; ;
let wrapper = GioSocketWrapper ::new ( & gio_socket ) ;
self . settings . lock ( ) . unwrap ( ) . used_socket = Some ( wrapper ) ;
2018-03-08 20:26:34 +00:00
}
2018-11-13 12:13:23 +00:00
socket
} ;
2018-03-08 20:26:34 +00:00
let buffer_pool = gst ::BufferPool ::new ( ) ;
let mut config = buffer_pool . get_config ( ) ;
config . set_params ( None , settings . mtu , 0 , 0 ) ;
2018-03-16 18:24:36 +00:00
buffer_pool . set_config ( config ) . map_err ( | _ | {
gst_error_msg! (
gst ::ResourceError ::Settings ,
[ " Failed to configure buffer pool " ]
)
} ) ? ;
2018-03-08 20:26:34 +00:00
2018-07-06 07:19:59 +00:00
let socket = Socket ::new ( element . upcast_ref ( ) , UdpReader ::new ( socket ) , buffer_pool ) ;
2018-03-08 20:26:34 +00:00
let element_clone = element . clone ( ) ;
2018-03-16 17:11:53 +00:00
let element_clone2 = element . clone ( ) ;
2019-03-13 16:07:53 +00:00
let retrieve_sender_address = self . settings . lock ( ) . unwrap ( ) . retrieve_sender_address ;
2018-03-16 18:24:36 +00:00
socket
. schedule (
& io_context ,
2019-03-13 16:07:53 +00:00
move | ( mut buffer , saddr ) | {
2018-12-06 11:03:04 +00:00
let udpsrc = Self ::from_instance ( & element_clone ) ;
2019-03-13 16:07:53 +00:00
if let Some ( saddr ) = saddr {
if retrieve_sender_address {
let inet_addr = match saddr . ip ( ) {
IpAddr ::V4 ( ip ) = > gio ::InetAddress ::new_from_bytes (
gio ::InetAddressBytes ::V4 ( & ip . octets ( ) ) ,
) ,
IpAddr ::V6 ( ip ) = > gio ::InetAddress ::new_from_bytes (
gio ::InetAddressBytes ::V6 ( & ip . octets ( ) ) ,
) ,
} ;
let inet_socket_addr =
& gio ::InetSocketAddress ::new ( & inet_addr , saddr . port ( ) ) ;
NetAddressMeta ::add ( buffer . get_mut ( ) . unwrap ( ) , inet_socket_addr ) ;
}
}
2018-03-16 18:24:36 +00:00
udpsrc . push_buffer ( & element_clone , buffer )
} ,
move | err | {
2018-12-06 11:03:04 +00:00
let udpsrc = Self ::from_instance ( & element_clone2 ) ;
2018-03-16 18:24:36 +00:00
gst_error! ( udpsrc . cat , obj : & element_clone2 , " Got error {} " , err ) ;
match err {
Either ::Left ( gst ::FlowError ::CustomError ) = > ( ) ,
Either ::Left ( err ) = > {
gst_element_error! (
element_clone2 ,
gst ::StreamError ::Failed ,
( " Internal data stream error " ) ,
[ " streaming stopped, reason {} " , err ]
) ;
}
Either ::Right ( err ) = > {
gst_element_error! (
element_clone2 ,
gst ::StreamError ::Failed ,
( " I/O error " ) ,
[ " streaming stopped, I/O error {} " , err ]
) ;
}
2018-03-16 17:11:53 +00:00
}
2018-03-16 18:24:36 +00:00
} ,
)
. map_err ( | _ | {
gst_error_msg! ( gst ::ResourceError ::OpenRead , [ " Failed to schedule socket " ] )
} ) ? ;
2018-03-08 20:26:34 +00:00
2018-03-26 14:49:42 +00:00
let pending_future_id = io_context . acquire_pending_future_id ( ) ;
gst_debug! (
self . cat ,
obj : element ,
" Got pending future id {:?} " ,
pending_future_id
) ;
2018-03-08 20:26:34 +00:00
state . socket = Some ( socket ) ;
state . io_context = Some ( io_context ) ;
2018-03-26 14:49:42 +00:00
state . pending_future_id = Some ( pending_future_id ) ;
2018-03-08 20:26:34 +00:00
gst_debug! ( self . cat , obj : element , " Prepared " ) ;
2018-11-13 12:13:23 +00:00
drop ( state ) ;
element . notify ( " used-socket " ) ;
2018-03-08 20:26:34 +00:00
Ok ( ( ) )
}
2018-12-06 11:03:04 +00:00
fn unprepare ( & self , element : & gst ::Element ) -> Result < ( ) , ( ) > {
2018-03-08 20:26:34 +00:00
gst_debug! ( self . cat , obj : element , " Unpreparing " ) ;
2018-11-13 12:13:23 +00:00
self . settings . lock ( ) . unwrap ( ) . used_socket = None ;
2018-03-28 09:29:29 +00:00
// FIXME: The IO Context has to be alive longer than the queue,
// otherwise the queue can't finish any remaining work
let ( mut socket , io_context ) = {
let mut state = self . state . lock ( ) . unwrap ( ) ;
if let ( & Some ( ref pending_future_id ) , & Some ( ref io_context ) ) =
( & state . pending_future_id , & state . io_context )
{
io_context . release_pending_future_id ( * pending_future_id ) ;
}
2018-03-08 20:26:34 +00:00
2018-03-28 09:29:29 +00:00
let socket = state . socket . take ( ) ;
let io_context = state . io_context . take ( ) ;
* state = State ::default ( ) ;
( socket , io_context )
} ;
2018-03-08 20:26:34 +00:00
2018-03-28 09:29:29 +00:00
if let Some ( ref socket ) = socket . take ( ) {
socket . shutdown ( ) ;
2018-03-26 14:49:42 +00:00
}
2018-03-28 09:29:29 +00:00
drop ( io_context ) ;
2018-03-08 20:26:34 +00:00
gst_debug! ( self . cat , obj : element , " Unprepared " ) ;
Ok ( ( ) )
}
2018-12-06 11:03:04 +00:00
fn start ( & self , element : & gst ::Element ) -> Result < ( ) , ( ) > {
2018-03-08 20:26:34 +00:00
gst_debug! ( self . cat , obj : element , " Starting " ) ;
2018-03-15 19:17:01 +00:00
let state = self . state . lock ( ) . unwrap ( ) ;
2018-03-08 20:26:34 +00:00
if let Some ( ref socket ) = state . socket {
2018-07-06 07:19:59 +00:00
socket . unpause ( element . get_clock ( ) , Some ( element . get_base_time ( ) ) ) ;
2018-03-08 20:26:34 +00:00
}
gst_debug! ( self . cat , obj : element , " Started " ) ;
Ok ( ( ) )
}
2018-12-06 11:03:04 +00:00
fn stop ( & self , element : & gst ::Element ) -> Result < ( ) , ( ) > {
2018-03-08 20:26:34 +00:00
gst_debug! ( self . cat , obj : element , " Stopping " ) ;
2018-03-26 14:49:42 +00:00
let mut state = self . state . lock ( ) . unwrap ( ) ;
2018-03-08 20:26:34 +00:00
if let Some ( ref socket ) = state . socket {
socket . pause ( ) ;
}
2018-03-26 14:49:42 +00:00
let _ = state . pending_future_cancel . take ( ) ;
2018-03-08 20:26:34 +00:00
gst_debug! ( self . cat , obj : element , " Stopped " ) ;
Ok ( ( ) )
}
2018-03-06 09:38:27 +00:00
}
2018-12-06 11:03:04 +00:00
impl ObjectSubclass for UdpSrc {
const NAME : & 'static str = " RsTsUdpSrc " ;
type ParentType = gst ::Element ;
type Instance = gst ::subclass ::ElementInstanceStruct < Self > ;
type Class = subclass ::simple ::ClassStruct < Self > ;
glib_object_subclass! ( ) ;
fn class_init ( klass : & mut subclass ::simple ::ClassStruct < Self > ) {
klass . set_metadata (
" Thread-sharing UDP source " ,
" Source/Network " ,
" Receives data over the network via UDP " ,
" Sebastian Dröge <sebastian@centricular.com> " ,
) ;
let caps = gst ::Caps ::new_any ( ) ;
let src_pad_template = gst ::PadTemplate ::new (
" src " ,
gst ::PadDirection ::Src ,
gst ::PadPresence ::Always ,
& caps ,
2019-01-29 15:26:40 +00:00
)
. unwrap ( ) ;
2018-12-06 11:03:04 +00:00
klass . add_pad_template ( src_pad_template ) ;
2019-02-07 08:21:08 +00:00
#[ cfg(not(windows)) ]
{
klass . install_properties ( & PROPERTIES ) ;
}
#[ cfg(windows) ]
{
let properties = PROPERTIES
. iter ( )
. filter ( | p | match * p {
subclass ::Property ( " socket " , .. ) | subclass ::Property ( " used-socket " , .. ) = > {
false
}
_ = > true ,
} )
. collect ::< Vec < _ > > ( ) ;
klass . install_properties ( properties . as_slice ( ) ) ;
}
2018-12-06 11:03:04 +00:00
}
fn new_with_class ( klass : & subclass ::simple ::ClassStruct < Self > ) -> Self {
let templ = klass . get_pad_template ( " src " ) . unwrap ( ) ;
2019-04-16 07:09:42 +00:00
let src_pad = gst ::Pad ::new_from_template ( & templ , Some ( " src " ) ) ;
2018-12-06 11:03:04 +00:00
src_pad . set_event_function ( | pad , parent , event | {
UdpSrc ::catch_panic_pad_function (
parent ,
| | false ,
| udpsrc , element | udpsrc . src_event ( pad , element , event ) ,
)
} ) ;
src_pad . set_query_function ( | pad , parent , query | {
UdpSrc ::catch_panic_pad_function (
parent ,
| | false ,
| udpsrc , element | udpsrc . src_query ( pad , element , query ) ,
)
} ) ;
Self {
cat : gst ::DebugCategory ::new (
" ts-udpsrc " ,
gst ::DebugColorFlags ::empty ( ) ,
2019-05-23 20:55:54 +00:00
Some ( " Thread-sharing UDP source " ) ,
2018-12-06 11:03:04 +00:00
) ,
2019-02-21 18:12:09 +00:00
src_pad ,
2018-12-06 11:03:04 +00:00
state : Mutex ::new ( State ::default ( ) ) ,
settings : Mutex ::new ( Settings ::default ( ) ) ,
}
}
}
impl ObjectImpl for UdpSrc {
glib_object_impl! ( ) ;
fn set_property ( & self , _obj : & glib ::Object , id : usize , value : & glib ::Value ) {
let prop = & PROPERTIES [ id ] ;
2018-03-08 20:26:34 +00:00
match * prop {
2018-12-06 11:03:04 +00:00
subclass ::Property ( " address " , .. ) = > {
2018-03-08 20:26:34 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . address = value . get ( ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " port " , .. ) = > {
2018-03-08 20:26:34 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . port = value . get ( ) . unwrap ( ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " reuse " , .. ) = > {
2018-05-23 07:32:06 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . reuse = value . get ( ) . unwrap ( ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " caps " , .. ) = > {
2018-03-08 20:26:34 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . caps = value . get ( ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " mtu " , .. ) = > {
2018-03-08 20:26:34 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . mtu = value . get ( ) . unwrap ( ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " socket " , .. ) = > {
2018-11-13 12:13:23 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . socket = value
. get ::< gio ::Socket > ( )
. map ( | socket | GioSocketWrapper ::new ( & socket ) ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " used-socket " , .. ) = > {
2018-11-13 12:13:23 +00:00
unreachable! ( ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " context " , .. ) = > {
2018-03-08 20:26:34 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . context = value . get ( ) . unwrap_or_else ( | | " " . into ( ) ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " context-wait " , .. ) = > {
2018-03-12 14:19:46 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . context_wait = value . get ( ) . unwrap ( ) ;
}
2019-03-13 16:07:53 +00:00
subclass ::Property ( " retrieve-sender-address " , .. ) = > {
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
settings . retrieve_sender_address = value . get ( ) . unwrap ( ) ;
}
2018-03-08 20:26:34 +00:00
_ = > unimplemented! ( ) ,
}
}
2018-12-06 11:03:04 +00:00
fn get_property ( & self , _obj : & glib ::Object , id : usize ) -> Result < glib ::Value , ( ) > {
let prop = & PROPERTIES [ id ] ;
2018-03-08 20:26:34 +00:00
match * prop {
2018-12-06 11:03:04 +00:00
subclass ::Property ( " address " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-03-08 20:26:34 +00:00
Ok ( settings . address . to_value ( ) )
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " port " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-03-08 20:26:34 +00:00
Ok ( settings . port . to_value ( ) )
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " reuse " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-05-23 07:32:06 +00:00
Ok ( settings . reuse . to_value ( ) )
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " caps " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-03-08 20:26:34 +00:00
Ok ( settings . caps . to_value ( ) )
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " mtu " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-03-08 20:26:34 +00:00
Ok ( settings . mtu . to_value ( ) )
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " socket " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-11-13 12:13:23 +00:00
Ok ( settings
. socket
. as_ref ( )
. map ( GioSocketWrapper ::as_socket )
. to_value ( ) )
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " used-socket " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-11-13 12:13:23 +00:00
Ok ( settings
. used_socket
. as_ref ( )
. map ( GioSocketWrapper ::as_socket )
. to_value ( ) )
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " context " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-03-08 20:26:34 +00:00
Ok ( settings . context . to_value ( ) )
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " context-wait " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2018-03-12 14:19:46 +00:00
Ok ( settings . context_wait . to_value ( ) )
}
2019-03-13 16:07:53 +00:00
subclass ::Property ( " retrieve-sender-address " , .. ) = > {
2019-05-28 02:43:54 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2019-03-13 16:07:53 +00:00
Ok ( settings . retrieve_sender_address . to_value ( ) )
}
2018-03-08 20:26:34 +00:00
_ = > unimplemented! ( ) ,
}
}
2018-12-06 11:03:04 +00:00
fn constructed ( & self , obj : & glib ::Object ) {
self . parent_constructed ( obj ) ;
let element = obj . downcast_ref ::< gst ::Element > ( ) . unwrap ( ) ;
element . add_pad ( & self . src_pad ) . unwrap ( ) ;
::set_element_flags ( element , gst ::ElementFlags ::SOURCE ) ;
}
2018-03-08 20:26:34 +00:00
}
2018-03-06 09:38:27 +00:00
2018-12-06 11:03:04 +00:00
impl ElementImpl for UdpSrc {
2018-03-06 09:38:27 +00:00
fn change_state (
& self ,
2018-12-06 11:03:04 +00:00
element : & gst ::Element ,
2018-03-06 09:38:27 +00:00
transition : gst ::StateChange ,
2019-01-11 23:45:05 +00:00
) -> Result < gst ::StateChangeSuccess , gst ::StateChangeError > {
2018-03-06 09:38:27 +00:00
gst_trace! ( self . cat , obj : element , " Changing state {:?} " , transition ) ;
match transition {
2019-01-11 23:45:05 +00:00
gst ::StateChange ::NullToReady = > {
self . prepare ( element ) . map_err ( | err | {
2018-03-16 18:24:36 +00:00
element . post_error_message ( & err ) ;
2019-01-11 23:45:05 +00:00
gst ::StateChangeError
} ) ? ;
}
gst ::StateChange ::PlayingToPaused = > {
self . stop ( element ) . map_err ( | _ | gst ::StateChangeError ) ? ;
}
gst ::StateChange ::ReadyToNull = > {
self . unprepare ( element ) . map_err ( | _ | gst ::StateChangeError ) ? ;
}
2018-03-06 09:38:27 +00:00
_ = > ( ) ,
}
2019-01-11 23:45:05 +00:00
let mut success = self . parent_change_state ( element , transition ) ? ;
2018-03-06 09:38:27 +00:00
match transition {
gst ::StateChange ::ReadyToPaused = > {
2019-01-11 23:45:05 +00:00
success = gst ::StateChangeSuccess ::NoPreroll ;
}
gst ::StateChange ::PausedToPlaying = > {
self . start ( element ) . map_err ( | _ | gst ::StateChangeError ) ? ;
2018-03-06 09:38:27 +00:00
}
2018-03-08 20:26:34 +00:00
gst ::StateChange ::PausedToReady = > {
let mut state = self . state . lock ( ) . unwrap ( ) ;
state . need_initial_events = true ;
}
2018-03-06 09:38:27 +00:00
_ = > ( ) ,
}
2019-01-11 23:45:05 +00:00
Ok ( success )
2018-03-06 09:38:27 +00:00
}
}
2018-11-05 11:43:11 +00:00
pub fn register ( plugin : & gst ::Plugin ) -> Result < ( ) , glib ::BoolError > {
2019-06-04 09:04:06 +00:00
gst ::Element ::register (
Some ( plugin ) ,
" ts-udpsrc " ,
gst ::Rank ::None ,
UdpSrc ::get_type ( ) ,
)
2018-03-06 09:38:27 +00:00
}