gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

unbounded.go (2625B)


      1 /*
      2  * Copyright 2019 gRPC authors.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  *
     16  */
     17 
     18 // Package buffer provides an implementation of an unbounded buffer.
     19 package buffer
     20 
     21 import "sync"
     22 
     23 // Unbounded is an implementation of an unbounded buffer which does not use
     24 // extra goroutines. This is typically used for passing updates from one entity
     25 // to another within gRPC.
     26 //
     27 // All methods on this type are thread-safe and don't block on anything except
     28 // the underlying mutex used for synchronization.
     29 //
     30 // Unbounded supports values of any type to be stored in it by using a channel
     31 // of `interface{}`. This means that a call to Put() incurs an extra memory
     32 // allocation, and also that users need a type assertion while reading. For
     33 // performance critical code paths, using Unbounded is strongly discouraged and
     34 // defining a new type specific implementation of this buffer is preferred. See
     35 // internal/transport/transport.go for an example of this.
     36 type Unbounded struct {
     37 	c       chan interface{}
     38 	mu      sync.Mutex
     39 	backlog []interface{}
     40 }
     41 
     42 // NewUnbounded returns a new instance of Unbounded.
     43 func NewUnbounded() *Unbounded {
     44 	return &Unbounded{c: make(chan interface{}, 1)}
     45 }
     46 
     47 // Put adds t to the unbounded buffer.
     48 func (b *Unbounded) Put(t interface{}) {
     49 	b.mu.Lock()
     50 	if len(b.backlog) == 0 {
     51 		select {
     52 		case b.c <- t:
     53 			b.mu.Unlock()
     54 			return
     55 		default:
     56 		}
     57 	}
     58 	b.backlog = append(b.backlog, t)
     59 	b.mu.Unlock()
     60 }
     61 
     62 // Load sends the earliest buffered data, if any, onto the read channel
     63 // returned by Get(). Users are expected to call this every time they read a
     64 // value from the read channel.
     65 func (b *Unbounded) Load() {
     66 	b.mu.Lock()
     67 	if len(b.backlog) > 0 {
     68 		select {
     69 		case b.c <- b.backlog[0]:
     70 			b.backlog[0] = nil
     71 			b.backlog = b.backlog[1:]
     72 		default:
     73 		}
     74 	}
     75 	b.mu.Unlock()
     76 }
     77 
     78 // Get returns a read channel on which values added to the buffer, via Put(),
     79 // are sent on.
     80 //
     81 // Upon reading a value from this channel, users are expected to call Load() to
     82 // send the next buffered value onto the channel if there is any.
     83 func (b *Unbounded) Get() <-chan interface{} {
     84 	return b.c
     85 }