gotosocial/vendor/codeberg.org/gruf/go-sched/scheduler.go
kim a156188b3e
[chore] update dependencies, bump to Go 1.19.1 (#826)
* update dependencies, bump Go version to 1.19

* bump test image Go version

* update golangci-lint

* update gotosocial-drone-build

* sign

* linting, go fmt

* update swagger docs

* update swagger docs

* whitespace

* update contributing.md

* fuckin whoopsie doopsie

* linterino, linteroni

* fix followrequest test not starting processor

* fix other api/client tests not starting processor

* fix remaining tests where processor not started

* bump go-runners version

* don't check last-webfingered-at, processor may have updated this

* update swagger command

* update bun to latest version

* fix embed to work the same as before with new bun

Signed-off-by: kim <grufwub@gmail.com>
Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
2022-09-28 18:30:40 +01:00

290 lines
6.3 KiB
Go

package sched
import (
"context"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"codeberg.org/gruf/go-runners"
)
// precision is the finest granularity of run-time accuracy that the scheduler offers.
const precision = time.Millisecond

// neverticks is a timer channel that is never sent on and never
// closed within this file, so receiving from it blocks (it's starved).
var neverticks = make(chan time.Time)

// alwaysticks is a timer channel that has already been closed, so
// receiving from it returns a zero time.Time without ever blocking.
var alwaysticks = func() chan time.Time {
	closed := make(chan time.Time)
	close(closed)
	return closed
}()
// Scheduler runs jobs at specific times and at regular intervals,
// driving all of them from one shared underlying timer.
type Scheduler struct {
	// jobs holds every tracked Job awaiting execution.
	jobs []*Job

	// jch carries updates to the main routine: a *Job
	// to be added, or a uint64 job ID to be removed.
	jch chan any

	// svc manages the main scheduler routine.
	svc runners.Service

	// jid is incremented to generate unique job IDs.
	jid atomic.Uint64
}
// Start will attempt to start the Scheduler. It returns false immediately if
// the underlying Service reports it could not be started (e.g. it is already
// running). Otherwise it returns true, but only once the scheduler goroutine
// has finished initializing, so that Schedule() is safe to call on return.
func (sch *Scheduler) Start() bool {
	var block sync.Mutex

	// Use mutex to synchronize between started
	// goroutine and ourselves, to ensure that
	// we don't return before Scheduler init'd.
	block.Lock()
	defer block.Unlock()

	ok := sch.svc.GoRun(func(ctx context.Context) {
		// Create Scheduler job channel
		sch.jch = make(chan interface{})

		// Set GC finalizer to ensure scheduler stopped.
		//
		// A finalizer installed by a previous Start() is still
		// attached when the Scheduler is restarted, and the runtime
		// refuses to install a second one on the same object. Clear
		// any existing finalizer first (a no-op if none is set).
		runtime.SetFinalizer(sch, nil)
		runtime.SetFinalizer(sch, func(sch *Scheduler) {
			_ = sch.Stop()
		})

		// Init complete, unlock start routine
		block.Unlock()

		// Enter main loop
		sch.run(ctx)
	})

	if ok {
		// Wait on goroutine: this second Lock() only succeeds
		// after the goroutine above has called Unlock(). The
		// deferred Unlock() then releases it on return.
		block.Lock()
	}

	return ok
}
// Stop asks the underlying Service to stop the Scheduler and reports
// the outcome: false straight away when it was not running, true only
// once the Scheduler has been fully stopped.
func (sch *Scheduler) Stop() bool {
	stopped := sch.svc.Stop()
	return stopped
}
// Running reports whether the underlying Service
// currently considers the Scheduler to be running.
func (sch *Scheduler) Running() bool {
	running := sch.svc.Running()
	return running
}
// Schedule will add provided Job to the Scheduler, returning a cancel function.
//
// It panics if job is nil, if the Scheduler is not currently running (either
// never started, or since stopped), or if the job ID counter overflows.
//
// The returned cancel function asks the scheduler routine to drop the job by
// its ID; it returns without doing anything if the Scheduler has stopped.
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
	switch {
	// Check a job was passed
	case job == nil:
		panic("nil job")

	// Check we are running. The job channel is only nil before the
	// very first Start(); after a Stop() it stays non-nil with nobody
	// receiving on it, so the running state must be checked as well,
	// otherwise the send below would block forever.
	case sch.jch == nil || !sch.Running():
		panic("scheduler not running")
	}

	// Calculate next job ID
	last := sch.jid.Load()
	next := sch.jid.Add(1)
	if next < last {
		panic("job id overflow")
	}

	// Pass job to scheduler
	job.id = next
	sch.jch <- job

	// Take ptrs to current state chs
	ctx := sch.svc.Done()
	jch := sch.jch

	// Return cancel function for job ID
	return func() {
		select {
		// Sched stopped
		case <-ctx:

		// Cancel this job
		case jch <- next:
		}
	}
}
// run is the scheduler's main routine. It loops, handling job updates
// and executing due jobs, until the supplied ctx is done.
func (sch *Scheduler) run(ctx context.Context) {
	var (
		// armed records whether the real timer was started for the
		// current loop iteration. When false, wake is one of the
		// static channels (neverticks / alwaysticks) instead.
		armed bool

		// wake is the channel this iteration waits on for a tick.
		wake <-chan time.Time

		// timer wakes this routine when the soonest
		// queued job needs checking for execution.
		timer *time.Timer
	)

	// disarm stops the timer and, if it had already fired,
	// drains its channel. It does nothing when not armed.
	disarm := func() {
		if !armed {
			return
		}
		if timer.Stop() {
			return
		}
		<-timer.C
	}

	// Handle any jobs / IDs already waiting to be
	// received, without blocking once none remain.
pending:
	for {
		select {
		case msg := <-sch.jch:
			sch.handle(msg)
		default:
			break pending
		}
	}

	// Create the timer in an already-fired, drained
	// state, so that it can simply be Reset() later.
	timer = time.NewTimer(1)
	<-timer.C

	for {
		// Reset timer state
		armed = false

		switch {
		case len(sch.jobs) == 0:
			// Nothing queued, only ctx / updates can wake us.
			wake = neverticks

		default:
			// Order jobs soonest-first
			sort.Sort(byNext(sch.jobs))

			// Time remaining until the soonest job is due
			started := time.Now()
			wait := sch.jobs[0].Next().Sub(started)

			if wait > precision/1e3 {
				// Sleep on the timer until the job is due
				timer.Reset(wait)
				wake = timer.C
				armed = true
			} else {
				// Due now (or overdue), or so close that a sleep
				// this short can't be honoured: wake immediately.
				wake = alwaysticks
			}
		}

		select {
		// Scheduler stopped
		case <-ctx.Done():
			disarm()
			return

		// Received update, handle job/id
		case msg := <-sch.jch:
			sch.handle(msg)
			disarm()

		// Tick received, run scheduled
		case tick := <-wake:
			if !armed {
				// alwaysticks only yields zero times
				tick = time.Now()
			}
			sch.schedule(tick)
		}
	}
}
// handle takes a value received from Scheduler.jch and handles either:
//   - *Job   --> new job to add: its next call time is calculated
//     from the current time and it is appended to the queue.
//   - uint64 --> job ID to remove: the first queued job with a
//     matching ID is dropped (no-op if there is none).
//
// Values of any other type are ignored.
func (sch *Scheduler) handle(v interface{}) {
	switch v := v.(type) {
	// New job added
	case *Job:
		// Get current time
		now := time.Now()

		// Update the next call time
		next := v.timing.Next(now)
		v.next.Store(next)

		// Append this job to queued
		sch.jobs = append(sch.jobs, v)

	// Job removed
	case uint64:
		for i := 0; i < len(sch.jobs); i++ {
			if sch.jobs[i].id == v {
				// This is the job we're looking for! Drop this.
				//
				// Shift the tail down and nil the vacated final slot
				// before truncating, so that the backing array does
				// not keep a stale *Job pointer alive past the end
				// of the slice.
				last := len(sch.jobs) - 1
				copy(sch.jobs[i:], sch.jobs[i+1:])
				sch.jobs[last] = nil
				sch.jobs = sch.jobs[:last]
				return
			}
		}
	}
}
// schedule will iterate through the scheduler jobs and execute those due at
// the given time, updating their next call time. Each due job is run in its
// own goroutine. A job whose timing yields a zero next time is finished and
// is removed from the queue.
//
// The jobs slice is expected to already be sorted by .Next().
func (sch *Scheduler) schedule(now time.Time) {
	for i := 0; i < len(sch.jobs); {
		// Scope our own var
		job := sch.jobs[i]

		// We know these jobs are ordered by .Next(), so as soon
		// as we reach one with .Next() after now, we can return
		if job.Next().After(now) {
			return
		}

		// Pass job to runner
		go job.Run(now)

		// Update the next call time
		next := job.timing.Next(now)
		job.next.Store(next)

		if next.IsZero() {
			// Zero time, this job is done and can be dropped.
			//
			// Shift the tail down and nil the vacated final slot
			// before truncating, so that the backing array does
			// not keep a stale *Job pointer alive past the end
			// of the slice. Index i now holds the following job,
			// so it is deliberately not advanced.
			last := len(sch.jobs) - 1
			copy(sch.jobs[i:], sch.jobs[i+1:])
			sch.jobs[last] = nil
			sch.jobs = sch.jobs[:last]
			continue
		}

		// Iter
		i++
	}
}
// byNext implements sort.Interface over a slice of Jobs,
// ordering them by ascending .Next() time (soonest first).
type byNext []*Job

// Len returns the number of jobs in the slice.
func (by byNext) Len() int { return len(by) }

// Less reports whether the job at index i is next due before the job at index j.
func (by byNext) Less(i int, j int) bool {
	first := by[i].Next()
	second := by[j].Next()
	return first.Before(second)
}

// Swap exchanges the jobs at indices i and j.
func (by byNext) Swap(i int, j int) {
	tmp := by[i]
	by[i] = by[j]
	by[j] = tmp
}