callback_serializer.go (2060B)
1 /* 2 * 3 * Copyright 2022 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpcsync 20 21 import ( 22 "context" 23 24 "google.golang.org/grpc/internal/buffer" 25 ) 26 27 // CallbackSerializer provides a mechanism to schedule callbacks in a 28 // synchronized manner. It provides a FIFO guarantee on the order of execution 29 // of scheduled callbacks. New callbacks can be scheduled by invoking the 30 // Schedule() method. 31 // 32 // This type is safe for concurrent access. 33 type CallbackSerializer struct { 34 callbacks *buffer.Unbounded 35 } 36 37 // NewCallbackSerializer returns a new CallbackSerializer instance. The provided 38 // context will be passed to the scheduled callbacks. Users should cancel the 39 // provided context to shutdown the CallbackSerializer. It is guaranteed that no 40 // callbacks will be executed once this context is canceled. 41 func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { 42 t := &CallbackSerializer{callbacks: buffer.NewUnbounded()} 43 go t.run(ctx) 44 return t 45 } 46 47 // Schedule adds a callback to be scheduled after existing callbacks are run. 48 // 49 // Callbacks are expected to honor the context when performing any blocking 50 // operations, and should return early when the context is canceled. 51 func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) { 52 t.callbacks.Put(f) 53 } 54 55 func (t *CallbackSerializer) run(ctx context.Context) { 56 for ctx.Err() == nil { 57 select { 58 case <-ctx.Done(): 59 return 60 case callback := <-t.callbacks.Get(): 61 t.callbacks.Load() 62 callback.(func(ctx context.Context))(ctx) 63 } 64 } 65 }