gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

jaeger.go (9792B)


      1 // Copyright The OpenTelemetry Authors
      2 //
      3 // Licensed under the Apache License, Version 2.0 (the "License");
      4 // you may not use this file except in compliance with the License.
      5 // You may obtain a copy of the License at
      6 //
      7 //     http://www.apache.org/licenses/LICENSE-2.0
      8 //
      9 // Unless required by applicable law or agreed to in writing, software
     10 // distributed under the License is distributed on an "AS IS" BASIS,
     11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 // See the License for the specific language governing permissions and
     13 // limitations under the License.
     14 
     15 package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"
     16 
     17 import (
     18 	"context"
     19 	"encoding/binary"
     20 	"encoding/json"
     21 	"fmt"
     22 	"sync"
     23 
     24 	"go.opentelemetry.io/otel/attribute"
     25 	"go.opentelemetry.io/otel/codes"
     26 	gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger"
     27 	"go.opentelemetry.io/otel/sdk/resource"
     28 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
     29 	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
     30 	"go.opentelemetry.io/otel/trace"
     31 )
     32 
     33 const (
     34 	keyInstrumentationLibraryName    = "otel.library.name"
     35 	keyInstrumentationLibraryVersion = "otel.library.version"
     36 	keyError                         = "error"
     37 	keySpanKind                      = "span.kind"
     38 	keyStatusCode                    = "otel.status_code"
     39 	keyStatusMessage                 = "otel.status_description"
     40 	keyDroppedAttributeCount         = "otel.event.dropped_attributes_count"
     41 	keyEventName                     = "event"
     42 )
     43 
     44 // New returns an OTel Exporter implementation that exports the collected
     45 // spans to Jaeger.
     46 func New(endpointOption EndpointOption) (*Exporter, error) {
     47 	uploader, err := endpointOption.newBatchUploader()
     48 	if err != nil {
     49 		return nil, err
     50 	}
     51 
     52 	// Fetch default service.name from default resource for backup
     53 	var defaultServiceName string
     54 	defaultResource := resource.Default()
     55 	if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists {
     56 		defaultServiceName = value.AsString()
     57 	}
     58 	if defaultServiceName == "" {
     59 		return nil, fmt.Errorf("failed to get service name from default resource")
     60 	}
     61 
     62 	stopCh := make(chan struct{})
     63 	e := &Exporter{
     64 		uploader:           uploader,
     65 		stopCh:             stopCh,
     66 		defaultServiceName: defaultServiceName,
     67 	}
     68 	return e, nil
     69 }
     70 
// Exporter exports OpenTelemetry spans to a Jaeger agent or collector.
//
// Build one with New. ExportSpans sends converted spans through the uploader,
// and Shutdown closes stopCh, which stops active and subsequent exports.
type Exporter struct {
	uploader           batchUploader // receives every converted batch; shut down by Shutdown
	stopOnce           sync.Once     // ensures stopCh is closed only once
	stopCh             chan struct{} // closed by Shutdown to signal that exporting must stop
	defaultServiceName string        // service name used when a span's resource has none
}

// Compile-time check that *Exporter satisfies sdktrace.SpanExporter.
var _ sdktrace.SpanExporter = (*Exporter)(nil)
     80 
     81 // ExportSpans transforms and exports OpenTelemetry spans to Jaeger.
     82 func (e *Exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
     83 	// Return fast if context is already canceled or Exporter shutdown.
     84 	select {
     85 	case <-ctx.Done():
     86 		return ctx.Err()
     87 	case <-e.stopCh:
     88 		return nil
     89 	default:
     90 	}
     91 
     92 	// Cancel export if Exporter is shutdown.
     93 	var cancel context.CancelFunc
     94 	ctx, cancel = context.WithCancel(ctx)
     95 	defer cancel()
     96 	go func(ctx context.Context, cancel context.CancelFunc) {
     97 		select {
     98 		case <-ctx.Done():
     99 		case <-e.stopCh:
    100 			cancel()
    101 		}
    102 	}(ctx, cancel)
    103 
    104 	for _, batch := range jaegerBatchList(spans, e.defaultServiceName) {
    105 		if err := e.uploader.upload(ctx, batch); err != nil {
    106 			return err
    107 		}
    108 	}
    109 
    110 	return nil
    111 }
    112 
    113 // Shutdown stops the Exporter. This will close all connections and release
    114 // all resources held by the Exporter.
    115 func (e *Exporter) Shutdown(ctx context.Context) error {
    116 	// Stop any active and subsequent exports.
    117 	e.stopOnce.Do(func() { close(e.stopCh) })
    118 	select {
    119 	case <-ctx.Done():
    120 		return ctx.Err()
    121 	default:
    122 	}
    123 	return e.uploader.shutdown(ctx)
    124 }
    125 
    126 // MarshalLog is the marshaling function used by the logging system to represent this exporter.
    127 func (e *Exporter) MarshalLog() interface{} {
    128 	return struct {
    129 		Type string
    130 	}{
    131 		Type: "jaeger",
    132 	}
    133 }
    134 
    135 func spanToThrift(ss sdktrace.ReadOnlySpan) *gen.Span {
    136 	attr := ss.Attributes()
    137 	tags := make([]*gen.Tag, 0, len(attr))
    138 	for _, kv := range attr {
    139 		tag := keyValueToTag(kv)
    140 		if tag != nil {
    141 			tags = append(tags, tag)
    142 		}
    143 	}
    144 
    145 	if is := ss.InstrumentationScope(); is.Name != "" {
    146 		tags = append(tags, getStringTag(keyInstrumentationLibraryName, is.Name))
    147 		if is.Version != "" {
    148 			tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, is.Version))
    149 		}
    150 	}
    151 
    152 	if ss.SpanKind() != trace.SpanKindInternal {
    153 		tags = append(tags,
    154 			getStringTag(keySpanKind, ss.SpanKind().String()),
    155 		)
    156 	}
    157 
    158 	if ss.Status().Code != codes.Unset {
    159 		switch ss.Status().Code {
    160 		case codes.Ok:
    161 			tags = append(tags, getStringTag(keyStatusCode, "OK"))
    162 		case codes.Error:
    163 			tags = append(tags, getBoolTag(keyError, true))
    164 			tags = append(tags, getStringTag(keyStatusCode, "ERROR"))
    165 		}
    166 		if ss.Status().Description != "" {
    167 			tags = append(tags, getStringTag(keyStatusMessage, ss.Status().Description))
    168 		}
    169 	}
    170 
    171 	var logs []*gen.Log
    172 	for _, a := range ss.Events() {
    173 		nTags := len(a.Attributes)
    174 		if a.Name != "" {
    175 			nTags++
    176 		}
    177 		if a.DroppedAttributeCount != 0 {
    178 			nTags++
    179 		}
    180 		fields := make([]*gen.Tag, 0, nTags)
    181 		if a.Name != "" {
    182 			// If an event contains an attribute with the same key, it needs
    183 			// to be given precedence and overwrite this.
    184 			fields = append(fields, getStringTag(keyEventName, a.Name))
    185 		}
    186 		for _, kv := range a.Attributes {
    187 			tag := keyValueToTag(kv)
    188 			if tag != nil {
    189 				fields = append(fields, tag)
    190 			}
    191 		}
    192 		if a.DroppedAttributeCount != 0 {
    193 			fields = append(fields, getInt64Tag(keyDroppedAttributeCount, int64(a.DroppedAttributeCount)))
    194 		}
    195 		logs = append(logs, &gen.Log{
    196 			Timestamp: a.Time.UnixNano() / 1000,
    197 			Fields:    fields,
    198 		})
    199 	}
    200 
    201 	var refs []*gen.SpanRef
    202 	for _, link := range ss.Links() {
    203 		tid := link.SpanContext.TraceID()
    204 		sid := link.SpanContext.SpanID()
    205 		refs = append(refs, &gen.SpanRef{
    206 			TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])),
    207 			TraceIdLow:  int64(binary.BigEndian.Uint64(tid[8:16])),
    208 			SpanId:      int64(binary.BigEndian.Uint64(sid[:])),
    209 			RefType:     gen.SpanRefType_FOLLOWS_FROM,
    210 		})
    211 	}
    212 
    213 	tid := ss.SpanContext().TraceID()
    214 	sid := ss.SpanContext().SpanID()
    215 	psid := ss.Parent().SpanID()
    216 	return &gen.Span{
    217 		TraceIdHigh:   int64(binary.BigEndian.Uint64(tid[0:8])),
    218 		TraceIdLow:    int64(binary.BigEndian.Uint64(tid[8:16])),
    219 		SpanId:        int64(binary.BigEndian.Uint64(sid[:])),
    220 		ParentSpanId:  int64(binary.BigEndian.Uint64(psid[:])),
    221 		OperationName: ss.Name(), // TODO: if span kind is added then add prefix "Sent"/"Recv"
    222 		Flags:         int32(ss.SpanContext().TraceFlags()),
    223 		StartTime:     ss.StartTime().UnixNano() / 1000,
    224 		Duration:      ss.EndTime().Sub(ss.StartTime()).Nanoseconds() / 1000,
    225 		Tags:          tags,
    226 		Logs:          logs,
    227 		References:    refs,
    228 	}
    229 }
    230 
    231 func keyValueToTag(keyValue attribute.KeyValue) *gen.Tag {
    232 	var tag *gen.Tag
    233 	switch keyValue.Value.Type() {
    234 	case attribute.STRING:
    235 		s := keyValue.Value.AsString()
    236 		tag = &gen.Tag{
    237 			Key:   string(keyValue.Key),
    238 			VStr:  &s,
    239 			VType: gen.TagType_STRING,
    240 		}
    241 	case attribute.BOOL:
    242 		b := keyValue.Value.AsBool()
    243 		tag = &gen.Tag{
    244 			Key:   string(keyValue.Key),
    245 			VBool: &b,
    246 			VType: gen.TagType_BOOL,
    247 		}
    248 	case attribute.INT64:
    249 		i := keyValue.Value.AsInt64()
    250 		tag = &gen.Tag{
    251 			Key:   string(keyValue.Key),
    252 			VLong: &i,
    253 			VType: gen.TagType_LONG,
    254 		}
    255 	case attribute.FLOAT64:
    256 		f := keyValue.Value.AsFloat64()
    257 		tag = &gen.Tag{
    258 			Key:     string(keyValue.Key),
    259 			VDouble: &f,
    260 			VType:   gen.TagType_DOUBLE,
    261 		}
    262 	case attribute.BOOLSLICE,
    263 		attribute.INT64SLICE,
    264 		attribute.FLOAT64SLICE,
    265 		attribute.STRINGSLICE:
    266 		data, _ := json.Marshal(keyValue.Value.AsInterface())
    267 		a := (string)(data)
    268 		tag = &gen.Tag{
    269 			Key:   string(keyValue.Key),
    270 			VStr:  &a,
    271 			VType: gen.TagType_STRING,
    272 		}
    273 	}
    274 	return tag
    275 }
    276 
    277 func getInt64Tag(k string, i int64) *gen.Tag {
    278 	return &gen.Tag{
    279 		Key:   k,
    280 		VLong: &i,
    281 		VType: gen.TagType_LONG,
    282 	}
    283 }
    284 
    285 func getStringTag(k, s string) *gen.Tag {
    286 	return &gen.Tag{
    287 		Key:   k,
    288 		VStr:  &s,
    289 		VType: gen.TagType_STRING,
    290 	}
    291 }
    292 
    293 func getBoolTag(k string, b bool) *gen.Tag {
    294 	return &gen.Tag{
    295 		Key:   k,
    296 		VBool: &b,
    297 		VType: gen.TagType_BOOL,
    298 	}
    299 }
    300 
    301 // jaegerBatchList transforms a slice of spans into a slice of jaeger Batch.
    302 func jaegerBatchList(ssl []sdktrace.ReadOnlySpan, defaultServiceName string) []*gen.Batch {
    303 	if len(ssl) == 0 {
    304 		return nil
    305 	}
    306 
    307 	batchDict := make(map[attribute.Distinct]*gen.Batch)
    308 
    309 	for _, ss := range ssl {
    310 		if ss == nil {
    311 			continue
    312 		}
    313 
    314 		resourceKey := ss.Resource().Equivalent()
    315 		batch, bOK := batchDict[resourceKey]
    316 		if !bOK {
    317 			batch = &gen.Batch{
    318 				Process: process(ss.Resource(), defaultServiceName),
    319 				Spans:   []*gen.Span{},
    320 			}
    321 		}
    322 		batch.Spans = append(batch.Spans, spanToThrift(ss))
    323 		batchDict[resourceKey] = batch
    324 	}
    325 
    326 	// Transform the categorized map into a slice
    327 	batchList := make([]*gen.Batch, 0, len(batchDict))
    328 	for _, batch := range batchDict {
    329 		batchList = append(batchList, batch)
    330 	}
    331 	return batchList
    332 }
    333 
    334 // process transforms an OTel Resource into a jaeger Process.
    335 func process(res *resource.Resource, defaultServiceName string) *gen.Process {
    336 	var process gen.Process
    337 
    338 	var serviceName attribute.KeyValue
    339 	if res != nil {
    340 		for iter := res.Iter(); iter.Next(); {
    341 			if iter.Attribute().Key == semconv.ServiceNameKey {
    342 				serviceName = iter.Attribute()
    343 				// Don't convert service.name into tag.
    344 				continue
    345 			}
    346 			if tag := keyValueToTag(iter.Attribute()); tag != nil {
    347 				process.Tags = append(process.Tags, tag)
    348 			}
    349 		}
    350 	}
    351 
    352 	// If no service.name is contained in a Span's Resource,
    353 	// that field MUST be populated from the default Resource.
    354 	if serviceName.Value.AsString() == "" {
    355 		serviceName = semconv.ServiceName(defaultServiceName)
    356 	}
    357 	process.ServiceName = serviceName.Value.AsString()
    358 
    359 	return &process
    360 }