driver.go (12199B)
1 package otelsql 2 3 import ( 4 "context" 5 "database/sql" 6 "database/sql/driver" 7 "errors" 8 9 "go.opentelemetry.io/otel/trace" 10 ) 11 12 // Open is a wrapper over sql.Open that instruments the sql.DB to record executed queries 13 // using OpenTelemetry API. 14 func Open(driverName, dsn string, opts ...Option) (*sql.DB, error) { 15 db, err := sql.Open(driverName, dsn) 16 if err != nil { 17 return nil, err 18 } 19 return patchDB(db, dsn, opts...) 20 } 21 22 func patchDB(db *sql.DB, dsn string, opts ...Option) (*sql.DB, error) { 23 dbDriver := db.Driver() 24 d := newDriver(dbDriver, opts) 25 26 if _, ok := dbDriver.(driver.DriverContext); ok { 27 connector, err := d.OpenConnector(dsn) 28 if err != nil { 29 return nil, err 30 } 31 return sqlOpenDB(connector, d.instrum), nil 32 } 33 34 return sqlOpenDB(&dsnConnector{ 35 driver: d, 36 dsn: dsn, 37 }, d.instrum), nil 38 } 39 40 // OpenDB is a wrapper over sql.OpenDB that instruments the sql.DB to record executed queries 41 // using OpenTelemetry API. 42 func OpenDB(connector driver.Connector, opts ...Option) *sql.DB { 43 instrum := newDBInstrum(opts) 44 c := newConnector(connector.Driver(), connector, instrum) 45 return sqlOpenDB(c, instrum) 46 } 47 48 func sqlOpenDB(connector driver.Connector, instrum *dbInstrum) *sql.DB { 49 db := sql.OpenDB(connector) 50 ReportDBStatsMetrics(db, WithMeterProvider(instrum.meterProvider), WithAttributes(instrum.attrs...)) 51 return db 52 } 53 54 type dsnConnector struct { 55 driver *otelDriver 56 dsn string 57 } 58 59 func (c *dsnConnector) Connect(ctx context.Context) (driver.Conn, error) { 60 var conn driver.Conn 61 err := c.driver.instrum.withSpan(ctx, "db.Connect", "", 62 func(ctx context.Context, span trace.Span) error { 63 var err error 64 conn, err = c.driver.Open(c.dsn) 65 return err 66 }) 67 return conn, err 68 } 69 70 func (c *dsnConnector) Driver() driver.Driver { 71 return c.driver 72 } 73 74 //------------------------------------------------------------------------------ 75 76 type otelDriver struct { 77 driver driver.Driver 78 driverCtx driver.DriverContext 79 instrum *dbInstrum 80 } 81 82 var _ driver.DriverContext = (*otelDriver)(nil) 83 84 func newDriver(dr driver.Driver, opts []Option) *otelDriver { 85 driverCtx, _ := dr.(driver.DriverContext) 86 d := &otelDriver{ 87 driver: dr, 88 driverCtx: driverCtx, 89 instrum: newDBInstrum(opts), 90 } 91 return d 92 } 93 94 func (d *otelDriver) Open(name string) (driver.Conn, error) { 95 conn, err := d.driver.Open(name) 96 if err != nil { 97 return nil, err 98 } 99 return newConn(conn, d.instrum), nil 100 } 101 102 func (d *otelDriver) OpenConnector(dsn string) (driver.Connector, error) { 103 connector, err := d.driverCtx.OpenConnector(dsn) 104 if err != nil { 105 return nil, err 106 } 107 return newConnector(d, connector, d.instrum), nil 108 } 109 110 //------------------------------------------------------------------------------ 111 112 type connector struct { 113 driver.Connector 114 driver driver.Driver 115 instrum *dbInstrum 116 } 117 118 var _ driver.Connector = (*connector)(nil) 119 120 func newConnector(d driver.Driver, c driver.Connector, instrum *dbInstrum) *connector { 121 return &connector{ 122 driver: d, 123 Connector: c, 124 instrum: instrum, 125 } 126 } 127 128 func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { 129 var conn driver.Conn 130 if err := c.instrum.withSpan(ctx, "db.Connect", "", 131 func(ctx context.Context, span trace.Span) error { 132 var err error 133 conn, err = c.Connector.Connect(ctx) 134 return err 135 }); err != nil { 136 return nil, err 137 } 138 return newConn(conn, c.instrum), nil 139 } 140 141 func (c *connector) Driver() driver.Driver { 142 return c.driver 143 } 144 145 //------------------------------------------------------------------------------ 146 147 type otelConn struct { 148 driver.Conn 149 150 instrum *dbInstrum 151 152 ping pingFunc 153 exec execFunc 154 execCtx execCtxFunc 155 query queryFunc 156 queryCtx queryCtxFunc 157 prepareCtx prepareCtxFunc 158 beginTx beginTxFunc 159 resetSession resetSessionFunc 160 checkNamedValue checkNamedValueFunc 161 } 162 163 var _ driver.Conn = (*otelConn)(nil) 164 165 func newConn(conn driver.Conn, instrum *dbInstrum) *otelConn { 166 cn := &otelConn{ 167 Conn: conn, 168 instrum: instrum, 169 } 170 171 cn.ping = cn.createPingFunc(conn) 172 cn.exec = cn.createExecFunc(conn) 173 cn.execCtx = cn.createExecCtxFunc(conn) 174 cn.query = cn.createQueryFunc(conn) 175 cn.queryCtx = cn.createQueryCtxFunc(conn) 176 cn.prepareCtx = cn.createPrepareCtxFunc(conn) 177 cn.beginTx = cn.createBeginTxFunc(conn) 178 cn.resetSession = cn.createResetSessionFunc(conn) 179 cn.checkNamedValue = cn.createCheckNamedValueFunc(conn) 180 181 return cn 182 } 183 184 var _ driver.Pinger = (*otelConn)(nil) 185 186 func (c *otelConn) Ping(ctx context.Context) error { 187 return c.ping(ctx) 188 } 189 190 type pingFunc func(ctx context.Context) error 191 192 func (c *otelConn) createPingFunc(conn driver.Conn) pingFunc { 193 if pinger, ok := conn.(driver.Pinger); ok { 194 return func(ctx context.Context) error { 195 return c.instrum.withSpan(ctx, "db.Ping", "", 196 func(ctx context.Context, span trace.Span) error { 197 return pinger.Ping(ctx) 198 }) 199 } 200 } 201 return func(ctx context.Context) error { 202 return driver.ErrSkip 203 } 204 } 205 206 //------------------------------------------------------------------------------ 207 208 var _ driver.Execer = (*otelConn)(nil) 209 210 func (c *otelConn) Exec(query string, args []driver.Value) (driver.Result, error) { 211 return c.exec(query, args) 212 } 213 214 type execFunc func(query string, args []driver.Value) (driver.Result, error) 215 216 func (c *otelConn) createExecFunc(conn driver.Conn) execFunc { 217 if execer, ok := conn.(driver.Execer); ok { 218 return func(query string, args []driver.Value) (driver.Result, error) { 219 return execer.Exec(query, args) 220 } 221 } 222 return func(query string, args []driver.Value) (driver.Result, error) { 223 return nil, driver.ErrSkip 224 } 225 } 226 227 //------------------------------------------------------------------------------ 228 229 var _ driver.ExecerContext = (*otelConn)(nil) 230 231 func (c *otelConn) ExecContext( 232 ctx context.Context, query string, args []driver.NamedValue, 233 ) (driver.Result, error) { 234 return c.execCtx(ctx, query, args) 235 } 236 237 type execCtxFunc func(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) 238 239 func (c *otelConn) createExecCtxFunc(conn driver.Conn) execCtxFunc { 240 var fn execCtxFunc 241 242 if execer, ok := conn.(driver.ExecerContext); ok { 243 fn = execer.ExecContext 244 } else { 245 fn = func(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { 246 vArgs, err := namedValueToValue(args) 247 if err != nil { 248 return nil, err 249 } 250 return c.exec(query, vArgs) 251 } 252 } 253 254 return func(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { 255 var res driver.Result 256 if err := c.instrum.withSpan(ctx, "db.Exec", query, 257 func(ctx context.Context, span trace.Span) error { 258 var err error 259 res, err = fn(ctx, query, args) 260 if err != nil { 261 return err 262 } 263 264 if span.IsRecording() { 265 rows, err := res.RowsAffected() 266 if err == nil { 267 span.SetAttributes(dbRowsAffected.Int64(rows)) 268 } 269 } 270 271 return nil 272 }); err != nil { 273 return nil, err 274 } 275 return res, nil 276 } 277 } 278 279 //------------------------------------------------------------------------------ 280 281 var _ driver.Queryer = (*otelConn)(nil) 282 283 func (c *otelConn) Query(query string, args []driver.Value) (driver.Rows, error) { 284 return c.query(query, args) 285 } 286 287 type queryFunc func(query string, args []driver.Value) (driver.Rows, error) 288 289 func (c *otelConn) createQueryFunc(conn driver.Conn) queryFunc { 290 if queryer, ok := c.Conn.(driver.Queryer); ok { 291 return func(query string, args []driver.Value) (driver.Rows, error) { 292 return queryer.Query(query, args) 293 } 294 } 295 return func(query string, args []driver.Value) (driver.Rows, error) { 296 return nil, driver.ErrSkip 297 } 298 } 299 300 //------------------------------------------------------------------------------ 301 302 var _ driver.QueryerContext = (*otelConn)(nil) 303 304 func (c *otelConn) QueryContext( 305 ctx context.Context, query string, args []driver.NamedValue, 306 ) (driver.Rows, error) { 307 return c.queryCtx(ctx, query, args) 308 } 309 310 type queryCtxFunc func(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) 311 312 func (c *otelConn) createQueryCtxFunc(conn driver.Conn) queryCtxFunc { 313 var fn queryCtxFunc 314 315 if queryer, ok := c.Conn.(driver.QueryerContext); ok { 316 fn = queryer.QueryContext 317 } else { 318 fn = func(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { 319 vArgs, err := namedValueToValue(args) 320 if err != nil { 321 return nil, err 322 } 323 return c.query(query, vArgs) 324 } 325 } 326 327 return func(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { 328 var rows driver.Rows 329 err := c.instrum.withSpan(ctx, "db.Query", query, 330 func(ctx context.Context, span trace.Span) error { 331 var err error 332 rows, err = fn(ctx, query, args) 333 return err 334 }) 335 return rows, err 336 } 337 } 338 339 //------------------------------------------------------------------------------ 340 341 var _ driver.ConnPrepareContext = (*otelConn)(nil) 342 343 func (c *otelConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { 344 return c.prepareCtx(ctx, query) 345 } 346 347 type prepareCtxFunc func(ctx context.Context, query string) (driver.Stmt, error) 348 349 func (c *otelConn) createPrepareCtxFunc(conn driver.Conn) prepareCtxFunc { 350 var fn prepareCtxFunc 351 352 if preparer, ok := c.Conn.(driver.ConnPrepareContext); ok { 353 fn = preparer.PrepareContext 354 } else { 355 fn = func(ctx context.Context, query string) (driver.Stmt, error) { 356 return c.Conn.Prepare(query) 357 } 358 } 359 360 return func(ctx context.Context, query string) (driver.Stmt, error) { 361 var stmt driver.Stmt 362 if err := c.instrum.withSpan(ctx, "db.Prepare", query, 363 func(ctx context.Context, span trace.Span) error { 364 var err error 365 stmt, err = fn(ctx, query) 366 return err 367 }); err != nil { 368 return nil, err 369 } 370 return newStmt(stmt, query, c.instrum), nil 371 } 372 } 373 374 var _ driver.ConnBeginTx = (*otelConn)(nil) 375 376 func (c *otelConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { 377 return c.beginTx(ctx, opts) 378 } 379 380 type beginTxFunc func(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) 381 382 func (c *otelConn) createBeginTxFunc(conn driver.Conn) beginTxFunc { 383 var fn beginTxFunc 384 385 if txor, ok := conn.(driver.ConnBeginTx); ok { 386 fn = txor.BeginTx 387 } else { 388 fn = func(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { 389 return conn.Begin() 390 } 391 } 392 393 return func(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { 394 var tx driver.Tx 395 if err := c.instrum.withSpan(ctx, "db.Begin", "", 396 func(ctx context.Context, span trace.Span) error { 397 var err error 398 tx, err = fn(ctx, opts) 399 return err 400 }); err != nil { 401 return nil, err 402 } 403 return newTx(ctx, tx, c.instrum), nil 404 } 405 } 406 407 //------------------------------------------------------------------------------ 408 409 var _ driver.SessionResetter = (*otelConn)(nil) 410 411 func (c *otelConn) ResetSession(ctx context.Context) error { 412 return c.resetSession(ctx) 413 } 414 415 type resetSessionFunc func(ctx context.Context) error 416 417 func (c *otelConn) createResetSessionFunc(conn driver.Conn) resetSessionFunc { 418 if resetter, ok := c.Conn.(driver.SessionResetter); ok { 419 return func(ctx context.Context) error { 420 return resetter.ResetSession(ctx) 421 } 422 } 423 return func(ctx context.Context) error { 424 return driver.ErrSkip 425 } 426 } 427 428 //------------------------------------------------------------------------------ 429 430 var _ driver.NamedValueChecker = (*otelConn)(nil) 431 432 func (c *otelConn) CheckNamedValue(value *driver.NamedValue) error { 433 return c.checkNamedValue(value) 434 } 435 436 type checkNamedValueFunc func(*driver.NamedValue) error 437 438 func (c *otelConn) createCheckNamedValueFunc(conn driver.Conn) checkNamedValueFunc { 439 if checker, ok := c.Conn.(driver.NamedValueChecker); ok { 440 return func(value *driver.NamedValue) error { 441 return checker.CheckNamedValue(value) 442 } 443 } 444 return func(value *driver.NamedValue) error { 445 return driver.ErrSkip 446 } 447 } 448 449 //------------------------------------------------------------------------------ 450 451 func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { 452 args := make([]driver.Value, len(named)) 453 for n, param := range named { 454 if len(param.Name) > 0 { 455 return nil, errors.New("otelsql: driver does not support named parameters") 456 } 457 args[n] = param.Value 458 } 459 return args, nil 460 }