uploader.go (11221B)
1 // Copyright The OpenTelemetry Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger" 16 17 import ( 18 "bytes" 19 "context" 20 "fmt" 21 "io" 22 "log" 23 "net/http" 24 "time" 25 26 "github.com/go-logr/logr" 27 "github.com/go-logr/stdr" 28 29 gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger" 30 "go.opentelemetry.io/otel/exporters/jaeger/internal/third_party/thrift/lib/go/thrift" 31 ) 32 33 // batchUploader send a batch of spans to Jaeger. 34 type batchUploader interface { 35 upload(context.Context, *gen.Batch) error 36 shutdown(context.Context) error 37 } 38 39 // EndpointOption configures a Jaeger endpoint. 40 type EndpointOption interface { 41 newBatchUploader() (batchUploader, error) 42 } 43 44 type endpointOptionFunc func() (batchUploader, error) 45 46 func (fn endpointOptionFunc) newBatchUploader() (batchUploader, error) { 47 return fn() 48 } 49 50 // WithAgentEndpoint configures the Jaeger exporter to send spans to a Jaeger agent 51 // over compact thrift protocol. This will use the following environment variables for 52 // configuration if no explicit option is provided: 53 // 54 // - OTEL_EXPORTER_JAEGER_AGENT_HOST is used for the agent address host 55 // - OTEL_EXPORTER_JAEGER_AGENT_PORT is used for the agent address port 56 // 57 // The passed options will take precedence over any environment variables and default values 58 // will be used if neither are provided. 59 func WithAgentEndpoint(options ...AgentEndpointOption) EndpointOption { 60 return endpointOptionFunc(func() (batchUploader, error) { 61 cfg := agentEndpointConfig{ 62 agentClientUDPParams{ 63 AttemptReconnecting: true, 64 Host: envOr(envAgentHost, "localhost"), 65 Port: envOr(envAgentPort, "6831"), 66 }, 67 } 68 for _, opt := range options { 69 cfg = opt.apply(cfg) 70 } 71 72 client, err := newAgentClientUDP(cfg.agentClientUDPParams) 73 if err != nil { 74 return nil, err 75 } 76 77 return &agentUploader{client: client}, nil 78 }) 79 } 80 81 // AgentEndpointOption configures a Jaeger agent endpoint. 82 type AgentEndpointOption interface { 83 apply(agentEndpointConfig) agentEndpointConfig 84 } 85 86 type agentEndpointConfig struct { 87 agentClientUDPParams 88 } 89 90 type agentEndpointOptionFunc func(agentEndpointConfig) agentEndpointConfig 91 92 func (fn agentEndpointOptionFunc) apply(cfg agentEndpointConfig) agentEndpointConfig { 93 return fn(cfg) 94 } 95 96 // WithAgentHost sets a host to be used in the agent client endpoint. 97 // This option overrides any value set for the 98 // OTEL_EXPORTER_JAEGER_AGENT_HOST environment variable. 99 // If this option is not passed and the env var is not set, "localhost" will be used by default. 100 func WithAgentHost(host string) AgentEndpointOption { 101 return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig { 102 o.Host = host 103 return o 104 }) 105 } 106 107 // WithAgentPort sets a port to be used in the agent client endpoint. 108 // This option overrides any value set for the 109 // OTEL_EXPORTER_JAEGER_AGENT_PORT environment variable. 110 // If this option is not passed and the env var is not set, "6831" will be used by default. 111 func WithAgentPort(port string) AgentEndpointOption { 112 return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig { 113 o.Port = port 114 return o 115 }) 116 } 117 118 var emptyLogger = logr.Logger{} 119 120 // WithLogger sets a logger to be used by agent client. 121 // WithLogger and WithLogr will overwrite each other. 122 func WithLogger(logger *log.Logger) AgentEndpointOption { 123 return WithLogr(stdr.New(logger)) 124 } 125 126 // WithLogr sets a logr.Logger to be used by agent client. 127 // WithLogr and WithLogger will overwrite each other. 128 func WithLogr(logger logr.Logger) AgentEndpointOption { 129 return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig { 130 o.Logger = logger 131 return o 132 }) 133 } 134 135 // WithDisableAttemptReconnecting sets option to disable reconnecting udp client. 136 func WithDisableAttemptReconnecting() AgentEndpointOption { 137 return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig { 138 o.AttemptReconnecting = false 139 return o 140 }) 141 } 142 143 // WithAttemptReconnectingInterval sets the interval between attempts to re resolve agent endpoint. 144 func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption { 145 return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig { 146 o.AttemptReconnectInterval = interval 147 return o 148 }) 149 } 150 151 // WithMaxPacketSize sets the maximum UDP packet size for transport to the Jaeger agent. 152 func WithMaxPacketSize(size int) AgentEndpointOption { 153 return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig { 154 o.MaxPacketSize = size 155 return o 156 }) 157 } 158 159 // WithCollectorEndpoint defines the full URL to the Jaeger HTTP Thrift collector. This will 160 // use the following environment variables for configuration if no explicit option is provided: 161 // 162 // - OTEL_EXPORTER_JAEGER_ENDPOINT is the HTTP endpoint for sending spans directly to a collector. 163 // - OTEL_EXPORTER_JAEGER_USER is the username to be sent as authentication to the collector endpoint. 164 // - OTEL_EXPORTER_JAEGER_PASSWORD is the password to be sent as authentication to the collector endpoint. 165 // 166 // The passed options will take precedence over any environment variables. 167 // If neither values are provided for the endpoint, the default value of "http://localhost:14268/api/traces" will be used. 168 // If neither values are provided for the username or the password, they will not be set since there is no default. 169 func WithCollectorEndpoint(options ...CollectorEndpointOption) EndpointOption { 170 return endpointOptionFunc(func() (batchUploader, error) { 171 cfg := collectorEndpointConfig{ 172 endpoint: envOr(envEndpoint, "http://localhost:14268/api/traces"), 173 username: envOr(envUser, ""), 174 password: envOr(envPassword, ""), 175 httpClient: http.DefaultClient, 176 } 177 178 for _, opt := range options { 179 cfg = opt.apply(cfg) 180 } 181 182 return &collectorUploader{ 183 endpoint: cfg.endpoint, 184 username: cfg.username, 185 password: cfg.password, 186 httpClient: cfg.httpClient, 187 }, nil 188 }) 189 } 190 191 // CollectorEndpointOption configures a Jaeger collector endpoint. 192 type CollectorEndpointOption interface { 193 apply(collectorEndpointConfig) collectorEndpointConfig 194 } 195 196 type collectorEndpointConfig struct { 197 // endpoint for sending spans directly to a collector. 198 endpoint string 199 200 // username to be used for authentication with the collector endpoint. 201 username string 202 203 // password to be used for authentication with the collector endpoint. 204 password string 205 206 // httpClient to be used to make requests to the collector endpoint. 207 httpClient *http.Client 208 } 209 210 type collectorEndpointOptionFunc func(collectorEndpointConfig) collectorEndpointConfig 211 212 func (fn collectorEndpointOptionFunc) apply(cfg collectorEndpointConfig) collectorEndpointConfig { 213 return fn(cfg) 214 } 215 216 // WithEndpoint is the URL for the Jaeger collector that spans are sent to. 217 // This option overrides any value set for the 218 // OTEL_EXPORTER_JAEGER_ENDPOINT environment variable. 219 // If this option is not passed and the environment variable is not set, 220 // "http://localhost:14268/api/traces" will be used by default. 221 func WithEndpoint(endpoint string) CollectorEndpointOption { 222 return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig { 223 o.endpoint = endpoint 224 return o 225 }) 226 } 227 228 // WithUsername sets the username to be used in the authorization header sent for all requests to the collector. 229 // This option overrides any value set for the 230 // OTEL_EXPORTER_JAEGER_USER environment variable. 231 // If this option is not passed and the environment variable is not set, no username will be set. 232 func WithUsername(username string) CollectorEndpointOption { 233 return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig { 234 o.username = username 235 return o 236 }) 237 } 238 239 // WithPassword sets the password to be used in the authorization header sent for all requests to the collector. 240 // This option overrides any value set for the 241 // OTEL_EXPORTER_JAEGER_PASSWORD environment variable. 242 // If this option is not passed and the environment variable is not set, no password will be set. 243 func WithPassword(password string) CollectorEndpointOption { 244 return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig { 245 o.password = password 246 return o 247 }) 248 } 249 250 // WithHTTPClient sets the http client to be used to make request to the collector endpoint. 251 func WithHTTPClient(client *http.Client) CollectorEndpointOption { 252 return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig { 253 o.httpClient = client 254 return o 255 }) 256 } 257 258 // agentUploader implements batchUploader interface sending batches to 259 // Jaeger through the UDP agent. 260 type agentUploader struct { 261 client *agentClientUDP 262 } 263 264 var _ batchUploader = (*agentUploader)(nil) 265 266 func (a *agentUploader) shutdown(ctx context.Context) error { 267 done := make(chan error, 1) 268 go func() { 269 done <- a.client.Close() 270 }() 271 272 select { 273 case <-ctx.Done(): 274 // Prioritize not blocking the calling thread and just leak the 275 // spawned goroutine to close the client. 276 return ctx.Err() 277 case err := <-done: 278 return err 279 } 280 } 281 282 func (a *agentUploader) upload(ctx context.Context, batch *gen.Batch) error { 283 return a.client.EmitBatch(ctx, batch) 284 } 285 286 // collectorUploader implements batchUploader interface sending batches to 287 // Jaeger through the collector http endpoint. 288 type collectorUploader struct { 289 endpoint string 290 username string 291 password string 292 httpClient *http.Client 293 } 294 295 var _ batchUploader = (*collectorUploader)(nil) 296 297 func (c *collectorUploader) shutdown(ctx context.Context) error { 298 // The Exporter will cancel any active exports and will prevent all 299 // subsequent exports, so nothing to do here. 300 return nil 301 } 302 303 func (c *collectorUploader) upload(ctx context.Context, batch *gen.Batch) error { 304 body, err := serialize(batch) 305 if err != nil { 306 return err 307 } 308 req, err := http.NewRequestWithContext(ctx, "POST", c.endpoint, body) 309 if err != nil { 310 return err 311 } 312 if c.username != "" && c.password != "" { 313 req.SetBasicAuth(c.username, c.password) 314 } 315 req.Header.Set("Content-Type", "application/x-thrift") 316 317 resp, err := c.httpClient.Do(req) 318 if err != nil { 319 return err 320 } 321 322 _, _ = io.Copy(io.Discard, resp.Body) 323 if err = resp.Body.Close(); err != nil { 324 return err 325 } 326 327 if resp.StatusCode < 200 || resp.StatusCode >= 300 { 328 return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode) 329 } 330 return nil 331 } 332 333 func serialize(obj thrift.TStruct) (*bytes.Buffer, error) { 334 buf := thrift.NewTMemoryBuffer() 335 if err := obj.Write(context.Background(), thrift.NewTBinaryProtocolConf(buf, &thrift.TConfiguration{})); err != nil { 336 return nil, err 337 } 338 return buf.Buffer, nil 339 }