fix tests to delivering messages to check worker delivery queue

This commit is contained in:
kim 2024-04-08 12:21:19 +01:00
parent 39e6d05b6c
commit 3880020166
6 changed files with 168 additions and 74 deletions

View file

@ -21,6 +21,7 @@ import (
"bytes"
"context"
"encoding/json"
"io"
"net/url"
"testing"
"time"
@ -129,23 +130,27 @@ func (suite *FederatingActorTestSuite) TestSendRemoteFollower() {
suite.NotNil(activity)
// because we added 1 remote follower for zork, there should be a url in sentMessage
var sent [][]byte
var sent []byte
if !testrig.WaitFor(func() bool {
sentI, ok := httpClient.SentMessages.Load(*testRemoteAccount.SharedInboxURI)
if ok {
sent, ok = sentI.([][]byte)
if !ok {
panic("SentMessages entry was not []byte")
}
return true
delivery, ok := suite.state.Workers.Delivery.Queue.Pop()
if !ok {
return false
}
return false
if !testrig.EqualRequestURIs(delivery.Request.URL, *testRemoteAccount.SharedInboxURI) {
panic("differing request uris")
}
sent, err = io.ReadAll(delivery.Request.Body)
if err != nil {
panic("error reading body: " + err.Error())
}
return true
}) {
suite.FailNow("timed out waiting for message")
}
dst := new(bytes.Buffer)
err = json.Indent(dst, sent[0], "", " ")
err = json.Indent(dst, sent, "", " ")
suite.NoError(err)
suite.Equal(`{
"@context": "https://www.w3.org/ns/activitystreams",

View file

@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"testing"
"time"
@ -55,7 +56,7 @@ func (suite *AccountTestSuite) TestAccountDeleteLocal() {
suite.NoError(errWithCode)
// the delete should be federated outwards to the following account's inbox
var sent [][]byte
var sent []byte
delete := new(struct {
Actor string `json:"actor"`
ID string `json:"id"`
@ -66,16 +67,22 @@ func (suite *AccountTestSuite) TestAccountDeleteLocal() {
})
if !testrig.WaitFor(func() bool {
sentI, ok := suite.httpClient.SentMessages.Load(*followingAccount.SharedInboxURI)
if ok {
sent, ok = sentI.([][]byte)
if !ok {
panic("SentMessages entry was not [][]byte")
}
err = json.Unmarshal(sent[0], delete)
return err == nil
delivery, ok := suite.state.Workers.Delivery.Queue.Pop()
if !ok {
return false
}
return false
if !testrig.EqualRequestURIs(delivery.Request.URL, *followingAccount.SharedInboxURI) {
panic("differing request uris")
}
sent, err = io.ReadAll(delivery.Request.Body)
if err != nil {
panic("error reading body: " + err.Error())
}
err = json.Unmarshal(sent, delete)
if err != nil {
panic("error unmarshaling json: " + err.Error())
}
return true
}) {
suite.FailNow("timed out waiting for message")
}

View file

@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"testing"
"time"
@ -77,22 +78,6 @@ func (suite *FollowRequestTestSuite) TestFollowRequestAccept() {
Note: "",
}, relationship)
// accept should be sent to Some_User
var sent [][]byte
if !testrig.WaitFor(func() bool {
sentI, ok := suite.httpClient.SentMessages.Load(targetAccount.InboxURI)
if ok {
sent, ok = sentI.([][]byte)
if !ok {
panic("SentMessages entry was not []byte")
}
return true
}
return false
}) {
suite.FailNow("timed out waiting for message")
}
accept := &struct {
Actor string `json:"actor"`
ID string `json:"id"`
@ -106,8 +91,29 @@ func (suite *FollowRequestTestSuite) TestFollowRequestAccept() {
To string `json:"to"`
Type string `json:"type"`
}{}
err = json.Unmarshal(sent[0], accept)
suite.NoError(err)
// accept should be sent to Some_User
var sent []byte
if !testrig.WaitFor(func() bool {
delivery, ok := suite.state.Workers.Delivery.Queue.Pop()
if !ok {
return false
}
if !testrig.EqualRequestURIs(delivery.Request.URL, targetAccount.InboxURI) {
panic("differing request uris")
}
sent, err = io.ReadAll(delivery.Request.Body)
if err != nil {
panic("error reading body: " + err.Error())
}
err = json.Unmarshal(sent, accept)
if err != nil {
panic("error unmarshaling json: " + err.Error())
}
return true
}) {
suite.FailNow("timed out waiting for message")
}
suite.Equal(requestingAccount.URI, accept.Actor)
suite.Equal(targetAccount.URI, accept.Object.Actor)
@ -144,22 +150,6 @@ func (suite *FollowRequestTestSuite) TestFollowRequestReject() {
suite.NoError(errWithCode)
suite.EqualValues(&apimodel.Relationship{ID: "01FHMQX3GAABWSM0S2VZEC2SWC", Following: false, ShowingReblogs: false, Notifying: false, FollowedBy: false, Blocking: false, BlockedBy: false, Muting: false, MutingNotifications: false, Requested: false, DomainBlocking: false, Endorsed: false, Note: ""}, relationship)
// reject should be sent to Some_User
var sent [][]byte
if !testrig.WaitFor(func() bool {
sentI, ok := suite.httpClient.SentMessages.Load(targetAccount.InboxURI)
if ok {
sent, ok = sentI.([][]byte)
if !ok {
panic("SentMessages entry was not []byte")
}
return true
}
return false
}) {
suite.FailNow("timed out waiting for message")
}
reject := &struct {
Actor string `json:"actor"`
ID string `json:"id"`
@ -173,8 +163,29 @@ func (suite *FollowRequestTestSuite) TestFollowRequestReject() {
To string `json:"to"`
Type string `json:"type"`
}{}
err = json.Unmarshal(sent[0], reject)
suite.NoError(err)
// reject should be sent to Some_User
var sent []byte
if !testrig.WaitFor(func() bool {
delivery, ok := suite.state.Workers.Delivery.Queue.Pop()
if !ok {
return false
}
if !testrig.EqualRequestURIs(delivery.Request.URL, targetAccount.InboxURI) {
panic("differing request uris")
}
sent, err = io.ReadAll(delivery.Request.Body)
if err != nil {
panic("error reading body: " + err.Error())
}
err = json.Unmarshal(sent, reject)
if err != nil {
panic("error unmarshaling json: " + err.Error())
}
return true
}) {
suite.FailNow("timed out waiting for message")
}
suite.Equal(requestingAccount.URI, reject.Actor)
suite.Equal(targetAccount.URI, reject.Object.Actor)

View file

@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"testing"
"time"
@ -457,22 +458,6 @@ func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() {
})
suite.NoError(err)
// an accept message should be sent to satan's inbox
var sent [][]byte
if !testrig.WaitFor(func() bool {
sentI, ok := suite.httpClient.SentMessages.Load(*originAccount.SharedInboxURI)
if ok {
sent, ok = sentI.([][]byte)
if !ok {
panic("SentMessages entry was not []byte")
}
return true
}
return false
}) {
suite.FailNow("timed out waiting for message")
}
accept := &struct {
Actor string `json:"actor"`
ID string `json:"id"`
@ -486,8 +471,29 @@ func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() {
To string `json:"to"`
Type string `json:"type"`
}{}
err = json.Unmarshal(sent[0], accept)
suite.NoError(err)
// an accept message should be sent to satan's inbox
var sent []byte
if !testrig.WaitFor(func() bool {
delivery, ok := suite.state.Workers.Delivery.Queue.Pop()
if !ok {
return false
}
if !testrig.EqualRequestURIs(delivery.Request.URL, *originAccount.SharedInboxURI) {
panic("differing request uris")
}
sent, err = io.ReadAll(delivery.Request.Body)
if err != nil {
panic("error reading body: " + err.Error())
}
err = json.Unmarshal(sent, accept)
if err != nil {
panic("error unmarshaling json: " + err.Error())
}
return true
}) {
suite.FailNow("timed out waiting for message")
}
suite.Equal(targetAccount.URI, accept.Actor)
suite.Equal(originAccount.URI, accept.Object.Actor)

View file

@ -155,6 +155,9 @@ func (w *Worker) Stop() bool {
// run wraps process to restart on any panic.
func (w *Worker) run(ctx context.Context) {
if w.Client == nil || w.Queue == nil {
panic("not yet initialized")
}
log.Infof(ctx, "%p: started delivery worker", w)
defer log.Infof(ctx, "%p: stopped delivery worker", w)
for returned := false; !returned; {
@ -173,6 +176,10 @@ func (w *Worker) run(ctx context.Context) {
// process is the main delivery worker processing routine.
func (w *Worker) process(ctx context.Context) {
if w.Client == nil || w.Queue == nil {
// we perform this check here just
// to ensure the compiler knows these
// variables aren't nil in the loop,
// even if already checked by caller.
panic("not yet initialized")
}

View file

@ -97,6 +97,64 @@ func StartTimelines(state *state.State, filter *visibility.Filter, converter *ty
}
}
// EqualRequestURIs checks whether inputs have equal request URIs,
// handling cases of url.URL{}, *url.URL{}, string, *string.
func EqualRequestURIs(u1, u2 any) bool {
var uri1, uri2 string
requestURI := func(in string) (string, error) {
u, err := url.Parse(in)
if err != nil {
return "", err
}
return u.RequestURI(), nil
}
switch u1 := u1.(type) {
case url.URL:
uri1 = u1.RequestURI()
case *url.URL:
uri1 = u1.RequestURI()
case *string:
var err error
uri1, err = requestURI(*u1)
if err != nil {
return false
}
case string:
var err error
uri1, err = requestURI(u1)
if err != nil {
return false
}
default:
panic("unsupported type")
}
switch u2 := u2.(type) {
case url.URL:
uri2 = u2.RequestURI()
case *url.URL:
uri2 = u2.RequestURI()
case *string:
var err error
uri2, err = requestURI(*u2)
if err != nil {
return false
}
case string:
var err error
uri2, err = requestURI(u2)
if err != nil {
return false
}
default:
panic("unsupported type")
}
return uri1 == uri2
}
// CreateMultipartFormData is a handy function for taking a fieldname and a filename, and creating a multipart form bytes buffer
// with the file contents set in the given fieldname. The extraFields param can be used to add extra FormFields to the request, as necessary.
// The returned bytes.Buffer b can be used like so: