Implemented proxy support

Implemented analogously to souphttpsrc for compatibility. Proxy
prevents sharing the client between element instances.

Change-Id: I50d676fd55f0e1d7051d8cd7d5922b7be4f0c6e8
This commit is contained in:
Karl Rikte 2021-04-14 23:58:37 +02:00 committed by Karl Rikte
parent bf5e231e5b
commit e1ea71fec7
2 changed files with 334 additions and 53 deletions

View file

@ -49,6 +49,15 @@ struct Settings {
cookies: Vec<String>, cookies: Vec<String>,
iradio_mode: bool, iradio_mode: bool,
keep_alive: bool, keep_alive: bool,
// Notes about souphttpsrc compatibility:
// Internal representation of no proxy is None,
// but externally Some("").
// Default is set from env var 'http_proxy'.
// Prepends http:// if not protocol specified.
proxy: Option<String>,
// Nullable fields that behave normally:
proxy_id: Option<String>,
proxy_pw: Option<String>,
} }
impl Default for Settings { impl Default for Settings {
@ -64,6 +73,38 @@ impl Default for Settings {
cookies: Vec::new(), cookies: Vec::new(),
iradio_mode: DEFAULT_IRADIO_MODE, iradio_mode: DEFAULT_IRADIO_MODE,
keep_alive: DEFAULT_KEEP_ALIVE, keep_alive: DEFAULT_KEEP_ALIVE,
proxy: match proxy_from_str(std::env::var("http_proxy").ok()) {
Ok(a) => a,
Err(_) => None,
},
proxy_id: None,
proxy_pw: None,
}
}
}
fn proxy_from_str(s: Option<String>) -> Result<Option<String>, glib::Error> {
match s {
None => Ok(None),
Some(s) if s.is_empty() => Ok(None),
Some(not_empty_str) => {
// If no protocol specified, prepend http for compatibility
// https://gstreamer.freedesktop.org/documentation/soup/souphttpsrc.html
let url_string = if !not_empty_str.contains("://") {
format!("http://{}", not_empty_str)
} else {
not_empty_str
};
match reqwest::Url::parse(&url_string) {
Ok(url) => {
// this may urlencode and add trailing /
Ok(Some(url.to_string()))
}
Err(err) => Err(glib::Error::new(
gst::URIError::BadUri,
format!("Failed to parse URI '{}': {:?}", url_string, err).as_str(),
)),
}
} }
} }
} }
@ -168,9 +209,51 @@ impl ReqwestHttpSrc {
Ok(()) Ok(())
} }
/// Set a proxy-related property and perform necessary state checks and modifications to client.
fn set_proxy_prop<F>(
&self,
property_name: &str,
desired_value: Option<String>,
prop_memory_location: F,
) -> Result<(), glib::Error>
where
F: Fn(&mut Settings) -> &mut Option<String>,
{
// Proxy props can only be changed when not started.
let state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
return Err(glib::Error::new(
gst::URIError::BadState,
&format!(
"Changing the `{}` property on a started `reqwesthttpsrc` is not supported",
property_name
),
));
}
// Get memory address of specific variable to change.
let mut settings = self.settings.lock().unwrap();
let target_variable = prop_memory_location(&mut settings);
if &desired_value == target_variable {
return Ok(());
}
// If the Proxy is changed we need to throw away the old client since it isn't properly
// configured with a proxy anymore. Since element is not started, an existing client
// without proxy will be used, or a new one with/without proxy will be built on next call
// to ensure_client.
*self.client.lock().unwrap() = None;
*target_variable = desired_value;
Ok(())
}
fn ensure_client( fn ensure_client(
&self, &self,
src: &super::ReqwestHttpSrc, src: &super::ReqwestHttpSrc,
proxy: Option<String>,
proxy_id: Option<String>,
proxy_pw: Option<String>,
) -> Result<ClientContext, gst::ErrorMessage> { ) -> Result<ClientContext, gst::ErrorMessage> {
let mut client_guard = self.client.lock().unwrap(); let mut client_guard = self.client.lock().unwrap();
if let Some(ref client) = *client_guard { if let Some(ref client) = *client_guard {
@ -178,57 +261,70 @@ impl ReqwestHttpSrc {
return Ok(client.clone()); return Ok(client.clone());
} }
let srcpad = src.static_pad("src").unwrap(); // Attempt to acquire an existing client context from another element instance
let mut q = gst::query::Context::new(REQWEST_CLIENT_CONTEXT); // unless using proxy, because proxy is client specific.
if srcpad.peer_query(&mut q) { if proxy.is_none() {
if let Some(context) = q.context_owned() { let srcpad = src.static_pad("src").unwrap();
src.set_context(&context); let mut q = gst::query::Context::new(REQWEST_CLIENT_CONTEXT);
if srcpad.peer_query(&mut q) {
if let Some(context) = q.context_owned() {
src.set_context(&context);
}
} else {
let _ = src.post_message(
gst::message::NeedContext::builder(REQWEST_CLIENT_CONTEXT)
.src(src)
.build(),
);
}
// Hopefully now, self.set_context will have been synchronously called
if let Some(client) = self.external_client.lock().unwrap().clone() {
gst_debug!(CAT, obj: src, "Using shared client");
*client_guard = Some(client.clone());
return Ok(client);
} }
} else {
let _ = src.post_message(
gst::message::NeedContext::builder(REQWEST_CLIENT_CONTEXT)
.src(src)
.build(),
);
} }
if let Some(client) = { let mut builder = Client::builder().cookie_store(true).gzip(true);
// FIXME: Is there a simpler way to ensure the lock is not hold
// after this block anymore?
let external_client = self.external_client.lock().unwrap();
let client = external_client.as_ref().cloned();
drop(external_client);
client
} {
gst_debug!(CAT, obj: src, "Using shared client");
*client_guard = Some(client.clone());
return Ok(client); if let Some(proxy) = &proxy {
// Proxy is url-checked on property set but perhaps this might still fail.
let mut p = reqwest::Proxy::all(proxy).map_err(|err| {
gst::error_msg!(gst::ResourceError::OpenRead, ["Bad proxy URI: {}", err])
})?;
if let Some(proxy_id) = &proxy_id {
let proxy_pw = proxy_pw.as_deref().unwrap_or("");
p = p.basic_auth(proxy_id, proxy_pw);
}
builder = builder.proxy(p);
} }
gst_debug!(CAT, obj: src, "Creating new client"); gst_debug!(CAT, obj: src, "Creating new client");
let client = ClientContext(Arc::new(ClientContextInner { let client = ClientContext(Arc::new(ClientContextInner {
client: Client::builder() client: builder.build().map_err(|err| {
.cookie_store(true) gst::error_msg!(
.gzip(true) gst::ResourceError::OpenRead,
.build() ["Failed to create Client: {}", err]
.map_err(|err| { )
gst::error_msg!( })?,
gst::ResourceError::OpenRead,
["Failed to create Client: {}", err]
)
})?,
})); }));
gst_debug!(CAT, obj: src, "Sharing new client with other elements"); // Share created client with other elements, unless using proxy. Shared client never uses proxy.
let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true); // The alternative would be different contexts for different proxy settings, or one context with a
{ // map from proxy settings to client, but then, how and when to discard those, retaining reuse benefits?
let context = context.get_mut().unwrap(); if proxy.is_none() {
let s = context.structure_mut(); gst_debug!(CAT, obj: src, "Sharing new client with other elements");
s.set("client", &client); let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true);
{
let context = context.get_mut().unwrap();
let s = context.structure_mut();
s.set("client", &client);
}
src.set_context(&context);
let _ = src.post_message(gst::message::HaveContext::builder(context).src(src).build());
} }
src.set_context(&context);
let _ = src.post_message(gst::message::HaveContext::builder(context).src(src).build());
*client_guard = Some(client.clone()); *client_guard = Some(client.clone());
@ -251,12 +347,14 @@ impl ReqwestHttpSrc {
gst_debug!(CAT, obj: src, "Creating new request for {}", uri); gst_debug!(CAT, obj: src, "Creating new request for {}", uri);
let req = {
let client = self.ensure_client(src)?;
client.0.client.get(uri.clone())
};
let settings = self.settings.lock().unwrap().clone(); let settings = self.settings.lock().unwrap().clone();
let req = self
.ensure_client(src, settings.proxy, settings.proxy_id, settings.proxy_pw)?
.0
.client
.get(uri.clone());
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
if settings.keep_alive { if settings.keep_alive {
@ -658,6 +756,27 @@ impl ObjectImpl for ReqwestHttpSrc {
DEFAULT_KEEP_ALIVE, DEFAULT_KEEP_ALIVE,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
), ),
glib::ParamSpec::new_string(
"proxy",
"Proxy",
"HTTP proxy server URI",
Some(""),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"proxy-id",
"Proxy-id",
"HTTP proxy URI user id for authentication",
Some(""),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"proxy-pw",
"Proxy-pw",
"HTTP proxy URI user password for authentication",
Some(""),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
] ]
}); });
@ -671,17 +790,10 @@ impl ObjectImpl for ReqwestHttpSrc {
value: &glib::Value, value: &glib::Value,
pspec: &glib::ParamSpec, pspec: &glib::ParamSpec,
) { ) {
match pspec.name() { let res = match pspec.name() {
"location" => { "location" => {
let location = value.get::<Option<&str>>().expect("type checked upstream"); let location = value.get::<Option<&str>>().expect("type checked upstream");
if let Err(err) = self.set_location(obj, location) { self.set_location(obj, location)
gst_error!(
CAT,
obj: obj,
"Failed to set property `location`: {:?}",
err
);
}
} }
"user-agent" => { "user-agent" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
@ -690,52 +802,100 @@ impl ObjectImpl for ReqwestHttpSrc {
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| DEFAULT_USER_AGENT.into()); .unwrap_or_else(|| DEFAULT_USER_AGENT.into());
settings.user_agent = user_agent; settings.user_agent = user_agent;
Ok(())
} }
"is-live" => { "is-live" => {
let is_live = value.get().expect("type checked upstream"); let is_live = value.get().expect("type checked upstream");
obj.set_live(is_live); obj.set_live(is_live);
Ok(())
} }
"user-id" => { "user-id" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let user_id = value.get().expect("type checked upstream"); let user_id = value.get().expect("type checked upstream");
settings.user_id = user_id; settings.user_id = user_id;
Ok(())
} }
"user-pw" => { "user-pw" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let user_pw = value.get().expect("type checked upstream"); let user_pw = value.get().expect("type checked upstream");
settings.user_pw = user_pw; settings.user_pw = user_pw;
Ok(())
} }
"timeout" => { "timeout" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let timeout = value.get().expect("type checked upstream"); let timeout = value.get().expect("type checked upstream");
settings.timeout = timeout; settings.timeout = timeout;
Ok(())
} }
"compress" => { "compress" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let compress = value.get().expect("type checked upstream"); let compress = value.get().expect("type checked upstream");
settings.compress = compress; settings.compress = compress;
Ok(())
} }
"extra-headers" => { "extra-headers" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let extra_headers = value.get().expect("type checked upstream"); let extra_headers = value.get().expect("type checked upstream");
settings.extra_headers = extra_headers; settings.extra_headers = extra_headers;
Ok(())
} }
"cookies" => { "cookies" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
settings.cookies = value.get::<Vec<String>>().expect("type checked upstream"); settings.cookies = value.get::<Vec<String>>().expect("type checked upstream");
Ok(())
} }
"iradio-mode" => { "iradio-mode" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let iradio_mode = value.get().expect("type checked upstream"); let iradio_mode = value.get().expect("type checked upstream");
settings.iradio_mode = iradio_mode; settings.iradio_mode = iradio_mode;
Ok(())
} }
"keep-alive" => { "keep-alive" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let keep_alive = value.get().expect("type checked upstream"); let keep_alive = value.get().expect("type checked upstream");
settings.keep_alive = keep_alive; settings.keep_alive = keep_alive;
Ok(())
}
"proxy" => {
let proxy = proxy_from_str(
value
.get::<Option<String>>()
.expect("type checked upstream"),
);
match proxy {
Ok(proxy) => self
.set_proxy_prop(pspec.name(), proxy, move |settings| &mut settings.proxy),
Err(e) => Err(e),
}
}
"proxy-id" => {
let proxy_id = value
.get::<Option<String>>()
.expect("type checked upstream");
self.set_proxy_prop(pspec.name(), proxy_id, move |settings| {
&mut settings.proxy_id
})
}
"proxy-pw" => {
let proxy_pw = value
.get::<Option<String>>()
.expect("type checked upstream");
self.set_proxy_prop(pspec.name(), proxy_pw, move |settings| {
&mut settings.proxy_pw
})
} }
_ => unimplemented!(), _ => unimplemented!(),
}; };
if let Err(err) = res {
gst_error!(
CAT,
obj: obj,
"Failed to set property `{}`: {:?}",
pspec.name(),
err
);
}
} }
fn property(&self, obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
@ -783,6 +943,17 @@ impl ObjectImpl for ReqwestHttpSrc {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.keep_alive.to_value() settings.keep_alive.to_value()
} }
// return None values as Some("") for compatibility with souphttpsrc
"proxy" => self
.settings
.lock()
.unwrap()
.proxy
.as_deref()
.unwrap_or("")
.to_value(),
"proxy-id" => self.settings.lock().unwrap().proxy_id.to_value(),
"proxy-pw" => self.settings.lock().unwrap().proxy_pw.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }

View file

@ -15,6 +15,8 @@ fn init() {
static INIT: Once = Once::new(); static INIT: Once = Once::new();
INIT.call_once(|| { INIT.call_once(|| {
// clear this environment because it affects the default settings
std::env::remove_var("http_proxy");
gst::init().unwrap(); gst::init().unwrap();
gstreqwest::plugin_register_static().expect("reqwesthttpsrc tests"); gstreqwest::plugin_register_static().expect("reqwesthttpsrc tests");
}); });
@ -1183,3 +1185,111 @@ fn test_cookies() {
} }
assert_eq!(num_bytes, 12); assert_eq!(num_bytes, 12);
} }
#[test]
fn test_proxy_prop_souphttpsrc_compatibility() {
init();
fn assert_proxy_set(set_to: Option<&str>, expected: Option<&str>) {
// The same assertions should hold for "souphttpsrc".
let src = gst::ElementFactory::make("reqwesthttpsrc", None).unwrap();
src.set_property("proxy", set_to).unwrap();
assert_eq!(
src.property("proxy")
.unwrap()
.get::<Option<&str>>()
.unwrap(),
expected
);
}
// Test env var proxy.
assert_proxy_set(Some("http://mydomain/"), Some("http://mydomain/"));
// It should prepend http if no protocol specified and add /.
assert_proxy_set(Some("myotherdomain"), Some("http://myotherdomain/"));
// Empty env var should result in "" proxy (meaning None) for compatibility.
assert_proxy_set(Some(""), Some(""));
// It should allow setting this value for proxy for compatibility.
assert_proxy_set(Some("&$"), Some("http://&$/"));
// No env var should result in "" proxy (meaning None) for compatibility.
assert_proxy_set(None, Some(""));
}
#[test]
fn test_proxy() {
init();
// Simplest possible implementation of naive oneshot proxy server?
// Listen on socket before spawning thread (we won't error out with connection refused).
let incoming = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let proxy_addr = incoming.local_addr().unwrap();
println!("listening on {}, starting proxy server", proxy_addr);
let proxy_server = std::thread::spawn(move || {
use std::io::*;
println!("awaiting connection to proxy server");
let (mut conn, _addr) = incoming.accept().unwrap();
println!("client connected, reading request line");
let mut reader = BufReader::new(conn.try_clone().unwrap());
let mut buf = String::new();
reader.read_line(&mut buf).unwrap();
let parts: Vec<&str> = buf.split(' ').collect();
let url = reqwest::Url::parse(parts[1]).unwrap();
let host = format!(
"{}:{}",
url.host_str().unwrap(),
url.port_or_known_default().unwrap()
);
println!("connecting to target server {}", host);
let mut server_connection = std::net::TcpStream::connect(host).unwrap();
println!("connected to target server, sending modified request line");
server_connection
.write_all(format!("{} {} {}\r\n", parts[0], url.path(), parts[2]).as_bytes())
.unwrap();
println!("sent modified request line, forwarding data in both directions");
let send_join_handle = {
let mut server_connection = server_connection.try_clone().unwrap();
std::thread::spawn(move || {
copy(&mut reader, &mut server_connection).unwrap();
})
};
copy(&mut server_connection, &mut conn).unwrap();
send_join_handle.join().unwrap();
println!("shutting down proxy server");
});
let mut h = Harness::new(
|_req| {
use hyper::{Body, Response};
Response::builder()
.body(Body::from("Hello Proxy World"))
.unwrap()
},
|src| {
src.set_property("proxy", proxy_addr.to_string()).unwrap();
},
);
// Set the HTTP source to Playing so that everything can start.
h.run(|src| {
src.set_state(gst::State::Playing).unwrap();
});
// Wait for a buffer.
let mut num_bytes = 0;
while let Some(buffer) = h.wait_buffer_or_eos() {
num_bytes += buffer.size();
}
assert_eq!(num_bytes, "Hello Proxy World".len());
// Don't leave threads hanging around.
proxy_server.join().unwrap();
}