gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

agent.go (6594B)


      1 // Copyright The OpenTelemetry Authors
      2 //
      3 // Licensed under the Apache License, Version 2.0 (the "License");
      4 // you may not use this file except in compliance with the License.
      5 // You may obtain a copy of the License at
      6 //
      7 //     http://www.apache.org/licenses/LICENSE-2.0
      8 //
      9 // Unless required by applicable law or agreed to in writing, software
     10 // distributed under the License is distributed on an "AS IS" BASIS,
     11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 // See the License for the specific language governing permissions and
     13 // limitations under the License.
     14 
     15 package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"
     16 
     17 import (
     18 	"context"
     19 	"fmt"
     20 	"io"
     21 	"net"
     22 	"strings"
     23 	"time"
     24 
     25 	"github.com/go-logr/logr"
     26 
     27 	genAgent "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/agent"
     28 	gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger"
     29 	"go.opentelemetry.io/otel/exporters/jaeger/internal/third_party/thrift/lib/go/thrift"
     30 )
     31 
const (
	// udpPacketMaxLength is the max size of UDP packet we want to send, synced with jaeger-agent.
	// It is both the default and the upper bound applied to a configured max packet size.
	udpPacketMaxLength = 65000
	// emitBatchOverhead is the number of extra bytes reserved for the envelope that wraps
	// a batch inside the datagram. The value mirrors the one used by jaeger-client-go:
	// https://github.com/jaegertracing/jaeger-client-go/blob/master/transport_udp.go#L37
	emitBatchOverhead = 70
)
     39 
// agentClientUDP is a UDP client to Jaeger agent that implements gen.Agent interface.
//
// Batches are serialized into thriftBuffer and the resulting bytes are written
// to connUDP as one datagram per flush.
type agentClientUDP struct {
	// Embedded interfaces. The constructor in this file does not populate them;
	// EmitBatch and Close are defined directly on *agentClientUDP.
	genAgent.Agent
	io.Closer

	connUDP        udpConn                // connection the serialized batches are written to
	client         *genAgent.AgentClient  // thrift client built on top of thriftBuffer
	maxPacketSize  int                    // max size of datagram in bytes
	thriftBuffer   *thrift.TMemoryBuffer  // buffer used to calculate byte size of a span
	thriftProtocol thrift.TProtocol       // protocol writing into thriftBuffer, used for size calculation
}
     51 
// udpConn is the subset of connection behavior the agent client relies on.
// newAgentClientUDP assigns it either the result of net.DialUDP or the
// reconnecting connection returned by newReconnectingUDPConn.
type udpConn interface {
	// Write sends the given bytes; flush passes one serialized batch per call.
	Write([]byte) (int, error)
	// SetWriteBuffer is called once by the constructor with the max packet size.
	SetWriteBuffer(int) error
	// Close closes the connection.
	Close() error
}
     57 
// agentClientUDPParams holds the settings consumed by newAgentClientUDP.
type agentClientUDPParams struct {
	// Host and Port are joined with net.JoinHostPort to form the agent address.
	Host string
	Port string
	// MaxPacketSize is the datagram size limit in bytes. Values <= 0 or above
	// udpPacketMaxLength are replaced by udpPacketMaxLength.
	MaxPacketSize int
	// Logger is handed to the reconnecting connection; it is not used otherwise
	// by the constructor.
	Logger logr.Logger
	// AttemptReconnecting selects the reconnecting connection instead of a
	// single resolve-and-dial.
	AttemptReconnecting bool
	// AttemptReconnectInterval is passed to the reconnecting connection. When
	// AttemptReconnecting is set and this is <= 0, it defaults to 30 seconds.
	AttemptReconnectInterval time.Duration
}
     66 
     67 // newAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
     68 func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) {
     69 	hostPort := net.JoinHostPort(params.Host, params.Port)
     70 	// validate hostport
     71 	if _, _, err := net.SplitHostPort(hostPort); err != nil {
     72 		return nil, err
     73 	}
     74 
     75 	if params.MaxPacketSize <= 0 || params.MaxPacketSize > udpPacketMaxLength {
     76 		params.MaxPacketSize = udpPacketMaxLength
     77 	}
     78 
     79 	if params.AttemptReconnecting && params.AttemptReconnectInterval <= 0 {
     80 		params.AttemptReconnectInterval = time.Second * 30
     81 	}
     82 
     83 	thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
     84 	protocolFactory := thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
     85 	thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
     86 	client := genAgent.NewAgentClientFactory(thriftBuffer, protocolFactory)
     87 
     88 	var connUDP udpConn
     89 	var err error
     90 
     91 	if params.AttemptReconnecting {
     92 		// host is hostname, setup resolver loop in case host record changes during operation
     93 		connUDP, err = newReconnectingUDPConn(hostPort, params.MaxPacketSize, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger)
     94 		if err != nil {
     95 			return nil, err
     96 		}
     97 	} else {
     98 		destAddr, err := net.ResolveUDPAddr("udp", hostPort)
     99 		if err != nil {
    100 			return nil, err
    101 		}
    102 
    103 		connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr)
    104 		if err != nil {
    105 			return nil, err
    106 		}
    107 	}
    108 
    109 	if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil {
    110 		return nil, err
    111 	}
    112 
    113 	return &agentClientUDP{
    114 		connUDP:        connUDP,
    115 		client:         client,
    116 		maxPacketSize:  params.MaxPacketSize,
    117 		thriftBuffer:   thriftBuffer,
    118 		thriftProtocol: thriftProtocol,
    119 	}, nil
    120 }
    121 
    122 // EmitBatch buffers batch to fit into UDP packets and sends the data to the agent.
    123 func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error {
    124 	var errs []error
    125 	processSize, err := a.calcSizeOfSerializedThrift(ctx, batch.Process)
    126 	if err != nil {
    127 		// drop the batch if serialization of process fails.
    128 		return err
    129 	}
    130 
    131 	maxPacketSize := a.maxPacketSize
    132 	if maxPacketSize > udpPacketMaxLength-emitBatchOverhead {
    133 		maxPacketSize = udpPacketMaxLength - emitBatchOverhead
    134 	}
    135 	totalSize := processSize
    136 	var spans []*gen.Span
    137 	for _, span := range batch.Spans {
    138 		spanSize, err := a.calcSizeOfSerializedThrift(ctx, span)
    139 		if err != nil {
    140 			errs = append(errs, fmt.Errorf("thrift serialization failed: %v", span))
    141 			continue
    142 		}
    143 		if spanSize+processSize >= maxPacketSize {
    144 			// drop the span that exceeds the limit.
    145 			errs = append(errs, fmt.Errorf("span too large to send: %v", span))
    146 			continue
    147 		}
    148 		if totalSize+spanSize >= maxPacketSize {
    149 			if err := a.flush(ctx, &gen.Batch{
    150 				Process: batch.Process,
    151 				Spans:   spans,
    152 			}); err != nil {
    153 				errs = append(errs, err)
    154 			}
    155 			spans = spans[:0]
    156 			totalSize = processSize
    157 		}
    158 		totalSize += spanSize
    159 		spans = append(spans, span)
    160 	}
    161 
    162 	if len(spans) > 0 {
    163 		if err := a.flush(ctx, &gen.Batch{
    164 			Process: batch.Process,
    165 			Spans:   spans,
    166 		}); err != nil {
    167 			errs = append(errs, err)
    168 		}
    169 	}
    170 
    171 	if len(errs) == 1 {
    172 		return errs[0]
    173 	} else if len(errs) > 1 {
    174 		joined := a.makeJoinedErrorString(errs)
    175 		return fmt.Errorf("multiple errors during transform: %s", joined)
    176 	}
    177 	return nil
    178 }
    179 
    180 // makeJoinedErrorString join all the errors to one error message.
    181 func (a *agentClientUDP) makeJoinedErrorString(errs []error) string {
    182 	var errMsgs []string
    183 	for _, err := range errs {
    184 		errMsgs = append(errMsgs, err.Error())
    185 	}
    186 	return strings.Join(errMsgs, ", ")
    187 }
    188 
    189 // flush will send the batch of spans to the agent.
    190 func (a *agentClientUDP) flush(ctx context.Context, batch *gen.Batch) error {
    191 	a.thriftBuffer.Reset()
    192 	if err := a.client.EmitBatch(ctx, batch); err != nil {
    193 		return err
    194 	}
    195 	if a.thriftBuffer.Len() > a.maxPacketSize {
    196 		return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
    197 			a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
    198 	}
    199 	_, err := a.connUDP.Write(a.thriftBuffer.Bytes())
    200 	return err
    201 }
    202 
    203 // calcSizeOfSerializedThrift calculate the serialized thrift packet size.
    204 func (a *agentClientUDP) calcSizeOfSerializedThrift(ctx context.Context, thriftStruct thrift.TStruct) (int, error) {
    205 	a.thriftBuffer.Reset()
    206 	err := thriftStruct.Write(ctx, a.thriftProtocol)
    207 	return a.thriftBuffer.Len(), err
    208 }
    209 
    210 // Close implements Close() of io.Closer and closes the underlying UDP connection.
    211 func (a *agentClientUDP) Close() error {
    212 	return a.connUDP.Close()
    213 }