gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

pool.go (8684B)


      1 package pools
      2 
      3 import (
      4 	"runtime"
      5 	"sync"
      6 	"sync/atomic"
      7 	"unsafe"
      8 )
      9 
     10 type Pool struct {
     11 	// New is used to instantiate new items
     12 	New func() interface{}
     13 
     14 	// Evict is called on evicted items during pool .Clean()
     15 	Evict func(interface{})
     16 
     17 	local    unsafe.Pointer // ptr to []_ppool
     18 	localSz  int64          // count of all elems in local
     19 	victim   unsafe.Pointer // ptr to []_ppool
     20 	victimSz int64          // count of all elems in victim
     21 	mutex    sync.Mutex     // mutex protects new cleanups, and new allocations of local
     22 }
     23 
     24 // Get attempts to fetch an item from the pool, failing that allocates with supplied .New() function
     25 func (p *Pool) Get() interface{} {
     26 	// Get local pool for proc
     27 	// (also pins proc)
     28 	pool, pid := p.pin()
     29 
     30 	if v := pool.getPrivate(); v != nil {
     31 		// local _ppool private elem acquired
     32 		runtime_procUnpin()
     33 		atomic.AddInt64(&p.localSz, -1)
     34 		return v
     35 	}
     36 
     37 	if v := pool.get(); v != nil {
     38 		// local _ppool queue elem acquired
     39 		runtime_procUnpin()
     40 		atomic.AddInt64(&p.localSz, -1)
     41 		return v
     42 	}
     43 
     44 	// Unpin before attempting slow
     45 	runtime_procUnpin()
     46 	if v := p.getSlow(pid); v != nil {
     47 		// note size decrementing
     48 		// is handled within p.getSlow()
     49 		// as we don't know if it came
     50 		// from the local or victim pools
     51 		return v
     52 	}
     53 
     54 	// Alloc new
     55 	return p.New()
     56 }
     57 
     58 // Put places supplied item in the proc local pool
     59 func (p *Pool) Put(v interface{}) {
     60 	// Don't store nil
     61 	if v == nil {
     62 		return
     63 	}
     64 
     65 	// Get proc local pool
     66 	// (also pins proc)
     67 	pool, _ := p.pin()
     68 
     69 	// first try private, then queue
     70 	if !pool.setPrivate(v) {
     71 		pool.put(v)
     72 	}
     73 	runtime_procUnpin()
     74 
     75 	// Increment local pool size
     76 	atomic.AddInt64(&p.localSz, 1)
     77 }
     78 
     79 // Clean will drop the current victim pools, move the current local pools to its
     80 // place and reset the local pools ptr in order to be regenerated
     81 func (p *Pool) Clean() {
     82 	p.mutex.Lock()
     83 
     84 	// victim becomes local, local becomes nil
     85 	localPtr := atomic.SwapPointer(&p.local, nil)
     86 	victimPtr := atomic.SwapPointer(&p.victim, localPtr)
     87 	localSz := atomic.SwapInt64(&p.localSz, 0)
     88 	atomic.StoreInt64(&p.victimSz, localSz)
     89 
     90 	var victim []ppool
     91 	if victimPtr != nil {
     92 		victim = *(*[]ppool)(victimPtr)
     93 	}
     94 
     95 	// drain each of the vict _ppool items
     96 	for i := 0; i < len(victim); i++ {
     97 		ppool := &victim[i]
     98 		ppool.evict(p.Evict)
     99 	}
    100 
    101 	p.mutex.Unlock()
    102 }
    103 
    104 // LocalSize returns the total number of elements in all the proc-local pools
    105 func (p *Pool) LocalSize() int64 {
    106 	return atomic.LoadInt64(&p.localSz)
    107 }
    108 
    109 // VictimSize returns the total number of elements in all the victim (old proc-local) pools
    110 func (p *Pool) VictimSize() int64 {
    111 	return atomic.LoadInt64(&p.victimSz)
    112 }
    113 
    114 // getSlow is the slow path for fetching an element, attempting to steal from other proc's
    115 // local pools, and failing that, from the aging-out victim pools. pid is still passed so
    116 // not all procs start iterating from the same index
    117 func (p *Pool) getSlow(pid int) interface{} {
    118 	// get local pools
    119 	local := p.localPools()
    120 
    121 	// Try to steal from other proc locals
    122 	for i := 0; i < len(local); i++ {
    123 		pool := &local[(pid+i+1)%len(local)]
    124 		if v := pool.get(); v != nil {
    125 			atomic.AddInt64(&p.localSz, -1)
    126 			return v
    127 		}
    128 	}
    129 
    130 	// get victim pools
    131 	victim := p.victimPools()
    132 
    133 	// Attempt to steal from victim pools
    134 	for i := 0; i < len(victim); i++ {
    135 		pool := &victim[(pid+i+1)%len(victim)]
    136 		if v := pool.get(); v != nil {
    137 			atomic.AddInt64(&p.victimSz, -1)
    138 			return v
    139 		}
    140 	}
    141 
    142 	// Set victim pools to nil (none found)
    143 	atomic.StorePointer(&p.victim, nil)
    144 
    145 	return nil
    146 }
    147 
    148 // localPools safely loads slice of local _ppools
    149 func (p *Pool) localPools() []ppool {
    150 	local := atomic.LoadPointer(&p.local)
    151 	if local == nil {
    152 		return nil
    153 	}
    154 	return *(*[]ppool)(local)
    155 }
    156 
    157 // victimPools safely loads slice of victim _ppools
    158 func (p *Pool) victimPools() []ppool {
    159 	victim := atomic.LoadPointer(&p.victim)
    160 	if victim == nil {
    161 		return nil
    162 	}
    163 	return *(*[]ppool)(victim)
    164 }
    165 
    166 // pin will get fetch pin proc to PID, fetch proc-local _ppool and current PID we're pinned to
    167 func (p *Pool) pin() (*ppool, int) {
    168 	for {
    169 		// get local pools
    170 		local := p.localPools()
    171 
    172 		if len(local) > 0 {
    173 			// local already initialized
    174 
    175 			// pin to current proc
    176 			pid := runtime_procPin()
    177 
    178 			// check for pid local pool
    179 			if pid < len(local) {
    180 				return &local[pid], pid
    181 			}
    182 
    183 			// unpin from proc
    184 			runtime_procUnpin()
    185 		} else {
    186 			// local not yet initialized
    187 
    188 			// Check functions are set
    189 			if p.New == nil {
    190 				panic("new func must not be nil")
    191 			}
    192 			if p.Evict == nil {
    193 				panic("evict func must not be nil")
    194 			}
    195 		}
    196 
    197 		// allocate local
    198 		p.allocLocal()
    199 	}
    200 }
    201 
    202 // allocLocal allocates a new local pool slice, with the old length passed to check
    203 // if pool was previously nil, or whether a change in GOMAXPROCS occurred
    204 func (p *Pool) allocLocal() {
    205 	// get pool lock
    206 	p.mutex.Lock()
    207 
    208 	// Calculate new size to use
    209 	size := runtime.GOMAXPROCS(0)
    210 
    211 	local := p.localPools()
    212 	if len(local) != size {
    213 		// GOMAXPROCS changed, reallocate
    214 		pools := make([]ppool, size)
    215 		atomic.StorePointer(&p.local, unsafe.Pointer(&pools))
    216 
    217 		// Evict old local elements
    218 		for i := 0; i < len(local); i++ {
    219 			pool := &local[i]
    220 			pool.evict(p.Evict)
    221 		}
    222 	}
    223 
    224 	// Unlock pool
    225 	p.mutex.Unlock()
    226 }
    227 
    228 // _ppool is a proc local pool
    229 type _ppool struct {
    230 	// root is the root element of the _ppool queue,
    231 	// and protects concurrent access to the queue
    232 	root unsafe.Pointer
    233 
    234 	// private is a proc private member accessible
    235 	// only to the pid this _ppool is assigned to,
    236 	// except during evict (hence the unsafe pointer)
    237 	private unsafe.Pointer
    238 }
    239 
    240 // ppool wraps _ppool with pad.
    241 type ppool struct {
    242 	_ppool
    243 
    244 	// Prevents false sharing on widespread platforms with
    245 	// 128 mod (cache line size) = 0 .
    246 	pad [128 - unsafe.Sizeof(_ppool{})%128]byte
    247 }
    248 
    249 // getPrivate gets the proc private member
    250 func (pp *_ppool) getPrivate() interface{} {
    251 	ptr := atomic.SwapPointer(&pp.private, nil)
    252 	if ptr == nil {
    253 		return nil
    254 	}
    255 	return *(*interface{})(ptr)
    256 }
    257 
    258 // setPrivate sets the proc private member (only if unset)
    259 func (pp *_ppool) setPrivate(v interface{}) bool {
    260 	return atomic.CompareAndSwapPointer(&pp.private, nil, unsafe.Pointer(&v))
    261 }
    262 
    263 // get fetches an element from the queue
    264 func (pp *_ppool) get() interface{} {
    265 	for {
    266 		// Attempt to load root elem
    267 		root := atomic.LoadPointer(&pp.root)
    268 		if root == nil {
    269 			return nil
    270 		}
    271 
    272 		// Attempt to consume root elem
    273 		if root == inUsePtr ||
    274 			!atomic.CompareAndSwapPointer(&pp.root, root, inUsePtr) {
    275 			continue
    276 		}
    277 
    278 		// Root becomes next in chain
    279 		e := (*elem)(root)
    280 		v := e.value
    281 
    282 		// Place new root back in the chain
    283 		atomic.StorePointer(&pp.root, unsafe.Pointer(e.next))
    284 		putElem(e)
    285 
    286 		return v
    287 	}
    288 }
    289 
    290 // put places an element in the queue
    291 func (pp *_ppool) put(v interface{}) {
    292 	// Prepare next elem
    293 	e := getElem()
    294 	e.value = v
    295 
    296 	for {
    297 		// Attempt to load root elem
    298 		root := atomic.LoadPointer(&pp.root)
    299 		if root == inUsePtr {
    300 			continue
    301 		}
    302 
    303 		// Set the next elem value (might be nil)
    304 		e.next = (*elem)(root)
    305 
    306 		// Attempt to store this new value at root
    307 		if atomic.CompareAndSwapPointer(&pp.root, root, unsafe.Pointer(e)) {
    308 			break
    309 		}
    310 	}
    311 }
    312 
    313 // hook evicts all entries from pool, calling hook on each
    314 func (pp *_ppool) evict(hook func(interface{})) {
    315 	if v := pp.getPrivate(); v != nil {
    316 		hook(v)
    317 	}
    318 	for {
    319 		v := pp.get()
    320 		if v == nil {
    321 			break
    322 		}
    323 		hook(v)
    324 	}
    325 }
    326 
    327 // inUsePtr is a ptr used to indicate _ppool is in use
    328 var inUsePtr = unsafe.Pointer(&elem{
    329 	next:  nil,
    330 	value: "in_use",
    331 })
    332 
    333 // elem defines an element in the _ppool queue
    334 type elem struct {
    335 	next  *elem
    336 	value interface{}
    337 }
    338 
    339 // elemPool is a simple pool of unused elements
    340 var elemPool = struct {
    341 	root unsafe.Pointer
    342 }{}
    343 
    344 // getElem fetches a new elem from pool, or creates new
    345 func getElem() *elem {
    346 	// Attempt to load root elem
    347 	root := atomic.LoadPointer(&elemPool.root)
    348 	if root == nil {
    349 		return &elem{}
    350 	}
    351 
    352 	// Attempt to consume root elem
    353 	if root == inUsePtr ||
    354 		!atomic.CompareAndSwapPointer(&elemPool.root, root, inUsePtr) {
    355 		return &elem{}
    356 	}
    357 
    358 	// Root becomes next in chain
    359 	e := (*elem)(root)
    360 	atomic.StorePointer(&elemPool.root, unsafe.Pointer(e.next))
    361 	e.next = nil
    362 
    363 	return e
    364 }
    365 
    366 // putElem will place element in the pool
    367 func putElem(e *elem) {
    368 	e.value = nil
    369 
    370 	// Attempt to load root elem
    371 	root := atomic.LoadPointer(&elemPool.root)
    372 	if root == inUsePtr {
    373 		return // drop
    374 	}
    375 
    376 	// Set the next elem value (might be nil)
    377 	e.next = (*elem)(root)
    378 
    379 	// Attempt to store this new value at root
    380 	atomic.CompareAndSwapPointer(&elemPool.root, root, unsafe.Pointer(e))
    381 }
    382 
    383 //go:linkname runtime_procPin sync.runtime_procPin
    384 func runtime_procPin() int
    385 
    386 //go:linkname runtime_procUnpin sync.runtime_procUnpin
    387 func runtime_procUnpin()