unbounded_executor.go (3725B)
1 package concurrent 2 3 import ( 4 "context" 5 "fmt" 6 "runtime" 7 "runtime/debug" 8 "sync" 9 "time" 10 "reflect" 11 ) 12 13 // HandlePanic logs goroutine panic by default 14 var HandlePanic = func(recovered interface{}, funcName string) { 15 ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered)) 16 ErrorLogger.Println(string(debug.Stack())) 17 } 18 19 // UnboundedExecutor is a executor without limits on counts of alive goroutines 20 // it tracks the goroutine started by it, and can cancel them when shutdown 21 type UnboundedExecutor struct { 22 ctx context.Context 23 cancel context.CancelFunc 24 activeGoroutinesMutex *sync.Mutex 25 activeGoroutines map[string]int 26 HandlePanic func(recovered interface{}, funcName string) 27 } 28 29 // GlobalUnboundedExecutor has the life cycle of the program itself 30 // any goroutine want to be shutdown before main exit can be started from this executor 31 // GlobalUnboundedExecutor expects the main function to call stop 32 // it does not magically knows the main function exits 33 var GlobalUnboundedExecutor = NewUnboundedExecutor() 34 35 // NewUnboundedExecutor creates a new UnboundedExecutor, 36 // UnboundedExecutor can not be created by &UnboundedExecutor{} 37 // HandlePanic can be set with a callback to override global HandlePanic 38 func NewUnboundedExecutor() *UnboundedExecutor { 39 ctx, cancel := context.WithCancel(context.TODO()) 40 return &UnboundedExecutor{ 41 ctx: ctx, 42 cancel: cancel, 43 activeGoroutinesMutex: &sync.Mutex{}, 44 activeGoroutines: map[string]int{}, 45 } 46 } 47 48 // Go starts a new goroutine and tracks its lifecycle. 49 // Panic will be recovered and logged automatically, except for StopSignal 50 func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) { 51 pc := reflect.ValueOf(handler).Pointer() 52 f := runtime.FuncForPC(pc) 53 funcName := f.Name() 54 file, line := f.FileLine(pc) 55 executor.activeGoroutinesMutex.Lock() 56 defer executor.activeGoroutinesMutex.Unlock() 57 startFrom := fmt.Sprintf("%s:%d", file, line) 58 executor.activeGoroutines[startFrom] += 1 59 go func() { 60 defer func() { 61 recovered := recover() 62 // if you want to quit a goroutine without trigger HandlePanic 63 // use runtime.Goexit() to quit 64 if recovered != nil { 65 if executor.HandlePanic == nil { 66 HandlePanic(recovered, funcName) 67 } else { 68 executor.HandlePanic(recovered, funcName) 69 } 70 } 71 executor.activeGoroutinesMutex.Lock() 72 executor.activeGoroutines[startFrom] -= 1 73 executor.activeGoroutinesMutex.Unlock() 74 }() 75 handler(executor.ctx) 76 }() 77 } 78 79 // Stop cancel all goroutines started by this executor without wait 80 func (executor *UnboundedExecutor) Stop() { 81 executor.cancel() 82 } 83 84 // StopAndWaitForever cancel all goroutines started by this executor and 85 // wait until all goroutines exited 86 func (executor *UnboundedExecutor) StopAndWaitForever() { 87 executor.StopAndWait(context.Background()) 88 } 89 90 // StopAndWait cancel all goroutines started by this executor and wait. 91 // Wait can be cancelled by the context passed in. 92 func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) { 93 executor.cancel() 94 for { 95 oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100) 96 select { 97 case <-oneHundredMilliseconds.C: 98 if executor.checkNoActiveGoroutines() { 99 return 100 } 101 case <-ctx.Done(): 102 return 103 } 104 } 105 } 106 107 func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool { 108 executor.activeGoroutinesMutex.Lock() 109 defer executor.activeGoroutinesMutex.Unlock() 110 for startFrom, count := range executor.activeGoroutines { 111 if count > 0 { 112 InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit", 113 "startFrom", startFrom, 114 "count", count) 115 return false 116 } 117 } 118 return true 119 }