service.go (4699B)
1 package runners 2 3 import ( 4 "context" 5 "sync" 6 ) 7 8 // Service provides a means of tracking a single long-running service, provided protected state 9 // changes and preventing multiple instances running. Also providing service state information. 10 type Service struct { 11 state uint32 // 0=stopped, 1=running, 2=stopping 12 mutex sync.Mutex // mutex protects overall state changes 13 wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex' 14 ctx chan struct{} // ctx is the current context for running function (or nil if not running) 15 } 16 17 // Run will run the supplied function until completion, using given context to propagate cancel. 18 // Immediately returns false if the Service is already running, and true after completed run. 19 func (svc *Service) Run(fn func(context.Context)) bool { 20 // Attempt to start the svc 21 ctx, ok := svc.doStart() 22 if !ok { 23 return false 24 } 25 26 defer func() { 27 // unlock single wait 28 svc.wait.Unlock() 29 30 // ensure stopped 31 _ = svc.Stop() 32 }() 33 34 // Run with context. 35 fn(CancelCtx(ctx)) 36 37 return true 38 } 39 40 // GoRun will run the supplied function until completion in a goroutine, using given context to 41 // propagate cancel. Immediately returns boolean indicating success, or that service is already running. 42 func (svc *Service) GoRun(fn func(context.Context)) bool { 43 // Attempt to start the svc 44 ctx, ok := svc.doStart() 45 if !ok { 46 return false 47 } 48 49 go func() { 50 defer func() { 51 // unlock single wait 52 svc.wait.Unlock() 53 54 // ensure stopped 55 _ = svc.Stop() 56 }() 57 58 // Run with context. 59 fn(CancelCtx(ctx)) 60 }() 61 62 return true 63 } 64 65 // RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns. 66 func (svc *Service) RunWait(fn func(context.Context)) bool { 67 // Attempt to start the svc 68 ctx, ok := svc.doStart() 69 if !ok { 70 <-ctx // block 71 return false 72 } 73 74 defer func() { 75 // unlock single wait 76 svc.wait.Unlock() 77 78 // ensure stopped 79 _ = svc.Stop() 80 }() 81 82 // Run with context. 83 fn(CancelCtx(ctx)) 84 85 return true 86 } 87 88 // Stop will attempt to stop the service, cancelling the running function's context. Immediately 89 // returns false if not running, and true only after Service is fully stopped. 90 func (svc *Service) Stop() bool { 91 // Attempt to stop the svc 92 ctx, ok := svc.doStop() 93 if !ok { 94 return false 95 } 96 97 defer func() { 98 // Get svc lock 99 svc.mutex.Lock() 100 101 // Wait until stopped 102 svc.wait.Lock() 103 svc.wait.Unlock() 104 105 // Reset the svc 106 svc.ctx = nil 107 svc.state = 0 108 svc.mutex.Unlock() 109 }() 110 111 // Cancel ctx 112 close(ctx) 113 114 return true 115 } 116 117 // While allows you to execute given function guaranteed within current 118 // service state. Please note that this will hold the underlying service 119 // state change mutex open while executing the function. 120 func (svc *Service) While(fn func()) { 121 // Protect state change 122 svc.mutex.Lock() 123 defer svc.mutex.Unlock() 124 125 // Run 126 fn() 127 } 128 129 // doStart will safely set Service state to started, returning a ptr to this context insance. 130 func (svc *Service) doStart() (chan struct{}, bool) { 131 // Protect startup 132 svc.mutex.Lock() 133 134 if svc.ctx == nil { 135 // this will only have been allocated 136 // if svc.Done() was already called. 137 svc.ctx = make(chan struct{}) 138 } 139 140 // Take our own ptr 141 ctx := svc.ctx 142 143 if svc.state != 0 { 144 // State was not stopped. 145 svc.mutex.Unlock() 146 return ctx, false 147 } 148 149 // Set started. 150 svc.state = 1 151 152 // Start waiter. 153 svc.wait.Lock() 154 155 // Unlock and return 156 svc.mutex.Unlock() 157 return ctx, true 158 } 159 160 // doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance. 161 func (svc *Service) doStop() (chan struct{}, bool) { 162 // Protect stop 163 svc.mutex.Lock() 164 165 if svc.state != 1 /* not started */ { 166 svc.mutex.Unlock() 167 return nil, false 168 } 169 170 // state stopping 171 svc.state = 2 172 173 // Take our own ptr 174 // and unlock state 175 ctx := svc.ctx 176 svc.mutex.Unlock() 177 178 return ctx, true 179 } 180 181 // Running returns if Service is running (i.e. state NOT stopped / stopping). 182 func (svc *Service) Running() bool { 183 svc.mutex.Lock() 184 state := svc.state 185 svc.mutex.Unlock() 186 return (state == 1) 187 } 188 189 // Done returns a channel that's closed when Service.Stop() is called. It is 190 // the same channel provided to the currently running service function. 191 func (svc *Service) Done() <-chan struct{} { 192 var done <-chan struct{} 193 194 svc.mutex.Lock() 195 switch svc.state { 196 // stopped 197 case 0: 198 if svc.ctx == nil { 199 // here we create a new context so that the 200 // returned 'done' channel here will still 201 // be valid for when Service is next started. 202 svc.ctx = make(chan struct{}) 203 } 204 done = svc.ctx 205 206 // started 207 case 1: 208 done = svc.ctx 209 210 // stopping 211 case 2: 212 done = svc.ctx 213 } 214 svc.mutex.Unlock() 215 216 return done 217 }