2019-06-12 08:07:39 +00:00
// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
//
2022-01-15 18:40:12 +00:00
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
2019-06-12 08:07:39 +00:00
2021-06-03 18:20:54 +00:00
use gst ::glib ;
2019-06-12 08:07:39 +00:00
use gst ::prelude ::* ;
use gst ::subclass ::prelude ::* ;
use gst_base ::subclass ::prelude ::* ;
2022-05-14 05:01:35 +00:00
use aws_sdk_s3 ::client ::fluent_builders ::{
AbortMultipartUpload , CompleteMultipartUpload , CreateMultipartUpload , UploadPart ,
2019-06-12 08:07:39 +00:00
} ;
2022-05-14 05:01:35 +00:00
use aws_sdk_s3 ::config ;
use aws_sdk_s3 ::model ::{ CompletedMultipartUpload , CompletedPart } ;
use aws_sdk_s3 ::types ::ByteStream ;
2022-10-31 09:00:55 +00:00
use aws_sdk_s3 ::{ config ::retry ::RetryConfig , Client , Credentials , Region } ;
2019-06-12 08:07:39 +00:00
2022-05-14 05:01:35 +00:00
use futures ::future ;
2020-11-22 15:43:59 +00:00
use once_cell ::sync ::Lazy ;
2021-11-22 09:12:51 +00:00
use std ::collections ::HashMap ;
2022-05-14 05:01:35 +00:00
use std ::convert ::From ;
2019-06-12 08:07:39 +00:00
use std ::sync ::Mutex ;
2022-02-23 18:10:30 +00:00
use std ::time ::Duration ;
2019-06-12 08:07:39 +00:00
2021-04-30 13:05:35 +00:00
use crate ::s3url ::* ;
2022-05-14 05:01:35 +00:00
use crate ::s3utils ::{ self , duration_from_millis , duration_to_millis , WaitError } ;
2019-06-12 08:07:39 +00:00
2021-12-06 12:06:10 +00:00
use super ::OnError ;
2022-05-14 05:01:35 +00:00
const DEFAULT_RETRY_ATTEMPTS : u32 = 5 ;
const DEFAULT_BUFFER_SIZE : u64 = 5 * 1024 * 1024 ;
2021-12-06 12:06:10 +00:00
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR : OnError = OnError ::DoNothing ;
2022-05-14 05:01:35 +00:00
2022-03-18 09:44:17 +00:00
// General setting for create / abort requests
2022-05-14 05:01:35 +00:00
const DEFAULT_REQUEST_TIMEOUT_MSEC : u64 = 15_000 ;
2022-03-18 09:44:17 +00:00
const DEFAULT_RETRY_DURATION_MSEC : u64 = 60_000 ;
2022-03-18 10:10:18 +00:00
// This needs to be independently configurable, as the part size can be upto 5GB
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC : u64 = 10_000 ;
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC : u64 = 60_000 ;
2022-03-18 09:44:17 +00:00
// CompletedMultipartUpload can take minutes to complete, so we need a longer value here
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC : u64 = 600_000 ; // 10 minutes
const DEFAULT_COMPLETE_RETRY_DURATION_MSEC : u64 = 3_600_000 ; // 60 minutes
2021-12-06 12:06:10 +00:00
2019-06-12 08:07:39 +00:00
struct Started {
2022-05-14 05:01:35 +00:00
client : Client ,
2019-06-12 08:07:39 +00:00
buffer : Vec < u8 > ,
upload_id : String ,
part_number : i64 ,
completed_parts : Vec < CompletedPart > ,
}
impl Started {
2022-05-14 05:01:35 +00:00
pub fn new ( client : Client , buffer : Vec < u8 > , upload_id : String ) -> Started {
2019-06-12 08:07:39 +00:00
Started {
2020-04-20 01:28:30 +00:00
client ,
2019-06-12 08:07:39 +00:00
buffer ,
upload_id ,
part_number : 0 ,
completed_parts : Vec ::new ( ) ,
}
}
pub fn increment_part_number ( & mut self ) -> Result < i64 , gst ::ErrorMessage > {
// https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
const MAX_MULTIPART_NUMBER : i64 = 10000 ;
if self . part_number > MAX_MULTIPART_NUMBER {
2020-12-20 18:43:45 +00:00
return Err ( gst ::error_msg! (
2019-06-12 08:07:39 +00:00
gst ::ResourceError ::Failed ,
[
" Maximum number of parts ({}) reached. " ,
MAX_MULTIPART_NUMBER
]
) ) ;
}
self . part_number + = 1 ;
Ok ( self . part_number )
}
}
2023-03-07 08:46:34 +00:00
#[ derive(Default) ]
2019-06-12 08:07:39 +00:00
enum State {
2023-03-07 08:46:34 +00:00
#[ default ]
2019-06-12 08:07:39 +00:00
Stopped ,
2022-11-07 19:53:15 +00:00
Completed ,
2019-06-12 08:07:39 +00:00
Started ( Started ) ,
}
struct Settings {
2021-06-21 10:20:32 +00:00
region : Region ,
bucket : Option < String > ,
key : Option < String > ,
2019-06-12 08:07:39 +00:00
content_type : Option < String > ,
2023-02-09 13:44:16 +00:00
content_disposition : Option < String > ,
2019-06-12 08:07:39 +00:00
buffer_size : u64 ,
2021-09-27 13:49:12 +00:00
access_key : Option < String > ,
secret_access_key : Option < String > ,
2022-06-16 07:16:28 +00:00
session_token : Option < String > ,
2021-11-22 09:12:51 +00:00
metadata : Option < gst ::Structure > ,
2022-05-14 05:01:35 +00:00
retry_attempts : u32 ,
2021-12-06 12:06:10 +00:00
multipart_upload_on_error : OnError ,
2022-05-14 05:01:35 +00:00
request_timeout : Duration ,
2022-07-30 05:59:50 +00:00
endpoint_uri : Option < String > ,
2019-06-12 08:07:39 +00:00
}
2021-06-21 10:20:32 +00:00
impl Settings {
fn to_uri ( & self ) -> String {
format! (
" s3://{}/{}/{} " ,
2022-05-14 05:01:35 +00:00
self . region ,
2021-06-21 10:20:32 +00:00
self . bucket . as_ref ( ) . unwrap ( ) ,
self . key . as_ref ( ) . unwrap ( )
)
}
2021-11-22 09:12:51 +00:00
2022-10-09 13:06:59 +00:00
fn to_metadata ( & self , imp : & S3Sink ) -> Option < HashMap < String , String > > {
2021-11-22 09:12:51 +00:00
self . metadata . as_ref ( ) . map ( | structure | {
let mut hash = HashMap ::new ( ) ;
for ( key , value ) in structure . iter ( ) {
if let Ok ( Ok ( value_str ) ) = value . transform ::< String > ( ) . map ( | v | v . get ( ) ) {
2022-10-09 13:06:59 +00:00
gst ::log! ( CAT , imp : imp , " metadata '{}' -> '{}' " , key , value_str ) ;
2021-11-22 09:12:51 +00:00
hash . insert ( key . to_string ( ) , value_str ) ;
} else {
2022-02-21 17:43:46 +00:00
gst ::warning! (
2021-11-22 09:12:51 +00:00
CAT ,
2022-10-09 13:06:59 +00:00
imp : imp ,
2021-11-22 09:12:51 +00:00
" Failed to convert metadata '{}' to string ('{:?}') " ,
key ,
value
) ;
}
}
hash
} )
}
2021-06-21 10:20:32 +00:00
}
impl Default for Settings {
fn default ( ) -> Self {
Settings {
2022-05-14 05:01:35 +00:00
region : Region ::new ( " us-west-2 " ) ,
2021-06-21 10:20:32 +00:00
bucket : None ,
key : None ,
content_type : None ,
2023-02-09 13:44:16 +00:00
content_disposition : None ,
2021-09-27 13:49:12 +00:00
access_key : None ,
secret_access_key : None ,
2022-06-16 07:16:28 +00:00
session_token : None ,
2021-11-22 09:12:51 +00:00
metadata : None ,
2022-05-14 05:01:35 +00:00
buffer_size : DEFAULT_BUFFER_SIZE ,
retry_attempts : DEFAULT_RETRY_ATTEMPTS ,
2021-12-06 12:06:10 +00:00
multipart_upload_on_error : DEFAULT_MULTIPART_UPLOAD_ON_ERROR ,
2022-05-14 05:01:35 +00:00
request_timeout : Duration ::from_millis ( DEFAULT_REQUEST_TIMEOUT_MSEC ) ,
2022-07-30 05:59:50 +00:00
endpoint_uri : None ,
2021-06-21 10:20:32 +00:00
}
}
}
2021-03-07 16:22:24 +00:00
#[ derive(Default) ]
2019-06-12 08:07:39 +00:00
pub struct S3Sink {
2021-04-30 13:05:35 +00:00
url : Mutex < Option < GstS3Url > > ,
2019-06-12 08:07:39 +00:00
settings : Mutex < Settings > ,
state : Mutex < State > ,
2020-04-20 01:28:30 +00:00
canceller : Mutex < Option < future ::AbortHandle > > ,
2021-12-06 12:06:10 +00:00
abort_multipart_canceller : Mutex < Option < future ::AbortHandle > > ,
2019-06-12 08:07:39 +00:00
}
2020-11-22 15:43:59 +00:00
/// Debug category used by all log statements of this element, created on first use.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    let name = " aws3sink ";
    let description = Some(" Amazon S3 Sink ");

    gst::DebugCategory::new(name, gst::DebugColorFlags::empty(), description)
});
2019-10-31 22:34:21 +00:00
2019-06-12 08:07:39 +00:00
impl S3Sink {
2022-11-07 19:53:15 +00:00
fn flush_multipart_upload ( & self , state : & mut Started ) {
let settings = self . settings . lock ( ) . unwrap ( ) ;
match settings . multipart_upload_on_error {
OnError ::Abort = > {
gst ::log! (
CAT ,
imp : self ,
" Aborting multipart upload request with id: {} " ,
state . upload_id
) ;
match self . abort_multipart_upload_request ( state ) {
Ok ( ( ) ) = > {
gst ::log! (
CAT ,
imp : self ,
" Aborting multipart upload request succeeded. "
) ;
}
Err ( err ) = > gst ::error! (
CAT ,
imp : self ,
" Aborting multipart upload failed: {} " ,
err . to_string ( )
) ,
}
}
OnError ::Complete = > {
gst ::log! (
CAT ,
imp : self ,
" Completing multipart upload request with id: {} " ,
state . upload_id
) ;
match self . complete_multipart_upload_request ( state ) {
Ok ( ( ) ) = > {
gst ::log! (
CAT ,
imp : self ,
" Complete multipart upload request succeeded. "
) ;
}
Err ( err ) = > gst ::error! (
CAT ,
imp : self ,
" Completing multipart upload failed: {} " ,
err . to_string ( )
) ,
}
}
OnError ::DoNothing = > ( ) ,
}
}
2022-10-09 13:06:59 +00:00
fn flush_current_buffer ( & self ) -> Result < ( ) , Option < gst ::ErrorMessage > > {
2022-05-14 05:01:35 +00:00
let upload_part_req : UploadPart = self . create_upload_part_request ( ) ? ;
2019-06-12 08:07:39 +00:00
let mut state = self . state . lock ( ) . unwrap ( ) ;
let state = match * state {
State ::Started ( ref mut started_state ) = > started_state ,
2022-11-07 19:53:15 +00:00
State ::Completed = > {
unreachable! ( " Upload should not be completed yet " ) ;
}
2019-06-12 08:07:39 +00:00
State ::Stopped = > {
unreachable! ( " Element should be started " ) ;
}
} ;
2020-04-20 01:28:30 +00:00
2022-05-14 05:01:35 +00:00
let part_number = state . part_number ;
2022-02-23 18:10:30 +00:00
2022-05-14 05:01:35 +00:00
let upload_part_req_future = upload_part_req . send ( ) ;
let output =
s3utils ::wait ( & self . canceller , upload_part_req_future ) . map_err ( | err | match err {
WaitError ::FutureError ( err ) = > {
2022-11-07 19:53:15 +00:00
self . flush_multipart_upload ( state ) ;
2022-05-14 05:01:35 +00:00
Some ( gst ::error_msg! (
gst ::ResourceError ::OpenWrite ,
[ " Failed to upload part: {} " , err ]
) )
2021-12-06 12:06:10 +00:00
}
2022-05-14 05:01:35 +00:00
WaitError ::Cancelled = > None ,
} ) ? ;
let completed_part = CompletedPart ::builder ( )
. set_e_tag ( output . e_tag )
. set_part_number ( Some ( part_number as i32 ) )
. build ( ) ;
state . completed_parts . push ( completed_part ) ;
2020-04-20 01:28:30 +00:00
2022-10-09 13:06:59 +00:00
gst ::info! ( CAT , imp : self , " Uploaded part {} " , part_number ) ;
2019-06-12 08:07:39 +00:00
Ok ( ( ) )
}
2022-05-14 05:01:35 +00:00
fn create_upload_part_request ( & self ) -> Result < UploadPart , gst ::ErrorMessage > {
2021-04-30 13:05:35 +00:00
let url = self . url . lock ( ) . unwrap ( ) ;
2022-05-14 05:01:35 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
let mut state = self . state . lock ( ) . unwrap ( ) ;
let state = match * state {
State ::Started ( ref mut started_state ) = > started_state ,
2022-11-07 19:53:15 +00:00
State ::Completed = > {
unreachable! ( " Upload should not be completed yet " ) ;
}
2022-05-14 05:01:35 +00:00
State ::Stopped = > {
unreachable! ( " Element should be started " ) ;
}
} ;
2019-06-12 08:07:39 +00:00
2022-05-14 05:01:35 +00:00
let part_number = state . increment_part_number ( ) ? ;
let body = Some ( ByteStream ::from ( std ::mem ::replace (
& mut state . buffer ,
Vec ::with_capacity ( settings . buffer_size as usize ) ,
) ) ) ;
let bucket = Some ( url . as_ref ( ) . unwrap ( ) . bucket . to_owned ( ) ) ;
let key = Some ( url . as_ref ( ) . unwrap ( ) . object . to_owned ( ) ) ;
let upload_id = Some ( state . upload_id . to_owned ( ) ) ;
let client = & state . client ;
let upload_part = client
. upload_part ( )
. set_body ( body )
. set_bucket ( bucket )
. set_key ( key )
. set_upload_id ( upload_id )
. set_part_number ( Some ( part_number as i32 ) ) ;
Ok ( upload_part )
2019-06-12 08:07:39 +00:00
}
2020-04-20 01:28:30 +00:00
fn create_complete_multipart_upload_request (
& self ,
2022-05-14 05:01:35 +00:00
started_state : & mut Started ,
) -> CompleteMultipartUpload {
started_state
. completed_parts
. sort_by ( | a , b | a . part_number . cmp ( & b . part_number ) ) ;
let parts = Some ( std ::mem ::take ( & mut started_state . completed_parts ) ) ;
let completed_upload = CompletedMultipartUpload ::builder ( ) . set_parts ( parts ) . build ( ) ;
2021-04-30 13:05:35 +00:00
let url = self . url . lock ( ) . unwrap ( ) ;
2022-05-14 05:01:35 +00:00
let client = & started_state . client ;
let bucket = Some ( url . as_ref ( ) . unwrap ( ) . bucket . to_owned ( ) ) ;
let key = Some ( url . as_ref ( ) . unwrap ( ) . object . to_owned ( ) ) ;
let upload_id = Some ( started_state . upload_id . to_owned ( ) ) ;
let multipart_upload = Some ( completed_upload ) ;
client
. complete_multipart_upload ( )
. set_bucket ( bucket )
. set_key ( key )
. set_upload_id ( upload_id )
. set_multipart_upload ( multipart_upload )
2019-06-12 08:07:39 +00:00
}
fn create_create_multipart_upload_request (
& self ,
2022-05-14 05:01:35 +00:00
client : & Client ,
2021-04-30 13:05:35 +00:00
url : & GstS3Url ,
2020-04-20 01:28:30 +00:00
settings : & Settings ,
2022-05-14 05:01:35 +00:00
) -> CreateMultipartUpload {
let bucket = Some ( url . bucket . clone ( ) ) ;
let key = Some ( url . object . clone ( ) ) ;
let content_type = settings . content_type . clone ( ) ;
2023-02-09 13:44:16 +00:00
let content_disposition = settings . content_disposition . clone ( ) ;
2022-10-09 13:06:59 +00:00
let metadata = settings . to_metadata ( self ) ;
2022-05-14 05:01:35 +00:00
client
. create_multipart_upload ( )
. set_bucket ( bucket )
. set_key ( key )
. set_content_type ( content_type )
2023-02-09 13:44:16 +00:00
. set_content_disposition ( content_disposition )
2022-05-14 05:01:35 +00:00
. set_metadata ( metadata )
2019-06-12 08:07:39 +00:00
}
2021-12-06 12:06:10 +00:00
fn create_abort_multipart_upload_request (
& self ,
2022-05-14 05:01:35 +00:00
client : & Client ,
2021-12-06 12:06:10 +00:00
url : & GstS3Url ,
started_state : & Started ,
2022-05-14 05:01:35 +00:00
) -> AbortMultipartUpload {
let bucket = Some ( url . bucket . clone ( ) ) ;
let key = Some ( url . object . clone ( ) ) ;
client
. abort_multipart_upload ( )
. set_bucket ( bucket )
. set_expected_bucket_owner ( None )
. set_key ( key )
. set_request_payer ( None )
. set_upload_id ( Some ( started_state . upload_id . to_owned ( ) ) )
2021-12-06 12:06:10 +00:00
}
2019-06-12 08:07:39 +00:00
2021-12-06 12:06:10 +00:00
fn abort_multipart_upload_request (
& self ,
started_state : & Started ,
) -> Result < ( ) , gst ::ErrorMessage > {
2022-06-30 12:44:07 +00:00
let s3url = {
let url = self . url . lock ( ) . unwrap ( ) ;
match * url {
Some ( ref url ) = > url . clone ( ) ,
None = > unreachable! ( " Element should be started " ) ,
}
2020-04-20 01:28:30 +00:00
} ;
2022-03-18 09:44:17 +00:00
2022-05-14 05:01:35 +00:00
let client = & started_state . client ;
let abort_req = self . create_abort_multipart_upload_request ( client , & s3url , started_state ) ;
let abort_req_future = abort_req . send ( ) ;
s3utils ::wait ( & self . abort_multipart_canceller , abort_req_future )
. map ( | _ | ( ) )
. map_err ( | err | match err {
WaitError ::FutureError ( err ) = > {
gst ::error_msg! (
gst ::ResourceError ::Write ,
[ " Failed to abort multipart upload: {}. " , err . to_string ( ) ]
)
}
WaitError ::Cancelled = > {
gst ::error_msg! (
gst ::ResourceError ::Write ,
[ " Abort multipart upload request interrupted. " ]
)
}
} )
2021-12-06 12:06:10 +00:00
}
fn complete_multipart_upload_request (
& self ,
started_state : & mut Started ,
) -> Result < ( ) , gst ::ErrorMessage > {
2022-05-14 05:01:35 +00:00
let complete_req = self . create_complete_multipart_upload_request ( started_state ) ;
let complete_req_future = complete_req . send ( ) ;
2022-03-18 09:44:17 +00:00
2022-05-14 05:01:35 +00:00
s3utils ::wait ( & self . canceller , complete_req_future )
. map ( | _ | ( ) )
. map_err ( | err | match err {
WaitError ::FutureError ( err ) = > gst ::error_msg! (
gst ::ResourceError ::Write ,
[ " Failed to complete multipart upload: {}. " , err . to_string ( ) ]
) ,
WaitError ::Cancelled = > {
gst ::error_msg! (
gst ::LibraryError ::Failed ,
[ " Complete multipart upload request interrupted " ]
)
}
} )
2019-06-12 08:07:39 +00:00
}
2022-10-09 13:06:59 +00:00
fn finalize_upload ( & self ) -> Result < ( ) , gst ::ErrorMessage > {
if self . flush_current_buffer ( ) . is_err ( ) {
2021-12-06 12:06:10 +00:00
return Err ( gst ::error_msg! (
gst ::ResourceError ::Settings ,
[ " Failed to flush internal buffer. " ]
) ) ;
}
let mut state = self . state . lock ( ) . unwrap ( ) ;
let started_state = match * state {
State ::Started ( ref mut started_state ) = > started_state ,
2022-11-07 19:53:15 +00:00
State ::Completed = > {
unreachable! ( " Upload should not be completed yet " ) ;
}
2021-12-06 12:06:10 +00:00
State ::Stopped = > {
unreachable! ( " Element should be started " ) ;
}
} ;
2022-11-07 19:53:15 +00:00
let res = self . complete_multipart_upload_request ( started_state ) ;
if res . is_ok ( ) {
* state = State ::Completed ;
}
res
2021-12-06 12:06:10 +00:00
}
2020-04-20 01:28:30 +00:00
fn start ( & self ) -> Result < ( ) , gst ::ErrorMessage > {
let mut state = self . state . lock ( ) . unwrap ( ) ;
let settings = self . settings . lock ( ) . unwrap ( ) ;
if let State ::Started { .. } = * state {
unreachable! ( " Element should be started " ) ;
}
2022-06-30 12:44:07 +00:00
let s3url = {
let url = self . url . lock ( ) . unwrap ( ) ;
match * url {
Some ( ref url ) = > url . clone ( ) ,
None = > {
return Err ( gst ::error_msg! (
gst ::ResourceError ::Settings ,
[ " Cannot start without a URL being set " ]
) ) ;
}
2021-04-30 13:05:35 +00:00
}
} ;
2022-05-14 05:01:35 +00:00
let timeout_config = s3utils ::timeout_config ( settings . request_timeout ) ;
let cred = match (
2021-09-27 13:49:12 +00:00
settings . access_key . as_ref ( ) ,
settings . secret_access_key . as_ref ( ) ,
) {
2022-05-14 05:01:35 +00:00
( Some ( access_key ) , Some ( secret_access_key ) ) = > Some ( Credentials ::new (
access_key . clone ( ) ,
secret_access_key . clone ( ) ,
2022-06-16 07:16:28 +00:00
settings . session_token . clone ( ) ,
2022-05-14 05:01:35 +00:00
None ,
2022-05-26 09:52:42 +00:00
" aws-s3-sink " ,
2022-05-14 05:01:35 +00:00
) ) ,
_ = > None ,
2021-09-27 13:49:12 +00:00
} ;
2020-04-20 01:28:30 +00:00
2022-05-14 05:01:35 +00:00
let sdk_config =
s3utils ::wait_config ( & self . canceller , s3url . region . clone ( ) , timeout_config , cred )
. map_err ( | err | match err {
WaitError ::FutureError ( err ) = > gst ::error_msg! (
gst ::ResourceError ::OpenWrite ,
[ " Failed to create SDK config: {} " , err ]
) ,
WaitError ::Cancelled = > {
gst ::error_msg! (
gst ::LibraryError ::Failed ,
[ " SDK config request interrupted during start " ]
)
}
} ) ? ;
2020-04-20 01:28:30 +00:00
2022-07-30 05:59:50 +00:00
let config_builder = config ::Builder ::from ( & sdk_config )
2022-09-21 08:01:49 +00:00
. retry_config ( RetryConfig ::standard ( ) . with_max_attempts ( settings . retry_attempts ) ) ;
2022-07-30 05:59:50 +00:00
2023-01-14 16:58:30 +00:00
let config = if let Some ( ref uri ) = settings . endpoint_uri {
config_builder . endpoint_url ( uri ) . build ( )
2022-07-30 05:59:50 +00:00
} else {
config_builder . build ( )
} ;
2022-05-14 05:01:35 +00:00
let client = Client ::from_conf ( config ) ;
let create_multipart_req =
self . create_create_multipart_upload_request ( & client , & s3url , & settings ) ;
let create_multipart_req_future = create_multipart_req . send ( ) ;
let response = s3utils ::wait ( & self . canceller , create_multipart_req_future ) . map_err (
| err | match err {
WaitError ::FutureError ( err ) = > gst ::error_msg! (
gst ::ResourceError ::OpenWrite ,
[ " Failed to create multipart upload: {} " , err ]
) ,
WaitError ::Cancelled = > {
gst ::error_msg! (
gst ::LibraryError ::Failed ,
[ " Create multipart request interrupted during start " ]
)
}
} ,
) ? ;
2019-06-12 08:07:39 +00:00
2019-07-04 15:30:26 +00:00
let upload_id = response . upload_id . ok_or_else ( | | {
2020-12-20 18:43:45 +00:00
gst ::error_msg! (
2019-07-04 15:30:26 +00:00
gst ::ResourceError ::Failed ,
[ " Failed to get multipart upload ID " ]
)
} ) ? ;
2019-06-12 08:07:39 +00:00
2020-04-20 01:28:30 +00:00
* state = State ::Started ( Started ::new (
client ,
Vec ::with_capacity ( settings . buffer_size as usize ) ,
2019-06-12 08:07:39 +00:00
upload_id ,
2020-04-20 01:28:30 +00:00
) ) ;
Ok ( ( ) )
2019-06-12 08:07:39 +00:00
}
2022-10-09 13:06:59 +00:00
fn update_buffer ( & self , src : & [ u8 ] ) -> Result < ( ) , Option < gst ::ErrorMessage > > {
2019-06-12 08:07:39 +00:00
let mut state = self . state . lock ( ) . unwrap ( ) ;
let started_state = match * state {
State ::Started ( ref mut started_state ) = > started_state ,
2022-11-07 19:53:15 +00:00
State ::Completed = > {
unreachable! ( " Upload should not be completed yet " ) ;
}
2019-06-12 08:07:39 +00:00
State ::Stopped = > {
unreachable! ( " Element should be started already " ) ;
}
} ;
let to_copy = std ::cmp ::min (
started_state . buffer . capacity ( ) - started_state . buffer . len ( ) ,
src . len ( ) ,
) ;
let ( head , tail ) = src . split_at ( to_copy ) ;
started_state . buffer . extend_from_slice ( head ) ;
let do_flush = started_state . buffer . capacity ( ) = = started_state . buffer . len ( ) ;
drop ( state ) ;
if do_flush {
2022-10-09 13:06:59 +00:00
self . flush_current_buffer ( ) ? ;
2019-06-12 08:07:39 +00:00
}
if to_copy < src . len ( ) {
2022-10-09 13:06:59 +00:00
self . update_buffer ( tail ) ? ;
2019-06-12 08:07:39 +00:00
}
Ok ( ( ) )
}
fn cancel ( & self ) {
let mut canceller = self . canceller . lock ( ) . unwrap ( ) ;
2021-12-06 12:06:10 +00:00
let mut abort_canceller = self . abort_multipart_canceller . lock ( ) . unwrap ( ) ;
if let Some ( c ) = abort_canceller . take ( ) {
c . abort ( )
} ;
2019-06-12 08:07:39 +00:00
2020-04-20 01:28:30 +00:00
if let Some ( c ) = canceller . take ( ) {
c . abort ( )
} ;
2019-06-12 08:07:39 +00:00
}
2021-04-30 13:05:35 +00:00
2022-10-09 13:06:59 +00:00
fn set_uri ( self : & S3Sink , url_str : Option < & str > ) -> Result < ( ) , glib ::Error > {
2021-04-30 13:05:35 +00:00
let state = self . state . lock ( ) . unwrap ( ) ;
if let State ::Started { .. } = * state {
return Err ( glib ::Error ::new (
gst ::URIError ::BadState ,
" Cannot set URI on a started s3sink " ,
) ) ;
}
let mut url = self . url . lock ( ) . unwrap ( ) ;
if url_str . is_none ( ) {
* url = None ;
return Ok ( ( ) ) ;
}
2022-10-09 13:06:59 +00:00
gst ::debug! ( CAT , imp : self , " Setting uri to {:?} " , url_str ) ;
2021-06-21 10:20:32 +00:00
2021-04-30 13:05:35 +00:00
let url_str = url_str . unwrap ( ) ;
match parse_s3_url ( url_str ) {
Ok ( s3url ) = > {
* url = Some ( s3url ) ;
Ok ( ( ) )
}
Err ( _ ) = > Err ( glib ::Error ::new (
gst ::URIError ::BadUri ,
" Could not parse URI " ,
) ) ,
}
}
2019-06-12 08:07:39 +00:00
}
2021-03-07 16:22:24 +00:00
#[ glib::object_subclass ]
2019-06-12 08:07:39 +00:00
impl ObjectSubclass for S3Sink {
2022-10-23 15:42:58 +00:00
const NAME : & 'static str = " GstAwsS3Sink " ;
2020-11-14 17:24:01 +00:00
type Type = super ::S3Sink ;
2019-06-12 08:07:39 +00:00
type ParentType = gst_base ::BaseSink ;
2021-04-30 13:05:35 +00:00
type Interfaces = ( gst ::URIHandler , ) ;
2021-01-21 18:21:29 +00:00
}
2019-06-12 08:07:39 +00:00
2021-01-21 18:21:29 +00:00
impl ObjectImpl for S3Sink {
fn properties ( ) -> & 'static [ glib ::ParamSpec ] {
static PROPERTIES : Lazy < Vec < glib ::ParamSpec > > = Lazy ::new ( | | {
vec! [
2022-08-18 12:04:15 +00:00
glib ::ParamSpecString ::builder ( " bucket " )
. nick ( " S3 Bucket " )
. blurb ( " The bucket of the file to write " )
. mutable_ready ( )
. build ( ) ,
glib ::ParamSpecString ::builder ( " key " )
. nick ( " S3 Key " )
. blurb ( " The key of the file to write " )
. mutable_ready ( )
. build ( ) ,
glib ::ParamSpecString ::builder ( " region " )
. nick ( " AWS Region " )
. blurb ( " An AWS region (e.g. eu-west-2). " )
. default_value ( Some ( " us-west-2 " ) )
. mutable_ready ( )
. build ( ) ,
glib ::ParamSpecUInt64 ::builder ( " part-size " )
. nick ( " Part size " )
. blurb ( " A size (in bytes) of an individual part used for multipart upload. " )
. minimum ( 5 * 1024 * 1024 ) // 5 MB
. maximum ( 5 * 1024 * 1024 * 1024 ) // 5 GB
. default_value ( DEFAULT_BUFFER_SIZE )
. mutable_ready ( )
. build ( ) ,
glib ::ParamSpecString ::builder ( " uri " )
. nick ( " URI " )
. blurb ( " The S3 object URI " )
. mutable_ready ( )
. build ( ) ,
glib ::ParamSpecString ::builder ( " access-key " )
. nick ( " Access Key " )
. blurb ( " AWS Access Key " )
. mutable_ready ( )
. build ( ) ,
glib ::ParamSpecString ::builder ( " secret-access-key " )
. nick ( " Secret Access Key " )
. blurb ( " AWS Secret Access Key " )
. mutable_ready ( )
. build ( ) ,
glib ::ParamSpecString ::builder ( " session-token " )
. nick ( " Session Token " )
. blurb ( " AWS temporary Session Token from STS " )
. mutable_ready ( )
. build ( ) ,
2022-09-05 08:45:47 +00:00
glib ::ParamSpecBoxed ::builder ::< gst ::Structure > ( " metadata " )
2022-08-18 12:04:15 +00:00
. nick ( " Metadata " )
. blurb ( " A map of metadata to store with the object in S3; field values need to be convertible to strings. " )
. mutable_ready ( )
. build ( ) ,
2023-01-21 16:13:48 +00:00
glib ::ParamSpecEnum ::builder_with_default ( " on-error " , DEFAULT_MULTIPART_UPLOAD_ON_ERROR )
2022-08-18 12:04:15 +00:00
. nick ( " Whether to upload or complete the multipart upload on error " )
. blurb ( " Do nothing, abort or complete a multipart upload request on error " )
. mutable_ready ( )
. build ( ) ,
glib ::ParamSpecUInt ::builder ( " retry-attempts " )
. nick ( " Retry attempts " )
. blurb ( " Number of times AWS SDK attempts a request before abandoning the request " )
. minimum ( 1 )
. maximum ( 10 )
. default_value ( DEFAULT_RETRY_ATTEMPTS )
. build ( ) ,
glib ::ParamSpecInt64 ::builder ( " request-timeout " )
. nick ( " Request timeout " )
. blurb ( " Timeout for general S3 requests (in ms, set to -1 for infinity) " )
. minimum ( - 1 )
. default_value ( DEFAULT_REQUEST_TIMEOUT_MSEC as i64 )
. build ( ) ,
glib ::ParamSpecInt64 ::builder ( " upload-part-request-timeout " )
. nick ( " Upload part request timeout " )
. blurb ( " Timeout for a single upload part request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.) " )
. minimum ( - 1 )
. default_value ( DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64 )
. build ( ) ,
glib ::ParamSpecInt64 ::builder ( " complete-upload-request-timeout " )
. nick ( " Complete upload request timeout " )
. blurb ( " Timeout for the complete multipart upload request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.) " )
. minimum ( - 1 )
. default_value ( DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64 )
. build ( ) ,
glib ::ParamSpecInt64 ::builder ( " retry-duration " )
. nick ( " Retry duration " )
. blurb ( " How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.) " )
. minimum ( - 1 )
. default_value ( DEFAULT_RETRY_DURATION_MSEC as i64 )
. build ( ) ,
glib ::ParamSpecInt64 ::builder ( " upload-part-retry-duration " )
. nick ( " Upload part retry duration " )
. blurb ( " How long we should retry upload part requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.) " )
. minimum ( - 1 )
. default_value ( DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64 )
. build ( ) ,
glib ::ParamSpecInt64 ::builder ( " complete-upload-retry-duration " )
. nick ( " Complete upload retry duration " )
. blurb ( " How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.) " )
. minimum ( - 1 )
. default_value ( DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64 )
. build ( ) ,
glib ::ParamSpecString ::builder ( " endpoint-uri " )
. nick ( " S3 endpoint URI " )
. blurb ( " The S3 endpoint URI to use " )
. build ( ) ,
2023-02-09 13:44:16 +00:00
glib ::ParamSpecString ::builder ( " content-type " )
. nick ( " content-type " )
. blurb ( " Content-Type header to set for uploaded object " )
. build ( ) ,
glib ::ParamSpecString ::builder ( " content-disposition " )
. nick ( " content-disposition " )
. blurb ( " Content-Disposition header to set for uploaded object " )
. build ( ) ,
2021-01-21 18:21:29 +00:00
]
} ) ;
2019-06-12 08:07:39 +00:00
2021-01-21 18:21:29 +00:00
PROPERTIES . as_ref ( )
2019-06-12 08:07:39 +00:00
}
2022-10-09 13:06:59 +00:00
fn set_property ( & self , _id : usize , value : & glib ::Value , pspec : & glib ::ParamSpec ) {
2019-06-12 08:07:39 +00:00
let mut settings = self . settings . lock ( ) . unwrap ( ) ;
2022-02-21 17:43:46 +00:00
gst ::debug! (
2021-11-22 10:28:46 +00:00
CAT ,
2022-10-09 13:06:59 +00:00
imp : self ,
2021-11-22 10:28:46 +00:00
" Setting property '{}' to '{:?}' " ,
pspec . name ( ) ,
value
) ;
2021-04-12 12:49:54 +00:00
match pspec . name ( ) {
2021-06-21 10:20:32 +00:00
" bucket " = > {
settings . bucket = value
. get ::< Option < String > > ( )
. expect ( " type checked upstream " ) ;
if settings . key . is_some ( ) {
2022-10-09 13:06:59 +00:00
let _ = self . set_uri ( Some ( & settings . to_uri ( ) ) ) ;
2021-06-21 10:20:32 +00:00
}
}
" key " = > {
settings . key = value
. get ::< Option < String > > ( )
. expect ( " type checked upstream " ) ;
if settings . bucket . is_some ( ) {
2022-10-09 13:06:59 +00:00
let _ = self . set_uri ( Some ( & settings . to_uri ( ) ) ) ;
2021-06-21 10:20:32 +00:00
}
}
" region " = > {
2021-09-27 16:16:26 +00:00
let region = value . get ::< String > ( ) . expect ( " type checked upstream " ) ;
2022-05-14 05:01:35 +00:00
settings . region = Region ::new ( region ) ;
2021-06-21 10:20:32 +00:00
if settings . key . is_some ( ) & & settings . bucket . is_some ( ) {
2022-10-09 13:06:59 +00:00
let _ = self . set_uri ( Some ( & settings . to_uri ( ) ) ) ;
2021-06-21 10:20:32 +00:00
}
}
2021-01-21 18:21:29 +00:00
" part-size " = > {
2021-04-25 12:41:22 +00:00
settings . buffer_size = value . get ::< u64 > ( ) . expect ( " type checked upstream " ) ;
2019-06-12 08:07:39 +00:00
}
2021-04-30 13:05:35 +00:00
" uri " = > {
2022-10-09 13:06:59 +00:00
let _ = self . set_uri ( value . get ( ) . expect ( " type checked upstream " ) ) ;
2021-04-30 13:05:35 +00:00
}
2021-09-27 13:49:12 +00:00
" access-key " = > {
settings . access_key = value . get ( ) . expect ( " type checked upstream " ) ;
}
" secret-access-key " = > {
settings . secret_access_key = value . get ( ) . expect ( " type checked upstream " ) ;
}
2022-06-16 07:16:28 +00:00
" session-token " = > {
settings . session_token = value . get ( ) . expect ( " type checked upstream " ) ;
}
2021-11-22 09:12:51 +00:00
" metadata " = > {
settings . metadata = value . get ( ) . expect ( " type checked upstream " ) ;
}
2021-12-06 12:06:10 +00:00
" on-error " = > {
settings . multipart_upload_on_error =
value . get ::< OnError > ( ) . expect ( " type checked upstream " ) ;
}
2022-05-14 05:01:35 +00:00
" retry-attempts " = > {
settings . retry_attempts = value . get ::< u32 > ( ) . expect ( " type checked upstream " ) ;
}
2022-03-18 10:10:18 +00:00
" request-timeout " = > {
settings . request_timeout =
duration_from_millis ( value . get ::< i64 > ( ) . expect ( " type checked upstream " ) ) ;
}
2022-02-23 18:10:30 +00:00
" upload-part-request-timeout " = > {
2022-05-14 05:01:35 +00:00
settings . request_timeout =
2022-03-18 10:10:18 +00:00
duration_from_millis ( value . get ::< i64 > ( ) . expect ( " type checked upstream " ) ) ;
}
" complete-upload-request-timeout " = > {
2022-05-14 05:01:35 +00:00
settings . request_timeout =
2022-03-18 10:10:18 +00:00
duration_from_millis ( value . get ::< i64 > ( ) . expect ( " type checked upstream " ) ) ;
}
2022-06-06 13:27:50 +00:00
" retry-duration " = > {
2022-05-14 05:01:35 +00:00
/*
* To maintain backwards compatibility calculate retry attempts
* by dividing the provided duration from request timeout .
* /
let value = value . get ::< i64 > ( ) . expect ( " type checked upstream " ) ;
let request_timeout = duration_to_millis ( Some ( settings . request_timeout ) ) ;
let retry_attempts = if value > request_timeout {
value / request_timeout
} else {
2022-09-09 04:23:06 +00:00
1
2022-05-14 05:01:35 +00:00
} ;
settings . retry_attempts = retry_attempts as u32 ;
2022-02-23 18:10:30 +00:00
}
2022-06-06 13:27:50 +00:00
" upload-part-retry-duration " | " complete-upload-retry-duration " = > {
gst ::warning! ( CAT , " Use retry-attempts. retry/upload-part/complete-upload-retry duration are deprecated. " ) ;
}
2022-07-30 05:59:50 +00:00
" endpoint-uri " = > {
settings . endpoint_uri = value
. get ::< Option < String > > ( )
. expect ( " type checked upstream " ) ;
if settings . key . is_some ( ) & & settings . bucket . is_some ( ) {
2022-10-09 13:06:59 +00:00
let _ = self . set_uri ( Some ( & settings . to_uri ( ) ) ) ;
2022-07-30 05:59:50 +00:00
}
}
2023-02-09 13:44:16 +00:00
" content-type " = > {
settings . content_type = value
. get ::< Option < String > > ( )
. expect ( " type checked upstream " ) ;
}
" content-disposition " = > {
settings . content_disposition = value
. get ::< Option < String > > ( )
. expect ( " type checked upstream " ) ;
}
2019-06-12 08:07:39 +00:00
_ = > unimplemented! ( ) ,
}
}
2022-10-09 13:06:59 +00:00
fn property ( & self , _id : usize , pspec : & glib ::ParamSpec ) -> glib ::Value {
2019-06-12 08:07:39 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2021-04-12 12:49:54 +00:00
match pspec . name ( ) {
2021-06-21 10:20:32 +00:00
" key " = > settings . key . to_value ( ) ,
" bucket " = > settings . bucket . to_value ( ) ,
2022-05-14 05:01:35 +00:00
" region " = > settings . region . to_string ( ) . to_value ( ) ,
2021-01-21 18:21:29 +00:00
" part-size " = > settings . buffer_size . to_value ( ) ,
2021-04-30 13:05:35 +00:00
" uri " = > {
2022-06-30 12:44:07 +00:00
let url = self . url . lock ( ) . unwrap ( ) ;
let url = match * url {
2021-04-30 13:05:35 +00:00
Some ( ref url ) = > url . to_string ( ) ,
None = > " " . to_string ( ) ,
} ;
url . to_value ( )
}
2021-09-27 13:49:12 +00:00
" access-key " = > settings . access_key . to_value ( ) ,
" secret-access-key " = > settings . secret_access_key . to_value ( ) ,
2022-06-16 07:16:28 +00:00
" session-token " = > settings . session_token . to_value ( ) ,
2021-11-22 09:12:51 +00:00
" metadata " = > settings . metadata . to_value ( ) ,
2021-12-06 12:06:10 +00:00
" on-error " = > settings . multipart_upload_on_error . to_value ( ) ,
2022-05-14 05:01:35 +00:00
" retry-attempts " = > settings . retry_attempts . to_value ( ) ,
" request-timeout " = > duration_to_millis ( Some ( settings . request_timeout ) ) . to_value ( ) ,
2022-02-23 18:10:30 +00:00
" upload-part-request-timeout " = > {
2022-05-14 05:01:35 +00:00
duration_to_millis ( Some ( settings . request_timeout ) ) . to_value ( )
2022-03-18 10:10:18 +00:00
}
" complete-upload-request-timeout " = > {
2022-05-14 05:01:35 +00:00
duration_to_millis ( Some ( settings . request_timeout ) ) . to_value ( )
2022-03-18 10:10:18 +00:00
}
2022-05-14 05:01:35 +00:00
" retry-duration " | " upload-part-retry-duration " | " complete-upload-retry-duration " = > {
let request_timeout = duration_to_millis ( Some ( settings . request_timeout ) ) ;
( settings . retry_attempts as i64 * request_timeout ) . to_value ( )
2022-02-23 18:10:30 +00:00
}
2022-07-30 05:59:50 +00:00
" endpoint-uri " = > settings . endpoint_uri . to_value ( ) ,
2023-02-09 13:44:16 +00:00
" content-type " = > settings . content_type . to_value ( ) ,
" content-disposition " = > settings . content_disposition . to_value ( ) ,
2019-06-12 08:07:39 +00:00
_ = > unimplemented! ( ) ,
}
}
}
2021-10-23 08:57:31 +00:00
// Empty impl: `S3Sink` overrides nothing from `GstObjectImpl` and uses the trait's defaults.
impl GstObjectImpl for S3Sink { }
2021-01-21 18:21:29 +00:00
impl ElementImpl for S3Sink {
    /// Returns the element's static metadata: long name, classification,
    /// description and author. The value is built once on first access.
    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
            // Only when building with the doc feature: expose `OnError` as plugin API.
            #[cfg(feature = " doc ")]
            OnError::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());

            gst::subclass::ElementMetadata::new(
                " Amazon S3 sink ",
                // Classified as a sink: the element's only pad template is a
                // sink pad and it implements `BaseSinkImpl`, so the previous
                // "Source" classification was wrong.
                " Sink/Network ",
                " Writes an object to Amazon S3 ",
                " Marcin Kolny <mkolny@amazon.com> ",
            )
        });

        Some(&*ELEMENT_METADATA)
    }

    /// Returns the element's pad templates: a single always-present sink pad
    /// that accepts any caps. The list is built once on first access.
    fn pad_templates() -> &'static [gst::PadTemplate] {
        static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
            let any_caps = gst::Caps::new_any();
            let sink_template = gst::PadTemplate::new(
                " sink ",
                gst::PadDirection::Sink,
                gst::PadPresence::Always,
                &any_caps,
            )
            .unwrap();

            vec![sink_template]
        });

        PAD_TEMPLATES.as_ref()
    }
}
2019-06-12 08:07:39 +00:00
2021-04-30 13:05:35 +00:00
impl URIHandlerImpl for S3Sink {
const URI_TYPE : gst ::URIType = gst ::URIType ::Sink ;
fn protocols ( ) -> & 'static [ & 'static str ] {
& [ " s3 " ]
}
2022-10-09 13:06:59 +00:00
fn uri ( & self ) -> Option < String > {
2021-04-30 13:05:35 +00:00
self . url . lock ( ) . unwrap ( ) . as_ref ( ) . map ( | s | s . to_string ( ) )
}
2022-10-09 13:06:59 +00:00
fn set_uri ( & self , uri : & str ) -> Result < ( ) , glib ::Error > {
self . set_uri ( Some ( uri ) )
2021-04-30 13:05:35 +00:00
}
}
2019-06-12 08:07:39 +00:00
impl BaseSinkImpl for S3Sink {
2022-10-09 13:06:59 +00:00
fn start ( & self ) -> Result < ( ) , gst ::ErrorMessage > {
2020-04-20 01:28:30 +00:00
self . start ( )
2019-06-12 08:07:39 +00:00
}
2022-10-09 13:06:59 +00:00
fn stop ( & self ) -> Result < ( ) , gst ::ErrorMessage > {
2019-06-12 08:07:39 +00:00
let mut state = self . state . lock ( ) . unwrap ( ) ;
2022-11-07 19:53:15 +00:00
if let State ::Started ( ref mut state ) = * state {
gst ::warning! ( CAT , imp : self , " Stopped without EOS " ) ;
// We're stopping without an EOS -- treat this as an error and deal with the open
// multipart upload accordingly _if_ we managed to upload any parts
if ! state . completed_parts . is_empty ( ) {
self . flush_multipart_upload ( state ) ;
}
}
2019-06-12 08:07:39 +00:00
* state = State ::Stopped ;
2022-10-09 13:06:59 +00:00
gst ::info! ( CAT , imp : self , " Stopped " ) ;
2019-06-12 08:07:39 +00:00
Ok ( ( ) )
}
2022-10-09 13:06:59 +00:00
fn render ( & self , buffer : & gst ::Buffer ) -> Result < gst ::FlowSuccess , gst ::FlowError > {
2019-06-12 08:07:39 +00:00
if let State ::Stopped = * self . state . lock ( ) . unwrap ( ) {
2022-10-09 13:06:59 +00:00
gst ::element_imp_error! ( self , gst ::CoreError ::Failed , [ " Not started yet " ] ) ;
2019-06-12 08:07:39 +00:00
return Err ( gst ::FlowError ::Error ) ;
}
2022-11-07 19:53:15 +00:00
if let State ::Completed = * self . state . lock ( ) . unwrap ( ) {
gst ::element_imp_error! (
self ,
gst ::CoreError ::Failed ,
[ " Trying to render after upload complete " ]
) ;
return Err ( gst ::FlowError ::Error ) ;
}
2022-10-09 13:06:59 +00:00
gst ::trace! ( CAT , imp : self , " Rendering {:?} " , buffer ) ;
2019-12-18 05:50:10 +00:00
let map = buffer . map_readable ( ) . map_err ( | _ | {
2022-10-09 13:06:59 +00:00
gst ::element_imp_error! ( self , gst ::CoreError ::Failed , [ " Failed to map buffer " ] ) ;
2019-06-12 08:07:39 +00:00
gst ::FlowError ::Error
} ) ? ;
2022-10-09 13:06:59 +00:00
match self . update_buffer ( & map ) {
2019-06-12 08:07:39 +00:00
Ok ( _ ) = > Ok ( gst ::FlowSuccess ::Ok ) ,
Err ( err ) = > match err {
Some ( error_message ) = > {
2022-10-09 13:06:59 +00:00
gst ::error! ( CAT , imp : self , " Multipart upload failed: {} " , error_message ) ;
self . post_error_message ( error_message ) ;
2019-06-12 08:07:39 +00:00
Err ( gst ::FlowError ::Error )
}
_ = > {
2022-10-09 13:06:59 +00:00
gst ::info! ( CAT , imp : self , " Upload interrupted. Flushing... " ) ;
2019-06-12 08:07:39 +00:00
Err ( gst ::FlowError ::Flushing )
}
} ,
}
}
2022-10-09 13:06:59 +00:00
fn unlock ( & self ) -> Result < ( ) , gst ::ErrorMessage > {
2019-06-12 08:07:39 +00:00
self . cancel ( ) ;
Ok ( ( ) )
}
2022-10-09 13:06:59 +00:00
fn event ( & self , event : gst ::Event ) -> bool {
2019-11-24 22:00:27 +00:00
if let gst ::EventView ::Eos ( _ ) = event . view ( ) {
2022-10-09 13:06:59 +00:00
if let Err ( error_message ) = self . finalize_upload ( ) {
2022-02-21 17:43:46 +00:00
gst ::error! (
2019-11-24 22:00:27 +00:00
CAT ,
2022-10-09 13:06:59 +00:00
imp : self ,
2019-11-24 22:00:27 +00:00
" Failed to finalize the upload: {} " ,
error_message
) ;
return false ;
2019-06-12 08:07:39 +00:00
}
}
2022-10-09 13:06:59 +00:00
BaseSinkImplExt ::parent_event ( self , event )
2019-06-12 08:07:39 +00:00
}
}