package sched import ( "context" "runtime" "sort" "sync" "sync/atomic" "time" "codeberg.org/gruf/go-runners" ) // precision is the maximum time we can offer scheduler run-time precision down to. const precision = time.Millisecond var ( // neverticks is a timer channel that never ticks (it's starved). neverticks = make(chan time.Time) // alwaysticks is a timer channel that always ticks (it's closed). alwaysticks = func() chan time.Time { ch := make(chan time.Time) close(ch) return ch }() ) // Scheduler provides a means of running jobs at specific times and // regular intervals, all while sharing a single underlying timer. type Scheduler struct { jobs []*Job // jobs is a list of tracked Jobs to be executed jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs svc runners.Service // svc manages the main scheduler routine jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs } // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. func (sch *Scheduler) Start() bool { var block sync.Mutex // Use mutex to synchronize between started // goroutine and ourselves, to ensure that // we don't return before Scheduler init'd. block.Lock() defer block.Unlock() ok := sch.svc.GoRun(func(ctx context.Context) { // Create Scheduler job channel sch.jch = make(chan interface{}) // Unlock start routine block.Unlock() // Set GC finalizer to ensure scheduler stopped runtime.SetFinalizer(sch, func(sch *Scheduler) { _ = sch.Stop() }) // Enter main loop sch.run(ctx) }) if ok { // Wait on goroutine block.Lock() } return ok } // Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. func (sch *Scheduler) Stop() bool { return sch.svc.Stop() } // Running will return whether Scheduler is running. func (sch *Scheduler) Running() bool { return sch.svc.Running() } // Schedule will add provided Job to the Scheduler, returning a cancel function. func (sch *Scheduler) Schedule(job *Job) (cancel func()) { switch { // Check a job was passed case job == nil: panic("nil job") // Check we are running case sch.jch == nil: panic("scheduler not running") } // Calculate next job ID last := sch.jid.Load() next := sch.jid.Add(1) if next < last { panic("job id overflow") } // Pass job to scheduler job.id = next sch.jch <- job // Take ptrs to current state chs ctx := sch.svc.Done() jch := sch.jch // Return cancel function for job ID return func() { select { // Sched stopped case <-ctx: // Cancel this job case jch <- next: } } } // run is the main scheduler run routine, which runs for as long as ctx is valid. func (sch *Scheduler) run(ctx context.Context) { var ( // timerset represents whether timer was running // for a particular run of the loop. false means // that tch == neverticks || tch == alwaysticks timerset bool // timer tick channel (or a never-tick channel) tch <-chan time.Time // timer notifies this main routine to wake when // the job queued needs to be checked for executions timer *time.Timer // stopdrain will stop and drain the timer // if it has been running (i.e. timerset == true) stopdrain = func() { if timerset && !timer.Stop() { <-timer.C } } ) for { select { // Handle received job/id case v := <-sch.jch: sch.handle(v) continue // No more default: } // Done break } // Create a stopped timer timer = time.NewTimer(1) <-timer.C for { // Reset timer state timerset = false if len(sch.jobs) > 0 { // Sort jobs by next occurring sort.Sort(byNext(sch.jobs)) // Get execution time now := time.Now() // Get next job time next := sch.jobs[0].Next() // If this job is _just_ about to be ready, we // don't bother sleeping. It's wasted cycles only // sleeping for some obscenely tiny amount of time // we can't guarantee precision for. if until := next.Sub(now); until <= precision/1e3 { // This job is behind schedule, set to always tick. tch = alwaysticks } else { // Reset timer to period timer.Reset(until) tch = timer.C timerset = true } } else { // Unset timer tch = neverticks } select { // Scheduler stopped case <-ctx.Done(): stopdrain() return // Timer ticked, run scheduled case now := <-tch: if !timerset { // alwaysticks returns zero times now = time.Now() } sch.schedule(now) // Received update, handle job/id case v := <-sch.jch: sch.handle(v) stopdrain() } } } // handle takes an interfaces received from Scheduler.jch and handles either: // - Job --> new job to add. // - uint64 --> job ID to remove. func (sch *Scheduler) handle(v interface{}) { switch v := v.(type) { // New job added case *Job: // Get current time now := time.Now() // Update the next call time next := v.timing.Next(now) v.next.Store(next) // Append this job to queued sch.jobs = append(sch.jobs, v) // Job removed case uint64: for i := 0; i < len(sch.jobs); i++ { if sch.jobs[i].id == v { // This is the job we're looking for! Drop this sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) return } } } } // schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. func (sch *Scheduler) schedule(now time.Time) { for i := 0; i < len(sch.jobs); { // Scope our own var job := sch.jobs[i] // We know these jobs are ordered by .Next(), so as soon // as we reach one with .Next() after now, we can return if job.Next().After(now) { return } // Pass job to runner go job.Run(now) // Update the next call time next := job.timing.Next(now) job.next.Store(next) if next.IsZero() { // Zero time, this job is done and can be dropped sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) continue } // Iter i++ } } // byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. type byNext []*Job func (by byNext) Len() int { return len(by) } func (by byNext) Less(i int, j int) bool { return by[i].Next().Before(by[j].Next()) } func (by byNext) Swap(i int, j int) { by[i], by[j] = by[j], by[i] }