agent.go (6594B)
1 // Copyright The OpenTelemetry Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger" 16 17 import ( 18 "context" 19 "fmt" 20 "io" 21 "net" 22 "strings" 23 "time" 24 25 "github.com/go-logr/logr" 26 27 genAgent "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/agent" 28 gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger" 29 "go.opentelemetry.io/otel/exporters/jaeger/internal/third_party/thrift/lib/go/thrift" 30 ) 31 32 const ( 33 // udpPacketMaxLength is the max size of UDP packet we want to send, synced with jaeger-agent. 34 udpPacketMaxLength = 65000 35 // emitBatchOverhead is the additional overhead bytes used for enveloping the datagram, 36 // synced with jaeger-agent https://github.com/jaegertracing/jaeger-client-go/blob/master/transport_udp.go#L37 37 emitBatchOverhead = 70 38 ) 39 40 // agentClientUDP is a UDP client to Jaeger agent that implements gen.Agent interface. 41 type agentClientUDP struct { 42 genAgent.Agent 43 io.Closer 44 45 connUDP udpConn 46 client *genAgent.AgentClient 47 maxPacketSize int // max size of datagram in bytes 48 thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span 49 thriftProtocol thrift.TProtocol 50 } 51 52 type udpConn interface { 53 Write([]byte) (int, error) 54 SetWriteBuffer(int) error 55 Close() error 56 } 57 58 type agentClientUDPParams struct { 59 Host string 60 Port string 61 MaxPacketSize int 62 Logger logr.Logger 63 AttemptReconnecting bool 64 AttemptReconnectInterval time.Duration 65 } 66 67 // newAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. 68 func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) { 69 hostPort := net.JoinHostPort(params.Host, params.Port) 70 // validate hostport 71 if _, _, err := net.SplitHostPort(hostPort); err != nil { 72 return nil, err 73 } 74 75 if params.MaxPacketSize <= 0 || params.MaxPacketSize > udpPacketMaxLength { 76 params.MaxPacketSize = udpPacketMaxLength 77 } 78 79 if params.AttemptReconnecting && params.AttemptReconnectInterval <= 0 { 80 params.AttemptReconnectInterval = time.Second * 30 81 } 82 83 thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) 84 protocolFactory := thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{}) 85 thriftProtocol := protocolFactory.GetProtocol(thriftBuffer) 86 client := genAgent.NewAgentClientFactory(thriftBuffer, protocolFactory) 87 88 var connUDP udpConn 89 var err error 90 91 if params.AttemptReconnecting { 92 // host is hostname, setup resolver loop in case host record changes during operation 93 connUDP, err = newReconnectingUDPConn(hostPort, params.MaxPacketSize, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger) 94 if err != nil { 95 return nil, err 96 } 97 } else { 98 destAddr, err := net.ResolveUDPAddr("udp", hostPort) 99 if err != nil { 100 return nil, err 101 } 102 103 connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr) 104 if err != nil { 105 return nil, err 106 } 107 } 108 109 if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil { 110 return nil, err 111 } 112 113 return &agentClientUDP{ 114 connUDP: connUDP, 115 client: client, 116 maxPacketSize: params.MaxPacketSize, 117 thriftBuffer: thriftBuffer, 118 thriftProtocol: thriftProtocol, 119 }, nil 120 } 121 122 // EmitBatch buffers batch to fit into UDP packets and sends the data to the agent. 123 func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error { 124 var errs []error 125 processSize, err := a.calcSizeOfSerializedThrift(ctx, batch.Process) 126 if err != nil { 127 // drop the batch if serialization of process fails. 128 return err 129 } 130 131 maxPacketSize := a.maxPacketSize 132 if maxPacketSize > udpPacketMaxLength-emitBatchOverhead { 133 maxPacketSize = udpPacketMaxLength - emitBatchOverhead 134 } 135 totalSize := processSize 136 var spans []*gen.Span 137 for _, span := range batch.Spans { 138 spanSize, err := a.calcSizeOfSerializedThrift(ctx, span) 139 if err != nil { 140 errs = append(errs, fmt.Errorf("thrift serialization failed: %v", span)) 141 continue 142 } 143 if spanSize+processSize >= maxPacketSize { 144 // drop the span that exceeds the limit. 145 errs = append(errs, fmt.Errorf("span too large to send: %v", span)) 146 continue 147 } 148 if totalSize+spanSize >= maxPacketSize { 149 if err := a.flush(ctx, &gen.Batch{ 150 Process: batch.Process, 151 Spans: spans, 152 }); err != nil { 153 errs = append(errs, err) 154 } 155 spans = spans[:0] 156 totalSize = processSize 157 } 158 totalSize += spanSize 159 spans = append(spans, span) 160 } 161 162 if len(spans) > 0 { 163 if err := a.flush(ctx, &gen.Batch{ 164 Process: batch.Process, 165 Spans: spans, 166 }); err != nil { 167 errs = append(errs, err) 168 } 169 } 170 171 if len(errs) == 1 { 172 return errs[0] 173 } else if len(errs) > 1 { 174 joined := a.makeJoinedErrorString(errs) 175 return fmt.Errorf("multiple errors during transform: %s", joined) 176 } 177 return nil 178 } 179 180 // makeJoinedErrorString join all the errors to one error message. 181 func (a *agentClientUDP) makeJoinedErrorString(errs []error) string { 182 var errMsgs []string 183 for _, err := range errs { 184 errMsgs = append(errMsgs, err.Error()) 185 } 186 return strings.Join(errMsgs, ", ") 187 } 188 189 // flush will send the batch of spans to the agent. 190 func (a *agentClientUDP) flush(ctx context.Context, batch *gen.Batch) error { 191 a.thriftBuffer.Reset() 192 if err := a.client.EmitBatch(ctx, batch); err != nil { 193 return err 194 } 195 if a.thriftBuffer.Len() > a.maxPacketSize { 196 return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d", 197 a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans)) 198 } 199 _, err := a.connUDP.Write(a.thriftBuffer.Bytes()) 200 return err 201 } 202 203 // calcSizeOfSerializedThrift calculate the serialized thrift packet size. 204 func (a *agentClientUDP) calcSizeOfSerializedThrift(ctx context.Context, thriftStruct thrift.TStruct) (int, error) { 205 a.thriftBuffer.Reset() 206 err := thriftStruct.Write(ctx, a.thriftProtocol) 207 return a.thriftBuffer.Len(), err 208 } 209 210 // Close implements Close() of io.Closer and closes the underlying UDP connection. 211 func (a *agentClientUDP) Close() error { 212 return a.connUDP.Close() 213 }