method_logger.go (11652B)
1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package binarylog 20 21 import ( 22 "context" 23 "net" 24 "strings" 25 "sync/atomic" 26 "time" 27 28 "github.com/golang/protobuf/proto" 29 "github.com/golang/protobuf/ptypes" 30 binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" 31 "google.golang.org/grpc/metadata" 32 "google.golang.org/grpc/status" 33 ) 34 35 type callIDGenerator struct { 36 id uint64 37 } 38 39 func (g *callIDGenerator) next() uint64 { 40 id := atomic.AddUint64(&g.id, 1) 41 return id 42 } 43 44 // reset is for testing only, and doesn't need to be thread safe. 45 func (g *callIDGenerator) reset() { 46 g.id = 0 47 } 48 49 var idGen callIDGenerator 50 51 // MethodLogger is the sub-logger for each method. 52 type MethodLogger interface { 53 Log(context.Context, LogEntryConfig) 54 } 55 56 // TruncatingMethodLogger is a method logger that truncates headers and messages 57 // based on configured fields. 58 type TruncatingMethodLogger struct { 59 headerMaxLen, messageMaxLen uint64 60 61 callID uint64 62 idWithinCallGen *callIDGenerator 63 64 sink Sink // TODO(blog): make this plugable. 65 } 66 67 // NewTruncatingMethodLogger returns a new truncating method logger. 68 func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger { 69 return &TruncatingMethodLogger{ 70 headerMaxLen: h, 71 messageMaxLen: m, 72 73 callID: idGen.next(), 74 idWithinCallGen: &callIDGenerator{}, 75 76 sink: DefaultSink, // TODO(blog): make it plugable. 77 } 78 } 79 80 // Build is an internal only method for building the proto message out of the 81 // input event. It's made public to enable other library to reuse as much logic 82 // in TruncatingMethodLogger as possible. 83 func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry { 84 m := c.toProto() 85 timestamp, _ := ptypes.TimestampProto(time.Now()) 86 m.Timestamp = timestamp 87 m.CallId = ml.callID 88 m.SequenceIdWithinCall = ml.idWithinCallGen.next() 89 90 switch pay := m.Payload.(type) { 91 case *binlogpb.GrpcLogEntry_ClientHeader: 92 m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata()) 93 case *binlogpb.GrpcLogEntry_ServerHeader: 94 m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata()) 95 case *binlogpb.GrpcLogEntry_Message: 96 m.PayloadTruncated = ml.truncateMessage(pay.Message) 97 } 98 return m 99 } 100 101 // Log creates a proto binary log entry, and logs it to the sink. 102 func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) { 103 ml.sink.Write(ml.Build(c)) 104 } 105 106 func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) { 107 if ml.headerMaxLen == maxUInt { 108 return false 109 } 110 var ( 111 bytesLimit = ml.headerMaxLen 112 index int 113 ) 114 // At the end of the loop, index will be the first entry where the total 115 // size is greater than the limit: 116 // 117 // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr. 118 for ; index < len(mdPb.Entry); index++ { 119 entry := mdPb.Entry[index] 120 if entry.Key == "grpc-trace-bin" { 121 // "grpc-trace-bin" is a special key. It's kept in the log entry, 122 // but not counted towards the size limit. 123 continue 124 } 125 currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue())) 126 if currentEntryLen > bytesLimit { 127 break 128 } 129 bytesLimit -= currentEntryLen 130 } 131 truncated = index < len(mdPb.Entry) 132 mdPb.Entry = mdPb.Entry[:index] 133 return truncated 134 } 135 136 func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) { 137 if ml.messageMaxLen == maxUInt { 138 return false 139 } 140 if ml.messageMaxLen >= uint64(len(msgPb.Data)) { 141 return false 142 } 143 msgPb.Data = msgPb.Data[:ml.messageMaxLen] 144 return true 145 } 146 147 // LogEntryConfig represents the configuration for binary log entry. 148 type LogEntryConfig interface { 149 toProto() *binlogpb.GrpcLogEntry 150 } 151 152 // ClientHeader configs the binary log entry to be a ClientHeader entry. 153 type ClientHeader struct { 154 OnClientSide bool 155 Header metadata.MD 156 MethodName string 157 Authority string 158 Timeout time.Duration 159 // PeerAddr is required only when it's on server side. 160 PeerAddr net.Addr 161 } 162 163 func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry { 164 // This function doesn't need to set all the fields (e.g. seq ID). The Log 165 // function will set the fields when necessary. 166 clientHeader := &binlogpb.ClientHeader{ 167 Metadata: mdToMetadataProto(c.Header), 168 MethodName: c.MethodName, 169 Authority: c.Authority, 170 } 171 if c.Timeout > 0 { 172 clientHeader.Timeout = ptypes.DurationProto(c.Timeout) 173 } 174 ret := &binlogpb.GrpcLogEntry{ 175 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, 176 Payload: &binlogpb.GrpcLogEntry_ClientHeader{ 177 ClientHeader: clientHeader, 178 }, 179 } 180 if c.OnClientSide { 181 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT 182 } else { 183 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER 184 } 185 if c.PeerAddr != nil { 186 ret.Peer = addrToProto(c.PeerAddr) 187 } 188 return ret 189 } 190 191 // ServerHeader configs the binary log entry to be a ServerHeader entry. 192 type ServerHeader struct { 193 OnClientSide bool 194 Header metadata.MD 195 // PeerAddr is required only when it's on client side. 196 PeerAddr net.Addr 197 } 198 199 func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry { 200 ret := &binlogpb.GrpcLogEntry{ 201 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, 202 Payload: &binlogpb.GrpcLogEntry_ServerHeader{ 203 ServerHeader: &binlogpb.ServerHeader{ 204 Metadata: mdToMetadataProto(c.Header), 205 }, 206 }, 207 } 208 if c.OnClientSide { 209 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT 210 } else { 211 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER 212 } 213 if c.PeerAddr != nil { 214 ret.Peer = addrToProto(c.PeerAddr) 215 } 216 return ret 217 } 218 219 // ClientMessage configs the binary log entry to be a ClientMessage entry. 220 type ClientMessage struct { 221 OnClientSide bool 222 // Message can be a proto.Message or []byte. Other messages formats are not 223 // supported. 224 Message interface{} 225 } 226 227 func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry { 228 var ( 229 data []byte 230 err error 231 ) 232 if m, ok := c.Message.(proto.Message); ok { 233 data, err = proto.Marshal(m) 234 if err != nil { 235 grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err) 236 } 237 } else if b, ok := c.Message.([]byte); ok { 238 data = b 239 } else { 240 grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte") 241 } 242 ret := &binlogpb.GrpcLogEntry{ 243 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, 244 Payload: &binlogpb.GrpcLogEntry_Message{ 245 Message: &binlogpb.Message{ 246 Length: uint32(len(data)), 247 Data: data, 248 }, 249 }, 250 } 251 if c.OnClientSide { 252 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT 253 } else { 254 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER 255 } 256 return ret 257 } 258 259 // ServerMessage configs the binary log entry to be a ServerMessage entry. 260 type ServerMessage struct { 261 OnClientSide bool 262 // Message can be a proto.Message or []byte. Other messages formats are not 263 // supported. 264 Message interface{} 265 } 266 267 func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry { 268 var ( 269 data []byte 270 err error 271 ) 272 if m, ok := c.Message.(proto.Message); ok { 273 data, err = proto.Marshal(m) 274 if err != nil { 275 grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err) 276 } 277 } else if b, ok := c.Message.([]byte); ok { 278 data = b 279 } else { 280 grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte") 281 } 282 ret := &binlogpb.GrpcLogEntry{ 283 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, 284 Payload: &binlogpb.GrpcLogEntry_Message{ 285 Message: &binlogpb.Message{ 286 Length: uint32(len(data)), 287 Data: data, 288 }, 289 }, 290 } 291 if c.OnClientSide { 292 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT 293 } else { 294 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER 295 } 296 return ret 297 } 298 299 // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry. 300 type ClientHalfClose struct { 301 OnClientSide bool 302 } 303 304 func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry { 305 ret := &binlogpb.GrpcLogEntry{ 306 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, 307 Payload: nil, // No payload here. 308 } 309 if c.OnClientSide { 310 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT 311 } else { 312 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER 313 } 314 return ret 315 } 316 317 // ServerTrailer configs the binary log entry to be a ServerTrailer entry. 318 type ServerTrailer struct { 319 OnClientSide bool 320 Trailer metadata.MD 321 // Err is the status error. 322 Err error 323 // PeerAddr is required only when it's on client side and the RPC is trailer 324 // only. 325 PeerAddr net.Addr 326 } 327 328 func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry { 329 st, ok := status.FromError(c.Err) 330 if !ok { 331 grpclogLogger.Info("binarylogging: error in trailer is not a status error") 332 } 333 var ( 334 detailsBytes []byte 335 err error 336 ) 337 stProto := st.Proto() 338 if stProto != nil && len(stProto.Details) != 0 { 339 detailsBytes, err = proto.Marshal(stProto) 340 if err != nil { 341 grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err) 342 } 343 } 344 ret := &binlogpb.GrpcLogEntry{ 345 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, 346 Payload: &binlogpb.GrpcLogEntry_Trailer{ 347 Trailer: &binlogpb.Trailer{ 348 Metadata: mdToMetadataProto(c.Trailer), 349 StatusCode: uint32(st.Code()), 350 StatusMessage: st.Message(), 351 StatusDetails: detailsBytes, 352 }, 353 }, 354 } 355 if c.OnClientSide { 356 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT 357 } else { 358 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER 359 } 360 if c.PeerAddr != nil { 361 ret.Peer = addrToProto(c.PeerAddr) 362 } 363 return ret 364 } 365 366 // Cancel configs the binary log entry to be a Cancel entry. 367 type Cancel struct { 368 OnClientSide bool 369 } 370 371 func (c *Cancel) toProto() *binlogpb.GrpcLogEntry { 372 ret := &binlogpb.GrpcLogEntry{ 373 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL, 374 Payload: nil, 375 } 376 if c.OnClientSide { 377 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT 378 } else { 379 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER 380 } 381 return ret 382 } 383 384 // metadataKeyOmit returns whether the metadata entry with this key should be 385 // omitted. 386 func metadataKeyOmit(key string) bool { 387 switch key { 388 case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te": 389 return true 390 case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users. 391 return false 392 } 393 return strings.HasPrefix(key, "grpc-") 394 } 395 396 func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata { 397 ret := &binlogpb.Metadata{} 398 for k, vv := range md { 399 if metadataKeyOmit(k) { 400 continue 401 } 402 for _, v := range vv { 403 ret.Entry = append(ret.Entry, 404 &binlogpb.MetadataEntry{ 405 Key: k, 406 Value: []byte(v), 407 }, 408 ) 409 } 410 } 411 return ret 412 } 413 414 func addrToProto(addr net.Addr) *binlogpb.Address { 415 ret := &binlogpb.Address{} 416 switch a := addr.(type) { 417 case *net.TCPAddr: 418 if a.IP.To4() != nil { 419 ret.Type = binlogpb.Address_TYPE_IPV4 420 } else if a.IP.To16() != nil { 421 ret.Type = binlogpb.Address_TYPE_IPV6 422 } else { 423 ret.Type = binlogpb.Address_TYPE_UNKNOWN 424 // Do not set address and port fields. 425 break 426 } 427 ret.Address = a.IP.String() 428 ret.IpPort = uint32(a.Port) 429 case *net.UnixAddr: 430 ret.Type = binlogpb.Address_TYPE_UNIX 431 ret.Address = a.String() 432 default: 433 ret.Type = binlogpb.Address_TYPE_UNKNOWN 434 } 435 return ret 436 }