otel.go (4316B)
1 package bunotel 2 3 import ( 4 "context" 5 "database/sql" 6 "runtime" 7 "strings" 8 "time" 9 10 "go.opentelemetry.io/otel" 11 "go.opentelemetry.io/otel/attribute" 12 "go.opentelemetry.io/otel/codes" 13 "go.opentelemetry.io/otel/metric" 14 semconv "go.opentelemetry.io/otel/semconv/v1.20.0" 15 "go.opentelemetry.io/otel/trace" 16 17 "github.com/uptrace/bun" 18 "github.com/uptrace/bun/dialect" 19 "github.com/uptrace/bun/schema" 20 "github.com/uptrace/opentelemetry-go-extra/otelsql" 21 ) 22 23 var ( 24 tracer = otel.Tracer("github.com/uptrace/bun") 25 meter = otel.Meter("github.com/uptrace/bun") 26 27 queryHistogram, _ = meter.Int64Histogram( 28 "go.sql.query_timing", 29 metric.WithDescription("Timing of processed queries"), 30 metric.WithUnit("milliseconds"), 31 ) 32 ) 33 34 type QueryHook struct { 35 attrs []attribute.KeyValue 36 formatQueries bool 37 } 38 39 var _ bun.QueryHook = (*QueryHook)(nil) 40 41 func NewQueryHook(opts ...Option) *QueryHook { 42 h := new(QueryHook) 43 for _, opt := range opts { 44 opt(h) 45 } 46 return h 47 } 48 49 func (h *QueryHook) Init(db *bun.DB) { 50 labels := make([]attribute.KeyValue, 0, len(h.attrs)+1) 51 labels = append(labels, h.attrs...) 52 if sys := dbSystem(db); sys.Valid() { 53 labels = append(labels, sys) 54 } 55 56 otelsql.ReportDBStatsMetrics(db.DB, otelsql.WithAttributes(labels...)) 57 } 58 59 func (h *QueryHook) BeforeQuery(ctx context.Context, event *bun.QueryEvent) context.Context { 60 ctx, _ = tracer.Start(ctx, "", trace.WithSpanKind(trace.SpanKindClient)) 61 return ctx 62 } 63 64 func (h *QueryHook) AfterQuery(ctx context.Context, event *bun.QueryEvent) { 65 operation := event.Operation() 66 dbOperation := semconv.DBOperationKey.String(operation) 67 68 labels := make([]attribute.KeyValue, 0, len(h.attrs)+2) 69 labels = append(labels, h.attrs...) 70 labels = append(labels, dbOperation) 71 if event.IQuery != nil { 72 if tableName := event.IQuery.GetTableName(); tableName != "" { 73 labels = append(labels, semconv.DBSQLTableKey.String(tableName)) 74 } 75 } 76 77 dur := time.Since(event.StartTime) 78 queryHistogram.Record(ctx, dur.Milliseconds(), metric.WithAttributes(labels...)) 79 80 span := trace.SpanFromContext(ctx) 81 if !span.IsRecording() { 82 return 83 } 84 85 span.SetName(operation) 86 defer span.End() 87 88 query := h.eventQuery(event) 89 fn, file, line := funcFileLine("github.com/uptrace/bun") 90 91 attrs := make([]attribute.KeyValue, 0, 10) 92 attrs = append(attrs, h.attrs...) 93 attrs = append(attrs, 94 dbOperation, 95 semconv.DBStatementKey.String(query), 96 semconv.CodeFunctionKey.String(fn), 97 semconv.CodeFilepathKey.String(file), 98 semconv.CodeLineNumberKey.Int(line), 99 ) 100 101 if sys := dbSystem(event.DB); sys.Valid() { 102 attrs = append(attrs, sys) 103 } 104 if event.Result != nil { 105 if n, _ := event.Result.RowsAffected(); n > 0 { 106 attrs = append(attrs, attribute.Int64("db.rows_affected", n)) 107 } 108 } 109 110 switch event.Err { 111 case nil, sql.ErrNoRows, sql.ErrTxDone: 112 // ignore 113 default: 114 span.RecordError(event.Err) 115 span.SetStatus(codes.Error, event.Err.Error()) 116 } 117 118 span.SetAttributes(attrs...) 119 } 120 121 func funcFileLine(pkg string) (string, string, int) { 122 const depth = 16 123 var pcs [depth]uintptr 124 n := runtime.Callers(3, pcs[:]) 125 ff := runtime.CallersFrames(pcs[:n]) 126 127 var fn, file string 128 var line int 129 for { 130 f, ok := ff.Next() 131 if !ok { 132 break 133 } 134 fn, file, line = f.Function, f.File, f.Line 135 if !strings.Contains(fn, pkg) { 136 break 137 } 138 } 139 140 if ind := strings.LastIndexByte(fn, '/'); ind != -1 { 141 fn = fn[ind+1:] 142 } 143 144 return fn, file, line 145 } 146 147 func (h *QueryHook) eventQuery(event *bun.QueryEvent) string { 148 const softQueryLimit = 8000 149 const hardQueryLimit = 16000 150 151 var query string 152 153 if h.formatQueries && len(event.Query) <= softQueryLimit { 154 query = event.Query 155 } else { 156 query = unformattedQuery(event) 157 } 158 159 if len(query) > hardQueryLimit { 160 query = query[:hardQueryLimit] 161 } 162 163 return query 164 } 165 166 func unformattedQuery(event *bun.QueryEvent) string { 167 if event.IQuery != nil { 168 if b, err := event.IQuery.AppendQuery(schema.NewNopFormatter(), nil); err == nil { 169 return bytesToString(b) 170 } 171 } 172 return string(event.QueryTemplate) 173 } 174 175 func dbSystem(db *bun.DB) attribute.KeyValue { 176 switch db.Dialect().Name() { 177 case dialect.PG: 178 return semconv.DBSystemPostgreSQL 179 case dialect.MySQL: 180 return semconv.DBSystemMySQL 181 case dialect.SQLite: 182 return semconv.DBSystemSqlite 183 case dialect.MSSQL: 184 return semconv.DBSystemMSSQL 185 default: 186 return attribute.KeyValue{} 187 } 188 }