gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

sql.go (22081B)


      1 // Package stdlib is the compatibility layer from pgx to database/sql.
      2 //
      3 // A database/sql connection can be established through sql.Open.
      4 //
      5 //	db, err := sql.Open("pgx", "postgres://pgx_md5:secret@localhost:5432/pgx_test?sslmode=disable")
      6 //	if err != nil {
      7 //	  return err
      8 //	}
      9 //
     10 // Or from a DSN string.
     11 //
     12 //	db, err := sql.Open("pgx", "user=postgres password=secret host=localhost port=5432 database=pgx_test sslmode=disable")
     13 //	if err != nil {
     14 //	  return err
     15 //	}
     16 //
     17 // Or a pgx.ConnConfig can be used to set configuration not accessible via connection string. In this case the
     18 // pgx.ConnConfig must first be registered with the driver. This registration returns a connection string which is used
     19 // with sql.Open.
     20 //
     21 //	connConfig, _ := pgx.ParseConfig(os.Getenv("DATABASE_URL"))
     22 //	connConfig.Tracer = myTracer
     23 //	connStr := stdlib.RegisterConnConfig(connConfig)
     24 //	db, _ := sql.Open("pgx", connStr)
     25 //
     26 // pgx uses standard PostgreSQL positional parameters in queries. e.g. $1, $2. It does not support named parameters.
     27 //
     28 //	db.QueryRow("select * from users where id=$1", userID)
     29 //
     30 // (*sql.Conn) Raw() can be used to get a *pgx.Conn from the standard database/sql.DB connection pool. This allows
     31 // operations that use pgx specific functionality.
     32 //
     33 //	// Given db is a *sql.DB
     34 //	conn, err := db.Conn(context.Background())
     35 //	if err != nil {
     36 //	  // handle error from acquiring connection from DB pool
     37 //	}
     38 //
     39 //	err = conn.Raw(func(driverConn any) error {
     40 //	  conn := driverConn.(*stdlib.Conn).Conn() // conn is a *pgx.Conn
     41 //	  // Do pgx specific stuff with conn
     42 //	  conn.CopyFrom(...)
     43 //	  return nil
     44 //	})
     45 //	if err != nil {
     46 //	  // handle error that occurred while using *pgx.Conn
     47 //	}
     48 //
     49 // # PostgreSQL Specific Data Types
     50 //
     51 // The pgtype package provides support for PostgreSQL specific types. *pgtype.Map.SQLScanner is an adapter that makes
     52 // these types usable as a sql.Scanner.
     53 //
     54 //	m := pgtype.NewMap()
     55 //	var a []int64
     56 //	err := db.QueryRow("select '{1,2,3}'::bigint[]").Scan(m.SQLScanner(&a))
     57 package stdlib
     58 
     59 import (
     60 	"context"
     61 	"database/sql"
     62 	"database/sql/driver"
     63 	"errors"
     64 	"fmt"
     65 	"io"
     66 	"math"
     67 	"math/rand"
     68 	"reflect"
     69 	"strconv"
     70 	"strings"
     71 	"sync"
     72 	"time"
     73 
     74 	"github.com/jackc/pgx/v5"
     75 	"github.com/jackc/pgx/v5/pgconn"
     76 	"github.com/jackc/pgx/v5/pgtype"
     77 )
     78 
// databaseSQLResultFormats maps type OIDs to the result format code that QueryContext requests for them.
// Only intrinsic types should be binary format with database/sql. The table is filled in by init.
var databaseSQLResultFormats pgx.QueryResultFormatsByOID

// pgxDriver is the package-wide Driver created in init and registered with database/sql.
var pgxDriver *Driver
     83 
     84 func init() {
     85 	pgxDriver = &Driver{
     86 		configs: make(map[string]*pgx.ConnConfig),
     87 	}
     88 
     89 	// if pgx driver was already registered by different pgx major version then we
     90 	// skip registration under the default name.
     91 	if !contains(sql.Drivers(), "pgx") {
     92 		sql.Register("pgx", pgxDriver)
     93 	}
     94 	sql.Register("pgx/v5", pgxDriver)
     95 
     96 	databaseSQLResultFormats = pgx.QueryResultFormatsByOID{
     97 		pgtype.BoolOID:        1,
     98 		pgtype.ByteaOID:       1,
     99 		pgtype.CIDOID:         1,
    100 		pgtype.DateOID:        1,
    101 		pgtype.Float4OID:      1,
    102 		pgtype.Float8OID:      1,
    103 		pgtype.Int2OID:        1,
    104 		pgtype.Int4OID:        1,
    105 		pgtype.Int8OID:        1,
    106 		pgtype.OIDOID:         1,
    107 		pgtype.TimestampOID:   1,
    108 		pgtype.TimestamptzOID: 1,
    109 		pgtype.XIDOID:         1,
    110 	}
    111 }
    112 
    113 // TODO replace by slices.Contains when experimental package will be merged to stdlib
    114 // https://pkg.go.dev/golang.org/x/exp/slices#Contains
    115 func contains(list []string, y string) bool {
    116 	for _, x := range list {
    117 		if x == y {
    118 			return true
    119 		}
    120 	}
    121 	return false
    122 }
    123 
// OptionOpenDB is a functional option for configuring the driver when opening a new db pool.
// GetConnector applies each option, in order, to the connector it builds.
type OptionOpenDB func(*connector)
    126 
    127 // OptionBeforeConnect provides a callback for before connect. It is passed a shallow copy of the ConnConfig that will
    128 // be used to connect, so only its immediate members should be modified.
    129 func OptionBeforeConnect(bc func(context.Context, *pgx.ConnConfig) error) OptionOpenDB {
    130 	return func(dc *connector) {
    131 		dc.BeforeConnect = bc
    132 	}
    133 }
    134 
    135 // OptionAfterConnect provides a callback for after connect.
    136 func OptionAfterConnect(ac func(context.Context, *pgx.Conn) error) OptionOpenDB {
    137 	return func(dc *connector) {
    138 		dc.AfterConnect = ac
    139 	}
    140 }
    141 
    142 // OptionResetSession provides a callback that can be used to add custom logic prior to executing a query on the
    143 // connection if the connection has been used before.
    144 // If ResetSessionFunc returns ErrBadConn error the connection will be discarded.
    145 func OptionResetSession(rs func(context.Context, *pgx.Conn) error) OptionOpenDB {
    146 	return func(dc *connector) {
    147 		dc.ResetSession = rs
    148 	}
    149 }
    150 
    151 // RandomizeHostOrderFunc is a BeforeConnect hook that randomizes the host order in the provided connConfig, so that a
    152 // new host becomes primary each time. This is useful to distribute connections for multi-master databases like
    153 // CockroachDB. If you use this you likely should set https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime as well
    154 // to ensure that connections are periodically rebalanced across your nodes.
    155 func RandomizeHostOrderFunc(ctx context.Context, connConfig *pgx.ConnConfig) error {
    156 	if len(connConfig.Fallbacks) == 0 {
    157 		return nil
    158 	}
    159 
    160 	newFallbacks := append([]*pgconn.FallbackConfig{{
    161 		Host:      connConfig.Host,
    162 		Port:      connConfig.Port,
    163 		TLSConfig: connConfig.TLSConfig,
    164 	}}, connConfig.Fallbacks...)
    165 
    166 	rand.Shuffle(len(newFallbacks), func(i, j int) {
    167 		newFallbacks[i], newFallbacks[j] = newFallbacks[j], newFallbacks[i]
    168 	})
    169 
    170 	// Use the one that sorted last as the primary and keep the rest as the fallbacks
    171 	newPrimary := newFallbacks[len(newFallbacks)-1]
    172 	connConfig.Host = newPrimary.Host
    173 	connConfig.Port = newPrimary.Port
    174 	connConfig.TLSConfig = newPrimary.TLSConfig
    175 	connConfig.Fallbacks = newFallbacks[:len(newFallbacks)-1]
    176 	return nil
    177 }
    178 
    179 func GetConnector(config pgx.ConnConfig, opts ...OptionOpenDB) driver.Connector {
    180 	c := connector{
    181 		ConnConfig:    config,
    182 		BeforeConnect: func(context.Context, *pgx.ConnConfig) error { return nil }, // noop before connect by default
    183 		AfterConnect:  func(context.Context, *pgx.Conn) error { return nil },       // noop after connect by default
    184 		ResetSession:  func(context.Context, *pgx.Conn) error { return nil },       // noop reset session by default
    185 		driver:        pgxDriver,
    186 	}
    187 
    188 	for _, opt := range opts {
    189 		opt(&c)
    190 	}
    191 	return c
    192 }
    193 
    194 func OpenDB(config pgx.ConnConfig, opts ...OptionOpenDB) *sql.DB {
    195 	c := GetConnector(config, opts...)
    196 	return sql.OpenDB(c)
    197 }
    198 
// connector is the driver.Connector built by GetConnector. It embeds the connection
// configuration and carries the optional hooks set through OptionOpenDB values.
type connector struct {
	pgx.ConnConfig
	BeforeConnect func(context.Context, *pgx.ConnConfig) error // function to call before creation of every new connection
	AfterConnect  func(context.Context, *pgx.Conn) error       // function to call after creation of every new connection
	ResetSession  func(context.Context, *pgx.Conn) error       // function is called before a connection is reused
	driver        *Driver                                      // driver returned by Driver()
}
    206 
    207 // Connect implement driver.Connector interface
    208 func (c connector) Connect(ctx context.Context) (driver.Conn, error) {
    209 	var (
    210 		err  error
    211 		conn *pgx.Conn
    212 	)
    213 
    214 	// Create a shallow copy of the config, so that BeforeConnect can safely modify it
    215 	connConfig := c.ConnConfig
    216 	if err = c.BeforeConnect(ctx, &connConfig); err != nil {
    217 		return nil, err
    218 	}
    219 
    220 	if conn, err = pgx.ConnectConfig(ctx, &connConfig); err != nil {
    221 		return nil, err
    222 	}
    223 
    224 	if err = c.AfterConnect(ctx, conn); err != nil {
    225 		return nil, err
    226 	}
    227 
    228 	return &Conn{conn: conn, driver: c.driver, connConfig: connConfig, resetSessionFunc: c.ResetSession}, nil
    229 }
    230 
    231 // Driver implement driver.Connector interface
    232 func (c connector) Driver() driver.Driver {
    233 	return c.driver
    234 }
    235 
    236 // GetDefaultDriver returns the driver initialized in the init function
    237 // and used when the pgx driver is registered.
    238 func GetDefaultDriver() driver.Driver {
    239 	return pgxDriver
    240 }
    241 
// Driver is the database/sql driver implementation. Besides opening connections it keeps the
// ConnConfig values registered through RegisterConnConfig.
type Driver struct {
	configMutex sync.Mutex                 // held while configs and sequence are accessed
	configs     map[string]*pgx.ConnConfig // registered configs keyed by their generated connection string
	sequence    int                        // counter used to generate the next registered connection string
}
    247 
    248 func (d *Driver) Open(name string) (driver.Conn, error) {
    249 	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Ensure eventual timeout
    250 	defer cancel()
    251 
    252 	connector, err := d.OpenConnector(name)
    253 	if err != nil {
    254 		return nil, err
    255 	}
    256 	return connector.Connect(ctx)
    257 }
    258 
    259 func (d *Driver) OpenConnector(name string) (driver.Connector, error) {
    260 	return &driverConnector{driver: d, name: name}, nil
    261 }
    262 
    263 func (d *Driver) registerConnConfig(c *pgx.ConnConfig) string {
    264 	d.configMutex.Lock()
    265 	connStr := fmt.Sprintf("registeredConnConfig%d", d.sequence)
    266 	d.sequence++
    267 	d.configs[connStr] = c
    268 	d.configMutex.Unlock()
    269 	return connStr
    270 }
    271 
    272 func (d *Driver) unregisterConnConfig(connStr string) {
    273 	d.configMutex.Lock()
    274 	delete(d.configs, connStr)
    275 	d.configMutex.Unlock()
    276 }
    277 
// driverConnector is the driver.Connector returned by Driver.OpenConnector.
type driverConnector struct {
	driver *Driver // driver whose registered configs are consulted on Connect
	name   string  // registered config key or a connection string to parse
}
    282 
    283 func (dc *driverConnector) Connect(ctx context.Context) (driver.Conn, error) {
    284 	var connConfig *pgx.ConnConfig
    285 
    286 	dc.driver.configMutex.Lock()
    287 	connConfig = dc.driver.configs[dc.name]
    288 	dc.driver.configMutex.Unlock()
    289 
    290 	if connConfig == nil {
    291 		var err error
    292 		connConfig, err = pgx.ParseConfig(dc.name)
    293 		if err != nil {
    294 			return nil, err
    295 		}
    296 	}
    297 
    298 	conn, err := pgx.ConnectConfig(ctx, connConfig)
    299 	if err != nil {
    300 		return nil, err
    301 	}
    302 
    303 	c := &Conn{
    304 		conn:             conn,
    305 		driver:           dc.driver,
    306 		connConfig:       *connConfig,
    307 		resetSessionFunc: func(context.Context, *pgx.Conn) error { return nil },
    308 	}
    309 
    310 	return c, nil
    311 }
    312 
    313 func (dc *driverConnector) Driver() driver.Driver {
    314 	return dc.driver
    315 }
    316 
    317 // RegisterConnConfig registers a ConnConfig and returns the connection string to use with Open.
    318 func RegisterConnConfig(c *pgx.ConnConfig) string {
    319 	return pgxDriver.registerConnConfig(c)
    320 }
    321 
    322 // UnregisterConnConfig removes the ConnConfig registration for connStr.
    323 func UnregisterConnConfig(connStr string) {
    324 	pgxDriver.unregisterConnConfig(connStr)
    325 }
    326 
// Conn is the driver connection handed to database/sql. It wraps a *pgx.Conn, which is available
// through the Conn method.
type Conn struct {
	conn                 *pgx.Conn
	psCount              int64 // Counter used for creating unique prepared statement names
	driver               *Driver
	connConfig           pgx.ConnConfig                         // copy of the configuration the connection was opened with
	resetSessionFunc     func(context.Context, *pgx.Conn) error // Function is called before a connection is reused
	lastResetSessionTime time.Time                              // last ResetSession call; limits how often the connection check runs
}
    335 
    336 // Conn returns the underlying *pgx.Conn
    337 func (c *Conn) Conn() *pgx.Conn {
    338 	return c.conn
    339 }
    340 
    341 func (c *Conn) Prepare(query string) (driver.Stmt, error) {
    342 	return c.PrepareContext(context.Background(), query)
    343 }
    344 
    345 func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
    346 	if c.conn.IsClosed() {
    347 		return nil, driver.ErrBadConn
    348 	}
    349 
    350 	name := fmt.Sprintf("pgx_%d", c.psCount)
    351 	c.psCount++
    352 
    353 	sd, err := c.conn.Prepare(ctx, name, query)
    354 	if err != nil {
    355 		return nil, err
    356 	}
    357 
    358 	return &Stmt{sd: sd, conn: c}, nil
    359 }
    360 
    361 func (c *Conn) Close() error {
    362 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    363 	defer cancel()
    364 	return c.conn.Close(ctx)
    365 }
    366 
    367 func (c *Conn) Begin() (driver.Tx, error) {
    368 	return c.BeginTx(context.Background(), driver.TxOptions{})
    369 }
    370 
    371 func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
    372 	if c.conn.IsClosed() {
    373 		return nil, driver.ErrBadConn
    374 	}
    375 
    376 	var pgxOpts pgx.TxOptions
    377 	switch sql.IsolationLevel(opts.Isolation) {
    378 	case sql.LevelDefault:
    379 	case sql.LevelReadUncommitted:
    380 		pgxOpts.IsoLevel = pgx.ReadUncommitted
    381 	case sql.LevelReadCommitted:
    382 		pgxOpts.IsoLevel = pgx.ReadCommitted
    383 	case sql.LevelRepeatableRead, sql.LevelSnapshot:
    384 		pgxOpts.IsoLevel = pgx.RepeatableRead
    385 	case sql.LevelSerializable:
    386 		pgxOpts.IsoLevel = pgx.Serializable
    387 	default:
    388 		return nil, fmt.Errorf("unsupported isolation: %v", opts.Isolation)
    389 	}
    390 
    391 	if opts.ReadOnly {
    392 		pgxOpts.AccessMode = pgx.ReadOnly
    393 	}
    394 
    395 	tx, err := c.conn.BeginTx(ctx, pgxOpts)
    396 	if err != nil {
    397 		return nil, err
    398 	}
    399 
    400 	return wrapTx{ctx: ctx, tx: tx}, nil
    401 }
    402 
    403 func (c *Conn) ExecContext(ctx context.Context, query string, argsV []driver.NamedValue) (driver.Result, error) {
    404 	if c.conn.IsClosed() {
    405 		return nil, driver.ErrBadConn
    406 	}
    407 
    408 	args := namedValueToInterface(argsV)
    409 
    410 	commandTag, err := c.conn.Exec(ctx, query, args...)
    411 	// if we got a network error before we had a chance to send the query, retry
    412 	if err != nil {
    413 		if pgconn.SafeToRetry(err) {
    414 			return nil, driver.ErrBadConn
    415 		}
    416 	}
    417 	return driver.RowsAffected(commandTag.RowsAffected()), err
    418 }
    419 
    420 func (c *Conn) QueryContext(ctx context.Context, query string, argsV []driver.NamedValue) (driver.Rows, error) {
    421 	if c.conn.IsClosed() {
    422 		return nil, driver.ErrBadConn
    423 	}
    424 
    425 	args := []any{databaseSQLResultFormats}
    426 	args = append(args, namedValueToInterface(argsV)...)
    427 
    428 	rows, err := c.conn.Query(ctx, query, args...)
    429 	if err != nil {
    430 		if pgconn.SafeToRetry(err) {
    431 			return nil, driver.ErrBadConn
    432 		}
    433 		return nil, err
    434 	}
    435 
    436 	// Preload first row because otherwise we won't know what columns are available when database/sql asks.
    437 	more := rows.Next()
    438 	if err = rows.Err(); err != nil {
    439 		rows.Close()
    440 		return nil, err
    441 	}
    442 	return &Rows{conn: c, rows: rows, skipNext: true, skipNextMore: more}, nil
    443 }
    444 
    445 func (c *Conn) Ping(ctx context.Context) error {
    446 	if c.conn.IsClosed() {
    447 		return driver.ErrBadConn
    448 	}
    449 
    450 	err := c.conn.Ping(ctx)
    451 	if err != nil {
    452 		// A Ping failure implies some sort of fatal state. The connection is almost certainly already closed by the
    453 		// failure, but manually close it just to be sure.
    454 		c.Close()
    455 		return driver.ErrBadConn
    456 	}
    457 
    458 	return nil
    459 }
    460 
    461 func (c *Conn) CheckNamedValue(*driver.NamedValue) error {
    462 	// Underlying pgx supports sql.Scanner and driver.Valuer interfaces natively. So everything can be passed through directly.
    463 	return nil
    464 }
    465 
    466 func (c *Conn) ResetSession(ctx context.Context) error {
    467 	if c.conn.IsClosed() {
    468 		return driver.ErrBadConn
    469 	}
    470 
    471 	now := time.Now()
    472 	if now.Sub(c.lastResetSessionTime) > time.Second {
    473 		if err := c.conn.PgConn().CheckConn(); err != nil {
    474 			return driver.ErrBadConn
    475 		}
    476 	}
    477 	c.lastResetSessionTime = now
    478 
    479 	return c.resetSessionFunc(ctx, c.conn)
    480 }
    481 
// Stmt is a prepared statement created by Conn.PrepareContext.
type Stmt struct {
	sd   *pgconn.StatementDescription // description returned when the statement was prepared
	conn *Conn                        // connection the statement was prepared on
}
    486 
    487 func (s *Stmt) Close() error {
    488 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    489 	defer cancel()
    490 	return s.conn.conn.Deallocate(ctx, s.sd.Name)
    491 }
    492 
    493 func (s *Stmt) NumInput() int {
    494 	return len(s.sd.ParamOIDs)
    495 }
    496 
    497 func (s *Stmt) Exec(argsV []driver.Value) (driver.Result, error) {
    498 	return nil, errors.New("Stmt.Exec deprecated and not implemented")
    499 }
    500 
    501 func (s *Stmt) ExecContext(ctx context.Context, argsV []driver.NamedValue) (driver.Result, error) {
    502 	return s.conn.ExecContext(ctx, s.sd.Name, argsV)
    503 }
    504 
    505 func (s *Stmt) Query(argsV []driver.Value) (driver.Rows, error) {
    506 	return nil, errors.New("Stmt.Query deprecated and not implemented")
    507 }
    508 
    509 func (s *Stmt) QueryContext(ctx context.Context, argsV []driver.NamedValue) (driver.Rows, error) {
    510 	return s.conn.QueryContext(ctx, s.sd.Name, argsV)
    511 }
    512 
// rowValueFunc converts the raw bytes of one column value into a driver.Value.
// Rows.Next builds one per column and only calls it for non-nil raw values.
type rowValueFunc func(src []byte) (driver.Value, error)
    514 
// Rows is the driver.Rows implementation returned by Conn.QueryContext.
type Rows struct {
	conn         *Conn          // connection the query ran on; supplies the type map
	rows         pgx.Rows       // underlying pgx result set
	valueFuncs   []rowValueFunc // per-column converters, built on the first call to Next
	skipNext     bool           // true while the row preloaded by QueryContext has not been consumed
	skipNextMore bool           // result of the preloading rows.Next call, used when skipNext is set

	columnNames []string // cache filled by Columns
}
    524 
    525 func (r *Rows) Columns() []string {
    526 	if r.columnNames == nil {
    527 		fields := r.rows.FieldDescriptions()
    528 		r.columnNames = make([]string, len(fields))
    529 		for i, fd := range fields {
    530 			r.columnNames[i] = string(fd.Name)
    531 		}
    532 	}
    533 
    534 	return r.columnNames
    535 }
    536 
    537 // ColumnTypeDatabaseTypeName returns the database system type name. If the name is unknown the OID is returned.
    538 func (r *Rows) ColumnTypeDatabaseTypeName(index int) string {
    539 	if dt, ok := r.conn.conn.TypeMap().TypeForOID(r.rows.FieldDescriptions()[index].DataTypeOID); ok {
    540 		return strings.ToUpper(dt.Name)
    541 	}
    542 
    543 	return strconv.FormatInt(int64(r.rows.FieldDescriptions()[index].DataTypeOID), 10)
    544 }
    545 
// varHeaderSize is subtracted from a field's TypeModifier before the modifier is interpreted as a
// length (ColumnTypeLength) or as precision and scale (ColumnTypePrecisionScale).
// NOTE(review): presumably mirrors PostgreSQL's VARHDRSZ — confirm.
const varHeaderSize = 4
    547 
    548 // ColumnTypeLength returns the length of the column type if the column is a
    549 // variable length type. If the column is not a variable length type ok
    550 // should return false.
    551 func (r *Rows) ColumnTypeLength(index int) (int64, bool) {
    552 	fd := r.rows.FieldDescriptions()[index]
    553 
    554 	switch fd.DataTypeOID {
    555 	case pgtype.TextOID, pgtype.ByteaOID:
    556 		return math.MaxInt64, true
    557 	case pgtype.VarcharOID, pgtype.BPCharArrayOID:
    558 		return int64(fd.TypeModifier - varHeaderSize), true
    559 	default:
    560 		return 0, false
    561 	}
    562 }
    563 
    564 // ColumnTypePrecisionScale should return the precision and scale for decimal
    565 // types. If not applicable, ok should be false.
    566 func (r *Rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) {
    567 	fd := r.rows.FieldDescriptions()[index]
    568 
    569 	switch fd.DataTypeOID {
    570 	case pgtype.NumericOID:
    571 		mod := fd.TypeModifier - varHeaderSize
    572 		precision = int64((mod >> 16) & 0xffff)
    573 		scale = int64(mod & 0xffff)
    574 		return precision, scale, true
    575 	default:
    576 		return 0, 0, false
    577 	}
    578 }
    579 
    580 // ColumnTypeScanType returns the value type that can be used to scan types into.
    581 func (r *Rows) ColumnTypeScanType(index int) reflect.Type {
    582 	fd := r.rows.FieldDescriptions()[index]
    583 
    584 	switch fd.DataTypeOID {
    585 	case pgtype.Float8OID:
    586 		return reflect.TypeOf(float64(0))
    587 	case pgtype.Float4OID:
    588 		return reflect.TypeOf(float32(0))
    589 	case pgtype.Int8OID:
    590 		return reflect.TypeOf(int64(0))
    591 	case pgtype.Int4OID:
    592 		return reflect.TypeOf(int32(0))
    593 	case pgtype.Int2OID:
    594 		return reflect.TypeOf(int16(0))
    595 	case pgtype.BoolOID:
    596 		return reflect.TypeOf(false)
    597 	case pgtype.NumericOID:
    598 		return reflect.TypeOf(float64(0))
    599 	case pgtype.DateOID, pgtype.TimestampOID, pgtype.TimestamptzOID:
    600 		return reflect.TypeOf(time.Time{})
    601 	case pgtype.ByteaOID:
    602 		return reflect.TypeOf([]byte(nil))
    603 	default:
    604 		return reflect.TypeOf("")
    605 	}
    606 }
    607 
    608 func (r *Rows) Close() error {
    609 	r.rows.Close()
    610 	return r.rows.Err()
    611 }
    612 
    613 func (r *Rows) Next(dest []driver.Value) error {
    614 	m := r.conn.conn.TypeMap()
    615 	fieldDescriptions := r.rows.FieldDescriptions()
    616 
    617 	if r.valueFuncs == nil {
    618 		r.valueFuncs = make([]rowValueFunc, len(fieldDescriptions))
    619 
    620 		for i, fd := range fieldDescriptions {
    621 			dataTypeOID := fd.DataTypeOID
    622 			format := fd.Format
    623 
    624 			switch fd.DataTypeOID {
    625 			case pgtype.BoolOID:
    626 				var d bool
    627 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    628 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    629 					err := scanPlan.Scan(src, &d)
    630 					return d, err
    631 				}
    632 			case pgtype.ByteaOID:
    633 				var d []byte
    634 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    635 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    636 					err := scanPlan.Scan(src, &d)
    637 					return d, err
    638 				}
    639 			case pgtype.CIDOID, pgtype.OIDOID, pgtype.XIDOID:
    640 				var d pgtype.Uint32
    641 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    642 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    643 					err := scanPlan.Scan(src, &d)
    644 					if err != nil {
    645 						return nil, err
    646 					}
    647 					return d.Value()
    648 				}
    649 			case pgtype.DateOID:
    650 				var d pgtype.Date
    651 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    652 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    653 					err := scanPlan.Scan(src, &d)
    654 					if err != nil {
    655 						return nil, err
    656 					}
    657 					return d.Value()
    658 				}
    659 			case pgtype.Float4OID:
    660 				var d float32
    661 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    662 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    663 					err := scanPlan.Scan(src, &d)
    664 					return float64(d), err
    665 				}
    666 			case pgtype.Float8OID:
    667 				var d float64
    668 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    669 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    670 					err := scanPlan.Scan(src, &d)
    671 					return d, err
    672 				}
    673 			case pgtype.Int2OID:
    674 				var d int16
    675 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    676 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    677 					err := scanPlan.Scan(src, &d)
    678 					return int64(d), err
    679 				}
    680 			case pgtype.Int4OID:
    681 				var d int32
    682 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    683 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    684 					err := scanPlan.Scan(src, &d)
    685 					return int64(d), err
    686 				}
    687 			case pgtype.Int8OID:
    688 				var d int64
    689 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    690 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    691 					err := scanPlan.Scan(src, &d)
    692 					return d, err
    693 				}
    694 			case pgtype.JSONOID, pgtype.JSONBOID:
    695 				var d []byte
    696 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    697 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    698 					err := scanPlan.Scan(src, &d)
    699 					if err != nil {
    700 						return nil, err
    701 					}
    702 					return d, nil
    703 				}
    704 			case pgtype.TimestampOID:
    705 				var d pgtype.Timestamp
    706 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    707 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    708 					err := scanPlan.Scan(src, &d)
    709 					if err != nil {
    710 						return nil, err
    711 					}
    712 					return d.Value()
    713 				}
    714 			case pgtype.TimestamptzOID:
    715 				var d pgtype.Timestamptz
    716 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    717 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    718 					err := scanPlan.Scan(src, &d)
    719 					if err != nil {
    720 						return nil, err
    721 					}
    722 					return d.Value()
    723 				}
    724 			default:
    725 				var d string
    726 				scanPlan := m.PlanScan(dataTypeOID, format, &d)
    727 				r.valueFuncs[i] = func(src []byte) (driver.Value, error) {
    728 					err := scanPlan.Scan(src, &d)
    729 					return d, err
    730 				}
    731 			}
    732 		}
    733 	}
    734 
    735 	var more bool
    736 	if r.skipNext {
    737 		more = r.skipNextMore
    738 		r.skipNext = false
    739 	} else {
    740 		more = r.rows.Next()
    741 	}
    742 
    743 	if !more {
    744 		if r.rows.Err() == nil {
    745 			return io.EOF
    746 		} else {
    747 			return r.rows.Err()
    748 		}
    749 	}
    750 
    751 	for i, rv := range r.rows.RawValues() {
    752 		if rv != nil {
    753 			var err error
    754 			dest[i], err = r.valueFuncs[i](rv)
    755 			if err != nil {
    756 				return fmt.Errorf("convert field %d failed: %v", i, err)
    757 			}
    758 		} else {
    759 			dest[i] = nil
    760 		}
    761 	}
    762 
    763 	return nil
    764 }
    765 
    766 func valueToInterface(argsV []driver.Value) []any {
    767 	args := make([]any, 0, len(argsV))
    768 	for _, v := range argsV {
    769 		if v != nil {
    770 			args = append(args, v.(any))
    771 		} else {
    772 			args = append(args, nil)
    773 		}
    774 	}
    775 	return args
    776 }
    777 
    778 func namedValueToInterface(argsV []driver.NamedValue) []any {
    779 	args := make([]any, 0, len(argsV))
    780 	for _, v := range argsV {
    781 		if v.Value != nil {
    782 			args = append(args, v.Value.(any))
    783 		} else {
    784 			args = append(args, nil)
    785 		}
    786 	}
    787 	return args
    788 }
    789 
// wrapTx adapts a pgx.Tx to driver.Tx. Conn.BeginTx creates it with the context the transaction
// was started with, which Commit and Rollback then reuse.
type wrapTx struct {
	ctx context.Context // context passed to Commit and Rollback
	tx  pgx.Tx          // wrapped pgx transaction
}
    794 
    795 func (wtx wrapTx) Commit() error { return wtx.tx.Commit(wtx.ctx) }
    796 
    797 func (wtx wrapTx) Rollback() error { return wtx.tx.Rollback(wtx.ctx) }