implement new timeline code into more areas of codebase, pull in latest go-mangler, go-mutexes, go-structr

This commit is contained in:
kim 2025-03-20 13:15:25 +00:00
parent bf32a08da9
commit d71dd00f79
5 changed files with 82 additions and 86 deletions

View file

@ -216,7 +216,6 @@ func (c *Caches) Sweep(threshold float64) {
c.Timelines.Home.Trim(threshold)
c.Timelines.List.Trim(threshold)
c.Timelines.Public.Trim(threshold)
c.Timelines.Local.Trim(threshold)
c.Visibility.Trim(threshold)
}

View file

@ -32,9 +32,6 @@ type TimelineCaches struct {
// Public ...
Public timeline.StatusTimeline
// Local ...
Local timeline.StatusTimeline
}
func (c *Caches) initHomeTimelines() {
@ -60,11 +57,3 @@ func (c *Caches) initPublicTimeline() {
c.Timelines.Public.Init(cap)
}
func (c *Caches) initLocalTimeline() {
cap := 1000
log.Infof(nil, "cache size = %d", cap)
c.Timelines.Local.Init(cap)
}

View file

@ -159,7 +159,12 @@ func (t *StatusTimelines) Delete(key string) {
}
}
// Insert ...
// InsertInto allows you to bulk insert many statuses into timeline mapped by key.
func (t *StatusTimelines) InsertInto(key string, statuses ...*gtsmodel.Status) {
t.MustGet(key).Insert(statuses...)
}
// Insert allows you to bulk insert many statuses into *all* mapped timelines.
func (t *StatusTimelines) Insert(statuses ...*gtsmodel.Status) {
meta := toStatusMeta(statuses)
if p := t.ptr.Load(); p != nil {
@ -169,11 +174,6 @@ func (t *StatusTimelines) Insert(statuses ...*gtsmodel.Status) {
}
}
// InsertInto ...
func (t *StatusTimelines) InsertInto(key string, statuses ...*gtsmodel.Status) {
t.MustGet(key).Insert(statuses...)
}
// RemoveByStatusIDs ...
func (t *StatusTimelines) RemoveByStatusIDs(statusIDs ...string) {
if p := t.ptr.Load(); p != nil {
@ -210,6 +210,15 @@ func (t *StatusTimelines) UnprepareByAccountIDs(accountIDs ...string) {
}
}
// UnprepareAll ...
func (t *StatusTimelines) UnprepareAll() {
if p := t.ptr.Load(); p != nil {
for _, tt := range *p {
tt.UnprepareAll()
}
}
}
// Trim ...
func (t *StatusTimelines) Trim(threshold float64) {
if p := t.ptr.Load(); p != nil {
@ -470,7 +479,20 @@ func (t *StatusTimeline) Load(
return apiStatuses, lo, hi, nil
}
// Insert ...
// InsertOne allows you to insert a single status into the timeline, with optional prepared API model.
func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) {
t.cache.Insert(&StatusMeta{
ID: status.ID,
AccountID: status.AccountID,
BoostOfID: status.BoostOfID,
BoostOfAccountID: status.BoostOfAccountID,
Local: *status.Local,
loaded: status,
prepared: prepared,
})
}
// Insert allows you to bulk insert many statuses into the timeline.
func (t *StatusTimeline) Insert(statuses ...*gtsmodel.Status) {
t.cache.Insert(toStatusMeta(statuses)...)
}
@ -595,6 +617,14 @@ func (t *StatusTimeline) UnprepareByAccountIDs(accountIDs ...string) {
}
}
// UnprepareAll removes cached frontend API
// models for all cached timeline entries.
func (t *StatusTimeline) UnprepareAll() {
for value := range t.cache.RangeUnsafe(structr.Asc) {
value.prepared = nil
}
}
// Trim ...
func (t *StatusTimeline) Trim(threshold float64) {
@ -690,7 +720,8 @@ func loadStatuses(
metas []*StatusMeta,
loadIDs func([]string) ([]*gtsmodel.Status, error),
) error {
// ...
// Determine which of our passed status
// meta objects still need statuses loading.
toLoadIDs := make([]string, len(metas))
loadedMap := make(map[string]*StatusMeta, len(metas))
for i, meta := range metas {
@ -733,7 +764,8 @@ func toStatusMeta(statuses []*gtsmodel.Status) []*StatusMeta {
})
}
// ...
// doStatusPreFilter performs given filter function on provided statuses,
// returning early if an error is returned. returns filtered statuses.
func doStatusPreFilter(statuses []*gtsmodel.Status, filter func(*gtsmodel.Status) (bool, error)) ([]*gtsmodel.Status, error) {
// Check for provided
@ -765,7 +797,9 @@ func doStatusPreFilter(statuses []*gtsmodel.Status, filter func(*gtsmodel.Status
return statuses, nil
}
// ...
// doStatusPostFilter performs given filter function on provided status meta,
// expecting that embedded status is already loaded, returning filtered status
// meta, as well as those *filtered out*. returns early if error is returned.
func doStatusPostFilter(metas []*StatusMeta, filter func(*gtsmodel.Status) (bool, error)) ([]*StatusMeta, []*StatusMeta, error) {
// Check for provided

View file

@ -23,7 +23,6 @@ import (
"github.com/stretchr/testify/suite"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/paging"
@ -35,10 +34,6 @@ type HomeTestSuite struct {
}
func (suite *HomeTestSuite) TearDownTest() {
if err := suite.state.Timelines.Home.Stop(); err != nil {
suite.FailNow(err.Error())
}
suite.TimelineStandardTestSuite.TearDownTest()
}
@ -47,7 +42,6 @@ func (suite *HomeTestSuite) TestHomeTimelineGetHideFiltered() {
var (
ctx = context.Background()
requester = suite.testAccounts["local_account_1"]
authed = &apiutil.Auth{Account: requester}
maxID = ""
sinceID = ""
minID = "01F8MHAAY43M6RJ473VQFCVH36" // 1 before filteredStatus
@ -98,10 +92,9 @@ func (suite *HomeTestSuite) TestHomeTimelineGetHideFiltered() {
if !filteredStatusFound {
suite.FailNow("precondition failed: status we would filter isn't present in unfiltered timeline")
}
// Prune the timeline to drop cached prepared statuses, a side effect of this precondition check.
if _, err := suite.state.Timelines.Home.Prune(ctx, requester.ID, 0, 0); err != nil {
suite.FailNow(err.Error())
}
// Clear the timeline to drop all cached statuses.
suite.state.Caches.Timelines.Home.Clear(requester.ID)
// Create a filter to hide one status on the timeline.
if err := suite.db.PutFilter(ctx, filter); err != nil {

View file

@ -21,6 +21,7 @@ import (
"context"
"errors"
timeline2 "github.com/superseriousbusiness/gotosocial/internal/cache/timeline"
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
"github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
@ -28,7 +29,6 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/stream"
"github.com/superseriousbusiness/gotosocial/internal/timeline"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
@ -161,21 +161,16 @@ func (s *Surface) timelineAndNotifyStatusForFollowers(
// Add status to home timeline for owner of
// this follow (origin account), if applicable.
homeTimelined, err = s.timelineStatus(ctx,
s.State.Timelines.Home.IngestOne,
follow.AccountID, // home timelines are keyed by account ID
if homeTimelined := s.timelineStatus(ctx,
s.State.Caches.Timelines.Home.MustGet(follow.AccountID),
follow.Account,
status,
stream.TimelineHome,
statusfilter.FilterContextHome,
filters,
mutes,
)
if err != nil {
log.Errorf(ctx, "error home timelining status: %v", err)
continue
}
); homeTimelined {
if homeTimelined {
// If hometimelined, add to list of returned account IDs.
homeTimelinedAccountIDs = append(homeTimelinedAccountIDs, follow.AccountID)
}
@ -261,21 +256,16 @@ func (s *Surface) listTimelineStatusForFollow(
exclusive = exclusive || *list.Exclusive
// At this point we are certain this status
// should be included in the timeline of the
// list that this list entry belongs to.
listTimelined, err := s.timelineStatus(ctx,
s.State.Timelines.List.IngestOne,
list.ID, // list timelines are keyed by list ID
// should be included in timeline of this list.
listTimelined := s.timelineStatus(ctx,
s.State.Caches.Timelines.List.MustGet(list.ID),
follow.Account,
status,
stream.TimelineList+":"+list.ID, // key streamType to this specific list
statusfilter.FilterContextHome,
filters,
mutes,
)
if err != nil {
log.Errorf(ctx, "error adding status to list timeline: %v", err)
continue
}
// Update flag based on if timelined.
timelined = timelined || listTimelined
@ -371,48 +361,46 @@ func (s *Surface) listEligible(
//
// If the status was inserted into the timeline, true will be returned
// + it will also be streamed to the user using the given streamType.
// timelineStatus ...
func (s *Surface) timelineStatus(
ctx context.Context,
ingest func(context.Context, string, timeline.Timelineable) (bool, error),
timelineID string,
timeline *timeline2.StatusTimeline,
account *gtsmodel.Account,
status *gtsmodel.Status,
streamType string,
filterCtx statusfilter.FilterContext,
filters []*gtsmodel.Filter,
mutes *usermute.CompiledUserMuteList,
) (bool, error) {
) bool {
// Ingest status into given timeline using provided function.
if inserted, err := ingest(ctx, timelineID, status); err != nil &&
!errors.Is(err, statusfilter.ErrHideStatus) {
err := gtserror.Newf("error ingesting status %s: %w", status.ID, err)
return false, err
} else if !inserted {
// Nothing more to do.
return false, nil
}
// Convert updated database model to frontend model.
apiStatus, err := s.Converter.StatusToAPIStatus(ctx,
// Attempt to convert status to frontend API representation,
// this will check whether status is filtered / muted.
apiModel, err := s.Converter.StatusToAPIStatus(ctx,
status,
account,
statusfilter.FilterContextHome,
filterCtx,
filters,
mutes,
)
if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) {
err := gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err)
return true, err
log.Error(ctx, "error converting status %s to frontend: %v", status.URI, err)
}
if apiStatus != nil {
// The status was inserted so stream it to the user.
s.Stream.Update(ctx, account, apiStatus, streamType)
return true, nil
// Insert status to timeline cache regardless of
// if API model was successfully prepared or not.
timeline.InsertOne(status, apiModel)
if apiModel != nil {
// Only send the status to user's stream if not
// filtered / muted, i.e. successfully prepared model.
s.Stream.Update(ctx, account, apiModel, streamType)
return true
}
// Status was hidden.
return false, nil
// Status was
// filtered / muted.
return false
}
// timelineAndNotifyStatusForTagFollowers inserts the status into the
@ -443,22 +431,15 @@ func (s *Surface) timelineAndNotifyStatusForTagFollowers(
continue
}
if _, err := s.timelineStatus(ctx,
s.State.Timelines.Home.IngestOne,
tagFollowerAccount.ID, // home timelines are keyed by account ID
_ = s.timelineStatus(ctx,
s.State.Caches.Timelines.Home.MustGet(tagFollowerAccount.ID),
tagFollowerAccount,
status,
stream.TimelineHome,
statusfilter.FilterContextHome,
filters,
mutes,
); err != nil {
errs.Appendf(
"error inserting status %s into home timeline for account %s: %w",
status.ID,
tagFollowerAccount.ID,
err,
)
}
)
}
return errs.Combine()