diff --git a/net/reqwest/src/reqwesthttpsrc.rs b/net/reqwest/src/reqwesthttpsrc/imp.rs similarity index 93% rename from net/reqwest/src/reqwesthttpsrc.rs rename to net/reqwest/src/reqwesthttpsrc/imp.rs index eddfd3fc..cb9ebec8 100644 --- a/net/reqwest/src/reqwesthttpsrc.rs +++ b/net/reqwest/src/reqwesthttpsrc/imp.rs @@ -230,7 +230,7 @@ lazy_static! { impl ReqwestHttpSrc { fn set_location( &self, - _element: &gst_base::BaseSrc, + _element: &super::ReqwestHttpSrc, uri: Option<&str>, ) -> Result<(), glib::Error> { let state = self.state.lock().unwrap(); @@ -268,7 +268,10 @@ impl ReqwestHttpSrc { Ok(()) } - fn ensure_client(&self, src: &gst_base::BaseSrc) -> Result { + fn ensure_client( + &self, + src: &super::ReqwestHttpSrc, + ) -> Result { let mut client_guard = self.client.lock().unwrap(); if let Some(ref client) = *client_guard { gst_debug!(CAT, obj: src, "Using already configured client"); @@ -334,7 +337,7 @@ impl ReqwestHttpSrc { fn do_request( &self, - src: &gst_base::BaseSrc, + src: &super::ReqwestHttpSrc, uri: Url, start: u64, stop: Option, @@ -670,17 +673,15 @@ impl ReqwestHttpSrc { } impl ObjectImpl for ReqwestHttpSrc { - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; match *prop { subclass::Property("location", ..) => { - let element = obj.downcast_ref::().unwrap(); - let location = value.get::<&str>().expect("type checked upstream"); - if let Err(err) = self.set_location(element, location) { + if let Err(err) = self.set_location(obj, location) { gst_error!( CAT, - obj: element, + obj: obj, "Failed to set property `location`: {:?}", err ); @@ -695,9 +696,8 @@ impl ObjectImpl for ReqwestHttpSrc { settings.user_agent = user_agent; } subclass::Property("is-live", ..) => { - let element = obj.downcast_ref::().unwrap(); let is_live = value.get_some().expect("type checked upstream"); - element.set_live(is_live); + obj.set_live(is_live); } subclass::Property("user-id", ..) => { let mut settings = self.settings.lock().unwrap(); @@ -743,7 +743,7 @@ impl ObjectImpl for ReqwestHttpSrc { }; } - fn get_property(&self, obj: &glib::Object, id: usize) -> Result { + fn get_property(&self, obj: &Self::Type, id: usize) -> Result { let prop = &PROPERTIES[id]; match *prop { subclass::Property("location", ..) => { @@ -756,10 +756,7 @@ impl ObjectImpl for ReqwestHttpSrc { let settings = self.settings.lock().unwrap(); Ok(settings.user_agent.to_value()) } - subclass::Property("is-live", ..) => { - let element = obj.downcast_ref::().unwrap(); - Ok(element.is_live().to_value()) - } + subclass::Property("is-live", ..) => Ok(obj.is_live().to_value()), subclass::Property("user-id", ..) => { let settings = self.settings.lock().unwrap(); Ok(settings.user_id.to_value()) @@ -796,16 +793,15 @@ impl ObjectImpl for ReqwestHttpSrc { } } - fn constructed(&self, obj: &glib::Object) { + fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); - let element = obj.downcast_ref::().unwrap(); - element.set_automatic_eos(false); - element.set_format(gst::Format::Bytes); + obj.set_automatic_eos(false); + obj.set_format(gst::Format::Bytes); } } impl ElementImpl for ReqwestHttpSrc { - fn set_context(&self, element: &gst::Element, context: &gst::Context) { + fn set_context(&self, element: &Self::Type, context: &gst::Context) { if context.get_context_type() == REQWEST_CLIENT_CONTEXT { let mut external_client = self.external_client.lock().unwrap(); let s = context.get_structure(); @@ -820,7 +816,7 @@ impl ElementImpl for ReqwestHttpSrc { fn change_state( &self, - element: &gst::Element, + element: &Self::Type, transition: gst::StateChange, ) -> Result { if let gst::StateChange::ReadyToNull = transition { @@ -832,21 +828,21 @@ impl ElementImpl for ReqwestHttpSrc { } impl BaseSrcImpl for ReqwestHttpSrc { - fn is_seekable(&self, _src: &gst_base::BaseSrc) -> bool { + fn is_seekable(&self, _src: &Self::Type) -> bool { match *self.state.lock().unwrap() { State::Started { seekable, .. } => seekable, _ => false, } } - fn get_size(&self, _src: &gst_base::BaseSrc) -> Option { + fn get_size(&self, _src: &Self::Type) -> Option { match *self.state.lock().unwrap() { State::Started { size, .. } => size, _ => None, } } - fn unlock(&self, _src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn unlock(&self, _src: &Self::Type) -> Result<(), gst::ErrorMessage> { let canceller = self.canceller.lock().unwrap(); if let Some(ref canceller) = *canceller { canceller.abort(); @@ -854,7 +850,7 @@ impl BaseSrcImpl for ReqwestHttpSrc { Ok(()) } - fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn start(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); *state = State::Stopped; @@ -881,14 +877,14 @@ impl BaseSrcImpl for ReqwestHttpSrc { Ok(()) } - fn stop(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn stop(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> { gst_debug!(CAT, obj: src, "Stopping"); *self.state.lock().unwrap() = State::Stopped; Ok(()) } - fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { + fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool { use gst::QueryView; match query.view_mut() { @@ -906,7 +902,7 @@ impl BaseSrcImpl for ReqwestHttpSrc { } } - fn do_seek(&self, src: &gst_base::BaseSrc, segment: &mut gst::Segment) -> bool { + fn do_seek(&self, src: &Self::Type, segment: &mut gst::Segment) -> bool { let segment = segment.downcast_mut::().unwrap(); let mut state = self.state.lock().unwrap(); @@ -951,7 +947,7 @@ impl BaseSrcImpl for ReqwestHttpSrc { } impl PushSrcImpl for ReqwestHttpSrc { - fn create(&self, src: &gst_base::PushSrc) -> Result { + fn create(&self, src: &Self::Type) -> Result { let mut state = self.state.lock().unwrap(); let (response, position, caps, tags) = match *state { @@ -1073,15 +1069,13 @@ impl PushSrcImpl for ReqwestHttpSrc { } impl URIHandlerImpl for ReqwestHttpSrc { - fn get_uri(&self, _element: &gst::URIHandler) -> Option { + fn get_uri(&self, _element: &Self::Type) -> Option { let settings = self.settings.lock().unwrap(); settings.location.as_ref().map(Url::to_string) } - fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> { - let element = element.dynamic_cast_ref::().unwrap(); - + fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> { self.set_location(&element, Some(uri)) } @@ -1096,6 +1090,7 @@ impl URIHandlerImpl for ReqwestHttpSrc { impl ObjectSubclass for ReqwestHttpSrc { const NAME: &'static str = "ReqwestHttpSrc"; + type Type = super::ReqwestHttpSrc; type ParentType = gst_base::PushSrc; type Instance = gst::subclass::ElementInstanceStruct; type Class = subclass::simple::ClassStruct; @@ -1116,7 +1111,7 @@ impl ObjectSubclass for ReqwestHttpSrc { type_.add_interface::(); } - fn class_init(klass: &mut subclass::simple::ClassStruct) { + fn class_init(klass: &mut Self::Class) { klass.set_metadata( "HTTP Source", "Source/Network/HTTP", @@ -1137,12 +1132,3 @@ impl ObjectSubclass for ReqwestHttpSrc { klass.install_properties(&PROPERTIES); } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "reqwesthttpsrc", - gst::Rank::Marginal, - ReqwestHttpSrc::get_type(), - ) -} diff --git a/net/reqwest/src/reqwesthttpsrc/mod.rs b/net/reqwest/src/reqwesthttpsrc/mod.rs new file mode 100644 index 00000000..701f97a6 --- /dev/null +++ b/net/reqwest/src/reqwesthttpsrc/mod.rs @@ -0,0 +1,27 @@ +// Copyright (C) 2016-2018 Sebastian Dröge +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use glib::prelude::*; + +mod imp; + +glib_wrapper! { + pub struct ReqwestHttpSrc(ObjectSubclass) @extends gst_base::PushSrc, gst_base::BaseSrc, gst::Element, gst::Object, @implements gst::URIHandler; +} + +unsafe impl Send for ReqwestHttpSrc {} +unsafe impl Sync for ReqwestHttpSrc {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "reqwesthttpsrc", + gst::Rank::Marginal, + ReqwestHttpSrc::static_type(), + ) +} diff --git a/net/rusoto/src/aws_transcribe_parse.rs b/net/rusoto/src/aws_transcriber/imp.rs similarity index 95% rename from net/rusoto/src/aws_transcribe_parse.rs rename to net/rusoto/src/aws_transcriber/imp.rs index a1c55e5c..3c95f3c7 100644 --- a/net/rusoto/src/aws_transcribe_parse.rs +++ b/net/rusoto/src/aws_transcriber/imp.rs @@ -40,7 +40,7 @@ use std::pin::Pin; use std::sync::Mutex; use std::time::Duration; -use crate::packet::*; +use super::packet::*; use serde_derive::Deserialize; @@ -199,7 +199,7 @@ impl Default for State { type WsSink = Pin + Send>>; -struct Transcriber { +pub struct Transcriber { srcpad: gst::Pad, sinkpad: gst::Pad, settings: Mutex, @@ -230,7 +230,7 @@ fn build_packet(payload: &[u8]) -> Vec { } impl Transcriber { - fn dequeue(&self, element: &gst::Element) -> bool { + fn dequeue(&self, element: &super::Transcriber) -> bool { /* First, check our pending buffers */ let mut items = vec![]; @@ -340,7 +340,7 @@ impl Transcriber { fn enqueue( &self, - element: &gst::Element, + element: &super::Transcriber, state: &mut State, alternative: &TranscriptAlternative, partial: bool, @@ -398,7 +398,7 @@ impl Transcriber { fn loop_fn( &self, - element: &gst::Element, + element: &super::Transcriber, receiver: &mut mpsc::Receiver, ) -> Result<(), gst::ErrorMessage> { let future = async move { @@ -567,7 +567,7 @@ impl Transcriber { RUNTIME.enter(|| futures::executor::block_on(future)) } - fn start_task(&self, element: &gst::Element) -> Result<(), gst::LoggableError> { + fn start_task(&self, element: &super::Transcriber) -> Result<(), gst::LoggableError> { let element_weak = element.downgrade(); let pad_weak = self.srcpad.downgrade(); let (sender, mut receiver) = mpsc::channel(1); @@ -610,7 +610,7 @@ impl Transcriber { fn src_activatemode( &self, _pad: &gst::Pad, - element: &gst::Element, + element: &super::Transcriber, _mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError> { @@ -628,7 +628,12 @@ impl Transcriber { Ok(()) } - fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool { + fn src_query( + &self, + pad: &gst::Pad, + element: &super::Transcriber, + query: &mut gst::QueryRef, + ) -> bool { use gst::QueryView; gst_log!(CAT, obj: pad, "Handling query {:?}", query); @@ -664,7 +669,7 @@ impl Transcriber { } } - fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { + fn sink_event(&self, pad: &gst::Pad, element: &super::Transcriber, event: gst::Event) -> bool { use gst::EventView; gst_debug!(CAT, obj: pad, "Handling event {:?}", event); @@ -771,7 +776,7 @@ impl Transcriber { async fn sync_and_send( &self, - element: &gst::Element, + element: &super::Transcriber, buffer: Option, ) -> Result { let mut delay = None; @@ -819,7 +824,7 @@ impl Transcriber { fn handle_buffer( &self, _pad: &gst::Pad, - element: &gst::Element, + element: &super::Transcriber, buffer: Option, ) -> Result { gst_debug!(CAT, obj: element, "Handling {:?}", buffer); @@ -846,13 +851,13 @@ impl Transcriber { fn sink_chain( &self, pad: &gst::Pad, - element: &gst::Element, + element: &super::Transcriber, buffer: gst::Buffer, ) -> Result { self.handle_buffer(pad, element, Some(buffer)) } - fn ensure_connection(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + fn ensure_connection(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); if state.connected { @@ -959,7 +964,7 @@ impl Transcriber { Ok(()) } - fn disconnect(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + fn disconnect(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); gst_info!(CAT, obj: element, "Unpreparing"); @@ -987,13 +992,14 @@ impl Transcriber { impl ObjectSubclass for Transcriber { const NAME: &'static str = "RsAwsTranscriber"; + type Type = super::Transcriber; type ParentType = gst::Element; type Instance = gst::subclass::ElementInstanceStruct; type Class = subclass::simple::ClassStruct; glib_object_subclass!(); - fn with_class(klass: &subclass::simple::ClassStruct) -> Self { + fn with_class(klass: &Self::Class) -> Self { let templ = klass.get_pad_template("sink").unwrap(); let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) .chain_function(|pad, parent, buffer| { @@ -1047,7 +1053,7 @@ impl ObjectSubclass for Transcriber { } } - fn class_init(klass: &mut subclass::simple::ClassStruct) { + fn class_init(klass: &mut Self::Class) { klass.set_metadata( "Transcriber", "Audio/Text/Filter", @@ -1085,17 +1091,15 @@ impl ObjectSubclass for Transcriber { } impl ObjectImpl for Transcriber { - fn constructed(&self, obj: &glib::Object) { + fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); - let element = obj.downcast_ref::().unwrap(); - element.add_pad(&self.sinkpad).unwrap(); - element.add_pad(&self.srcpad).unwrap(); - element - .set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); } - fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; match *prop { @@ -1115,7 +1119,7 @@ impl ObjectImpl for Transcriber { } } - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + fn get_property(&self, _obj: &Self::Type, id: usize) -> Result { let prop = &PROPERTIES[id]; match *prop { @@ -1139,7 +1143,7 @@ impl ObjectImpl for Transcriber { impl ElementImpl for Transcriber { fn change_state( &self, - element: &gst::Element, + element: &Self::Type, transition: gst::StateChange, ) -> Result { gst_info!(CAT, obj: element, "Changing state {:?}", transition); @@ -1169,16 +1173,7 @@ impl ElementImpl for Transcriber { Ok(success) } - fn provide_clock(&self, _element: &gst::Element) -> Option { + fn provide_clock(&self, _element: &Self::Type) -> Option { Some(gst::SystemClock::obtain()) } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "awstranscriber", - gst::Rank::None, - Transcriber::get_type(), - ) -} diff --git a/net/rusoto/src/aws_transcriber/mod.rs b/net/rusoto/src/aws_transcriber/mod.rs new file mode 100644 index 00000000..f28f8644 --- /dev/null +++ b/net/rusoto/src/aws_transcriber/mod.rs @@ -0,0 +1,37 @@ +// Copyright (C) 2020 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; + +mod imp; +mod packet; + +glib_wrapper! { + pub struct Transcriber(ObjectSubclass) @extends gst::Element, gst::Object; +} + +unsafe impl Send for Transcriber {} +unsafe impl Sync for Transcriber {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "awstranscriber", + gst::Rank::None, + Transcriber::static_type(), + ) +} diff --git a/net/rusoto/src/packet/mod.rs b/net/rusoto/src/aws_transcriber/packet/mod.rs similarity index 100% rename from net/rusoto/src/packet/mod.rs rename to net/rusoto/src/aws_transcriber/packet/mod.rs diff --git a/net/rusoto/src/lib.rs b/net/rusoto/src/lib.rs index c0b45bd2..359ef1f6 100644 --- a/net/rusoto/src/lib.rs +++ b/net/rusoto/src/lib.rs @@ -14,8 +14,7 @@ extern crate gstreamer_base as gst_base; #[macro_use] extern crate lazy_static; -mod aws_transcribe_parse; -mod packet; +mod aws_transcriber; mod s3sink; mod s3src; mod s3url; @@ -24,7 +23,7 @@ mod s3utils; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { s3sink::register(plugin)?; s3src::register(plugin)?; - aws_transcribe_parse::register(plugin)?; + aws_transcriber::register(plugin)?; Ok(()) } diff --git a/net/rusoto/src/s3sink.rs b/net/rusoto/src/s3sink/imp.rs similarity index 94% rename from net/rusoto/src/s3sink.rs rename to net/rusoto/src/s3sink/imp.rs index 21138f5b..121a3858 100644 --- a/net/rusoto/src/s3sink.rs +++ b/net/rusoto/src/s3sink/imp.rs @@ -157,7 +157,7 @@ static PROPERTIES: [subclass::Property; 4] = [ impl S3Sink { fn flush_current_buffer( &self, - element: &gst_base::BaseSink, + element: &super::S3Sink, ) -> Result<(), Option> { let upload_part_req = self.create_upload_part_request()?; let part_number = upload_part_req.part_number; @@ -261,7 +261,7 @@ impl S3Sink { }) } - fn finalize_upload(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> { if self.flush_current_buffer(element).is_err() { return Err(gst_error_msg!( gst::ResourceError::Settings, @@ -339,7 +339,7 @@ impl S3Sink { fn update_buffer( &self, src: &[u8], - element: &gst_base::BaseSink, + element: &super::S3Sink, ) -> Result<(), Option> { let mut state = self.state.lock().unwrap(); let started_state = match *state { @@ -381,6 +381,7 @@ impl S3Sink { impl ObjectSubclass for S3Sink { const NAME: &'static str = "RusotoS3Sink"; + type Type = super::S3Sink; type ParentType = gst_base::BaseSink; type Instance = gst::subclass::ElementInstanceStruct; type Class = subclass::simple::ClassStruct; @@ -395,7 +396,7 @@ impl ObjectSubclass for S3Sink { } } - fn class_init(klass: &mut subclass::simple::ClassStruct) { + fn class_init(klass: &mut Self::Class) { klass.set_metadata( "Amazon S3 sink", "Source/Network", @@ -418,7 +419,7 @@ impl ObjectSubclass for S3Sink { } impl ObjectImpl for S3Sink { - fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id as usize]; let mut settings = self.settings.lock().unwrap(); @@ -445,7 +446,7 @@ impl ObjectImpl for S3Sink { } } - fn get_property(&self, _: &glib::Object, id: usize) -> Result { + fn get_property(&self, _: &Self::Type, id: usize) -> Result { let prop = &PROPERTIES[id as usize]; let settings = self.settings.lock().unwrap(); @@ -462,11 +463,11 @@ impl ObjectImpl for S3Sink { impl ElementImpl for S3Sink {} impl BaseSinkImpl for S3Sink { - fn start(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { self.start() } - fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); *state = State::Stopped; gst_info!(CAT, obj: element, "Stopped"); @@ -476,7 +477,7 @@ impl BaseSinkImpl for S3Sink { fn render( &self, - element: &gst_base::BaseSink, + element: &Self::Type, buffer: &gst::Buffer, ) -> Result { if let State::Stopped = *self.state.lock().unwrap() { @@ -511,13 +512,13 @@ impl BaseSinkImpl for S3Sink { } } - fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { self.cancel(); Ok(()) } - fn event(&self, element: &gst_base::BaseSink, event: gst::Event) -> bool { + fn event(&self, element: &Self::Type, event: gst::Event) -> bool { if let gst::EventView::Eos(_) = event.view() { if let Err(error_message) = self.finalize_upload(element) { gst_error!( @@ -533,12 +534,3 @@ impl BaseSinkImpl for S3Sink { BaseSinkImplExt::parent_event(self, element, event) } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "rusotos3sink", - gst::Rank::Primary, - S3Sink::get_type(), - ) -} diff --git a/net/rusoto/src/s3sink/mod.rs b/net/rusoto/src/s3sink/mod.rs new file mode 100644 index 00000000..40b6cd93 --- /dev/null +++ b/net/rusoto/src/s3sink/mod.rs @@ -0,0 +1,27 @@ +// Copyright (C) 2019 Amazon.com, Inc. or its affiliates +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use glib::prelude::*; + +mod imp; + +glib_wrapper! { + pub struct S3Sink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; +} + +unsafe impl Send for S3Sink {} +unsafe impl Sync for S3Sink {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rusotos3sink", + gst::Rank::Primary, + S3Sink::static_type(), + ) +} diff --git a/net/rusoto/src/s3src.rs b/net/rusoto/src/s3src/imp.rs similarity index 85% rename from net/rusoto/src/s3src.rs rename to net/rusoto/src/s3src/imp.rs index 8c397890..ea0eb4f6 100644 --- a/net/rusoto/src/s3src.rs +++ b/net/rusoto/src/s3src/imp.rs @@ -72,11 +72,7 @@ impl S3Src { Ok(S3Client::new(url.region.clone())) } - fn set_uri( - self: &S3Src, - _: &gst_base::BaseSrc, - url_str: Option<&str>, - ) -> Result<(), glib::Error> { + fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> { let state = self.state.lock().unwrap(); if let StreamingState::Started { .. } = *state { @@ -108,7 +104,7 @@ impl S3Src { fn head( self: &S3Src, - src: &gst_base::BaseSrc, + src: &super::S3Src, client: &S3Client, url: &GstS3Url, ) -> Result { @@ -145,7 +141,7 @@ impl S3Src { /* Returns the bytes, Some(error) if one occured, or a None error if interrupted */ fn get( self: &S3Src, - src: &gst_base::BaseSrc, + src: &super::S3Src, offset: u64, length: u64, ) -> Result> { @@ -210,6 +206,7 @@ impl S3Src { impl ObjectSubclass for S3Src { const NAME: &'static str = "RusotoS3Src"; + type Type = super::S3Src; type ParentType = gst_base::BaseSrc; type Instance = gst::subclass::ElementInstanceStruct; type Class = subclass::simple::ClassStruct; @@ -228,7 +225,7 @@ impl ObjectSubclass for S3Src { typ.add_interface::(); } - fn class_init(klass: &mut subclass::simple::ClassStruct) { + fn class_init(klass: &mut Self::Class) { klass.set_metadata( "Amazon S3 source", "Source/Network", @@ -251,19 +248,18 @@ impl ObjectSubclass for S3Src { } impl ObjectImpl for S3Src { - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id as usize]; - let basesrc = obj.downcast_ref::().unwrap(); match *prop { subclass::Property("uri", ..) => { - let _ = self.set_uri(basesrc, value.get().expect("type checked upstream")); + let _ = self.set_uri(obj, value.get().expect("type checked upstream")); } _ => unimplemented!(), } } - fn get_property(&self, _: &glib::Object, id: usize) -> Result { + fn get_property(&self, _: &Self::Type, id: usize) -> Result { let prop = &PROPERTIES[id as usize]; match *prop { @@ -279,13 +275,12 @@ impl ObjectImpl for S3Src { } } - fn constructed(&self, obj: &glib::Object) { + fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); - let basesrc = obj.downcast_ref::().unwrap(); - basesrc.set_format(gst::Format::Bytes); + obj.set_format(gst::Format::Bytes); /* Set a larger default blocksize to make read more efficient */ - basesrc.set_blocksize(256 * 1024); + obj.set_blocksize(256 * 1024); } } @@ -294,13 +289,12 @@ impl ElementImpl for S3Src { } impl URIHandlerImpl for S3Src { - fn get_uri(&self, _: &gst::URIHandler) -> Option { + fn get_uri(&self, _: &Self::Type) -> Option { self.url.lock().unwrap().as_ref().map(|s| s.to_string()) } - fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> { - let basesrc = element.dynamic_cast_ref::().unwrap(); - self.set_uri(basesrc, Some(uri)) + fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> { + self.set_uri(element, Some(uri)) } fn get_uri_type() -> gst::URIType { @@ -313,18 +307,18 @@ impl URIHandlerImpl for S3Src { } impl BaseSrcImpl for S3Src { - fn is_seekable(&self, _: &gst_base::BaseSrc) -> bool { + fn is_seekable(&self, _: &Self::Type) -> bool { true } - fn get_size(&self, _: &gst_base::BaseSrc) -> Option { + fn get_size(&self, _: &Self::Type) -> Option { match *self.state.lock().unwrap() { StreamingState::Stopped => None, StreamingState::Started { size, .. } => Some(size), } } - fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn start(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); if let StreamingState::Started { .. } = *state { @@ -353,7 +347,7 @@ impl BaseSrcImpl for S3Src { Ok(()) } - fn stop(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn stop(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> { // First, stop any asynchronous tasks if we're running, as they will have the state lock self.cancel(); @@ -368,7 +362,7 @@ impl BaseSrcImpl for S3Src { Ok(()) } - fn query(&self, src: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { + fn query(&self, src: &Self::Type, query: &mut gst::QueryRef) -> bool { if let gst::QueryView::Scheduling(ref mut q) = query.view_mut() { q.set( gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED, @@ -385,7 +379,7 @@ impl BaseSrcImpl for S3Src { fn create( &self, - src: &gst_base::BaseSrc, + src: &Self::Type, offset: u64, buffer: Option<&mut gst::BufferRef>, length: u32, @@ -416,21 +410,12 @@ impl BaseSrcImpl for S3Src { } /* FIXME: implement */ - fn do_seek(&self, _: &gst_base::BaseSrc, _: &mut gst::Segment) -> bool { + fn do_seek(&self, _: &Self::Type, _: &mut gst::Segment) -> bool { true } - fn unlock(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn unlock(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> { self.cancel(); Ok(()) } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "rusotos3src", - gst::Rank::Primary, - S3Src::get_type(), - ) -} diff --git a/net/rusoto/src/s3src/mod.rs b/net/rusoto/src/s3src/mod.rs new file mode 100644 index 00000000..04968fea --- /dev/null +++ b/net/rusoto/src/s3src/mod.rs @@ -0,0 +1,27 @@ +// Copyright (C) 2017 Author: Arun Raghavan +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use glib::prelude::*; + +mod imp; + +glib_wrapper! { + pub struct S3Src(ObjectSubclass) @extends gst_base::BaseSrc, gst::Element, gst::Object; +} + +unsafe impl Send for S3Src {} +unsafe impl Sync for S3Src {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rusotos3src", + gst::Rank::Primary, + S3Src::static_type(), + ) +}