forked from mirrors/gotosocial
908 lines
27 KiB
Go
908 lines
27 KiB
Go
|
package pub
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"github.com/go-fed/activity/streams"
|
||
|
"github.com/go-fed/activity/streams/vocab"
|
||
|
"net/url"
|
||
|
)
|
||
|
|
||
|
// OnFollowBehavior enumerates the different default actions that the go-fed
// library can provide when receiving a Follow Activity from a peer.
//
// The zero value is OnFollowDoNothing.
type OnFollowBehavior int

const (
	// OnFollowDoNothing does not take any action when a Follow Activity
	// is received.
	OnFollowDoNothing OnFollowBehavior = iota
	// OnFollowAutomaticallyAccept triggers the side effect of sending an
	// Accept of this Follow request in response.
	OnFollowAutomaticallyAccept
	// OnFollowAutomaticallyReject triggers the side effect of sending a
	// Reject of this Follow request in response.
	OnFollowAutomaticallyReject
)
|
||
|
|
||
|
// FederatingWrappedCallbacks lists the callback functions that already have
// some side effect behavior provided by the pub library.
//
// These functions are wrapped for the Federating Protocol.
type FederatingWrappedCallbacks struct {
	// Create handles additional side effects for the Create ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping callback for the Federating Protocol ensures the
	// 'object' property is created in the database.
	//
	// Create calls Create for each object in the federated Activity.
	Create func(context.Context, vocab.ActivityStreamsCreate) error
	// Update handles additional side effects for the Update ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping callback for the Federating Protocol ensures the
	// 'object' property is updated in the database.
	//
	// Update calls Update on the federated entry from the database, with a
	// new value.
	Update func(context.Context, vocab.ActivityStreamsUpdate) error
	// Delete handles additional side effects for the Delete ActivityStreams
	// type, specific to the application using go-fed.
	//
	// Delete removes the federated entry from the database.
	Delete func(context.Context, vocab.ActivityStreamsDelete) error
	// Follow handles additional side effects for the Follow ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping function can have one of several default behaviors,
	// depending on the value of the OnFollow setting.
	Follow func(context.Context, vocab.ActivityStreamsFollow) error
	// OnFollow determines what action to take for this particular callback
	// if a Follow Activity is handled.
	OnFollow OnFollowBehavior
	// Accept handles additional side effects for the Accept ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping function determines if this 'Accept' is in response to a
	// 'Follow'. If so, then the 'actor' is added to the original 'actor's
	// 'following' collection.
	//
	// Otherwise, no side effects are done by go-fed.
	Accept func(context.Context, vocab.ActivityStreamsAccept) error
	// Reject handles additional side effects for the Reject ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping function has no default side effects. However, if this
	// 'Reject' is in response to a 'Follow' then the client MUST NOT go
	// forward with adding the 'actor' to the original 'actor's 'following'
	// collection by the client application.
	Reject func(context.Context, vocab.ActivityStreamsReject) error
	// Add handles additional side effects for the Add ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping function will add the 'object' IRIs to a specific
	// 'target' collection if the 'target' collection(s) live on this
	// server.
	Add func(context.Context, vocab.ActivityStreamsAdd) error
	// Remove handles additional side effects for the Remove ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping function will remove all 'object' IRIs from a specific
	// 'target' collection if the 'target' collection(s) live on this
	// server.
	Remove func(context.Context, vocab.ActivityStreamsRemove) error
	// Like handles additional side effects for the Like ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping function will add the activity to the "likes" collection
	// on all 'object' targets owned by this server.
	Like func(context.Context, vocab.ActivityStreamsLike) error
	// Announce handles additional side effects for the Announce
	// ActivityStreams type, specific to the application using go-fed.
	//
	// The wrapping function will add the activity to the "shares"
	// collection on all 'object' targets owned by this server.
	Announce func(context.Context, vocab.ActivityStreamsAnnounce) error
	// Undo handles additional side effects for the Undo ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping function ensures the 'actor' on the 'Undo'
	// is the same as the 'actor' on all Activities being undone.
	// It enforces that the actors on the Undo must correspond to all of the
	// 'object' actors in some manner.
	//
	// It is expected that the application will implement the proper
	// reversal of activities that are being undone.
	Undo func(context.Context, vocab.ActivityStreamsUndo) error
	// Block handles additional side effects for the Block ActivityStreams
	// type, specific to the application using go-fed.
	//
	// The wrapping function provides no default side effects. It simply
	// calls the wrapped function. However, note that Blocks should not be
	// received from a federated peer, as delivering Blocks explicitly
	// deviates from the original ActivityPub specification.
	Block func(context.Context, vocab.ActivityStreamsBlock) error

	// Sidechannel data -- this is set at request handling time. These must
	// be set before the callbacks are used.

	// db is the Database the FederatingWrappedCallbacks should use.
	db Database
	// inboxIRI is the IRI of the inbox that is handling this callback.
	inboxIRI *url.URL
	// addNewIds creates new 'id' entries on an activity and its objects if
	// it is a Create activity.
	addNewIds func(c context.Context, activity Activity) error
	// deliver delivers an outgoing message.
	deliver func(c context.Context, outboxIRI *url.URL, activity Activity) error
	// newTransport creates a new Transport.
	newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
}
|
||
|
|
||
|
// callbacks returns the WrappedCallbacks members into a single interface slice
|
||
|
// for use in streams.Resolver callbacks.
|
||
|
//
|
||
|
// If the given functions have a type that collides with the default behavior,
|
||
|
// then disable our default behavior
|
||
|
func (w FederatingWrappedCallbacks) callbacks(fns []interface{}) []interface{} {
|
||
|
enableCreate := true
|
||
|
enableUpdate := true
|
||
|
enableDelete := true
|
||
|
enableFollow := true
|
||
|
enableAccept := true
|
||
|
enableReject := true
|
||
|
enableAdd := true
|
||
|
enableRemove := true
|
||
|
enableLike := true
|
||
|
enableAnnounce := true
|
||
|
enableUndo := true
|
||
|
enableBlock := true
|
||
|
for _, fn := range fns {
|
||
|
switch fn.(type) {
|
||
|
default:
|
||
|
continue
|
||
|
case func(context.Context, vocab.ActivityStreamsCreate) error:
|
||
|
enableCreate = false
|
||
|
case func(context.Context, vocab.ActivityStreamsUpdate) error:
|
||
|
enableUpdate = false
|
||
|
case func(context.Context, vocab.ActivityStreamsDelete) error:
|
||
|
enableDelete = false
|
||
|
case func(context.Context, vocab.ActivityStreamsFollow) error:
|
||
|
enableFollow = false
|
||
|
case func(context.Context, vocab.ActivityStreamsAccept) error:
|
||
|
enableAccept = false
|
||
|
case func(context.Context, vocab.ActivityStreamsReject) error:
|
||
|
enableReject = false
|
||
|
case func(context.Context, vocab.ActivityStreamsAdd) error:
|
||
|
enableAdd = false
|
||
|
case func(context.Context, vocab.ActivityStreamsRemove) error:
|
||
|
enableRemove = false
|
||
|
case func(context.Context, vocab.ActivityStreamsLike) error:
|
||
|
enableLike = false
|
||
|
case func(context.Context, vocab.ActivityStreamsAnnounce) error:
|
||
|
enableAnnounce = false
|
||
|
case func(context.Context, vocab.ActivityStreamsUndo) error:
|
||
|
enableUndo = false
|
||
|
case func(context.Context, vocab.ActivityStreamsBlock) error:
|
||
|
enableBlock = false
|
||
|
}
|
||
|
}
|
||
|
if enableCreate {
|
||
|
fns = append(fns, w.create)
|
||
|
}
|
||
|
if enableUpdate {
|
||
|
fns = append(fns, w.update)
|
||
|
}
|
||
|
if enableDelete {
|
||
|
fns = append(fns, w.deleteFn)
|
||
|
}
|
||
|
if enableFollow {
|
||
|
fns = append(fns, w.follow)
|
||
|
}
|
||
|
if enableAccept {
|
||
|
fns = append(fns, w.accept)
|
||
|
}
|
||
|
if enableReject {
|
||
|
fns = append(fns, w.reject)
|
||
|
}
|
||
|
if enableAdd {
|
||
|
fns = append(fns, w.add)
|
||
|
}
|
||
|
if enableRemove {
|
||
|
fns = append(fns, w.remove)
|
||
|
}
|
||
|
if enableLike {
|
||
|
fns = append(fns, w.like)
|
||
|
}
|
||
|
if enableAnnounce {
|
||
|
fns = append(fns, w.announce)
|
||
|
}
|
||
|
if enableUndo {
|
||
|
fns = append(fns, w.undo)
|
||
|
}
|
||
|
if enableBlock {
|
||
|
fns = append(fns, w.block)
|
||
|
}
|
||
|
return fns
|
||
|
}
|
||
|
|
||
|
// create implements the federating Create activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
// Create anonymous loop function to be able to properly scope the defer
|
||
|
// for the database lock at each iteration.
|
||
|
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
|
||
|
t := iter.GetType()
|
||
|
if t == nil && iter.IsIRI() {
|
||
|
// Attempt to dereference the IRI instead
|
||
|
tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
b, err := tport.Dereference(c, iter.GetIRI())
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
var m map[string]interface{}
|
||
|
if err = json.Unmarshal(b, &m); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
t, err = streams.ToType(c, m)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
} else if t == nil {
|
||
|
return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI")
|
||
|
}
|
||
|
id, err := GetId(t)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
err = w.db.Lock(c, id)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer w.db.Unlock(c, id)
|
||
|
if err := w.db.Create(c, t); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
|
||
|
if err := loopFn(iter); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
if w.Create != nil {
|
||
|
return w.Create(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// update implements the federating Update activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
if err := mustHaveActivityOriginMatchObjects(a); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// Create anonymous loop function to be able to properly scope the defer
|
||
|
// for the database lock at each iteration.
|
||
|
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
|
||
|
t := iter.GetType()
|
||
|
if t == nil {
|
||
|
return fmt.Errorf("update requires an object to be wholly provided")
|
||
|
}
|
||
|
id, err := GetId(t)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
err = w.db.Lock(c, id)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer w.db.Unlock(c, id)
|
||
|
if err := w.db.Update(c, t); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
|
||
|
if err := loopFn(iter); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
if w.Update != nil {
|
||
|
return w.Update(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// deleteFn implements the federating Delete activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
if err := mustHaveActivityOriginMatchObjects(a); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// Create anonymous loop function to be able to properly scope the defer
|
||
|
// for the database lock at each iteration.
|
||
|
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
|
||
|
id, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
err = w.db.Lock(c, id)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer w.db.Unlock(c, id)
|
||
|
if err := w.db.Delete(c, id); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
|
||
|
if err := loopFn(iter); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
if w.Delete != nil {
|
||
|
return w.Delete(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// follow implements the federating Follow activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
// Check that we own at least one of the 'object' properties, and ensure
|
||
|
// it is to the actor that owns this inbox.
|
||
|
//
|
||
|
// If not then don't send a response. It was federated to us as an FYI,
|
||
|
// by mistake, or some other reason.
|
||
|
if err := w.db.Lock(c, w.inboxIRI); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// WARNING: Unlock not deferred.
|
||
|
actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
|
||
|
if err != nil {
|
||
|
w.db.Unlock(c, w.inboxIRI)
|
||
|
return err
|
||
|
}
|
||
|
w.db.Unlock(c, w.inboxIRI)
|
||
|
// Unlock must be called by now and every branch above.
|
||
|
isMe := false
|
||
|
if w.OnFollow != OnFollowDoNothing {
|
||
|
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
|
||
|
id, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if id.String() == actorIRI.String() {
|
||
|
isMe = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if isMe {
|
||
|
// Prepare the response.
|
||
|
var response Activity
|
||
|
if w.OnFollow == OnFollowAutomaticallyAccept {
|
||
|
response = streams.NewActivityStreamsAccept()
|
||
|
} else if w.OnFollow == OnFollowAutomaticallyReject {
|
||
|
response = streams.NewActivityStreamsReject()
|
||
|
} else {
|
||
|
return fmt.Errorf("unknown OnFollowBehavior: %d", w.OnFollow)
|
||
|
}
|
||
|
// Set us as the 'actor'.
|
||
|
me := streams.NewActivityStreamsActorProperty()
|
||
|
response.SetActivityStreamsActor(me)
|
||
|
me.AppendIRI(actorIRI)
|
||
|
// Set the Follow as the 'object' property.
|
||
|
op := streams.NewActivityStreamsObjectProperty()
|
||
|
response.SetActivityStreamsObject(op)
|
||
|
op.AppendActivityStreamsFollow(a)
|
||
|
// Add all actors on the original Follow to the 'to' property.
|
||
|
recipients := make([]*url.URL, 0)
|
||
|
to := streams.NewActivityStreamsToProperty()
|
||
|
response.SetActivityStreamsTo(to)
|
||
|
followActors := a.GetActivityStreamsActor()
|
||
|
for iter := followActors.Begin(); iter != followActors.End(); iter = iter.Next() {
|
||
|
id, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
to.AppendIRI(id)
|
||
|
recipients = append(recipients, id)
|
||
|
}
|
||
|
if w.OnFollow == OnFollowAutomaticallyAccept {
|
||
|
// If automatically accepting, then also update our
|
||
|
// followers collection with the new actors.
|
||
|
//
|
||
|
// If automatically rejecting, do not update the
|
||
|
// followers collection.
|
||
|
if err := w.db.Lock(c, actorIRI); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// WARNING: Unlock not deferred.
|
||
|
followers, err := w.db.Followers(c, actorIRI)
|
||
|
if err != nil {
|
||
|
w.db.Unlock(c, actorIRI)
|
||
|
return err
|
||
|
}
|
||
|
items := followers.GetActivityStreamsItems()
|
||
|
if items == nil {
|
||
|
items = streams.NewActivityStreamsItemsProperty()
|
||
|
followers.SetActivityStreamsItems(items)
|
||
|
}
|
||
|
for _, elem := range recipients {
|
||
|
items.PrependIRI(elem)
|
||
|
}
|
||
|
if err = w.db.Update(c, followers); err != nil {
|
||
|
w.db.Unlock(c, actorIRI)
|
||
|
return err
|
||
|
}
|
||
|
w.db.Unlock(c, actorIRI)
|
||
|
// Unlock must be called by now and every branch above.
|
||
|
}
|
||
|
// Lock without defer!
|
||
|
w.db.Lock(c, w.inboxIRI)
|
||
|
outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI)
|
||
|
if err != nil {
|
||
|
w.db.Unlock(c, w.inboxIRI)
|
||
|
return err
|
||
|
}
|
||
|
w.db.Unlock(c, w.inboxIRI)
|
||
|
// Everything must be unlocked by now.
|
||
|
if err := w.addNewIds(c, response); err != nil {
|
||
|
return err
|
||
|
} else if err := w.deliver(c, outboxIRI, response); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
if w.Follow != nil {
|
||
|
return w.Follow(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// accept implements the federating Accept activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivityStreamsAccept) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op != nil && op.Len() > 0 {
|
||
|
// Get this actor's id.
|
||
|
if err := w.db.Lock(c, w.inboxIRI); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// WARNING: Unlock not deferred.
|
||
|
actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
|
||
|
if err != nil {
|
||
|
w.db.Unlock(c, w.inboxIRI)
|
||
|
return err
|
||
|
}
|
||
|
w.db.Unlock(c, w.inboxIRI)
|
||
|
// Unlock must be called by now and every branch above.
|
||
|
//
|
||
|
// Determine if we are in a follow on the 'object' property.
|
||
|
//
|
||
|
// TODO: Handle Accept multiple Follow.
|
||
|
var maybeMyFollowIRI *url.URL
|
||
|
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
|
||
|
t := iter.GetType()
|
||
|
if t == nil && iter.IsIRI() {
|
||
|
// Attempt to dereference the IRI instead
|
||
|
tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
b, err := tport.Dereference(c, iter.GetIRI())
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
var m map[string]interface{}
|
||
|
if err = json.Unmarshal(b, &m); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
t, err = streams.ToType(c, m)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
} else if t == nil {
|
||
|
return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI")
|
||
|
}
|
||
|
// Ensure it is a Follow.
|
||
|
if !streams.IsOrExtendsActivityStreamsFollow(t) {
|
||
|
continue
|
||
|
}
|
||
|
follow, ok := t.(Activity)
|
||
|
if !ok {
|
||
|
return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface")
|
||
|
}
|
||
|
followId, err := GetId(follow)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// Ensure that we are one of the actors on the Follow.
|
||
|
actors := follow.GetActivityStreamsActor()
|
||
|
for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
|
||
|
id, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if id.String() == actorIRI.String() {
|
||
|
maybeMyFollowIRI = followId
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
// Continue breaking if we found ourselves
|
||
|
if maybeMyFollowIRI != nil {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
// If we received an Accept whose 'object' is a Follow with an
|
||
|
// Accept that we sent, add to the following collection.
|
||
|
if maybeMyFollowIRI != nil {
|
||
|
// Verify our Follow request exists and the peer didn't
|
||
|
// fabricate it.
|
||
|
activityActors := a.GetActivityStreamsActor()
|
||
|
if activityActors == nil || activityActors.Len() == 0 {
|
||
|
return fmt.Errorf("an Accept with a Follow has no actors")
|
||
|
}
|
||
|
// This may be a duplicate check if we dereferenced the
|
||
|
// Follow above. TODO: Separate this logic to avoid
|
||
|
// redundancy.
|
||
|
//
|
||
|
// Use an anonymous function to properly scope the
|
||
|
// database lock, immediately call it.
|
||
|
err = func() error {
|
||
|
if err := w.db.Lock(c, maybeMyFollowIRI); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer w.db.Unlock(c, maybeMyFollowIRI)
|
||
|
t, err := w.db.Get(c, maybeMyFollowIRI)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if !streams.IsOrExtendsActivityStreamsFollow(t) {
|
||
|
return fmt.Errorf("peer gave an Accept wrapping a Follow but provided a non-Follow id")
|
||
|
}
|
||
|
follow, ok := t.(Activity)
|
||
|
if !ok {
|
||
|
return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface")
|
||
|
}
|
||
|
// Ensure that we are one of the actors on the Follow.
|
||
|
ok = false
|
||
|
actors := follow.GetActivityStreamsActor()
|
||
|
for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
|
||
|
id, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if id.String() == actorIRI.String() {
|
||
|
ok = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
if !ok {
|
||
|
return fmt.Errorf("peer gave an Accept wrapping a Follow but we are not the actor on that Follow")
|
||
|
}
|
||
|
// Build map of original Accept actors
|
||
|
acceptActors := make(map[string]bool)
|
||
|
for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
|
||
|
id, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
acceptActors[id.String()] = false
|
||
|
}
|
||
|
// Verify all actor(s) were on the original Follow.
|
||
|
followObj := follow.GetActivityStreamsObject()
|
||
|
for iter := followObj.Begin(); iter != followObj.End(); iter = iter.Next() {
|
||
|
id, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if _, ok := acceptActors[id.String()]; ok {
|
||
|
acceptActors[id.String()] = true
|
||
|
}
|
||
|
}
|
||
|
for _, found := range acceptActors {
|
||
|
if !found {
|
||
|
return fmt.Errorf("peer gave an Accept wrapping a Follow but was not an object in the original Follow")
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// Add the peer to our following collection.
|
||
|
if err := w.db.Lock(c, actorIRI); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// WARNING: Unlock not deferred.
|
||
|
following, err := w.db.Following(c, actorIRI)
|
||
|
if err != nil {
|
||
|
w.db.Unlock(c, actorIRI)
|
||
|
return err
|
||
|
}
|
||
|
items := following.GetActivityStreamsItems()
|
||
|
if items == nil {
|
||
|
items = streams.NewActivityStreamsItemsProperty()
|
||
|
following.SetActivityStreamsItems(items)
|
||
|
}
|
||
|
for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
|
||
|
id, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
w.db.Unlock(c, actorIRI)
|
||
|
return err
|
||
|
}
|
||
|
items.PrependIRI(id)
|
||
|
}
|
||
|
if err = w.db.Update(c, following); err != nil {
|
||
|
w.db.Unlock(c, actorIRI)
|
||
|
return err
|
||
|
}
|
||
|
w.db.Unlock(c, actorIRI)
|
||
|
// Unlock must be called by now and every branch above.
|
||
|
}
|
||
|
}
|
||
|
if w.Accept != nil {
|
||
|
return w.Accept(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// reject implements the federating Reject activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) reject(c context.Context, a vocab.ActivityStreamsReject) error {
|
||
|
if w.Reject != nil {
|
||
|
return w.Reject(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// add implements the federating Add activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
target := a.GetActivityStreamsTarget()
|
||
|
if target == nil || target.Len() == 0 {
|
||
|
return ErrTargetRequired
|
||
|
}
|
||
|
if err := add(c, op, target, w.db); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if w.Add != nil {
|
||
|
return w.Add(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// remove implements the federating Remove activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
target := a.GetActivityStreamsTarget()
|
||
|
if target == nil || target.Len() == 0 {
|
||
|
return ErrTargetRequired
|
||
|
}
|
||
|
if err := remove(c, op, target, w.db); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if w.Remove != nil {
|
||
|
return w.Remove(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// like implements the federating Like activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
id, err := GetId(a)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// Create anonymous loop function to be able to properly scope the defer
|
||
|
// for the database lock at each iteration.
|
||
|
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
|
||
|
objId, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if err := w.db.Lock(c, objId); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer w.db.Unlock(c, objId)
|
||
|
if owns, err := w.db.Owns(c, objId); err != nil {
|
||
|
return err
|
||
|
} else if !owns {
|
||
|
return nil
|
||
|
}
|
||
|
t, err := w.db.Get(c, objId)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
l, ok := t.(likeser)
|
||
|
if !ok {
|
||
|
return fmt.Errorf("cannot add Like to likes collection for type %T", t)
|
||
|
}
|
||
|
// Get 'likes' property on the object, creating default if
|
||
|
// necessary.
|
||
|
likes := l.GetActivityStreamsLikes()
|
||
|
if likes == nil {
|
||
|
likes = streams.NewActivityStreamsLikesProperty()
|
||
|
l.SetActivityStreamsLikes(likes)
|
||
|
}
|
||
|
// Get 'likes' value, defaulting to a collection.
|
||
|
likesT := likes.GetType()
|
||
|
if likesT == nil {
|
||
|
col := streams.NewActivityStreamsCollection()
|
||
|
likesT = col
|
||
|
likes.SetActivityStreamsCollection(col)
|
||
|
}
|
||
|
// Prepend the activity's 'id' on the 'likes' Collection or
|
||
|
// OrderedCollection.
|
||
|
if col, ok := likesT.(itemser); ok {
|
||
|
items := col.GetActivityStreamsItems()
|
||
|
if items == nil {
|
||
|
items = streams.NewActivityStreamsItemsProperty()
|
||
|
col.SetActivityStreamsItems(items)
|
||
|
}
|
||
|
items.PrependIRI(id)
|
||
|
} else if oCol, ok := likesT.(orderedItemser); ok {
|
||
|
oItems := oCol.GetActivityStreamsOrderedItems()
|
||
|
if oItems == nil {
|
||
|
oItems = streams.NewActivityStreamsOrderedItemsProperty()
|
||
|
oCol.SetActivityStreamsOrderedItems(oItems)
|
||
|
}
|
||
|
oItems.PrependIRI(id)
|
||
|
} else {
|
||
|
return fmt.Errorf("likes type is neither a Collection nor an OrderedCollection: %T", likesT)
|
||
|
}
|
||
|
err = w.db.Update(c, t)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
|
||
|
if err := loopFn(iter); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
if w.Like != nil {
|
||
|
return w.Like(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// announce implements the federating Announce activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.ActivityStreamsAnnounce) error {
|
||
|
id, err := GetId(a)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
// Create anonymous loop function to be able to properly scope the defer
|
||
|
// for the database lock at each iteration.
|
||
|
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
|
||
|
objId, err := ToId(iter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if err := w.db.Lock(c, objId); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer w.db.Unlock(c, objId)
|
||
|
if owns, err := w.db.Owns(c, objId); err != nil {
|
||
|
return err
|
||
|
} else if !owns {
|
||
|
return nil
|
||
|
}
|
||
|
t, err := w.db.Get(c, objId)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
s, ok := t.(shareser)
|
||
|
if !ok {
|
||
|
return fmt.Errorf("cannot add Announce to Shares collection for type %T", t)
|
||
|
}
|
||
|
// Get 'shares' property on the object, creating default if
|
||
|
// necessary.
|
||
|
shares := s.GetActivityStreamsShares()
|
||
|
if shares == nil {
|
||
|
shares = streams.NewActivityStreamsSharesProperty()
|
||
|
s.SetActivityStreamsShares(shares)
|
||
|
}
|
||
|
// Get 'shares' value, defaulting to a collection.
|
||
|
sharesT := shares.GetType()
|
||
|
if sharesT == nil {
|
||
|
col := streams.NewActivityStreamsCollection()
|
||
|
sharesT = col
|
||
|
shares.SetActivityStreamsCollection(col)
|
||
|
}
|
||
|
// Prepend the activity's 'id' on the 'shares' Collection or
|
||
|
// OrderedCollection.
|
||
|
if col, ok := sharesT.(itemser); ok {
|
||
|
items := col.GetActivityStreamsItems()
|
||
|
if items == nil {
|
||
|
items = streams.NewActivityStreamsItemsProperty()
|
||
|
col.SetActivityStreamsItems(items)
|
||
|
}
|
||
|
items.PrependIRI(id)
|
||
|
} else if oCol, ok := sharesT.(orderedItemser); ok {
|
||
|
oItems := oCol.GetActivityStreamsOrderedItems()
|
||
|
if oItems == nil {
|
||
|
oItems = streams.NewActivityStreamsOrderedItemsProperty()
|
||
|
oCol.SetActivityStreamsOrderedItems(oItems)
|
||
|
}
|
||
|
oItems.PrependIRI(id)
|
||
|
} else {
|
||
|
return fmt.Errorf("shares type is neither a Collection nor an OrderedCollection: %T", sharesT)
|
||
|
}
|
||
|
err = w.db.Update(c, t)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
if op != nil {
|
||
|
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
|
||
|
if err := loopFn(iter); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if w.Announce != nil {
|
||
|
return w.Announce(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// undo implements the federating Undo activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
actors := a.GetActivityStreamsActor()
|
||
|
if err := mustHaveActivityActorsMatchObjectActors(c, actors, op, w.newTransport, w.inboxIRI); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if w.Undo != nil {
|
||
|
return w.Undo(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// block implements the federating Block activity side effects.
|
||
|
func (w FederatingWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error {
|
||
|
op := a.GetActivityStreamsObject()
|
||
|
if op == nil || op.Len() == 0 {
|
||
|
return ErrObjectRequired
|
||
|
}
|
||
|
if w.Block != nil {
|
||
|
return w.Block(c, a)
|
||
|
}
|
||
|
return nil
|
||
|
}
|