diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index 961ac0f6..967c1b82 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -12,10 +12,8 @@ gio-sys = { git = "https://github.com/gtk-rs/sys" } gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" } glib = { git = "https://github.com/gtk-rs/glib", features = ["subclassing"] } gio = { git = "https://github.com/gtk-rs/gio" } -gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] } gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gobject-subclass = { git = "https://github.com/gtk-rs/gobject-subclass" } -gst-plugin = { path = "../gst-plugin" } tokio = "0.1" tokio-reactor = "0.1" tokio-executor = "0.1" diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs index d08df87b..32e4e973 100644 --- a/gst-plugin-threadshare/src/appsrc.rs +++ b/gst-plugin-threadshare/src/appsrc.rs @@ -17,11 +17,11 @@ use glib; use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; use gst; use gst::prelude::*; - -use gobject_subclass::object::*; -use gst_plugin::element::*; +use gst::subclass::prelude::*; use std::sync::Mutex; use std::u32; @@ -63,44 +63,56 @@ impl Default for Settings { } } -static PROPERTIES: [Property; 5] = [ - Property::String( - "context", - "Context", - "Context name to share threads with", - Some(DEFAULT_CONTEXT), - PropertyMutability::ReadWrite, - ), - Property::UInt( - "context-wait", - "Context Wait", - "Throttle poll loop to run at most once every this many ms", - (0, 1000), - DEFAULT_CONTEXT_WAIT, - PropertyMutability::ReadWrite, - ), - Property::UInt( - "max-buffers", - "Max Buffers", - "Maximum number of buffers to queue up", - (1, u32::MAX), - DEFAULT_MAX_BUFFERS, - PropertyMutability::ReadWrite, - ), - Property::Boxed( - "caps", - "Caps", - "Caps to use", - gst::Caps::static_type, - PropertyMutability::ReadWrite, - ), - Property::Boolean( - "do-timestamp", - "Do Timestamp", - "Timestamp buffers with the current running time on arrival", - DEFAULT_DO_TIMESTAMP, - PropertyMutability::ReadWrite, - ), +static PROPERTIES: [subclass::Property; 5] = [ + subclass::Property("context", || { + glib::ParamSpec::string( + "context", + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", || { + glib::ParamSpec::uint( + "context-wait", + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("max-buffers", || { + glib::ParamSpec::uint( + "max-buffers", + "Max Buffers", + "Maximum number of buffers to queue up", + 1, + u32::MAX, + DEFAULT_MAX_BUFFERS, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("caps", || { + glib::ParamSpec::boxed( + "caps", + "Caps", + "Caps to use", + gst::Caps::static_type(), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("do-timestamp", || { + glib::ParamSpec::boolean( + "do-timestamp", + "Do Timestamp", + "Timestamp buffers with the current running time on arrival", + DEFAULT_DO_TIMESTAMP, + glib::ParamFlags::READWRITE, + ) + }), ]; struct State { @@ -133,88 +145,6 @@ struct AppSrc { } impl AppSrc { - fn class_init(klass: &mut ElementClass) { - klass.set_metadata( - "Thread-sharing app source", - "Source/Generic", - "Thread-sharing app source", - "Sebastian Dröge ", - ); - - let caps = gst::Caps::new_any(); - - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ); - klass.add_pad_template(src_pad_template); - - klass.install_properties(&PROPERTIES); - - klass.add_action_signal( - "push-buffer", - &[gst::Buffer::static_type()], - bool::static_type(), - |args| { - let element = args[0] - .get::() - .unwrap() - .downcast::() - .unwrap(); - let buffer = args[1].get::().unwrap(); - let appsrc = element.get_impl().downcast_ref::().unwrap(); - - Some(appsrc.push_buffer(&element, buffer).to_value()) - }, - ); - - klass.add_action_signal("end-of-stream", &[], bool::static_type(), |args| { - let element = args[0] - .get::() - .unwrap() - .downcast::() - .unwrap(); - let appsrc = element.get_impl().downcast_ref::().unwrap(); - Some(appsrc.end_of_stream(&element).to_value()) - }); - } - - fn init(element: &Element) -> Box> { - let templ = element.get_pad_template("src").unwrap(); - let src_pad = gst::Pad::new_from_template(&templ, "src"); - - src_pad.set_event_function(|pad, parent, event| { - AppSrc::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.src_event(pad, element, event), - ) - }); - src_pad.set_query_function(|pad, parent, query| { - AppSrc::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.src_query(pad, element, query), - ) - }); - element.add_pad(&src_pad).unwrap(); - - ::set_element_flags(element, gst::ElementFlags::SOURCE); - - Box::new(Self { - cat: gst::DebugCategory::new( - "ts-appsrc", - gst::DebugColorFlags::empty(), - "Thread-sharing app source", - ), - src_pad: src_pad, - state: Mutex::new(State::default()), - settings: Mutex::new(Settings::default()), - }) - } - fn create_io_context_event(state: &State) -> Option { if let (&Some(ref pending_future_id), &Some(ref io_context)) = (&state.pending_future_id, &state.io_context) @@ -232,7 +162,7 @@ impl AppSrc { } } - fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool { + fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; gst_log!(self.cat, obj: pad, "Handling event {:?}", event); @@ -265,7 +195,12 @@ impl AppSrc { ret } - fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool { + fn src_query( + &self, + pad: &gst::Pad, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { use gst::QueryView; gst_log!(self.cat, obj: pad, "Handling query {:?}", query); @@ -306,7 +241,7 @@ impl AppSrc { ret } - fn push_buffer(&self, element: &Element, mut buffer: gst::Buffer) -> bool { + fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool { let settings = self.settings.lock().unwrap().clone(); if settings.do_timestamp { @@ -337,7 +272,7 @@ impl AppSrc { } } - fn end_of_stream(&self, element: &Element) -> bool { + fn end_of_stream(&self, element: &gst::Element) -> bool { let mut state = self.state.lock().unwrap(); if let Some(ref mut channel) = state.channel { match channel.try_send(Either::Right(gst::Event::new_eos().build())) { @@ -354,7 +289,7 @@ impl AppSrc { fn push_item( &self, - element: &Element, + element: &gst::Element, item: Either, ) -> future::Either< Box + Send + 'static>, @@ -449,7 +384,7 @@ impl AppSrc { } } - fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> { + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { gst_debug!(self.cat, obj: element, "Preparing"); let settings = self.settings.lock().unwrap().clone(); @@ -480,7 +415,7 @@ impl AppSrc { Ok(()) } - fn unprepare(&self, element: &Element) -> Result<(), ()> { + fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Unpreparing"); let mut state = self.state.lock().unwrap(); @@ -498,7 +433,7 @@ impl AppSrc { Ok(()) } - fn start(&self, element: &Element) -> Result<(), ()> { + fn start(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Starting"); let settings = self.settings.lock().unwrap().clone(); let mut state = self.state.lock().unwrap(); @@ -515,7 +450,7 @@ impl AppSrc { let element_clone = element.clone(); let future = channel_receiver.for_each(move |item| { - let appsrc = element_clone.get_impl().downcast_ref::().unwrap(); + let appsrc = Self::from_instance(&element_clone); appsrc.push_item(&element_clone, item) }); io_context.spawn(future); @@ -526,7 +461,7 @@ impl AppSrc { Ok(()) } - fn stop(&self, element: &Element) -> Result<(), ()> { + fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Stopping"); let mut state = self.state.lock().unwrap(); @@ -539,28 +474,114 @@ impl AppSrc { } } -impl ObjectImpl for AppSrc { - fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) { - let prop = &PROPERTIES[id as usize]; +impl ObjectSubclass for AppSrc { + const NAME: &'static str = "RsTsAppSrc"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "Thread-sharing app source", + "Source/Generic", + "Thread-sharing app source", + "Sebastian Dröge ", + ); + + let caps = gst::Caps::new_any(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(src_pad_template); + + klass.install_properties(&PROPERTIES); + + klass.add_action_signal( + "push-buffer", + &[gst::Buffer::static_type()], + bool::static_type(), + |args| { + let element = args[0].get::().unwrap(); + let buffer = args[1].get::().unwrap(); + let appsrc = Self::from_instance(&element); + + Some(appsrc.push_buffer(&element, buffer).to_value()) + }, + ); + + klass.add_action_signal("end-of-stream", &[], bool::static_type(), |args| { + let element = args[0].get::().unwrap(); + let appsrc = Self::from_instance(&element); + Some(appsrc.end_of_stream(&element).to_value()) + }); + } + + fn new() -> Self { + unreachable!() + } + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("src").unwrap(); + let src_pad = gst::Pad::new_from_template(&templ, "src"); + + src_pad.set_event_function(|pad, parent, event| { + AppSrc::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.src_event(pad, element, event), + ) + }); + src_pad.set_query_function(|pad, parent, query| { + AppSrc::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.src_query(pad, element, query), + ) + }); + + Self { + cat: gst::DebugCategory::new( + "ts-appsrc", + gst::DebugColorFlags::empty(), + "Thread-sharing app source", + ), + src_pad: src_pad, + state: Mutex::new(State::default()), + settings: Mutex::new(Settings::default()), + } + } +} + +impl ObjectImpl for AppSrc { + glib_object_impl!(); + + fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; match *prop { - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context = value.get().unwrap_or_else(|| "".into()); } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context_wait = value.get().unwrap(); } - Property::Boxed("caps", ..) => { + subclass::Property("caps", ..) => { let mut settings = self.settings.lock().unwrap(); settings.caps = value.get(); } - Property::UInt("max-buffers", ..) => { + subclass::Property("max-buffers", ..) => { let mut settings = self.settings.lock().unwrap(); settings.max_buffers = value.get().unwrap(); } - Property::Boolean("do-timestamp", ..) => { + subclass::Property("do-timestamp", ..) => { let mut settings = self.settings.lock().unwrap(); settings.do_timestamp = value.get().unwrap(); } @@ -568,39 +589,48 @@ impl ObjectImpl for AppSrc { } } - fn get_property(&self, _obj: &glib::Object, id: u32) -> Result { - let prop = &PROPERTIES[id as usize]; + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; match *prop { - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context.to_value()) } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context_wait.to_value()) } - Property::Boxed("caps", ..) => { + subclass::Property("caps", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.caps.to_value()) } - Property::UInt("max-buffers", ..) => { + subclass::Property("max-buffers", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.max_buffers.to_value()) } - Property::Boolean("do-timestamp", ..) => { + subclass::Property("do-timestamp", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.do_timestamp.to_value()) } _ => unimplemented!(), } } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(&self.src_pad).unwrap(); + + ::set_element_flags(element, gst::ElementFlags::SOURCE); + } } -impl ElementImpl for AppSrc { +impl ElementImpl for AppSrc { fn change_state( &self, - element: &Element, + element: &gst::Element, transition: gst::StateChange, ) -> gst::StateChangeReturn { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); @@ -624,7 +654,7 @@ impl ElementImpl for AppSrc { _ => (), } - let mut ret = element.parent_change_state(transition); + let mut ret = self.parent_change_state(element, transition); if ret == gst::StateChangeReturn::Failure { return ret; } @@ -648,23 +678,6 @@ impl ElementImpl for AppSrc { } } -struct AppSrcStatic; - -impl ImplTypeStatic for AppSrcStatic { - fn get_name(&self) -> &str { - "AppSrc" - } - - fn new(&self, element: &Element) -> Box> { - AppSrc::init(element) - } - - fn class_init(&self, klass: &mut ElementClass) { - AppSrc::class_init(klass); - } -} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - let type_ = register_type(AppSrcStatic); - gst::Element::register(plugin, "ts-appsrc", 0, type_) + gst::Element::register(plugin, "ts-appsrc", 0, AppSrc::get_type()) } diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index 65f8fd30..47d8e574 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -27,9 +27,6 @@ extern crate gstreamer_sys as gst_ffi; extern crate gio; #[macro_use] extern crate glib; -extern crate gobject_subclass; -#[macro_use] -extern crate gst_plugin; #[macro_use] extern crate gstreamer as gst; @@ -73,16 +70,16 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { Ok(()) } -plugin_define!( - b"threadshare\0", - b"Threadshare Plugin\0", +gst_plugin_define!( + "threadshare", + "Threadshare Plugin", plugin_init, - b"0.1.0\0", - b"LGPL\0", - b"threadshare\0", - b"threadshare\0", - b"https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs\0", - b"2018-03-01\0" + "0.1.0", + "LGPL", + "threadshare", + "threadshare", + "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs", + "2018-03-01" ); pub fn set_element_flags + glib::IsA>( diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index 93ce47aa..08fac9a7 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -17,11 +17,11 @@ use glib; use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; use gst; use gst::prelude::*; - -use gobject_subclass::object::*; -use gst_plugin::element::*; +use gst::subclass::prelude::*; use std::collections::HashMap; use std::collections::VecDeque; @@ -87,62 +87,80 @@ impl Default for SettingsSrc { } } -static PROPERTIES_SRC: [Property; 6] = [ - Property::UInt( - "max-size-buffers", - "Max Size Buffers", - "Maximum number of buffers to queue (0=unlimited)", - (0, u32::MAX), - DEFAULT_MAX_SIZE_BUFFERS, - PropertyMutability::ReadWrite, - ), - Property::UInt( - "max-size-bytes", - "Max Size Bytes", - "Maximum number of bytes to queue (0=unlimited)", - (0, u32::MAX), - DEFAULT_MAX_SIZE_BYTES, - PropertyMutability::ReadWrite, - ), - Property::UInt64( - "max-size-time", - "Max Size Time", - "Maximum number of nanoseconds to queue (0=unlimited)", - (0, u64::MAX - 1), - DEFAULT_MAX_SIZE_TIME, - PropertyMutability::ReadWrite, - ), - Property::String( - "context", - "Context", - "Context name to share threads with", - Some(DEFAULT_CONTEXT), - PropertyMutability::ReadWrite, - ), - Property::UInt( - "context-wait", - "Context Wait", - "Throttle poll loop to run at most once every this many ms", - (0, 1000), - DEFAULT_CONTEXT_WAIT, - PropertyMutability::ReadWrite, - ), - Property::String( +static PROPERTIES_SRC: [subclass::Property; 6] = [ + subclass::Property("max-size-buffers", || { + glib::ParamSpec::uint( + "max-size-buffers", + "Max Size Buffers", + "Maximum number of buffers to queue (0=unlimited)", + 0, + u32::MAX, + DEFAULT_MAX_SIZE_BUFFERS, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("max-size-bytes", || { + glib::ParamSpec::uint( + "max-size-bytes", + "Max Size Bytes", + "Maximum number of bytes to queue (0=unlimited)", + 0, + u32::MAX, + DEFAULT_MAX_SIZE_BYTES, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("max-size-time", || { + glib::ParamSpec::uint64( + "max-size-time", + "Max Size Time", + "Maximum number of nanoseconds to queue (0=unlimited)", + 0, + u64::MAX - 1, + DEFAULT_MAX_SIZE_TIME, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context", || { + glib::ParamSpec::string( + "context", + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", || { + glib::ParamSpec::uint( + "context-wait", + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("proxy-context", || { + glib::ParamSpec::string( + "proxy-context", + "Proxy Context", + "Context name of the proxy to share with", + Some(DEFAULT_PROXY_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), +]; + +static PROPERTIES_SINK: [subclass::Property; 1] = [subclass::Property("proxy-context", || { + glib::ParamSpec::string( "proxy-context", "Proxy Context", "Context name of the proxy to share with", Some(DEFAULT_PROXY_CONTEXT), - PropertyMutability::ReadWrite, - ), -]; - -static PROPERTIES_SINK: [Property; 1] = [Property::String( - "proxy-context", - "Proxy Context", - "Context name of the proxy to share with", - Some(DEFAULT_PROXY_CONTEXT), - PropertyMutability::ReadWrite, -)]; + glib::ParamFlags::READWRITE, + ) +})]; // TODO: Refactor into a Sender and Receiver instead of the have_ booleans @@ -273,80 +291,10 @@ struct ProxySink { } impl ProxySink { - fn class_init(klass: &mut ElementClass) { - klass.set_metadata( - "Thread-sharing proxy sink", - "Sink/Generic", - "Thread-sharing proxy sink", - "Sebastian Dröge ", - ); - - let caps = gst::Caps::new_any(); - - let sink_pad_template = gst::PadTemplate::new( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &caps, - ); - klass.add_pad_template(sink_pad_template); - - klass.install_properties(&PROPERTIES_SINK); - } - - fn init(element: &Element) -> Box> { - let templ = element.get_pad_template("sink").unwrap(); - let sink_pad = gst::Pad::new_from_template(&templ, "sink"); - - sink_pad.set_chain_function(|pad, parent, buffer| { - ProxySink::catch_panic_pad_function( - parent, - || gst::FlowReturn::Error, - |queue, element| queue.sink_chain(pad, element, buffer), - ) - }); - sink_pad.set_chain_list_function(|pad, parent, list| { - ProxySink::catch_panic_pad_function( - parent, - || gst::FlowReturn::Error, - |queue, element| queue.sink_chain_list(pad, element, list), - ) - }); - sink_pad.set_event_function(|pad, parent, event| { - ProxySink::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.sink_event(pad, element, event), - ) - }); - sink_pad.set_query_function(|pad, parent, query| { - ProxySink::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.sink_query(pad, element, query), - ) - }); - - element.add_pad(&sink_pad).unwrap(); - - ::set_element_flags(element, gst::ElementFlags::SINK); - - Box::new(Self { - cat: gst::DebugCategory::new( - "ts-proxysink", - gst::DebugColorFlags::empty(), - "Thread-sharing proxy sink", - ), - sink_pad: sink_pad, - state: Mutex::new(StateSink::default()), - settings: Mutex::new(SettingsSink::default()), - }) - } - fn enqueue_item( &self, _pad: &gst::Pad, - element: &Element, + element: &gst::Element, item: DataQueueItem, ) -> gst::FlowReturn { let wait_future = { @@ -426,10 +374,7 @@ impl ProxySink { let element_clone = element.clone(); let future = future::poll_fn(move || { - let sink = element_clone - .get_impl() - .downcast_ref::() - .unwrap(); + let sink = Self::from_instance(&element_clone); let state = sink.state.lock().unwrap(); gst_log!( @@ -549,7 +494,7 @@ impl ProxySink { fn sink_chain( &self, pad: &gst::Pad, - element: &Element, + element: &gst::Element, buffer: gst::Buffer, ) -> gst::FlowReturn { gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); @@ -559,14 +504,14 @@ impl ProxySink { fn sink_chain_list( &self, pad: &gst::Pad, - element: &Element, + element: &gst::Element, list: gst::BufferList, ) -> gst::FlowReturn { gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list); self.enqueue_item(pad, element, DataQueueItem::BufferList(list)) } - fn sink_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool { + fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; gst_log!(self.cat, obj: pad, "Handling event {:?}", event); @@ -612,13 +557,18 @@ impl ProxySink { true } - fn sink_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool { + fn sink_query( + &self, + pad: &gst::Pad, + element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { gst_log!(self.cat, obj: pad, "Handling query {:?}", query); pad.query_default(element, query) } - fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> { + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { gst_debug!(self.cat, obj: element, "Preparing"); let settings = self.settings.lock().unwrap().clone(); @@ -640,7 +590,7 @@ impl ProxySink { Ok(()) } - fn unprepare(&self, element: &Element) -> Result<(), ()> { + fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Unpreparing"); let mut state = self.state.lock().unwrap(); @@ -650,7 +600,7 @@ impl ProxySink { Ok(()) } - fn start(&self, element: &Element) -> Result<(), ()> { + fn start(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Starting"); let state = self.state.lock().unwrap(); @@ -662,7 +612,7 @@ impl ProxySink { Ok(()) } - fn stop(&self, element: &Element) -> Result<(), ()> { + fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Stopping"); let mut state = self.state.lock().unwrap(); @@ -682,12 +632,93 @@ impl ProxySink { } } -impl ObjectImpl for ProxySink { - fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) { - let prop = &PROPERTIES_SINK[id as usize]; +impl ObjectSubclass for ProxySink { + const NAME: &'static str = "RsTsProxySink"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "Thread-sharing proxy sink", + "Sink/Generic", + "Thread-sharing proxy sink", + "Sebastian Dröge ", + ); + + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(sink_pad_template); + + klass.install_properties(&PROPERTIES_SINK); + } + + fn new() -> Self { + unreachable!() + } + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("sink").unwrap(); + let sink_pad = gst::Pad::new_from_template(&templ, "sink"); + + sink_pad.set_chain_function(|pad, parent, buffer| { + ProxySink::catch_panic_pad_function( + parent, + || gst::FlowReturn::Error, + |queue, element| queue.sink_chain(pad, element, buffer), + ) + }); + sink_pad.set_chain_list_function(|pad, parent, list| { + ProxySink::catch_panic_pad_function( + parent, + || gst::FlowReturn::Error, + |queue, element| queue.sink_chain_list(pad, element, list), + ) + }); + sink_pad.set_event_function(|pad, parent, event| { + ProxySink::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.sink_event(pad, element, event), + ) + }); + sink_pad.set_query_function(|pad, parent, query| { + ProxySink::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.sink_query(pad, element, query), + ) + }); + + Self { + cat: gst::DebugCategory::new( + "ts-proxysink", + gst::DebugColorFlags::empty(), + "Thread-sharing proxy sink", + ), + sink_pad: sink_pad, + state: Mutex::new(StateSink::default()), + settings: Mutex::new(SettingsSink::default()), + } + } +} + +impl ObjectImpl for ProxySink { + glib_object_impl!(); + + fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES_SINK[id]; match *prop { - Property::String("proxy-context", ..) => { + subclass::Property("proxy-context", ..) => { let mut settings = self.settings.lock().unwrap(); settings.proxy_context = value.get().unwrap_or_else(|| "".into()); } @@ -695,23 +726,32 @@ impl ObjectImpl for ProxySink { } } - fn get_property(&self, _obj: &glib::Object, id: u32) -> Result { - let prop = &PROPERTIES_SINK[id as usize]; + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES_SINK[id]; match *prop { - Property::String("proxy-context", ..) => { + subclass::Property("proxy-context", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.proxy_context.to_value()) } _ => unimplemented!(), } } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(&self.sink_pad).unwrap(); + + ::set_element_flags(element, gst::ElementFlags::SINK); + } } -impl ElementImpl for ProxySink { +impl ElementImpl for ProxySink { fn change_state( &self, - element: &Element, + element: &gst::Element, transition: gst::StateChange, ) -> gst::StateChangeReturn { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); @@ -735,7 +775,7 @@ impl ElementImpl for ProxySink { _ => (), } - let ret = element.parent_change_state(transition); + let ret = self.parent_change_state(element, transition); if ret == gst::StateChangeReturn::Failure { return ret; } @@ -760,71 +800,6 @@ struct ProxySrc { } impl ProxySrc { - fn class_init(klass: &mut ElementClass) { - klass.set_metadata( - "Thread-sharing proxy source", - "Source/Generic", - "Thread-sharing proxy source", - "Sebastian Dröge ", - ); - - let caps = gst::Caps::new_any(); - - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ); - klass.add_pad_template(src_pad_template); - - klass.install_properties(&PROPERTIES_SRC); - } - - fn init(element: &Element) -> Box> { - let templ = element.get_pad_template("src").unwrap(); - let src_pad = gst::Pad::new_from_template(&templ, "src"); - - src_pad.set_event_function(|pad, parent, event| { - ProxySrc::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.src_event(pad, element, event), - ) - }); - src_pad.set_query_function(|pad, parent, query| { - ProxySrc::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.src_query(pad, element, query), - ) - }); - element.add_pad(&src_pad).unwrap(); - - ::set_element_flags(element, gst::ElementFlags::SOURCE); - - Box::new(Self { - cat: gst::DebugCategory::new( - "ts-proxysrc", - gst::DebugColorFlags::empty(), - "Thread-sharing proxy source", - ), - src_pad: src_pad, - state: Mutex::new(StateSrc::default()), - settings: Mutex::new(SettingsSrc::default()), - }) - } - - fn catch_panic_pad_function T, G: FnOnce() -> T>( - parent: &Option, - fallback: G, - f: F, - ) -> T { - let element = parent.as_ref().unwrap().downcast_ref::().unwrap(); - let src = element.get_impl().downcast_ref::().unwrap(); - element.catch_panic(fallback, |element| f(src, element)) - } - fn create_io_context_event(state: &StateSrc) -> Option { if let (&Some(ref pending_future_id), &Some(ref io_context)) = (&state.pending_future_id, &state.io_context) @@ -842,7 +817,7 @@ impl ProxySrc { } } - fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool { + fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; gst_log!(self.cat, obj: pad, "Handling event {:?}", event); @@ -875,7 +850,12 @@ impl ProxySrc { ret } - fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool { + fn src_query( + &self, + pad: &gst::Pad, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { use gst::QueryView; gst_log!(self.cat, obj: pad, "Handling query {:?}", query); @@ -917,7 +897,7 @@ impl ProxySrc { fn push_item( &self, - element: &Element, + element: &gst::Element, item: DataQueueItem, ) -> future::Either< Box + Send + 'static>, @@ -1045,7 +1025,7 @@ impl ProxySrc { } } - fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> { + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { gst_debug!(self.cat, obj: element, "Preparing"); let settings = self.settings.lock().unwrap().clone(); @@ -1095,14 +1075,11 @@ impl ProxySrc { .schedule( &io_context, move |item| { - let src = element_clone.get_impl().downcast_ref::().unwrap(); + let src = Self::from_instance(&element_clone); src.push_item(&element_clone, item) }, move |err| { - let src = element_clone2 - .get_impl() - .downcast_ref::() - .unwrap(); + let src = Self::from_instance(&element_clone2); gst_error!(src.cat, obj: &element_clone2, "Got error {}", err); match err { gst::FlowError::CustomError => (), @@ -1143,7 +1120,7 @@ impl ProxySrc { Ok(()) } - fn unprepare(&self, element: &Element) -> Result<(), ()> { + fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Unpreparing"); // FIXME: The IO Context has to be alive longer than the queue, @@ -1181,7 +1158,7 @@ impl ProxySrc { Ok(()) } - fn start(&self, element: &Element) -> Result<(), ()> { + fn start(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Starting"); let state = self.state.lock().unwrap(); let queue = state.queue.as_ref().unwrap().0.lock().unwrap(); @@ -1195,7 +1172,7 @@ impl ProxySrc { Ok(()) } - fn stop(&self, element: &Element) -> Result<(), ()> { + fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Stopping"); let state = self.state.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); @@ -1212,32 +1189,99 @@ impl ProxySrc { } } -impl ObjectImpl for ProxySrc { - fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) { - let prop = &PROPERTIES_SRC[id as usize]; +impl ObjectSubclass for ProxySrc { + const NAME: &'static str = "RsTsProxySrc"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "Thread-sharing proxy source", + "Source/Generic", + "Thread-sharing proxy source", + "Sebastian Dröge ", + ); + + let caps = gst::Caps::new_any(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(src_pad_template); + + klass.install_properties(&PROPERTIES_SRC); + } + + fn new() -> Self { + unreachable!() + } + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("src").unwrap(); + let src_pad = gst::Pad::new_from_template(&templ, "src"); + + src_pad.set_event_function(|pad, parent, event| { + ProxySrc::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.src_event(pad, element, event), + ) + }); + src_pad.set_query_function(|pad, parent, query| { + ProxySrc::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.src_query(pad, element, query), + ) + }); + + Self { + cat: gst::DebugCategory::new( + "ts-proxysrc", + gst::DebugColorFlags::empty(), + "Thread-sharing proxy source", + ), + src_pad: src_pad, + state: Mutex::new(StateSrc::default()), + settings: Mutex::new(SettingsSrc::default()), + } + } +} + +impl ObjectImpl for ProxySrc { + glib_object_impl!(); + + fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES_SRC[id]; match *prop { - Property::UInt("max-size-buffers", ..) => { + subclass::Property("max-size-buffers", ..) => { let mut settings = self.settings.lock().unwrap(); settings.max_size_buffers = value.get().unwrap(); } - Property::UInt("max-size-bytes", ..) => { + subclass::Property("max-size-bytes", ..) => { let mut settings = self.settings.lock().unwrap(); settings.max_size_bytes = value.get().unwrap(); } - Property::UInt64("max-size-time", ..) => { + subclass::Property("max-size-time", ..) => { let mut settings = self.settings.lock().unwrap(); settings.max_size_time = value.get().unwrap(); } - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context = value.get().unwrap_or_else(|| "".into()); } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context_wait = value.get().unwrap(); } - Property::String("proxy-context", ..) => { + subclass::Property("proxy-context", ..) => { let mut settings = self.settings.lock().unwrap(); settings.proxy_context = value.get().unwrap_or_else(|| "".into()); } @@ -1245,43 +1289,52 @@ impl ObjectImpl for ProxySrc { } } - fn get_property(&self, _obj: &glib::Object, id: u32) -> Result { - let prop = &PROPERTIES_SRC[id as usize]; + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES_SRC[id]; match *prop { - Property::UInt("max-size-buffers", ..) => { + subclass::Property("max-size-buffers", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.max_size_buffers.to_value()) } - Property::UInt("max-size-bytes", ..) => { + subclass::Property("max-size-bytes", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.max_size_bytes.to_value()) } - Property::UInt64("max-size-time", ..) => { + subclass::Property("max-size-time", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.max_size_time.to_value()) } - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context.to_value()) } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context_wait.to_value()) } - Property::String("proxy-context", ..) => { + subclass::Property("proxy-context", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.proxy_context.to_value()) } _ => unimplemented!(), } } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(&self.src_pad).unwrap(); + + ::set_element_flags(element, gst::ElementFlags::SOURCE); + } } -impl ElementImpl for ProxySrc { +impl ElementImpl for ProxySrc { fn change_state( &self, - element: &Element, + element: &gst::Element, transition: gst::StateChange, ) -> gst::StateChangeReturn { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); @@ -1305,7 +1358,7 @@ impl ElementImpl for ProxySrc { _ => (), } - let mut ret = element.parent_change_state(transition); + let mut ret = self.parent_change_state(element, transition); if ret == gst::StateChangeReturn::Failure { return ret; } @@ -1325,42 +1378,7 @@ impl ElementImpl for ProxySrc { } } -struct ProxySinkStatic; - -impl ImplTypeStatic for ProxySinkStatic { - fn get_name(&self) -> &str { - "ProxySink" - } - - fn new(&self, element: &Element) -> Box> { - ProxySink::init(element) - } - - fn class_init(&self, klass: &mut ElementClass) { - ProxySink::class_init(klass); - } -} - -struct ProxySrcStatic; - -impl ImplTypeStatic for ProxySrcStatic { - fn get_name(&self) -> &str { - "ProxySrc" - } - - fn new(&self, element: &Element) -> Box> { - ProxySrc::init(element) - } - - fn class_init(&self, klass: &mut ElementClass) { - ProxySrc::class_init(klass); - } -} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - let type_ = register_type(ProxySinkStatic); - gst::Element::register(plugin, "ts-proxysink", 0, type_)?; - - let type_ = register_type(ProxySrcStatic); - gst::Element::register(plugin, "ts-proxysrc", 0, type_) + gst::Element::register(plugin, "ts-proxysink", 0, ProxySink::get_type())?; + gst::Element::register(plugin, "ts-proxysrc", 0, ProxySrc::get_type()) } diff --git a/gst-plugin-threadshare/src/queue.rs b/gst-plugin-threadshare/src/queue.rs index 540879fc..8122e623 100644 --- a/gst-plugin-threadshare/src/queue.rs +++ b/gst-plugin-threadshare/src/queue.rs @@ -17,11 +17,11 @@ use glib; use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; use gst; use gst::prelude::*; - -use gobject_subclass::object::*; -use gst_plugin::element::*; +use gst::subclass::prelude::*; use std::collections::VecDeque; use std::sync::Mutex; @@ -64,46 +64,60 @@ impl Default for Settings { } } -static PROPERTIES: [Property; 5] = [ - Property::UInt( - "max-size-buffers", - "Max Size Buffers", - "Maximum number of buffers to queue (0=unlimited)", - (0, u32::MAX), - DEFAULT_MAX_SIZE_BUFFERS, - PropertyMutability::ReadWrite, - ), - Property::UInt( - "max-size-bytes", - "Max Size Bytes", - "Maximum number of bytes to queue (0=unlimited)", - (0, u32::MAX), - DEFAULT_MAX_SIZE_BYTES, - PropertyMutability::ReadWrite, - ), - Property::UInt64( - "max-size-time", - "Max Size Time", - "Maximum number of nanoseconds to queue (0=unlimited)", - (0, u64::MAX - 1), - DEFAULT_MAX_SIZE_TIME, - PropertyMutability::ReadWrite, - ), - Property::String( - "context", - "Context", - "Context name to share threads with", - Some(DEFAULT_CONTEXT), - PropertyMutability::ReadWrite, - ), - Property::UInt( - "context-wait", - "Context Wait", - "Throttle poll loop to run at most once every this many ms", - (0, 1000), - DEFAULT_CONTEXT_WAIT, - PropertyMutability::ReadWrite, - ), +static PROPERTIES: [subclass::Property; 5] = [ + subclass::Property("max-size-buffers", || { + glib::ParamSpec::uint( + "max-size-buffers", + "Max Size Buffers", + "Maximum number of buffers to queue (0=unlimited)", + 0, + u32::MAX, + DEFAULT_MAX_SIZE_BUFFERS, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("max-size-bytes", || { + glib::ParamSpec::uint( + "max-size-bytes", + "Max Size Bytes", + "Maximum number of bytes to queue (0=unlimited)", + 0, + u32::MAX, + DEFAULT_MAX_SIZE_BYTES, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("max-size-time", || { + glib::ParamSpec::uint64( + "max-size-time", + "Max Size Time", + "Maximum number of nanoseconds to queue (0=unlimited)", + 0, + u64::MAX - 1, + DEFAULT_MAX_SIZE_TIME, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context", || { + glib::ParamSpec::string( + "context", + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", || { + glib::ParamSpec::uint( + "context-wait", + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), ]; struct State { @@ -141,100 +155,6 @@ struct Queue { } impl Queue { - fn class_init(klass: &mut ElementClass) { - klass.set_metadata( - "Thread-sharing queue", - "Generic", - "Simple data queue", - "Sebastian Dröge ", - ); - - let caps = gst::Caps::new_any(); - - let sink_pad_template = gst::PadTemplate::new( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &caps, - ); - klass.add_pad_template(sink_pad_template); - - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ); - klass.add_pad_template(src_pad_template); - - klass.install_properties(&PROPERTIES); - } - - fn init(element: &Element) -> Box> { - let templ = element.get_pad_template("sink").unwrap(); - let sink_pad = gst::Pad::new_from_template(&templ, "sink"); - let templ = element.get_pad_template("src").unwrap(); - let src_pad = gst::Pad::new_from_template(&templ, "src"); - - sink_pad.set_chain_function(|pad, parent, buffer| { - Queue::catch_panic_pad_function( - parent, - || gst::FlowReturn::Error, - |queue, element| queue.sink_chain(pad, element, buffer), - ) - }); - sink_pad.set_chain_list_function(|pad, parent, list| { - Queue::catch_panic_pad_function( - parent, - || gst::FlowReturn::Error, - |queue, element| queue.sink_chain_list(pad, element, list), - ) - }); - sink_pad.set_event_function(|pad, parent, event| { - Queue::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.sink_event(pad, element, event), - ) - }); - sink_pad.set_query_function(|pad, parent, query| { - Queue::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.sink_query(pad, element, query), - ) - }); - - src_pad.set_event_function(|pad, parent, event| { - Queue::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.src_event(pad, element, event), - ) - }); - src_pad.set_query_function(|pad, parent, query| { - Queue::catch_panic_pad_function( - parent, - || false, - |queue, element| queue.src_query(pad, element, query), - ) - }); - element.add_pad(&sink_pad).unwrap(); - element.add_pad(&src_pad).unwrap(); - - Box::new(Self { - cat: gst::DebugCategory::new( - "ts-queue", - gst::DebugColorFlags::empty(), - "Thread-sharing queue", - ), - sink_pad: sink_pad, - src_pad: src_pad, - state: Mutex::new(State::default()), - settings: Mutex::new(Settings::default()), - }) - } - fn create_io_context_event(state: &State) -> Option { if let (&Some(ref pending_future_id), &Some(ref io_context)) = (&state.pending_future_id, &state.io_context) @@ -255,7 +175,7 @@ impl Queue { fn enqueue_item( &self, _pad: &gst::Pad, - element: &Element, + element: &gst::Element, item: DataQueueItem, ) -> gst::FlowReturn { let wait_future = { @@ -325,7 +245,7 @@ impl Queue { let element_clone = element.clone(); let future = future::poll_fn(move || { - let queue = element_clone.get_impl().downcast_ref::().unwrap(); + let queue = Self::from_instance(&element_clone); let mut state = queue.state.lock().unwrap(); let State { @@ -430,7 +350,7 @@ impl Queue { fn sink_chain( &self, pad: &gst::Pad, - element: &Element, + element: &gst::Element, buffer: gst::Buffer, ) -> gst::FlowReturn { gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); @@ -440,14 +360,14 @@ impl Queue { fn sink_chain_list( &self, pad: &gst::Pad, - element: &Element, + element: &gst::Element, list: gst::BufferList, ) -> gst::FlowReturn { gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list); self.enqueue_item(pad, element, DataQueueItem::BufferList(list)) } - fn sink_event(&self, pad: &gst::Pad, element: &Element, mut event: gst::Event) -> bool { + fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool { use gst::EventView; gst_log!(self.cat, obj: pad, "Handling event {:?}", event); @@ -505,7 +425,12 @@ impl Queue { } } - fn sink_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool { + fn sink_query( + &self, + pad: &gst::Pad, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { gst_log!(self.cat, obj: pad, "Handling query {:?}", query); if query.is_serialized() { @@ -518,7 +443,7 @@ impl Queue { } } - fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool { + fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; gst_log!(self.cat, obj: pad, "Handling event {:?}", event); @@ -542,7 +467,12 @@ impl Queue { self.sink_pad.push_event(event) } - fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool { + fn src_query( + &self, + pad: &gst::Pad, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { use gst::QueryView; gst_log!(self.cat, obj: pad, "Handling query {:?}", query); @@ -578,7 +508,7 @@ impl Queue { fn push_item( &self, - element: &Element, + element: &gst::Element, item: DataQueueItem, ) -> future::Either< Box + Send + 'static>, @@ -679,7 +609,7 @@ impl Queue { } } - fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> { + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { gst_debug!(self.cat, obj: element, "Preparing"); let settings = self.settings.lock().unwrap().clone(); @@ -719,11 +649,11 @@ impl Queue { .schedule( &io_context, move |item| { - let queue = element_clone.get_impl().downcast_ref::().unwrap(); + let queue = Self::from_instance(&element_clone); queue.push_item(&element_clone, item) }, move |err| { - let queue = element_clone2.get_impl().downcast_ref::().unwrap(); + let queue = Self::from_instance(&element_clone2); gst_error!(queue.cat, obj: &element_clone2, "Got error {}", err); match err { gst::FlowError::CustomError => (), @@ -762,7 +692,7 @@ impl Queue { Ok(()) } - fn unprepare(&self, element: &Element) -> Result<(), ()> { + fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Unpreparing"); // FIXME: The IO Context has to be alive longer than the queue, @@ -792,7 +722,7 @@ impl Queue { Ok(()) } - fn start(&self, element: &Element) -> Result<(), ()> { + fn start(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Starting"); let mut state = self.state.lock().unwrap(); @@ -806,7 +736,7 @@ impl Queue { Ok(()) } - fn stop(&self, element: &Element) -> Result<(), ()> { + fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Stopping"); let mut state = self.state.lock().unwrap(); @@ -826,28 +756,135 @@ impl Queue { } } -impl ObjectImpl for Queue { - fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) { - let prop = &PROPERTIES[id as usize]; +impl ObjectSubclass for Queue { + const NAME: &'static str = "RsTsQueue"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "Thread-sharing queue", + "Generic", + "Simple data queue", + "Sebastian Dröge ", + ); + + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(sink_pad_template); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(src_pad_template); + + klass.install_properties(&PROPERTIES); + } + + fn new() -> Self { + unreachable!() + } + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("sink").unwrap(); + let sink_pad = gst::Pad::new_from_template(&templ, "sink"); + let templ = klass.get_pad_template("src").unwrap(); + let src_pad = gst::Pad::new_from_template(&templ, "src"); + + sink_pad.set_chain_function(|pad, parent, buffer| { + Queue::catch_panic_pad_function( + parent, + || gst::FlowReturn::Error, + |queue, element| queue.sink_chain(pad, element, buffer), + ) + }); + sink_pad.set_chain_list_function(|pad, parent, list| { + Queue::catch_panic_pad_function( + parent, + || gst::FlowReturn::Error, + |queue, element| queue.sink_chain_list(pad, element, list), + ) + }); + sink_pad.set_event_function(|pad, parent, event| { + Queue::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.sink_event(pad, element, event), + ) + }); + sink_pad.set_query_function(|pad, parent, query| { + Queue::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.sink_query(pad, element, query), + ) + }); + + src_pad.set_event_function(|pad, parent, event| { + Queue::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.src_event(pad, element, event), + ) + }); + src_pad.set_query_function(|pad, parent, query| { + Queue::catch_panic_pad_function( + parent, + || false, + |queue, element| queue.src_query(pad, element, query), + ) + }); + + Self { + cat: gst::DebugCategory::new( + "ts-queue", + gst::DebugColorFlags::empty(), + "Thread-sharing queue", + ), + sink_pad: sink_pad, + src_pad: src_pad, + state: Mutex::new(State::default()), + settings: Mutex::new(Settings::default()), + } + } +} + +impl ObjectImpl for Queue { + glib_object_impl!(); + + fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; match *prop { - Property::UInt("max-size-buffers", ..) => { + subclass::Property("max-size-buffers", ..) => { let mut settings = self.settings.lock().unwrap(); settings.max_size_buffers = value.get().unwrap(); } - Property::UInt("max-size-bytes", ..) => { + subclass::Property("max-size-bytes", ..) => { let mut settings = self.settings.lock().unwrap(); settings.max_size_bytes = value.get().unwrap(); } - Property::UInt64("max-size-time", ..) => { + subclass::Property("max-size-time", ..) => { let mut settings = self.settings.lock().unwrap(); settings.max_size_time = value.get().unwrap(); } - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context = value.get().unwrap_or_else(|| "".into()); } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context_wait = value.get().unwrap(); } @@ -855,39 +892,47 @@ impl ObjectImpl for Queue { } } - fn get_property(&self, _obj: &glib::Object, id: u32) -> Result { - let prop = &PROPERTIES[id as usize]; + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; match *prop { - Property::UInt("max-size-buffers", ..) => { + subclass::Property("max-size-buffers", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.max_size_buffers.to_value()) } - Property::UInt("max-size-bytes", ..) => { + subclass::Property("max-size-bytes", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.max_size_bytes.to_value()) } - Property::UInt64("max-size-time", ..) => { + subclass::Property("max-size-time", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.max_size_time.to_value()) } - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context.to_value()) } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), } } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(&self.sink_pad).unwrap(); + element.add_pad(&self.src_pad).unwrap(); + } } -impl ElementImpl for Queue { +impl ElementImpl for Queue { fn change_state( &self, - element: &Element, + element: &gst::Element, transition: gst::StateChange, ) -> gst::StateChangeReturn { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); @@ -911,7 +956,7 @@ impl ElementImpl for Queue { _ => (), } - let ret = element.parent_change_state(transition); + let ret = self.parent_change_state(element, transition); if ret == gst::StateChangeReturn::Failure { return ret; } @@ -928,24 +973,6 @@ impl ElementImpl for Queue { } } -struct QueueStatic; - -impl ImplTypeStatic for QueueStatic { - fn get_name(&self) -> &str { - "Queue" - } - - fn new(&self, element: &Element) -> Box> { - Queue::init(element) - } - - fn class_init(&self, klass: &mut ElementClass) { - Queue::class_init(klass); - } -} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - let queue_static = QueueStatic; - let type_ = register_type(queue_static); - gst::Element::register(plugin, "ts-queue", 0, type_) + gst::Element::register(plugin, "ts-queue", 0, Queue::get_type()) } diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs index 212f7a51..0817cbca 100644 --- a/gst-plugin-threadshare/src/tcpclientsrc.rs +++ b/gst-plugin-threadshare/src/tcpclientsrc.rs @@ -18,11 +18,11 @@ use glib; use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; use gst; use gst::prelude::*; - -use gobject_subclass::object::*; -use gst_plugin::element::*; +use gst::subclass::prelude::*; use std::io; use std::sync::Mutex; @@ -71,52 +71,67 @@ impl Default for Settings { } } -static PROPERTIES: [Property; 6] = [ - Property::String( - "address", - "Address", - "Address to receive packets from", - DEFAULT_ADDRESS, - PropertyMutability::ReadWrite, - ), - Property::UInt( - "port", - "Port", - "Port to receive packets from", - (0, u16::MAX as u32), - DEFAULT_PORT, - PropertyMutability::ReadWrite, - ), - Property::Boxed( - "caps", - "Caps", - "Caps to use", - gst::Caps::static_type, - PropertyMutability::ReadWrite, - ), - Property::UInt( - "chunk-size", - "Chunk Size", - "Chunk Size", - (0, u16::MAX as u32), - DEFAULT_CHUNK_SIZE, - PropertyMutability::ReadWrite, - ), - Property::String( - "context", - "Context", - "Context name to share threads with", - Some(DEFAULT_CONTEXT), - PropertyMutability::ReadWrite, - ), - Property::UInt( - "context-wait", - "Context Wait", - "Throttle poll loop to run at most once every this many ms", - (0, 1000), - DEFAULT_CONTEXT_WAIT, - PropertyMutability::ReadWrite, - ), +static PROPERTIES: [subclass::Property; 6] = [ + subclass::Property("address", || { + glib::ParamSpec::string( + "address", + "Address", + "Address to receive packets from", + DEFAULT_ADDRESS, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("port", || { + glib::ParamSpec::uint( + "port", + "Port", + "Port to receive packets from", + 0, + u16::MAX as u32, + DEFAULT_PORT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("caps", || { + glib::ParamSpec::boxed( + "caps", + "Caps", + "Caps to use", + gst::Caps::static_type(), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("chunk-size", || { + glib::ParamSpec::uint( + "chunk-size", + "Chunk Size", + "Chunk Size", + 0, + u16::MAX as u32, + DEFAULT_CHUNK_SIZE, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context", || { + glib::ParamSpec::string( + "context", + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", || { + glib::ParamSpec::uint( + "context-wait", + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), ]; pub struct TcpClientReader { @@ -185,61 +200,7 @@ struct TcpClientSrc { } impl TcpClientSrc { - fn class_init(klass: &mut ElementClass) { - klass.set_metadata( - "Thread-sharing TCP client source", - "Source/Network", - "Receives data over the network via TCP", - "Sebastian Dröge , LEE Dongjun ", - ); - - let caps = gst::Caps::new_any(); - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ); - klass.add_pad_template(src_pad_template); - - klass.install_properties(&PROPERTIES); - } - - fn init(element: &Element) -> Box> { - let templ = element.get_pad_template("src").unwrap(); - let src_pad = gst::Pad::new_from_template(&templ, "src"); - - src_pad.set_event_function(|pad, parent, event| { - TcpClientSrc::catch_panic_pad_function( - parent, - || false, - |tcpclientsrc, element| tcpclientsrc.src_event(pad, element, event), - ) - }); - src_pad.set_query_function(|pad, parent, query| { - TcpClientSrc::catch_panic_pad_function( - parent, - || false, - |tcpclientsrc, element| tcpclientsrc.src_query(pad, element, query), - ) - }); - element.add_pad(&src_pad).unwrap(); - - ::set_element_flags(element, gst::ElementFlags::SOURCE); - - Box::new(Self { - cat: gst::DebugCategory::new( - "ts-tcpclientsrc", - gst::DebugColorFlags::empty(), - "Thread-sharing TCP Client source", - ), - src_pad: src_pad, - state: Mutex::new(State::default()), - settings: Mutex::new(Settings::default()), - }) - } - - fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool { + fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; gst_log!(self.cat, obj: pad, "Handling event {:?}", event); @@ -271,7 +232,12 @@ impl TcpClientSrc { ret } - fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool { + fn src_query( + &self, + pad: &gst::Pad, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { use gst::QueryView; gst_log!(self.cat, obj: pad, "Handling query {:?}", query); @@ -331,7 +297,7 @@ impl TcpClientSrc { fn push_buffer( &self, - element: &Element, + element: &gst::Element, buffer: gst::Buffer, ) -> future::Either< Box + Send + 'static>, @@ -431,7 +397,7 @@ impl TcpClientSrc { } } - fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> { + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { use std::net::{IpAddr, SocketAddr}; gst_debug!(self.cat, obj: element, "Preparing"); @@ -493,17 +459,11 @@ impl TcpClientSrc { .schedule( &io_context, move |buffer| { - let tcpclientsrc = element_clone - .get_impl() - .downcast_ref::() - .unwrap(); + let tcpclientsrc = Self::from_instance(&element_clone); tcpclientsrc.push_buffer(&element_clone, buffer) }, move |err| { - let tcpclientsrc = element_clone2 - .get_impl() - .downcast_ref::() - .unwrap(); + let tcpclientsrc = Self::from_instance(&element_clone2); gst_error!(tcpclientsrc.cat, obj: &element_clone2, "Got error {}", err); match err { Either::Left(gst::FlowError::CustomError) => (), @@ -547,7 +507,7 @@ impl TcpClientSrc { Ok(()) } - fn unprepare(&self, element: &Element) -> Result<(), ()> { + fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Unpreparing"); // FIXME: The IO Context has to be alive longer than the queue, @@ -576,7 +536,7 @@ impl TcpClientSrc { Ok(()) } - fn start(&self, element: &Element) -> Result<(), ()> { + fn start(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Starting"); let state = self.state.lock().unwrap(); @@ -589,7 +549,7 @@ impl TcpClientSrc { Ok(()) } - fn stop(&self, element: &Element) -> Result<(), ()> { + fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Stopping"); let mut state = self.state.lock().unwrap(); @@ -604,32 +564,98 @@ impl TcpClientSrc { } } -impl ObjectImpl for TcpClientSrc { - fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) { - let prop = &PROPERTIES[id as usize]; +impl ObjectSubclass for TcpClientSrc { + const NAME: &'static str = "RsTsTcpClientSrc"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "Thread-sharing TCP client source", + "Source/Network", + "Receives data over the network via TCP", + "Sebastian Dröge , LEE Dongjun ", + ); + + let caps = gst::Caps::new_any(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(src_pad_template); + + klass.install_properties(&PROPERTIES); + } + + fn new() -> Self { + unreachable!() + } + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("src").unwrap(); + let src_pad = gst::Pad::new_from_template(&templ, "src"); + + src_pad.set_event_function(|pad, parent, event| { + TcpClientSrc::catch_panic_pad_function( + parent, + || false, + |tcpclientsrc, element| tcpclientsrc.src_event(pad, element, event), + ) + }); + src_pad.set_query_function(|pad, parent, query| { + TcpClientSrc::catch_panic_pad_function( + parent, + || false, + |tcpclientsrc, element| tcpclientsrc.src_query(pad, element, query), + ) + }); + + Self { + cat: gst::DebugCategory::new( + "ts-tcpclientsrc", + gst::DebugColorFlags::empty(), + "Thread-sharing TCP Client source", + ), + src_pad: src_pad, + state: Mutex::new(State::default()), + settings: Mutex::new(Settings::default()), + } + } +} + +impl ObjectImpl for TcpClientSrc { + glib_object_impl!(); + + fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; match *prop { - Property::String("address", ..) => { + subclass::Property("address", ..) => { let mut settings = self.settings.lock().unwrap(); settings.address = value.get(); } - Property::UInt("port", ..) => { + subclass::Property("port", ..) => { let mut settings = self.settings.lock().unwrap(); settings.port = value.get().unwrap(); } - Property::Boxed("caps", ..) => { + subclass::Property("caps", ..) => { let mut settings = self.settings.lock().unwrap(); settings.caps = value.get(); } - Property::UInt("chunk-size", ..) => { + subclass::Property("chunk-size", ..) => { let mut settings = self.settings.lock().unwrap(); settings.chunk_size = value.get().unwrap(); } - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context = value.get().unwrap_or_else(|| "".into()); } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context_wait = value.get().unwrap(); } @@ -637,43 +663,52 @@ impl ObjectImpl for TcpClientSrc { } } - fn get_property(&self, _obj: &glib::Object, id: u32) -> Result { - let prop = &PROPERTIES[id as usize]; + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; match *prop { - Property::String("address", ..) => { + subclass::Property("address", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.address.to_value()) } - Property::UInt("port", ..) => { + subclass::Property("port", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.port.to_value()) } - Property::Boxed("caps", ..) => { + subclass::Property("caps", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.caps.to_value()) } - Property::UInt("chunk-size", ..) => { + subclass::Property("chunk-size", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.chunk_size.to_value()) } - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context.to_value()) } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), } } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(&self.src_pad).unwrap(); + + ::set_element_flags(element, gst::ElementFlags::SOURCE); + } } -impl ElementImpl for TcpClientSrc { +impl ElementImpl for TcpClientSrc { fn change_state( &self, - element: &Element, + element: &gst::Element, transition: gst::StateChange, ) -> gst::StateChangeReturn { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); @@ -699,7 +734,7 @@ impl ElementImpl for TcpClientSrc { _ => (), } - let mut ret = element.parent_change_state(transition); + let mut ret = self.parent_change_state(element, transition); if ret == gst::StateChangeReturn::Failure { return ret; } @@ -719,24 +754,6 @@ impl ElementImpl for TcpClientSrc { } } -struct TcpClientSrcStatic; - -impl ImplTypeStatic for TcpClientSrcStatic { - fn get_name(&self) -> &str { - "TcpClientSrc" - } - - fn new(&self, element: &Element) -> Box> { - TcpClientSrc::init(element) - } - - fn class_init(&self, klass: &mut ElementClass) { - TcpClientSrc::class_init(klass); - } -} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - let tcpclientsrc_static = TcpClientSrcStatic; - let type_ = register_type(tcpclientsrc_static); - gst::Element::register(plugin, "ts-tcpclientsrc", 0, type_) + gst::Element::register(plugin, "ts-tcpclientsrc", 0, TcpClientSrc::get_type()) } diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index 884e9857..596da6b2 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -17,17 +17,17 @@ use glib; use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; use gst; use gst::prelude::*; +use gst::subclass::prelude::*; use gio; use gio_ffi; use gobject_ffi; -use gobject_subclass::object::*; -use gst_plugin::element::*; - use std::io; use std::sync::Mutex; use std::u16; @@ -179,73 +179,94 @@ impl Default for Settings { } } -static PROPERTIES: [Property; 9] = [ - Property::String( - "address", - "Address", - "Address/multicast group to listen on", - DEFAULT_ADDRESS, - PropertyMutability::ReadWrite, - ), - Property::UInt( - "port", - "Port", - "Port to listen on", - (0, u16::MAX as u32), - DEFAULT_PORT, - PropertyMutability::ReadWrite, - ), - Property::Boolean( - "reuse", - "Reuse", - "Allow reuse of the port", - DEFAULT_REUSE, - PropertyMutability::ReadWrite, - ), - Property::Boxed( - "caps", - "Caps", - "Caps to use", - gst::Caps::static_type, - PropertyMutability::ReadWrite, - ), - Property::UInt( - "mtu", - "MTU", - "MTU", - (0, u16::MAX as u32), - DEFAULT_MTU, - PropertyMutability::ReadWrite, - ), - Property::Object( - "socket", - "Socket", - "Socket to use for UDP reception. (None == allocate)", - gio::Socket::static_type, - PropertyMutability::ReadWrite, - ), - Property::Object( - "used-socket", - "Used Socket", - "Socket currently in use for UDP reception. (None = no socket)", - gio::Socket::static_type, - PropertyMutability::Readable, - ), - Property::String( - "context", - "Context", - "Context name to share threads with", - Some(DEFAULT_CONTEXT), - PropertyMutability::ReadWrite, - ), - Property::UInt( - "context-wait", - "Context Wait", - "Throttle poll loop to run at most once every this many ms", - (0, 1000), - DEFAULT_CONTEXT_WAIT, - PropertyMutability::ReadWrite, - ), +static PROPERTIES: [subclass::Property; 9] = [ + subclass::Property("address", || { + glib::ParamSpec::string( + "address", + "Address", + "Address/multicast group to listen on", + DEFAULT_ADDRESS, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("port", || { + glib::ParamSpec::uint( + "port", + "Port", + "Port to listen on", + 0, + u16::MAX as u32, + DEFAULT_PORT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("reuse", || { + glib::ParamSpec::boolean( + "reuse", + "Reuse", + "Allow reuse of the port", + DEFAULT_REUSE, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("caps", || { + glib::ParamSpec::boxed( + "caps", + "Caps", + "Caps to use", + gst::Caps::static_type(), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("mtu", || { + glib::ParamSpec::uint( + "mtu", + "MTU", + "MTU", + 0, + u16::MAX as u32, + DEFAULT_MTU, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("socket", || { + glib::ParamSpec::object( + "socket", + "Socket", + "Socket to use for UDP reception. (None == allocate)", + gio::Socket::static_type(), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("used-socket", || { + glib::ParamSpec::object( + "used-socket", + "Used Socket", + "Socket currently in use for UDP reception. (None = no socket)", + gio::Socket::static_type(), + glib::ParamFlags::READABLE, + ) + }), + subclass::Property("context", || { + glib::ParamSpec::string( + "context", + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", || { + glib::ParamSpec::uint( + "context-wait", + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), ]; pub struct UdpReader { @@ -296,61 +317,7 @@ struct UdpSrc { } impl UdpSrc { - fn class_init(klass: &mut ElementClass) { - klass.set_metadata( - "Thread-sharing UDP source", - "Source/Network", - "Receives data over the network via UDP", - "Sebastian Dröge ", - ); - - let caps = gst::Caps::new_any(); - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ); - klass.add_pad_template(src_pad_template); - - klass.install_properties(&PROPERTIES); - } - - fn init(element: &Element) -> Box> { - let templ = element.get_pad_template("src").unwrap(); - let src_pad = gst::Pad::new_from_template(&templ, "src"); - - src_pad.set_event_function(|pad, parent, event| { - UdpSrc::catch_panic_pad_function( - parent, - || false, - |udpsrc, element| udpsrc.src_event(pad, element, event), - ) - }); - src_pad.set_query_function(|pad, parent, query| { - UdpSrc::catch_panic_pad_function( - parent, - || false, - |udpsrc, element| udpsrc.src_query(pad, element, query), - ) - }); - element.add_pad(&src_pad).unwrap(); - - ::set_element_flags(element, gst::ElementFlags::SOURCE); - - Box::new(Self { - cat: gst::DebugCategory::new( - "ts-udpsrc", - gst::DebugColorFlags::empty(), - "Thread-sharing UDP source", - ), - src_pad: src_pad, - state: Mutex::new(State::default()), - settings: Mutex::new(Settings::default()), - }) - } - - fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool { + fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; gst_log!(self.cat, obj: pad, "Handling event {:?}", event); @@ -382,7 +349,12 @@ impl UdpSrc { ret } - fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool { + fn src_query( + &self, + pad: &gst::Pad, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { use gst::QueryView; gst_log!(self.cat, obj: pad, "Handling query {:?}", query); @@ -442,7 +414,7 @@ impl UdpSrc { fn push_buffer( &self, - element: &Element, + element: &gst::Element, buffer: gst::Buffer, ) -> future::Either< Box + Send + 'static>, @@ -537,7 +509,7 @@ impl UdpSrc { } } - fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> { + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; gst_debug!(self.cat, obj: element, "Preparing"); @@ -768,11 +740,11 @@ impl UdpSrc { .schedule( &io_context, move |buffer| { - let udpsrc = element_clone.get_impl().downcast_ref::().unwrap(); + let udpsrc = Self::from_instance(&element_clone); udpsrc.push_buffer(&element_clone, buffer) }, move |err| { - let udpsrc = element_clone2.get_impl().downcast_ref::().unwrap(); + let udpsrc = Self::from_instance(&element_clone2); gst_error!(udpsrc.cat, obj: &element_clone2, "Got error {}", err); match err { Either::Left(gst::FlowError::CustomError) => (), @@ -819,7 +791,7 @@ impl UdpSrc { Ok(()) } - fn unprepare(&self, element: &Element) -> Result<(), ()> { + fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Unpreparing"); self.settings.lock().unwrap().used_socket = None; @@ -850,7 +822,7 @@ impl UdpSrc { Ok(()) } - fn start(&self, element: &Element) -> Result<(), ()> { + fn start(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Starting"); let state = self.state.lock().unwrap(); @@ -863,7 +835,7 @@ impl UdpSrc { Ok(()) } - fn stop(&self, element: &Element) -> Result<(), ()> { + fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Stopping"); let mut state = self.state.lock().unwrap(); @@ -878,45 +850,111 @@ impl UdpSrc { } } -impl ObjectImpl for UdpSrc { - fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) { - let prop = &PROPERTIES[id as usize]; +impl ObjectSubclass for UdpSrc { + const NAME: &'static str = "RsTsUdpSrc"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "Thread-sharing UDP source", + "Source/Network", + "Receives data over the network via UDP", + "Sebastian Dröge ", + ); + + let caps = gst::Caps::new_any(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(src_pad_template); + + klass.install_properties(&PROPERTIES); + } + + fn new() -> Self { + unreachable!() + } + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("src").unwrap(); + let src_pad = gst::Pad::new_from_template(&templ, "src"); + + src_pad.set_event_function(|pad, parent, event| { + UdpSrc::catch_panic_pad_function( + parent, + || false, + |udpsrc, element| udpsrc.src_event(pad, element, event), + ) + }); + src_pad.set_query_function(|pad, parent, query| { + UdpSrc::catch_panic_pad_function( + parent, + || false, + |udpsrc, element| udpsrc.src_query(pad, element, query), + ) + }); + + Self { + cat: gst::DebugCategory::new( + "ts-udpsrc", + gst::DebugColorFlags::empty(), + "Thread-sharing UDP source", + ), + src_pad: src_pad, + state: Mutex::new(State::default()), + settings: Mutex::new(Settings::default()), + } + } +} + +impl ObjectImpl for UdpSrc { + glib_object_impl!(); + + fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; match *prop { - Property::String("address", ..) => { + subclass::Property("address", ..) => { let mut settings = self.settings.lock().unwrap(); settings.address = value.get(); } - Property::UInt("port", ..) => { + subclass::Property("port", ..) => { let mut settings = self.settings.lock().unwrap(); settings.port = value.get().unwrap(); } - Property::Boolean("reuse", ..) => { + subclass::Property("reuse", ..) => { let mut settings = self.settings.lock().unwrap(); settings.reuse = value.get().unwrap(); } - Property::Boxed("caps", ..) => { + subclass::Property("caps", ..) => { let mut settings = self.settings.lock().unwrap(); settings.caps = value.get(); } - Property::UInt("mtu", ..) => { + subclass::Property("mtu", ..) => { let mut settings = self.settings.lock().unwrap(); settings.mtu = value.get().unwrap(); } - Property::Object("socket", ..) => { + subclass::Property("socket", ..) => { let mut settings = self.settings.lock().unwrap(); settings.socket = value .get::() .map(|socket| GioSocketWrapper::new(&socket)); } - Property::Object("used-socket", ..) => { + subclass::Property("used-socket", ..) => { unreachable!(); } - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context = value.get().unwrap_or_else(|| "".into()); } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context_wait = value.get().unwrap(); } @@ -924,31 +962,31 @@ impl ObjectImpl for UdpSrc { } } - fn get_property(&self, _obj: &glib::Object, id: u32) -> Result { - let prop = &PROPERTIES[id as usize]; + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; match *prop { - Property::String("address", ..) => { + subclass::Property("address", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.address.to_value()) } - Property::UInt("port", ..) => { + subclass::Property("port", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.port.to_value()) } - Property::Boolean("reuse", ..) => { + subclass::Property("reuse", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.reuse.to_value()) } - Property::Boxed("caps", ..) => { + subclass::Property("caps", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.caps.to_value()) } - Property::UInt("mtu", ..) => { + subclass::Property("mtu", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.mtu.to_value()) } - Property::Object("socket", ..) => { + subclass::Property("socket", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings .socket @@ -956,7 +994,7 @@ impl ObjectImpl for UdpSrc { .map(GioSocketWrapper::as_socket) .to_value()) } - Property::Object("used-socket", ..) => { + subclass::Property("used-socket", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings .used_socket @@ -964,23 +1002,31 @@ impl ObjectImpl for UdpSrc { .map(GioSocketWrapper::as_socket) .to_value()) } - Property::String("context", ..) => { + subclass::Property("context", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context.to_value()) } - Property::UInt("context-wait", ..) => { + subclass::Property("context-wait", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), } } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(&self.src_pad).unwrap(); + ::set_element_flags(element, gst::ElementFlags::SOURCE); + } } -impl ElementImpl for UdpSrc { +impl ElementImpl for UdpSrc { fn change_state( &self, - element: &Element, + element: &gst::Element, transition: gst::StateChange, ) -> gst::StateChangeReturn { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); @@ -1004,7 +1050,7 @@ impl ElementImpl for UdpSrc { _ => (), } - let mut ret = element.parent_change_state(transition); + let mut ret = self.parent_change_state(element, transition); if ret == gst::StateChangeReturn::Failure { return ret; } @@ -1028,24 +1074,6 @@ impl ElementImpl for UdpSrc { } } -struct UdpSrcStatic; - -impl ImplTypeStatic for UdpSrcStatic { - fn get_name(&self) -> &str { - "UdpSrc" - } - - fn new(&self, element: &Element) -> Box> { - UdpSrc::init(element) - } - - fn class_init(&self, klass: &mut ElementClass) { - UdpSrc::class_init(klass); - } -} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - let udpsrc_static = UdpSrcStatic; - let type_ = register_type(udpsrc_static); - gst::Element::register(plugin, "ts-udpsrc", 0, type_) + gst::Element::register(plugin, "ts-udpsrc", 0, UdpSrc::get_type()) }