pool.go (8684B)
1 package pools 2 3 import ( 4 "runtime" 5 "sync" 6 "sync/atomic" 7 "unsafe" 8 ) 9 10 type Pool struct { 11 // New is used to instantiate new items 12 New func() interface{} 13 14 // Evict is called on evicted items during pool .Clean() 15 Evict func(interface{}) 16 17 local unsafe.Pointer // ptr to []_ppool 18 localSz int64 // count of all elems in local 19 victim unsafe.Pointer // ptr to []_ppool 20 victimSz int64 // count of all elems in victim 21 mutex sync.Mutex // mutex protects new cleanups, and new allocations of local 22 } 23 24 // Get attempts to fetch an item from the pool, failing that allocates with supplied .New() function 25 func (p *Pool) Get() interface{} { 26 // Get local pool for proc 27 // (also pins proc) 28 pool, pid := p.pin() 29 30 if v := pool.getPrivate(); v != nil { 31 // local _ppool private elem acquired 32 runtime_procUnpin() 33 atomic.AddInt64(&p.localSz, -1) 34 return v 35 } 36 37 if v := pool.get(); v != nil { 38 // local _ppool queue elem acquired 39 runtime_procUnpin() 40 atomic.AddInt64(&p.localSz, -1) 41 return v 42 } 43 44 // Unpin before attempting slow 45 runtime_procUnpin() 46 if v := p.getSlow(pid); v != nil { 47 // note size decrementing 48 // is handled within p.getSlow() 49 // as we don't know if it came 50 // from the local or victim pools 51 return v 52 } 53 54 // Alloc new 55 return p.New() 56 } 57 58 // Put places supplied item in the proc local pool 59 func (p *Pool) Put(v interface{}) { 60 // Don't store nil 61 if v == nil { 62 return 63 } 64 65 // Get proc local pool 66 // (also pins proc) 67 pool, _ := p.pin() 68 69 // first try private, then queue 70 if !pool.setPrivate(v) { 71 pool.put(v) 72 } 73 runtime_procUnpin() 74 75 // Increment local pool size 76 atomic.AddInt64(&p.localSz, 1) 77 } 78 79 // Clean will drop the current victim pools, move the current local pools to its 80 // place and reset the local pools ptr in order to be regenerated 81 func (p *Pool) Clean() { 82 p.mutex.Lock() 83 84 // victim becomes local, local becomes nil 85 localPtr := atomic.SwapPointer(&p.local, nil) 86 victimPtr := atomic.SwapPointer(&p.victim, localPtr) 87 localSz := atomic.SwapInt64(&p.localSz, 0) 88 atomic.StoreInt64(&p.victimSz, localSz) 89 90 var victim []ppool 91 if victimPtr != nil { 92 victim = *(*[]ppool)(victimPtr) 93 } 94 95 // drain each of the vict _ppool items 96 for i := 0; i < len(victim); i++ { 97 ppool := &victim[i] 98 ppool.evict(p.Evict) 99 } 100 101 p.mutex.Unlock() 102 } 103 104 // LocalSize returns the total number of elements in all the proc-local pools 105 func (p *Pool) LocalSize() int64 { 106 return atomic.LoadInt64(&p.localSz) 107 } 108 109 // VictimSize returns the total number of elements in all the victim (old proc-local) pools 110 func (p *Pool) VictimSize() int64 { 111 return atomic.LoadInt64(&p.victimSz) 112 } 113 114 // getSlow is the slow path for fetching an element, attempting to steal from other proc's 115 // local pools, and failing that, from the aging-out victim pools. pid is still passed so 116 // not all procs start iterating from the same index 117 func (p *Pool) getSlow(pid int) interface{} { 118 // get local pools 119 local := p.localPools() 120 121 // Try to steal from other proc locals 122 for i := 0; i < len(local); i++ { 123 pool := &local[(pid+i+1)%len(local)] 124 if v := pool.get(); v != nil { 125 atomic.AddInt64(&p.localSz, -1) 126 return v 127 } 128 } 129 130 // get victim pools 131 victim := p.victimPools() 132 133 // Attempt to steal from victim pools 134 for i := 0; i < len(victim); i++ { 135 pool := &victim[(pid+i+1)%len(victim)] 136 if v := pool.get(); v != nil { 137 atomic.AddInt64(&p.victimSz, -1) 138 return v 139 } 140 } 141 142 // Set victim pools to nil (none found) 143 atomic.StorePointer(&p.victim, nil) 144 145 return nil 146 } 147 148 // localPools safely loads slice of local _ppools 149 func (p *Pool) localPools() []ppool { 150 local := atomic.LoadPointer(&p.local) 151 if local == nil { 152 return nil 153 } 154 return *(*[]ppool)(local) 155 } 156 157 // victimPools safely loads slice of victim _ppools 158 func (p *Pool) victimPools() []ppool { 159 victim := atomic.LoadPointer(&p.victim) 160 if victim == nil { 161 return nil 162 } 163 return *(*[]ppool)(victim) 164 } 165 166 // pin will get fetch pin proc to PID, fetch proc-local _ppool and current PID we're pinned to 167 func (p *Pool) pin() (*ppool, int) { 168 for { 169 // get local pools 170 local := p.localPools() 171 172 if len(local) > 0 { 173 // local already initialized 174 175 // pin to current proc 176 pid := runtime_procPin() 177 178 // check for pid local pool 179 if pid < len(local) { 180 return &local[pid], pid 181 } 182 183 // unpin from proc 184 runtime_procUnpin() 185 } else { 186 // local not yet initialized 187 188 // Check functions are set 189 if p.New == nil { 190 panic("new func must not be nil") 191 } 192 if p.Evict == nil { 193 panic("evict func must not be nil") 194 } 195 } 196 197 // allocate local 198 p.allocLocal() 199 } 200 } 201 202 // allocLocal allocates a new local pool slice, with the old length passed to check 203 // if pool was previously nil, or whether a change in GOMAXPROCS occurred 204 func (p *Pool) allocLocal() { 205 // get pool lock 206 p.mutex.Lock() 207 208 // Calculate new size to use 209 size := runtime.GOMAXPROCS(0) 210 211 local := p.localPools() 212 if len(local) != size { 213 // GOMAXPROCS changed, reallocate 214 pools := make([]ppool, size) 215 atomic.StorePointer(&p.local, unsafe.Pointer(&pools)) 216 217 // Evict old local elements 218 for i := 0; i < len(local); i++ { 219 pool := &local[i] 220 pool.evict(p.Evict) 221 } 222 } 223 224 // Unlock pool 225 p.mutex.Unlock() 226 } 227 228 // _ppool is a proc local pool 229 type _ppool struct { 230 // root is the root element of the _ppool queue, 231 // and protects concurrent access to the queue 232 root unsafe.Pointer 233 234 // private is a proc private member accessible 235 // only to the pid this _ppool is assigned to, 236 // except during evict (hence the unsafe pointer) 237 private unsafe.Pointer 238 } 239 240 // ppool wraps _ppool with pad. 241 type ppool struct { 242 _ppool 243 244 // Prevents false sharing on widespread platforms with 245 // 128 mod (cache line size) = 0 . 246 pad [128 - unsafe.Sizeof(_ppool{})%128]byte 247 } 248 249 // getPrivate gets the proc private member 250 func (pp *_ppool) getPrivate() interface{} { 251 ptr := atomic.SwapPointer(&pp.private, nil) 252 if ptr == nil { 253 return nil 254 } 255 return *(*interface{})(ptr) 256 } 257 258 // setPrivate sets the proc private member (only if unset) 259 func (pp *_ppool) setPrivate(v interface{}) bool { 260 return atomic.CompareAndSwapPointer(&pp.private, nil, unsafe.Pointer(&v)) 261 } 262 263 // get fetches an element from the queue 264 func (pp *_ppool) get() interface{} { 265 for { 266 // Attempt to load root elem 267 root := atomic.LoadPointer(&pp.root) 268 if root == nil { 269 return nil 270 } 271 272 // Attempt to consume root elem 273 if root == inUsePtr || 274 !atomic.CompareAndSwapPointer(&pp.root, root, inUsePtr) { 275 continue 276 } 277 278 // Root becomes next in chain 279 e := (*elem)(root) 280 v := e.value 281 282 // Place new root back in the chain 283 atomic.StorePointer(&pp.root, unsafe.Pointer(e.next)) 284 putElem(e) 285 286 return v 287 } 288 } 289 290 // put places an element in the queue 291 func (pp *_ppool) put(v interface{}) { 292 // Prepare next elem 293 e := getElem() 294 e.value = v 295 296 for { 297 // Attempt to load root elem 298 root := atomic.LoadPointer(&pp.root) 299 if root == inUsePtr { 300 continue 301 } 302 303 // Set the next elem value (might be nil) 304 e.next = (*elem)(root) 305 306 // Attempt to store this new value at root 307 if atomic.CompareAndSwapPointer(&pp.root, root, unsafe.Pointer(e)) { 308 break 309 } 310 } 311 } 312 313 // hook evicts all entries from pool, calling hook on each 314 func (pp *_ppool) evict(hook func(interface{})) { 315 if v := pp.getPrivate(); v != nil { 316 hook(v) 317 } 318 for { 319 v := pp.get() 320 if v == nil { 321 break 322 } 323 hook(v) 324 } 325 } 326 327 // inUsePtr is a ptr used to indicate _ppool is in use 328 var inUsePtr = unsafe.Pointer(&elem{ 329 next: nil, 330 value: "in_use", 331 }) 332 333 // elem defines an element in the _ppool queue 334 type elem struct { 335 next *elem 336 value interface{} 337 } 338 339 // elemPool is a simple pool of unused elements 340 var elemPool = struct { 341 root unsafe.Pointer 342 }{} 343 344 // getElem fetches a new elem from pool, or creates new 345 func getElem() *elem { 346 // Attempt to load root elem 347 root := atomic.LoadPointer(&elemPool.root) 348 if root == nil { 349 return &elem{} 350 } 351 352 // Attempt to consume root elem 353 if root == inUsePtr || 354 !atomic.CompareAndSwapPointer(&elemPool.root, root, inUsePtr) { 355 return &elem{} 356 } 357 358 // Root becomes next in chain 359 e := (*elem)(root) 360 atomic.StorePointer(&elemPool.root, unsafe.Pointer(e.next)) 361 e.next = nil 362 363 return e 364 } 365 366 // putElem will place element in the pool 367 func putElem(e *elem) { 368 e.value = nil 369 370 // Attempt to load root elem 371 root := atomic.LoadPointer(&elemPool.root) 372 if root == inUsePtr { 373 return // drop 374 } 375 376 // Set the next elem value (might be nil) 377 e.next = (*elem)(root) 378 379 // Attempt to store this new value at root 380 atomic.CompareAndSwapPointer(&elemPool.root, root, unsafe.Pointer(e)) 381 } 382 383 //go:linkname runtime_procPin sync.runtime_procPin 384 func runtime_procPin() int 385 386 //go:linkname runtime_procUnpin sync.runtime_procUnpin 387 func runtime_procUnpin()