// GoToSocial // Copyright (C) GoToSocial Authors admin@gotosocial.org // SPDX-License-Identifier: AGPL-3.0-or-later // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package stream import ( "context" "maps" "slices" "sync" "sync/atomic" ) const ( // EventTypeNotification -- a user // should be shown a notification. EventTypeNotification = "notification" // EventTypeUpdate -- a user should // be shown an update in their timeline. EventTypeUpdate = "update" // EventTypeDelete -- something // should be deleted from a user. EventTypeDelete = "delete" // EventTypeStatusUpdate -- something in the // user's timeline has been edited (yes this // is a confusing name, blame Mastodon ...). EventTypeStatusUpdate = "status.update" ) const ( // TimelineLocal: // All public posts originating from this // server. Analogous to the local timeline. TimelineLocal = "public:local" // TimelinePublic: // All public posts known to the server. // Analogous to the federated timeline. TimelinePublic = "public" // TimelineHome: // Events related to the current user, such // as home feed updates and notifications. TimelineHome = "user" // TimelineNotifications: // Notifications for the current user. TimelineNotifications = "user:notification" // TimelineDirect: // Updates to direct conversations. TimelineDirect = "direct" // TimelineList: // Updates to a specific list. TimelineList = "list" ) // AllStatusTimelines contains all Timelines // that a status could conceivably be delivered // to, useful for sending out status deletes. var AllStatusTimelines = []string{ TimelineLocal, TimelinePublic, TimelineHome, TimelineDirect, TimelineList, } type Streams struct { streams map[string][]*Stream mutex sync.Mutex } // Open will open open a new Stream for given account ID and stream types, the given context will be passed to Stream. func (s *Streams) Open(accountID string, streamTypes ...string) *Stream { if len(streamTypes) == 0 { panic("no stream types given") } // Prep new Stream. str := new(Stream) str.done = make(chan struct{}) str.msgCh = make(chan Message, 50) // TODO: make configurable for _, streamType := range streamTypes { str.Subscribe(streamType) } // TODO: add configurable // max streams per account. // Acquire lock. s.mutex.Lock() if s.streams == nil { // Main stream-map needs allocating. s.streams = make(map[string][]*Stream) } // Add new stream for account. strs := s.streams[accountID] strs = append(strs, str) s.streams[accountID] = strs // Register close callback // to remove stream from our // internal map for this account. str.close = func() { s.mutex.Lock() strs := s.streams[accountID] strs = slices.DeleteFunc(strs, func(s *Stream) bool { return s == str // remove 'str' ptr }) s.streams[accountID] = strs s.mutex.Unlock() } // Done with lock. s.mutex.Unlock() return str } // Post will post the given message to all streams of given account ID matching type. func (s *Streams) Post(ctx context.Context, accountID string, msg Message) bool { var deferred []func() bool // Acquire lock. s.mutex.Lock() // Iterate all streams stored for account. for _, str := range s.streams[accountID] { // Check whether stream supports any of our message targets. if stype := str.getStreamType(msg.Stream...); stype != "" { // Rescope var // to prevent // ptr reuse. stream := str // Use a message copy to *only* // include the supported stream. msgCopy := Message{ Stream: []string{stype}, Event: msg.Event, Payload: msg.Payload, } // Send message to supported stream // DEFERRED (i.e. OUTSIDE OF MAIN MUTEX). // This prevents deadlocks between each // msg channel and main Streams{} mutex. deferred = append(deferred, func() bool { return stream.send(ctx, msgCopy) }) } } // Done with lock. s.mutex.Unlock() var ok bool // Execute deferred outside lock. for _, deferfn := range deferred { v := deferfn() ok = ok && v } return ok } // PostAll will post the given message to all streams with matching types. func (s *Streams) PostAll(ctx context.Context, msg Message) bool { var deferred []func() bool // Acquire lock. s.mutex.Lock() // Iterate ALL stored streams. for _, strs := range s.streams { for _, str := range strs { // Check whether stream supports any of our message targets. if stype := str.getStreamType(msg.Stream...); stype != "" { // Rescope var // to prevent // ptr reuse. stream := str // Use a message copy to *only* // include the supported stream. msgCopy := Message{ Stream: []string{stype}, Event: msg.Event, Payload: msg.Payload, } // Send message to supported stream // DEFERRED (i.e. OUTSIDE OF MAIN MUTEX). // This prevents deadlocks between each // msg channel and main Streams{} mutex. deferred = append(deferred, func() bool { return stream.send(ctx, msgCopy) }) } } } // Done with lock. s.mutex.Unlock() var ok bool // Execute deferred outside lock. for _, deferfn := range deferred { v := deferfn() ok = ok && v } return ok } // Stream represents one // open stream for a client. type Stream struct { // atomically updated ptr to a read-only copy // of supported stream types in a hashmap. this // gets updated via CAS operations in .cas(). types atomic.Pointer[map[string]struct{}] // protects stream close. done chan struct{} // inbound msg ch. msgCh chan Message // close hook to remove // stream from Streams{}. close func() } // Subscribe will add given type to given types this stream supports. func (s *Stream) Subscribe(streamType string) { s.cas(func(m map[string]struct{}) bool { if _, ok := m[streamType]; ok { return false } m[streamType] = struct{}{} return true }) } // Unsubscribe will remove given type (if found) from types this stream supports. func (s *Stream) Unsubscribe(streamType string) { s.cas(func(m map[string]struct{}) bool { if _, ok := m[streamType]; !ok { return false } delete(m, streamType) return true }) } // getStreamType returns the first stream type in given list that stream supports. func (s *Stream) getStreamType(streamTypes ...string) string { if ptr := s.types.Load(); ptr != nil { for _, streamType := range streamTypes { if _, ok := (*ptr)[streamType]; ok { return streamType } } } return "" } // send will block on posting a new Message{}, returning early with // a false value if provided context is canceled, or stream closed. func (s *Stream) send(ctx context.Context, msg Message) bool { select { case <-s.done: return false case <-ctx.Done(): return false case s.msgCh <- msg: return true } } // Recv will block on receiving Message{}, returning early with a // false value if provided context is canceled, or stream closed. func (s *Stream) Recv(ctx context.Context) (Message, bool) { select { case <-s.done: return Message{}, false case <-ctx.Done(): return Message{}, false case msg := <-s.msgCh: return msg, true } } // Close will close the underlying context, finally // removing it from the parent Streams per-account-map. func (s *Stream) Close() { select { case <-s.done: default: close(s.done) s.close() } } // cas will perform a Compare And Swap operation on s.types using modifier func. func (s *Stream) cas(fn func(map[string]struct{}) bool) { if fn == nil { panic("nil function") } for { var m map[string]struct{} // Get current value. ptr := s.types.Load() if ptr == nil { // Allocate new types map. m = make(map[string]struct{}) } else { // Clone r-only map. m = maps.Clone(*ptr) } // Apply // changes. if !fn(m) { return } // Attempt to Compare And Swap ptr. if s.types.CompareAndSwap(ptr, &m) { return } } } // Message represents // one streamed message. type Message struct { // All the stream types this // message should be delivered to. Stream []string `json:"stream"` // The event type of the message // (update/delete/notification etc) Event string `json:"event"` // The actual payload of the message. In case of an // update or notification, this will be a JSON string. Payload string `json:"payload"` }