gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

otel.go (4316B)


      1 package bunotel
      2 
import (
	"context"
	"database/sql"
	"errors"
	"runtime"
	"strings"
	"time"
	"unicode/utf8"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/metric"
	semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
	"go.opentelemetry.io/otel/trace"

	"github.com/uptrace/bun"
	"github.com/uptrace/bun/dialect"
	"github.com/uptrace/bun/schema"
	"github.com/uptrace/opentelemetry-go-extra/otelsql"
)
     22 
     23 var (
     24 	tracer = otel.Tracer("github.com/uptrace/bun")
     25 	meter  = otel.Meter("github.com/uptrace/bun")
     26 
     27 	queryHistogram, _ = meter.Int64Histogram(
     28 		"go.sql.query_timing",
     29 		metric.WithDescription("Timing of processed queries"),
     30 		metric.WithUnit("milliseconds"),
     31 	)
     32 )
     33 
     34 type QueryHook struct {
     35 	attrs         []attribute.KeyValue
     36 	formatQueries bool
     37 }
     38 
     39 var _ bun.QueryHook = (*QueryHook)(nil)
     40 
     41 func NewQueryHook(opts ...Option) *QueryHook {
     42 	h := new(QueryHook)
     43 	for _, opt := range opts {
     44 		opt(h)
     45 	}
     46 	return h
     47 }
     48 
     49 func (h *QueryHook) Init(db *bun.DB) {
     50 	labels := make([]attribute.KeyValue, 0, len(h.attrs)+1)
     51 	labels = append(labels, h.attrs...)
     52 	if sys := dbSystem(db); sys.Valid() {
     53 		labels = append(labels, sys)
     54 	}
     55 
     56 	otelsql.ReportDBStatsMetrics(db.DB, otelsql.WithAttributes(labels...))
     57 }
     58 
     59 func (h *QueryHook) BeforeQuery(ctx context.Context, event *bun.QueryEvent) context.Context {
     60 	ctx, _ = tracer.Start(ctx, "", trace.WithSpanKind(trace.SpanKindClient))
     61 	return ctx
     62 }
     63 
     64 func (h *QueryHook) AfterQuery(ctx context.Context, event *bun.QueryEvent) {
     65 	operation := event.Operation()
     66 	dbOperation := semconv.DBOperationKey.String(operation)
     67 
     68 	labels := make([]attribute.KeyValue, 0, len(h.attrs)+2)
     69 	labels = append(labels, h.attrs...)
     70 	labels = append(labels, dbOperation)
     71 	if event.IQuery != nil {
     72 		if tableName := event.IQuery.GetTableName(); tableName != "" {
     73 			labels = append(labels, semconv.DBSQLTableKey.String(tableName))
     74 		}
     75 	}
     76 
     77 	dur := time.Since(event.StartTime)
     78 	queryHistogram.Record(ctx, dur.Milliseconds(), metric.WithAttributes(labels...))
     79 
     80 	span := trace.SpanFromContext(ctx)
     81 	if !span.IsRecording() {
     82 		return
     83 	}
     84 
     85 	span.SetName(operation)
     86 	defer span.End()
     87 
     88 	query := h.eventQuery(event)
     89 	fn, file, line := funcFileLine("github.com/uptrace/bun")
     90 
     91 	attrs := make([]attribute.KeyValue, 0, 10)
     92 	attrs = append(attrs, h.attrs...)
     93 	attrs = append(attrs,
     94 		dbOperation,
     95 		semconv.DBStatementKey.String(query),
     96 		semconv.CodeFunctionKey.String(fn),
     97 		semconv.CodeFilepathKey.String(file),
     98 		semconv.CodeLineNumberKey.Int(line),
     99 	)
    100 
    101 	if sys := dbSystem(event.DB); sys.Valid() {
    102 		attrs = append(attrs, sys)
    103 	}
    104 	if event.Result != nil {
    105 		if n, _ := event.Result.RowsAffected(); n > 0 {
    106 			attrs = append(attrs, attribute.Int64("db.rows_affected", n))
    107 		}
    108 	}
    109 
    110 	switch event.Err {
    111 	case nil, sql.ErrNoRows, sql.ErrTxDone:
    112 		// ignore
    113 	default:
    114 		span.RecordError(event.Err)
    115 		span.SetStatus(codes.Error, event.Err.Error())
    116 	}
    117 
    118 	span.SetAttributes(attrs...)
    119 }
    120 
    121 func funcFileLine(pkg string) (string, string, int) {
    122 	const depth = 16
    123 	var pcs [depth]uintptr
    124 	n := runtime.Callers(3, pcs[:])
    125 	ff := runtime.CallersFrames(pcs[:n])
    126 
    127 	var fn, file string
    128 	var line int
    129 	for {
    130 		f, ok := ff.Next()
    131 		if !ok {
    132 			break
    133 		}
    134 		fn, file, line = f.Function, f.File, f.Line
    135 		if !strings.Contains(fn, pkg) {
    136 			break
    137 		}
    138 	}
    139 
    140 	if ind := strings.LastIndexByte(fn, '/'); ind != -1 {
    141 		fn = fn[ind+1:]
    142 	}
    143 
    144 	return fn, file, line
    145 }
    146 
    147 func (h *QueryHook) eventQuery(event *bun.QueryEvent) string {
    148 	const softQueryLimit = 8000
    149 	const hardQueryLimit = 16000
    150 
    151 	var query string
    152 
    153 	if h.formatQueries && len(event.Query) <= softQueryLimit {
    154 		query = event.Query
    155 	} else {
    156 		query = unformattedQuery(event)
    157 	}
    158 
    159 	if len(query) > hardQueryLimit {
    160 		query = query[:hardQueryLimit]
    161 	}
    162 
    163 	return query
    164 }
    165 
    166 func unformattedQuery(event *bun.QueryEvent) string {
    167 	if event.IQuery != nil {
    168 		if b, err := event.IQuery.AppendQuery(schema.NewNopFormatter(), nil); err == nil {
    169 			return bytesToString(b)
    170 		}
    171 	}
    172 	return string(event.QueryTemplate)
    173 }
    174 
    175 func dbSystem(db *bun.DB) attribute.KeyValue {
    176 	switch db.Dialect().Name() {
    177 	case dialect.PG:
    178 		return semconv.DBSystemPostgreSQL
    179 	case dialect.MySQL:
    180 		return semconv.DBSystemMySQL
    181 	case dialect.SQLite:
    182 		return semconv.DBSystemSqlite
    183 	case dialect.MSSQL:
    184 		return semconv.DBSystemMSSQL
    185 	default:
    186 		return attribute.KeyValue{}
    187 	}
    188 }