add logging to worker processing functions in testrig, don't start workers in unexpected places

This commit is contained in:
kim 2024-04-25 15:25:33 +01:00
parent a055c5f749
commit 382790cf7f
3 changed files with 29 additions and 9 deletions

View file

@ -24,6 +24,5 @@ import (
// NewTestMediaManager returns a media handler with the default test config, and the given db and storage. // NewTestMediaManager returns a media handler with the default test config, and the given db and storage.
func NewTestMediaManager(state *state.State) *media.Manager { func NewTestMediaManager(state *state.State) *media.Manager {
StartNoopWorkers(state) // ensure started
return media.NewManager(state) return media.NewManager(state)
} }

View file

@ -31,8 +31,5 @@ import (
// The passed in state will have its worker functions set appropriately, // The passed in state will have its worker functions set appropriately,
// but the state will not be initialized. // but the state will not be initialized.
func NewTestProcessor(state *state.State, federator *federation.Federator, emailSender email.Sender, mediaManager *media.Manager) *processing.Processor { func NewTestProcessor(state *state.State, federator *federation.Federator, emailSender email.Sender, mediaManager *media.Manager) *processing.Processor {
p := processing.NewProcessor(cleaner.New(state), typeutils.NewConverter(state), federator, NewTestOauthServer(state.DB), mediaManager, state, emailSender) return processing.NewProcessor(cleaner.New(state), typeutils.NewConverter(state), federator, NewTestOauthServer(state.DB), mediaManager, state, emailSender)
state.Workers.Client.Process = p.Workers().ProcessFromClientAPI
state.Workers.Federator.Process = p.Workers().ProcessFromFediAPI
return p
} }

View file

@ -27,7 +27,10 @@ import (
"os" "os"
"time" "time"
"codeberg.org/gruf/go-byteutil"
"codeberg.org/gruf/go-kv/format"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/messages"
tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline"
"github.com/superseriousbusiness/gotosocial/internal/processing/workers" "github.com/superseriousbusiness/gotosocial/internal/processing/workers"
@ -39,8 +42,15 @@ import (
// Starts workers on the provided state using noop processing functions. // Starts workers on the provided state using noop processing functions.
// Useful when you *don't* want to trigger side effects in a test. // Useful when you *don't* want to trigger side effects in a test.
func StartNoopWorkers(state *state.State) { func StartNoopWorkers(state *state.State) {
state.Workers.Client.Process = func(context.Context, *messages.FromClientAPI) error { return nil } state.Workers.Client.Process = func(ctx context.Context, msg *messages.FromClientAPI) error {
state.Workers.Federator.Process = func(context.Context, *messages.FromFediAPI) error { return nil } log.Debugf(ctx, "Workers{}.Client{}.Noop(%s)", dump(msg))
return nil // noop
}
state.Workers.Federator.Process = func(ctx context.Context, msg *messages.FromFediAPI) error {
log.Debugf(ctx, "Workers{}.Federator{}.Noop(%s)", dump(msg))
return nil // noop
}
state.Workers.Client.Init(messages.ClientMsgIndices()) state.Workers.Client.Init(messages.ClientMsgIndices())
state.Workers.Federator.Init(messages.FederatorMsgIndices()) state.Workers.Federator.Init(messages.FederatorMsgIndices())
@ -56,8 +66,15 @@ func StartNoopWorkers(state *state.State) {
// Starts workers on the provided state using processing functions from the given // Starts workers on the provided state using processing functions from the given
// workers processor. Useful when you *do* want to trigger side effects in a test. // workers processor. Useful when you *do* want to trigger side effects in a test.
func StartWorkers(state *state.State, processor *workers.Processor) { func StartWorkers(state *state.State, processor *workers.Processor) {
state.Workers.Client.Process = processor.ProcessFromClientAPI state.Workers.Client.Process = func(ctx context.Context, msg *messages.FromClientAPI) error {
state.Workers.Federator.Process = processor.ProcessFromFediAPI log.Debugf(ctx, "Workers{}.Client{}.Process(%s)", dump(msg))
return processor.ProcessFromClientAPI(ctx, msg)
}
state.Workers.Federator.Process = func(ctx context.Context, msg *messages.FromFediAPI) error {
log.Debugf(ctx, "Workers{}.Federator{}.Process(%s)", dump(msg))
return processor.ProcessFromFediAPI(ctx, msg)
}
state.Workers.Client.Init(messages.ClientMsgIndices()) state.Workers.Client.Init(messages.ClientMsgIndices())
state.Workers.Federator.Init(messages.FederatorMsgIndices()) state.Workers.Federator.Init(messages.FederatorMsgIndices())
@ -244,3 +261,10 @@ func WaitFor(condition func() bool) bool {
} }
} }
} }
// dump returns debug output of 'v'.
func dump(v any) string {
var buf byteutil.Buffer
format.Append(&buf, v)
return buf.String()
}