jaeger.go (9792B)
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"sync"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
)

// Keys of the Jaeger tags and log fields that this exporter adds on top of
// the span's and event's own attributes.
const (
	// Instrumentation scope name and version, added as span tags when the
	// scope name is non-empty (and, for the version, when it is non-empty).
	keyInstrumentationLibraryName    = "otel.library.name"
	keyInstrumentationLibraryVersion = "otel.library.version"
	// Boolean span tag set to true when the span status code is Error.
	keyError = "error"
	// Span kind as a string; added for every kind except Internal.
	keySpanKind = "span.kind"
	// Span status code ("OK" or "ERROR") and status description; added only
	// when the status code is not Unset.
	keyStatusCode    = "otel.status_code"
	keyStatusMessage = "otel.status_description"
	// Log field holding an event's dropped-attribute count when it is non-zero.
	keyDroppedAttributeCount = "otel.event.dropped_attributes_count"
	// Log field holding an event's name when it is non-empty.
	keyEventName = "event"
)

// New returns an OTel Exporter implementation that exports the collected
// spans to Jaeger.
46 func New(endpointOption EndpointOption) (*Exporter, error) { 47 uploader, err := endpointOption.newBatchUploader() 48 if err != nil { 49 return nil, err 50 } 51 52 // Fetch default service.name from default resource for backup 53 var defaultServiceName string 54 defaultResource := resource.Default() 55 if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists { 56 defaultServiceName = value.AsString() 57 } 58 if defaultServiceName == "" { 59 return nil, fmt.Errorf("failed to get service name from default resource") 60 } 61 62 stopCh := make(chan struct{}) 63 e := &Exporter{ 64 uploader: uploader, 65 stopCh: stopCh, 66 defaultServiceName: defaultServiceName, 67 } 68 return e, nil 69 } 70 71 // Exporter exports OpenTelemetry spans to a Jaeger agent or collector. 72 type Exporter struct { 73 uploader batchUploader 74 stopOnce sync.Once 75 stopCh chan struct{} 76 defaultServiceName string 77 } 78 79 var _ sdktrace.SpanExporter = (*Exporter)(nil) 80 81 // ExportSpans transforms and exports OpenTelemetry spans to Jaeger. 82 func (e *Exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { 83 // Return fast if context is already canceled or Exporter shutdown. 84 select { 85 case <-ctx.Done(): 86 return ctx.Err() 87 case <-e.stopCh: 88 return nil 89 default: 90 } 91 92 // Cancel export if Exporter is shutdown. 93 var cancel context.CancelFunc 94 ctx, cancel = context.WithCancel(ctx) 95 defer cancel() 96 go func(ctx context.Context, cancel context.CancelFunc) { 97 select { 98 case <-ctx.Done(): 99 case <-e.stopCh: 100 cancel() 101 } 102 }(ctx, cancel) 103 104 for _, batch := range jaegerBatchList(spans, e.defaultServiceName) { 105 if err := e.uploader.upload(ctx, batch); err != nil { 106 return err 107 } 108 } 109 110 return nil 111 } 112 113 // Shutdown stops the Exporter. This will close all connections and release 114 // all resources held by the Exporter. 
115 func (e *Exporter) Shutdown(ctx context.Context) error { 116 // Stop any active and subsequent exports. 117 e.stopOnce.Do(func() { close(e.stopCh) }) 118 select { 119 case <-ctx.Done(): 120 return ctx.Err() 121 default: 122 } 123 return e.uploader.shutdown(ctx) 124 } 125 126 // MarshalLog is the marshaling function used by the logging system to represent this exporter. 127 func (e *Exporter) MarshalLog() interface{} { 128 return struct { 129 Type string 130 }{ 131 Type: "jaeger", 132 } 133 } 134 135 func spanToThrift(ss sdktrace.ReadOnlySpan) *gen.Span { 136 attr := ss.Attributes() 137 tags := make([]*gen.Tag, 0, len(attr)) 138 for _, kv := range attr { 139 tag := keyValueToTag(kv) 140 if tag != nil { 141 tags = append(tags, tag) 142 } 143 } 144 145 if is := ss.InstrumentationScope(); is.Name != "" { 146 tags = append(tags, getStringTag(keyInstrumentationLibraryName, is.Name)) 147 if is.Version != "" { 148 tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, is.Version)) 149 } 150 } 151 152 if ss.SpanKind() != trace.SpanKindInternal { 153 tags = append(tags, 154 getStringTag(keySpanKind, ss.SpanKind().String()), 155 ) 156 } 157 158 if ss.Status().Code != codes.Unset { 159 switch ss.Status().Code { 160 case codes.Ok: 161 tags = append(tags, getStringTag(keyStatusCode, "OK")) 162 case codes.Error: 163 tags = append(tags, getBoolTag(keyError, true)) 164 tags = append(tags, getStringTag(keyStatusCode, "ERROR")) 165 } 166 if ss.Status().Description != "" { 167 tags = append(tags, getStringTag(keyStatusMessage, ss.Status().Description)) 168 } 169 } 170 171 var logs []*gen.Log 172 for _, a := range ss.Events() { 173 nTags := len(a.Attributes) 174 if a.Name != "" { 175 nTags++ 176 } 177 if a.DroppedAttributeCount != 0 { 178 nTags++ 179 } 180 fields := make([]*gen.Tag, 0, nTags) 181 if a.Name != "" { 182 // If an event contains an attribute with the same key, it needs 183 // to be given precedence and overwrite this. 
184 fields = append(fields, getStringTag(keyEventName, a.Name)) 185 } 186 for _, kv := range a.Attributes { 187 tag := keyValueToTag(kv) 188 if tag != nil { 189 fields = append(fields, tag) 190 } 191 } 192 if a.DroppedAttributeCount != 0 { 193 fields = append(fields, getInt64Tag(keyDroppedAttributeCount, int64(a.DroppedAttributeCount))) 194 } 195 logs = append(logs, &gen.Log{ 196 Timestamp: a.Time.UnixNano() / 1000, 197 Fields: fields, 198 }) 199 } 200 201 var refs []*gen.SpanRef 202 for _, link := range ss.Links() { 203 tid := link.SpanContext.TraceID() 204 sid := link.SpanContext.SpanID() 205 refs = append(refs, &gen.SpanRef{ 206 TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])), 207 TraceIdLow: int64(binary.BigEndian.Uint64(tid[8:16])), 208 SpanId: int64(binary.BigEndian.Uint64(sid[:])), 209 RefType: gen.SpanRefType_FOLLOWS_FROM, 210 }) 211 } 212 213 tid := ss.SpanContext().TraceID() 214 sid := ss.SpanContext().SpanID() 215 psid := ss.Parent().SpanID() 216 return &gen.Span{ 217 TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])), 218 TraceIdLow: int64(binary.BigEndian.Uint64(tid[8:16])), 219 SpanId: int64(binary.BigEndian.Uint64(sid[:])), 220 ParentSpanId: int64(binary.BigEndian.Uint64(psid[:])), 221 OperationName: ss.Name(), // TODO: if span kind is added then add prefix "Sent"/"Recv" 222 Flags: int32(ss.SpanContext().TraceFlags()), 223 StartTime: ss.StartTime().UnixNano() / 1000, 224 Duration: ss.EndTime().Sub(ss.StartTime()).Nanoseconds() / 1000, 225 Tags: tags, 226 Logs: logs, 227 References: refs, 228 } 229 } 230 231 func keyValueToTag(keyValue attribute.KeyValue) *gen.Tag { 232 var tag *gen.Tag 233 switch keyValue.Value.Type() { 234 case attribute.STRING: 235 s := keyValue.Value.AsString() 236 tag = &gen.Tag{ 237 Key: string(keyValue.Key), 238 VStr: &s, 239 VType: gen.TagType_STRING, 240 } 241 case attribute.BOOL: 242 b := keyValue.Value.AsBool() 243 tag = &gen.Tag{ 244 Key: string(keyValue.Key), 245 VBool: &b, 246 VType: gen.TagType_BOOL, 247 } 248 
case attribute.INT64: 249 i := keyValue.Value.AsInt64() 250 tag = &gen.Tag{ 251 Key: string(keyValue.Key), 252 VLong: &i, 253 VType: gen.TagType_LONG, 254 } 255 case attribute.FLOAT64: 256 f := keyValue.Value.AsFloat64() 257 tag = &gen.Tag{ 258 Key: string(keyValue.Key), 259 VDouble: &f, 260 VType: gen.TagType_DOUBLE, 261 } 262 case attribute.BOOLSLICE, 263 attribute.INT64SLICE, 264 attribute.FLOAT64SLICE, 265 attribute.STRINGSLICE: 266 data, _ := json.Marshal(keyValue.Value.AsInterface()) 267 a := (string)(data) 268 tag = &gen.Tag{ 269 Key: string(keyValue.Key), 270 VStr: &a, 271 VType: gen.TagType_STRING, 272 } 273 } 274 return tag 275 } 276 277 func getInt64Tag(k string, i int64) *gen.Tag { 278 return &gen.Tag{ 279 Key: k, 280 VLong: &i, 281 VType: gen.TagType_LONG, 282 } 283 } 284 285 func getStringTag(k, s string) *gen.Tag { 286 return &gen.Tag{ 287 Key: k, 288 VStr: &s, 289 VType: gen.TagType_STRING, 290 } 291 } 292 293 func getBoolTag(k string, b bool) *gen.Tag { 294 return &gen.Tag{ 295 Key: k, 296 VBool: &b, 297 VType: gen.TagType_BOOL, 298 } 299 } 300 301 // jaegerBatchList transforms a slice of spans into a slice of jaeger Batch. 
302 func jaegerBatchList(ssl []sdktrace.ReadOnlySpan, defaultServiceName string) []*gen.Batch { 303 if len(ssl) == 0 { 304 return nil 305 } 306 307 batchDict := make(map[attribute.Distinct]*gen.Batch) 308 309 for _, ss := range ssl { 310 if ss == nil { 311 continue 312 } 313 314 resourceKey := ss.Resource().Equivalent() 315 batch, bOK := batchDict[resourceKey] 316 if !bOK { 317 batch = &gen.Batch{ 318 Process: process(ss.Resource(), defaultServiceName), 319 Spans: []*gen.Span{}, 320 } 321 } 322 batch.Spans = append(batch.Spans, spanToThrift(ss)) 323 batchDict[resourceKey] = batch 324 } 325 326 // Transform the categorized map into a slice 327 batchList := make([]*gen.Batch, 0, len(batchDict)) 328 for _, batch := range batchDict { 329 batchList = append(batchList, batch) 330 } 331 return batchList 332 } 333 334 // process transforms an OTel Resource into a jaeger Process. 335 func process(res *resource.Resource, defaultServiceName string) *gen.Process { 336 var process gen.Process 337 338 var serviceName attribute.KeyValue 339 if res != nil { 340 for iter := res.Iter(); iter.Next(); { 341 if iter.Attribute().Key == semconv.ServiceNameKey { 342 serviceName = iter.Attribute() 343 // Don't convert service.name into tag. 344 continue 345 } 346 if tag := keyValueToTag(iter.Attribute()); tag != nil { 347 process.Tags = append(process.Tags, tag) 348 } 349 } 350 } 351 352 // If no service.name is contained in a Span's Resource, 353 // that field MUST be populated from the default Resource. 354 if serviceName.Value.AsString() == "" { 355 serviceName = semconv.ServiceName(defaultServiceName) 356 } 357 process.ServiceName = serviceName.Value.AsString() 358 359 return &process 360 }