gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

driver.go (12199B)


      1 package otelsql
      2 
      3 import (
      4 	"context"
      5 	"database/sql"
      6 	"database/sql/driver"
      7 	"errors"
      8 
      9 	"go.opentelemetry.io/otel/trace"
     10 )
     11 
     12 // Open is a wrapper over sql.Open that instruments the sql.DB to record executed queries
     13 // using OpenTelemetry API.
     14 func Open(driverName, dsn string, opts ...Option) (*sql.DB, error) {
     15 	db, err := sql.Open(driverName, dsn)
     16 	if err != nil {
     17 		return nil, err
     18 	}
     19 	return patchDB(db, dsn, opts...)
     20 }
     21 
     22 func patchDB(db *sql.DB, dsn string, opts ...Option) (*sql.DB, error) {
     23 	dbDriver := db.Driver()
     24 	d := newDriver(dbDriver, opts)
     25 
     26 	if _, ok := dbDriver.(driver.DriverContext); ok {
     27 		connector, err := d.OpenConnector(dsn)
     28 		if err != nil {
     29 			return nil, err
     30 		}
     31 		return sqlOpenDB(connector, d.instrum), nil
     32 	}
     33 
     34 	return sqlOpenDB(&dsnConnector{
     35 		driver: d,
     36 		dsn:    dsn,
     37 	}, d.instrum), nil
     38 }
     39 
     40 // OpenDB is a wrapper over sql.OpenDB that instruments the sql.DB to record executed queries
     41 // using OpenTelemetry API.
     42 func OpenDB(connector driver.Connector, opts ...Option) *sql.DB {
     43 	instrum := newDBInstrum(opts)
     44 	c := newConnector(connector.Driver(), connector, instrum)
     45 	return sqlOpenDB(c, instrum)
     46 }
     47 
     48 func sqlOpenDB(connector driver.Connector, instrum *dbInstrum) *sql.DB {
     49 	db := sql.OpenDB(connector)
     50 	ReportDBStatsMetrics(db, WithMeterProvider(instrum.meterProvider), WithAttributes(instrum.attrs...))
     51 	return db
     52 }
     53 
     54 type dsnConnector struct {
     55 	driver *otelDriver
     56 	dsn    string
     57 }
     58 
     59 func (c *dsnConnector) Connect(ctx context.Context) (driver.Conn, error) {
     60 	var conn driver.Conn
     61 	err := c.driver.instrum.withSpan(ctx, "db.Connect", "",
     62 		func(ctx context.Context, span trace.Span) error {
     63 			var err error
     64 			conn, err = c.driver.Open(c.dsn)
     65 			return err
     66 		})
     67 	return conn, err
     68 }
     69 
     70 func (c *dsnConnector) Driver() driver.Driver {
     71 	return c.driver
     72 }
     73 
     74 //------------------------------------------------------------------------------
     75 
     76 type otelDriver struct {
     77 	driver    driver.Driver
     78 	driverCtx driver.DriverContext
     79 	instrum   *dbInstrum
     80 }
     81 
     82 var _ driver.DriverContext = (*otelDriver)(nil)
     83 
     84 func newDriver(dr driver.Driver, opts []Option) *otelDriver {
     85 	driverCtx, _ := dr.(driver.DriverContext)
     86 	d := &otelDriver{
     87 		driver:    dr,
     88 		driverCtx: driverCtx,
     89 		instrum:   newDBInstrum(opts),
     90 	}
     91 	return d
     92 }
     93 
     94 func (d *otelDriver) Open(name string) (driver.Conn, error) {
     95 	conn, err := d.driver.Open(name)
     96 	if err != nil {
     97 		return nil, err
     98 	}
     99 	return newConn(conn, d.instrum), nil
    100 }
    101 
    102 func (d *otelDriver) OpenConnector(dsn string) (driver.Connector, error) {
    103 	connector, err := d.driverCtx.OpenConnector(dsn)
    104 	if err != nil {
    105 		return nil, err
    106 	}
    107 	return newConnector(d, connector, d.instrum), nil
    108 }
    109 
    110 //------------------------------------------------------------------------------
    111 
    112 type connector struct {
    113 	driver.Connector
    114 	driver  driver.Driver
    115 	instrum *dbInstrum
    116 }
    117 
    118 var _ driver.Connector = (*connector)(nil)
    119 
    120 func newConnector(d driver.Driver, c driver.Connector, instrum *dbInstrum) *connector {
    121 	return &connector{
    122 		driver:    d,
    123 		Connector: c,
    124 		instrum:   instrum,
    125 	}
    126 }
    127 
    128 func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
    129 	var conn driver.Conn
    130 	if err := c.instrum.withSpan(ctx, "db.Connect", "",
    131 		func(ctx context.Context, span trace.Span) error {
    132 			var err error
    133 			conn, err = c.Connector.Connect(ctx)
    134 			return err
    135 		}); err != nil {
    136 		return nil, err
    137 	}
    138 	return newConn(conn, c.instrum), nil
    139 }
    140 
    141 func (c *connector) Driver() driver.Driver {
    142 	return c.driver
    143 }
    144 
    145 //------------------------------------------------------------------------------
    146 
    147 type otelConn struct {
    148 	driver.Conn
    149 
    150 	instrum *dbInstrum
    151 
    152 	ping            pingFunc
    153 	exec            execFunc
    154 	execCtx         execCtxFunc
    155 	query           queryFunc
    156 	queryCtx        queryCtxFunc
    157 	prepareCtx      prepareCtxFunc
    158 	beginTx         beginTxFunc
    159 	resetSession    resetSessionFunc
    160 	checkNamedValue checkNamedValueFunc
    161 }
    162 
    163 var _ driver.Conn = (*otelConn)(nil)
    164 
    165 func newConn(conn driver.Conn, instrum *dbInstrum) *otelConn {
    166 	cn := &otelConn{
    167 		Conn:    conn,
    168 		instrum: instrum,
    169 	}
    170 
    171 	cn.ping = cn.createPingFunc(conn)
    172 	cn.exec = cn.createExecFunc(conn)
    173 	cn.execCtx = cn.createExecCtxFunc(conn)
    174 	cn.query = cn.createQueryFunc(conn)
    175 	cn.queryCtx = cn.createQueryCtxFunc(conn)
    176 	cn.prepareCtx = cn.createPrepareCtxFunc(conn)
    177 	cn.beginTx = cn.createBeginTxFunc(conn)
    178 	cn.resetSession = cn.createResetSessionFunc(conn)
    179 	cn.checkNamedValue = cn.createCheckNamedValueFunc(conn)
    180 
    181 	return cn
    182 }
    183 
    184 var _ driver.Pinger = (*otelConn)(nil)
    185 
    186 func (c *otelConn) Ping(ctx context.Context) error {
    187 	return c.ping(ctx)
    188 }
    189 
    190 type pingFunc func(ctx context.Context) error
    191 
    192 func (c *otelConn) createPingFunc(conn driver.Conn) pingFunc {
    193 	if pinger, ok := conn.(driver.Pinger); ok {
    194 		return func(ctx context.Context) error {
    195 			return c.instrum.withSpan(ctx, "db.Ping", "",
    196 				func(ctx context.Context, span trace.Span) error {
    197 					return pinger.Ping(ctx)
    198 				})
    199 		}
    200 	}
    201 	return func(ctx context.Context) error {
    202 		return driver.ErrSkip
    203 	}
    204 }
    205 
    206 //------------------------------------------------------------------------------
    207 
    208 var _ driver.Execer = (*otelConn)(nil)
    209 
    210 func (c *otelConn) Exec(query string, args []driver.Value) (driver.Result, error) {
    211 	return c.exec(query, args)
    212 }
    213 
    214 type execFunc func(query string, args []driver.Value) (driver.Result, error)
    215 
    216 func (c *otelConn) createExecFunc(conn driver.Conn) execFunc {
    217 	if execer, ok := conn.(driver.Execer); ok {
    218 		return func(query string, args []driver.Value) (driver.Result, error) {
    219 			return execer.Exec(query, args)
    220 		}
    221 	}
    222 	return func(query string, args []driver.Value) (driver.Result, error) {
    223 		return nil, driver.ErrSkip
    224 	}
    225 }
    226 
    227 //------------------------------------------------------------------------------
    228 
    229 var _ driver.ExecerContext = (*otelConn)(nil)
    230 
    231 func (c *otelConn) ExecContext(
    232 	ctx context.Context, query string, args []driver.NamedValue,
    233 ) (driver.Result, error) {
    234 	return c.execCtx(ctx, query, args)
    235 }
    236 
    237 type execCtxFunc func(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error)
    238 
    239 func (c *otelConn) createExecCtxFunc(conn driver.Conn) execCtxFunc {
    240 	var fn execCtxFunc
    241 
    242 	if execer, ok := conn.(driver.ExecerContext); ok {
    243 		fn = execer.ExecContext
    244 	} else {
    245 		fn = func(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
    246 			vArgs, err := namedValueToValue(args)
    247 			if err != nil {
    248 				return nil, err
    249 			}
    250 			return c.exec(query, vArgs)
    251 		}
    252 	}
    253 
    254 	return func(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
    255 		var res driver.Result
    256 		if err := c.instrum.withSpan(ctx, "db.Exec", query,
    257 			func(ctx context.Context, span trace.Span) error {
    258 				var err error
    259 				res, err = fn(ctx, query, args)
    260 				if err != nil {
    261 					return err
    262 				}
    263 
    264 				if span.IsRecording() {
    265 					rows, err := res.RowsAffected()
    266 					if err == nil {
    267 						span.SetAttributes(dbRowsAffected.Int64(rows))
    268 					}
    269 				}
    270 
    271 				return nil
    272 			}); err != nil {
    273 			return nil, err
    274 		}
    275 		return res, nil
    276 	}
    277 }
    278 
    279 //------------------------------------------------------------------------------
    280 
    281 var _ driver.Queryer = (*otelConn)(nil)
    282 
    283 func (c *otelConn) Query(query string, args []driver.Value) (driver.Rows, error) {
    284 	return c.query(query, args)
    285 }
    286 
    287 type queryFunc func(query string, args []driver.Value) (driver.Rows, error)
    288 
    289 func (c *otelConn) createQueryFunc(conn driver.Conn) queryFunc {
    290 	if queryer, ok := c.Conn.(driver.Queryer); ok {
    291 		return func(query string, args []driver.Value) (driver.Rows, error) {
    292 			return queryer.Query(query, args)
    293 		}
    294 	}
    295 	return func(query string, args []driver.Value) (driver.Rows, error) {
    296 		return nil, driver.ErrSkip
    297 	}
    298 }
    299 
    300 //------------------------------------------------------------------------------
    301 
    302 var _ driver.QueryerContext = (*otelConn)(nil)
    303 
    304 func (c *otelConn) QueryContext(
    305 	ctx context.Context, query string, args []driver.NamedValue,
    306 ) (driver.Rows, error) {
    307 	return c.queryCtx(ctx, query, args)
    308 }
    309 
    310 type queryCtxFunc func(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error)
    311 
    312 func (c *otelConn) createQueryCtxFunc(conn driver.Conn) queryCtxFunc {
    313 	var fn queryCtxFunc
    314 
    315 	if queryer, ok := c.Conn.(driver.QueryerContext); ok {
    316 		fn = queryer.QueryContext
    317 	} else {
    318 		fn = func(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
    319 			vArgs, err := namedValueToValue(args)
    320 			if err != nil {
    321 				return nil, err
    322 			}
    323 			return c.query(query, vArgs)
    324 		}
    325 	}
    326 
    327 	return func(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
    328 		var rows driver.Rows
    329 		err := c.instrum.withSpan(ctx, "db.Query", query,
    330 			func(ctx context.Context, span trace.Span) error {
    331 				var err error
    332 				rows, err = fn(ctx, query, args)
    333 				return err
    334 			})
    335 		return rows, err
    336 	}
    337 }
    338 
    339 //------------------------------------------------------------------------------
    340 
    341 var _ driver.ConnPrepareContext = (*otelConn)(nil)
    342 
    343 func (c *otelConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
    344 	return c.prepareCtx(ctx, query)
    345 }
    346 
    347 type prepareCtxFunc func(ctx context.Context, query string) (driver.Stmt, error)
    348 
    349 func (c *otelConn) createPrepareCtxFunc(conn driver.Conn) prepareCtxFunc {
    350 	var fn prepareCtxFunc
    351 
    352 	if preparer, ok := c.Conn.(driver.ConnPrepareContext); ok {
    353 		fn = preparer.PrepareContext
    354 	} else {
    355 		fn = func(ctx context.Context, query string) (driver.Stmt, error) {
    356 			return c.Conn.Prepare(query)
    357 		}
    358 	}
    359 
    360 	return func(ctx context.Context, query string) (driver.Stmt, error) {
    361 		var stmt driver.Stmt
    362 		if err := c.instrum.withSpan(ctx, "db.Prepare", query,
    363 			func(ctx context.Context, span trace.Span) error {
    364 				var err error
    365 				stmt, err = fn(ctx, query)
    366 				return err
    367 			}); err != nil {
    368 			return nil, err
    369 		}
    370 		return newStmt(stmt, query, c.instrum), nil
    371 	}
    372 }
    373 
    374 var _ driver.ConnBeginTx = (*otelConn)(nil)
    375 
    376 func (c *otelConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
    377 	return c.beginTx(ctx, opts)
    378 }
    379 
    380 type beginTxFunc func(ctx context.Context, opts driver.TxOptions) (driver.Tx, error)
    381 
    382 func (c *otelConn) createBeginTxFunc(conn driver.Conn) beginTxFunc {
    383 	var fn beginTxFunc
    384 
    385 	if txor, ok := conn.(driver.ConnBeginTx); ok {
    386 		fn = txor.BeginTx
    387 	} else {
    388 		fn = func(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
    389 			return conn.Begin()
    390 		}
    391 	}
    392 
    393 	return func(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
    394 		var tx driver.Tx
    395 		if err := c.instrum.withSpan(ctx, "db.Begin", "",
    396 			func(ctx context.Context, span trace.Span) error {
    397 				var err error
    398 				tx, err = fn(ctx, opts)
    399 				return err
    400 			}); err != nil {
    401 			return nil, err
    402 		}
    403 		return newTx(ctx, tx, c.instrum), nil
    404 	}
    405 }
    406 
    407 //------------------------------------------------------------------------------
    408 
    409 var _ driver.SessionResetter = (*otelConn)(nil)
    410 
    411 func (c *otelConn) ResetSession(ctx context.Context) error {
    412 	return c.resetSession(ctx)
    413 }
    414 
    415 type resetSessionFunc func(ctx context.Context) error
    416 
    417 func (c *otelConn) createResetSessionFunc(conn driver.Conn) resetSessionFunc {
    418 	if resetter, ok := c.Conn.(driver.SessionResetter); ok {
    419 		return func(ctx context.Context) error {
    420 			return resetter.ResetSession(ctx)
    421 		}
    422 	}
    423 	return func(ctx context.Context) error {
    424 		return driver.ErrSkip
    425 	}
    426 }
    427 
    428 //------------------------------------------------------------------------------
    429 
    430 var _ driver.NamedValueChecker = (*otelConn)(nil)
    431 
    432 func (c *otelConn) CheckNamedValue(value *driver.NamedValue) error {
    433 	return c.checkNamedValue(value)
    434 }
    435 
    436 type checkNamedValueFunc func(*driver.NamedValue) error
    437 
    438 func (c *otelConn) createCheckNamedValueFunc(conn driver.Conn) checkNamedValueFunc {
    439 	if checker, ok := c.Conn.(driver.NamedValueChecker); ok {
    440 		return func(value *driver.NamedValue) error {
    441 			return checker.CheckNamedValue(value)
    442 		}
    443 	}
    444 	return func(value *driver.NamedValue) error {
    445 		return driver.ErrSkip
    446 	}
    447 }
    448 
    449 //------------------------------------------------------------------------------
    450 
    451 func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
    452 	args := make([]driver.Value, len(named))
    453 	for n, param := range named {
    454 		if len(param.Name) > 0 {
    455 			return nil, errors.New("otelsql: driver does not support named parameters")
    456 		}
    457 		args[n] = param.Value
    458 	}
    459 	return args, nil
    460 }