gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

uploader.go (11221B)


      1 // Copyright The OpenTelemetry Authors
      2 //
      3 // Licensed under the Apache License, Version 2.0 (the "License");
      4 // you may not use this file except in compliance with the License.
      5 // You may obtain a copy of the License at
      6 //
      7 //     http://www.apache.org/licenses/LICENSE-2.0
      8 //
      9 // Unless required by applicable law or agreed to in writing, software
     10 // distributed under the License is distributed on an "AS IS" BASIS,
     11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 // See the License for the specific language governing permissions and
     13 // limitations under the License.
     14 
     15 package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"
     16 
     17 import (
     18 	"bytes"
     19 	"context"
     20 	"fmt"
     21 	"io"
     22 	"log"
     23 	"net/http"
     24 	"time"
     25 
     26 	"github.com/go-logr/logr"
     27 	"github.com/go-logr/stdr"
     28 
     29 	gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger"
     30 	"go.opentelemetry.io/otel/exporters/jaeger/internal/third_party/thrift/lib/go/thrift"
     31 )
     32 
     33 // batchUploader send a batch of spans to Jaeger.
     34 type batchUploader interface {
     35 	upload(context.Context, *gen.Batch) error
     36 	shutdown(context.Context) error
     37 }
     38 
     39 // EndpointOption configures a Jaeger endpoint.
     40 type EndpointOption interface {
     41 	newBatchUploader() (batchUploader, error)
     42 }
     43 
     44 type endpointOptionFunc func() (batchUploader, error)
     45 
     46 func (fn endpointOptionFunc) newBatchUploader() (batchUploader, error) {
     47 	return fn()
     48 }
     49 
     50 // WithAgentEndpoint configures the Jaeger exporter to send spans to a Jaeger agent
     51 // over compact thrift protocol. This will use the following environment variables for
     52 // configuration if no explicit option is provided:
     53 //
     54 // - OTEL_EXPORTER_JAEGER_AGENT_HOST is used for the agent address host
     55 // - OTEL_EXPORTER_JAEGER_AGENT_PORT is used for the agent address port
     56 //
     57 // The passed options will take precedence over any environment variables and default values
     58 // will be used if neither are provided.
     59 func WithAgentEndpoint(options ...AgentEndpointOption) EndpointOption {
     60 	return endpointOptionFunc(func() (batchUploader, error) {
     61 		cfg := agentEndpointConfig{
     62 			agentClientUDPParams{
     63 				AttemptReconnecting: true,
     64 				Host:                envOr(envAgentHost, "localhost"),
     65 				Port:                envOr(envAgentPort, "6831"),
     66 			},
     67 		}
     68 		for _, opt := range options {
     69 			cfg = opt.apply(cfg)
     70 		}
     71 
     72 		client, err := newAgentClientUDP(cfg.agentClientUDPParams)
     73 		if err != nil {
     74 			return nil, err
     75 		}
     76 
     77 		return &agentUploader{client: client}, nil
     78 	})
     79 }
     80 
     81 // AgentEndpointOption configures a Jaeger agent endpoint.
     82 type AgentEndpointOption interface {
     83 	apply(agentEndpointConfig) agentEndpointConfig
     84 }
     85 
     86 type agentEndpointConfig struct {
     87 	agentClientUDPParams
     88 }
     89 
     90 type agentEndpointOptionFunc func(agentEndpointConfig) agentEndpointConfig
     91 
     92 func (fn agentEndpointOptionFunc) apply(cfg agentEndpointConfig) agentEndpointConfig {
     93 	return fn(cfg)
     94 }
     95 
     96 // WithAgentHost sets a host to be used in the agent client endpoint.
     97 // This option overrides any value set for the
     98 // OTEL_EXPORTER_JAEGER_AGENT_HOST environment variable.
     99 // If this option is not passed and the env var is not set, "localhost" will be used by default.
    100 func WithAgentHost(host string) AgentEndpointOption {
    101 	return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
    102 		o.Host = host
    103 		return o
    104 	})
    105 }
    106 
    107 // WithAgentPort sets a port to be used in the agent client endpoint.
    108 // This option overrides any value set for the
    109 // OTEL_EXPORTER_JAEGER_AGENT_PORT environment variable.
    110 // If this option is not passed and the env var is not set, "6831" will be used by default.
    111 func WithAgentPort(port string) AgentEndpointOption {
    112 	return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
    113 		o.Port = port
    114 		return o
    115 	})
    116 }
    117 
    118 var emptyLogger = logr.Logger{}
    119 
    120 // WithLogger sets a logger to be used by agent client.
    121 // WithLogger and WithLogr will overwrite each other.
    122 func WithLogger(logger *log.Logger) AgentEndpointOption {
    123 	return WithLogr(stdr.New(logger))
    124 }
    125 
    126 // WithLogr sets a logr.Logger to be used by agent client.
    127 // WithLogr and WithLogger will overwrite each other.
    128 func WithLogr(logger logr.Logger) AgentEndpointOption {
    129 	return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
    130 		o.Logger = logger
    131 		return o
    132 	})
    133 }
    134 
    135 // WithDisableAttemptReconnecting sets option to disable reconnecting udp client.
    136 func WithDisableAttemptReconnecting() AgentEndpointOption {
    137 	return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
    138 		o.AttemptReconnecting = false
    139 		return o
    140 	})
    141 }
    142 
    143 // WithAttemptReconnectingInterval sets the interval between attempts to re resolve agent endpoint.
    144 func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption {
    145 	return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
    146 		o.AttemptReconnectInterval = interval
    147 		return o
    148 	})
    149 }
    150 
    151 // WithMaxPacketSize sets the maximum UDP packet size for transport to the Jaeger agent.
    152 func WithMaxPacketSize(size int) AgentEndpointOption {
    153 	return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
    154 		o.MaxPacketSize = size
    155 		return o
    156 	})
    157 }
    158 
    159 // WithCollectorEndpoint defines the full URL to the Jaeger HTTP Thrift collector. This will
    160 // use the following environment variables for configuration if no explicit option is provided:
    161 //
    162 // - OTEL_EXPORTER_JAEGER_ENDPOINT is the HTTP endpoint for sending spans directly to a collector.
    163 // - OTEL_EXPORTER_JAEGER_USER is the username to be sent as authentication to the collector endpoint.
    164 // - OTEL_EXPORTER_JAEGER_PASSWORD is the password to be sent as authentication to the collector endpoint.
    165 //
    166 // The passed options will take precedence over any environment variables.
    167 // If neither values are provided for the endpoint, the default value of "http://localhost:14268/api/traces" will be used.
    168 // If neither values are provided for the username or the password, they will not be set since there is no default.
    169 func WithCollectorEndpoint(options ...CollectorEndpointOption) EndpointOption {
    170 	return endpointOptionFunc(func() (batchUploader, error) {
    171 		cfg := collectorEndpointConfig{
    172 			endpoint:   envOr(envEndpoint, "http://localhost:14268/api/traces"),
    173 			username:   envOr(envUser, ""),
    174 			password:   envOr(envPassword, ""),
    175 			httpClient: http.DefaultClient,
    176 		}
    177 
    178 		for _, opt := range options {
    179 			cfg = opt.apply(cfg)
    180 		}
    181 
    182 		return &collectorUploader{
    183 			endpoint:   cfg.endpoint,
    184 			username:   cfg.username,
    185 			password:   cfg.password,
    186 			httpClient: cfg.httpClient,
    187 		}, nil
    188 	})
    189 }
    190 
    191 // CollectorEndpointOption configures a Jaeger collector endpoint.
    192 type CollectorEndpointOption interface {
    193 	apply(collectorEndpointConfig) collectorEndpointConfig
    194 }
    195 
    196 type collectorEndpointConfig struct {
    197 	// endpoint for sending spans directly to a collector.
    198 	endpoint string
    199 
    200 	// username to be used for authentication with the collector endpoint.
    201 	username string
    202 
    203 	// password to be used for authentication with the collector endpoint.
    204 	password string
    205 
    206 	// httpClient to be used to make requests to the collector endpoint.
    207 	httpClient *http.Client
    208 }
    209 
    210 type collectorEndpointOptionFunc func(collectorEndpointConfig) collectorEndpointConfig
    211 
    212 func (fn collectorEndpointOptionFunc) apply(cfg collectorEndpointConfig) collectorEndpointConfig {
    213 	return fn(cfg)
    214 }
    215 
    216 // WithEndpoint is the URL for the Jaeger collector that spans are sent to.
    217 // This option overrides any value set for the
    218 // OTEL_EXPORTER_JAEGER_ENDPOINT environment variable.
    219 // If this option is not passed and the environment variable is not set,
    220 // "http://localhost:14268/api/traces" will be used by default.
    221 func WithEndpoint(endpoint string) CollectorEndpointOption {
    222 	return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig {
    223 		o.endpoint = endpoint
    224 		return o
    225 	})
    226 }
    227 
    228 // WithUsername sets the username to be used in the authorization header sent for all requests to the collector.
    229 // This option overrides any value set for the
    230 // OTEL_EXPORTER_JAEGER_USER environment variable.
    231 // If this option is not passed and the environment variable is not set, no username will be set.
    232 func WithUsername(username string) CollectorEndpointOption {
    233 	return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig {
    234 		o.username = username
    235 		return o
    236 	})
    237 }
    238 
    239 // WithPassword sets the password to be used in the authorization header sent for all requests to the collector.
    240 // This option overrides any value set for the
    241 // OTEL_EXPORTER_JAEGER_PASSWORD environment variable.
    242 // If this option is not passed and the environment variable is not set, no password will be set.
    243 func WithPassword(password string) CollectorEndpointOption {
    244 	return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig {
    245 		o.password = password
    246 		return o
    247 	})
    248 }
    249 
    250 // WithHTTPClient sets the http client to be used to make request to the collector endpoint.
    251 func WithHTTPClient(client *http.Client) CollectorEndpointOption {
    252 	return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig {
    253 		o.httpClient = client
    254 		return o
    255 	})
    256 }
    257 
    258 // agentUploader implements batchUploader interface sending batches to
    259 // Jaeger through the UDP agent.
    260 type agentUploader struct {
    261 	client *agentClientUDP
    262 }
    263 
    264 var _ batchUploader = (*agentUploader)(nil)
    265 
    266 func (a *agentUploader) shutdown(ctx context.Context) error {
    267 	done := make(chan error, 1)
    268 	go func() {
    269 		done <- a.client.Close()
    270 	}()
    271 
    272 	select {
    273 	case <-ctx.Done():
    274 		// Prioritize not blocking the calling thread and just leak the
    275 		// spawned goroutine to close the client.
    276 		return ctx.Err()
    277 	case err := <-done:
    278 		return err
    279 	}
    280 }
    281 
    282 func (a *agentUploader) upload(ctx context.Context, batch *gen.Batch) error {
    283 	return a.client.EmitBatch(ctx, batch)
    284 }
    285 
    286 // collectorUploader implements batchUploader interface sending batches to
    287 // Jaeger through the collector http endpoint.
    288 type collectorUploader struct {
    289 	endpoint   string
    290 	username   string
    291 	password   string
    292 	httpClient *http.Client
    293 }
    294 
    295 var _ batchUploader = (*collectorUploader)(nil)
    296 
    297 func (c *collectorUploader) shutdown(ctx context.Context) error {
    298 	// The Exporter will cancel any active exports and will prevent all
    299 	// subsequent exports, so nothing to do here.
    300 	return nil
    301 }
    302 
    303 func (c *collectorUploader) upload(ctx context.Context, batch *gen.Batch) error {
    304 	body, err := serialize(batch)
    305 	if err != nil {
    306 		return err
    307 	}
    308 	req, err := http.NewRequestWithContext(ctx, "POST", c.endpoint, body)
    309 	if err != nil {
    310 		return err
    311 	}
    312 	if c.username != "" && c.password != "" {
    313 		req.SetBasicAuth(c.username, c.password)
    314 	}
    315 	req.Header.Set("Content-Type", "application/x-thrift")
    316 
    317 	resp, err := c.httpClient.Do(req)
    318 	if err != nil {
    319 		return err
    320 	}
    321 
    322 	_, _ = io.Copy(io.Discard, resp.Body)
    323 	if err = resp.Body.Close(); err != nil {
    324 		return err
    325 	}
    326 
    327 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
    328 		return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
    329 	}
    330 	return nil
    331 }
    332 
    333 func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
    334 	buf := thrift.NewTMemoryBuffer()
    335 	if err := obj.Write(context.Background(), thrift.NewTBinaryProtocolConf(buf, &thrift.TConfiguration{})); err != nil {
    336 		return nil, err
    337 	}
    338 	return buf.Buffer, nil
    339 }