awss3hlssink: Add stats property.

application can monitor the progress of hls segment generation
and upload progress.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1022>
This commit is contained in:
rajneeshksoni 2022-12-19 18:16:54 +05:30 committed by GStreamer Marge Bot
parent de23ea7f29
commit d846f527af
2 changed files with 103 additions and 21 deletions

View file

@ -165,6 +165,18 @@
"readable": true, "readable": true,
"type": "gchararray", "type": "gchararray",
"writable": true "writable": true
},
"stats": {
"blurb": "Various statistics",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "stats, num-uploads-started=(int)0, num-uploads-completed=(int)0, num-bytes-uploaded=(int)0;",
"mutable": "null",
"readable": true,
"type": "GstStructure",
"writable": false
} }
}, },
"rank": "none" "rank": "none"

View file

@ -85,6 +85,7 @@ impl Default for Settings {
pub struct S3HlsSink { pub struct S3HlsSink {
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>,
hlssink: gst::Element, hlssink: gst::Element,
canceller: Mutex<Option<future::AbortHandle>>, canceller: Mutex<Option<future::AbortHandle>>,
} }
@ -132,6 +133,18 @@ enum S3RequestControl {
Pause, Pause,
} }
enum State {
Stopped,
Started(Started),
}
#[derive(Default)]
struct Started {
num_uploads_started: usize,
num_uploads_completed: usize,
num_bytes_uploaded: usize,
}
impl S3Upload { impl S3Upload {
fn new( fn new(
s3_client: Client, s3_client: Client,
@ -258,8 +271,9 @@ impl S3HlsSink {
let put_object_req_future = put_object_req.send(); let put_object_req_future = put_object_req.send();
let result = s3utils::wait(&self.canceller, put_object_req_future); let result = s3utils::wait(&self.canceller, put_object_req_future);
if let Err(err) = result { match result {
gst::error!( Err(err) => {
gst::error!(
CAT, CAT,
imp: self, imp: self,
"Put object request for S3 key {} of data length {} failed with error {:?}", "Put object request for S3 key {} of data length {} failed with error {:?}",
@ -267,12 +281,25 @@ impl S3HlsSink {
s3_data_len, s3_data_len,
err, err,
); );
element_imp_error!( element_imp_error!(
self, self,
gst::ResourceError::Write, gst::ResourceError::Write,
["Put object request failed"] ["Put object request failed"]
); );
break; break;
}
Ok(_) => {
let mut state = self.state.lock().unwrap();
match *state {
State::Started(ref mut state) => {
state.num_bytes_uploaded += s3_data_len;
state.num_uploads_completed += 1;
}
State::Stopped => {
unreachable!("State not started yet")
}
};
}
}; };
} }
Ok(S3Request::Delete(data)) => { Ok(S3Request::Delete(data)) => {
@ -400,6 +427,24 @@ impl S3HlsSink {
}; };
}; };
} }
fn create_stats(&self) -> gst::Structure {
let state = self.state.lock().unwrap();
match &*state {
State::Started(state) => gst::Structure::builder("stats")
.field("num-uploads-started", state.num_uploads_started as u32)
.field("num-uploads-completed", state.num_uploads_completed as u32)
.field("num-bytes-uploaded", state.num_bytes_uploaded as u32)
.build(),
State::Stopped => gst::Structure::builder("stats")
.field("num-uploads-started", 0)
.field("num-uploads-completed", 0)
.field("num-bytes-uploaded", 0)
.build(),
}
}
} }
#[glib::object_subclass] #[glib::object_subclass]
@ -423,6 +468,7 @@ impl ObjectSubclass for S3HlsSink {
Self { Self {
settings: Mutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
state: Mutex::new(State::Stopped),
hlssink, hlssink,
canceller: Mutex::new(None), canceller: Mutex::new(None),
} }
@ -488,6 +534,13 @@ impl ObjectImpl for S3HlsSink {
.minimum(1) .minimum(1)
.default_value(DEFAULT_TIMEOUT_IN_MSECS) .default_value(DEFAULT_TIMEOUT_IN_MSECS)
.build(), .build(),
glib::ParamSpecBoxed::new(
"stats",
"Various statistics",
"Various statistics",
gst::Structure::static_type(),
glib::ParamFlags::READABLE,
),
glib::ParamSpecString::builder("endpoint-uri") glib::ParamSpecString::builder("endpoint-uri")
.nick("S3 endpoint URI") .nick("S3 endpoint URI")
.blurb("The S3 endpoint URI to use") .blurb("The S3 endpoint URI to use")
@ -568,6 +621,7 @@ impl ObjectImpl for S3HlsSink {
"acl" => settings.s3_acl.as_str().to_value(), "acl" => settings.s3_acl.as_str().to_value(),
"retry-attempts" => settings.retry_attempts.to_value(), "retry-attempts" => settings.retry_attempts.to_value(),
"request-timeout" => (settings.request_timeout.as_millis() as u64).to_value(), "request-timeout" => (settings.request_timeout.as_millis() as u64).to_value(),
"stats" => self.create_stats().to_value(),
"endpoint-uri" => settings.endpoint_uri.to_value(), "endpoint-uri" => settings.endpoint_uri.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -610,6 +664,11 @@ impl ObjectImpl for S3HlsSink {
let s3client = self_.s3client_from_settings(); let s3client = self_.s3client_from_settings();
let settings = self_.settings.lock().unwrap(); let settings = self_.settings.lock().unwrap();
let mut state = self_.state.lock().unwrap();
match *state {
State::Started(ref mut state) => state.num_uploads_started += 1,
State::Stopped => unreachable!("State not started yet"),
};
let s3_location = args[1].get::<&str>().unwrap(); let s3_location = args[1].get::<&str>().unwrap();
let upload = S3Upload::new( let upload = S3Upload::new(
@ -639,6 +698,11 @@ impl ObjectImpl for S3HlsSink {
let s3client = self_.s3client_from_settings(); let s3client = self_.s3client_from_settings();
let settings = self_.settings.lock().unwrap(); let settings = self_.settings.lock().unwrap();
let mut state = self_.state.lock().unwrap();
match *state {
State::Started(ref mut state) => state.num_uploads_started += 1,
State::Stopped => unreachable!("State not started yet"),
};
let s3_location = args[1].get::<&str>().unwrap(); let s3_location = args[1].get::<&str>().unwrap();
let upload = S3Upload::new( let upload = S3Upload::new(
@ -764,8 +828,24 @@ impl ElementImpl for S3HlsSink {
* in turn will require the settings lock. * in turn will require the settings lock.
*/ */
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
match transition { match transition {
gst::StateChange::ReadyToPaused => *state = State::Started(Started::default()),
gst::StateChange::PausedToPlaying => {
let s3_txc = settings.s3_txc.clone();
if let Some(tx) = s3_txc {
gst::debug!(
CAT,
imp: self,
"Sending continue request to S3 request thread."
);
if tx.send(S3RequestControl::Continue).is_err() {
gst::error!(CAT, imp: self, "Could not send continue request.");
}
}
}
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
let s3_txc = settings.s3_txc.clone(); let s3_txc = settings.s3_txc.clone();
if let Some(tx) = s3_txc { if let Some(tx) = s3_txc {
@ -781,19 +861,7 @@ impl ElementImpl for S3HlsSink {
} }
} }
} }
gst::StateChange::PausedToPlaying => {
let s3_txc = settings.s3_txc.clone();
if let Some(tx) = s3_txc {
gst::debug!(
CAT,
imp: self,
"Sending continue request to S3 request thread."
);
if tx.send(S3RequestControl::Continue).is_err() {
gst::error!(CAT, imp: self, "Could not send continue request.");
}
}
}
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
drop(settings); drop(settings);
/* /*
@ -801,6 +869,8 @@ impl ElementImpl for S3HlsSink {
* pending requests. * pending requests.
*/ */
self.stop(); self.stop();
*state = State::Stopped
} }
_ => (), _ => (),
} }