process.go (1791B)
1 package runners 2 3 import ( 4 "fmt" 5 "sync" 6 ) 7 8 // Processable defines a runnable process with error return 9 // that can be passed to a Processor instance for managed running. 10 type Processable func() error 11 12 // Processor acts similarly to a sync.Once object, except that it is reusable. After 13 // the first call to Process(), any further calls before this first has returned will 14 // block until the first call has returned, and return the same error. This ensures 15 // that only a single instance of it is ever running at any one time. 16 type Processor struct { 17 mutex sync.Mutex 18 wait *sync.WaitGroup 19 err *error 20 } 21 22 // Process will process the given function if first-call, else blocking until 23 // the first function has returned, returning the same error result. 24 func (p *Processor) Process(proc Processable) (err error) { 25 // Acquire state lock. 26 p.mutex.Lock() 27 28 if p.wait != nil { 29 // Already running. 30 // 31 // Get current ptrs. 32 waitPtr := p.wait 33 errPtr := p.err 34 35 // Free state lock. 36 p.mutex.Unlock() 37 38 // Wait for finish. 39 waitPtr.Wait() 40 return *errPtr 41 } 42 43 // Alloc waiter for new process. 44 var wait sync.WaitGroup 45 46 // No need to alloc new error as 47 // we use the alloc'd named error 48 // return required for panic handling. 49 50 // Reset ptrs. 51 p.wait = &wait 52 p.err = &err 53 54 // Set started. 55 wait.Add(1) 56 p.mutex.Unlock() 57 58 defer func() { 59 if r := recover(); r != nil { 60 if err != nil { 61 rOld := r // wrap the panic so we don't lose existing returned error 62 r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) 63 } 64 65 // Catch any panics and wrap as error. 66 err = fmt.Errorf("caught panic: %v", r) 67 } 68 69 // Mark done. 70 wait.Done() 71 72 // Set stopped. 73 p.mutex.Lock() 74 p.wait = nil 75 p.mutex.Unlock() 76 }() 77 78 // Run process. 79 err = proc() 80 return 81 }