reqwesthttpsrc: Implement client context sharing, including cookie storage and connection re-use

This commit is contained in:
Sebastian Dröge 2019-09-28 15:02:10 +03:00
parent 5976c9c1e4
commit e3a92edd45

View file

@ -10,13 +10,13 @@ use futures::future::Either;
use futures::sync::oneshot;
use futures::{Future, Stream};
use hyperx::header::{
AcceptRanges, ByteRangeSpec, ContentLength, ContentRange, ContentRangeSpec, Headers, Range,
RangeUnit, UserAgent,
AcceptRanges, ByteRangeSpec, Connection, ContentLength, ContentRange, ContentRangeSpec,
Headers, Range, RangeUnit, UserAgent,
};
use reqwest::r#async::{Client, Decoder};
use reqwest::StatusCode;
use std::mem;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::u64;
use tokio::prelude::*;
@ -123,6 +123,24 @@ static PROPERTIES: [subclass::Property; 6] = [
}),
];
const REQWEST_CLIENT_CONTEXT: &str = "gst.request.client";
#[derive(Clone, Debug)]
struct ClientContext(Arc<ClientContextInner>);
#[derive(Debug)]
struct ClientContextInner {
client: Client,
}
impl glib::subclass::boxed::BoxedType for ClientContext {
const NAME: &'static str = "ReqwestClientContext";
glib_boxed_type!();
}
glib_boxed_derive_traits!(ClientContext);
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum State {
@ -147,7 +165,8 @@ impl Default for State {
#[derive(Debug)]
pub struct ReqwestHttpSrc {
cat: gst::DebugCategory,
client: Client,
client: Mutex<Option<ClientContext>>,
external_client: Mutex<Option<ClientContext>>,
settings: Mutex<Settings>,
state: Mutex<State>,
runtime: runtime::Runtime,
@ -203,11 +222,75 @@ impl ReqwestHttpSrc {
stop: Option<u64>,
) -> Result<State, gst::ErrorMessage> {
let cat = self.cat;
let req = self.client.get(uri.clone());
let req = {
let mut client_guard = self.client.lock().unwrap();
if client_guard.is_none() {
let srcpad = src.get_static_pad("src").unwrap();
let mut q = gst::Query::new_context(REQWEST_CLIENT_CONTEXT);
if srcpad.peer_query(&mut q) {
if let Some(context) = q.get_context_owned() {
src.set_context(&context);
}
} else {
let _ = src.post_message(
&gst::Message::new_need_context(REQWEST_CLIENT_CONTEXT)
.src(Some(src))
.build(),
);
}
if let Some(client) = {
// FIXME: Is there a simpler way to ensure the lock is not hold
// after this block anymore?
let external_client = self.external_client.lock().unwrap();
let client = external_client.as_ref().map(|c| c.clone());
drop(external_client);
client
} {
gst_debug!(cat, obj: src, "Using shared client");
*client_guard = Some(client);
} else {
gst_debug!(cat, obj: src, "Creating new client");
let client = ClientContext(Arc::new(ClientContextInner {
client: Client::builder()
.cookie_store(true)
.build()
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create Client for {}: {}", uri, err]
)
})?,
}));
gst_debug!(cat, obj: src, "Sharing new client with other elements");
let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true);
{
let context = context.get_mut().unwrap();
let s = context.get_mut_structure();
s.set("client", &client);
}
src.set_context(&context);
let _ = src.post_message(
&gst::Message::new_have_context(context)
.src(Some(src))
.build(),
);
*client_guard = Some(client);
}
} else {
gst_debug!(cat, obj: src, "Using already configured client");
}
client_guard.as_ref().unwrap().0.client.get(uri.clone())
};
let settings = self.settings.lock().unwrap().clone();
let mut headers = Headers::new();
headers.set(Connection::keep_alive());
match (start != 0, stop) {
(false, None) => (),
(true, None) => {
@ -459,7 +542,35 @@ impl ObjectImpl for ReqwestHttpSrc {
}
}
impl ElementImpl for ReqwestHttpSrc {}
impl ElementImpl for ReqwestHttpSrc {
fn set_context(&self, element: &gst::Element, context: &gst::Context) {
if context.get_context_type() == REQWEST_CLIENT_CONTEXT {
let mut external_client = self.external_client.lock().unwrap();
let s = context.get_structure();
*external_client = s
.get_some::<&ClientContext>("client")
.map(|c| Some(c.clone()))
.unwrap_or(None);
}
self.parent_set_context(element, context);
}
fn change_state(
&self,
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::ReadyToNull => {
*self.client.lock().unwrap() = None;
}
_ => (),
}
self.parent_change_state(element, transition)
}
}
impl BaseSrcImpl for ReqwestHttpSrc {
fn is_seekable(&self, _src: &gst_base::BaseSrc) -> bool {
@ -690,7 +801,8 @@ impl ObjectSubclass for ReqwestHttpSrc {
gst::DebugColorFlags::empty(),
Some("Rust HTTP source"),
),
client: Client::new(),
client: Mutex::new(None),
external_client: Mutex::new(None),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
runtime: runtime::Builder::new()