diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index d770a015..a0cd2e2f 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -165,6 +165,18 @@ "readable": true, "type": "gchararray", "writable": true + }, + "stats": { + "blurb": "Various statistics", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "stats, num-uploads-started=(int)0, num-uploads-completed=(int)0, num-bytes-uploaded=(int)0;", + "mutable": "null", + "readable": true, + "type": "GstStructure", + "writable": false } }, "rank": "none" diff --git a/net/aws/src/s3hlssink/imp.rs b/net/aws/src/s3hlssink/imp.rs index 0645a103..d47162d3 100644 --- a/net/aws/src/s3hlssink/imp.rs +++ b/net/aws/src/s3hlssink/imp.rs @@ -85,6 +85,7 @@ impl Default for Settings { pub struct S3HlsSink { settings: Mutex, + state: Mutex, hlssink: gst::Element, canceller: Mutex>, } @@ -132,6 +133,18 @@ enum S3RequestControl { Pause, } +enum State { + Stopped, + Started(Started), +} + +#[derive(Default)] +struct Started { + num_uploads_started: usize, + num_uploads_completed: usize, + num_bytes_uploaded: usize, +} + impl S3Upload { fn new( s3_client: Client, @@ -258,8 +271,9 @@ impl S3HlsSink { let put_object_req_future = put_object_req.send(); let result = s3utils::wait(&self.canceller, put_object_req_future); - if let Err(err) = result { - gst::error!( + match result { + Err(err) => { + gst::error!( CAT, imp: self, "Put object request for S3 key {} of data length {} failed with error {:?}", @@ -267,12 +281,25 @@ impl S3HlsSink { s3_data_len, err, ); - element_imp_error!( - self, - gst::ResourceError::Write, - ["Put object request failed"] - ); - break; + element_imp_error!( + self, + gst::ResourceError::Write, + ["Put object request failed"] + ); + break; + } + Ok(_) => { + let mut state = self.state.lock().unwrap(); + match *state { + State::Started(ref mut state) => { + state.num_bytes_uploaded += s3_data_len; + state.num_uploads_completed += 1; + } + State::Stopped => { + unreachable!("State not started yet") + } + }; + } }; } Ok(S3Request::Delete(data)) => { @@ -400,6 +427,24 @@ impl S3HlsSink { }; }; } + + fn create_stats(&self) -> gst::Structure { + let state = self.state.lock().unwrap(); + + match &*state { + State::Started(state) => gst::Structure::builder("stats") + .field("num-uploads-started", state.num_uploads_started as u32) + .field("num-uploads-completed", state.num_uploads_completed as u32) + .field("num-bytes-uploaded", state.num_bytes_uploaded as u32) + .build(), + + State::Stopped => gst::Structure::builder("stats") + .field("num-uploads-started", 0) + .field("num-uploads-completed", 0) + .field("num-bytes-uploaded", 0) + .build(), + } + } } #[glib::object_subclass] @@ -423,6 +468,7 @@ impl ObjectSubclass for S3HlsSink { Self { settings: Mutex::new(Settings::default()), + state: Mutex::new(State::Stopped), hlssink, canceller: Mutex::new(None), } @@ -488,6 +534,13 @@ impl ObjectImpl for S3HlsSink { .minimum(1) .default_value(DEFAULT_TIMEOUT_IN_MSECS) .build(), + glib::ParamSpecBoxed::new( + "stats", + "Various statistics", + "Various statistics", + gst::Structure::static_type(), + glib::ParamFlags::READABLE, + ), glib::ParamSpecString::builder("endpoint-uri") .nick("S3 endpoint URI") .blurb("The S3 endpoint URI to use") @@ -568,6 +621,7 @@ impl ObjectImpl for S3HlsSink { "acl" => settings.s3_acl.as_str().to_value(), "retry-attempts" => settings.retry_attempts.to_value(), "request-timeout" => (settings.request_timeout.as_millis() as u64).to_value(), + "stats" => self.create_stats().to_value(), "endpoint-uri" => settings.endpoint_uri.to_value(), _ => unimplemented!(), } @@ -610,6 +664,11 @@ impl ObjectImpl for S3HlsSink { let s3client = self_.s3client_from_settings(); let settings = self_.settings.lock().unwrap(); + let mut state = self_.state.lock().unwrap(); + match *state { + State::Started(ref mut state) => state.num_uploads_started += 1, + State::Stopped => unreachable!("State not started yet"), + }; let s3_location = args[1].get::<&str>().unwrap(); let upload = S3Upload::new( @@ -639,6 +698,11 @@ impl ObjectImpl for S3HlsSink { let s3client = self_.s3client_from_settings(); let settings = self_.settings.lock().unwrap(); + let mut state = self_.state.lock().unwrap(); + match *state { + State::Started(ref mut state) => state.num_uploads_started += 1, + State::Stopped => unreachable!("State not started yet"), + }; let s3_location = args[1].get::<&str>().unwrap(); let upload = S3Upload::new( @@ -764,8 +828,24 @@ impl ElementImpl for S3HlsSink { * in turn will require the settings lock. */ let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().unwrap(); match transition { + gst::StateChange::ReadyToPaused => *state = State::Started(Started::default()), + gst::StateChange::PausedToPlaying => { + let s3_txc = settings.s3_txc.clone(); + if let Some(tx) = s3_txc { + gst::debug!( + CAT, + imp: self, + "Sending continue request to S3 request thread." + ); + if tx.send(S3RequestControl::Continue).is_err() { + gst::error!(CAT, imp: self, "Could not send continue request."); + } + } + } + gst::StateChange::PlayingToPaused => { let s3_txc = settings.s3_txc.clone(); if let Some(tx) = s3_txc { @@ -781,19 +861,7 @@ impl ElementImpl for S3HlsSink { } } } - gst::StateChange::PausedToPlaying => { - let s3_txc = settings.s3_txc.clone(); - if let Some(tx) = s3_txc { - gst::debug!( - CAT, - imp: self, - "Sending continue request to S3 request thread." - ); - if tx.send(S3RequestControl::Continue).is_err() { - gst::error!(CAT, imp: self, "Could not send continue request."); - } - } - } + gst::StateChange::ReadyToNull => { drop(settings); /* @@ -801,6 +869,8 @@ impl ElementImpl for S3HlsSink { * pending requests. */ self.stop(); + + *state = State::Stopped } _ => (), }