gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

method_logger.go (11652B)


      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package binarylog
     20 
     21 import (
     22 	"context"
     23 	"net"
     24 	"strings"
     25 	"sync/atomic"
     26 	"time"
     27 
     28 	"github.com/golang/protobuf/proto"
     29 	"github.com/golang/protobuf/ptypes"
     30 	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
     31 	"google.golang.org/grpc/metadata"
     32 	"google.golang.org/grpc/status"
     33 )
     34 
     35 type callIDGenerator struct {
     36 	id uint64
     37 }
     38 
     39 func (g *callIDGenerator) next() uint64 {
     40 	id := atomic.AddUint64(&g.id, 1)
     41 	return id
     42 }
     43 
     44 // reset is for testing only, and doesn't need to be thread safe.
     45 func (g *callIDGenerator) reset() {
     46 	g.id = 0
     47 }
     48 
     49 var idGen callIDGenerator
     50 
     51 // MethodLogger is the sub-logger for each method.
     52 type MethodLogger interface {
     53 	Log(context.Context, LogEntryConfig)
     54 }
     55 
     56 // TruncatingMethodLogger is a method logger that truncates headers and messages
     57 // based on configured fields.
     58 type TruncatingMethodLogger struct {
     59 	headerMaxLen, messageMaxLen uint64
     60 
     61 	callID          uint64
     62 	idWithinCallGen *callIDGenerator
     63 
     64 	sink Sink // TODO(blog): make this plugable.
     65 }
     66 
     67 // NewTruncatingMethodLogger returns a new truncating method logger.
     68 func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
     69 	return &TruncatingMethodLogger{
     70 		headerMaxLen:  h,
     71 		messageMaxLen: m,
     72 
     73 		callID:          idGen.next(),
     74 		idWithinCallGen: &callIDGenerator{},
     75 
     76 		sink: DefaultSink, // TODO(blog): make it plugable.
     77 	}
     78 }
     79 
     80 // Build is an internal only method for building the proto message out of the
     81 // input event. It's made public to enable other library to reuse as much logic
     82 // in TruncatingMethodLogger as possible.
     83 func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
     84 	m := c.toProto()
     85 	timestamp, _ := ptypes.TimestampProto(time.Now())
     86 	m.Timestamp = timestamp
     87 	m.CallId = ml.callID
     88 	m.SequenceIdWithinCall = ml.idWithinCallGen.next()
     89 
     90 	switch pay := m.Payload.(type) {
     91 	case *binlogpb.GrpcLogEntry_ClientHeader:
     92 		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
     93 	case *binlogpb.GrpcLogEntry_ServerHeader:
     94 		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
     95 	case *binlogpb.GrpcLogEntry_Message:
     96 		m.PayloadTruncated = ml.truncateMessage(pay.Message)
     97 	}
     98 	return m
     99 }
    100 
    101 // Log creates a proto binary log entry, and logs it to the sink.
    102 func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) {
    103 	ml.sink.Write(ml.Build(c))
    104 }
    105 
    106 func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
    107 	if ml.headerMaxLen == maxUInt {
    108 		return false
    109 	}
    110 	var (
    111 		bytesLimit = ml.headerMaxLen
    112 		index      int
    113 	)
    114 	// At the end of the loop, index will be the first entry where the total
    115 	// size is greater than the limit:
    116 	//
    117 	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
    118 	for ; index < len(mdPb.Entry); index++ {
    119 		entry := mdPb.Entry[index]
    120 		if entry.Key == "grpc-trace-bin" {
    121 			// "grpc-trace-bin" is a special key. It's kept in the log entry,
    122 			// but not counted towards the size limit.
    123 			continue
    124 		}
    125 		currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
    126 		if currentEntryLen > bytesLimit {
    127 			break
    128 		}
    129 		bytesLimit -= currentEntryLen
    130 	}
    131 	truncated = index < len(mdPb.Entry)
    132 	mdPb.Entry = mdPb.Entry[:index]
    133 	return truncated
    134 }
    135 
    136 func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
    137 	if ml.messageMaxLen == maxUInt {
    138 		return false
    139 	}
    140 	if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
    141 		return false
    142 	}
    143 	msgPb.Data = msgPb.Data[:ml.messageMaxLen]
    144 	return true
    145 }
    146 
// LogEntryConfig represents the configuration for a binary log entry.
//
// Its only method is unexported, so implementations live in this package.
type LogEntryConfig interface {
	// toProto converts the config into a log entry proto. Fields that belong
	// to the logger (such as the call and sequence IDs) are filled in later
	// by TruncatingMethodLogger.Build.
	toProto() *binlogpb.GrpcLogEntry
}
    151 
    152 // ClientHeader configs the binary log entry to be a ClientHeader entry.
    153 type ClientHeader struct {
    154 	OnClientSide bool
    155 	Header       metadata.MD
    156 	MethodName   string
    157 	Authority    string
    158 	Timeout      time.Duration
    159 	// PeerAddr is required only when it's on server side.
    160 	PeerAddr net.Addr
    161 }
    162 
    163 func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
    164 	// This function doesn't need to set all the fields (e.g. seq ID). The Log
    165 	// function will set the fields when necessary.
    166 	clientHeader := &binlogpb.ClientHeader{
    167 		Metadata:   mdToMetadataProto(c.Header),
    168 		MethodName: c.MethodName,
    169 		Authority:  c.Authority,
    170 	}
    171 	if c.Timeout > 0 {
    172 		clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
    173 	}
    174 	ret := &binlogpb.GrpcLogEntry{
    175 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
    176 		Payload: &binlogpb.GrpcLogEntry_ClientHeader{
    177 			ClientHeader: clientHeader,
    178 		},
    179 	}
    180 	if c.OnClientSide {
    181 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
    182 	} else {
    183 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
    184 	}
    185 	if c.PeerAddr != nil {
    186 		ret.Peer = addrToProto(c.PeerAddr)
    187 	}
    188 	return ret
    189 }
    190 
    191 // ServerHeader configs the binary log entry to be a ServerHeader entry.
    192 type ServerHeader struct {
    193 	OnClientSide bool
    194 	Header       metadata.MD
    195 	// PeerAddr is required only when it's on client side.
    196 	PeerAddr net.Addr
    197 }
    198 
    199 func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
    200 	ret := &binlogpb.GrpcLogEntry{
    201 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
    202 		Payload: &binlogpb.GrpcLogEntry_ServerHeader{
    203 			ServerHeader: &binlogpb.ServerHeader{
    204 				Metadata: mdToMetadataProto(c.Header),
    205 			},
    206 		},
    207 	}
    208 	if c.OnClientSide {
    209 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
    210 	} else {
    211 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
    212 	}
    213 	if c.PeerAddr != nil {
    214 		ret.Peer = addrToProto(c.PeerAddr)
    215 	}
    216 	return ret
    217 }
    218 
    219 // ClientMessage configs the binary log entry to be a ClientMessage entry.
    220 type ClientMessage struct {
    221 	OnClientSide bool
    222 	// Message can be a proto.Message or []byte. Other messages formats are not
    223 	// supported.
    224 	Message interface{}
    225 }
    226 
    227 func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
    228 	var (
    229 		data []byte
    230 		err  error
    231 	)
    232 	if m, ok := c.Message.(proto.Message); ok {
    233 		data, err = proto.Marshal(m)
    234 		if err != nil {
    235 			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
    236 		}
    237 	} else if b, ok := c.Message.([]byte); ok {
    238 		data = b
    239 	} else {
    240 		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
    241 	}
    242 	ret := &binlogpb.GrpcLogEntry{
    243 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
    244 		Payload: &binlogpb.GrpcLogEntry_Message{
    245 			Message: &binlogpb.Message{
    246 				Length: uint32(len(data)),
    247 				Data:   data,
    248 			},
    249 		},
    250 	}
    251 	if c.OnClientSide {
    252 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
    253 	} else {
    254 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
    255 	}
    256 	return ret
    257 }
    258 
    259 // ServerMessage configs the binary log entry to be a ServerMessage entry.
    260 type ServerMessage struct {
    261 	OnClientSide bool
    262 	// Message can be a proto.Message or []byte. Other messages formats are not
    263 	// supported.
    264 	Message interface{}
    265 }
    266 
    267 func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
    268 	var (
    269 		data []byte
    270 		err  error
    271 	)
    272 	if m, ok := c.Message.(proto.Message); ok {
    273 		data, err = proto.Marshal(m)
    274 		if err != nil {
    275 			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
    276 		}
    277 	} else if b, ok := c.Message.([]byte); ok {
    278 		data = b
    279 	} else {
    280 		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
    281 	}
    282 	ret := &binlogpb.GrpcLogEntry{
    283 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
    284 		Payload: &binlogpb.GrpcLogEntry_Message{
    285 			Message: &binlogpb.Message{
    286 				Length: uint32(len(data)),
    287 				Data:   data,
    288 			},
    289 		},
    290 	}
    291 	if c.OnClientSide {
    292 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
    293 	} else {
    294 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
    295 	}
    296 	return ret
    297 }
    298 
    299 // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
    300 type ClientHalfClose struct {
    301 	OnClientSide bool
    302 }
    303 
    304 func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
    305 	ret := &binlogpb.GrpcLogEntry{
    306 		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
    307 		Payload: nil, // No payload here.
    308 	}
    309 	if c.OnClientSide {
    310 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
    311 	} else {
    312 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
    313 	}
    314 	return ret
    315 }
    316 
    317 // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
    318 type ServerTrailer struct {
    319 	OnClientSide bool
    320 	Trailer      metadata.MD
    321 	// Err is the status error.
    322 	Err error
    323 	// PeerAddr is required only when it's on client side and the RPC is trailer
    324 	// only.
    325 	PeerAddr net.Addr
    326 }
    327 
    328 func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
    329 	st, ok := status.FromError(c.Err)
    330 	if !ok {
    331 		grpclogLogger.Info("binarylogging: error in trailer is not a status error")
    332 	}
    333 	var (
    334 		detailsBytes []byte
    335 		err          error
    336 	)
    337 	stProto := st.Proto()
    338 	if stProto != nil && len(stProto.Details) != 0 {
    339 		detailsBytes, err = proto.Marshal(stProto)
    340 		if err != nil {
    341 			grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
    342 		}
    343 	}
    344 	ret := &binlogpb.GrpcLogEntry{
    345 		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
    346 		Payload: &binlogpb.GrpcLogEntry_Trailer{
    347 			Trailer: &binlogpb.Trailer{
    348 				Metadata:      mdToMetadataProto(c.Trailer),
    349 				StatusCode:    uint32(st.Code()),
    350 				StatusMessage: st.Message(),
    351 				StatusDetails: detailsBytes,
    352 			},
    353 		},
    354 	}
    355 	if c.OnClientSide {
    356 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
    357 	} else {
    358 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
    359 	}
    360 	if c.PeerAddr != nil {
    361 		ret.Peer = addrToProto(c.PeerAddr)
    362 	}
    363 	return ret
    364 }
    365 
    366 // Cancel configs the binary log entry to be a Cancel entry.
    367 type Cancel struct {
    368 	OnClientSide bool
    369 }
    370 
    371 func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
    372 	ret := &binlogpb.GrpcLogEntry{
    373 		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
    374 		Payload: nil,
    375 	}
    376 	if c.OnClientSide {
    377 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
    378 	} else {
    379 		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
    380 	}
    381 	return ret
    382 }
    383 
    384 // metadataKeyOmit returns whether the metadata entry with this key should be
    385 // omitted.
    386 func metadataKeyOmit(key string) bool {
    387 	switch key {
    388 	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
    389 		return true
    390 	case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
    391 		return false
    392 	}
    393 	return strings.HasPrefix(key, "grpc-")
    394 }
    395 
    396 func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
    397 	ret := &binlogpb.Metadata{}
    398 	for k, vv := range md {
    399 		if metadataKeyOmit(k) {
    400 			continue
    401 		}
    402 		for _, v := range vv {
    403 			ret.Entry = append(ret.Entry,
    404 				&binlogpb.MetadataEntry{
    405 					Key:   k,
    406 					Value: []byte(v),
    407 				},
    408 			)
    409 		}
    410 	}
    411 	return ret
    412 }
    413 
    414 func addrToProto(addr net.Addr) *binlogpb.Address {
    415 	ret := &binlogpb.Address{}
    416 	switch a := addr.(type) {
    417 	case *net.TCPAddr:
    418 		if a.IP.To4() != nil {
    419 			ret.Type = binlogpb.Address_TYPE_IPV4
    420 		} else if a.IP.To16() != nil {
    421 			ret.Type = binlogpb.Address_TYPE_IPV6
    422 		} else {
    423 			ret.Type = binlogpb.Address_TYPE_UNKNOWN
    424 			// Do not set address and port fields.
    425 			break
    426 		}
    427 		ret.Address = a.IP.String()
    428 		ret.IpPort = uint32(a.Port)
    429 	case *net.UnixAddr:
    430 		ret.Type = binlogpb.Address_TYPE_UNIX
    431 		ret.Address = a.String()
    432 	default:
    433 		ret.Type = binlogpb.Address_TYPE_UNKNOWN
    434 	}
    435 	return ret
    436 }