scheduler.go (6813B)
1 package sched 2 3 import ( 4 "context" 5 "sort" 6 "sync" 7 "sync/atomic" 8 "time" 9 10 "codeberg.org/gruf/go-runners" 11 ) 12 13 // precision is the maximum time we can offer scheduler run-time precision down to. 14 const precision = time.Millisecond 15 16 var ( 17 // neverticks is a timer channel that never ticks (it's starved). 18 neverticks = make(chan time.Time) 19 20 // alwaysticks is a timer channel that always ticks (it's closed). 21 alwaysticks = func() chan time.Time { 22 ch := make(chan time.Time) 23 close(ch) 24 return ch 25 }() 26 ) 27 28 // Scheduler provides a means of running jobs at specific times and 29 // regular intervals, all while sharing a single underlying timer. 30 type Scheduler struct { 31 jobs []*Job // jobs is a list of tracked Jobs to be executed 32 jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs 33 svc runners.Service // svc manages the main scheduler routine 34 jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs 35 rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs 36 } 37 38 // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. 39 func (sch *Scheduler) Start(gorun func(func())) bool { 40 var block sync.Mutex 41 42 // Use mutex to synchronize between started 43 // goroutine and ourselves, to ensure that 44 // we don't return before Scheduler init'd. 45 block.Lock() 46 defer block.Unlock() 47 48 ok := sch.svc.GoRun(func(ctx context.Context) { 49 // Create Scheduler job channel 50 sch.jch = make(chan interface{}) 51 52 // Set goroutine runner function 53 if sch.rgo = gorun; sch.rgo == nil { 54 sch.rgo = func(f func()) { go f() } 55 } 56 57 // Unlock start routine 58 block.Unlock() 59 60 // Enter main loop 61 sch.run(ctx) 62 }) 63 64 if ok { 65 // Wait on goroutine 66 block.Lock() 67 } 68 69 return ok 70 } 71 72 // Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. 73 func (sch *Scheduler) Stop() bool { 74 return sch.svc.Stop() 75 } 76 77 // Running will return whether Scheduler is running (i.e. NOT stopped / stopping). 78 func (sch *Scheduler) Running() bool { 79 return sch.svc.Running() 80 } 81 82 // Done returns a channel that's closed when Scheduler.Stop() is called. 83 func (sch *Scheduler) Done() <-chan struct{} { 84 return sch.svc.Done() 85 } 86 87 // Schedule will add provided Job to the Scheduler, returning a cancel function. 88 func (sch *Scheduler) Schedule(job *Job) (cancel func()) { 89 switch { 90 // Check a job was passed 91 case job == nil: 92 panic("nil job") 93 94 // Check we are running 95 case !sch.Running(): 96 panic("scheduler not running") 97 } 98 99 // Calculate next job ID 100 last := sch.jid.Load() 101 next := sch.jid.Add(1) 102 if next < last { 103 panic("job id overflow") 104 } 105 106 // Pass job to scheduler 107 job.id = next 108 sch.jch <- job 109 110 // Take ptrs to current state chs 111 ctx := sch.svc.Done() 112 jch := sch.jch 113 114 // Return cancel function for job ID 115 return func() { 116 select { 117 // Sched stopped 118 case <-ctx: 119 120 // Cancel this job 121 case jch <- next: 122 } 123 } 124 } 125 126 // run is the main scheduler run routine, which runs for as long as ctx is valid. 127 func (sch *Scheduler) run(ctx context.Context) { 128 var ( 129 // now stores the current time, and will only be 130 // set when the timer channel is set to be the 131 // 'alwaysticks' channel. this allows minimizing 132 // the number of calls required to time.Now(). 133 now time.Time 134 135 // timerset represents whether timer was running 136 // for a particular run of the loop. false means 137 // that tch == neverticks || tch == alwaysticks. 138 timerset bool 139 140 // timer tick channel (or always / never ticks). 141 tch <-chan time.Time 142 143 // timer notifies this main routine to wake when 144 // the job queued needs to be checked for executions. 145 timer *time.Timer 146 147 // stopdrain will stop and drain the timer 148 // if it has been running (i.e. timerset == true). 149 stopdrain = func() { 150 if timerset && !timer.Stop() { 151 <-timer.C 152 } 153 } 154 ) 155 156 // Create a stopped timer. 157 timer = time.NewTimer(1) 158 <-timer.C 159 160 for { 161 // Reset timer state. 162 timerset = false 163 164 if len(sch.jobs) > 0 { 165 // Get now time. 166 now = time.Now() 167 168 // Sort jobs by next occurring. 169 sort.Sort(byNext(sch.jobs)) 170 171 // Get next job time. 172 next := sch.jobs[0].Next() 173 174 // If this job is _just_ about to be ready, we don't bother 175 // sleeping. It's wasted cycles only sleeping for some obscenely 176 // tiny amount of time we can't guarantee precision for. 177 if until := next.Sub(now); until <= precision/1e3 { 178 // This job is behind, 179 // set to always tick. 180 tch = alwaysticks 181 } else { 182 // Reset timer to period. 183 timer.Reset(until) 184 tch = timer.C 185 timerset = true 186 } 187 } else { 188 // Unset timer 189 tch = neverticks 190 } 191 192 select { 193 // Scheduler stopped 194 case <-ctx.Done(): 195 stopdrain() 196 return 197 198 // Timer ticked, run scheduled 199 case t := <-tch: 200 if !timerset { 201 // 'alwaysticks' returns zero 202 // times, BUT 'now' will have 203 // been set during above sort. 204 t = now 205 } 206 sch.schedule(t) 207 208 // Received update, handle job/id 209 case v := <-sch.jch: 210 sch.handle(v) 211 stopdrain() 212 } 213 } 214 } 215 216 // handle takes an interfaces received from Scheduler.jch and handles either: 217 // - Job --> new job to add. 218 // - uint64 --> job ID to remove. 219 func (sch *Scheduler) handle(v interface{}) { 220 switch v := v.(type) { 221 // New job added 222 case *Job: 223 // Get current time 224 now := time.Now() 225 226 // Update the next call time 227 next := v.timing.Next(now) 228 v.next.Store(next) 229 230 // Append this job to queued 231 sch.jobs = append(sch.jobs, v) 232 233 // Job removed 234 case uint64: 235 for i := 0; i < len(sch.jobs); i++ { 236 if sch.jobs[i].id == v { 237 // This is the job we're looking for! Drop this 238 sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) 239 return 240 } 241 } 242 } 243 } 244 245 // schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. 246 func (sch *Scheduler) schedule(now time.Time) { 247 for i := 0; i < len(sch.jobs); { 248 // Scope our own var 249 job := sch.jobs[i] 250 251 // We know these jobs are ordered by .Next(), so as soon 252 // as we reach one with .Next() after now, we can return 253 if job.Next().After(now) { 254 return 255 } 256 257 // Pass to runner 258 sch.rgo(func() { 259 job.Run(now) 260 }) 261 262 // Update the next call time 263 next := job.timing.Next(now) 264 job.next.Store(next) 265 266 if next.IsZero() { 267 // Zero time, this job is done and can be dropped 268 sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) 269 continue 270 } 271 272 // Iter 273 i++ 274 } 275 } 276 277 // byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. 278 type byNext []*Job 279 280 func (by byNext) Len() int { 281 return len(by) 282 } 283 284 func (by byNext) Less(i int, j int) bool { 285 return by[i].Next().Before(by[j].Next()) 286 } 287 288 func (by byNext) Swap(i int, j int) { 289 by[i], by[j] = by[j], by[i] 290 }