gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

service.go (4699B)


      1 package runners
      2 
      3 import (
      4 	"context"
      5 	"sync"
      6 )
      7 
// Service tracks the lifecycle of a single long-running function. All state
// changes are serialized through a mutex so that at most one instance of the
// function runs at a time, and callers can query the current state.
//
// The zero value is a stopped Service that is ready to use. As it contains
// mutexes, a Service should be shared by pointer rather than copied.
type Service struct {
	state uint32        // lifecycle state: 0=stopped, 1=running, 2=stopping
	mutex sync.Mutex    // mutex protects overall state changes (guards 'state' and 'ctx')
	wait  sync.Mutex    // wait is used as a single-entity wait-group, only ever locked within 'mutex'; released when the run function returns
	ctx   chan struct{} // ctx is the cancel channel handed to the running function; nil when stopped, unless Done() pre-allocated it
}
     16 
     17 // Run will run the supplied function until completion, using given context to propagate cancel.
     18 // Immediately returns false if the Service is already running, and true after completed run.
     19 func (svc *Service) Run(fn func(context.Context)) bool {
     20 	// Attempt to start the svc
     21 	ctx, ok := svc.doStart()
     22 	if !ok {
     23 		return false
     24 	}
     25 
     26 	defer func() {
     27 		// unlock single wait
     28 		svc.wait.Unlock()
     29 
     30 		// ensure stopped
     31 		_ = svc.Stop()
     32 	}()
     33 
     34 	// Run with context.
     35 	fn(CancelCtx(ctx))
     36 
     37 	return true
     38 }
     39 
     40 // GoRun will run the supplied function until completion in a goroutine, using given context to
     41 // propagate cancel. Immediately returns boolean indicating success, or that service is already running.
     42 func (svc *Service) GoRun(fn func(context.Context)) bool {
     43 	// Attempt to start the svc
     44 	ctx, ok := svc.doStart()
     45 	if !ok {
     46 		return false
     47 	}
     48 
     49 	go func() {
     50 		defer func() {
     51 			// unlock single wait
     52 			svc.wait.Unlock()
     53 
     54 			// ensure stopped
     55 			_ = svc.Stop()
     56 		}()
     57 
     58 		// Run with context.
     59 		fn(CancelCtx(ctx))
     60 	}()
     61 
     62 	return true
     63 }
     64 
     65 // RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
     66 func (svc *Service) RunWait(fn func(context.Context)) bool {
     67 	// Attempt to start the svc
     68 	ctx, ok := svc.doStart()
     69 	if !ok {
     70 		<-ctx // block
     71 		return false
     72 	}
     73 
     74 	defer func() {
     75 		// unlock single wait
     76 		svc.wait.Unlock()
     77 
     78 		// ensure stopped
     79 		_ = svc.Stop()
     80 	}()
     81 
     82 	// Run with context.
     83 	fn(CancelCtx(ctx))
     84 
     85 	return true
     86 }
     87 
     88 // Stop will attempt to stop the service, cancelling the running function's context. Immediately
     89 // returns false if not running, and true only after Service is fully stopped.
     90 func (svc *Service) Stop() bool {
     91 	// Attempt to stop the svc
     92 	ctx, ok := svc.doStop()
     93 	if !ok {
     94 		return false
     95 	}
     96 
     97 	defer func() {
     98 		// Get svc lock
     99 		svc.mutex.Lock()
    100 
    101 		// Wait until stopped
    102 		svc.wait.Lock()
    103 		svc.wait.Unlock()
    104 
    105 		// Reset the svc
    106 		svc.ctx = nil
    107 		svc.state = 0
    108 		svc.mutex.Unlock()
    109 	}()
    110 
    111 	// Cancel ctx
    112 	close(ctx)
    113 
    114 	return true
    115 }
    116 
    117 // While allows you to execute given function guaranteed within current
    118 // service state. Please note that this will hold the underlying service
    119 // state change mutex open while executing the function.
    120 func (svc *Service) While(fn func()) {
    121 	// Protect state change
    122 	svc.mutex.Lock()
    123 	defer svc.mutex.Unlock()
    124 
    125 	// Run
    126 	fn()
    127 }
    128 
    129 // doStart will safely set Service state to started, returning a ptr to this context insance.
    130 func (svc *Service) doStart() (chan struct{}, bool) {
    131 	// Protect startup
    132 	svc.mutex.Lock()
    133 
    134 	if svc.ctx == nil {
    135 		// this will only have been allocated
    136 		// if svc.Done() was already called.
    137 		svc.ctx = make(chan struct{})
    138 	}
    139 
    140 	// Take our own ptr
    141 	ctx := svc.ctx
    142 
    143 	if svc.state != 0 {
    144 		// State was not stopped.
    145 		svc.mutex.Unlock()
    146 		return ctx, false
    147 	}
    148 
    149 	// Set started.
    150 	svc.state = 1
    151 
    152 	// Start waiter.
    153 	svc.wait.Lock()
    154 
    155 	// Unlock and return
    156 	svc.mutex.Unlock()
    157 	return ctx, true
    158 }
    159 
    160 // doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
    161 func (svc *Service) doStop() (chan struct{}, bool) {
    162 	// Protect stop
    163 	svc.mutex.Lock()
    164 
    165 	if svc.state != 1 /* not started */ {
    166 		svc.mutex.Unlock()
    167 		return nil, false
    168 	}
    169 
    170 	// state stopping
    171 	svc.state = 2
    172 
    173 	// Take our own ptr
    174 	// and unlock state
    175 	ctx := svc.ctx
    176 	svc.mutex.Unlock()
    177 
    178 	return ctx, true
    179 }
    180 
    181 // Running returns if Service is running (i.e. state NOT stopped / stopping).
    182 func (svc *Service) Running() bool {
    183 	svc.mutex.Lock()
    184 	state := svc.state
    185 	svc.mutex.Unlock()
    186 	return (state == 1)
    187 }
    188 
    189 // Done returns a channel that's closed when Service.Stop() is called. It is
    190 // the same channel provided to the currently running service function.
    191 func (svc *Service) Done() <-chan struct{} {
    192 	var done <-chan struct{}
    193 
    194 	svc.mutex.Lock()
    195 	switch svc.state {
    196 	// stopped
    197 	case 0:
    198 		if svc.ctx == nil {
    199 			// here we create a new context so that the
    200 			// returned 'done' channel here will still
    201 			// be valid for when Service is next started.
    202 			svc.ctx = make(chan struct{})
    203 		}
    204 		done = svc.ctx
    205 
    206 	// started
    207 	case 1:
    208 		done = svc.ctx
    209 
    210 	// stopping
    211 	case 2:
    212 		done = svc.ctx
    213 	}
    214 	svc.mutex.Unlock()
    215 
    216 	return done
    217 }