Add subscribers_count field to Account object
This commit is contained in:
parent
5e8a95c646
commit
7600efccb5
12 changed files with 85 additions and 18 deletions
|
@ -962,6 +962,15 @@ components:
|
||||||
type: array
|
type: array
|
||||||
items:
|
items:
|
||||||
$ref: '#/components/schemas/Field'
|
$ref: '#/components/schemas/Field'
|
||||||
|
followers_count:
|
||||||
|
description: The reported followers of this profile.
|
||||||
|
type: number
|
||||||
|
following_count:
|
||||||
|
description: The reported follows of this profile.
|
||||||
|
type: number
|
||||||
|
subscribers_count:
|
||||||
|
description: The reported subscribers of this profile.
|
||||||
|
type: number
|
||||||
Attachment:
|
Attachment:
|
||||||
type: object
|
type: object
|
||||||
properties:
|
properties:
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
ALTER TABLE actor_profile ADD COLUMN subscriber_count INTEGER NOT NULL CHECK (subscriber_count >= 0) DEFAULT 0;
|
||||||
|
UPDATE actor_profile SET subscriber_count = (
|
||||||
|
SELECT count(*) FROM relationship WHERE relationship.target_id = actor_profile.id
|
||||||
|
AND relationship.relationship_type = 3
|
||||||
|
);
|
|
@ -12,6 +12,7 @@ CREATE TABLE actor_profile (
|
||||||
extra_fields JSONB NOT NULL DEFAULT '[]',
|
extra_fields JSONB NOT NULL DEFAULT '[]',
|
||||||
follower_count INTEGER NOT NULL CHECK (follower_count >= 0) DEFAULT 0,
|
follower_count INTEGER NOT NULL CHECK (follower_count >= 0) DEFAULT 0,
|
||||||
following_count INTEGER NOT NULL CHECK (following_count >= 0) DEFAULT 0,
|
following_count INTEGER NOT NULL CHECK (following_count >= 0) DEFAULT 0,
|
||||||
|
subscriber_count INTEGER NOT NULL CHECK (subscriber_count >= 0) DEFAULT 0,
|
||||||
post_count INTEGER NOT NULL CHECK (post_count >= 0) DEFAULT 0,
|
post_count INTEGER NOT NULL CHECK (post_count >= 0) DEFAULT 0,
|
||||||
actor_json JSONB,
|
actor_json JSONB,
|
||||||
actor_id VARCHAR(200) UNIQUE GENERATED ALWAYS AS (actor_json ->> 'id') STORED,
|
actor_id VARCHAR(200) UNIQUE GENERATED ALWAYS AS (actor_json ->> 'id') STORED,
|
||||||
|
|
|
@ -16,7 +16,7 @@ use super::HandlerResult;
|
||||||
|
|
||||||
pub async fn handle_add(
|
pub async fn handle_add(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
db_client: &impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
activity: Activity,
|
activity: Activity,
|
||||||
) -> HandlerResult {
|
) -> HandlerResult {
|
||||||
let actor_profile = get_profile_by_actor_id(
|
let actor_profile = get_profile_by_actor_id(
|
||||||
|
|
|
@ -19,7 +19,7 @@ use super::HandlerResult;
|
||||||
|
|
||||||
pub async fn handle_remove(
|
pub async fn handle_remove(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
db_client: &impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
activity: Activity,
|
activity: Activity,
|
||||||
) -> HandlerResult {
|
) -> HandlerResult {
|
||||||
let actor_profile = get_profile_by_actor_id(
|
let actor_profile = get_profile_by_actor_id(
|
||||||
|
|
|
@ -299,7 +299,7 @@ impl ResetSubscriptions {
|
||||||
pub async fn execute(
|
pub async fn execute(
|
||||||
&self,
|
&self,
|
||||||
_config: &Config,
|
_config: &Config,
|
||||||
db_client: &impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
reset_subscriptions(db_client, self.ethereum_contract_replaced).await?;
|
reset_subscriptions(db_client, self.ethereum_contract_replaced).await?;
|
||||||
println!("subscriptions deleted");
|
println!("subscriptions deleted");
|
||||||
|
|
|
@ -260,7 +260,7 @@ pub async fn update_expired_subscriptions(
|
||||||
instance: &Instance,
|
instance: &Instance,
|
||||||
db_pool: &Pool,
|
db_pool: &Pool,
|
||||||
) -> Result<(), EthereumError> {
|
) -> Result<(), EthereumError> {
|
||||||
let db_client = &**get_database_client(db_pool).await?;
|
let db_client = &mut **get_database_client(db_pool).await?;
|
||||||
for subscription in get_expired_subscriptions(db_client).await? {
|
for subscription in get_expired_subscriptions(db_client).await? {
|
||||||
// Remove relationship
|
// Remove relationship
|
||||||
unsubscribe(db_client, &subscription.sender_id, &subscription.recipient_id).await?;
|
unsubscribe(db_client, &subscription.sender_id, &subscription.recipient_id).await?;
|
||||||
|
|
|
@ -61,6 +61,7 @@ pub struct Account {
|
||||||
pub fields: Vec<AccountField>,
|
pub fields: Vec<AccountField>,
|
||||||
pub followers_count: i32,
|
pub followers_count: i32,
|
||||||
pub following_count: i32,
|
pub following_count: i32,
|
||||||
|
pub subscribers_count: i32,
|
||||||
pub statuses_count: i32,
|
pub statuses_count: i32,
|
||||||
|
|
||||||
pub source: Option<Source>,
|
pub source: Option<Source>,
|
||||||
|
@ -136,6 +137,7 @@ impl Account {
|
||||||
fields: extra_fields,
|
fields: extra_fields,
|
||||||
followers_count: profile.follower_count,
|
followers_count: profile.follower_count,
|
||||||
following_count: profile.following_count,
|
following_count: profile.following_count,
|
||||||
|
subscribers_count: profile.subscriber_count,
|
||||||
statuses_count: profile.post_count,
|
statuses_count: profile.post_count,
|
||||||
source: None,
|
source: None,
|
||||||
}
|
}
|
||||||
|
|
|
@ -291,6 +291,18 @@ pub async fn delete_profile(
|
||||||
",
|
",
|
||||||
&[&profile_id, &RelationshipType::Follow],
|
&[&profile_id, &RelationshipType::Follow],
|
||||||
).await?;
|
).await?;
|
||||||
|
transaction.execute(
|
||||||
|
"
|
||||||
|
UPDATE actor_profile
|
||||||
|
SET subscriber_count = subscriber_count - 1
|
||||||
|
FROM relationship
|
||||||
|
WHERE
|
||||||
|
relationship.source_id = $1
|
||||||
|
AND relationship.target_id = actor_profile.id
|
||||||
|
AND relationship.relationship_type = $2
|
||||||
|
",
|
||||||
|
&[&profile_id, &RelationshipType::Subscription],
|
||||||
|
).await?;
|
||||||
transaction.execute(
|
transaction.execute(
|
||||||
"
|
"
|
||||||
UPDATE post
|
UPDATE post
|
||||||
|
@ -500,6 +512,25 @@ pub async fn update_following_count(
|
||||||
Ok(profile)
|
Ok(profile)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn update_subscriber_count(
|
||||||
|
db_client: &impl GenericClient,
|
||||||
|
profile_id: &Uuid,
|
||||||
|
change: i32,
|
||||||
|
) -> Result<DbActorProfile, DatabaseError> {
|
||||||
|
let maybe_row = db_client.query_opt(
|
||||||
|
"
|
||||||
|
UPDATE actor_profile
|
||||||
|
SET subscriber_count = subscriber_count + $1
|
||||||
|
WHERE id = $2
|
||||||
|
RETURNING actor_profile
|
||||||
|
",
|
||||||
|
&[&change, &profile_id],
|
||||||
|
).await?;
|
||||||
|
let row = maybe_row.ok_or(DatabaseError::NotFound("profile"))?;
|
||||||
|
let profile = row.try_get("actor_profile")?;
|
||||||
|
Ok(profile)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn update_post_count(
|
pub async fn update_post_count(
|
||||||
db_client: &impl GenericClient,
|
db_client: &impl GenericClient,
|
||||||
profile_id: &Uuid,
|
profile_id: &Uuid,
|
||||||
|
|
|
@ -232,6 +232,7 @@ pub struct DbActorProfile {
|
||||||
pub extra_fields: ExtraFields,
|
pub extra_fields: ExtraFields,
|
||||||
pub follower_count: i32,
|
pub follower_count: i32,
|
||||||
pub following_count: i32,
|
pub following_count: i32,
|
||||||
|
pub subscriber_count: i32,
|
||||||
pub post_count: i32,
|
pub post_count: i32,
|
||||||
pub created_at: DateTime<Utc>,
|
pub created_at: DateTime<Utc>,
|
||||||
pub updated_at: DateTime<Utc>,
|
pub updated_at: DateTime<Utc>,
|
||||||
|
@ -306,6 +307,7 @@ impl Default for DbActorProfile {
|
||||||
extra_fields: ExtraFields(vec![]),
|
extra_fields: ExtraFields(vec![]),
|
||||||
follower_count: 0,
|
follower_count: 0,
|
||||||
following_count: 0,
|
following_count: 0,
|
||||||
|
subscriber_count: 0,
|
||||||
post_count: 0,
|
post_count: 0,
|
||||||
created_at: now,
|
created_at: now,
|
||||||
updated_at: now,
|
updated_at: now,
|
||||||
|
|
|
@ -9,6 +9,7 @@ use crate::models::notifications::queries::create_follow_notification;
|
||||||
use crate::models::profiles::queries::{
|
use crate::models::profiles::queries::{
|
||||||
update_follower_count,
|
update_follower_count,
|
||||||
update_following_count,
|
update_following_count,
|
||||||
|
update_subscriber_count,
|
||||||
};
|
};
|
||||||
use crate::models::profiles::types::DbActorProfile;
|
use crate::models::profiles::types::DbActorProfile;
|
||||||
use crate::utils::id::new_uuid;
|
use crate::utils::id::new_uuid;
|
||||||
|
@ -354,26 +355,30 @@ pub async fn get_following_paginated(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn subscribe(
|
pub async fn subscribe(
|
||||||
db_client: &impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
source_id: &Uuid,
|
source_id: &Uuid,
|
||||||
target_id: &Uuid,
|
target_id: &Uuid,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
db_client.execute(
|
let transaction = db_client.transaction().await?;
|
||||||
|
transaction.execute(
|
||||||
"
|
"
|
||||||
INSERT INTO relationship (source_id, target_id, relationship_type)
|
INSERT INTO relationship (source_id, target_id, relationship_type)
|
||||||
VALUES ($1, $2, $3)
|
VALUES ($1, $2, $3)
|
||||||
",
|
",
|
||||||
&[&source_id, &target_id, &RelationshipType::Subscription],
|
&[&source_id, &target_id, &RelationshipType::Subscription],
|
||||||
).await.map_err(catch_unique_violation("relationship"))?;
|
).await.map_err(catch_unique_violation("relationship"))?;
|
||||||
|
update_subscriber_count(&transaction, target_id, 1).await?;
|
||||||
|
transaction.commit().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn subscribe_opt(
|
pub async fn subscribe_opt(
|
||||||
db_client: &impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
source_id: &Uuid,
|
source_id: &Uuid,
|
||||||
target_id: &Uuid,
|
target_id: &Uuid,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
db_client.execute(
|
let transaction = db_client.transaction().await?;
|
||||||
|
let inserted_count = transaction.execute(
|
||||||
"
|
"
|
||||||
INSERT INTO relationship (source_id, target_id, relationship_type)
|
INSERT INTO relationship (source_id, target_id, relationship_type)
|
||||||
VALUES ($1, $2, $3)
|
VALUES ($1, $2, $3)
|
||||||
|
@ -381,15 +386,20 @@ pub async fn subscribe_opt(
|
||||||
",
|
",
|
||||||
&[&source_id, &target_id, &RelationshipType::Subscription],
|
&[&source_id, &target_id, &RelationshipType::Subscription],
|
||||||
).await?;
|
).await?;
|
||||||
|
if inserted_count > 0 {
|
||||||
|
update_subscriber_count(&transaction, target_id, 1).await?;
|
||||||
|
};
|
||||||
|
transaction.commit().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn unsubscribe(
|
pub async fn unsubscribe(
|
||||||
db_client: &impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
source_id: &Uuid,
|
source_id: &Uuid,
|
||||||
target_id: &Uuid,
|
target_id: &Uuid,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let deleted_count = db_client.execute(
|
let transaction = db_client.transaction().await?;
|
||||||
|
let deleted_count = transaction.execute(
|
||||||
"
|
"
|
||||||
DELETE FROM relationship
|
DELETE FROM relationship
|
||||||
WHERE
|
WHERE
|
||||||
|
@ -401,6 +411,8 @@ pub async fn unsubscribe(
|
||||||
if deleted_count == 0 {
|
if deleted_count == 0 {
|
||||||
return Err(DatabaseError::NotFound("relationship"));
|
return Err(DatabaseError::NotFound("relationship"));
|
||||||
};
|
};
|
||||||
|
update_subscriber_count(&transaction, target_id, -1).await?;
|
||||||
|
transaction.commit().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ pub async fn create_subscription(
|
||||||
updated_at: &DateTime<Utc>,
|
updated_at: &DateTime<Utc>,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
assert!(chain_id.is_ethereum() == sender_address.is_some());
|
assert!(chain_id.is_ethereum() == sender_address.is_some());
|
||||||
let transaction = db_client.transaction().await?;
|
let mut transaction = db_client.transaction().await?;
|
||||||
transaction.execute(
|
transaction.execute(
|
||||||
"
|
"
|
||||||
INSERT INTO subscription (
|
INSERT INTO subscription (
|
||||||
|
@ -44,7 +44,7 @@ pub async fn create_subscription(
|
||||||
&updated_at,
|
&updated_at,
|
||||||
],
|
],
|
||||||
).await.map_err(catch_unique_violation("subscription"))?;
|
).await.map_err(catch_unique_violation("subscription"))?;
|
||||||
subscribe(&transaction, sender_id, recipient_id).await?;
|
subscribe(&mut transaction, sender_id, recipient_id).await?;
|
||||||
transaction.commit().await?;
|
transaction.commit().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,7 @@ pub async fn update_subscription(
|
||||||
expires_at: &DateTime<Utc>,
|
expires_at: &DateTime<Utc>,
|
||||||
updated_at: &DateTime<Utc>,
|
updated_at: &DateTime<Utc>,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let transaction = db_client.transaction().await?;
|
let mut transaction = db_client.transaction().await?;
|
||||||
let maybe_row = transaction.query_opt(
|
let maybe_row = transaction.query_opt(
|
||||||
"
|
"
|
||||||
UPDATE subscription
|
UPDATE subscription
|
||||||
|
@ -78,7 +78,7 @@ pub async fn update_subscription(
|
||||||
let sender_id: Uuid = row.try_get("sender_id")?;
|
let sender_id: Uuid = row.try_get("sender_id")?;
|
||||||
let recipient_id: Uuid = row.try_get("recipient_id")?;
|
let recipient_id: Uuid = row.try_get("recipient_id")?;
|
||||||
if *expires_at > Utc::now() {
|
if *expires_at > Utc::now() {
|
||||||
subscribe_opt(&transaction, &sender_id, &recipient_id).await?;
|
subscribe_opt(&mut transaction, &sender_id, &recipient_id).await?;
|
||||||
};
|
};
|
||||||
transaction.commit().await?;
|
transaction.commit().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -152,13 +152,14 @@ pub async fn get_incoming_subscriptions(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn reset_subscriptions(
|
pub async fn reset_subscriptions(
|
||||||
db_client: &impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
ethereum_contract_replaced: bool,
|
ethereum_contract_replaced: bool,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
|
let transaction = db_client.transaction().await?;
|
||||||
if ethereum_contract_replaced {
|
if ethereum_contract_replaced {
|
||||||
// Ethereum subscription configuration is stored in contract.
|
// Ethereum subscription configuration is stored in contract.
|
||||||
// If contract is replaced, payment option needs to be deleted.
|
// If contract is replaced, payment option needs to be deleted.
|
||||||
db_client.execute(
|
transaction.execute(
|
||||||
"
|
"
|
||||||
UPDATE actor_profile
|
UPDATE actor_profile
|
||||||
SET payment_options = '[]'
|
SET payment_options = '[]'
|
||||||
|
@ -174,14 +175,18 @@ pub async fn reset_subscriptions(
|
||||||
&[&i16::from(&PaymentType::EthereumSubscription)],
|
&[&i16::from(&PaymentType::EthereumSubscription)],
|
||||||
).await?;
|
).await?;
|
||||||
};
|
};
|
||||||
db_client.execute(
|
transaction.execute(
|
||||||
"
|
"
|
||||||
DELETE FROM relationship
|
DELETE FROM relationship
|
||||||
WHERE relationship_type = $1
|
WHERE relationship_type = $1
|
||||||
",
|
",
|
||||||
&[&RelationshipType::Subscription],
|
&[&RelationshipType::Subscription],
|
||||||
).await?;
|
).await?;
|
||||||
db_client.execute("DELETE FROM subscription", &[]).await?;
|
transaction.execute(
|
||||||
|
"UPDATE actor_profile SET subscriber_count = 0", &[],
|
||||||
|
).await?;
|
||||||
|
transaction.execute("DELETE FROM subscription", &[]).await?;
|
||||||
|
transaction.commit().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue