diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index d35e0375b..100897a41 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -48,6 +48,7 @@ import ( tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/tracing" + "github.com/superseriousbusiness/gotosocial/internal/webpush" "go.uber.org/automaxprocs/maxprocs" "github.com/superseriousbusiness/gotosocial/internal/config" @@ -260,6 +261,9 @@ var Start action.GTSAction = func(ctx context.Context) error { } } + // Create a Web Push notification sender. + webPushSender := webpush.NewSender(client, state) + // Initialize both home / list timelines. state.Timelines.Home = timeline.NewManager( tlprocessor.HomeTimelineGrab(state), @@ -316,6 +320,7 @@ var Start action.GTSAction = func(ctx context.Context) error { mediaManager, state, emailSender, + webPushSender, visFilter, intFilter, ) diff --git a/cmd/gotosocial/action/testrig/testrig.go b/cmd/gotosocial/action/testrig/testrig.go index 19588c70a..daf3a5a41 100644 --- a/cmd/gotosocial/action/testrig/testrig.go +++ b/cmd/gotosocial/action/testrig/testrig.go @@ -51,6 +51,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/tracing" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/web" + "github.com/superseriousbusiness/gotosocial/internal/webpush" "github.com/superseriousbusiness/gotosocial/testrig" ) @@ -173,6 +174,7 @@ var Start action.GTSAction = func(ctx context.Context) error { federator := testrig.NewTestFederator(state, transportController, mediaManager) emailSender := testrig.NewEmailSender("./web/template/", nil) + webPushSender := webpush.NewMockSender() typeConverter := typeutils.NewConverter(state) filter := visibility.NewFilter(state) @@ -196,7 +198,7 @@ var Start action.GTSAction = func(ctx context.Context) error { return fmt.Errorf("error starting list timeline: %s", err) } - processor := testrig.NewTestProcessor(state, federator, emailSender, mediaManager) + processor := testrig.NewTestProcessor(state, federator, emailSender, webPushSender, mediaManager) // Initialize workers. testrig.StartWorkers(state, processor.Workers()) diff --git a/internal/api/model/pushnotification.go b/internal/api/model/pushnotification.go new file mode 100644 index 000000000..602e3d20c --- /dev/null +++ b/internal/api/model/pushnotification.go @@ -0,0 +1,52 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package model + +// PushNotification represents a notification summary delivered to the client by the Web Push server. +// It does not contain an entire Notification, just the NotificationID and some preview information. +// It is not used in the client API directly. +// +// swagger:model pushNotification +type PushNotification struct { + // NotificationID is the Notification.ID of the referenced Notification. + NotificationID string `json:"notification_id"` + + // NotificationType is the Notification.Type of the referenced Notification. + NotificationType string `json:"notification_type"` + + // Title is a title for the notification, + // generally describing an action taken by a user. + Title string `json:"title"` + + // Body is a preview of the notification body, + // such as the first line of a status's CW or text, + // or the first line of an account bio. + Body string `json:"body"` + + // Icon is an image URL that can be displayed with the notification, + // normally the account's avatar. + Icon string `json:"icon"` + + // PreferredLocale is a BCP 47 language tag for the receiving user's locale. + PreferredLocale string `json:"preferred_locale"` + + // AccessToken is the access token associated with the Web Push subscription. + // I don't know why this is sent, given that the client should know that already, + // but Feditext does use it. + AccessToken string `json:"access_token"` +} diff --git a/internal/processing/processor.go b/internal/processing/processor.go index ce0f1cfb8..209129f8b 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -50,6 +50,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/text" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/webpush" ) // Processor groups together processing functions and @@ -186,6 +187,7 @@ func NewProcessor( mediaManager *mm.Manager, state *state.State, emailSender email.Sender, + webPushSender webpush.Sender, visFilter *visibility.Filter, intFilter *interaction.Filter, ) *Processor { @@ -239,6 +241,7 @@ func NewProcessor( converter, visFilter, emailSender, + webPushSender, &processor.account, &processor.media, &processor.stream, diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go index 4f6597b9a..4dc58c433 100644 --- a/internal/processing/workers/surface.go +++ b/internal/processing/workers/surface.go @@ -24,6 +24,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/webpush" ) // Surface wraps functions for 'surfacing' the result @@ -38,5 +39,6 @@ type Surface struct { Stream *stream.Processor VisFilter *visibility.Filter EmailSender email.Sender + WebPushSender webpush.Sender Conversations *conversations.Processor } diff --git a/internal/processing/workers/surfacenotify.go b/internal/processing/workers/surfacenotify.go index 7773e80d3..fdbd5e3c1 100644 --- a/internal/processing/workers/surfacenotify.go +++ b/internal/processing/workers/surfacenotify.go @@ -647,5 +647,10 @@ func (s *Surface) Notify( } s.Stream.Notify(ctx, targetAccount, apiNotif) + // Send Web Push notification to the user. + if err = s.WebPushSender.Send(ctx, notif, filters, compiledMutes); err != nil { + return gtserror.Newf("error sending Web Push notifications: %w", err) + } + return nil } diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go index ad673481b..9f37f554e 100644 --- a/internal/processing/workers/workers.go +++ b/internal/processing/workers/workers.go @@ -28,6 +28,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/webpush" "github.com/superseriousbusiness/gotosocial/internal/workers" ) @@ -44,6 +45,7 @@ func New( converter *typeutils.Converter, visFilter *visibility.Filter, emailSender email.Sender, + webPushSender webpush.Sender, account *account.Processor, media *media.Processor, stream *stream.Processor, @@ -65,6 +67,7 @@ func New( Stream: stream, VisFilter: visFilter, EmailSender: emailSender, + WebPushSender: webPushSender, Conversations: conversations, } diff --git a/internal/webpush/mocksender.go b/internal/webpush/mocksender.go new file mode 100644 index 000000000..c8aac301e --- /dev/null +++ b/internal/webpush/mocksender.go @@ -0,0 +1,47 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package webpush + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// MockSender collects a map of notifications sent to each account ID. +// This should only be used in tests. +type MockSender struct { + Sent map[string][]*gtsmodel.Notification +} + +func NewMockSender() *MockSender { + return &MockSender{ + Sent: map[string][]*gtsmodel.Notification{}, + } +} + +func (m *MockSender) Send( + ctx context.Context, + notification *gtsmodel.Notification, + filters []*gtsmodel.Filter, + mutes *usermute.CompiledUserMuteList, +) error { + m.Sent[notification.TargetAccountID] = append(m.Sent[notification.TargetAccountID], notification) + return nil +} diff --git a/internal/webpush/noopsender.go b/internal/webpush/noopsender.go new file mode 100644 index 000000000..2676a9e89 --- /dev/null +++ b/internal/webpush/noopsender.go @@ -0,0 +1,42 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package webpush + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// noopSender drops anything sent to it. +// This should only be used in tests. +type noopSender struct{} + +func NewNoopSender() Sender { + return &noopSender{} +} + +func (n *noopSender) Send( + ctx context.Context, + notification *gtsmodel.Notification, + filters []*gtsmodel.Filter, + mutes *usermute.CompiledUserMuteList, +) error { + return nil +} diff --git a/internal/webpush/realsender.go b/internal/webpush/realsender.go new file mode 100644 index 000000000..21a0bdba8 --- /dev/null +++ b/internal/webpush/realsender.go @@ -0,0 +1,249 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package webpush + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + webpushgo "github.com/SherClockHolmes/webpush-go" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/httpclient" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/text" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +// realSender is the production Web Push sender, backed by an HTTP client, DB, and worker pool. +type realSender struct { + httpClient *http.Client + state *state.State + tc *typeutils.Converter +} + +// NewRealSender creates a Sender from an http.Client instead of an httpclient.Client. +// This should only be used by NewSender and in tests. +func NewRealSender(httpClient *http.Client, state *state.State) Sender { + return &realSender{ + httpClient: httpClient, + state: state, + tc: typeutils.NewConverter(state), + } +} + +// TTL is an arbitrary time to ask the Web Push server to store notifications +// while waiting for the client to retrieve them. +const TTL = 48 * time.Hour + +func (r *realSender) Send( + ctx context.Context, + notification *gtsmodel.Notification, + filters []*gtsmodel.Filter, + mutes *usermute.CompiledUserMuteList, +) error { + // Load subscriptions. + subscriptions, err := r.state.DB.GetWebPushSubscriptionsByAccountID(ctx, notification.TargetAccountID) + if err != nil { + return gtserror.Newf( + "error getting Web Push subscriptions for account %s: %w", + notification.TargetAccountID, + err, + ) + } + if len(subscriptions) == 0 { + return nil + } + + // Subscriptions we're actually going to send to. + relevantSubscriptions := make([]*gtsmodel.WebPushSubscription, 0, len(subscriptions)) + for _, subscription := range subscriptions { + // Check whether this subscription wants this type of notification. + notify := false + switch notification.NotificationType { + case gtsmodel.NotificationFollow: + notify = *subscription.NotifyFollow + case gtsmodel.NotificationFollowRequest: + notify = *subscription.NotifyFollowRequest + case gtsmodel.NotificationMention: + notify = *subscription.NotifyMention + case gtsmodel.NotificationReblog: + notify = *subscription.NotifyReblog + case gtsmodel.NotificationFavourite: + notify = *subscription.NotifyFavourite + case gtsmodel.NotificationPoll: + notify = *subscription.NotifyPoll + case gtsmodel.NotificationStatus: + notify = *subscription.NotifyStatus + case gtsmodel.NotificationAdminSignup: + notify = *subscription.NotifyAdminSignup + case gtsmodel.NotificationAdminReport: + notify = *subscription.NotifyAdminReport + case gtsmodel.NotificationPendingFave: + notify = *subscription.NotifyPendingFave + case gtsmodel.NotificationPendingReply: + notify = *subscription.NotifyPendingReply + case gtsmodel.NotificationPendingReblog: + notify = *subscription.NotifyPendingReblog + default: + log.Errorf( + ctx, + "notification type not supported by Web Push subscriptions: %v", + notification.NotificationType, + ) + continue + } + if !notify { + continue + } + relevantSubscriptions = append(relevantSubscriptions, subscription) + } + if len(relevantSubscriptions) == 0 { + return nil + } + + // Load VAPID keys into webpush-go options struct. + vapidKeyPair, err := r.state.DB.GetVAPIDKeyPair(ctx) + if err != nil { + return gtserror.Newf("error getting VAPID key pair: %w", err) + } + + // Get API representations of notification and accounts involved. + // This also loads the target account's settings. + apiNotification, err := r.tc.NotificationToAPINotification(ctx, notification, filters, mutes) + if err != nil { + return gtserror.Newf("error converting notification %s to API representation: %w", notification.ID, err) + } + + // Queue up a .Send() call for each relevant subscription. + for _, subscription := range relevantSubscriptions { + r.state.Workers.WebPush.Queue.Push(func(ctx context.Context) { + if err := r.sendToSubscription( + ctx, + vapidKeyPair, + subscription, + notification.TargetAccount, + apiNotification, + ); err != nil { + log.Errorf( + ctx, + "error sending Web Push notification for subscription with token ID %s: %v", + subscription.TokenID, + err, + ) + } + }) + } + + return nil +} + +// sendToSubscription sends a notification to a single Web Push subscription. +func (r *realSender) sendToSubscription( + ctx context.Context, + vapidKeyPair *gtsmodel.VAPIDKeyPair, + subscription *gtsmodel.WebPushSubscription, + targetAccount *gtsmodel.Account, + apiNotification *apimodel.Notification, +) error { + // Get the associated access token. + token, err := r.state.DB.GetTokenByID(ctx, subscription.TokenID) + if err != nil { + return gtserror.Newf("error getting token %s: %w", subscription.TokenID, err) + } + + // Create push notification payload struct. + pushNotification := &apimodel.PushNotification{ + NotificationID: apiNotification.ID, + NotificationType: apiNotification.Type, + Icon: apiNotification.Account.Avatar, + PreferredLocale: targetAccount.Settings.Language, + AccessToken: token.Access, + } + + // Set the notification title. + displayNameOrAcct := apiNotification.Account.DisplayName + if displayNameOrAcct == "" { + displayNameOrAcct = apiNotification.Account.Acct + } + // TODO: (Vyr) improve copy + pushNotification.Title = fmt.Sprintf("%s from %s", apiNotification.Type, displayNameOrAcct) + + // Set the notification body. + if apiNotification.Status != nil { + if apiNotification.Status.SpoilerText != "" { + pushNotification.Body = apiNotification.Status.SpoilerText + } else { + pushNotification.Body = text.SanitizeToPlaintext(apiNotification.Status.Content) + } + } else { + pushNotification.Body = text.SanitizeToPlaintext(apiNotification.Account.Note) + } + // TODO: (Vyr) trim this + + // Encode the push notification as JSON. + pushNotificationBytes, err := json.Marshal(pushNotification) + if err != nil { + return gtserror.Newf("error encoding Web Push notification: %w", err) + } + + // Send push notification. + resp, err := webpushgo.SendNotificationWithContext( + ctx, + pushNotificationBytes, + &webpushgo.Subscription{ + Endpoint: subscription.Endpoint, + Keys: webpushgo.Keys{ + Auth: subscription.Auth, + P256dh: subscription.P256dh, + }, + }, + &webpushgo.Options{ + HTTPClient: r.httpClient, + VAPIDPublicKey: vapidKeyPair.Public, + VAPIDPrivateKey: vapidKeyPair.Private, + TTL: int(TTL.Seconds()), + }, + ) + if err != nil { + return gtserror.Newf("error sending Web Push notification: %w", err) + } + // We're not going to use the response body, but we need to close it so we don't leak the connection. + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return gtserror.Newf("unexpected HTTP status received when sending Web Push notification: %s", resp.Status) + } + + return nil +} + +// gtsHttpClientRoundTripper helps wrap a GtS HTTP client back into a regular HTTP client, +// so that webpush-go can use our IP filters, bad hosts list, and retries. +type gtsHttpClientRoundTripper struct { + httpClient *httpclient.Client +} + +func (r *gtsHttpClientRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) { + return r.httpClient.Do(request) +} diff --git a/internal/webpush/realsender_test.go b/internal/webpush/realsender_test.go new file mode 100644 index 000000000..49785ea5c --- /dev/null +++ b/internal/webpush/realsender_test.go @@ -0,0 +1,217 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package webpush_test + +import ( + "context" + "io" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/cleaner" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/email" + "github.com/superseriousbusiness/gotosocial/internal/federation" + "github.com/superseriousbusiness/gotosocial/internal/filter/interaction" + "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/media" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/processing" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/storage" + "github.com/superseriousbusiness/gotosocial/internal/transport" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/webpush" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type RealSenderStandardTestSuite struct { + suite.Suite + db db.DB + storage *storage.Driver + state state.State + mediaManager *media.Manager + typeconverter *typeutils.Converter + httpClient *testrig.MockHTTPClient + transportController transport.Controller + federator *federation.Federator + oauthServer oauth.Server + emailSender email.Sender + webPushSender webpush.Sender + + // standard suite models + testTokens map[string]*gtsmodel.Token + testClients map[string]*gtsmodel.Client + testApplications map[string]*gtsmodel.Application + testUsers map[string]*gtsmodel.User + testAccounts map[string]*gtsmodel.Account + testAttachments map[string]*gtsmodel.MediaAttachment + testStatuses map[string]*gtsmodel.Status + testTags map[string]*gtsmodel.Tag + testMentions map[string]*gtsmodel.Mention + testEmojis map[string]*gtsmodel.Emoji + testNotifications map[string]*gtsmodel.Notification + testWebPushSubscriptions map[string]*gtsmodel.WebPushSubscription + + processor *processing.Processor + + webPushHttpClientDo func(request *http.Request) (*http.Response, error) +} + +func (suite *RealSenderStandardTestSuite) SetupSuite() { + suite.testTokens = testrig.NewTestTokens() + suite.testClients = testrig.NewTestClients() + suite.testApplications = testrig.NewTestApplications() + suite.testUsers = testrig.NewTestUsers() + suite.testAccounts = testrig.NewTestAccounts() + suite.testAttachments = testrig.NewTestAttachments() + suite.testStatuses = testrig.NewTestStatuses() + suite.testTags = testrig.NewTestTags() + suite.testMentions = testrig.NewTestMentions() + suite.testEmojis = testrig.NewTestEmojis() + suite.testNotifications = testrig.NewTestNotifications() + suite.testWebPushSubscriptions = testrig.NewTestWebPushSubscriptions() +} + +func (suite *RealSenderStandardTestSuite) SetupTest() { + suite.state.Caches.Init() + + testrig.InitTestConfig() + testrig.InitTestLog() + + suite.db = testrig.NewTestDB(&suite.state) + suite.state.DB = suite.db + suite.storage = testrig.NewInMemoryStorage() + suite.state.Storage = suite.storage + suite.typeconverter = typeutils.NewConverter(&suite.state) + + testrig.StartTimelines( + &suite.state, + visibility.NewFilter(&suite.state), + suite.typeconverter, + ) + + suite.httpClient = testrig.NewMockHTTPClient(nil, "../../testrig/media") + suite.httpClient.TestRemotePeople = testrig.NewTestFediPeople() + suite.httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses() + + suite.transportController = testrig.NewTestTransportController(&suite.state, suite.httpClient) + suite.mediaManager = testrig.NewTestMediaManager(&suite.state) + suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager) + suite.oauthServer = testrig.NewTestOauthServer(suite.db) + suite.emailSender = testrig.NewEmailSender("../../web/template/", nil) + + suite.webPushSender = webpush.NewRealSender( + &http.Client{ + Transport: suite, + }, + &suite.state, + ) + + suite.processor = processing.NewProcessor( + cleaner.New(&suite.state), + suite.typeconverter, + suite.federator, + suite.oauthServer, + suite.mediaManager, + &suite.state, + suite.emailSender, + suite.webPushSender, + visibility.NewFilter(&suite.state), + interaction.NewFilter(&suite.state), + ) + testrig.StartWorkers(&suite.state, suite.processor.Workers()) + + testrig.StandardDBSetup(suite.db, suite.testAccounts) + testrig.StandardStorageSetup(suite.storage, "../../testrig/media") +} + +func (suite *RealSenderStandardTestSuite) TearDownTest() { + testrig.StandardDBTeardown(suite.db) + testrig.StandardStorageTeardown(suite.storage) + testrig.StopWorkers(&suite.state) + suite.webPushHttpClientDo = nil +} + +// RoundTrip implements http.RoundTripper with a closure stored in the test suite. +func (suite *RealSenderStandardTestSuite) RoundTrip(request *http.Request) (*http.Response, error) { + return suite.webPushHttpClientDo(request) +} + +// notifyingReadCloser is a zero-length io.ReadCloser that can tell us when it's been closed, +// indicating the simulated Web Push server response has been sent, received, read, and closed. +type notifyingReadCloser struct { + bodyClosed chan struct{} +} + +func (rc *notifyingReadCloser) Read(p []byte) (n int, err error) { + return 0, io.EOF +} + +func (rc *notifyingReadCloser) Close() error { + rc.bodyClosed <- struct{}{} + close(rc.bodyClosed) + return nil +} + +func (suite *RealSenderStandardTestSuite) TestSendSuccess() { + // Set a timeout on the whole test. If it fails due to the timeout, + // the push notification was not sent for some reason. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + notification, err := suite.state.DB.GetNotificationByID(ctx, suite.testNotifications["local_account_1_like"].ID) + if !suite.NoError(err) { + suite.FailNow("Couldn't fetch notification to send") + } + + rc := ¬ifyingReadCloser{ + bodyClosed: make(chan struct{}, 1), + } + + // Simulate a successful response from the Web Push server. + suite.webPushHttpClientDo = func(request *http.Request) (*http.Response, error) { + return &http.Response{ + Status: "200 OK", + StatusCode: 200, + Body: rc, + }, nil + } + + // Send the push notification. + suite.NoError(suite.webPushSender.Send(ctx, notification, nil, nil)) + + // Wait for it to be sent or for the context to time out. + bodyClosed := false + contextExpired := false + select { + case <-rc.bodyClosed: + bodyClosed = true + case <-ctx.Done(): + contextExpired = true + } + suite.True(bodyClosed) + suite.False(contextExpired) +} + +func TestRealSenderStandardTestSuite(t *testing.T) { + suite.Run(t, &RealSenderStandardTestSuite{}) +} diff --git a/internal/webpush/sender.go b/internal/webpush/sender.go new file mode 100644 index 000000000..0de9843e4 --- /dev/null +++ b/internal/webpush/sender.go @@ -0,0 +1,52 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package webpush + +import ( + "context" + "net/http" + + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/httpclient" + "github.com/superseriousbusiness/gotosocial/internal/state" +) + +// Sender can send Web Push notifications. +type Sender interface { + // Send queues up a notification for delivery to all of an account's Web Push subscriptions. + Send( + ctx context.Context, + notification *gtsmodel.Notification, + filters []*gtsmodel.Filter, + mutes *usermute.CompiledUserMuteList, + ) error +} + +// NewSender creates a new sender from an HTTP client, DB, and worker pool. +func NewSender(httpClient *httpclient.Client, state *state.State) Sender { + return NewRealSender( + &http.Client{ + Transport: >sHttpClientRoundTripper{ + httpClient: httpClient, + }, + // Other fields are already set on the http.Client inside the httpclient.Client. + }, + state, + ) +} diff --git a/internal/workers/workers.go b/internal/workers/workers.go index 4cf549041..50ad3cce5 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -54,6 +54,10 @@ type Workers struct { // eg., import tasks, admin tasks. Processing FnWorkerPool + // WebPush provides a worker pool for + // delivering Web Push notifications. + WebPush FnWorkerPool + // prevent pass-by-value. _ nocopy } @@ -90,6 +94,10 @@ func (w *Workers) Start() { n = maxprocs w.Processing.Start(n) log.Infof(nil, "started %d processing workers", n) + + n = maxprocs + w.WebPush.Start(n) + log.Infof(nil, "started %d Web Push workers", n) } // Stop will stop all of the contained @@ -113,6 +121,9 @@ func (w *Workers) Stop() { w.Processing.Stop() log.Info(nil, "stopped processing workers") + + w.WebPush.Stop() + log.Info(nil, "stopped WebPush workers") } // nocopy when embedded will signal linter to diff --git a/testrig/processor.go b/testrig/processor.go index e098de33a..116ee2769 100644 --- a/testrig/processor.go +++ b/testrig/processor.go @@ -27,12 +27,19 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/webpush" ) // NewTestProcessor returns a Processor suitable for testing purposes. // The passed in state will have its worker functions set appropriately, // but the state will not be initialized. -func NewTestProcessor(state *state.State, federator *federation.Federator, emailSender email.Sender, mediaManager *media.Manager) *processing.Processor { +func NewTestProcessor( + state *state.State, + federator *federation.Federator, + emailSender email.Sender, + webPushSender webpush.Sender, + mediaManager *media.Manager, +) *processing.Processor { return processing.NewProcessor( cleaner.New(state), typeutils.NewConverter(state), @@ -41,6 +48,7 @@ func NewTestProcessor(state *state.State, federator *federation.Federator, email mediaManager, state, emailSender, + webPushSender, visibility.NewFilter(state), interaction.NewFilter(state), ) diff --git a/testrig/teststructs.go b/testrig/teststructs.go index b88e37d55..d4035bcff 100644 --- a/testrig/teststructs.go +++ b/testrig/teststructs.go @@ -26,6 +26,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing/common" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/webpush" ) // TestStructs encapsulates structs needed to @@ -78,6 +79,7 @@ func SetupTestStructs( federator := NewTestFederator(&state, transportController, mediaManager) oauthServer := NewTestOauthServer(db) emailSender := NewEmailSender(rTemplatePath, nil) + webPushSender := webpush.NewNoopSender() common := common.New( &state, @@ -95,6 +97,7 @@ func SetupTestStructs( mediaManager, &state, emailSender, + webPushSender, visFilter, intFilter, ) diff --git a/testrig/util.go b/testrig/util.go index 957553d79..a4bf1bea4 100644 --- a/testrig/util.go +++ b/testrig/util.go @@ -84,6 +84,7 @@ func StartWorkers(state *state.State, processor *workers.Processor) { state.Workers.Federator.Start(1) state.Workers.Dereference.Start(1) state.Workers.Processing.Start(1) + state.Workers.WebPush.Start(1) } func StopWorkers(state *state.State) { @@ -92,6 +93,7 @@ func StopWorkers(state *state.State) { state.Workers.Federator.Stop() state.Workers.Dereference.Stop() state.Workers.Processing.Stop() + state.Workers.WebPush.Stop() } func StartTimelines(state *state.State, visFilter *visibility.Filter, converter *typeutils.Converter) {