gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

sink.go (4440B)


      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package binarylog
     20 
     21 import (
     22 	"bufio"
     23 	"encoding/binary"
     24 	"io"
     25 	"sync"
     26 	"time"
     27 
     28 	"github.com/golang/protobuf/proto"
     29 	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
     30 )
     31 
     32 var (
     33 	// DefaultSink is the sink where the logs will be written to. It's exported
     34 	// for the binarylog package to update.
     35 	DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
     36 )
     37 
// Sink writes log entries into the binary log sink.
//
// Sink is a copy of the exported binarylog.Sink, kept here to avoid a circular
// dependency.
type Sink interface {
	// Write will be called to write the log entry into the sink.
	//
	// It should be thread-safe so it can be called in parallel.
	Write(*binlogpb.GrpcLogEntry) error
	// Close will be called when the Sink is replaced by a new Sink.
	Close() error
}
     49 
     50 type noopSink struct{}
     51 
     52 func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
     53 func (ns *noopSink) Close() error                       { return nil }
     54 
     55 // newWriterSink creates a binary log sink with the given writer.
     56 //
     57 // Write() marshals the proto message and writes it to the given writer. Each
     58 // message is prefixed with a 4 byte big endian unsigned integer as the length.
     59 //
     60 // No buffer is done, Close() doesn't try to close the writer.
     61 func newWriterSink(w io.Writer) Sink {
     62 	return &writerSink{out: w}
     63 }
     64 
// writerSink is a Sink that writes length-prefixed, marshaled log entries to
// an io.Writer.
type writerSink struct {
	// out receives the 4 byte length header followed by the marshaled entry.
	out io.Writer
}
     68 
     69 func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
     70 	b, err := proto.Marshal(e)
     71 	if err != nil {
     72 		grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
     73 		return err
     74 	}
     75 	hdr := make([]byte, 4)
     76 	binary.BigEndian.PutUint32(hdr, uint32(len(b)))
     77 	if _, err := ws.out.Write(hdr); err != nil {
     78 		return err
     79 	}
     80 	if _, err := ws.out.Write(b); err != nil {
     81 		return err
     82 	}
     83 	return nil
     84 }
     85 
     86 func (ws *writerSink) Close() error { return nil }
     87 
// bufferedSink is a Sink that writes entries through a bufio.Writer and
// flushes that buffer periodically from a background goroutine.
type bufferedSink struct {
	// mu is held by Write, by the periodic flush and by Close.
	mu             sync.Mutex
	closer         io.Closer     // closer is closed by Close after the final flush.
	out            Sink          // out is built on buf.
	buf            *bufio.Writer // buf is kept for flush.
	flusherStarted bool          // flusherStarted is set once the first Write has started the flush goroutine.

	writeTicker *time.Ticker  // writeTicker drives the periodic flush; nil until the flush goroutine is started.
	done        chan struct{} // done is closed by Close to stop the flush goroutine.
}
     98 
     99 func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
    100 	fs.mu.Lock()
    101 	defer fs.mu.Unlock()
    102 	if !fs.flusherStarted {
    103 		// Start the write loop when Write is called.
    104 		fs.startFlushGoroutine()
    105 		fs.flusherStarted = true
    106 	}
    107 	if err := fs.out.Write(e); err != nil {
    108 		return err
    109 	}
    110 	return nil
    111 }
    112 
    113 const (
    114 	bufFlushDuration = 60 * time.Second
    115 )
    116 
    117 func (fs *bufferedSink) startFlushGoroutine() {
    118 	fs.writeTicker = time.NewTicker(bufFlushDuration)
    119 	go func() {
    120 		for {
    121 			select {
    122 			case <-fs.done:
    123 				return
    124 			case <-fs.writeTicker.C:
    125 			}
    126 			fs.mu.Lock()
    127 			if err := fs.buf.Flush(); err != nil {
    128 				grpclogLogger.Warningf("failed to flush to Sink: %v", err)
    129 			}
    130 			fs.mu.Unlock()
    131 		}
    132 	}()
    133 }
    134 
    135 func (fs *bufferedSink) Close() error {
    136 	fs.mu.Lock()
    137 	defer fs.mu.Unlock()
    138 	if fs.writeTicker != nil {
    139 		fs.writeTicker.Stop()
    140 	}
    141 	close(fs.done)
    142 	if err := fs.buf.Flush(); err != nil {
    143 		grpclogLogger.Warningf("failed to flush to Sink: %v", err)
    144 	}
    145 	if err := fs.closer.Close(); err != nil {
    146 		grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
    147 	}
    148 	if err := fs.out.Close(); err != nil {
    149 		grpclogLogger.Warningf("failed to close the Sink: %v", err)
    150 	}
    151 	return nil
    152 }
    153 
    154 // NewBufferedSink creates a binary log sink with the given WriteCloser.
    155 //
    156 // Write() marshals the proto message and writes it to the given writer. Each
    157 // message is prefixed with a 4 byte big endian unsigned integer as the length.
    158 //
    159 // Content is kept in a buffer, and is flushed every 60 seconds.
    160 //
    161 // Close closes the WriteCloser.
    162 func NewBufferedSink(o io.WriteCloser) Sink {
    163 	bufW := bufio.NewWriter(o)
    164 	return &bufferedSink{
    165 		closer: o,
    166 		out:    newWriterSink(bufW),
    167 		buf:    bufW,
    168 		done:   make(chan struct{}),
    169 	}
    170 }