gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

scheduler.go (6813B)


      1 package sched
      2 
      3 import (
      4 	"context"
      5 	"sort"
      6 	"sync"
      7 	"sync/atomic"
      8 	"time"
      9 
     10 	"codeberg.org/gruf/go-runners"
     11 )
     12 
     13 // precision is the maximum time we can offer scheduler run-time precision down to.
     14 const precision = time.Millisecond
     15 
     16 var (
     17 	// neverticks is a timer channel that never ticks (it's starved).
     18 	neverticks = make(chan time.Time)
     19 
     20 	// alwaysticks is a timer channel that always ticks (it's closed).
     21 	alwaysticks = func() chan time.Time {
     22 		ch := make(chan time.Time)
     23 		close(ch)
     24 		return ch
     25 	}()
     26 )
     27 
     28 // Scheduler provides a means of running jobs at specific times and
     29 // regular intervals, all while sharing a single underlying timer.
     30 type Scheduler struct {
     31 	jobs []*Job           // jobs is a list of tracked Jobs to be executed
     32 	jch  chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
     33 	svc  runners.Service  // svc manages the main scheduler routine
     34 	jid  atomic.Uint64    // jid is used to iteratively generate unique IDs for jobs
     35 	rgo  func(func())     // goroutine runner, allows using goroutine pool to launch jobs
     36 }
     37 
     38 // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
     39 func (sch *Scheduler) Start(gorun func(func())) bool {
     40 	var block sync.Mutex
     41 
     42 	// Use mutex to synchronize between started
     43 	// goroutine and ourselves, to ensure that
     44 	// we don't return before Scheduler init'd.
     45 	block.Lock()
     46 	defer block.Unlock()
     47 
     48 	ok := sch.svc.GoRun(func(ctx context.Context) {
     49 		// Create Scheduler job channel
     50 		sch.jch = make(chan interface{})
     51 
     52 		// Set goroutine runner function
     53 		if sch.rgo = gorun; sch.rgo == nil {
     54 			sch.rgo = func(f func()) { go f() }
     55 		}
     56 
     57 		// Unlock start routine
     58 		block.Unlock()
     59 
     60 		// Enter main loop
     61 		sch.run(ctx)
     62 	})
     63 
     64 	if ok {
     65 		// Wait on goroutine
     66 		block.Lock()
     67 	}
     68 
     69 	return ok
     70 }
     71 
     72 // Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
     73 func (sch *Scheduler) Stop() bool {
     74 	return sch.svc.Stop()
     75 }
     76 
     77 // Running will return whether Scheduler is running (i.e. NOT stopped / stopping).
     78 func (sch *Scheduler) Running() bool {
     79 	return sch.svc.Running()
     80 }
     81 
     82 // Done returns a channel that's closed when Scheduler.Stop() is called.
     83 func (sch *Scheduler) Done() <-chan struct{} {
     84 	return sch.svc.Done()
     85 }
     86 
     87 // Schedule will add provided Job to the Scheduler, returning a cancel function.
     88 func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
     89 	switch {
     90 	// Check a job was passed
     91 	case job == nil:
     92 		panic("nil job")
     93 
     94 	// Check we are running
     95 	case !sch.Running():
     96 		panic("scheduler not running")
     97 	}
     98 
     99 	// Calculate next job ID
    100 	last := sch.jid.Load()
    101 	next := sch.jid.Add(1)
    102 	if next < last {
    103 		panic("job id overflow")
    104 	}
    105 
    106 	// Pass job to scheduler
    107 	job.id = next
    108 	sch.jch <- job
    109 
    110 	// Take ptrs to current state chs
    111 	ctx := sch.svc.Done()
    112 	jch := sch.jch
    113 
    114 	// Return cancel function for job ID
    115 	return func() {
    116 		select {
    117 		// Sched stopped
    118 		case <-ctx:
    119 
    120 		// Cancel this job
    121 		case jch <- next:
    122 		}
    123 	}
    124 }
    125 
    126 // run is the main scheduler run routine, which runs for as long as ctx is valid.
    127 func (sch *Scheduler) run(ctx context.Context) {
    128 	var (
    129 		// now stores the current time, and will only be
    130 		// set when the timer channel is set to be the
    131 		// 'alwaysticks' channel. this allows minimizing
    132 		// the number of calls required to time.Now().
    133 		now time.Time
    134 
    135 		// timerset represents whether timer was running
    136 		// for a particular run of the loop. false means
    137 		// that tch == neverticks || tch == alwaysticks.
    138 		timerset bool
    139 
    140 		// timer tick channel (or always / never ticks).
    141 		tch <-chan time.Time
    142 
    143 		// timer notifies this main routine to wake when
    144 		// the job queued needs to be checked for executions.
    145 		timer *time.Timer
    146 
    147 		// stopdrain will stop and drain the timer
    148 		// if it has been running (i.e. timerset == true).
    149 		stopdrain = func() {
    150 			if timerset && !timer.Stop() {
    151 				<-timer.C
    152 			}
    153 		}
    154 	)
    155 
    156 	// Create a stopped timer.
    157 	timer = time.NewTimer(1)
    158 	<-timer.C
    159 
    160 	for {
    161 		// Reset timer state.
    162 		timerset = false
    163 
    164 		if len(sch.jobs) > 0 {
    165 			// Get now time.
    166 			now = time.Now()
    167 
    168 			// Sort jobs by next occurring.
    169 			sort.Sort(byNext(sch.jobs))
    170 
    171 			// Get next job time.
    172 			next := sch.jobs[0].Next()
    173 
    174 			// If this job is _just_ about to be ready, we don't bother
    175 			// sleeping. It's wasted cycles only sleeping for some obscenely
    176 			// tiny amount of time we can't guarantee precision for.
    177 			if until := next.Sub(now); until <= precision/1e3 {
    178 				// This job is behind,
    179 				// set to always tick.
    180 				tch = alwaysticks
    181 			} else {
    182 				// Reset timer to period.
    183 				timer.Reset(until)
    184 				tch = timer.C
    185 				timerset = true
    186 			}
    187 		} else {
    188 			// Unset timer
    189 			tch = neverticks
    190 		}
    191 
    192 		select {
    193 		// Scheduler stopped
    194 		case <-ctx.Done():
    195 			stopdrain()
    196 			return
    197 
    198 		// Timer ticked, run scheduled
    199 		case t := <-tch:
    200 			if !timerset {
    201 				// 'alwaysticks' returns zero
    202 				// times, BUT 'now' will have
    203 				// been set during above sort.
    204 				t = now
    205 			}
    206 			sch.schedule(t)
    207 
    208 		// Received update, handle job/id
    209 		case v := <-sch.jch:
    210 			sch.handle(v)
    211 			stopdrain()
    212 		}
    213 	}
    214 }
    215 
    216 // handle takes an interfaces received from Scheduler.jch and handles either:
    217 // - Job --> new job to add.
    218 // - uint64 --> job ID to remove.
    219 func (sch *Scheduler) handle(v interface{}) {
    220 	switch v := v.(type) {
    221 	// New job added
    222 	case *Job:
    223 		// Get current time
    224 		now := time.Now()
    225 
    226 		// Update the next call time
    227 		next := v.timing.Next(now)
    228 		v.next.Store(next)
    229 
    230 		// Append this job to queued
    231 		sch.jobs = append(sch.jobs, v)
    232 
    233 	// Job removed
    234 	case uint64:
    235 		for i := 0; i < len(sch.jobs); i++ {
    236 			if sch.jobs[i].id == v {
    237 				// This is the job we're looking for! Drop this
    238 				sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
    239 				return
    240 			}
    241 		}
    242 	}
    243 }
    244 
    245 // schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
    246 func (sch *Scheduler) schedule(now time.Time) {
    247 	for i := 0; i < len(sch.jobs); {
    248 		// Scope our own var
    249 		job := sch.jobs[i]
    250 
    251 		// We know these jobs are ordered by .Next(), so as soon
    252 		// as we reach one with .Next() after now, we can return
    253 		if job.Next().After(now) {
    254 			return
    255 		}
    256 
    257 		// Pass to runner
    258 		sch.rgo(func() {
    259 			job.Run(now)
    260 		})
    261 
    262 		// Update the next call time
    263 		next := job.timing.Next(now)
    264 		job.next.Store(next)
    265 
    266 		if next.IsZero() {
    267 			// Zero time, this job is done and can be dropped
    268 			sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
    269 			continue
    270 		}
    271 
    272 		// Iter
    273 		i++
    274 	}
    275 }
    276 
    277 // byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
    278 type byNext []*Job
    279 
    280 func (by byNext) Len() int {
    281 	return len(by)
    282 }
    283 
    284 func (by byNext) Less(i int, j int) bool {
    285 	return by[i].Next().Before(by[j].Next())
    286 }
    287 
    288 func (by byNext) Swap(i int, j int) {
    289 	by[i], by[j] = by[j], by[i]
    290 }