mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-01-16 11:15:43 +00:00
a483bd9e38
* add delivery worker type that pulls from queue to httpclient package * finish up some code commenting, bodge a vendored activity library change, integrate the deliverypool changes into transportcontroller * hook up queue deletion logic * support deleting queued http requests by target ID * don't index APRequest by hostname in the queue * use gorun * use the original context's values when wrapping msg type as delivery{} * actually log in the AP delivery worker ... * add uncommitted changes * use errors.AsV2() * use errorsv2.AsV2() * finish adding some code comments, add bad host handling to delivery workers * slightly tweak deliveryworkerpool API, use advanced sender multiplier * remove PopCtx() method, let others instead rely on Wait() * shuffle things around to move delivery stuff into transport/ subpkg * remove dead code * formatting * validate request before queueing for delivery * finish adding code comments, fix up backoff code * finish adding more code comments * clamp minimum no. senders to 1 * add start/stop logging to delivery worker, some slight changes * remove double logging * use worker ptrs * expose the embedded log fields in httpclient.Request{} * ensure request context values are preserved when updating ctx * add delivery worker tests * fix linter issues * ensure delivery worker gets inited in testrig * fix tests to delivering messages to check worker delivery queue * update error type to use ptr instead of value receiver * fix test calling Workers{}.Start() instead of testrig.StartWorkers() * update docs for advanced-sender-multiplier * update to the latest activity library version * add comment about not using httptest.Server{}
206 lines
5.1 KiB
Go
206 lines
5.1 KiB
Go
// GoToSocial
|
|
// Copyright (C) GoToSocial Authors admin@gotosocial.org
|
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package transport
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/url"
|
|
|
|
"codeberg.org/gruf/go-byteutil"
|
|
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
|
|
"github.com/superseriousbusiness/gotosocial/internal/config"
|
|
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
|
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
|
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
|
|
"github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
|
|
)
|
|
|
|
func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}, recipients []*url.URL) error {
|
|
var (
|
|
// accumulated delivery reqs.
|
|
reqs []*delivery.Delivery
|
|
|
|
// accumulated preparation errs.
|
|
errs gtserror.MultiError
|
|
|
|
// Get current instance host info.
|
|
domain = config.GetAccountDomain()
|
|
host = config.GetHost()
|
|
)
|
|
|
|
// Marshal object as JSON.
|
|
b, err := json.Marshal(obj)
|
|
if err != nil {
|
|
return gtserror.Newf("error marshaling json: %w", err)
|
|
}
|
|
|
|
// Extract object IDs.
|
|
actID := getActorID(obj)
|
|
objID := getObjectID(obj)
|
|
tgtID := getTargetID(obj)
|
|
|
|
for _, to := range recipients {
|
|
// Skip delivery to recipient if it is "us".
|
|
if to.Host == host || to.Host == domain {
|
|
continue
|
|
}
|
|
|
|
// Prepare http client request.
|
|
req, err := t.prepare(ctx,
|
|
actID,
|
|
objID,
|
|
tgtID,
|
|
b,
|
|
to,
|
|
)
|
|
if err != nil {
|
|
errs.Append(err)
|
|
continue
|
|
}
|
|
|
|
// Append to request queue.
|
|
reqs = append(reqs, req)
|
|
}
|
|
|
|
// Push prepared request list to the delivery queue.
|
|
t.controller.state.Workers.Delivery.Queue.Push(reqs...)
|
|
|
|
// Return combined err.
|
|
return errs.Combine()
|
|
}
|
|
|
|
func (t *transport) Deliver(ctx context.Context, obj map[string]interface{}, to *url.URL) error {
|
|
// if 'to' host is our own, skip as we don't need to deliver to ourselves...
|
|
if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() {
|
|
return nil
|
|
}
|
|
|
|
// Marshal object as JSON.
|
|
b, err := json.Marshal(obj)
|
|
if err != nil {
|
|
return gtserror.Newf("error marshaling json: %w", err)
|
|
}
|
|
|
|
// Prepare http client request.
|
|
req, err := t.prepare(ctx,
|
|
getActorID(obj),
|
|
getObjectID(obj),
|
|
getTargetID(obj),
|
|
b,
|
|
to,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Push prepared request to the delivery queue.
|
|
t.controller.state.Workers.Delivery.Queue.Push(req)
|
|
|
|
return nil
|
|
}
|
|
|
|
// prepare will prepare a POST http.Request{}
|
|
// to recipient at 'to', wrapping in a queued
|
|
// request object with signing function.
|
|
func (t *transport) prepare(
|
|
ctx context.Context,
|
|
actorID string,
|
|
objectID string,
|
|
targetID string,
|
|
data []byte,
|
|
to *url.URL,
|
|
) (
|
|
*delivery.Delivery,
|
|
error,
|
|
) {
|
|
url := to.String()
|
|
|
|
// Use rewindable reader for body.
|
|
var body byteutil.ReadNopCloser
|
|
body.Reset(data)
|
|
|
|
// Prepare POST signer.
|
|
sign := t.signPOST(data)
|
|
|
|
// Update to-be-used request context with signing details.
|
|
ctx = gtscontext.SetOutgoingPublicKeyID(ctx, t.pubKeyID)
|
|
ctx = gtscontext.SetHTTPClientSignFunc(ctx, sign)
|
|
|
|
// Prepare a new request with data body directed at URL.
|
|
r, err := http.NewRequestWithContext(ctx, "POST", url, &body)
|
|
if err != nil {
|
|
return nil, gtserror.Newf("error preparing request: %w", err)
|
|
}
|
|
|
|
// Set the standard ActivityPub content-type + charset headers.
|
|
r.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
|
|
r.Header.Add("Accept-Charset", "utf-8")
|
|
|
|
// Validate the request before queueing for delivery.
|
|
if err := httpclient.ValidateRequest(r); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &delivery.Delivery{
|
|
ActorID: actorID,
|
|
ObjectID: objectID,
|
|
TargetID: targetID,
|
|
Request: httpclient.WrapRequest(r),
|
|
}, nil
|
|
}
|
|
|
|
// getObjectID extracts an object ID from 'serialized' ActivityPub object map.
// It reads the "object" entry: a string value is returned as-is, a nested
// map yields its "id" string, and anything else yields "".
func getObjectID(obj map[string]interface{}) string {
	return extractIDFromKey(obj, "object")
}

// getActorID extracts an actor ID from 'serialized' ActivityPub object map.
// It reads the "actor" entry: a string value is returned as-is, a nested
// map yields its "id" string, and anything else yields "".
func getActorID(obj map[string]interface{}) string {
	return extractIDFromKey(obj, "actor")
}

// getTargetID extracts a target ID from 'serialized' ActivityPub object map.
// It reads the "target" entry: a string value is returned as-is, a nested
// map yields its "id" string, and anything else yields "".
func getTargetID(obj map[string]interface{}) string {
	return extractIDFromKey(obj, "target")
}

// extractIDFromKey is the shared lookup behind getObjectID, getActorID and
// getTargetID. It returns the ID stored under 'key' in a 'serialized'
// ActivityPub object map:
//
//   - a string value is returned unchanged;
//   - a nested map[string]interface{} yields its "id" entry when that entry
//     is a string, otherwise "";
//   - a missing key, a nil map, or any other value type yields "".
func extractIDFromKey(obj map[string]interface{}, key string) string {
	switch v := obj[key].(type) {
	case string:
		return v
	case map[string]interface{}:
		// A failed assertion leaves id as "", which is
		// the desired result for a missing / non-string "id".
		id, _ := v["id"].(string)
		return id
	default:
		return ""
	}
}
|