gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

pool.go (6126B)


      1 package runners
      2 
      3 import (
      4 	"context"
      5 	"fmt"
      6 	"os"
      7 	"runtime"
      8 	"sync"
      9 
     10 	"codeberg.org/gruf/go-errors/v2"
     11 )
     12 
     13 // WorkerFunc represents a function processable by a worker in WorkerPool. Note
     14 // that implementations absolutely MUST check whether passed context is <-ctx.Done()
     15 // otherwise stopping the pool may block indefinitely.
     16 type WorkerFunc func(context.Context)
     17 
     18 // WorkerPool provides a means of enqueuing asynchronous work.
     19 type WorkerPool struct {
     20 	fns chan WorkerFunc
     21 	svc Service
     22 }
     23 
     24 // Start will start the main WorkerPool management loop in a new goroutine, along
     25 // with requested number of child worker goroutines. Returns false if already running.
     26 func (pool *WorkerPool) Start(workers int, queue int) bool {
     27 	// Attempt to start the svc
     28 	ctx, ok := pool.svc.doStart()
     29 	if !ok {
     30 		return false
     31 	}
     32 
     33 	if workers <= 0 {
     34 		// Use $GOMAXPROCS as default.
     35 		workers = runtime.GOMAXPROCS(0)
     36 	}
     37 
     38 	if queue < 0 {
     39 		// Use reasonable queue default.
     40 		queue = workers * 10
     41 	}
     42 
     43 	// Allocate pool queue of given size.
     44 	//
     45 	// This MUST be set BEFORE we return and NOT in
     46 	// the launched goroutine, or there is a risk that
     47 	// the pool may appear as closed for a short time
     48 	// until the main goroutine has been entered.
     49 	fns := make(chan WorkerFunc, queue)
     50 	pool.fns = fns
     51 
     52 	go func() {
     53 		defer func() {
     54 			// unlock single wait
     55 			pool.svc.wait.Unlock()
     56 
     57 			// ensure stopped
     58 			pool.svc.Stop()
     59 		}()
     60 
     61 		var wait sync.WaitGroup
     62 
     63 		// Start goroutine worker functions
     64 		for i := 0; i < workers; i++ {
     65 			wait.Add(1)
     66 
     67 			go func() {
     68 				defer wait.Done()
     69 
     70 				// Run worker function (retry on panic)
     71 				for !worker_run(CancelCtx(ctx), fns) {
     72 				}
     73 			}()
     74 		}
     75 
     76 		// Wait on ctx
     77 		<-ctx
     78 
     79 		// Drain function queue.
     80 		//
     81 		// All functions in the queue MUST be
     82 		// run, so we pass them a closed context.
     83 		//
     84 		// This mainly allows us to block until
     85 		// the function queue is empty, as worker
     86 		// functions will also continue draining in
     87 		// the background with the (now) closed ctx.
     88 		for !drain_queue(fns) {
     89 			// retry on panic
     90 		}
     91 
     92 		// Now the queue is empty, we can
     93 		// safely close the channel signalling
     94 		// all of the workers to return.
     95 		close(fns)
     96 		wait.Wait()
     97 	}()
     98 
     99 	return true
    100 }
    101 
    102 // Stop will stop the WorkerPool management loop, blocking until stopped.
    103 func (pool *WorkerPool) Stop() bool {
    104 	return pool.svc.Stop()
    105 }
    106 
    107 // Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
    108 func (pool *WorkerPool) Running() bool {
    109 	return pool.svc.Running()
    110 }
    111 
    112 // Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
    113 func (pool *WorkerPool) Done() <-chan struct{} {
    114 	return pool.svc.Done()
    115 }
    116 
    117 // Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
    118 // This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
    119 // executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
    120 // WorkerFuncs MUST respect the passed context.
    121 func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
    122 	// Check valid fn
    123 	if fn == nil {
    124 		return
    125 	}
    126 
    127 	select {
    128 	// Pool ctx cancelled
    129 	case <-pool.svc.Done():
    130 		fn(closedctx)
    131 
    132 	// Placed fn in queue
    133 	case pool.fns <- fn:
    134 	}
    135 }
    136 
    137 // EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
    138 // case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
    139 func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
    140 	// Check valid fn
    141 	if fn == nil {
    142 		return false
    143 	}
    144 
    145 	select {
    146 	// Caller ctx cancelled
    147 	case <-ctx.Done():
    148 		return false
    149 
    150 	// Pool ctx cancelled
    151 	case <-pool.svc.Done():
    152 		return false
    153 
    154 	// Placed fn in queue
    155 	case pool.fns <- fn:
    156 		return true
    157 	}
    158 }
    159 
    160 // MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
    161 // that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue().
    162 // Return boolean indicates whether function was executed in time before <-ctx.Done() is closed.
    163 func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) {
    164 	// Check valid fn
    165 	if fn == nil {
    166 		return false
    167 	}
    168 
    169 	select {
    170 	case <-ctx.Done():
    171 		// We failed to add this entry to the worker queue before the
    172 		// incoming context was cancelled. So to ensure processing
    173 		// we simply queue it asynchronously and return early to caller.
    174 		go pool.Enqueue(fn)
    175 		return false
    176 
    177 	case <-pool.svc.Done():
    178 		// Pool ctx cancelled
    179 		fn(closedctx)
    180 		return false
    181 
    182 	case pool.fns <- fn:
    183 		// Placed fn in queue
    184 		return true
    185 	}
    186 }
    187 
    188 // EnqueueNow attempts Enqueue but returns false if not executed.
    189 func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
    190 	// Check valid fn
    191 	if fn == nil {
    192 		return false
    193 	}
    194 
    195 	select {
    196 	// Pool ctx cancelled
    197 	case <-pool.svc.Done():
    198 		return false
    199 
    200 	// Placed fn in queue
    201 	case pool.fns <- fn:
    202 		return true
    203 
    204 	// Queue is full
    205 	default:
    206 		return false
    207 	}
    208 }
    209 
    210 // Queue returns the number of currently queued WorkerFuncs.
    211 func (pool *WorkerPool) Queue() int {
    212 	var l int
    213 	pool.svc.While(func() {
    214 		l = len(pool.fns)
    215 	})
    216 	return l
    217 }
    218 
    219 // worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
    220 func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
    221 	defer func() {
    222 		// Recover and drop any panic
    223 		if r := recover(); r != nil {
    224 			const msg = "worker_run: recovered panic: %v\n\n%s\n"
    225 			fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10))
    226 		}
    227 	}()
    228 
    229 	for {
    230 		// Wait on next func
    231 		fn, ok := <-fns
    232 		if !ok {
    233 			return true
    234 		}
    235 
    236 		// Run with ctx
    237 		fn(ctx)
    238 	}
    239 }
    240 
    241 // drain_queue will drain and run all functions in worker queue, passing in a closed context.
    242 func drain_queue(fns <-chan WorkerFunc) bool {
    243 	defer func() {
    244 		// Recover and drop any panic
    245 		if r := recover(); r != nil {
    246 			const msg = "drain_queue: recovered panic: %v\n\n%s\n"
    247 			fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10))
    248 		}
    249 	}()
    250 
    251 	for {
    252 		select {
    253 		// Run with closed ctx
    254 		case fn := <-fns:
    255 			fn(closedctx)
    256 
    257 		// Queue is empty
    258 		default:
    259 			return true
    260 		}
    261 	}
    262 }