sql.go (22081B)
1 // Package stdlib is the compatibility layer from pgx to database/sql. 2 // 3 // A database/sql connection can be established through sql.Open. 4 // 5 // db, err := sql.Open("pgx", "postgres://pgx_md5:secret@localhost:5432/pgx_test?sslmode=disable") 6 // if err != nil { 7 // return err 8 // } 9 // 10 // Or from a DSN string. 11 // 12 // db, err := sql.Open("pgx", "user=postgres password=secret host=localhost port=5432 database=pgx_test sslmode=disable") 13 // if err != nil { 14 // return err 15 // } 16 // 17 // Or a pgx.ConnConfig can be used to set configuration not accessible via connection string. In this case the 18 // pgx.ConnConfig must first be registered with the driver. This registration returns a connection string which is used 19 // with sql.Open. 20 // 21 // connConfig, _ := pgx.ParseConfig(os.Getenv("DATABASE_URL")) 22 // connConfig.Logger = myLogger 23 // connStr := stdlib.RegisterConnConfig(connConfig) 24 // db, _ := sql.Open("pgx", connStr) 25 // 26 // pgx uses standard PostgreSQL positional parameters in queries. e.g. $1, $2. It does not support named parameters. 27 // 28 // db.QueryRow("select * from users where id=$1", userID) 29 // 30 // (*sql.Conn) Raw() can be used to get a *pgx.Conn from the standard database/sql.DB connection pool. This allows 31 // operations that use pgx specific functionality. 32 // 33 // // Given db is a *sql.DB 34 // conn, err := db.Conn(context.Background()) 35 // if err != nil { 36 // // handle error from acquiring connection from DB pool 37 // } 38 // 39 // err = conn.Raw(func(driverConn any) error { 40 // conn := driverConn.(*stdlib.Conn).Conn() // conn is a *pgx.Conn 41 // // Do pgx specific stuff with conn 42 // conn.CopyFrom(...) 43 // return nil 44 // }) 45 // if err != nil { 46 // // handle error that occurred while using *pgx.Conn 47 // } 48 // 49 // # PostgreSQL Specific Data Types 50 // 51 // The pgtype package provides support for PostgreSQL specific types. *pgtype.Map.SQLScanner is an adapter that makes 52 // these types usable as a sql.Scanner. 53 // 54 // m := pgtype.NewMap() 55 // var a []int64 56 // err := db.QueryRow("select '{1,2,3}'::bigint[]").Scan(m.SQLScanner(&a)) 57 package stdlib 58 59 import ( 60 "context" 61 "database/sql" 62 "database/sql/driver" 63 "errors" 64 "fmt" 65 "io" 66 "math" 67 "math/rand" 68 "reflect" 69 "strconv" 70 "strings" 71 "sync" 72 "time" 73 74 "github.com/jackc/pgx/v5" 75 "github.com/jackc/pgx/v5/pgconn" 76 "github.com/jackc/pgx/v5/pgtype" 77 ) 78 79 // Only intrinsic types should be binary format with database/sql. 80 var databaseSQLResultFormats pgx.QueryResultFormatsByOID 81 82 var pgxDriver *Driver 83 84 func init() { 85 pgxDriver = &Driver{ 86 configs: make(map[string]*pgx.ConnConfig), 87 } 88 89 // if pgx driver was already registered by different pgx major version then we 90 // skip registration under the default name. 91 if !contains(sql.Drivers(), "pgx") { 92 sql.Register("pgx", pgxDriver) 93 } 94 sql.Register("pgx/v5", pgxDriver) 95 96 databaseSQLResultFormats = pgx.QueryResultFormatsByOID{ 97 pgtype.BoolOID: 1, 98 pgtype.ByteaOID: 1, 99 pgtype.CIDOID: 1, 100 pgtype.DateOID: 1, 101 pgtype.Float4OID: 1, 102 pgtype.Float8OID: 1, 103 pgtype.Int2OID: 1, 104 pgtype.Int4OID: 1, 105 pgtype.Int8OID: 1, 106 pgtype.OIDOID: 1, 107 pgtype.TimestampOID: 1, 108 pgtype.TimestamptzOID: 1, 109 pgtype.XIDOID: 1, 110 } 111 } 112 113 // TODO replace by slices.Contains when experimental package will be merged to stdlib 114 // https://pkg.go.dev/golang.org/x/exp/slices#Contains 115 func contains(list []string, y string) bool { 116 for _, x := range list { 117 if x == y { 118 return true 119 } 120 } 121 return false 122 } 123 124 // OptionOpenDB options for configuring the driver when opening a new db pool. 125 type OptionOpenDB func(*connector) 126 127 // OptionBeforeConnect provides a callback for before connect. It is passed a shallow copy of the ConnConfig that will 128 // be used to connect, so only its immediate members should be modified. 129 func OptionBeforeConnect(bc func(context.Context, *pgx.ConnConfig) error) OptionOpenDB { 130 return func(dc *connector) { 131 dc.BeforeConnect = bc 132 } 133 } 134 135 // OptionAfterConnect provides a callback for after connect. 136 func OptionAfterConnect(ac func(context.Context, *pgx.Conn) error) OptionOpenDB { 137 return func(dc *connector) { 138 dc.AfterConnect = ac 139 } 140 } 141 142 // OptionResetSession provides a callback that can be used to add custom logic prior to executing a query on the 143 // connection if the connection has been used before. 144 // If ResetSessionFunc returns ErrBadConn error the connection will be discarded. 145 func OptionResetSession(rs func(context.Context, *pgx.Conn) error) OptionOpenDB { 146 return func(dc *connector) { 147 dc.ResetSession = rs 148 } 149 } 150 151 // RandomizeHostOrderFunc is a BeforeConnect hook that randomizes the host order in the provided connConfig, so that a 152 // new host becomes primary each time. This is useful to distribute connections for multi-master databases like 153 // CockroachDB. If you use this you likely should set https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime as well 154 // to ensure that connections are periodically rebalanced across your nodes. 155 func RandomizeHostOrderFunc(ctx context.Context, connConfig *pgx.ConnConfig) error { 156 if len(connConfig.Fallbacks) == 0 { 157 return nil 158 } 159 160 newFallbacks := append([]*pgconn.FallbackConfig{{ 161 Host: connConfig.Host, 162 Port: connConfig.Port, 163 TLSConfig: connConfig.TLSConfig, 164 }}, connConfig.Fallbacks...) 165 166 rand.Shuffle(len(newFallbacks), func(i, j int) { 167 newFallbacks[i], newFallbacks[j] = newFallbacks[j], newFallbacks[i] 168 }) 169 170 // Use the one that sorted last as the primary and keep the rest as the fallbacks 171 newPrimary := newFallbacks[len(newFallbacks)-1] 172 connConfig.Host = newPrimary.Host 173 connConfig.Port = newPrimary.Port 174 connConfig.TLSConfig = newPrimary.TLSConfig 175 connConfig.Fallbacks = newFallbacks[:len(newFallbacks)-1] 176 return nil 177 } 178 179 func GetConnector(config pgx.ConnConfig, opts ...OptionOpenDB) driver.Connector { 180 c := connector{ 181 ConnConfig: config, 182 BeforeConnect: func(context.Context, *pgx.ConnConfig) error { return nil }, // noop before connect by default 183 AfterConnect: func(context.Context, *pgx.Conn) error { return nil }, // noop after connect by default 184 ResetSession: func(context.Context, *pgx.Conn) error { return nil }, // noop reset session by default 185 driver: pgxDriver, 186 } 187 188 for _, opt := range opts { 189 opt(&c) 190 } 191 return c 192 } 193 194 func OpenDB(config pgx.ConnConfig, opts ...OptionOpenDB) *sql.DB { 195 c := GetConnector(config, opts...) 196 return sql.OpenDB(c) 197 } 198 199 type connector struct { 200 pgx.ConnConfig 201 BeforeConnect func(context.Context, *pgx.ConnConfig) error // function to call before creation of every new connection 202 AfterConnect func(context.Context, *pgx.Conn) error // function to call after creation of every new connection 203 ResetSession func(context.Context, *pgx.Conn) error // function is called before a connection is reused 204 driver *Driver 205 } 206 207 // Connect implement driver.Connector interface 208 func (c connector) Connect(ctx context.Context) (driver.Conn, error) { 209 var ( 210 err error 211 conn *pgx.Conn 212 ) 213 214 // Create a shallow copy of the config, so that BeforeConnect can safely modify it 215 connConfig := c.ConnConfig 216 if err = c.BeforeConnect(ctx, &connConfig); err != nil { 217 return nil, err 218 } 219 220 if conn, err = pgx.ConnectConfig(ctx, &connConfig); err != nil { 221 return nil, err 222 } 223 224 if err = c.AfterConnect(ctx, conn); err != nil { 225 return nil, err 226 } 227 228 return &Conn{conn: conn, driver: c.driver, connConfig: connConfig, resetSessionFunc: c.ResetSession}, nil 229 } 230 231 // Driver implement driver.Connector interface 232 func (c connector) Driver() driver.Driver { 233 return c.driver 234 } 235 236 // GetDefaultDriver returns the driver initialized in the init function 237 // and used when the pgx driver is registered. 238 func GetDefaultDriver() driver.Driver { 239 return pgxDriver 240 } 241 242 type Driver struct { 243 configMutex sync.Mutex 244 configs map[string]*pgx.ConnConfig 245 sequence int 246 } 247 248 func (d *Driver) Open(name string) (driver.Conn, error) { 249 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Ensure eventual timeout 250 defer cancel() 251 252 connector, err := d.OpenConnector(name) 253 if err != nil { 254 return nil, err 255 } 256 return connector.Connect(ctx) 257 } 258 259 func (d *Driver) OpenConnector(name string) (driver.Connector, error) { 260 return &driverConnector{driver: d, name: name}, nil 261 } 262 263 func (d *Driver) registerConnConfig(c *pgx.ConnConfig) string { 264 d.configMutex.Lock() 265 connStr := fmt.Sprintf("registeredConnConfig%d", d.sequence) 266 d.sequence++ 267 d.configs[connStr] = c 268 d.configMutex.Unlock() 269 return connStr 270 } 271 272 func (d *Driver) unregisterConnConfig(connStr string) { 273 d.configMutex.Lock() 274 delete(d.configs, connStr) 275 d.configMutex.Unlock() 276 } 277 278 type driverConnector struct { 279 driver *Driver 280 name string 281 } 282 283 func (dc *driverConnector) Connect(ctx context.Context) (driver.Conn, error) { 284 var connConfig *pgx.ConnConfig 285 286 dc.driver.configMutex.Lock() 287 connConfig = dc.driver.configs[dc.name] 288 dc.driver.configMutex.Unlock() 289 290 if connConfig == nil { 291 var err error 292 connConfig, err = pgx.ParseConfig(dc.name) 293 if err != nil { 294 return nil, err 295 } 296 } 297 298 conn, err := pgx.ConnectConfig(ctx, connConfig) 299 if err != nil { 300 return nil, err 301 } 302 303 c := &Conn{ 304 conn: conn, 305 driver: dc.driver, 306 connConfig: *connConfig, 307 resetSessionFunc: func(context.Context, *pgx.Conn) error { return nil }, 308 } 309 310 return c, nil 311 } 312 313 func (dc *driverConnector) Driver() driver.Driver { 314 return dc.driver 315 } 316 317 // RegisterConnConfig registers a ConnConfig and returns the connection string to use with Open. 318 func RegisterConnConfig(c *pgx.ConnConfig) string { 319 return pgxDriver.registerConnConfig(c) 320 } 321 322 // UnregisterConnConfig removes the ConnConfig registration for connStr. 323 func UnregisterConnConfig(connStr string) { 324 pgxDriver.unregisterConnConfig(connStr) 325 } 326 327 type Conn struct { 328 conn *pgx.Conn 329 psCount int64 // Counter used for creating unique prepared statement names 330 driver *Driver 331 connConfig pgx.ConnConfig 332 resetSessionFunc func(context.Context, *pgx.Conn) error // Function is called before a connection is reused 333 lastResetSessionTime time.Time 334 } 335 336 // Conn returns the underlying *pgx.Conn 337 func (c *Conn) Conn() *pgx.Conn { 338 return c.conn 339 } 340 341 func (c *Conn) Prepare(query string) (driver.Stmt, error) { 342 return c.PrepareContext(context.Background(), query) 343 } 344 345 func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { 346 if c.conn.IsClosed() { 347 return nil, driver.ErrBadConn 348 } 349 350 name := fmt.Sprintf("pgx_%d", c.psCount) 351 c.psCount++ 352 353 sd, err := c.conn.Prepare(ctx, name, query) 354 if err != nil { 355 return nil, err 356 } 357 358 return &Stmt{sd: sd, conn: c}, nil 359 } 360 361 func (c *Conn) Close() error { 362 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) 363 defer cancel() 364 return c.conn.Close(ctx) 365 } 366 367 func (c *Conn) Begin() (driver.Tx, error) { 368 return c.BeginTx(context.Background(), driver.TxOptions{}) 369 } 370 371 func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { 372 if c.conn.IsClosed() { 373 return nil, driver.ErrBadConn 374 } 375 376 var pgxOpts pgx.TxOptions 377 switch sql.IsolationLevel(opts.Isolation) { 378 case sql.LevelDefault: 379 case sql.LevelReadUncommitted: 380 pgxOpts.IsoLevel = pgx.ReadUncommitted 381 case sql.LevelReadCommitted: 382 pgxOpts.IsoLevel = pgx.ReadCommitted 383 case sql.LevelRepeatableRead, sql.LevelSnapshot: 384 pgxOpts.IsoLevel = pgx.RepeatableRead 385 case sql.LevelSerializable: 386 pgxOpts.IsoLevel = pgx.Serializable 387 default: 388 return nil, fmt.Errorf("unsupported isolation: %v", opts.Isolation) 389 } 390 391 if opts.ReadOnly { 392 pgxOpts.AccessMode = pgx.ReadOnly 393 } 394 395 tx, err := c.conn.BeginTx(ctx, pgxOpts) 396 if err != nil { 397 return nil, err 398 } 399 400 return wrapTx{ctx: ctx, tx: tx}, nil 401 } 402 403 func (c *Conn) ExecContext(ctx context.Context, query string, argsV []driver.NamedValue) (driver.Result, error) { 404 if c.conn.IsClosed() { 405 return nil, driver.ErrBadConn 406 } 407 408 args := namedValueToInterface(argsV) 409 410 commandTag, err := c.conn.Exec(ctx, query, args...) 411 // if we got a network error before we had a chance to send the query, retry 412 if err != nil { 413 if pgconn.SafeToRetry(err) { 414 return nil, driver.ErrBadConn 415 } 416 } 417 return driver.RowsAffected(commandTag.RowsAffected()), err 418 } 419 420 func (c *Conn) QueryContext(ctx context.Context, query string, argsV []driver.NamedValue) (driver.Rows, error) { 421 if c.conn.IsClosed() { 422 return nil, driver.ErrBadConn 423 } 424 425 args := []any{databaseSQLResultFormats} 426 args = append(args, namedValueToInterface(argsV)...) 427 428 rows, err := c.conn.Query(ctx, query, args...) 429 if err != nil { 430 if pgconn.SafeToRetry(err) { 431 return nil, driver.ErrBadConn 432 } 433 return nil, err 434 } 435 436 // Preload first row because otherwise we won't know what columns are available when database/sql asks. 437 more := rows.Next() 438 if err = rows.Err(); err != nil { 439 rows.Close() 440 return nil, err 441 } 442 return &Rows{conn: c, rows: rows, skipNext: true, skipNextMore: more}, nil 443 } 444 445 func (c *Conn) Ping(ctx context.Context) error { 446 if c.conn.IsClosed() { 447 return driver.ErrBadConn 448 } 449 450 err := c.conn.Ping(ctx) 451 if err != nil { 452 // A Ping failure implies some sort of fatal state. The connection is almost certainly already closed by the 453 // failure, but manually close it just to be sure. 454 c.Close() 455 return driver.ErrBadConn 456 } 457 458 return nil 459 } 460 461 func (c *Conn) CheckNamedValue(*driver.NamedValue) error { 462 // Underlying pgx supports sql.Scanner and driver.Valuer interfaces natively. So everything can be passed through directly. 463 return nil 464 } 465 466 func (c *Conn) ResetSession(ctx context.Context) error { 467 if c.conn.IsClosed() { 468 return driver.ErrBadConn 469 } 470 471 now := time.Now() 472 if now.Sub(c.lastResetSessionTime) > time.Second { 473 if err := c.conn.PgConn().CheckConn(); err != nil { 474 return driver.ErrBadConn 475 } 476 } 477 c.lastResetSessionTime = now 478 479 return c.resetSessionFunc(ctx, c.conn) 480 } 481 482 type Stmt struct { 483 sd *pgconn.StatementDescription 484 conn *Conn 485 } 486 487 func (s *Stmt) Close() error { 488 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) 489 defer cancel() 490 return s.conn.conn.Deallocate(ctx, s.sd.Name) 491 } 492 493 func (s *Stmt) NumInput() int { 494 return len(s.sd.ParamOIDs) 495 } 496 497 func (s *Stmt) Exec(argsV []driver.Value) (driver.Result, error) { 498 return nil, errors.New("Stmt.Exec deprecated and not implemented") 499 } 500 501 func (s *Stmt) ExecContext(ctx context.Context, argsV []driver.NamedValue) (driver.Result, error) { 502 return s.conn.ExecContext(ctx, s.sd.Name, argsV) 503 } 504 505 func (s *Stmt) Query(argsV []driver.Value) (driver.Rows, error) { 506 return nil, errors.New("Stmt.Query deprecated and not implemented") 507 } 508 509 func (s *Stmt) QueryContext(ctx context.Context, argsV []driver.NamedValue) (driver.Rows, error) { 510 return s.conn.QueryContext(ctx, s.sd.Name, argsV) 511 } 512 513 type rowValueFunc func(src []byte) (driver.Value, error) 514 515 type Rows struct { 516 conn *Conn 517 rows pgx.Rows 518 valueFuncs []rowValueFunc 519 skipNext bool 520 skipNextMore bool 521 522 columnNames []string 523 } 524 525 func (r *Rows) Columns() []string { 526 if r.columnNames == nil { 527 fields := r.rows.FieldDescriptions() 528 r.columnNames = make([]string, len(fields)) 529 for i, fd := range fields { 530 r.columnNames[i] = string(fd.Name) 531 } 532 } 533 534 return r.columnNames 535 } 536 537 // ColumnTypeDatabaseTypeName returns the database system type name. If the name is unknown the OID is returned. 538 func (r *Rows) ColumnTypeDatabaseTypeName(index int) string { 539 if dt, ok := r.conn.conn.TypeMap().TypeForOID(r.rows.FieldDescriptions()[index].DataTypeOID); ok { 540 return strings.ToUpper(dt.Name) 541 } 542 543 return strconv.FormatInt(int64(r.rows.FieldDescriptions()[index].DataTypeOID), 10) 544 } 545 546 const varHeaderSize = 4 547 548 // ColumnTypeLength returns the length of the column type if the column is a 549 // variable length type. If the column is not a variable length type ok 550 // should return false. 551 func (r *Rows) ColumnTypeLength(index int) (int64, bool) { 552 fd := r.rows.FieldDescriptions()[index] 553 554 switch fd.DataTypeOID { 555 case pgtype.TextOID, pgtype.ByteaOID: 556 return math.MaxInt64, true 557 case pgtype.VarcharOID, pgtype.BPCharArrayOID: 558 return int64(fd.TypeModifier - varHeaderSize), true 559 default: 560 return 0, false 561 } 562 } 563 564 // ColumnTypePrecisionScale should return the precision and scale for decimal 565 // types. If not applicable, ok should be false. 566 func (r *Rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) { 567 fd := r.rows.FieldDescriptions()[index] 568 569 switch fd.DataTypeOID { 570 case pgtype.NumericOID: 571 mod := fd.TypeModifier - varHeaderSize 572 precision = int64((mod >> 16) & 0xffff) 573 scale = int64(mod & 0xffff) 574 return precision, scale, true 575 default: 576 return 0, 0, false 577 } 578 } 579 580 // ColumnTypeScanType returns the value type that can be used to scan types into. 581 func (r *Rows) ColumnTypeScanType(index int) reflect.Type { 582 fd := r.rows.FieldDescriptions()[index] 583 584 switch fd.DataTypeOID { 585 case pgtype.Float8OID: 586 return reflect.TypeOf(float64(0)) 587 case pgtype.Float4OID: 588 return reflect.TypeOf(float32(0)) 589 case pgtype.Int8OID: 590 return reflect.TypeOf(int64(0)) 591 case pgtype.Int4OID: 592 return reflect.TypeOf(int32(0)) 593 case pgtype.Int2OID: 594 return reflect.TypeOf(int16(0)) 595 case pgtype.BoolOID: 596 return reflect.TypeOf(false) 597 case pgtype.NumericOID: 598 return reflect.TypeOf(float64(0)) 599 case pgtype.DateOID, pgtype.TimestampOID, pgtype.TimestamptzOID: 600 return reflect.TypeOf(time.Time{}) 601 case pgtype.ByteaOID: 602 return reflect.TypeOf([]byte(nil)) 603 default: 604 return reflect.TypeOf("") 605 } 606 } 607 608 func (r *Rows) Close() error { 609 r.rows.Close() 610 return r.rows.Err() 611 } 612 613 func (r *Rows) Next(dest []driver.Value) error { 614 m := r.conn.conn.TypeMap() 615 fieldDescriptions := r.rows.FieldDescriptions() 616 617 if r.valueFuncs == nil { 618 r.valueFuncs = make([]rowValueFunc, len(fieldDescriptions)) 619 620 for i, fd := range fieldDescriptions { 621 dataTypeOID := fd.DataTypeOID 622 format := fd.Format 623 624 switch fd.DataTypeOID { 625 case pgtype.BoolOID: 626 var d bool 627 scanPlan := m.PlanScan(dataTypeOID, format, &d) 628 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 629 err := scanPlan.Scan(src, &d) 630 return d, err 631 } 632 case pgtype.ByteaOID: 633 var d []byte 634 scanPlan := m.PlanScan(dataTypeOID, format, &d) 635 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 636 err := scanPlan.Scan(src, &d) 637 return d, err 638 } 639 case pgtype.CIDOID, pgtype.OIDOID, pgtype.XIDOID: 640 var d pgtype.Uint32 641 scanPlan := m.PlanScan(dataTypeOID, format, &d) 642 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 643 err := scanPlan.Scan(src, &d) 644 if err != nil { 645 return nil, err 646 } 647 return d.Value() 648 } 649 case pgtype.DateOID: 650 var d pgtype.Date 651 scanPlan := m.PlanScan(dataTypeOID, format, &d) 652 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 653 err := scanPlan.Scan(src, &d) 654 if err != nil { 655 return nil, err 656 } 657 return d.Value() 658 } 659 case pgtype.Float4OID: 660 var d float32 661 scanPlan := m.PlanScan(dataTypeOID, format, &d) 662 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 663 err := scanPlan.Scan(src, &d) 664 return float64(d), err 665 } 666 case pgtype.Float8OID: 667 var d float64 668 scanPlan := m.PlanScan(dataTypeOID, format, &d) 669 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 670 err := scanPlan.Scan(src, &d) 671 return d, err 672 } 673 case pgtype.Int2OID: 674 var d int16 675 scanPlan := m.PlanScan(dataTypeOID, format, &d) 676 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 677 err := scanPlan.Scan(src, &d) 678 return int64(d), err 679 } 680 case pgtype.Int4OID: 681 var d int32 682 scanPlan := m.PlanScan(dataTypeOID, format, &d) 683 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 684 err := scanPlan.Scan(src, &d) 685 return int64(d), err 686 } 687 case pgtype.Int8OID: 688 var d int64 689 scanPlan := m.PlanScan(dataTypeOID, format, &d) 690 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 691 err := scanPlan.Scan(src, &d) 692 return d, err 693 } 694 case pgtype.JSONOID, pgtype.JSONBOID: 695 var d []byte 696 scanPlan := m.PlanScan(dataTypeOID, format, &d) 697 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 698 err := scanPlan.Scan(src, &d) 699 if err != nil { 700 return nil, err 701 } 702 return d, nil 703 } 704 case pgtype.TimestampOID: 705 var d pgtype.Timestamp 706 scanPlan := m.PlanScan(dataTypeOID, format, &d) 707 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 708 err := scanPlan.Scan(src, &d) 709 if err != nil { 710 return nil, err 711 } 712 return d.Value() 713 } 714 case pgtype.TimestamptzOID: 715 var d pgtype.Timestamptz 716 scanPlan := m.PlanScan(dataTypeOID, format, &d) 717 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 718 err := scanPlan.Scan(src, &d) 719 if err != nil { 720 return nil, err 721 } 722 return d.Value() 723 } 724 default: 725 var d string 726 scanPlan := m.PlanScan(dataTypeOID, format, &d) 727 r.valueFuncs[i] = func(src []byte) (driver.Value, error) { 728 err := scanPlan.Scan(src, &d) 729 return d, err 730 } 731 } 732 } 733 } 734 735 var more bool 736 if r.skipNext { 737 more = r.skipNextMore 738 r.skipNext = false 739 } else { 740 more = r.rows.Next() 741 } 742 743 if !more { 744 if r.rows.Err() == nil { 745 return io.EOF 746 } else { 747 return r.rows.Err() 748 } 749 } 750 751 for i, rv := range r.rows.RawValues() { 752 if rv != nil { 753 var err error 754 dest[i], err = r.valueFuncs[i](rv) 755 if err != nil { 756 return fmt.Errorf("convert field %d failed: %v", i, err) 757 } 758 } else { 759 dest[i] = nil 760 } 761 } 762 763 return nil 764 } 765 766 func valueToInterface(argsV []driver.Value) []any { 767 args := make([]any, 0, len(argsV)) 768 for _, v := range argsV { 769 if v != nil { 770 args = append(args, v.(any)) 771 } else { 772 args = append(args, nil) 773 } 774 } 775 return args 776 } 777 778 func namedValueToInterface(argsV []driver.NamedValue) []any { 779 args := make([]any, 0, len(argsV)) 780 for _, v := range argsV { 781 if v.Value != nil { 782 args = append(args, v.Value.(any)) 783 } else { 784 args = append(args, nil) 785 } 786 } 787 return args 788 } 789 790 type wrapTx struct { 791 ctx context.Context 792 tx pgx.Tx 793 } 794 795 func (wtx wrapTx) Commit() error { return wtx.tx.Commit(wtx.ctx) } 796 797 func (wtx wrapTx) Rollback() error { return wtx.tx.Rollback(wtx.ctx) }