pool.go (6126B)
package runners

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"sync"

	"codeberg.org/gruf/go-errors/v2"
)

// WorkerFunc represents a function processable by a worker in WorkerPool. Note
// that implementations absolutely MUST watch the passed context's <-ctx.Done()
// and return once it is closed, otherwise stopping the pool may block indefinitely
// (the pool waits for every worker goroutine to return before it finishes stopping).
type WorkerFunc func(context.Context)

// WorkerPool provides a means of enqueuing asynchronous work.
type WorkerPool struct {
	// fns is the function queue; it is (re)allocated by every successful Start().
	fns chan WorkerFunc

	// svc tracks the running / stopped state of the pool.
	svc Service
}

// Start will start the main WorkerPool management loop in a new goroutine, along
// with requested number of child worker goroutines.
//
// A 'workers' value <= 0 selects runtime.GOMAXPROCS(0) workers. A 'queue' value < 0
// selects a queue capacity of workers * 10; a 'queue' of exactly 0 is kept as-is and
// results in an unbuffered queue.
//
// Returns false (without launching anything) if the underlying service could not
// be started, i.e. per the original documentation when the pool is already running.
func (pool *WorkerPool) Start(workers int, queue int) bool {
	// Attempt to start the svc
	ctx, ok := pool.svc.doStart()
	if !ok {
		return false
	}

	if workers <= 0 {
		// Use $GOMAXPROCS as default.
		workers = runtime.GOMAXPROCS(0)
	}

	if queue < 0 {
		// Use reasonable queue default.
		queue = workers * 10
	}

	// Allocate pool queue of given size.
	//
	// This MUST be set BEFORE we return and NOT in
	// the launched goroutine, or there is a risk that
	// the pool may appear as closed for a short time
	// until the main goroutine has been entered.
	fns := make(chan WorkerFunc, queue)
	pool.fns = fns

	go func() {
		defer func() {
			// unlock single wait
			// NOTE(review): presumably the lock that Service.Stop()
			// blocks on until this loop has exited — confirm in Service.
			pool.svc.wait.Unlock()

			// ensure stopped
			pool.svc.Stop()
		}()

		// Tracks the child worker goroutines so that we
		// can block below until every one has returned.
		var wait sync.WaitGroup

		// Start goroutine worker functions
		for i := 0; i < workers; i++ {
			wait.Add(1)

			go func() {
				defer wait.Done()

				// Run worker function (retry on panic): worker_run() only
				// returns true once 'fns' is closed, and false after it has
				// recovered a panic, in which case it is simply re-entered.
				for !worker_run(CancelCtx(ctx), fns) {
				}
			}()
		}

		// Wait on ctx (blocks until the value handed out
		// by doStart() becomes readable, presumably on stop).
		<-ctx

		// Drain function queue.
		//
		// All functions in the queue MUST be
		// run, so we pass them a closed context.
		//
		// This mainly allows us to block until
		// the function queue is empty, as worker
		// functions will also continue draining in
		// the background with the (now) closed ctx.
		for !drain_queue(fns) {
			// retry on panic
		}

		// Now the queue is empty, we can
		// safely close the channel signalling
		// all of the workers to return.
		close(fns)
		wait.Wait()
	}()

	return true
}

// Stop will stop the WorkerPool management loop, blocking until stopped.
// The returned value is whatever the underlying Service.Stop() reports.
func (pool *WorkerPool) Stop() bool {
	return pool.svc.Stop()
}

// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
func (pool *WorkerPool) Running() bool {
	return pool.svc.Running()
}

// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the
// same channel provided to the currently running worker functions.
func (pool *WorkerPool) Done() <-chan struct{} {
	return pool.svc.Done()
}

// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
// This will block until function is queued or pool is stopped. A nil WorkerFunc is ignored. In
// all other cases the WorkerFunc will be executed: either later by a worker, or, when the pool is
// found to be done, immediately on the calling goroutine with an already-closed context. The state
// of the pool is indicated by <-ctx.Done() of the passed ctx. WorkerFuncs MUST respect the passed context.
//
// NOTE(review): Start() closes the queue channel during shutdown while pool.fns keeps referring
// to it, and a send on a closed channel panics if that select case is chosen. Confirm whether
// calling Enqueue after shutdown can reach that case, or whether Service.Done() rules it out.
func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
	// Check valid fn
	if fn == nil {
		return
	}

	select {
	// Pool ctx cancelled, run
	// fn here with closed ctx.
	case <-pool.svc.Done():
		fn(closedctx)

	// Placed fn in queue
	case pool.fns <- fn:
	}
}

// EnqueueCtx behaves like WorkerPool.Enqueue() but additionally returns early in the case
// that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc. Unlike Enqueue(),
// the WorkerFunc is also NOT run when the pool itself is done. Returns true only if queued.
139 func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool { 140 // Check valid fn 141 if fn == nil { 142 return false 143 } 144 145 select { 146 // Caller ctx cancelled 147 case <-ctx.Done(): 148 return false 149 150 // Pool ctx cancelled 151 case <-pool.svc.Done(): 152 return false 153 154 // Placed fn in queue 155 case pool.fns <- fn: 156 return true 157 } 158 } 159 160 // MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case 161 // that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue(). 162 // Return boolean indicates whether function was executed in time before <-ctx.Done() is closed. 163 func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) { 164 // Check valid fn 165 if fn == nil { 166 return false 167 } 168 169 select { 170 case <-ctx.Done(): 171 // We failed to add this entry to the worker queue before the 172 // incoming context was cancelled. So to ensure processing 173 // we simply queue it asynchronously and return early to caller. 174 go pool.Enqueue(fn) 175 return false 176 177 case <-pool.svc.Done(): 178 // Pool ctx cancelled 179 fn(closedctx) 180 return false 181 182 case pool.fns <- fn: 183 // Placed fn in queue 184 return true 185 } 186 } 187 188 // EnqueueNow attempts Enqueue but returns false if not executed. 189 func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool { 190 // Check valid fn 191 if fn == nil { 192 return false 193 } 194 195 select { 196 // Pool ctx cancelled 197 case <-pool.svc.Done(): 198 return false 199 200 // Placed fn in queue 201 case pool.fns <- fn: 202 return true 203 204 // Queue is full 205 default: 206 return false 207 } 208 } 209 210 // Queue returns the number of currently queued WorkerFuncs. 
211 func (pool *WorkerPool) Queue() int { 212 var l int 213 pool.svc.While(func() { 214 l = len(pool.fns) 215 }) 216 return l 217 } 218 219 // worker_run is the main worker routine, accepting functions from 'fns' until it is closed. 220 func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool { 221 defer func() { 222 // Recover and drop any panic 223 if r := recover(); r != nil { 224 const msg = "worker_run: recovered panic: %v\n\n%s\n" 225 fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10)) 226 } 227 }() 228 229 for { 230 // Wait on next func 231 fn, ok := <-fns 232 if !ok { 233 return true 234 } 235 236 // Run with ctx 237 fn(ctx) 238 } 239 } 240 241 // drain_queue will drain and run all functions in worker queue, passing in a closed context. 242 func drain_queue(fns <-chan WorkerFunc) bool { 243 defer func() { 244 // Recover and drop any panic 245 if r := recover(); r != nil { 246 const msg = "drain_queue: recovered panic: %v\n\n%s\n" 247 fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10)) 248 } 249 }() 250 251 for { 252 select { 253 // Run with closed ctx 254 case fn := <-fns: 255 fn(closedctx) 256 257 // Queue is empty 258 default: 259 return true 260 } 261 } 262 }