Add auto follow instances index support

This commit is contained in:
Chocobozzz 2019-09-04 11:18:33 +02:00 committed by Chocobozzz
parent 8424c4026a
commit 6f1b4fa417
8 changed files with 234 additions and 15 deletions

View file

@ -115,6 +115,7 @@ import { UpdateVideosScheduler } from './server/lib/schedulers/update-videos-sch
import { YoutubeDlUpdateScheduler } from './server/lib/schedulers/youtube-dl-update-scheduler'
import { VideosRedundancyScheduler } from './server/lib/schedulers/videos-redundancy-scheduler'
import { RemoveOldHistoryScheduler } from './server/lib/schedulers/remove-old-history-scheduler'
import { AutoFollowIndexInstances } from './server/lib/schedulers/auto-follow-index-instances'
import { isHTTPSignatureDigestValid } from './server/helpers/peertube-crypto'
import { PeerTubeSocket } from './server/lib/peertube-socket'
import { updateStreamingPlaylistsInfohashesIfNeeded } from './server/lib/hls'
@ -260,6 +261,7 @@ async function startApplication () {
RemoveOldHistoryScheduler.Instance.enable()
RemoveOldViewsScheduler.Instance.enable()
PluginsCheckScheduler.Instance.enable()
AutoFollowIndexInstances.Instance.enable()
// Redis initialization
Redis.Instance.init()

View file

@ -168,10 +168,15 @@ const SCHEDULER_INTERVALS_MS = {
updateVideos: 60000, // 1 minute
youtubeDLUpdate: 60000 * 60 * 24, // 1 day
checkPlugins: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
autoFollowIndexInstances: 60000 * 60 * 24, // 1 day
removeOldViews: 60000 * 60 * 24, // 1 day
removeOldHistory: 60000 * 60 * 24 // 1 day
}
const INSTANCES_INDEX = {
HOSTS_PATH: '/api/v1/instances/hosts'
}
// ---------------------------------------------------------------------------
const CONSTRAINTS_FIELDS = {
@ -633,6 +638,7 @@ if (isTestInstance() === true) {
SCHEDULER_INTERVALS_MS.removeOldHistory = 5000
SCHEDULER_INTERVALS_MS.removeOldViews = 5000
SCHEDULER_INTERVALS_MS.updateVideos = 5000
SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000
REPEAT_JOBS[ 'videos-views' ] = { every: 5000 }
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
@ -683,6 +689,7 @@ export {
PREVIEWS_SIZE,
REMOTE_SCHEME,
FOLLOW_STATES,
INSTANCES_INDEX,
DEFAULT_USER_THEME_NAME,
SERVER_ACTOR_NAME,
PLUGIN_GLOBAL_CSS_FILE_NAME,

View file

@ -0,0 +1,72 @@
import { logger } from '../../helpers/logger'
import { AbstractScheduler } from './abstract-scheduler'
import { INSTANCES_INDEX, SCHEDULER_INTERVALS_MS, SERVER_ACTOR_NAME } from '../../initializers/constants'
import { CONFIG } from '../../initializers/config'
import { chunk } from 'lodash'
import { doRequest } from '@server/helpers/requests'
import { ActorFollowModel } from '@server/models/activitypub/actor-follow'
import { JobQueue } from '@server/lib/job-queue'
import { getServerActor } from '@server/helpers/utils'
export class AutoFollowIndexInstances extends AbstractScheduler {
private static instance: AbstractScheduler
protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.autoFollowIndexInstances
private lastCheck: Date
private constructor () {
super()
}
protected async internalExecute () {
return this.autoFollow()
}
private async autoFollow () {
if (CONFIG.FOLLOWINGS.INSTANCE.AUTO_FOLLOW_INDEX.ENABLED === false) return
const indexUrl = CONFIG.FOLLOWINGS.INSTANCE.AUTO_FOLLOW_INDEX.INDEX_URL
logger.info('Auto follow instances of index %s.', indexUrl)
try {
const serverActor = await getServerActor()
const uri = indexUrl + INSTANCES_INDEX.HOSTS_PATH
const qs = this.lastCheck ? { since: this.lastCheck.toISOString() } : {}
this.lastCheck = new Date()
const { body } = await doRequest({ uri, qs, json: true })
const hosts: string[] = body.data.map(o => o.host)
const chunks = chunk(hosts, 20)
for (const chunk of chunks) {
const unfollowedHosts = await ActorFollowModel.keepUnfollowedInstance(chunk)
for (const unfollowedHost of unfollowedHosts) {
const payload = {
host: unfollowedHost,
name: SERVER_ACTOR_NAME,
followerActorId: serverActor.id,
isAutoFollow: true
}
await JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
.catch(err => logger.error('Cannot create follow job for %s.', unfollowedHost, err))
}
}
} catch (err) {
logger.error('Cannot auto follow hosts of index %s.', indexUrl, { err })
}
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}

View file

@ -1,5 +1,5 @@
import * as Bluebird from 'bluebird'
import { values } from 'lodash'
import { values, difference } from 'lodash'
import {
AfterCreate,
AfterDestroy,
@ -21,7 +21,7 @@ import { FollowState } from '../../../shared/models/actors'
import { ActorFollow } from '../../../shared/models/actors/follow.model'
import { logger } from '../../helpers/logger'
import { getServerActor } from '../../helpers/utils'
import { ACTOR_FOLLOW_SCORE, FOLLOW_STATES } from '../../initializers/constants'
import { ACTOR_FOLLOW_SCORE, FOLLOW_STATES, SERVER_ACTOR_NAME } from '../../initializers/constants'
import { ServerModel } from '../server/server'
import { createSafeIn, getSort } from '../utils'
import { ActorModel, unusedActorAttributesForAPI } from './actor'
@ -435,6 +435,45 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
})
}
static async keepUnfollowedInstance (hosts: string[]) {
const followerId = (await getServerActor()).id
const query = {
attributes: [],
where: {
actorId: followerId
},
include: [
{
attributes: [ ],
model: ActorModel.unscoped(),
required: true,
as: 'ActorFollowing',
where: {
preferredUsername: SERVER_ACTOR_NAME
},
include: [
{
attributes: [ 'host' ],
model: ServerModel.unscoped(),
required: true,
where: {
host: {
[Op.in]: hosts
}
}
}
]
}
]
}
const res = await ActorFollowModel.findAll(query)
const followedHosts = res.map(res => res.ActorFollowing.Server.host)
return difference(hosts, followedHosts)
}
static listAcceptedFollowerUrlsForAP (actorIds: number[], t: Transaction, start?: number, count?: number) {
return ActorFollowModel.createListAcceptedFollowForApiQuery('followers', actorIds, t, start, count)
}

View file

@ -6,10 +6,12 @@ import {
acceptFollower,
cleanupTests,
flushAndRunMultipleServers,
MockInstancesIndex,
ServerInfo,
setAccessTokensToServers,
unfollow,
updateCustomSubConfig
updateCustomSubConfig,
wait
} from '../../../../shared/extra-utils/index'
import { follow, getFollowersListPaginationAndSort, getFollowingListPaginationAndSort } from '../../../../shared/extra-utils/server/follows'
import { waitJobs } from '../../../../shared/extra-utils/server/jobs'
@ -22,13 +24,14 @@ async function checkFollow (follower: ServerInfo, following: ServerInfo, exists:
const res = await getFollowersListPaginationAndSort(following.url, 0, 5, '-createdAt')
const follows = res.body.data as ActorFollow[]
if (exists === true) {
expect(res.body.total).to.equal(1)
const follow = follows.find(f => {
return f.follower.host === follower.host && f.state === 'accepted'
})
expect(follows[ 0 ].follower.host).to.equal(follower.host)
expect(follows[ 0 ].state).to.equal('accepted')
if (exists === true) {
expect(follow).to.exist
} else {
expect(follows.filter(f => f.state === 'accepted')).to.have.lengthOf(0)
expect(follow).to.be.undefined
}
}
@ -36,13 +39,14 @@ async function checkFollow (follower: ServerInfo, following: ServerInfo, exists:
const res = await getFollowingListPaginationAndSort(follower.url, 0, 5, '-createdAt')
const follows = res.body.data as ActorFollow[]
if (exists === true) {
expect(res.body.total).to.equal(1)
const follow = follows.find(f => {
return f.following.host === following.host && f.state === 'accepted'
})
expect(follows[ 0 ].following.host).to.equal(following.host)
expect(follows[ 0 ].state).to.equal('accepted')
if (exists === true) {
expect(follow).to.exist
} else {
expect(follows.filter(f => f.state === 'accepted')).to.have.lengthOf(0)
expect(follow).to.be.undefined
}
}
}
@ -71,7 +75,7 @@ describe('Test auto follows', function () {
before(async function () {
this.timeout(30000)
servers = await flushAndRunMultipleServers(2)
servers = await flushAndRunMultipleServers(3)
// Get the access tokens
await setAccessTokensToServers(servers)
@ -142,6 +146,61 @@ describe('Test auto follows', function () {
})
})
describe('Auto follow index', function () {
const instanceIndexServer = new MockInstancesIndex()
before(async () => {
await instanceIndexServer.initialize()
})
it('Should not auto follow index if the option is not enabled', async function () {
this.timeout(30000)
await wait(5000)
await waitJobs(servers)
await checkFollow(servers[ 0 ], servers[ 1 ], false)
await checkFollow(servers[ 1 ], servers[ 0 ], false)
})
it('Should auto follow the index', async function () {
this.timeout(30000)
instanceIndexServer.addInstance(servers[1].host)
const config = {
followings: {
instance: {
autoFollowIndex: {
indexUrl: 'http://localhost:42100',
enabled: true
}
}
}
}
await updateCustomSubConfig(servers[0].url, servers[0].accessToken, config)
await wait(5000)
await waitJobs(servers)
await checkFollow(servers[ 0 ], servers[ 1 ], true)
await resetFollows(servers)
})
it('Should follow new added instances in the index but not old ones', async function () {
this.timeout(30000)
instanceIndexServer.addInstance(servers[2].host)
await wait(5000)
await waitJobs(servers)
await checkFollow(servers[ 0 ], servers[ 1 ], false)
await checkFollow(servers[ 0 ], servers[ 2 ], true)
})
})
after(async function () {
await cleanupTests(servers)
})

View file

@ -24,4 +24,5 @@ export * from './videos/video-streaming-playlists'
export * from './videos/videos'
export * from './videos/video-change-ownership'
export * from './feeds/feeds'
export * from './instances-index/mock-instances-index'
export * from './search/videos'

View file

@ -0,0 +1,38 @@
import * as express from 'express'
export class MockInstancesIndex {
private indexInstances: { host: string, createdAt: string }[] = []
initialize () {
return new Promise(res => {
const app = express()
app.use('/', (req: express.Request, res: express.Response, next: express.NextFunction) => {
if (process.env.DEBUG) console.log('Receiving request on mocked server %s.', req.url)
return next()
})
app.get('/api/v1/instances/hosts', (req: express.Request, res: express.Response) => {
const since = req.query.since
const filtered = this.indexInstances.filter(i => {
if (!since) return true
return i.createdAt > since
})
return res.json({
total: filtered.length,
data: filtered
})
})
app.listen(42100, () => res())
})
}
addInstance (host: string) {
this.indexInstances.push({ host, createdAt: new Date().toISOString() })
}
}

View file

@ -17,7 +17,8 @@
"typeRoots": [ "node_modules/@types", "server/typings" ],
"baseUrl": "./",
"paths": {
"@server/*": [ "server/*" ]
"@server/*": [ "server/*" ],
"@shared/*": [ "shared/*" ]
}
},
"exclude": [