gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

unbounded_executor.go (3725B)


      1 package concurrent
      2 
      3 import (
      4 	"context"
      5 	"fmt"
      6 	"runtime"
      7 	"runtime/debug"
      8 	"sync"
      9 	"time"
     10 	"reflect"
     11 )
     12 
     13 // HandlePanic logs goroutine panic by default
     14 var HandlePanic = func(recovered interface{}, funcName string) {
     15 	ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
     16 	ErrorLogger.Println(string(debug.Stack()))
     17 }
     18 
// UnboundedExecutor is an executor that places no limit on the number of
// live goroutines. It keeps a count of the goroutines it has started and
// cancels the context shared by all of them when it is stopped.
type UnboundedExecutor struct {
	// ctx is passed to every handler started through Go.
	ctx                   context.Context
	// cancel cancels ctx; it is called by Stop and StopAndWait.
	cancel                context.CancelFunc
	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines maps the "file:line" of a handler function to the
	// number of goroutines currently running that handler.
	activeGoroutines      map[string]int
	// HandlePanic, when non-nil, is called instead of the package-level
	// HandlePanic for panics recovered from this executor's goroutines.
	HandlePanic           func(recovered interface{}, funcName string)
}
     28 
// GlobalUnboundedExecutor is a package-level executor meant to live as long
// as the program itself. Any goroutine that should be shut down before main
// exits can be started from it.
// It has no way of knowing that main is about to return, so the main
// function is expected to stop it explicitly.
var GlobalUnboundedExecutor = NewUnboundedExecutor()
     34 
     35 // NewUnboundedExecutor creates a new UnboundedExecutor,
     36 // UnboundedExecutor can not be created by &UnboundedExecutor{}
     37 // HandlePanic can be set with a callback to override global HandlePanic
     38 func NewUnboundedExecutor() *UnboundedExecutor {
     39 	ctx, cancel := context.WithCancel(context.TODO())
     40 	return &UnboundedExecutor{
     41 		ctx:                   ctx,
     42 		cancel:                cancel,
     43 		activeGoroutinesMutex: &sync.Mutex{},
     44 		activeGoroutines:      map[string]int{},
     45 	}
     46 }
     47 
     48 // Go starts a new goroutine and tracks its lifecycle.
     49 // Panic will be recovered and logged automatically, except for StopSignal
     50 func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
     51 	pc := reflect.ValueOf(handler).Pointer()
     52 	f := runtime.FuncForPC(pc)
     53 	funcName := f.Name()
     54 	file, line := f.FileLine(pc)
     55 	executor.activeGoroutinesMutex.Lock()
     56 	defer executor.activeGoroutinesMutex.Unlock()
     57 	startFrom := fmt.Sprintf("%s:%d", file, line)
     58 	executor.activeGoroutines[startFrom] += 1
     59 	go func() {
     60 		defer func() {
     61 			recovered := recover()
     62 			// if you want to quit a goroutine without trigger HandlePanic
     63 			// use runtime.Goexit() to quit
     64 			if recovered != nil {
     65 				if executor.HandlePanic == nil {
     66 					HandlePanic(recovered, funcName)
     67 				} else {
     68 					executor.HandlePanic(recovered, funcName)
     69 				}
     70 			}
     71 			executor.activeGoroutinesMutex.Lock()
     72 			executor.activeGoroutines[startFrom] -= 1
     73 			executor.activeGoroutinesMutex.Unlock()
     74 		}()
     75 		handler(executor.ctx)
     76 	}()
     77 }
     78 
// Stop cancels the context shared by all goroutines started by this executor
// and returns immediately, without waiting for them to exit.
func (executor *UnboundedExecutor) Stop() {
	executor.cancel()
}
     83 
     84 // StopAndWaitForever cancel all goroutines started by this executor and
     85 // wait until all goroutines exited
     86 func (executor *UnboundedExecutor) StopAndWaitForever() {
     87 	executor.StopAndWait(context.Background())
     88 }
     89 
     90 // StopAndWait cancel all goroutines started by this executor and wait.
     91 // Wait can be cancelled by the context passed in.
     92 func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
     93 	executor.cancel()
     94 	for {
     95 		oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
     96 		select {
     97 		case <-oneHundredMilliseconds.C:
     98 			if executor.checkNoActiveGoroutines() {
     99 				return
    100 			}
    101 		case <-ctx.Done():
    102 			return
    103 		}
    104 	}
    105 }
    106 
    107 func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
    108 	executor.activeGoroutinesMutex.Lock()
    109 	defer executor.activeGoroutinesMutex.Unlock()
    110 	for startFrom, count := range executor.activeGoroutines {
    111 		if count > 0 {
    112 			InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
    113 				"startFrom", startFrom,
    114 				"count", count)
    115 			return false
    116 		}
    117 	}
    118 	return true
    119 }