gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

conn.go (43111B)


      1 package pgx
      2 
      3 import (
      4 	"context"
      5 	"errors"
      6 	"fmt"
      7 	"strconv"
      8 	"strings"
      9 	"time"
     10 
     11 	"github.com/jackc/pgx/v5/internal/anynil"
     12 	"github.com/jackc/pgx/v5/internal/sanitize"
     13 	"github.com/jackc/pgx/v5/internal/stmtcache"
     14 	"github.com/jackc/pgx/v5/pgconn"
     15 	"github.com/jackc/pgx/v5/pgtype"
     16 )
     17 
     18 // ConnConfig contains all the options used to establish a connection. It must be created by ParseConfig and
     19 // then it can be modified. A manually initialized ConnConfig will cause ConnectConfig to panic.
     20 type ConnConfig struct {
     21 	pgconn.Config
     22 
     23 	Tracer QueryTracer
     24 
     25 	// Original connection string that was parsed into config.
     26 	connString string
     27 
     28 	// StatementCacheCapacity is maximum size of the statement cache used when executing a query with "cache_statement"
     29 	// query exec mode.
     30 	StatementCacheCapacity int
     31 
     32 	// DescriptionCacheCapacity is the maximum size of the description cache used when executing a query with
     33 	// "cache_describe" query exec mode.
     34 	DescriptionCacheCapacity int
     35 
     36 	// DefaultQueryExecMode controls the default mode for executing queries. By default pgx uses the extended protocol
     37 	// and automatically prepares and caches prepared statements. However, this may be incompatible with proxies such as
     38 	// PGBouncer. In this case it may be preferrable to use QueryExecModeExec or QueryExecModeSimpleProtocol. The same
     39 	// functionality can be controlled on a per query basis by passing a QueryExecMode as the first query argument.
     40 	DefaultQueryExecMode QueryExecMode
     41 
     42 	createdByParseConfig bool // Used to enforce created by ParseConfig rule.
     43 }
     44 
     45 // ParseConfigOptions contains options that control how a config is built such as getsslpassword.
     46 type ParseConfigOptions struct {
     47 	pgconn.ParseConfigOptions
     48 }
     49 
     50 // Copy returns a deep copy of the config that is safe to use and modify.
     51 // The only exception is the tls.Config:
     52 // according to the tls.Config docs it must not be modified after creation.
     53 func (cc *ConnConfig) Copy() *ConnConfig {
     54 	newConfig := new(ConnConfig)
     55 	*newConfig = *cc
     56 	newConfig.Config = *newConfig.Config.Copy()
     57 	return newConfig
     58 }
     59 
     60 // ConnString returns the connection string as parsed by pgx.ParseConfig into pgx.ConnConfig.
     61 func (cc *ConnConfig) ConnString() string { return cc.connString }
     62 
     63 // Conn is a PostgreSQL connection handle. It is not safe for concurrent usage. Use a connection pool to manage access
     64 // to multiple database connections from multiple goroutines.
     65 type Conn struct {
     66 	pgConn             *pgconn.PgConn
     67 	config             *ConnConfig // config used when establishing this connection
     68 	preparedStatements map[string]*pgconn.StatementDescription
     69 	statementCache     stmtcache.Cache
     70 	descriptionCache   stmtcache.Cache
     71 
     72 	queryTracer    QueryTracer
     73 	batchTracer    BatchTracer
     74 	copyFromTracer CopyFromTracer
     75 	prepareTracer  PrepareTracer
     76 
     77 	notifications []*pgconn.Notification
     78 
     79 	doneChan   chan struct{}
     80 	closedChan chan error
     81 
     82 	typeMap *pgtype.Map
     83 
     84 	wbuf []byte
     85 	eqb  ExtendedQueryBuilder
     86 }
     87 
     88 // Identifier a PostgreSQL identifier or name. Identifiers can be composed of
     89 // multiple parts such as ["schema", "table"] or ["table", "column"].
     90 type Identifier []string
     91 
     92 // Sanitize returns a sanitized string safe for SQL interpolation.
     93 func (ident Identifier) Sanitize() string {
     94 	parts := make([]string, len(ident))
     95 	for i := range ident {
     96 		s := strings.ReplaceAll(ident[i], string([]byte{0}), "")
     97 		parts[i] = `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
     98 	}
     99 	return strings.Join(parts, ".")
    100 }
    101 
    102 // ErrNoRows occurs when rows are expected but none are returned.
    103 var ErrNoRows = errors.New("no rows in result set")
    104 
    105 var errDisabledStatementCache = fmt.Errorf("cannot use QueryExecModeCacheStatement with disabled statement cache")
    106 var errDisabledDescriptionCache = fmt.Errorf("cannot use QueryExecModeCacheDescribe with disabled description cache")
    107 
    108 // Connect establishes a connection with a PostgreSQL server with a connection string. See
    109 // pgconn.Connect for details.
    110 func Connect(ctx context.Context, connString string) (*Conn, error) {
    111 	connConfig, err := ParseConfig(connString)
    112 	if err != nil {
    113 		return nil, err
    114 	}
    115 	return connect(ctx, connConfig)
    116 }
    117 
    118 // ConnectWithOptions behaves exactly like Connect with the addition of options. At the present options is only used to
    119 // provide a GetSSLPassword function.
    120 func ConnectWithOptions(ctx context.Context, connString string, options ParseConfigOptions) (*Conn, error) {
    121 	connConfig, err := ParseConfigWithOptions(connString, options)
    122 	if err != nil {
    123 		return nil, err
    124 	}
    125 	return connect(ctx, connConfig)
    126 }
    127 
    128 // ConnectConfig establishes a connection with a PostgreSQL server with a configuration struct.
    129 // connConfig must have been created by ParseConfig.
    130 func ConnectConfig(ctx context.Context, connConfig *ConnConfig) (*Conn, error) {
    131 	// In general this improves safety. In particular avoid the config.Config.OnNotification mutation from affecting other
    132 	// connections with the same config. See https://github.com/jackc/pgx/issues/618.
    133 	connConfig = connConfig.Copy()
    134 
    135 	return connect(ctx, connConfig)
    136 }
    137 
    138 // ParseConfigWithOptions behaves exactly as ParseConfig does with the addition of options. At the present options is
    139 // only used to provide a GetSSLPassword function.
    140 func ParseConfigWithOptions(connString string, options ParseConfigOptions) (*ConnConfig, error) {
    141 	config, err := pgconn.ParseConfigWithOptions(connString, options.ParseConfigOptions)
    142 	if err != nil {
    143 		return nil, err
    144 	}
    145 
    146 	statementCacheCapacity := 512
    147 	if s, ok := config.RuntimeParams["statement_cache_capacity"]; ok {
    148 		delete(config.RuntimeParams, "statement_cache_capacity")
    149 		n, err := strconv.ParseInt(s, 10, 32)
    150 		if err != nil {
    151 			return nil, fmt.Errorf("cannot parse statement_cache_capacity: %w", err)
    152 		}
    153 		statementCacheCapacity = int(n)
    154 	}
    155 
    156 	descriptionCacheCapacity := 512
    157 	if s, ok := config.RuntimeParams["description_cache_capacity"]; ok {
    158 		delete(config.RuntimeParams, "description_cache_capacity")
    159 		n, err := strconv.ParseInt(s, 10, 32)
    160 		if err != nil {
    161 			return nil, fmt.Errorf("cannot parse description_cache_capacity: %w", err)
    162 		}
    163 		descriptionCacheCapacity = int(n)
    164 	}
    165 
    166 	defaultQueryExecMode := QueryExecModeCacheStatement
    167 	if s, ok := config.RuntimeParams["default_query_exec_mode"]; ok {
    168 		delete(config.RuntimeParams, "default_query_exec_mode")
    169 		switch s {
    170 		case "cache_statement":
    171 			defaultQueryExecMode = QueryExecModeCacheStatement
    172 		case "cache_describe":
    173 			defaultQueryExecMode = QueryExecModeCacheDescribe
    174 		case "describe_exec":
    175 			defaultQueryExecMode = QueryExecModeDescribeExec
    176 		case "exec":
    177 			defaultQueryExecMode = QueryExecModeExec
    178 		case "simple_protocol":
    179 			defaultQueryExecMode = QueryExecModeSimpleProtocol
    180 		default:
    181 			return nil, fmt.Errorf("invalid default_query_exec_mode: %s", s)
    182 		}
    183 	}
    184 
    185 	connConfig := &ConnConfig{
    186 		Config:                   *config,
    187 		createdByParseConfig:     true,
    188 		StatementCacheCapacity:   statementCacheCapacity,
    189 		DescriptionCacheCapacity: descriptionCacheCapacity,
    190 		DefaultQueryExecMode:     defaultQueryExecMode,
    191 		connString:               connString,
    192 	}
    193 
    194 	return connConfig, nil
    195 }
    196 
    197 // ParseConfig creates a ConnConfig from a connection string. ParseConfig handles all options that pgconn.ParseConfig
    198 // does. In addition, it accepts the following options:
    199 //
    200 //   - default_query_exec_mode.
    201 //     Possible values: "cache_statement", "cache_describe", "describe_exec", "exec", and "simple_protocol". See
    202 //     QueryExecMode constant documentation for the meaning of these values. Default: "cache_statement".
    203 //
    204 //   - statement_cache_capacity.
    205 //     The maximum size of the statement cache used when executing a query with "cache_statement" query exec mode.
    206 //     Default: 512.
    207 //
    208 //   - description_cache_capacity.
    209 //     The maximum size of the description cache used when executing a query with "cache_describe" query exec mode.
    210 //     Default: 512.
    211 func ParseConfig(connString string) (*ConnConfig, error) {
    212 	return ParseConfigWithOptions(connString, ParseConfigOptions{})
    213 }
    214 
    215 // connect connects to a database. connect takes ownership of config. The caller must not use or access it again.
    216 func connect(ctx context.Context, config *ConnConfig) (c *Conn, err error) {
    217 	if connectTracer, ok := config.Tracer.(ConnectTracer); ok {
    218 		ctx = connectTracer.TraceConnectStart(ctx, TraceConnectStartData{ConnConfig: config})
    219 		defer func() {
    220 			connectTracer.TraceConnectEnd(ctx, TraceConnectEndData{Conn: c, Err: err})
    221 		}()
    222 	}
    223 
    224 	// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
    225 	// zero values.
    226 	if !config.createdByParseConfig {
    227 		panic("config must be created by ParseConfig")
    228 	}
    229 
    230 	c = &Conn{
    231 		config:      config,
    232 		typeMap:     pgtype.NewMap(),
    233 		queryTracer: config.Tracer,
    234 	}
    235 
    236 	if t, ok := c.queryTracer.(BatchTracer); ok {
    237 		c.batchTracer = t
    238 	}
    239 	if t, ok := c.queryTracer.(CopyFromTracer); ok {
    240 		c.copyFromTracer = t
    241 	}
    242 	if t, ok := c.queryTracer.(PrepareTracer); ok {
    243 		c.prepareTracer = t
    244 	}
    245 
    246 	// Only install pgx notification system if no other callback handler is present.
    247 	if config.Config.OnNotification == nil {
    248 		config.Config.OnNotification = c.bufferNotifications
    249 	}
    250 
    251 	c.pgConn, err = pgconn.ConnectConfig(ctx, &config.Config)
    252 	if err != nil {
    253 		return nil, err
    254 	}
    255 
    256 	c.preparedStatements = make(map[string]*pgconn.StatementDescription)
    257 	c.doneChan = make(chan struct{})
    258 	c.closedChan = make(chan error)
    259 	c.wbuf = make([]byte, 0, 1024)
    260 
    261 	if c.config.StatementCacheCapacity > 0 {
    262 		c.statementCache = stmtcache.NewLRUCache(c.config.StatementCacheCapacity)
    263 	}
    264 
    265 	if c.config.DescriptionCacheCapacity > 0 {
    266 		c.descriptionCache = stmtcache.NewLRUCache(c.config.DescriptionCacheCapacity)
    267 	}
    268 
    269 	return c, nil
    270 }
    271 
    272 // Close closes a connection. It is safe to call Close on a already closed
    273 // connection.
    274 func (c *Conn) Close(ctx context.Context) error {
    275 	if c.IsClosed() {
    276 		return nil
    277 	}
    278 
    279 	err := c.pgConn.Close(ctx)
    280 	return err
    281 }
    282 
    283 // Prepare creates a prepared statement with name and sql. sql can contain placeholders
    284 // for bound parameters. These placeholders are referenced positional as $1, $2, etc.
    285 //
    286 // Prepare is idempotent; i.e. it is safe to call Prepare multiple times with the same
    287 // name and sql arguments. This allows a code path to Prepare and Query/Exec without
    288 // concern for if the statement has already been prepared.
    289 func (c *Conn) Prepare(ctx context.Context, name, sql string) (sd *pgconn.StatementDescription, err error) {
    290 	if c.prepareTracer != nil {
    291 		ctx = c.prepareTracer.TracePrepareStart(ctx, c, TracePrepareStartData{Name: name, SQL: sql})
    292 	}
    293 
    294 	if name != "" {
    295 		var ok bool
    296 		if sd, ok = c.preparedStatements[name]; ok && sd.SQL == sql {
    297 			if c.prepareTracer != nil {
    298 				c.prepareTracer.TracePrepareEnd(ctx, c, TracePrepareEndData{AlreadyPrepared: true})
    299 			}
    300 			return sd, nil
    301 		}
    302 	}
    303 
    304 	if c.prepareTracer != nil {
    305 		defer func() {
    306 			c.prepareTracer.TracePrepareEnd(ctx, c, TracePrepareEndData{Err: err})
    307 		}()
    308 	}
    309 
    310 	sd, err = c.pgConn.Prepare(ctx, name, sql, nil)
    311 	if err != nil {
    312 		return nil, err
    313 	}
    314 
    315 	if name != "" {
    316 		c.preparedStatements[name] = sd
    317 	}
    318 
    319 	return sd, nil
    320 }
    321 
    322 // Deallocate released a prepared statement
    323 func (c *Conn) Deallocate(ctx context.Context, name string) error {
    324 	delete(c.preparedStatements, name)
    325 	_, err := c.pgConn.Exec(ctx, "deallocate "+quoteIdentifier(name)).ReadAll()
    326 	return err
    327 }
    328 
    329 // DeallocateAll releases all previously prepared statements from the server and client, where it also resets the statement and description cache.
    330 func (c *Conn) DeallocateAll(ctx context.Context) error {
    331 	c.preparedStatements = map[string]*pgconn.StatementDescription{}
    332 	if c.config.StatementCacheCapacity > 0 {
    333 		c.statementCache = stmtcache.NewLRUCache(c.config.StatementCacheCapacity)
    334 	}
    335 	if c.config.DescriptionCacheCapacity > 0 {
    336 		c.descriptionCache = stmtcache.NewLRUCache(c.config.DescriptionCacheCapacity)
    337 	}
    338 	_, err := c.pgConn.Exec(ctx, "deallocate all").ReadAll()
    339 	return err
    340 }
    341 
    342 func (c *Conn) bufferNotifications(_ *pgconn.PgConn, n *pgconn.Notification) {
    343 	c.notifications = append(c.notifications, n)
    344 }
    345 
    346 // WaitForNotification waits for a PostgreSQL notification. It wraps the underlying pgconn notification system in a
    347 // slightly more convenient form.
    348 func (c *Conn) WaitForNotification(ctx context.Context) (*pgconn.Notification, error) {
    349 	var n *pgconn.Notification
    350 
    351 	// Return already received notification immediately
    352 	if len(c.notifications) > 0 {
    353 		n = c.notifications[0]
    354 		c.notifications = c.notifications[1:]
    355 		return n, nil
    356 	}
    357 
    358 	err := c.pgConn.WaitForNotification(ctx)
    359 	if len(c.notifications) > 0 {
    360 		n = c.notifications[0]
    361 		c.notifications = c.notifications[1:]
    362 	}
    363 	return n, err
    364 }
    365 
    366 // IsClosed reports if the connection has been closed.
    367 func (c *Conn) IsClosed() bool {
    368 	return c.pgConn.IsClosed()
    369 }
    370 
    371 func (c *Conn) die(err error) {
    372 	if c.IsClosed() {
    373 		return
    374 	}
    375 
    376 	ctx, cancel := context.WithCancel(context.Background())
    377 	cancel() // force immediate hard cancel
    378 	c.pgConn.Close(ctx)
    379 }
    380 
    381 func quoteIdentifier(s string) string {
    382 	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
    383 }
    384 
    385 // Ping delegates to the underlying *pgconn.PgConn.Ping.
    386 func (c *Conn) Ping(ctx context.Context) error {
    387 	return c.pgConn.Ping(ctx)
    388 }
    389 
    390 // PgConn returns the underlying *pgconn.PgConn. This is an escape hatch method that allows lower level access to the
    391 // PostgreSQL connection than pgx exposes.
    392 //
    393 // It is strongly recommended that the connection be idle (no in-progress queries) before the underlying *pgconn.PgConn
    394 // is used and the connection must be returned to the same state before any *pgx.Conn methods are again used.
    395 func (c *Conn) PgConn() *pgconn.PgConn { return c.pgConn }
    396 
    397 // TypeMap returns the connection info used for this connection.
    398 func (c *Conn) TypeMap() *pgtype.Map { return c.typeMap }
    399 
    400 // Config returns a copy of config that was used to establish this connection.
    401 func (c *Conn) Config() *ConnConfig { return c.config.Copy() }
    402 
    403 // Exec executes sql. sql can be either a prepared statement name or an SQL string. arguments should be referenced
    404 // positionally from the sql string as $1, $2, etc.
    405 func (c *Conn) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) {
    406 	if c.queryTracer != nil {
    407 		ctx = c.queryTracer.TraceQueryStart(ctx, c, TraceQueryStartData{SQL: sql, Args: arguments})
    408 	}
    409 
    410 	if err := c.deallocateInvalidatedCachedStatements(ctx); err != nil {
    411 		return pgconn.CommandTag{}, err
    412 	}
    413 
    414 	commandTag, err := c.exec(ctx, sql, arguments...)
    415 
    416 	if c.queryTracer != nil {
    417 		c.queryTracer.TraceQueryEnd(ctx, c, TraceQueryEndData{CommandTag: commandTag, Err: err})
    418 	}
    419 
    420 	return commandTag, err
    421 }
    422 
    423 func (c *Conn) exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) {
    424 	mode := c.config.DefaultQueryExecMode
    425 	var queryRewriter QueryRewriter
    426 
    427 optionLoop:
    428 	for len(arguments) > 0 {
    429 		switch arg := arguments[0].(type) {
    430 		case QueryExecMode:
    431 			mode = arg
    432 			arguments = arguments[1:]
    433 		case QueryRewriter:
    434 			queryRewriter = arg
    435 			arguments = arguments[1:]
    436 		default:
    437 			break optionLoop
    438 		}
    439 	}
    440 
    441 	if queryRewriter != nil {
    442 		sql, arguments, err = queryRewriter.RewriteQuery(ctx, c, sql, arguments)
    443 		if err != nil {
    444 			return pgconn.CommandTag{}, fmt.Errorf("rewrite query failed: %v", err)
    445 		}
    446 	}
    447 
    448 	// Always use simple protocol when there are no arguments.
    449 	if len(arguments) == 0 {
    450 		mode = QueryExecModeSimpleProtocol
    451 	}
    452 
    453 	if sd, ok := c.preparedStatements[sql]; ok {
    454 		return c.execPrepared(ctx, sd, arguments)
    455 	}
    456 
    457 	switch mode {
    458 	case QueryExecModeCacheStatement:
    459 		if c.statementCache == nil {
    460 			return pgconn.CommandTag{}, errDisabledStatementCache
    461 		}
    462 		sd := c.statementCache.Get(sql)
    463 		if sd == nil {
    464 			sd, err = c.Prepare(ctx, stmtcache.NextStatementName(), sql)
    465 			if err != nil {
    466 				return pgconn.CommandTag{}, err
    467 			}
    468 			c.statementCache.Put(sd)
    469 		}
    470 
    471 		return c.execPrepared(ctx, sd, arguments)
    472 	case QueryExecModeCacheDescribe:
    473 		if c.descriptionCache == nil {
    474 			return pgconn.CommandTag{}, errDisabledDescriptionCache
    475 		}
    476 		sd := c.descriptionCache.Get(sql)
    477 		if sd == nil {
    478 			sd, err = c.Prepare(ctx, "", sql)
    479 			if err != nil {
    480 				return pgconn.CommandTag{}, err
    481 			}
    482 		}
    483 
    484 		return c.execParams(ctx, sd, arguments)
    485 	case QueryExecModeDescribeExec:
    486 		sd, err := c.Prepare(ctx, "", sql)
    487 		if err != nil {
    488 			return pgconn.CommandTag{}, err
    489 		}
    490 		return c.execPrepared(ctx, sd, arguments)
    491 	case QueryExecModeExec:
    492 		return c.execSQLParams(ctx, sql, arguments)
    493 	case QueryExecModeSimpleProtocol:
    494 		return c.execSimpleProtocol(ctx, sql, arguments)
    495 	default:
    496 		return pgconn.CommandTag{}, fmt.Errorf("unknown QueryExecMode: %v", mode)
    497 	}
    498 }
    499 
    500 func (c *Conn) execSimpleProtocol(ctx context.Context, sql string, arguments []any) (commandTag pgconn.CommandTag, err error) {
    501 	if len(arguments) > 0 {
    502 		sql, err = c.sanitizeForSimpleQuery(sql, arguments...)
    503 		if err != nil {
    504 			return pgconn.CommandTag{}, err
    505 		}
    506 	}
    507 
    508 	mrr := c.pgConn.Exec(ctx, sql)
    509 	for mrr.NextResult() {
    510 		commandTag, err = mrr.ResultReader().Close()
    511 	}
    512 	err = mrr.Close()
    513 	return commandTag, err
    514 }
    515 
    516 func (c *Conn) execParams(ctx context.Context, sd *pgconn.StatementDescription, arguments []any) (pgconn.CommandTag, error) {
    517 	err := c.eqb.Build(c.typeMap, sd, arguments)
    518 	if err != nil {
    519 		return pgconn.CommandTag{}, err
    520 	}
    521 
    522 	result := c.pgConn.ExecParams(ctx, sd.SQL, c.eqb.ParamValues, sd.ParamOIDs, c.eqb.ParamFormats, c.eqb.ResultFormats).Read()
    523 	c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
    524 	return result.CommandTag, result.Err
    525 }
    526 
    527 func (c *Conn) execPrepared(ctx context.Context, sd *pgconn.StatementDescription, arguments []any) (pgconn.CommandTag, error) {
    528 	err := c.eqb.Build(c.typeMap, sd, arguments)
    529 	if err != nil {
    530 		return pgconn.CommandTag{}, err
    531 	}
    532 
    533 	result := c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats).Read()
    534 	c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
    535 	return result.CommandTag, result.Err
    536 }
    537 
    538 type unknownArgumentTypeQueryExecModeExecError struct {
    539 	arg any
    540 }
    541 
    542 func (e *unknownArgumentTypeQueryExecModeExecError) Error() string {
    543 	return fmt.Sprintf("cannot use unregistered type %T as query argument in QueryExecModeExec", e.arg)
    544 }
    545 
    546 func (c *Conn) execSQLParams(ctx context.Context, sql string, args []any) (pgconn.CommandTag, error) {
    547 	err := c.eqb.Build(c.typeMap, nil, args)
    548 	if err != nil {
    549 		return pgconn.CommandTag{}, err
    550 	}
    551 
    552 	result := c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats).Read()
    553 	c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
    554 	return result.CommandTag, result.Err
    555 }
    556 
    557 func (c *Conn) getRows(ctx context.Context, sql string, args []any) *baseRows {
    558 	r := &baseRows{}
    559 
    560 	r.ctx = ctx
    561 	r.queryTracer = c.queryTracer
    562 	r.typeMap = c.typeMap
    563 	r.startTime = time.Now()
    564 	r.sql = sql
    565 	r.args = args
    566 	r.conn = c
    567 
    568 	return r
    569 }
    570 
    571 type QueryExecMode int32
    572 
    573 const (
    574 	_ QueryExecMode = iota
    575 
    576 	// Automatically prepare and cache statements. This uses the extended protocol. Queries are executed in a single
    577 	// round trip after the statement is cached. This is the default.
    578 	QueryExecModeCacheStatement
    579 
    580 	// Cache statement descriptions (i.e. argument and result types) and assume they do not change. This uses the
    581 	// extended protocol. Queries are executed in a single round trip after the description is cached. If the database
    582 	// schema is modified or the search_path is changed this may result in undetected result decoding errors.
    583 	QueryExecModeCacheDescribe
    584 
    585 	// Get the statement description on every execution. This uses the extended protocol. Queries require two round trips
    586 	// to execute. It does not use named prepared statements. But it does use the unnamed prepared statement to get the
    587 	// statement description on the first round trip and then uses it to execute the query on the second round trip. This
    588 	// may cause problems with connection poolers that switch the underlying connection between round trips. It is safe
    589 	// even when the the database schema is modified concurrently.
    590 	QueryExecModeDescribeExec
    591 
    592 	// Assume the PostgreSQL query parameter types based on the Go type of the arguments. This uses the extended protocol
    593 	// with text formatted parameters and results. Queries are executed in a single round trip. Type mappings can be
    594 	// registered with pgtype.Map.RegisterDefaultPgType. Queries will be rejected that have arguments that are
    595 	// unregistered or ambigious. e.g. A map[string]string may have the PostgreSQL type json or hstore. Modes that know
    596 	// the PostgreSQL type can use a map[string]string directly as an argument. This mode cannot.
    597 	QueryExecModeExec
    598 
    599 	// Use the simple protocol. Assume the PostgreSQL query parameter types based on the Go type of the arguments.
    600 	// Queries are executed in a single round trip. Type mappings can be registered with
    601 	// pgtype.Map.RegisterDefaultPgType. Queries will be rejected that have arguments that are unregistered or ambigious.
    602 	// e.g. A map[string]string may have the PostgreSQL type json or hstore. Modes that know the PostgreSQL type can use
    603 	// a map[string]string directly as an argument. This mode cannot.
    604 	//
    605 	// QueryExecModeSimpleProtocol should have the user application visible behavior as QueryExecModeExec with minor
    606 	// exceptions such as behavior when multiple result returning queries are erroneously sent in a single string.
    607 	//
    608 	// QueryExecModeSimpleProtocol uses client side parameter interpolation. All values are quoted and escaped. Prefer
    609 	// QueryExecModeExec over QueryExecModeSimpleProtocol whenever possible. In general QueryExecModeSimpleProtocol
    610 	// should only be used if connecting to a proxy server, connection pool server, or non-PostgreSQL server that does
    611 	// not support the extended protocol.
    612 	QueryExecModeSimpleProtocol
    613 )
    614 
    615 func (m QueryExecMode) String() string {
    616 	switch m {
    617 	case QueryExecModeCacheStatement:
    618 		return "cache statement"
    619 	case QueryExecModeCacheDescribe:
    620 		return "cache describe"
    621 	case QueryExecModeDescribeExec:
    622 		return "describe exec"
    623 	case QueryExecModeExec:
    624 		return "exec"
    625 	case QueryExecModeSimpleProtocol:
    626 		return "simple protocol"
    627 	default:
    628 		return "invalid"
    629 	}
    630 }
    631 
    632 // QueryResultFormats controls the result format (text=0, binary=1) of a query by result column position.
    633 type QueryResultFormats []int16
    634 
    635 // QueryResultFormatsByOID controls the result format (text=0, binary=1) of a query by the result column OID.
    636 type QueryResultFormatsByOID map[uint32]int16
    637 
    638 // QueryRewriter rewrites a query when used as the first arguments to a query method.
    639 type QueryRewriter interface {
    640 	RewriteQuery(ctx context.Context, conn *Conn, sql string, args []any) (newSQL string, newArgs []any, err error)
    641 }
    642 
    643 // Query sends a query to the server and returns a Rows to read the results. Only errors encountered sending the query
    644 // and initializing Rows will be returned. Err() on the returned Rows must be checked after the Rows is closed to
    645 // determine if the query executed successfully.
    646 //
    647 // The returned Rows must be closed before the connection can be used again. It is safe to attempt to read from the
    648 // returned Rows even if an error is returned. The error will be the available in rows.Err() after rows are closed. It
    649 // is allowed to ignore the error returned from Query and handle it in Rows.
    650 //
    651 // It is possible for a call of FieldDescriptions on the returned Rows to return nil even if the Query call did not
    652 // return an error.
    653 //
    654 // It is possible for a query to return one or more rows before encountering an error. In most cases the rows should be
    655 // collected before processing rather than processed while receiving each row. This avoids the possibility of the
    656 // application processing rows from a query that the server rejected. The CollectRows function is useful here.
    657 //
    658 // An implementor of QueryRewriter may be passed as the first element of args. It can rewrite the sql and change or
    659 // replace args. For example, NamedArgs is QueryRewriter that implements named arguments.
    660 //
    661 // For extra control over how the query is executed, the types QueryExecMode, QueryResultFormats, and
    662 // QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
    663 // needed. See the documentation for those types for details.
    664 func (c *Conn) Query(ctx context.Context, sql string, args ...any) (Rows, error) {
    665 	if c.queryTracer != nil {
    666 		ctx = c.queryTracer.TraceQueryStart(ctx, c, TraceQueryStartData{SQL: sql, Args: args})
    667 	}
    668 
    669 	if err := c.deallocateInvalidatedCachedStatements(ctx); err != nil {
    670 		if c.queryTracer != nil {
    671 			c.queryTracer.TraceQueryEnd(ctx, c, TraceQueryEndData{Err: err})
    672 		}
    673 		return &baseRows{err: err, closed: true}, err
    674 	}
    675 
    676 	var resultFormats QueryResultFormats
    677 	var resultFormatsByOID QueryResultFormatsByOID
    678 	mode := c.config.DefaultQueryExecMode
    679 	var queryRewriter QueryRewriter
    680 
    681 optionLoop:
    682 	for len(args) > 0 {
    683 		switch arg := args[0].(type) {
    684 		case QueryResultFormats:
    685 			resultFormats = arg
    686 			args = args[1:]
    687 		case QueryResultFormatsByOID:
    688 			resultFormatsByOID = arg
    689 			args = args[1:]
    690 		case QueryExecMode:
    691 			mode = arg
    692 			args = args[1:]
    693 		case QueryRewriter:
    694 			queryRewriter = arg
    695 			args = args[1:]
    696 		default:
    697 			break optionLoop
    698 		}
    699 	}
    700 
    701 	if queryRewriter != nil {
    702 		var err error
    703 		originalSQL := sql
    704 		originalArgs := args
    705 		sql, args, err = queryRewriter.RewriteQuery(ctx, c, sql, args)
    706 		if err != nil {
    707 			rows := c.getRows(ctx, originalSQL, originalArgs)
    708 			err = fmt.Errorf("rewrite query failed: %v", err)
    709 			rows.fatal(err)
    710 			return rows, err
    711 		}
    712 	}
    713 
    714 	// Bypass any statement caching.
    715 	if sql == "" {
    716 		mode = QueryExecModeSimpleProtocol
    717 	}
    718 
    719 	c.eqb.reset()
    720 	anynil.NormalizeSlice(args)
    721 	rows := c.getRows(ctx, sql, args)
    722 
    723 	var err error
    724 	sd, explicitPreparedStatement := c.preparedStatements[sql]
    725 	if sd != nil || mode == QueryExecModeCacheStatement || mode == QueryExecModeCacheDescribe || mode == QueryExecModeDescribeExec {
    726 		if sd == nil {
    727 			sd, err = c.getStatementDescription(ctx, mode, sql)
    728 			if err != nil {
    729 				rows.fatal(err)
    730 				return rows, err
    731 			}
    732 		}
    733 
    734 		if len(sd.ParamOIDs) != len(args) {
    735 			rows.fatal(fmt.Errorf("expected %d arguments, got %d", len(sd.ParamOIDs), len(args)))
    736 			return rows, rows.err
    737 		}
    738 
    739 		rows.sql = sd.SQL
    740 
    741 		err = c.eqb.Build(c.typeMap, sd, args)
    742 		if err != nil {
    743 			rows.fatal(err)
    744 			return rows, rows.err
    745 		}
    746 
    747 		if resultFormatsByOID != nil {
    748 			resultFormats = make([]int16, len(sd.Fields))
    749 			for i := range resultFormats {
    750 				resultFormats[i] = resultFormatsByOID[uint32(sd.Fields[i].DataTypeOID)]
    751 			}
    752 		}
    753 
    754 		if resultFormats == nil {
    755 			resultFormats = c.eqb.ResultFormats
    756 		}
    757 
    758 		if !explicitPreparedStatement && mode == QueryExecModeCacheDescribe {
    759 			rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, sd.ParamOIDs, c.eqb.ParamFormats, resultFormats)
    760 		} else {
    761 			rows.resultReader = c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, resultFormats)
    762 		}
    763 	} else if mode == QueryExecModeExec {
    764 		err := c.eqb.Build(c.typeMap, nil, args)
    765 		if err != nil {
    766 			rows.fatal(err)
    767 			return rows, rows.err
    768 		}
    769 
    770 		rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats)
    771 	} else if mode == QueryExecModeSimpleProtocol {
    772 		sql, err = c.sanitizeForSimpleQuery(sql, args...)
    773 		if err != nil {
    774 			rows.fatal(err)
    775 			return rows, err
    776 		}
    777 
    778 		mrr := c.pgConn.Exec(ctx, sql)
    779 		if mrr.NextResult() {
    780 			rows.resultReader = mrr.ResultReader()
    781 			rows.multiResultReader = mrr
    782 		} else {
    783 			err = mrr.Close()
    784 			rows.fatal(err)
    785 			return rows, err
    786 		}
    787 
    788 		return rows, nil
    789 	} else {
    790 		err = fmt.Errorf("unknown QueryExecMode: %v", mode)
    791 		rows.fatal(err)
    792 		return rows, rows.err
    793 	}
    794 
    795 	c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
    796 
    797 	return rows, rows.err
    798 }
    799 
    800 // getStatementDescription returns the statement description of the sql query
    801 // according to the given mode.
    802 //
    803 // If the mode is one that doesn't require to know the param and result OIDs
    804 // then nil is returned without error.
    805 func (c *Conn) getStatementDescription(
    806 	ctx context.Context,
    807 	mode QueryExecMode,
    808 	sql string,
    809 ) (sd *pgconn.StatementDescription, err error) {
    810 
    811 	switch mode {
    812 	case QueryExecModeCacheStatement:
    813 		if c.statementCache == nil {
    814 			return nil, errDisabledStatementCache
    815 		}
    816 		sd = c.statementCache.Get(sql)
    817 		if sd == nil {
    818 			sd, err = c.Prepare(ctx, stmtcache.NextStatementName(), sql)
    819 			if err != nil {
    820 				return nil, err
    821 			}
    822 			c.statementCache.Put(sd)
    823 		}
    824 	case QueryExecModeCacheDescribe:
    825 		if c.descriptionCache == nil {
    826 			return nil, errDisabledDescriptionCache
    827 		}
    828 		sd = c.descriptionCache.Get(sql)
    829 		if sd == nil {
    830 			sd, err = c.Prepare(ctx, "", sql)
    831 			if err != nil {
    832 				return nil, err
    833 			}
    834 			c.descriptionCache.Put(sd)
    835 		}
    836 	case QueryExecModeDescribeExec:
    837 		return c.Prepare(ctx, "", sql)
    838 	}
    839 	return sd, err
    840 }
    841 
    842 // QueryRow is a convenience wrapper over Query. Any error that occurs while
    843 // querying is deferred until calling Scan on the returned Row. That Row will
    844 // error with ErrNoRows if no rows are returned.
    845 func (c *Conn) QueryRow(ctx context.Context, sql string, args ...any) Row {
    846 	rows, _ := c.Query(ctx, sql, args...)
    847 	return (*connRow)(rows.(*baseRows))
    848 }
    849 
    850 // SendBatch sends all queued queries to the server at once. All queries are run in an implicit transaction unless
    851 // explicit transaction control statements are executed. The returned BatchResults must be closed before the connection
    852 // is used again.
    853 func (c *Conn) SendBatch(ctx context.Context, b *Batch) (br BatchResults) {
    854 	if c.batchTracer != nil {
    855 		ctx = c.batchTracer.TraceBatchStart(ctx, c, TraceBatchStartData{Batch: b})
    856 		defer func() {
    857 			err := br.(interface{ earlyError() error }).earlyError()
    858 			if err != nil {
    859 				c.batchTracer.TraceBatchEnd(ctx, c, TraceBatchEndData{Err: err})
    860 			}
    861 		}()
    862 	}
    863 
    864 	if err := c.deallocateInvalidatedCachedStatements(ctx); err != nil {
    865 		return &batchResults{ctx: ctx, conn: c, err: err}
    866 	}
    867 
    868 	mode := c.config.DefaultQueryExecMode
    869 
    870 	for _, bi := range b.queuedQueries {
    871 		var queryRewriter QueryRewriter
    872 		sql := bi.query
    873 		arguments := bi.arguments
    874 
    875 	optionLoop:
    876 		for len(arguments) > 0 {
    877 			switch arg := arguments[0].(type) {
    878 			case QueryRewriter:
    879 				queryRewriter = arg
    880 				arguments = arguments[1:]
    881 			default:
    882 				break optionLoop
    883 			}
    884 		}
    885 
    886 		if queryRewriter != nil {
    887 			var err error
    888 			sql, arguments, err = queryRewriter.RewriteQuery(ctx, c, sql, arguments)
    889 			if err != nil {
    890 				return &batchResults{ctx: ctx, conn: c, err: fmt.Errorf("rewrite query failed: %v", err)}
    891 			}
    892 		}
    893 
    894 		bi.query = sql
    895 		bi.arguments = arguments
    896 	}
    897 
    898 	if mode == QueryExecModeSimpleProtocol {
    899 		return c.sendBatchQueryExecModeSimpleProtocol(ctx, b)
    900 	}
    901 
    902 	// All other modes use extended protocol and thus can use prepared statements.
    903 	for _, bi := range b.queuedQueries {
    904 		if sd, ok := c.preparedStatements[bi.query]; ok {
    905 			bi.sd = sd
    906 		}
    907 	}
    908 
    909 	switch mode {
    910 	case QueryExecModeExec:
    911 		return c.sendBatchQueryExecModeExec(ctx, b)
    912 	case QueryExecModeCacheStatement:
    913 		return c.sendBatchQueryExecModeCacheStatement(ctx, b)
    914 	case QueryExecModeCacheDescribe:
    915 		return c.sendBatchQueryExecModeCacheDescribe(ctx, b)
    916 	case QueryExecModeDescribeExec:
    917 		return c.sendBatchQueryExecModeDescribeExec(ctx, b)
    918 	default:
    919 		panic("unknown QueryExecMode")
    920 	}
    921 }
    922 
    923 func (c *Conn) sendBatchQueryExecModeSimpleProtocol(ctx context.Context, b *Batch) *batchResults {
    924 	var sb strings.Builder
    925 	for i, bi := range b.queuedQueries {
    926 		if i > 0 {
    927 			sb.WriteByte(';')
    928 		}
    929 		sql, err := c.sanitizeForSimpleQuery(bi.query, bi.arguments...)
    930 		if err != nil {
    931 			return &batchResults{ctx: ctx, conn: c, err: err}
    932 		}
    933 		sb.WriteString(sql)
    934 	}
    935 	mrr := c.pgConn.Exec(ctx, sb.String())
    936 	return &batchResults{
    937 		ctx:   ctx,
    938 		conn:  c,
    939 		mrr:   mrr,
    940 		b:     b,
    941 		qqIdx: 0,
    942 	}
    943 }
    944 
    945 func (c *Conn) sendBatchQueryExecModeExec(ctx context.Context, b *Batch) *batchResults {
    946 	batch := &pgconn.Batch{}
    947 
    948 	for _, bi := range b.queuedQueries {
    949 		sd := bi.sd
    950 		if sd != nil {
    951 			err := c.eqb.Build(c.typeMap, sd, bi.arguments)
    952 			if err != nil {
    953 				return &batchResults{ctx: ctx, conn: c, err: err}
    954 			}
    955 
    956 			batch.ExecPrepared(sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats)
    957 		} else {
    958 			err := c.eqb.Build(c.typeMap, nil, bi.arguments)
    959 			if err != nil {
    960 				return &batchResults{ctx: ctx, conn: c, err: err}
    961 			}
    962 			batch.ExecParams(bi.query, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats)
    963 		}
    964 	}
    965 
    966 	c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
    967 
    968 	mrr := c.pgConn.ExecBatch(ctx, batch)
    969 
    970 	return &batchResults{
    971 		ctx:   ctx,
    972 		conn:  c,
    973 		mrr:   mrr,
    974 		b:     b,
    975 		qqIdx: 0,
    976 	}
    977 }
    978 
    979 func (c *Conn) sendBatchQueryExecModeCacheStatement(ctx context.Context, b *Batch) (pbr *pipelineBatchResults) {
    980 	if c.statementCache == nil {
    981 		return &pipelineBatchResults{ctx: ctx, conn: c, err: errDisabledStatementCache, closed: true}
    982 	}
    983 
    984 	distinctNewQueries := []*pgconn.StatementDescription{}
    985 	distinctNewQueriesIdxMap := make(map[string]int)
    986 
    987 	for _, bi := range b.queuedQueries {
    988 		if bi.sd == nil {
    989 			sd := c.statementCache.Get(bi.query)
    990 			if sd != nil {
    991 				bi.sd = sd
    992 			} else {
    993 				if idx, present := distinctNewQueriesIdxMap[bi.query]; present {
    994 					bi.sd = distinctNewQueries[idx]
    995 				} else {
    996 					sd = &pgconn.StatementDescription{
    997 						Name: stmtcache.NextStatementName(),
    998 						SQL:  bi.query,
    999 					}
   1000 					distinctNewQueriesIdxMap[sd.SQL] = len(distinctNewQueries)
   1001 					distinctNewQueries = append(distinctNewQueries, sd)
   1002 					bi.sd = sd
   1003 				}
   1004 			}
   1005 		}
   1006 	}
   1007 
   1008 	return c.sendBatchExtendedWithDescription(ctx, b, distinctNewQueries, c.statementCache)
   1009 }
   1010 
   1011 func (c *Conn) sendBatchQueryExecModeCacheDescribe(ctx context.Context, b *Batch) (pbr *pipelineBatchResults) {
   1012 	if c.descriptionCache == nil {
   1013 		return &pipelineBatchResults{ctx: ctx, conn: c, err: errDisabledDescriptionCache, closed: true}
   1014 	}
   1015 
   1016 	distinctNewQueries := []*pgconn.StatementDescription{}
   1017 	distinctNewQueriesIdxMap := make(map[string]int)
   1018 
   1019 	for _, bi := range b.queuedQueries {
   1020 		if bi.sd == nil {
   1021 			sd := c.descriptionCache.Get(bi.query)
   1022 			if sd != nil {
   1023 				bi.sd = sd
   1024 			} else {
   1025 				if idx, present := distinctNewQueriesIdxMap[bi.query]; present {
   1026 					bi.sd = distinctNewQueries[idx]
   1027 				} else {
   1028 					sd = &pgconn.StatementDescription{
   1029 						SQL: bi.query,
   1030 					}
   1031 					distinctNewQueriesIdxMap[sd.SQL] = len(distinctNewQueries)
   1032 					distinctNewQueries = append(distinctNewQueries, sd)
   1033 					bi.sd = sd
   1034 				}
   1035 			}
   1036 		}
   1037 	}
   1038 
   1039 	return c.sendBatchExtendedWithDescription(ctx, b, distinctNewQueries, c.descriptionCache)
   1040 }
   1041 
   1042 func (c *Conn) sendBatchQueryExecModeDescribeExec(ctx context.Context, b *Batch) (pbr *pipelineBatchResults) {
   1043 	distinctNewQueries := []*pgconn.StatementDescription{}
   1044 	distinctNewQueriesIdxMap := make(map[string]int)
   1045 
   1046 	for _, bi := range b.queuedQueries {
   1047 		if bi.sd == nil {
   1048 			if idx, present := distinctNewQueriesIdxMap[bi.query]; present {
   1049 				bi.sd = distinctNewQueries[idx]
   1050 			} else {
   1051 				sd := &pgconn.StatementDescription{
   1052 					SQL: bi.query,
   1053 				}
   1054 				distinctNewQueriesIdxMap[sd.SQL] = len(distinctNewQueries)
   1055 				distinctNewQueries = append(distinctNewQueries, sd)
   1056 				bi.sd = sd
   1057 			}
   1058 		}
   1059 	}
   1060 
   1061 	return c.sendBatchExtendedWithDescription(ctx, b, distinctNewQueries, nil)
   1062 }
   1063 
   1064 func (c *Conn) sendBatchExtendedWithDescription(ctx context.Context, b *Batch, distinctNewQueries []*pgconn.StatementDescription, sdCache stmtcache.Cache) (pbr *pipelineBatchResults) {
   1065 	pipeline := c.pgConn.StartPipeline(context.Background())
   1066 	defer func() {
   1067 		if pbr.err != nil {
   1068 			pipeline.Close()
   1069 		}
   1070 	}()
   1071 
   1072 	// Prepare any needed queries
   1073 	if len(distinctNewQueries) > 0 {
   1074 		for _, sd := range distinctNewQueries {
   1075 			pipeline.SendPrepare(sd.Name, sd.SQL, nil)
   1076 		}
   1077 
   1078 		err := pipeline.Sync()
   1079 		if err != nil {
   1080 			return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true}
   1081 		}
   1082 
   1083 		for _, sd := range distinctNewQueries {
   1084 			results, err := pipeline.GetResults()
   1085 			if err != nil {
   1086 				return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true}
   1087 			}
   1088 
   1089 			resultSD, ok := results.(*pgconn.StatementDescription)
   1090 			if !ok {
   1091 				return &pipelineBatchResults{ctx: ctx, conn: c, err: fmt.Errorf("expected statement description, got %T", results), closed: true}
   1092 			}
   1093 
   1094 			// Fill in the previously empty / pending statement descriptions.
   1095 			sd.ParamOIDs = resultSD.ParamOIDs
   1096 			sd.Fields = resultSD.Fields
   1097 		}
   1098 
   1099 		results, err := pipeline.GetResults()
   1100 		if err != nil {
   1101 			return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true}
   1102 		}
   1103 
   1104 		_, ok := results.(*pgconn.PipelineSync)
   1105 		if !ok {
   1106 			return &pipelineBatchResults{ctx: ctx, conn: c, err: fmt.Errorf("expected sync, got %T", results), closed: true}
   1107 		}
   1108 	}
   1109 
   1110 	// Put all statements into the cache. It's fine if it overflows because HandleInvalidated will clean them up later.
   1111 	if sdCache != nil {
   1112 		for _, sd := range distinctNewQueries {
   1113 			sdCache.Put(sd)
   1114 		}
   1115 	}
   1116 
   1117 	// Queue the queries.
   1118 	for _, bi := range b.queuedQueries {
   1119 		err := c.eqb.Build(c.typeMap, bi.sd, bi.arguments)
   1120 		if err != nil {
   1121 			// we wrap the error so we the user can understand which query failed inside the batch
   1122 			err = fmt.Errorf("error building query %s: %w", bi.query, err)
   1123 			return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true}
   1124 		}
   1125 
   1126 		if bi.sd.Name == "" {
   1127 			pipeline.SendQueryParams(bi.sd.SQL, c.eqb.ParamValues, bi.sd.ParamOIDs, c.eqb.ParamFormats, c.eqb.ResultFormats)
   1128 		} else {
   1129 			pipeline.SendQueryPrepared(bi.sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats)
   1130 		}
   1131 	}
   1132 
   1133 	err := pipeline.Sync()
   1134 	if err != nil {
   1135 		return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true}
   1136 	}
   1137 
   1138 	return &pipelineBatchResults{
   1139 		ctx:      ctx,
   1140 		conn:     c,
   1141 		pipeline: pipeline,
   1142 		b:        b,
   1143 	}
   1144 }
   1145 
   1146 func (c *Conn) sanitizeForSimpleQuery(sql string, args ...any) (string, error) {
   1147 	if c.pgConn.ParameterStatus("standard_conforming_strings") != "on" {
   1148 		return "", errors.New("simple protocol queries must be run with standard_conforming_strings=on")
   1149 	}
   1150 
   1151 	if c.pgConn.ParameterStatus("client_encoding") != "UTF8" {
   1152 		return "", errors.New("simple protocol queries must be run with client_encoding=UTF8")
   1153 	}
   1154 
   1155 	var err error
   1156 	valueArgs := make([]any, len(args))
   1157 	for i, a := range args {
   1158 		valueArgs[i], err = convertSimpleArgument(c.typeMap, a)
   1159 		if err != nil {
   1160 			return "", err
   1161 		}
   1162 	}
   1163 
   1164 	return sanitize.SanitizeSQL(sql, valueArgs...)
   1165 }
   1166 
   1167 // LoadType inspects the database for typeName and produces a pgtype.Type suitable for registration.
   1168 func (c *Conn) LoadType(ctx context.Context, typeName string) (*pgtype.Type, error) {
   1169 	var oid uint32
   1170 
   1171 	err := c.QueryRow(ctx, "select $1::text::regtype::oid;", typeName).Scan(&oid)
   1172 	if err != nil {
   1173 		return nil, err
   1174 	}
   1175 
   1176 	var typtype string
   1177 	var typbasetype uint32
   1178 
   1179 	err = c.QueryRow(ctx, "select typtype::text, typbasetype from pg_type where oid=$1", oid).Scan(&typtype, &typbasetype)
   1180 	if err != nil {
   1181 		return nil, err
   1182 	}
   1183 
   1184 	switch typtype {
   1185 	case "b": // array
   1186 		elementOID, err := c.getArrayElementOID(ctx, oid)
   1187 		if err != nil {
   1188 			return nil, err
   1189 		}
   1190 
   1191 		dt, ok := c.TypeMap().TypeForOID(elementOID)
   1192 		if !ok {
   1193 			return nil, errors.New("array element OID not registered")
   1194 		}
   1195 
   1196 		return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.ArrayCodec{ElementType: dt}}, nil
   1197 	case "c": // composite
   1198 		fields, err := c.getCompositeFields(ctx, oid)
   1199 		if err != nil {
   1200 			return nil, err
   1201 		}
   1202 
   1203 		return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.CompositeCodec{Fields: fields}}, nil
   1204 	case "d": // domain
   1205 		dt, ok := c.TypeMap().TypeForOID(typbasetype)
   1206 		if !ok {
   1207 			return nil, errors.New("domain base type OID not registered")
   1208 		}
   1209 
   1210 		return &pgtype.Type{Name: typeName, OID: oid, Codec: dt.Codec}, nil
   1211 	case "e": // enum
   1212 		return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.EnumCodec{}}, nil
   1213 	case "r": // range
   1214 		elementOID, err := c.getRangeElementOID(ctx, oid)
   1215 		if err != nil {
   1216 			return nil, err
   1217 		}
   1218 
   1219 		dt, ok := c.TypeMap().TypeForOID(elementOID)
   1220 		if !ok {
   1221 			return nil, errors.New("range element OID not registered")
   1222 		}
   1223 
   1224 		return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.RangeCodec{ElementType: dt}}, nil
   1225 	case "m": // multirange
   1226 		elementOID, err := c.getMultiRangeElementOID(ctx, oid)
   1227 		if err != nil {
   1228 			return nil, err
   1229 		}
   1230 
   1231 		dt, ok := c.TypeMap().TypeForOID(elementOID)
   1232 		if !ok {
   1233 			return nil, errors.New("multirange element OID not registered")
   1234 		}
   1235 
   1236 		return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.MultirangeCodec{ElementType: dt}}, nil
   1237 	default:
   1238 		return &pgtype.Type{}, errors.New("unknown typtype")
   1239 	}
   1240 }
   1241 
   1242 func (c *Conn) getArrayElementOID(ctx context.Context, oid uint32) (uint32, error) {
   1243 	var typelem uint32
   1244 
   1245 	err := c.QueryRow(ctx, "select typelem from pg_type where oid=$1", oid).Scan(&typelem)
   1246 	if err != nil {
   1247 		return 0, err
   1248 	}
   1249 
   1250 	return typelem, nil
   1251 }
   1252 
   1253 func (c *Conn) getRangeElementOID(ctx context.Context, oid uint32) (uint32, error) {
   1254 	var typelem uint32
   1255 
   1256 	err := c.QueryRow(ctx, "select rngsubtype from pg_range where rngtypid=$1", oid).Scan(&typelem)
   1257 	if err != nil {
   1258 		return 0, err
   1259 	}
   1260 
   1261 	return typelem, nil
   1262 }
   1263 
   1264 func (c *Conn) getMultiRangeElementOID(ctx context.Context, oid uint32) (uint32, error) {
   1265 	var typelem uint32
   1266 
   1267 	err := c.QueryRow(ctx, "select rngtypid from pg_range where rngmultitypid=$1", oid).Scan(&typelem)
   1268 	if err != nil {
   1269 		return 0, err
   1270 	}
   1271 
   1272 	return typelem, nil
   1273 }
   1274 
   1275 func (c *Conn) getCompositeFields(ctx context.Context, oid uint32) ([]pgtype.CompositeCodecField, error) {
   1276 	var typrelid uint32
   1277 
   1278 	err := c.QueryRow(ctx, "select typrelid from pg_type where oid=$1", oid).Scan(&typrelid)
   1279 	if err != nil {
   1280 		return nil, err
   1281 	}
   1282 
   1283 	var fields []pgtype.CompositeCodecField
   1284 	var fieldName string
   1285 	var fieldOID uint32
   1286 	rows, _ := c.Query(ctx, `select attname, atttypid
   1287 from pg_attribute
   1288 where attrelid=$1
   1289 	and not attisdropped
   1290 	and attnum > 0
   1291 order by attnum`,
   1292 		typrelid,
   1293 	)
   1294 	_, err = ForEachRow(rows, []any{&fieldName, &fieldOID}, func() error {
   1295 		dt, ok := c.TypeMap().TypeForOID(fieldOID)
   1296 		if !ok {
   1297 			return fmt.Errorf("unknown composite type field OID: %v", fieldOID)
   1298 		}
   1299 		fields = append(fields, pgtype.CompositeCodecField{Name: fieldName, Type: dt})
   1300 		return nil
   1301 	})
   1302 	if err != nil {
   1303 		return nil, err
   1304 	}
   1305 
   1306 	return fields, nil
   1307 }
   1308 
   1309 func (c *Conn) deallocateInvalidatedCachedStatements(ctx context.Context) error {
   1310 	if c.pgConn.TxStatus() != 'I' {
   1311 		return nil
   1312 	}
   1313 
   1314 	if c.descriptionCache != nil {
   1315 		c.descriptionCache.HandleInvalidated()
   1316 	}
   1317 
   1318 	var invalidatedStatements []*pgconn.StatementDescription
   1319 	if c.statementCache != nil {
   1320 		invalidatedStatements = c.statementCache.HandleInvalidated()
   1321 	}
   1322 
   1323 	if len(invalidatedStatements) == 0 {
   1324 		return nil
   1325 	}
   1326 
   1327 	pipeline := c.pgConn.StartPipeline(ctx)
   1328 	defer pipeline.Close()
   1329 
   1330 	for _, sd := range invalidatedStatements {
   1331 		pipeline.SendDeallocate(sd.Name)
   1332 		delete(c.preparedStatements, sd.Name)
   1333 	}
   1334 
   1335 	err := pipeline.Sync()
   1336 	if err != nil {
   1337 		return fmt.Errorf("failed to deallocate cached statement(s): %w", err)
   1338 	}
   1339 
   1340 	err = pipeline.Close()
   1341 	if err != nil {
   1342 		return fmt.Errorf("failed to deallocate cached statement(s): %w", err)
   1343 	}
   1344 
   1345 	return nil
   1346 }