job.go (3118B)
1 package sched 2 3 import ( 4 "reflect" 5 "strconv" 6 "strings" 7 "time" 8 9 "codeberg.org/gruf/go-atomics" 10 ) 11 12 // Job encapsulates logic for a scheduled job to be run according 13 // to a set Timing, executing the job with a set panic handler, and 14 // holding onto a next execution time safely in a concurrent environment. 15 type Job struct { 16 id uint64 17 next atomics.Time 18 timing Timing 19 call func(time.Time) 20 panic func(interface{}) 21 } 22 23 // NewJob returns a new Job to run given function. 24 func NewJob(fn func(now time.Time)) *Job { 25 if fn == nil { 26 // Ensure a function 27 panic("nil func") 28 } 29 30 j := &Job{ // set defaults 31 timing: emptytiming, // i.e. fire immediately 32 call: fn, 33 panic: func(i interface{}) { panic(i) }, 34 } 35 36 // Init next time ptr 37 j.next.Store(zerotime) 38 39 return j 40 } 41 42 // At sets this Job to execute at time, by passing (*sched.Once)(&at) to .With(). See .With() for details. 43 func (job *Job) At(at time.Time) *Job { 44 return job.With((*Once)(&at)) 45 } 46 47 // Every sets this Job to execute every period, by passing sched.Period(period) to .With(). See .With() for details. 48 func (job *Job) Every(period time.Duration) *Job { 49 return job.With(Periodic(period)) 50 } 51 52 // EveryAt sets this Job to execute every period starting at time, by passing &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details. 53 func (job *Job) EveryAt(at time.Time, period time.Duration) *Job { 54 return job.With(&PeriodicAt{Once: Once(at), Period: Periodic(period)}) 55 } 56 57 // With sets this Job's timing to given implementation, or if already set will wrap existing using sched.TimingWrap{}. 58 func (job *Job) With(t Timing) *Job { 59 if t == nil { 60 // Ensure a timing 61 panic("nil Timing") 62 } 63 64 if job.id != 0 { 65 // Cannot update scheduled job 66 panic("job already scheduled") 67 } 68 69 if job.timing == emptytiming { 70 // Set new timing 71 job.timing = t 72 } else { 73 // Wrap old timing 74 old := job.timing 75 job.timing = &TimingWrap{ 76 Outer: t, 77 Inner: old, 78 } 79 } 80 81 return job 82 } 83 84 // OnPanic specifies how this job handles panics, default is an actual panic. 85 func (job *Job) OnPanic(fn func(interface{})) *Job { 86 if fn == nil { 87 // Ensure a function 88 panic("nil func") 89 } 90 91 if job.id != 0 { 92 // Cannot update scheduled job 93 panic("job already scheduled") 94 } 95 96 job.panic = fn 97 return job 98 } 99 100 // Next returns the next time this Job is expected to run. 101 func (job *Job) Next() time.Time { 102 return job.next.Load() 103 } 104 105 // Run will execute this Job and pass through given now time. 106 func (job *Job) Run(now time.Time) { 107 defer func() { 108 if r := recover(); r != nil { 109 job.panic(r) 110 } 111 }() 112 job.call(now) 113 } 114 115 // String provides a debuggable string representation of Job including ID, next time and Timing type. 116 func (job *Job) String() string { 117 var buf strings.Builder 118 buf.WriteByte('{') 119 buf.WriteString("id=") 120 buf.WriteString(strconv.FormatUint(job.id, 10)) 121 buf.WriteByte(' ') 122 buf.WriteString("next=") 123 buf.WriteString(job.next.Load().Format(time.StampMicro)) 124 buf.WriteByte(' ') 125 buf.WriteString("timing=") 126 buf.WriteString(reflect.TypeOf(job.timing).String()) 127 buf.WriteByte('}') 128 return buf.String() 129 }