2018-03-06 09:38:27 +00:00
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
2019-11-24 20:12:40 +00:00
use either ::Either ;
2019-12-02 09:30:07 +00:00
use futures ::future ::BoxFuture ;
2020-01-06 12:19:11 +00:00
use futures ::lock ::Mutex ;
2019-11-24 20:12:40 +00:00
use futures ::prelude ::* ;
use gio ;
use gio_sys as gio_ffi ;
2018-03-06 09:38:27 +00:00
use glib ;
use glib ::prelude ::* ;
2018-12-06 11:03:04 +00:00
use glib ::subclass ;
use glib ::subclass ::prelude ::* ;
2019-11-24 20:12:40 +00:00
use glib ::{ glib_object_impl , glib_object_subclass } ;
use gobject_sys as gobject_ffi ;
2018-03-06 09:38:27 +00:00
use gst ;
use gst ::prelude ::* ;
2018-12-06 11:03:04 +00:00
use gst ::subclass ::prelude ::* ;
2019-11-24 20:12:40 +00:00
use gst ::{ gst_debug , gst_element_error , gst_error , gst_error_msg , gst_log , gst_trace } ;
2019-03-13 16:07:53 +00:00
use gst_net ::* ;
2018-03-06 09:38:27 +00:00
2019-11-24 20:12:40 +00:00
use lazy_static ::lazy_static ;
2018-11-13 12:13:23 +00:00
2019-11-24 20:12:40 +00:00
use rand ;
2018-11-13 12:13:23 +00:00
2018-07-06 07:19:59 +00:00
use std ::io ;
2019-12-02 09:30:07 +00:00
use std ::net ::{ IpAddr , Ipv4Addr , Ipv6Addr , SocketAddr } ;
2020-01-06 12:19:11 +00:00
use std ::sync ::{ self , Arc } ;
2018-03-09 15:32:21 +00:00
use std ::u16 ;
2018-03-06 09:38:27 +00:00
2018-11-13 12:13:23 +00:00
#[ cfg(unix) ]
use std ::os ::unix ::io ::{ AsRawFd , FromRawFd , IntoRawFd , RawFd } ;
#[ cfg(windows) ]
2018-12-19 17:15:06 +00:00
use std ::os ::windows ::io ::{ AsRawSocket , FromRawSocket , IntoRawSocket , RawSocket } ;
2018-11-13 12:13:23 +00:00
2019-12-02 09:30:07 +00:00
use crate ::runtime ::prelude ::* ;
2020-01-02 21:32:52 +00:00
use crate ::runtime ::{ self , Context , JoinHandle , PadSrc , PadSrcRef } ;
2019-12-02 09:30:07 +00:00
use super ::socket ::{ Socket , SocketRead , SocketStream } ;
2018-03-08 20:26:34 +00:00
2019-02-21 18:12:09 +00:00
const DEFAULT_ADDRESS : Option < & str > = Some ( " 127.0.0.1 " ) ;
2018-03-08 20:26:34 +00:00
const DEFAULT_PORT : u32 = 5000 ;
2018-05-23 07:32:06 +00:00
const DEFAULT_REUSE : bool = true ;
2018-03-08 20:26:34 +00:00
const DEFAULT_CAPS : Option < gst ::Caps > = None ;
const DEFAULT_MTU : u32 = 1500 ;
2018-11-13 12:13:23 +00:00
const DEFAULT_SOCKET : Option < GioSocketWrapper > = None ;
const DEFAULT_USED_SOCKET : Option < GioSocketWrapper > = None ;
2019-02-21 18:12:09 +00:00
const DEFAULT_CONTEXT : & str = " " ;
2018-03-12 14:19:46 +00:00
const DEFAULT_CONTEXT_WAIT : u32 = 0 ;
2019-03-13 16:07:53 +00:00
const DEFAULT_RETRIEVE_SENDER_ADDRESS : bool = true ;
2018-03-08 20:26:34 +00:00
2018-11-13 12:13:23 +00:00
/// Send/Sync wrapper for passing around a `gio::Socket` and getting the raw
/// fd from it.
///
/// `gio::Socket` is not Send/Sync as it's generally unsafe to access it from
/// multiple threads. Getting the underlying raw fd is safe though, as is
/// receiving/sending from two different threads.
#[derive(Debug)]
struct GioSocketWrapper {
    /// Raw pointer to the wrapped `GSocket`.
    socket: *mut gio_ffi::GSocket,
}

// SAFETY: relies on the reasoning in the type documentation above: only the
// raw fd is retrieved from the wrapped socket across threads.
unsafe impl Send for GioSocketWrapper {}
unsafe impl Sync for GioSocketWrapper {}
impl GioSocketWrapper {
fn new ( socket : & gio ::Socket ) -> Self {
use glib ::translate ::* ;
Self {
socket : socket . to_glib_full ( ) ,
}
}
fn as_socket ( & self ) -> gio ::Socket {
unsafe {
use glib ::translate ::* ;
from_glib_none ( self . socket )
}
}
#[ cfg(unix) ]
fn get < T : FromRawFd > ( & self ) -> T {
2018-11-30 17:38:25 +00:00
unsafe { FromRawFd ::from_raw_fd ( libc ::dup ( gio_ffi ::g_socket_get_fd ( self . socket ) ) ) }
2018-11-13 12:13:23 +00:00
}
#[ cfg(windows) ]
fn get < T : FromRawSocket > ( & self ) -> T {
2018-11-30 17:38:25 +00:00
unsafe {
2018-12-19 17:15:06 +00:00
FromRawSocket ::from_raw_socket (
dup_socket ( gio_ffi ::g_socket_get_fd ( self . socket ) as _ ) as _
)
2018-11-30 17:38:25 +00:00
}
2018-11-13 12:13:23 +00:00
}
}
impl Clone for GioSocketWrapper {
    /// Clones the wrapper by taking an additional GObject reference on the
    /// wrapped socket (`g_object_ref`).
    fn clone(&self) -> Self {
        unsafe {
            Self {
                socket: gobject_ffi::g_object_ref(self.socket as *mut _) as *mut _,
            }
        }
    }
}
impl Drop for GioSocketWrapper {
    /// Releases the GObject reference held by this wrapper (`g_object_unref`).
    fn drop(&mut self) {
        unsafe { gobject_ffi::g_object_unref(self.socket as *mut _) };
    }
}
2018-11-30 17:38:25 +00:00
/// Duplicates a Windows socket handle within the current process.
///
/// The protocol info of `socket` is obtained with `WSADuplicateSocketA` and a
/// new socket is created from it with `WSASocketA`.
///
/// # Panics
///
/// Panics if `WSADuplicateSocketA` does not return 0 or if `WSASocketA`
/// returns `INVALID_SOCKET`.
///
/// # Safety
///
/// Calls WinSock functions with `socket` as-is; presumably `socket` must be a
/// valid socket handle — confirm against callers.
#[cfg(windows)]
unsafe fn dup_socket(socket: usize) -> usize {
    use std::mem::MaybeUninit;
    use winapi::shared::ws2def::{AF_INET, IPPROTO_UDP, SOCK_DGRAM};
    use winapi::um::processthreadsapi::GetCurrentProcessId;
    use winapi::um::winsock2::{WSADuplicateSocketA, WSASocketA, INVALID_SOCKET};

    // Filled in by `WSADuplicateSocketA` below.
    let mut info_slot = MaybeUninit::uninit();
    let status = WSADuplicateSocketA(socket, GetCurrentProcessId(), info_slot.as_mut_ptr());
    assert_eq!(status, 0);

    // Only reached when the call above reported success.
    let mut info = info_slot.assume_init();

    let duplicate = WSASocketA(AF_INET, SOCK_DGRAM, IPPROTO_UDP as i32, &mut info, 0, 0);
    assert_ne!(duplicate, INVALID_SOCKET);

    duplicate
}
2018-03-08 20:26:34 +00:00
#[ derive(Debug, Clone) ]
struct Settings {
address : Option < String > ,
port : u32 ,
2018-05-23 07:32:06 +00:00
reuse : bool ,
2018-03-08 20:26:34 +00:00
caps : Option < gst ::Caps > ,
mtu : u32 ,
2018-11-13 12:13:23 +00:00
socket : Option < GioSocketWrapper > ,
used_socket : Option < GioSocketWrapper > ,
2018-03-08 20:26:34 +00:00
context : String ,
2018-03-12 14:19:46 +00:00
context_wait : u32 ,
2019-03-13 16:07:53 +00:00
retrieve_sender_address : bool ,
2018-03-08 20:26:34 +00:00
}
impl Default for Settings {
fn default ( ) -> Self {
Settings {
address : DEFAULT_ADDRESS . map ( Into ::into ) ,
port : DEFAULT_PORT ,
2018-05-23 07:32:06 +00:00
reuse : DEFAULT_REUSE ,
2018-03-08 20:26:34 +00:00
caps : DEFAULT_CAPS ,
mtu : DEFAULT_MTU ,
2018-11-13 12:13:23 +00:00
socket : DEFAULT_SOCKET ,
used_socket : DEFAULT_USED_SOCKET ,
2018-03-08 20:26:34 +00:00
context : DEFAULT_CONTEXT . into ( ) ,
2018-03-12 14:19:46 +00:00
context_wait : DEFAULT_CONTEXT_WAIT ,
2019-03-13 16:07:53 +00:00
retrieve_sender_address : DEFAULT_RETRIEVE_SENDER_ADDRESS ,
2018-03-08 20:26:34 +00:00
}
}
}
2019-03-13 16:07:53 +00:00
static PROPERTIES : [ subclass ::Property ; 10 ] = [
2018-12-18 09:23:45 +00:00
subclass ::Property ( " address " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::string (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Address " ,
" Address/multicast group to listen on " ,
DEFAULT_ADDRESS ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " port " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::uint (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Port " ,
" Port to listen on " ,
0 ,
u16 ::MAX as u32 ,
DEFAULT_PORT ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " reuse " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::boolean (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Reuse " ,
" Allow reuse of the port " ,
DEFAULT_REUSE ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " caps " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::boxed (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Caps " ,
" Caps to use " ,
gst ::Caps ::static_type ( ) ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " mtu " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::uint (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" MTU " ,
" MTU " ,
0 ,
u16 ::MAX as u32 ,
DEFAULT_MTU ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " socket " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::object (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Socket " ,
" Socket to use for UDP reception. (None == allocate) " ,
gio ::Socket ::static_type ( ) ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " used-socket " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::object (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Used Socket " ,
" Socket currently in use for UDP reception. (None = no socket) " ,
gio ::Socket ::static_type ( ) ,
glib ::ParamFlags ::READABLE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " context " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::string (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Context " ,
" Context name to share threads with " ,
Some ( DEFAULT_CONTEXT ) ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-12-18 09:23:45 +00:00
subclass ::Property ( " context-wait " , | name | {
2018-12-06 11:03:04 +00:00
glib ::ParamSpec ::uint (
2018-12-18 09:23:45 +00:00
name ,
2018-12-06 11:03:04 +00:00
" Context Wait " ,
" Throttle poll loop to run at most once every this many ms " ,
0 ,
1000 ,
DEFAULT_CONTEXT_WAIT ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2019-03-13 16:07:53 +00:00
subclass ::Property ( " retrieve-sender-address " , | name | {
glib ::ParamSpec ::boolean (
name ,
" Retrieve sender address " ,
" Whether to retrieve the sender address and add it to buffers as meta. Disabling this might result in minor performance improvements in certain scenarios " ,
DEFAULT_REUSE ,
glib ::ParamFlags ::READWRITE ,
)
} ) ,
2018-03-08 20:26:34 +00:00
] ;
2019-12-02 09:30:07 +00:00
#[ derive(Debug) ]
struct UdpReaderInner {
2019-11-30 18:51:31 +00:00
socket : tokio ::net ::UdpSocket ,
2018-07-06 07:19:59 +00:00
}
2019-12-02 09:30:07 +00:00
/// `SocketRead` implementation receiving datagrams from a UDP socket.
///
/// The socket lives behind an `Arc<Mutex<_>>`.
#[derive(Debug)]
pub struct UdpReader(Arc<Mutex<UdpReaderInner>>);
2018-07-06 07:19:59 +00:00
impl UdpReader {
2019-11-30 18:51:31 +00:00
fn new ( socket : tokio ::net ::UdpSocket ) -> Self {
2019-12-02 09:30:07 +00:00
UdpReader ( Arc ::new ( Mutex ::new ( UdpReaderInner { socket } ) ) )
2018-07-06 07:19:59 +00:00
}
}
impl SocketRead for UdpReader {
const DO_TIMESTAMP : bool = true ;
2019-12-02 09:30:07 +00:00
fn read < ' buf > (
& self ,
buffer : & ' buf mut [ u8 ] ,
) -> BoxFuture < ' buf , io ::Result < ( usize , Option < std ::net ::SocketAddr > ) > > {
let this = Arc ::clone ( & self . 0 ) ;
async move {
this . lock ( )
. await
. socket
. recv_from ( buffer )
. await
. map ( | ( read_size , saddr ) | ( read_size , Some ( saddr ) ) )
}
. boxed ( )
2018-07-06 07:19:59 +00:00
}
}
2019-12-02 09:30:07 +00:00
#[ derive(Debug) ]
2020-01-06 12:19:11 +00:00
struct UdpSrcPadHandlerState {
2019-12-02 09:30:07 +00:00
retrieve_sender_address : bool ,
2018-03-08 20:26:34 +00:00
need_initial_events : bool ,
2020-01-06 12:19:11 +00:00
caps : Option < gst ::Caps > ,
2018-03-15 19:17:01 +00:00
configured_caps : Option < gst ::Caps > ,
2018-03-06 09:38:27 +00:00
}
2020-01-06 12:19:11 +00:00
impl Default for UdpSrcPadHandlerState {
2019-12-02 09:30:07 +00:00
fn default ( ) -> Self {
2020-01-06 12:19:11 +00:00
UdpSrcPadHandlerState {
2019-12-02 09:30:07 +00:00
retrieve_sender_address : true ,
2018-03-08 20:26:34 +00:00
need_initial_events : true ,
2020-01-06 12:19:11 +00:00
caps : None ,
2018-03-15 19:17:01 +00:00
configured_caps : None ,
2018-03-08 20:26:34 +00:00
}
2018-03-06 09:38:27 +00:00
}
}
2020-01-06 12:19:11 +00:00
/// Data shared by all clones of a `UdpSrcPadHandler`.
#[derive(Debug, Default)]
struct UdpSrcPadHandlerInner {
    /// Handler state, behind a synchronous read/write lock.
    state: sync::RwLock<UdpSrcPadHandlerState>,
    /// Stream of received buffers; `None` when no stream is installed.
    socket_stream: Mutex<Option<SocketStream<UdpReader>>>,
    /// Handle of the task spawned on `FlushStart`, taken again on `FlushStop`.
    flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
}
2019-12-02 09:30:07 +00:00
2020-01-06 12:19:11 +00:00
/// Handler of the source pad; clones share the same `UdpSrcPadHandlerInner`.
#[derive(Clone, Debug, Default)]
struct UdpSrcPadHandler(Arc<UdpSrcPadHandlerInner>);
2019-12-02 09:30:07 +00:00
2020-01-06 12:19:11 +00:00
impl UdpSrcPadHandler {
2019-12-02 09:30:07 +00:00
async fn start_task ( & self , pad : PadSrcRef < '_ > , element : & gst ::Element ) {
let this = self . clone ( ) ;
let pad_weak = pad . downgrade ( ) ;
let element = element . clone ( ) ;
pad . start_task ( move | | {
let this = this . clone ( ) ;
let pad_weak = pad_weak . clone ( ) ;
let element = element . clone ( ) ;
async move {
let item = this
2020-01-06 12:19:11 +00:00
. 0
. socket_stream
2019-12-02 09:30:07 +00:00
. lock ( )
. await
. as_mut ( )
. expect ( " Missing SocketStream " )
. next ( )
. await ;
let pad = pad_weak . upgrade ( ) . expect ( " PadSrc no longer exists " ) ;
let ( mut buffer , saddr ) = match item {
Some ( Ok ( ( buffer , saddr ) ) ) = > ( buffer , saddr ) ,
Some ( Err ( err ) ) = > {
gst_error! ( CAT , obj : & element , " Got error {} " , err ) ;
match err {
Either ::Left ( gst ::FlowError ::CustomError ) = > ( ) ,
Either ::Left ( err ) = > {
gst_element_error! (
element ,
gst ::StreamError ::Failed ,
( " Internal data stream error " ) ,
[ " streaming stopped, reason {} " , err ]
) ;
}
Either ::Right ( err ) = > {
gst_element_error! (
element ,
gst ::StreamError ::Failed ,
( " I/O error " ) ,
[ " streaming stopped, I/O error {} " , err ]
) ;
}
}
return ;
}
None = > {
gst_log! ( CAT , obj : pad . gst_pad ( ) , " SocketStream Stopped " ) ;
pad . pause_task ( ) . await ;
return ;
}
} ;
if let Some ( saddr ) = saddr {
2020-01-06 12:19:11 +00:00
if this . 0. state . read ( ) . unwrap ( ) . retrieve_sender_address {
2019-12-02 09:30:07 +00:00
let inet_addr = match saddr . ip ( ) {
IpAddr ::V4 ( ip ) = > gio ::InetAddress ::new_from_bytes (
gio ::InetAddressBytes ::V4 ( & ip . octets ( ) ) ,
) ,
IpAddr ::V6 ( ip ) = > gio ::InetAddress ::new_from_bytes (
gio ::InetAddressBytes ::V6 ( & ip . octets ( ) ) ,
) ,
} ;
let inet_socket_addr =
& gio ::InetSocketAddress ::new ( & inet_addr , saddr . port ( ) ) ;
NetAddressMeta ::add ( buffer . get_mut ( ) . unwrap ( ) , inet_socket_addr ) ;
}
}
this . push_buffer ( pad , & element , buffer ) . await ;
}
} )
. await ;
}
async fn push_buffer ( & self , pad : PadSrcRef < '_ > , element : & gst ::Element , buffer : gst ::Buffer ) {
{
let mut events = Vec ::new ( ) ;
{
2020-01-06 12:19:11 +00:00
// Only `read` the state in the hot path
if self . 0. state . read ( ) . unwrap ( ) . need_initial_events {
// We will need to `write` and we also want to prevent
// any changes on the state while we are handling initial events
let mut state = self . 0. state . write ( ) . unwrap ( ) ;
assert! ( state . need_initial_events ) ;
2019-12-02 09:30:07 +00:00
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " Pushing initial events " ) ;
let stream_id =
format! ( " {:08x} {:08x} " , rand ::random ::< u32 > ( ) , rand ::random ::< u32 > ( ) ) ;
events . push (
gst ::Event ::new_stream_start ( & stream_id )
2020-01-24 23:20:37 +00:00
. group_id ( gst ::GroupId ::next ( ) )
2019-12-02 09:30:07 +00:00
. build ( ) ,
) ;
2020-01-06 12:19:11 +00:00
if let Some ( ref caps ) = state . caps {
2019-12-02 09:30:07 +00:00
events . push ( gst ::Event ::new_caps ( & caps ) . build ( ) ) ;
2020-01-06 12:19:11 +00:00
state . configured_caps = Some ( caps . clone ( ) ) ;
2019-12-02 09:30:07 +00:00
}
events . push (
gst ::Event ::new_segment ( & gst ::FormattedSegment ::< gst ::format ::Time > ::new ( ) )
. build ( ) ,
) ;
2020-01-06 12:19:11 +00:00
state . need_initial_events = false ;
2019-12-02 09:30:07 +00:00
}
}
for event in events {
pad . push_event ( event ) . await ;
}
}
match pad . push ( buffer ) . await {
Ok ( _ ) = > {
gst_log! ( CAT , obj : pad . gst_pad ( ) , " Successfully pushed buffer " ) ;
}
Err ( gst ::FlowError ::Flushing ) = > {
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " Flushing " ) ;
pad . pause_task ( ) . await ;
}
Err ( gst ::FlowError ::Eos ) = > {
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " EOS " ) ;
pad . pause_task ( ) . await ;
}
Err ( err ) = > {
gst_error! ( CAT , obj : pad . gst_pad ( ) , " Got error {} " , err ) ;
gst_element_error! (
element ,
gst ::StreamError ::Failed ,
( " Internal data stream error " ) ,
[ " streaming stopped, reason {} " , err ]
) ;
}
}
}
2019-10-31 22:34:21 +00:00
}
2019-12-02 09:30:07 +00:00
impl PadSrcHandler for UdpSrcPadHandler {
type ElementImpl = UdpSrc ;
2018-03-06 09:38:27 +00:00
2019-12-02 09:30:07 +00:00
fn src_event (
& self ,
2020-01-06 12:19:11 +00:00
pad : & PadSrcRef ,
_udpsrc : & UdpSrc ,
2019-12-02 09:30:07 +00:00
element : & gst ::Element ,
event : gst ::Event ,
) -> Either < bool , BoxFuture < 'static , bool > > {
2020-01-06 12:19:11 +00:00
use gst ::EventView ;
gst_log! ( CAT , obj : pad . gst_pad ( ) , " Handling {:?} " , event ) ;
2018-03-06 09:38:27 +00:00
2018-03-15 19:17:01 +00:00
let ret = match event . view ( ) {
EventView ::FlushStart ( .. ) = > {
2020-01-06 12:19:11 +00:00
let mut flush_join_handle = self . 0. flush_join_handle . lock ( ) . unwrap ( ) ;
if flush_join_handle . is_none ( ) {
let element = element . clone ( ) ;
let pad_weak = pad . downgrade ( ) ;
* flush_join_handle = Some ( pad . spawn ( async move {
let res = UdpSrc ::from_instance ( & element ) . pause ( & element ) . await ;
let pad = pad_weak . upgrade ( ) . unwrap ( ) ;
if res . is_ok ( ) {
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " FlushStart complete " ) ;
} else {
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " FlushStart failed " ) ;
}
res
} ) ) ;
} else {
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " FlushStart ignored: previous Flush in progress " ) ;
}
2018-03-15 19:17:01 +00:00
true
}
EventView ::FlushStop ( .. ) = > {
2020-01-06 12:19:11 +00:00
let element = element . clone ( ) ;
let inner_weak = Arc ::downgrade ( & self . 0 ) ;
let pad_weak = pad . downgrade ( ) ;
let fut = async move {
let mut ret = false ;
let pad = pad_weak . upgrade ( ) . unwrap ( ) ;
let inner_weak = inner_weak . upgrade ( ) . unwrap ( ) ;
let flush_join_handle = inner_weak . flush_join_handle . lock ( ) . unwrap ( ) . take ( ) ;
if let Some ( flush_join_handle ) = flush_join_handle {
if let Ok ( Ok ( ( ) ) ) = flush_join_handle . await {
ret = UdpSrc ::from_instance ( & element )
. start ( & element )
. await
. is_ok ( ) ;
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " FlushStop complete " ) ;
} else {
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " FlushStop aborted: FlushStart failed " ) ;
}
} else {
gst_debug! ( CAT , obj : pad . gst_pad ( ) , " FlushStop ignored: no Flush in progress " ) ;
}
ret
2018-03-15 19:17:01 +00:00
}
2020-01-06 12:19:11 +00:00
. boxed ( ) ;
return Either ::Right ( fut ) ;
2018-03-15 19:17:01 +00:00
}
EventView ::Reconfigure ( .. ) = > true ,
EventView ::Latency ( .. ) = > true ,
_ = > false ,
} ;
2018-03-06 09:38:27 +00:00
2018-03-15 19:17:01 +00:00
if ret {
2020-01-06 12:19:11 +00:00
gst_log! ( CAT , obj : pad . gst_pad ( ) , " Handled {:?} " , event ) ;
2018-03-06 09:38:27 +00:00
} else {
2020-01-06 12:19:11 +00:00
gst_log! ( CAT , obj : pad . gst_pad ( ) , " Didn't handle {:?} " , event ) ;
2018-03-06 09:38:27 +00:00
}
2019-12-02 09:30:07 +00:00
Either ::Left ( ret )
2018-03-06 09:38:27 +00:00
}
2018-12-06 11:03:04 +00:00
fn src_query (
& self ,
2020-01-06 12:19:11 +00:00
pad : & PadSrcRef ,
2019-12-02 09:30:07 +00:00
_udpsrc : & UdpSrc ,
2018-12-06 11:03:04 +00:00
_element : & gst ::Element ,
query : & mut gst ::QueryRef ,
) -> bool {
2020-01-06 12:19:11 +00:00
use gst ::QueryView ;
gst_log! ( CAT , obj : pad . gst_pad ( ) , " Handling {:?} " , query ) ;
2018-03-06 09:38:27 +00:00
2018-03-15 19:17:01 +00:00
let ret = match query . view_mut ( ) {
QueryView ::Latency ( ref mut q ) = > {
2020-01-23 15:09:40 +00:00
q . set ( true , 0. into ( ) , gst ::CLOCK_TIME_NONE ) ;
2018-03-15 19:17:01 +00:00
true
}
QueryView ::Scheduling ( ref mut q ) = > {
q . set ( gst ::SchedulingFlags ::SEQUENTIAL , 1 , - 1 , 0 ) ;
q . add_scheduling_modes ( & [ gst ::PadMode ::Push ] ) ;
true
}
QueryView ::Caps ( ref mut q ) = > {
2020-01-06 12:19:11 +00:00
let state = self . 0. state . read ( ) . unwrap ( ) ;
let caps = if let Some ( ref caps ) = state . configured_caps {
2018-03-15 19:17:01 +00:00
q . get_filter ( )
. map ( | f | f . intersect_with_mode ( caps , gst ::CapsIntersectMode ::First ) )
2019-02-21 18:12:09 +00:00
. unwrap_or_else ( | | caps . clone ( ) )
2018-03-15 19:17:01 +00:00
} else {
q . get_filter ( )
. map ( | f | f . to_owned ( ) )
2019-07-04 15:30:26 +00:00
. unwrap_or_else ( gst ::Caps ::new_any )
2018-03-15 19:17:01 +00:00
} ;
q . set_result ( & caps ) ;
true
}
_ = > false ,
2018-03-06 09:38:27 +00:00
} ;
2018-03-15 19:17:01 +00:00
if ret {
2020-01-06 12:19:11 +00:00
gst_log! ( CAT , obj : pad . gst_pad ( ) , " Handled {:?} " , query ) ;
2018-03-15 19:17:01 +00:00
} else {
2020-01-06 12:19:11 +00:00
gst_log! ( CAT , obj : pad . gst_pad ( ) , " Didn't handle {:?} " , query ) ;
2018-03-15 19:17:01 +00:00
}
2018-03-08 20:26:34 +00:00
2019-12-02 09:30:07 +00:00
ret
2018-03-26 14:49:42 +00:00
}
2019-12-02 09:30:07 +00:00
}
2018-03-26 14:49:42 +00:00
2019-12-02 09:30:07 +00:00
/// Runtime state of the element.
#[derive(Debug)]
struct State {
    /// The prepared socket; `None` while the element is not prepared.
    socket: Option<Socket<UdpReader>>,
}
2019-11-24 20:12:40 +00:00
2019-12-02 09:30:07 +00:00
impl Default for State {
fn default ( ) -> State {
State { socket : None }
2018-03-15 19:21:42 +00:00
}
2019-12-02 09:30:07 +00:00
}
2018-03-15 19:21:42 +00:00
2019-12-02 09:30:07 +00:00
/// The thread-sharing UDP source element implementation.
#[derive(Debug)]
struct UdpSrc {
    /// The element's source pad.
    src_pad: PadSrc,
    /// Handler installed on `src_pad`.
    src_pad_handler: UdpSrcPadHandler,
    /// Runtime state (the prepared socket).
    state: Mutex<State>,
    /// Settings backing the element's properties.
    settings: Mutex<Settings>,
}
2018-03-08 20:26:34 +00:00
2019-12-02 09:30:07 +00:00
// Debug category used by all log statements of this element.
lazy_static! {
    static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
        " ts-udpsrc ",
        gst::DebugColorFlags::empty(),
        Some(" Thread-sharing UDP source "),
    );
}
2018-03-08 20:26:34 +00:00
2019-12-02 09:30:07 +00:00
impl UdpSrc {
async fn prepare ( & self , element : & gst ::Element ) -> Result < ( ) , gst ::ErrorMessage > {
2020-01-07 19:19:46 +00:00
let mut state = self . state . lock ( ) . await ;
let mut settings = self . settings . lock ( ) . await . clone ( ) ;
2018-03-08 20:26:34 +00:00
2020-01-07 19:19:46 +00:00
gst_debug! ( CAT , obj : element , " Preparing " ) ;
2020-01-06 12:19:11 +00:00
2020-01-07 19:19:46 +00:00
let context =
2019-12-02 09:30:07 +00:00
Context ::acquire ( & settings . context , settings . context_wait ) . map_err ( | err | {
2018-11-05 11:43:38 +00:00
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
2019-12-02 09:30:07 +00:00
[ " Failed to acquire Context: {} " , err ]
2018-11-05 11:43:38 +00:00
)
2020-01-07 19:19:46 +00:00
} ) ? ;
2018-03-08 20:26:34 +00:00
2018-11-13 12:13:23 +00:00
let socket = if let Some ( ref wrapped_socket ) = settings . socket {
use std ::net ::UdpSocket ;
2019-06-06 06:26:02 +00:00
let socket : UdpSocket ;
2018-11-13 12:13:23 +00:00
#[ cfg(unix) ]
{
socket = wrapped_socket . get ( )
}
#[ cfg(windows) ]
{
socket = wrapped_socket . get ( )
2018-03-16 18:24:36 +00:00
}
2018-11-13 12:13:23 +00:00
2020-01-07 19:19:46 +00:00
let socket = context . enter ( | | {
tokio ::net ::UdpSocket ::from_std ( socket ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to setup socket for tokio: {} " , err ]
)
} )
2019-11-30 18:51:31 +00:00
} ) ? ;
2018-11-13 12:13:23 +00:00
2019-12-02 09:30:07 +00:00
settings . used_socket = Some ( wrapped_socket . clone ( ) ) ;
2018-11-13 12:13:23 +00:00
socket
} else {
let addr : IpAddr = match settings . address {
None = > {
2018-03-16 18:24:36 +00:00
return Err ( gst_error_msg! (
gst ::ResourceError ::Settings ,
2018-11-13 12:13:23 +00:00
[ " No address set " ]
2018-12-04 17:01:40 +00:00
) ) ;
2018-03-16 18:24:36 +00:00
}
2018-11-13 12:13:23 +00:00
Some ( ref addr ) = > match addr . parse ( ) {
Err ( err ) = > {
return Err ( gst_error_msg! (
gst ::ResourceError ::Settings ,
[ " Invalid address '{}' set: {} " , addr , err ]
2018-12-04 17:01:40 +00:00
) ) ;
2018-11-13 12:13:23 +00:00
}
Ok ( addr ) = > addr ,
} ,
2018-03-08 20:26:34 +00:00
} ;
2018-11-13 12:13:23 +00:00
let port = settings . port ;
2018-03-08 20:26:34 +00:00
2018-11-13 12:13:23 +00:00
// TODO: TTL, multicast loopback, etc
let saddr = if addr . is_multicast ( ) {
let bind_addr = if addr . is_ipv4 ( ) {
2019-11-30 18:51:31 +00:00
IpAddr ::V4 ( Ipv4Addr ::UNSPECIFIED )
2018-11-13 12:13:23 +00:00
} else {
2019-11-30 18:51:31 +00:00
IpAddr ::V6 ( Ipv6Addr ::UNSPECIFIED )
2018-11-13 12:13:23 +00:00
} ;
2018-05-23 07:32:06 +00:00
2018-11-13 12:13:23 +00:00
let saddr = SocketAddr ::new ( bind_addr , port as u16 ) ;
gst_debug! (
2019-10-31 22:34:21 +00:00
CAT ,
2020-01-07 19:19:46 +00:00
obj : element ,
2018-11-13 12:13:23 +00:00
" Binding to {:?} for multicast group {:?} " ,
saddr ,
addr
) ;
2018-05-23 07:32:06 +00:00
2018-11-13 12:13:23 +00:00
saddr
} else {
let saddr = SocketAddr ::new ( addr , port as u16 ) ;
2020-01-07 19:19:46 +00:00
gst_debug! ( CAT , obj : element , " Binding to {:?} " , saddr ) ;
2018-05-23 07:32:06 +00:00
2018-11-13 12:13:23 +00:00
saddr
} ;
2018-05-23 07:32:06 +00:00
2018-11-13 12:13:23 +00:00
let builder = if addr . is_ipv4 ( ) {
net2 ::UdpBuilder ::new_v4 ( )
} else {
net2 ::UdpBuilder ::new_v6 ( )
}
. map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to create socket: {} " , err ]
)
} ) ? ;
2018-03-08 20:26:34 +00:00
2018-11-13 12:13:23 +00:00
builder . reuse_address ( settings . reuse ) . map_err ( | err | {
2018-05-23 07:32:06 +00:00
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
2018-11-13 12:13:23 +00:00
[ " Failed to set reuse_address: {} " , err ]
2018-05-23 07:32:06 +00:00
)
} ) ? ;
2018-11-13 12:13:23 +00:00
#[ cfg(unix) ]
{
use net2 ::unix ::UnixUdpBuilderExt ;
builder . reuse_port ( settings . reuse ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to set reuse_port: {} " , err ]
)
} ) ? ;
}
2018-05-23 17:35:12 +00:00
2018-11-13 12:13:23 +00:00
let socket = builder . bind ( & saddr ) . map_err ( | err | {
2018-07-24 10:40:58 +00:00
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
2018-11-13 12:13:23 +00:00
[ " Failed to bind socket: {} " , err ]
2018-07-24 10:40:58 +00:00
)
} ) ? ;
2018-05-23 17:35:12 +00:00
2020-01-07 19:19:46 +00:00
let socket = context . enter ( | | {
tokio ::net ::UdpSocket ::from_std ( socket ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to setup socket for tokio: {} " , err ]
)
} )
2019-11-30 18:51:31 +00:00
} ) ? ;
2018-11-13 12:13:23 +00:00
if addr . is_multicast ( ) {
// TODO: Multicast interface configuration, going to be tricky
match addr {
IpAddr ::V4 ( addr ) = > {
socket
2019-11-24 20:12:40 +00:00
. join_multicast_v4 ( addr , Ipv4Addr ::new ( 0 , 0 , 0 , 0 ) )
2018-11-13 12:13:23 +00:00
. map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to join multicast group: {} " , err ]
)
} ) ? ;
}
IpAddr ::V6 ( addr ) = > {
socket . join_multicast_v6 ( & addr , 0 ) . map_err ( | err | {
2018-03-16 18:24:36 +00:00
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to join multicast group: {} " , err ]
)
} ) ? ;
2018-11-13 12:13:23 +00:00
}
2018-03-08 20:26:34 +00:00
}
2018-11-13 12:13:23 +00:00
}
// Store the socket as used-socket in the settings
#[ cfg(unix) ]
2018-12-03 11:02:35 +00:00
unsafe {
let fd = libc ::dup ( socket . as_raw_fd ( ) ) ;
2018-11-13 12:13:23 +00:00
2018-12-03 11:02:35 +00:00
// This is unsafe because it allows us to share the fd between the socket and the
// GIO socket below, but safety of this is the job of the application
2018-11-13 12:13:23 +00:00
struct FdConverter ( RawFd ) ;
impl IntoRawFd for FdConverter {
fn into_raw_fd ( self ) -> RawFd {
self . 0
}
2018-03-08 20:26:34 +00:00
}
2018-11-13 12:13:23 +00:00
let fd = FdConverter ( fd ) ;
let gio_socket = gio ::Socket ::new_from_fd ( fd ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to create wrapped GIO socket: {} " , err ]
)
2019-10-04 09:06:47 +00:00
} ) ? ;
2018-11-13 12:13:23 +00:00
let wrapper = GioSocketWrapper ::new ( & gio_socket ) ;
2019-12-02 09:30:07 +00:00
settings . used_socket = Some ( wrapper ) ;
2018-11-13 12:13:23 +00:00
}
#[ cfg(windows) ]
2018-12-03 11:02:35 +00:00
unsafe {
2019-02-07 08:21:08 +00:00
// FIXME: Needs https://github.com/tokio-rs/tokio/pull/806
// and https://github.com/carllerche/mio/pull/859
let fd = unreachable! ( ) ; //dup_socket(socket.as_raw_socket() as _) as _;
2018-11-13 12:13:23 +00:00
2018-12-03 11:02:35 +00:00
// This is unsafe because it allows us to share the fd between the socket and the
// GIO socket below, but safety of this is the job of the application
2018-11-13 12:13:23 +00:00
struct SocketConverter ( RawSocket ) ;
impl IntoRawSocket for SocketConverter {
fn into_raw_socket ( self ) -> RawSocket {
self . 0
}
}
let fd = SocketConverter ( fd ) ;
let gio_socket = gio ::Socket ::new_from_socket ( fd ) . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to create wrapped GIO socket: {} " , err ]
)
2019-10-04 09:06:47 +00:00
} ) ? ;
2018-11-13 12:13:23 +00:00
let wrapper = GioSocketWrapper ::new ( & gio_socket ) ;
2019-12-02 09:30:07 +00:00
settings . used_socket = Some ( wrapper ) ;
2018-03-08 20:26:34 +00:00
}
2018-11-13 12:13:23 +00:00
socket
} ;
2018-03-08 20:26:34 +00:00
let buffer_pool = gst ::BufferPool ::new ( ) ;
let mut config = buffer_pool . get_config ( ) ;
config . set_params ( None , settings . mtu , 0 , 0 ) ;
2019-11-30 18:51:31 +00:00
buffer_pool . set_config ( config ) . map_err ( | err | {
2018-03-16 18:24:36 +00:00
gst_error_msg! (
gst ::ResourceError ::Settings ,
2019-11-30 18:51:31 +00:00
[ " Failed to configure buffer pool {:?} " , err ]
2018-03-16 18:24:36 +00:00
)
} ) ? ;
2018-03-08 20:26:34 +00:00
2018-07-06 07:19:59 +00:00
let socket = Socket ::new ( element . upcast_ref ( ) , UdpReader ::new ( socket ) , buffer_pool ) ;
2019-11-30 18:51:31 +00:00
let socket_stream = socket . prepare ( ) . await . map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Failed to prepare socket {:?} " , err ]
)
2019-12-02 09:30:07 +00:00
} ) ? ;
2018-03-08 20:26:34 +00:00
2020-01-07 19:19:46 +00:00
{
let mut src_pad_handler_state = self . src_pad_handler . 0. state . write ( ) . unwrap ( ) ;
src_pad_handler_state . retrieve_sender_address = settings . retrieve_sender_address ;
src_pad_handler_state . caps = settings . caps . clone ( ) ;
}
2019-11-30 18:51:31 +00:00
drop ( settings ) ;
2020-01-07 19:19:46 +00:00
* self . src_pad_handler . 0. socket_stream . lock ( ) . await = Some ( socket_stream ) ;
2019-11-30 18:51:31 +00:00
2020-01-07 19:19:46 +00:00
state . socket = Some ( socket ) ;
2019-11-30 18:51:31 +00:00
2020-01-07 19:19:46 +00:00
element . notify ( " used-socket " ) ;
2019-11-30 18:51:31 +00:00
2019-12-02 09:30:07 +00:00
self . src_pad
. prepare ( context , & self . src_pad_handler )
. await
. map_err ( | err | {
gst_error_msg! (
gst ::ResourceError ::OpenRead ,
[ " Error preparing src_pads: {:?} " , err ]
)
2018-03-16 18:24:36 +00:00
} ) ? ;
2018-03-08 20:26:34 +00:00
2020-01-07 19:19:46 +00:00
gst_debug! ( CAT , obj : element , " Prepared " ) ;
2018-03-08 20:26:34 +00:00
Ok ( ( ) )
}
2019-12-02 09:30:07 +00:00
async fn unprepare ( & self , element : & gst ::Element ) -> Result < ( ) , ( ) > {
let mut state = self . state . lock ( ) . await ;
2019-10-31 22:34:21 +00:00
gst_debug! ( CAT , obj : element , " Unpreparing " ) ;
2018-03-08 20:26:34 +00:00
2019-12-02 09:30:07 +00:00
self . settings . lock ( ) . await . used_socket = None ;
2018-11-13 12:13:23 +00:00
2019-12-02 09:30:07 +00:00
self . src_pad . stop_task ( ) . await ;
2018-03-28 09:29:29 +00:00
2020-02-03 20:46:56 +00:00
* self . src_pad_handler . 0. socket_stream . lock ( ) . await = None ;
2019-12-02 09:30:07 +00:00
{
let socket = state . socket . take ( ) . unwrap ( ) ;
socket . unprepare ( ) . await . unwrap ( ) ;
2018-03-26 14:49:42 +00:00
}
2019-12-02 09:30:07 +00:00
let _ = self . src_pad . unprepare ( ) . await ;
2020-01-06 12:19:11 +00:00
self . src_pad_handler
. 0
. state
. write ( )
. unwrap ( )
. configured_caps = None ;
2018-03-08 20:26:34 +00:00
2019-10-31 22:34:21 +00:00
gst_debug! ( CAT , obj : element , " Unprepared " ) ;
2018-03-08 20:26:34 +00:00
Ok ( ( ) )
}
2019-12-02 09:30:07 +00:00
async fn start ( & self , element : & gst ::Element ) -> Result < ( ) , ( ) > {
let state = self . state . lock ( ) . await ;
2019-10-31 22:34:21 +00:00
gst_debug! ( CAT , obj : element , " Starting " ) ;
2018-03-08 20:26:34 +00:00
if let Some ( ref socket ) = state . socket {
2019-12-02 09:30:07 +00:00
socket
. start ( element . get_clock ( ) , Some ( element . get_base_time ( ) ) )
. await ;
2018-03-08 20:26:34 +00:00
}
2019-12-02 09:30:07 +00:00
self . src_pad_handler
. start_task ( self . src_pad . as_ref ( ) , element )
. await ;
2019-10-31 22:34:21 +00:00
gst_debug! ( CAT , obj : element , " Started " ) ;
2018-03-08 20:26:34 +00:00
Ok ( ( ) )
}
2019-12-02 09:30:07 +00:00
async fn pause ( & self , element : & gst ::Element ) -> Result < ( ) , ( ) > {
let pause_completion = {
let state = self . state . lock ( ) . await ;
gst_debug! ( CAT , obj : element , " Pausing " ) ;
2018-03-08 20:26:34 +00:00
2019-12-02 09:30:07 +00:00
let pause_completion = self . src_pad . pause_task ( ) . await ;
state . socket . as_ref ( ) . unwrap ( ) . pause ( ) . await ;
2019-11-24 20:12:40 +00:00
2019-12-02 09:30:07 +00:00
pause_completion
} ;
gst_debug! ( CAT , obj : element , " Waiting for Task Pause to complete " ) ;
pause_completion . await ;
2018-03-08 20:26:34 +00:00
2019-12-02 09:30:07 +00:00
gst_debug! ( CAT , obj : element , " Paused " ) ;
2018-03-08 20:26:34 +00:00
Ok ( ( ) )
}
2018-03-06 09:38:27 +00:00
}
2018-12-06 11:03:04 +00:00
impl ObjectSubclass for UdpSrc {
const NAME : & 'static str = " RsTsUdpSrc " ;
type ParentType = gst ::Element ;
type Instance = gst ::subclass ::ElementInstanceStruct < Self > ;
type Class = subclass ::simple ::ClassStruct < Self > ;
glib_object_subclass! ( ) ;
fn class_init ( klass : & mut subclass ::simple ::ClassStruct < Self > ) {
klass . set_metadata (
" Thread-sharing UDP source " ,
" Source/Network " ,
" Receives data over the network via UDP " ,
" Sebastian Dröge <sebastian@centricular.com> " ,
) ;
let caps = gst ::Caps ::new_any ( ) ;
let src_pad_template = gst ::PadTemplate ::new (
" src " ,
gst ::PadDirection ::Src ,
gst ::PadPresence ::Always ,
& caps ,
2019-01-29 15:26:40 +00:00
)
. unwrap ( ) ;
2018-12-06 11:03:04 +00:00
klass . add_pad_template ( src_pad_template ) ;
2019-02-07 08:21:08 +00:00
#[ cfg(not(windows)) ]
{
klass . install_properties ( & PROPERTIES ) ;
}
#[ cfg(windows) ]
{
let properties = PROPERTIES
. iter ( )
. filter ( | p | match * p {
subclass ::Property ( " socket " , .. ) | subclass ::Property ( " used-socket " , .. ) = > {
false
}
_ = > true ,
} )
. collect ::< Vec < _ > > ( ) ;
klass . install_properties ( properties . as_slice ( ) ) ;
}
2018-12-06 11:03:04 +00:00
}
fn new_with_class ( klass : & subclass ::simple ::ClassStruct < Self > ) -> Self {
let templ = klass . get_pad_template ( " src " ) . unwrap ( ) ;
2019-12-02 09:30:07 +00:00
let src_pad = PadSrc ::new_from_template ( & templ , Some ( " src " ) ) ;
2018-12-06 11:03:04 +00:00
Self {
2019-02-21 18:12:09 +00:00
src_pad ,
2020-01-06 12:19:11 +00:00
src_pad_handler : UdpSrcPadHandler ::default ( ) ,
2018-12-06 11:03:04 +00:00
state : Mutex ::new ( State ::default ( ) ) ,
settings : Mutex ::new ( Settings ::default ( ) ) ,
}
}
}
impl ObjectImpl for UdpSrc {
glib_object_impl! ( ) ;
fn set_property ( & self , _obj : & glib ::Object , id : usize , value : & glib ::Value ) {
let prop = & PROPERTIES [ id ] ;
2018-03-08 20:26:34 +00:00
2020-01-02 21:32:52 +00:00
let mut settings = runtime ::executor ::block_on ( self . settings . lock ( ) ) ;
2018-03-08 20:26:34 +00:00
match * prop {
2018-12-06 11:03:04 +00:00
subclass ::Property ( " address " , .. ) = > {
2019-08-12 22:45:36 +00:00
settings . address = value . get ( ) . expect ( " type checked upstream " ) ;
2018-03-08 20:26:34 +00:00
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " port " , .. ) = > {
2019-08-12 22:45:36 +00:00
settings . port = value . get_some ( ) . expect ( " type checked upstream " ) ;
2018-03-08 20:26:34 +00:00
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " reuse " , .. ) = > {
2019-08-12 22:45:36 +00:00
settings . reuse = value . get_some ( ) . expect ( " type checked upstream " ) ;
2018-05-23 07:32:06 +00:00
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " caps " , .. ) = > {
2019-08-12 22:45:36 +00:00
settings . caps = value . get ( ) . expect ( " type checked upstream " ) ;
2018-03-08 20:26:34 +00:00
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " mtu " , .. ) = > {
2019-08-12 22:45:36 +00:00
settings . mtu = value . get_some ( ) . expect ( " type checked upstream " ) ;
2018-03-08 20:26:34 +00:00
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " socket " , .. ) = > {
2018-11-13 12:13:23 +00:00
settings . socket = value
. get ::< gio ::Socket > ( )
2019-08-12 22:45:36 +00:00
. expect ( " type checked upstream " )
2018-11-13 12:13:23 +00:00
. map ( | socket | GioSocketWrapper ::new ( & socket ) ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " used-socket " , .. ) = > {
2018-11-13 12:13:23 +00:00
unreachable! ( ) ;
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " context " , .. ) = > {
2019-08-12 22:45:36 +00:00
settings . context = value
. get ( )
. expect ( " type checked upstream " )
. unwrap_or_else ( | | " " . into ( ) ) ;
2018-03-08 20:26:34 +00:00
}
2018-12-06 11:03:04 +00:00
subclass ::Property ( " context-wait " , .. ) = > {
2019-08-12 22:45:36 +00:00
settings . context_wait = value . get_some ( ) . expect ( " type checked upstream " ) ;
2018-03-12 14:19:46 +00:00
}
2019-03-13 16:07:53 +00:00
subclass ::Property ( " retrieve-sender-address " , .. ) = > {
2019-08-12 22:45:36 +00:00
settings . retrieve_sender_address = value . get_some ( ) . expect ( " type checked upstream " ) ;
2019-03-13 16:07:53 +00:00
}
2018-03-08 20:26:34 +00:00
_ = > unimplemented! ( ) ,
}
}
2018-12-06 11:03:04 +00:00
fn get_property ( & self , _obj : & glib ::Object , id : usize ) -> Result < glib ::Value , ( ) > {
let prop = & PROPERTIES [ id ] ;
2018-03-08 20:26:34 +00:00
2020-01-02 21:32:52 +00:00
let settings = runtime ::executor ::block_on ( self . settings . lock ( ) ) ;
2018-03-08 20:26:34 +00:00
match * prop {
2020-01-02 21:32:52 +00:00
subclass ::Property ( " address " , .. ) = > Ok ( settings . address . to_value ( ) ) ,
subclass ::Property ( " port " , .. ) = > Ok ( settings . port . to_value ( ) ) ,
subclass ::Property ( " reuse " , .. ) = > Ok ( settings . reuse . to_value ( ) ) ,
subclass ::Property ( " caps " , .. ) = > Ok ( settings . caps . to_value ( ) ) ,
subclass ::Property ( " mtu " , .. ) = > Ok ( settings . mtu . to_value ( ) ) ,
subclass ::Property ( " socket " , .. ) = > Ok ( settings
. socket
. as_ref ( )
. map ( GioSocketWrapper ::as_socket )
. to_value ( ) ) ,
subclass ::Property ( " used-socket " , .. ) = > Ok ( settings
. used_socket
. as_ref ( )
. map ( GioSocketWrapper ::as_socket )
. to_value ( ) ) ,
subclass ::Property ( " context " , .. ) = > Ok ( settings . context . to_value ( ) ) ,
subclass ::Property ( " context-wait " , .. ) = > Ok ( settings . context_wait . to_value ( ) ) ,
2019-03-13 16:07:53 +00:00
subclass ::Property ( " retrieve-sender-address " , .. ) = > {
Ok ( settings . retrieve_sender_address . to_value ( ) )
}
2018-03-08 20:26:34 +00:00
_ = > unimplemented! ( ) ,
}
}
2018-12-06 11:03:04 +00:00
fn constructed ( & self , obj : & glib ::Object ) {
self . parent_constructed ( obj ) ;
let element = obj . downcast_ref ::< gst ::Element > ( ) . unwrap ( ) ;
2019-12-02 09:30:07 +00:00
element . add_pad ( self . src_pad . gst_pad ( ) ) . unwrap ( ) ;
2019-11-24 20:12:40 +00:00
super ::set_element_flags ( element , gst ::ElementFlags ::SOURCE ) ;
2018-12-06 11:03:04 +00:00
}
2018-03-08 20:26:34 +00:00
}
2018-03-06 09:38:27 +00:00
2018-12-06 11:03:04 +00:00
impl ElementImpl for UdpSrc {
2018-03-06 09:38:27 +00:00
fn change_state (
& self ,
2018-12-06 11:03:04 +00:00
element : & gst ::Element ,
2018-03-06 09:38:27 +00:00
transition : gst ::StateChange ,
2019-01-11 23:45:05 +00:00
) -> Result < gst ::StateChangeSuccess , gst ::StateChangeError > {
2019-10-31 22:34:21 +00:00
gst_trace! ( CAT , obj : element , " Changing state {:?} " , transition ) ;
2018-03-06 09:38:27 +00:00
match transition {
2019-01-11 23:45:05 +00:00
gst ::StateChange ::NullToReady = > {
2020-01-02 21:32:52 +00:00
runtime ::executor ::block_on ( self . prepare ( element ) ) . map_err ( | err | {
2019-11-30 18:51:31 +00:00
element . post_error_message ( & err ) ;
gst ::StateChangeError
} ) ? ;
}
2019-01-11 23:45:05 +00:00
gst ::StateChange ::PlayingToPaused = > {
2020-01-02 21:32:52 +00:00
runtime ::executor ::block_on ( self . pause ( element ) )
. map_err ( | _ | gst ::StateChangeError ) ? ;
2019-01-11 23:45:05 +00:00
}
gst ::StateChange ::ReadyToNull = > {
2020-01-02 21:32:52 +00:00
runtime ::executor ::block_on ( self . unprepare ( element ) )
. map_err ( | _ | gst ::StateChangeError ) ? ;
2019-01-11 23:45:05 +00:00
}
2018-03-06 09:38:27 +00:00
_ = > ( ) ,
}
2019-01-11 23:45:05 +00:00
let mut success = self . parent_change_state ( element , transition ) ? ;
2018-03-06 09:38:27 +00:00
match transition {
gst ::StateChange ::ReadyToPaused = > {
2019-01-11 23:45:05 +00:00
success = gst ::StateChangeSuccess ::NoPreroll ;
}
gst ::StateChange ::PausedToPlaying = > {
2020-01-02 21:32:52 +00:00
runtime ::executor ::block_on ( self . start ( element ) )
. map_err ( | _ | gst ::StateChangeError ) ? ;
2018-03-06 09:38:27 +00:00
}
2020-01-23 14:56:11 +00:00
gst ::StateChange ::PlayingToPaused = > {
success = gst ::StateChangeSuccess ::NoPreroll ;
}
2018-03-08 20:26:34 +00:00
gst ::StateChange ::PausedToReady = > {
2020-01-06 12:19:11 +00:00
self . src_pad_handler
. 0
. state
. write ( )
. unwrap ( )
. need_initial_events = true ;
2018-03-08 20:26:34 +00:00
}
2018-03-06 09:38:27 +00:00
_ = > ( ) ,
}
2019-01-11 23:45:05 +00:00
Ok ( success )
2018-03-06 09:38:27 +00:00
}
}
2018-11-05 11:43:11 +00:00
pub fn register ( plugin : & gst ::Plugin ) -> Result < ( ) , glib ::BoolError > {
2019-06-04 09:04:06 +00:00
gst ::Element ::register (
Some ( plugin ) ,
" ts-udpsrc " ,
gst ::Rank ::None ,
UdpSrc ::get_type ( ) ,
)
2018-03-06 09:38:27 +00:00
}