mirror of
https://github.com/LemmyNet/lemmy.git
synced 2024-11-26 19:31:05 +00:00
background-jobs 0.11 (#1943)
This commit is contained in:
parent
3fea5645f8
commit
1579ee566f
15 changed files with 51 additions and 38 deletions
19
Cargo.lock
generated
19
Cargo.lock
generated
|
@ -421,9 +421,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "background-jobs"
|
name = "background-jobs"
|
||||||
version = "0.9.1"
|
version = "0.11.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bb7df0fd6abf9d55139d4c9e569c0a8cd271ec265862c41bd215b46b36c52397"
|
checksum = "77f4508c6c5b5cfc6c18d43d0ba6ecda339710206854da9e1c9ac9dfb7e3eb6f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"background-jobs-actix",
|
"background-jobs-actix",
|
||||||
"background-jobs-core",
|
"background-jobs-core",
|
||||||
|
@ -431,9 +431,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "background-jobs-actix"
|
name = "background-jobs-actix"
|
||||||
version = "0.9.6"
|
version = "0.11.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "38aebb545b0fac45046421993890eb49cc04896a93b85bbfb1b9017decc413f9"
|
checksum = "5dabf6a2204fe034db7910a38f8e2d183fe24eb92abd4c0aaca59f8cacf4e48b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-rt",
|
"actix-rt",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
@ -441,31 +441,32 @@ dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"background-jobs-core",
|
"background-jobs-core",
|
||||||
"chrono",
|
"chrono",
|
||||||
"log",
|
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"tracing-futures",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "background-jobs-core"
|
name = "background-jobs-core"
|
||||||
version = "0.9.5"
|
version = "0.11.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ee8604ff89c62ca8eefc1ea2c3f359a53b7930e640fb22bf7890eab13b4640d2"
|
checksum = "174d36b170699ecc13b7513bda9eff6f12cc889eae5d16b792daa3f7b21be452"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-rt",
|
"actix-rt",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-mutex",
|
"async-mutex",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"chrono",
|
"chrono",
|
||||||
"log",
|
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tracing",
|
||||||
|
"tracing-futures",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -48,5 +48,5 @@ async-trait = "0.1.51"
|
||||||
captcha = "0.0.8"
|
captcha = "0.0.8"
|
||||||
anyhow = "1.0.44"
|
anyhow = "1.0.44"
|
||||||
thiserror = "1.0.29"
|
thiserror = "1.0.29"
|
||||||
background-jobs = "0.9.1"
|
background-jobs = "0.11.0"
|
||||||
reqwest = { version = "0.11.4", features = ["json"] }
|
reqwest = { version = "0.11.4", features = ["json"] }
|
||||||
|
|
|
@ -43,6 +43,6 @@ sha2 = "0.9.8"
|
||||||
async-trait = "0.1.51"
|
async-trait = "0.1.51"
|
||||||
anyhow = "1.0.44"
|
anyhow = "1.0.44"
|
||||||
thiserror = "1.0.29"
|
thiserror = "1.0.29"
|
||||||
background-jobs = "0.9.1"
|
background-jobs = "0.11.0"
|
||||||
reqwest = { version = "0.11.4", features = ["json"] }
|
reqwest = { version = "0.11.4", features = ["json"] }
|
||||||
webmention = "0.4.0"
|
webmention = "0.4.0"
|
||||||
|
|
|
@ -47,7 +47,7 @@ sha2 = "0.9.8"
|
||||||
async-trait = "0.1.51"
|
async-trait = "0.1.51"
|
||||||
anyhow = "1.0.44"
|
anyhow = "1.0.44"
|
||||||
thiserror = "1.0.29"
|
thiserror = "1.0.29"
|
||||||
background-jobs = "0.9.1"
|
background-jobs = "0.11.0"
|
||||||
reqwest = { version = "0.11.4", features = ["json"] }
|
reqwest = { version = "0.11.4", features = ["json"] }
|
||||||
html2md = "0.2.13"
|
html2md = "0.2.13"
|
||||||
once_cell = "1.8.0"
|
once_cell = "1.8.0"
|
||||||
|
|
|
@ -136,6 +136,7 @@ mod tests {
|
||||||
person::tests::parse_lemmy_person,
|
person::tests::parse_lemmy_person,
|
||||||
tests::{file_to_json_object, init_context},
|
tests::{file_to_json_object, init_context},
|
||||||
};
|
};
|
||||||
|
use lemmy_apub_lib::activity_queue::create_activity_queue;
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
source::{
|
source::{
|
||||||
community::Community,
|
community::Community,
|
||||||
|
@ -148,7 +149,8 @@ mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_parse_lemmy_community_moderators() {
|
async fn test_parse_lemmy_community_moderators() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let community = parse_lemmy_community(&context).await;
|
let community = parse_lemmy_community(&context).await;
|
||||||
let community_id = community.id;
|
let community_id = community.id;
|
||||||
|
|
||||||
|
|
|
@ -214,6 +214,7 @@ pub(crate) mod tests {
|
||||||
tests::{file_to_json_object, init_context},
|
tests::{file_to_json_object, init_context},
|
||||||
};
|
};
|
||||||
use assert_json_diff::assert_json_include;
|
use assert_json_diff::assert_json_include;
|
||||||
|
use lemmy_apub_lib::activity_queue::create_activity_queue;
|
||||||
use serial_test::serial;
|
use serial_test::serial;
|
||||||
|
|
||||||
async fn prepare_comment_test(
|
async fn prepare_comment_test(
|
||||||
|
@ -241,7 +242,8 @@ pub(crate) mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
pub(crate) async fn test_parse_lemmy_comment() {
|
pub(crate) async fn test_parse_lemmy_comment() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
|
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
|
||||||
let data = prepare_comment_test(&url, &context).await;
|
let data = prepare_comment_test(&url, &context).await;
|
||||||
|
|
||||||
|
@ -270,7 +272,8 @@ pub(crate) mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_parse_pleroma_comment() {
|
async fn test_parse_pleroma_comment() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
|
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
|
||||||
let data = prepare_comment_test(&url, &context).await;
|
let data = prepare_comment_test(&url, &context).await;
|
||||||
|
|
||||||
|
|
|
@ -214,6 +214,7 @@ impl ApubCommunity {
|
||||||
pub(crate) mod tests {
|
pub(crate) mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::objects::tests::{file_to_json_object, init_context};
|
use crate::objects::tests::{file_to_json_object, init_context};
|
||||||
|
use lemmy_apub_lib::activity_queue::create_activity_queue;
|
||||||
use lemmy_db_schema::traits::Crud;
|
use lemmy_db_schema::traits::Crud;
|
||||||
use serial_test::serial;
|
use serial_test::serial;
|
||||||
|
|
||||||
|
@ -240,7 +241,8 @@ pub(crate) mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_parse_lemmy_community() {
|
async fn test_parse_lemmy_community() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let community = parse_lemmy_community(&context).await;
|
let community = parse_lemmy_community(&context).await;
|
||||||
|
|
||||||
assert_eq!(community.title, "Ten Forward");
|
assert_eq!(community.title, "Ten Forward");
|
||||||
|
|
|
@ -21,11 +21,11 @@ pub(crate) fn get_summary_from_string_or_source(
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) mod tests {
|
pub(crate) mod tests {
|
||||||
use actix::Actor;
|
use actix::Actor;
|
||||||
|
use background_jobs::QueueHandle;
|
||||||
use diesel::{
|
use diesel::{
|
||||||
r2d2::{ConnectionManager, Pool},
|
r2d2::{ConnectionManager, Pool},
|
||||||
PgConnection,
|
PgConnection,
|
||||||
};
|
};
|
||||||
use lemmy_apub_lib::activity_queue::create_activity_queue;
|
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
establish_unpooled_connection,
|
establish_unpooled_connection,
|
||||||
get_database_url_from_env,
|
get_database_url_from_env,
|
||||||
|
@ -45,7 +45,7 @@ pub(crate) mod tests {
|
||||||
|
|
||||||
// TODO: would be nice if we didnt have to use a full context for tests.
|
// TODO: would be nice if we didnt have to use a full context for tests.
|
||||||
// or at least write a helper function so this code is shared with main.rs
|
// or at least write a helper function so this code is shared with main.rs
|
||||||
pub(crate) fn init_context() -> LemmyContext {
|
pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext {
|
||||||
// call this to run migrations
|
// call this to run migrations
|
||||||
establish_unpooled_connection();
|
establish_unpooled_connection();
|
||||||
let settings = Settings::init().unwrap();
|
let settings = Settings::init().unwrap();
|
||||||
|
@ -57,7 +57,6 @@ pub(crate) mod tests {
|
||||||
.user_agent(build_user_agent(&settings))
|
.user_agent(build_user_agent(&settings))
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let activity_queue = create_activity_queue();
|
|
||||||
let secret = Secret {
|
let secret = Secret {
|
||||||
id: 0,
|
id: 0,
|
||||||
jwt_secret: "".to_string(),
|
jwt_secret: "".to_string(),
|
||||||
|
|
|
@ -198,6 +198,7 @@ impl ActorType for ApubPerson {
|
||||||
pub(crate) mod tests {
|
pub(crate) mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::objects::tests::{file_to_json_object, init_context};
|
use crate::objects::tests::{file_to_json_object, init_context};
|
||||||
|
use lemmy_apub_lib::activity_queue::create_activity_queue;
|
||||||
use lemmy_db_schema::traits::Crud;
|
use lemmy_db_schema::traits::Crud;
|
||||||
use serial_test::serial;
|
use serial_test::serial;
|
||||||
|
|
||||||
|
@ -218,7 +219,8 @@ pub(crate) mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_parse_lemmy_person() {
|
async fn test_parse_lemmy_person() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let person = parse_lemmy_person(&context).await;
|
let person = parse_lemmy_person(&context).await;
|
||||||
|
|
||||||
assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string()));
|
assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string()));
|
||||||
|
@ -231,7 +233,8 @@ pub(crate) mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_parse_pleroma_person() {
|
async fn test_parse_pleroma_person() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let json = file_to_json_object("assets/pleroma/objects/person.json");
|
let json = file_to_json_object("assets/pleroma/objects/person.json");
|
||||||
let url = Url::parse("https://queer.hacktivis.me/users/lanodan").unwrap();
|
let url = Url::parse("https://queer.hacktivis.me/users/lanodan").unwrap();
|
||||||
let mut request_counter = 0;
|
let mut request_counter = 0;
|
||||||
|
|
|
@ -205,12 +205,14 @@ mod tests {
|
||||||
post::ApubPost,
|
post::ApubPost,
|
||||||
tests::{file_to_json_object, init_context},
|
tests::{file_to_json_object, init_context},
|
||||||
};
|
};
|
||||||
|
use lemmy_apub_lib::activity_queue::create_activity_queue;
|
||||||
use serial_test::serial;
|
use serial_test::serial;
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_parse_lemmy_post() {
|
async fn test_parse_lemmy_post() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let community = parse_lemmy_community(&context).await;
|
let community = parse_lemmy_community(&context).await;
|
||||||
let person = parse_lemmy_person(&context).await;
|
let person = parse_lemmy_person(&context).await;
|
||||||
|
|
||||||
|
|
|
@ -162,6 +162,7 @@ mod tests {
|
||||||
tests::{file_to_json_object, init_context},
|
tests::{file_to_json_object, init_context},
|
||||||
};
|
};
|
||||||
use assert_json_diff::assert_json_include;
|
use assert_json_diff::assert_json_include;
|
||||||
|
use lemmy_apub_lib::activity_queue::create_activity_queue;
|
||||||
use serial_test::serial;
|
use serial_test::serial;
|
||||||
|
|
||||||
async fn prepare_comment_test(url: &Url, context: &LemmyContext) -> (ApubPerson, ApubPerson) {
|
async fn prepare_comment_test(url: &Url, context: &LemmyContext) -> (ApubPerson, ApubPerson) {
|
||||||
|
@ -191,7 +192,8 @@ mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_parse_lemmy_pm() {
|
async fn test_parse_lemmy_pm() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
|
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
|
||||||
let data = prepare_comment_test(&url, &context).await;
|
let data = prepare_comment_test(&url, &context).await;
|
||||||
let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json");
|
let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json");
|
||||||
|
@ -218,7 +220,8 @@ mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_parse_pleroma_pm() {
|
async fn test_parse_pleroma_pm() {
|
||||||
let context = init_context();
|
let manager = create_activity_queue();
|
||||||
|
let context = init_context(manager.queue_handle().clone());
|
||||||
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
|
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
|
||||||
let data = prepare_comment_test(&url, &context).await;
|
let data = prepare_comment_test(&url, &context).await;
|
||||||
let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap();
|
let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap();
|
||||||
|
|
|
@ -26,5 +26,5 @@ sha2 = "0.9.8"
|
||||||
actix-web = { version = "4.0.0-beta.9", default-features = false }
|
actix-web = { version = "4.0.0-beta.9", default-features = false }
|
||||||
http-signature-normalization-actix = { version = "0.5.0-beta.10", default-features = false, features = ["server", "sha-2"] }
|
http-signature-normalization-actix = { version = "0.5.0-beta.10", default-features = false, features = ["server", "sha-2"] }
|
||||||
http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["sha-2"] }
|
http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["sha-2"] }
|
||||||
background-jobs = "0.9.1"
|
background-jobs = "0.11.0"
|
||||||
diesel = "1.4.8"
|
diesel = "1.4.8"
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
use crate::{signatures::sign_and_send, traits::ActorType};
|
use crate::{signatures::sign_and_send, traits::ActorType};
|
||||||
use anyhow::{anyhow, Context, Error};
|
use anyhow::{anyhow, Context, Error};
|
||||||
use background_jobs::{
|
use background_jobs::{
|
||||||
create_server,
|
|
||||||
memory_storage::Storage,
|
memory_storage::Storage,
|
||||||
ActixJob,
|
ActixJob,
|
||||||
Backoff,
|
Backoff,
|
||||||
|
Manager,
|
||||||
MaxRetries,
|
MaxRetries,
|
||||||
QueueHandle,
|
QueueHandle,
|
||||||
WorkerConfig,
|
WorkerConfig,
|
||||||
|
@ -35,7 +35,7 @@ pub async fn send_activity(
|
||||||
if env::var("APUB_TESTING_SEND_SYNC").is_ok() {
|
if env::var("APUB_TESTING_SEND_SYNC").is_ok() {
|
||||||
do_send(message, client).await?;
|
do_send(message, client).await?;
|
||||||
} else {
|
} else {
|
||||||
activity_queue.queue::<SendActivityTask>(message)?;
|
activity_queue.queue::<SendActivityTask>(message).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,19 +101,13 @@ async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_activity_queue() -> QueueHandle {
|
pub fn create_activity_queue() -> Manager {
|
||||||
// Start the application server. This guards access to to the jobs store
|
|
||||||
let queue_handle = create_server(Storage::new());
|
|
||||||
let arbiter = actix_web::rt::Arbiter::new();
|
|
||||||
|
|
||||||
// Configure and start our workers
|
// Configure and start our workers
|
||||||
WorkerConfig::new(|| MyState {
|
WorkerConfig::new_managed(Storage::new(), |_| MyState {
|
||||||
client: Client::default(),
|
client: Client::default(),
|
||||||
})
|
})
|
||||||
.register::<SendActivityTask>()
|
.register::<SendActivityTask>()
|
||||||
.start_in_arbiter(&arbiter, queue_handle.clone());
|
.start()
|
||||||
|
|
||||||
queue_handle
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
|
@ -26,7 +26,7 @@ serde_json = { version = "1.0.68", features = ["preserve_order"] }
|
||||||
actix = "0.12.0"
|
actix = "0.12.0"
|
||||||
anyhow = "1.0.44"
|
anyhow = "1.0.44"
|
||||||
diesel = "1.4.8"
|
diesel = "1.4.8"
|
||||||
background-jobs = "0.9.1"
|
background-jobs = "0.11.0"
|
||||||
tokio = "1.12.0"
|
tokio = "1.12.0"
|
||||||
strum = "0.21.0"
|
strum = "0.21.0"
|
||||||
strum_macros = "0.21.1"
|
strum_macros = "0.21.1"
|
||||||
|
|
|
@ -94,7 +94,9 @@ async fn main() -> Result<(), LemmyError> {
|
||||||
.user_agent(build_user_agent(&settings))
|
.user_agent(build_user_agent(&settings))
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
let activity_queue = create_activity_queue();
|
let queue_manager = create_activity_queue();
|
||||||
|
|
||||||
|
let activity_queue = queue_manager.queue_handle().clone();
|
||||||
|
|
||||||
let chat_server = ChatServer::startup(
|
let chat_server = ChatServer::startup(
|
||||||
pool.clone(),
|
pool.clone(),
|
||||||
|
@ -135,5 +137,7 @@ async fn main() -> Result<(), LemmyError> {
|
||||||
.run()
|
.run()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
drop(queue_manager);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue