move ClientBuilder into thread, since we cannot Copy it

This commit is contained in:
Igor Galić 2020-01-29 13:15:39 +01:00
parent a3f165f9f4
commit 3472a58299
No known key found for this signature in database
GPG key ID: ACFEFF7F6A123A86
2 changed files with 13 additions and 11 deletions

View file

@ -297,7 +297,7 @@ pub trait FromId<C>: Sized {
)
.send()
.map_err(|_| (None, InboxError::DerefError))
.and_then(|mut r| {
.and_then(|r| {
let json: serde_json::Value = r
.json()
.map_err(|_| (None, InboxError::InvalidObject(None)))?;

View file

@ -3,12 +3,11 @@ use array_tool::vec::Uniq;
use reqwest::ClientBuilder;
use rocket::{
http::Status,
request::{FromRequestFuture, FromRequestAsync, Request},
request::{FromRequestAsync, FromRequestFuture, Request},
response::{Responder, Response, ResultFuture},
Outcome,
};
use serde_json;
use tokio::prelude::*;
use self::sign::Signable;
@ -66,7 +65,8 @@ impl<T> ActivityStream<T> {
impl<'r, O: Object + Send + 'r> Responder<'r> for ActivityStream<O> {
fn respond_to(self, request: &'r Request<'_>) -> ResultFuture<'r> {
Box::pin(async move {
let mut json = serde_json::to_value(&self.0).map_err(|_| Status::InternalServerError)?;
let mut json =
serde_json::to_value(&self.0).map_err(|_| Status::InternalServerError)?;
json["@context"] = context();
let result = serde_json::to_string(&json).map_err(rocket::response::Debug);
match result.respond_to(request).await {
@ -95,7 +95,8 @@ impl<'a, 'r> FromRequestAsync<'a, 'r> for ApRequest {
.map(|header| {
header
.split(',')
.map(|ct| match ct.trim() {
.map(|ct| {
match ct.trim() {
// bool for Forward: true if found a valid Content-Type for Plume first (HTML),
// false otherwise
"application/ld+json; profile=\"https://w3.org/ns/activitystreams\""
@ -104,6 +105,7 @@ impl<'a, 'r> FromRequestAsync<'a, 'r> for ApRequest {
| "application/ld+json" => Outcome::Success(ApRequest),
"text/html" => Outcome::Forward(true),
_ => Outcome::Forward(false),
}
})
.fold(Outcome::Forward(false), |out, ct| {
if out.clone().forwarded().unwrap_or_else(|| out.is_success()) {
@ -141,19 +143,19 @@ where
.sign(sender)
.expect("activity_pub::broadcast: signature error");
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.expect("Error while initializing tokio runtime for federation");
let client = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
.build()
.expect("Can't build client");
for inbox in boxes {
let body = signed.to_string();
let mut headers = request::headers();
headers.insert("Digest", request::Digest::digest(&body));
rt.spawn(async move{
rt.spawn(async move {
let client = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
.build()
.expect("Can't build client");
client
.post(&inbox)
.headers(headers.clone())