start adding preloading support

This commit is contained in:
kim 2025-04-08 14:16:08 +01:00
parent 624f7522d8
commit ced560ebf8
7 changed files with 255 additions and 79 deletions

View file

@ -351,6 +351,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
intFilter,
)
// Preload our local user's streaming timeline caches.
if err := process.Timeline().Preload(ctx); err != nil {
return fmt.Errorf("error preloading timelines: %w", err)
}
// Schedule background cleaning tasks.
if err := cleaner.ScheduleJobs(); err != nil {
return fmt.Errorf("error scheduling cleaner jobs: %w", err)

View file

@ -318,6 +318,72 @@ func (t *StatusTimeline) Init(cap int) {
t.max = cap
}
// Preload hydrates the timeline cache by walking pages of statuses
// returned by loadPage, newest first, inserting (filtered) status
// metadata into the cache as it goes.
//
// Paging begins at a max cursor 24 hours in the future and proceeds
// in descending order, 100 statuses per page. The walk stops when
// loadPage returns no statuses, or when the value returned by the
// cache insert reaches the timeline's cut size.
//
// The returned int is the value last returned by the cache insert
// (NOTE(review): presumably the cache length after insert; confirm
// against the cache implementation). It panics on a nil loadPage.
func (t *StatusTimeline) Preload(
	ctx context.Context,

	// loadPage should load the timeline of given page for cache hydration.
	loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),

	// filter can be used to perform filtering of returned
	// statuses BEFORE insert into cache. i.e. this will affect
	// what actually gets stored in the timeline cache.
	filter func(each *gtsmodel.Status) (delete bool, err error),
) (int, error) {
	if loadPage == nil {
		panic("nil load page func")
	}

	// Start with a cursor sat above the top of
	// the possible timeline, paging downwards.
	cursor := &paging.Page{Limit: 100}
	cursor.Max.Order = paging.OrderDescending
	cursor.Max.Value = plus24hULID()
	cursor.Min.Order = paging.OrderDescending
	cursor.Min.Value = ""

	// Reusable scratch slice for status meta,
	// truncated and refilled on each iteration.
	scratch := make([]*StatusMeta, 0, cursor.Limit)

	var total int
	for total < t.cut {
		// Fetch the next page of statuses.
		loaded, err := loadPage(cursor)
		if err != nil {
			return total, gtserror.Newf("error loading statuses: %w", err)
		}

		// An empty page means the load
		// function has nothing more to give.
		if len(loaded) == 0 {
			break
		}

		// Advance the cursor to the last loaded status
		// BEFORE filtering, so that a fully filtered
		// page still moves us further down the timeline.
		cursor.Max.Value = loaded[len(loaded)-1].ID

		// Drop any statuses the filter rejects.
		kept, err := doStatusFilter(loaded, filter)
		if err != nil {
			return total, gtserror.Newf("error filtering statuses: %w", err)
		}

		// Only touch the cache when something survived
		// filtering, otherwise just try the next page.
		if len(kept) > 0 {
			scratch = toStatusMeta(scratch[:0], kept)
			total = t.cache.Insert(scratch...)
		}
	}

	return total, nil
}
// Load will load timeline statuses according to given
// page, using provided callbacks to load extra data when
// necessary, and perform fine-grained filtering loaded
@ -424,12 +490,9 @@ func (t *StatusTimeline) Load(
)
}
// Track all newly loaded status entries
// after filtering for insert into cache.
var justLoaded []*StatusMeta
// Check whether loaded enough from cache.
if need := limit - len(apiStatuses); need > 0 {
// Check if we need to call
// through to the database.
if len(apiStatuses) == 0 {
// Load a little more than
// limit to reduce db calls.
@ -475,10 +538,7 @@ func (t *StatusTimeline) Load(
// Convert to our cache type,
// these will get inserted into
// the cache in prepare() below.
metas := toStatusMeta(statuses)
// Append to newly loaded for later insert.
justLoaded = append(justLoaded, metas...)
metas := toStatusMeta(nil, statuses)
// Prepare frontend API models for
// the loaded statuses. For now this
@ -511,12 +571,6 @@ func (t *StatusTimeline) Load(
lo, hi = hi, lo
}
if len(justLoaded) > 0 {
// Even if not returning them, insert
// the excess (filtered) into cache.
_ = t.cache.Insert(justLoaded...)
}
return apiStatuses, lo, hi, nil
}
@ -566,73 +620,69 @@ func LoadStatusTimeline(
// Preallocate a slice of up-to-limit API models.
apiStatuses := make([]*apimodel.Status, 0, limit)
// Check whether loaded enough from cache.
if need := limit - len(apiStatuses); need > 0 {
// Load a little more than
// limit to reduce db calls.
nextPg.Limit += 10
// Load a little more than
// limit to reduce db calls.
nextPg.Limit += 10
// Perform maximum of 5 load
// attempts fetching statuses.
for i := 0; i < 5; i++ {
// Perform maximum of 5 load
// attempts fetching statuses.
for i := 0; i < 5; i++ {
// Load next timeline statuses.
statuses, err := loadPage(nextPg)
if err != nil {
return nil, "", "", gtserror.Newf("error loading timeline: %w", err)
}
// Load next timeline statuses.
statuses, err := loadPage(nextPg)
if err != nil {
return nil, "", "", gtserror.Newf("error loading timeline: %w", err)
}
// No more statuses from
// load function = at end.
if len(statuses) == 0 {
break
}
// No more statuses from
// load function = at end.
if len(statuses) == 0 {
break
}
if hi == "" {
// Set hi returned paging
// value if not already set.
hi = statuses[0].ID
}
if hi == "" {
// Set hi returned paging
// value if not already set.
hi = statuses[0].ID
}
// Update nextPg cursor parameter for next database query.
nextPageParams(nextPg, statuses[len(statuses)-1].ID, order)
// Update nextPg cursor parameter for next database query.
nextPageParams(nextPg, statuses[len(statuses)-1].ID, order)
// Perform any filtering on newly loaded statuses.
statuses, err = doStatusFilter(statuses, filter)
if err != nil {
return nil, "", "", gtserror.Newf("error filtering statuses: %w", err)
}
// Perform any filtering on newly loaded statuses.
statuses, err = doStatusFilter(statuses, filter)
if err != nil {
return nil, "", "", gtserror.Newf("error filtering statuses: %w", err)
}
// After filtering no more
// statuses remain, retry.
if len(statuses) == 0 {
continue
}
// After filtering no more
// statuses remain, retry.
if len(statuses) == 0 {
continue
}
// Convert to our cache type,
// these will get inserted into
// the cache in prepare() below.
metas := toStatusMeta(nil, statuses)
// Convert to our cache type,
// these will get inserted into
// the cache in prepare() below.
metas := toStatusMeta(statuses)
// Prepare frontend API models for
// the loaded statuses. For now this
// also does its own extra filtering.
apiStatuses = prepareStatuses(ctx,
metas,
prepareAPI,
apiStatuses,
limit,
)
// Prepare frontend API models for
// the loaded statuses. For now this
// also does its own extra filtering.
apiStatuses = prepareStatuses(ctx,
metas,
prepareAPI,
apiStatuses,
limit,
)
// If we have anything, return
// here. Even if below limit.
if len(apiStatuses) > 0 {
// If we have anything, return
// here. Even if below limit.
if len(apiStatuses) > 0 {
// Set returned lo status paging value.
lo = apiStatuses[len(apiStatuses)-1].ID
break
}
// Set returned lo status paging value.
lo = apiStatuses[len(apiStatuses)-1].ID
break
}
}
@ -922,8 +972,8 @@ func loadStatuses(
// toStatusMeta converts a slice of database model statuses
// into our cache wrapper type, a slice of []StatusMeta{}.
func toStatusMeta(statuses []*gtsmodel.Status) []*StatusMeta {
return xslices.Gather(nil, statuses, func(s *gtsmodel.Status) *StatusMeta {
func toStatusMeta(in []*StatusMeta, statuses []*gtsmodel.Status) []*StatusMeta {
return xslices.Gather(in, statuses, func(s *gtsmodel.Status) *StatusMeta {
return &StatusMeta{
ID: s.ID,
AccountID: s.AccountID,

View file

@ -273,6 +273,12 @@ func TestStatusTimelineTrim(t *testing.T) {
assert.NotEqual(t, minID2, minStatus(&tt).ID)
assert.False(t, containsStatusID(&tt, minID1))
assert.False(t, containsStatusID(&tt, minID2))
// Trim at desired length
// should cause no change.
before := tt.cache.Len()
tt.Trim()
assert.Equal(t, before, tt.cache.Len())
}
// containsStatusID returns whether timeline contains a status with ID.

View file

@ -18,10 +18,19 @@
package timeline
import (
"time"
"codeberg.org/gruf/go-structr"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/paging"
)
// plus24hULID builds a ULID string from a timestamp
// exactly 24 hours ahead of the current wall-clock time.
func plus24hULID() string {
	return id.NewULIDFromTime(time.Now().Add(24 * time.Hour))
}
// nextPageParams gets the next set of paging
// parameters to use based on the current set,
// and the next set of lo / hi values. This will

View file

@ -25,6 +25,7 @@ import (
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/paging"
)
@ -90,3 +91,35 @@ func (p *Processor) HomeTimelineGet(
},
)
}
// preloadHomeTimeline fetches (creating if necessary) the home
// timeline cache keyed by the given account's ID and preloads it,
// logging the count reported by the preload on success.
func (p *Processor) preloadHomeTimeline(
	ctx context.Context,
	account *gtsmodel.Account,
) error {
	// loadPage reads a page of the account's
	// home timeline from the database.
	loadPage := func(page *paging.Page) ([]*gtsmodel.Status, error) {
		return p.state.DB.GetHomeTimeline(ctx, account.ID, page)
	}

	// filter reports true (= drop) for any status that is
	// NOT home-timelineable for the account owning this cache.
	filter := func(status *gtsmodel.Status) (bool, error) {
		visible, err := p.visFilter.StatusHomeTimelineable(ctx, account, status)
		return !visible, err
	}

	// MustGet both looks up and, if needed, creates the cache.
	cache := p.state.Caches.Timelines.Home.MustGet(account.ID)

	count, err := cache.Preload(ctx, loadPage, filter)
	if err != nil {
		return gtserror.Newf("error preloading home timeline %s: %w", account.ID, err)
	}

	log.Infof(ctx, "%s: preloaded %d", account.Username, count)
	return nil
}

View file

@ -27,6 +27,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/paging"
)
@ -103,3 +104,36 @@ func (p *Processor) ListTimelineGet(
},
)
}
// preloadListTimeline fetches (creating if necessary) the list
// timeline cache keyed by the given list's ID and preloads it on
// behalf of the owning account, logging the count reported by the
// preload on success.
func (p *Processor) preloadListTimeline(
	ctx context.Context,
	account *gtsmodel.Account,
	list *gtsmodel.List,
) error {
	// loadPage reads a page of the
	// list's timeline from the database.
	loadPage := func(page *paging.Page) ([]*gtsmodel.Status, error) {
		return p.state.DB.GetListTimeline(ctx, list.ID, page)
	}

	// filter reports true (= drop) for any status that fails
	// the home-timelineable visibility check for the account;
	// the same check used when preloading the home timeline.
	filter := func(status *gtsmodel.Status) (bool, error) {
		visible, err := p.visFilter.StatusHomeTimelineable(ctx, account, status)
		return !visible, err
	}

	// MustGet both looks up and, if needed, creates the cache.
	cache := p.state.Caches.Timelines.List.MustGet(list.ID)

	count, err := cache.Preload(ctx, loadPage, filter)
	if err != nil {
		return gtserror.Newf("error preloading list timeline %s: %w", list.ID, err)
	}

	log.Infof(ctx, "%s[%q]: preloaded %d", account.Username, list.Title, count)
	return nil
}

View file

@ -24,11 +24,12 @@ import (
"net/url"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/cache/timeline"
timelinepkg "github.com/superseriousbusiness/gotosocial/internal/cache/timeline"
"github.com/superseriousbusiness/gotosocial/internal/db"
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
"github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
@ -61,10 +62,48 @@ func New(state *state.State, converter *typeutils.Converter, visFilter *visibili
}
}
// Preload walks every user returned by the database and, for the
// account attached to each, preloads the home timeline cache and
// then the timeline cache of every list owned by that account.
//
// Work is done sequentially, and the first error encountered
// aborts the whole preload and is returned to the caller.
func (p *Processor) Preload(ctx context.Context) error {
	users, err := p.state.DB.GetAllUsers(ctx)
	if err != nil {
		return gtserror.Newf("error getting users: %w", err)
	}

	for _, user := range users {
		// The account attached to this user
		// owns the timelines we preload below.
		owner := user.Account

		// Home timeline first.
		err := p.preloadHomeTimeline(ctx, owner)
		if err != nil {
			return gtserror.Newf("error preloading home timeline: %w", err)
		}

		// Then look up the lists this account owns,
		// using a barebones context for the query.
		bare := gtscontext.SetBarebones(ctx)
		lists, err := p.state.DB.GetListsByAccountID(bare, owner.ID)
		if err != nil {
			return gtserror.Newf("error getting account %s lists: %w", owner.ID, err)
		}

		// And finally each list's timeline in turn.
		for _, list := range lists {
			err := p.preloadListTimeline(ctx, owner, list)
			if err != nil {
				return gtserror.Newf("error preloading list timeline: %w", err)
			}
		}
	}

	return nil
}
func (p *Processor) getStatusTimeline(
ctx context.Context,
requester *gtsmodel.Account,
cache *timeline.StatusTimeline,
timeline *timelinepkg.StatusTimeline,
page *paging.Page,
pagePath string,
pageQuery url.Values,
@ -128,10 +167,10 @@ func (p *Processor) getStatusTimeline(
return apiStatus, nil
}
if cache != nil {
if timeline != nil {
// Load status page via timeline cache, also
// getting lo, hi values for next, prev pages.
apiStatuses, lo, hi, err = cache.Load(ctx,
apiStatuses, lo, hi, err = timeline.Load(ctx,
// Status page
// to load.
@ -157,7 +196,7 @@ func (p *Processor) getStatusTimeline(
} else {
// Load status page without a receiving timeline cache.
// TODO: remove this code path when all support caching.
apiStatuses, lo, hi, err = timeline.LoadStatusTimeline(ctx,
apiStatuses, lo, hi, err = timelinepkg.LoadStatusTimeline(ctx,
page,
loadPage,
func(ids []string) ([]*gtsmodel.Status, error) {