conn.go (43111B)
1 package pgx 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "strconv" 8 "strings" 9 "time" 10 11 "github.com/jackc/pgx/v5/internal/anynil" 12 "github.com/jackc/pgx/v5/internal/sanitize" 13 "github.com/jackc/pgx/v5/internal/stmtcache" 14 "github.com/jackc/pgx/v5/pgconn" 15 "github.com/jackc/pgx/v5/pgtype" 16 ) 17 18 // ConnConfig contains all the options used to establish a connection. It must be created by ParseConfig and 19 // then it can be modified. A manually initialized ConnConfig will cause ConnectConfig to panic. 20 type ConnConfig struct { 21 pgconn.Config 22 23 Tracer QueryTracer 24 25 // Original connection string that was parsed into config. 26 connString string 27 28 // StatementCacheCapacity is maximum size of the statement cache used when executing a query with "cache_statement" 29 // query exec mode. 30 StatementCacheCapacity int 31 32 // DescriptionCacheCapacity is the maximum size of the description cache used when executing a query with 33 // "cache_describe" query exec mode. 34 DescriptionCacheCapacity int 35 36 // DefaultQueryExecMode controls the default mode for executing queries. By default pgx uses the extended protocol 37 // and automatically prepares and caches prepared statements. However, this may be incompatible with proxies such as 38 // PGBouncer. In this case it may be preferrable to use QueryExecModeExec or QueryExecModeSimpleProtocol. The same 39 // functionality can be controlled on a per query basis by passing a QueryExecMode as the first query argument. 40 DefaultQueryExecMode QueryExecMode 41 42 createdByParseConfig bool // Used to enforce created by ParseConfig rule. 43 } 44 45 // ParseConfigOptions contains options that control how a config is built such as getsslpassword. 46 type ParseConfigOptions struct { 47 pgconn.ParseConfigOptions 48 } 49 50 // Copy returns a deep copy of the config that is safe to use and modify. 51 // The only exception is the tls.Config: 52 // according to the tls.Config docs it must not be modified after creation. 53 func (cc *ConnConfig) Copy() *ConnConfig { 54 newConfig := new(ConnConfig) 55 *newConfig = *cc 56 newConfig.Config = *newConfig.Config.Copy() 57 return newConfig 58 } 59 60 // ConnString returns the connection string as parsed by pgx.ParseConfig into pgx.ConnConfig. 61 func (cc *ConnConfig) ConnString() string { return cc.connString } 62 63 // Conn is a PostgreSQL connection handle. It is not safe for concurrent usage. Use a connection pool to manage access 64 // to multiple database connections from multiple goroutines. 65 type Conn struct { 66 pgConn *pgconn.PgConn 67 config *ConnConfig // config used when establishing this connection 68 preparedStatements map[string]*pgconn.StatementDescription 69 statementCache stmtcache.Cache 70 descriptionCache stmtcache.Cache 71 72 queryTracer QueryTracer 73 batchTracer BatchTracer 74 copyFromTracer CopyFromTracer 75 prepareTracer PrepareTracer 76 77 notifications []*pgconn.Notification 78 79 doneChan chan struct{} 80 closedChan chan error 81 82 typeMap *pgtype.Map 83 84 wbuf []byte 85 eqb ExtendedQueryBuilder 86 } 87 88 // Identifier a PostgreSQL identifier or name. Identifiers can be composed of 89 // multiple parts such as ["schema", "table"] or ["table", "column"]. 90 type Identifier []string 91 92 // Sanitize returns a sanitized string safe for SQL interpolation. 93 func (ident Identifier) Sanitize() string { 94 parts := make([]string, len(ident)) 95 for i := range ident { 96 s := strings.ReplaceAll(ident[i], string([]byte{0}), "") 97 parts[i] = `"` + strings.ReplaceAll(s, `"`, `""`) + `"` 98 } 99 return strings.Join(parts, ".") 100 } 101 102 // ErrNoRows occurs when rows are expected but none are returned. 103 var ErrNoRows = errors.New("no rows in result set") 104 105 var errDisabledStatementCache = fmt.Errorf("cannot use QueryExecModeCacheStatement with disabled statement cache") 106 var errDisabledDescriptionCache = fmt.Errorf("cannot use QueryExecModeCacheDescribe with disabled description cache") 107 108 // Connect establishes a connection with a PostgreSQL server with a connection string. See 109 // pgconn.Connect for details. 110 func Connect(ctx context.Context, connString string) (*Conn, error) { 111 connConfig, err := ParseConfig(connString) 112 if err != nil { 113 return nil, err 114 } 115 return connect(ctx, connConfig) 116 } 117 118 // ConnectWithOptions behaves exactly like Connect with the addition of options. At the present options is only used to 119 // provide a GetSSLPassword function. 120 func ConnectWithOptions(ctx context.Context, connString string, options ParseConfigOptions) (*Conn, error) { 121 connConfig, err := ParseConfigWithOptions(connString, options) 122 if err != nil { 123 return nil, err 124 } 125 return connect(ctx, connConfig) 126 } 127 128 // ConnectConfig establishes a connection with a PostgreSQL server with a configuration struct. 129 // connConfig must have been created by ParseConfig. 130 func ConnectConfig(ctx context.Context, connConfig *ConnConfig) (*Conn, error) { 131 // In general this improves safety. In particular avoid the config.Config.OnNotification mutation from affecting other 132 // connections with the same config. See https://github.com/jackc/pgx/issues/618. 133 connConfig = connConfig.Copy() 134 135 return connect(ctx, connConfig) 136 } 137 138 // ParseConfigWithOptions behaves exactly as ParseConfig does with the addition of options. At the present options is 139 // only used to provide a GetSSLPassword function. 140 func ParseConfigWithOptions(connString string, options ParseConfigOptions) (*ConnConfig, error) { 141 config, err := pgconn.ParseConfigWithOptions(connString, options.ParseConfigOptions) 142 if err != nil { 143 return nil, err 144 } 145 146 statementCacheCapacity := 512 147 if s, ok := config.RuntimeParams["statement_cache_capacity"]; ok { 148 delete(config.RuntimeParams, "statement_cache_capacity") 149 n, err := strconv.ParseInt(s, 10, 32) 150 if err != nil { 151 return nil, fmt.Errorf("cannot parse statement_cache_capacity: %w", err) 152 } 153 statementCacheCapacity = int(n) 154 } 155 156 descriptionCacheCapacity := 512 157 if s, ok := config.RuntimeParams["description_cache_capacity"]; ok { 158 delete(config.RuntimeParams, "description_cache_capacity") 159 n, err := strconv.ParseInt(s, 10, 32) 160 if err != nil { 161 return nil, fmt.Errorf("cannot parse description_cache_capacity: %w", err) 162 } 163 descriptionCacheCapacity = int(n) 164 } 165 166 defaultQueryExecMode := QueryExecModeCacheStatement 167 if s, ok := config.RuntimeParams["default_query_exec_mode"]; ok { 168 delete(config.RuntimeParams, "default_query_exec_mode") 169 switch s { 170 case "cache_statement": 171 defaultQueryExecMode = QueryExecModeCacheStatement 172 case "cache_describe": 173 defaultQueryExecMode = QueryExecModeCacheDescribe 174 case "describe_exec": 175 defaultQueryExecMode = QueryExecModeDescribeExec 176 case "exec": 177 defaultQueryExecMode = QueryExecModeExec 178 case "simple_protocol": 179 defaultQueryExecMode = QueryExecModeSimpleProtocol 180 default: 181 return nil, fmt.Errorf("invalid default_query_exec_mode: %s", s) 182 } 183 } 184 185 connConfig := &ConnConfig{ 186 Config: *config, 187 createdByParseConfig: true, 188 StatementCacheCapacity: statementCacheCapacity, 189 DescriptionCacheCapacity: descriptionCacheCapacity, 190 DefaultQueryExecMode: defaultQueryExecMode, 191 connString: connString, 192 } 193 194 return connConfig, nil 195 } 196 197 // ParseConfig creates a ConnConfig from a connection string. ParseConfig handles all options that pgconn.ParseConfig 198 // does. In addition, it accepts the following options: 199 // 200 // - default_query_exec_mode. 201 // Possible values: "cache_statement", "cache_describe", "describe_exec", "exec", and "simple_protocol". See 202 // QueryExecMode constant documentation for the meaning of these values. Default: "cache_statement". 203 // 204 // - statement_cache_capacity. 205 // The maximum size of the statement cache used when executing a query with "cache_statement" query exec mode. 206 // Default: 512. 207 // 208 // - description_cache_capacity. 209 // The maximum size of the description cache used when executing a query with "cache_describe" query exec mode. 210 // Default: 512. 211 func ParseConfig(connString string) (*ConnConfig, error) { 212 return ParseConfigWithOptions(connString, ParseConfigOptions{}) 213 } 214 215 // connect connects to a database. connect takes ownership of config. The caller must not use or access it again. 216 func connect(ctx context.Context, config *ConnConfig) (c *Conn, err error) { 217 if connectTracer, ok := config.Tracer.(ConnectTracer); ok { 218 ctx = connectTracer.TraceConnectStart(ctx, TraceConnectStartData{ConnConfig: config}) 219 defer func() { 220 connectTracer.TraceConnectEnd(ctx, TraceConnectEndData{Conn: c, Err: err}) 221 }() 222 } 223 224 // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from 225 // zero values. 226 if !config.createdByParseConfig { 227 panic("config must be created by ParseConfig") 228 } 229 230 c = &Conn{ 231 config: config, 232 typeMap: pgtype.NewMap(), 233 queryTracer: config.Tracer, 234 } 235 236 if t, ok := c.queryTracer.(BatchTracer); ok { 237 c.batchTracer = t 238 } 239 if t, ok := c.queryTracer.(CopyFromTracer); ok { 240 c.copyFromTracer = t 241 } 242 if t, ok := c.queryTracer.(PrepareTracer); ok { 243 c.prepareTracer = t 244 } 245 246 // Only install pgx notification system if no other callback handler is present. 247 if config.Config.OnNotification == nil { 248 config.Config.OnNotification = c.bufferNotifications 249 } 250 251 c.pgConn, err = pgconn.ConnectConfig(ctx, &config.Config) 252 if err != nil { 253 return nil, err 254 } 255 256 c.preparedStatements = make(map[string]*pgconn.StatementDescription) 257 c.doneChan = make(chan struct{}) 258 c.closedChan = make(chan error) 259 c.wbuf = make([]byte, 0, 1024) 260 261 if c.config.StatementCacheCapacity > 0 { 262 c.statementCache = stmtcache.NewLRUCache(c.config.StatementCacheCapacity) 263 } 264 265 if c.config.DescriptionCacheCapacity > 0 { 266 c.descriptionCache = stmtcache.NewLRUCache(c.config.DescriptionCacheCapacity) 267 } 268 269 return c, nil 270 } 271 272 // Close closes a connection. It is safe to call Close on a already closed 273 // connection. 274 func (c *Conn) Close(ctx context.Context) error { 275 if c.IsClosed() { 276 return nil 277 } 278 279 err := c.pgConn.Close(ctx) 280 return err 281 } 282 283 // Prepare creates a prepared statement with name and sql. sql can contain placeholders 284 // for bound parameters. These placeholders are referenced positional as $1, $2, etc. 285 // 286 // Prepare is idempotent; i.e. it is safe to call Prepare multiple times with the same 287 // name and sql arguments. This allows a code path to Prepare and Query/Exec without 288 // concern for if the statement has already been prepared. 289 func (c *Conn) Prepare(ctx context.Context, name, sql string) (sd *pgconn.StatementDescription, err error) { 290 if c.prepareTracer != nil { 291 ctx = c.prepareTracer.TracePrepareStart(ctx, c, TracePrepareStartData{Name: name, SQL: sql}) 292 } 293 294 if name != "" { 295 var ok bool 296 if sd, ok = c.preparedStatements[name]; ok && sd.SQL == sql { 297 if c.prepareTracer != nil { 298 c.prepareTracer.TracePrepareEnd(ctx, c, TracePrepareEndData{AlreadyPrepared: true}) 299 } 300 return sd, nil 301 } 302 } 303 304 if c.prepareTracer != nil { 305 defer func() { 306 c.prepareTracer.TracePrepareEnd(ctx, c, TracePrepareEndData{Err: err}) 307 }() 308 } 309 310 sd, err = c.pgConn.Prepare(ctx, name, sql, nil) 311 if err != nil { 312 return nil, err 313 } 314 315 if name != "" { 316 c.preparedStatements[name] = sd 317 } 318 319 return sd, nil 320 } 321 322 // Deallocate released a prepared statement 323 func (c *Conn) Deallocate(ctx context.Context, name string) error { 324 delete(c.preparedStatements, name) 325 _, err := c.pgConn.Exec(ctx, "deallocate "+quoteIdentifier(name)).ReadAll() 326 return err 327 } 328 329 // DeallocateAll releases all previously prepared statements from the server and client, where it also resets the statement and description cache. 330 func (c *Conn) DeallocateAll(ctx context.Context) error { 331 c.preparedStatements = map[string]*pgconn.StatementDescription{} 332 if c.config.StatementCacheCapacity > 0 { 333 c.statementCache = stmtcache.NewLRUCache(c.config.StatementCacheCapacity) 334 } 335 if c.config.DescriptionCacheCapacity > 0 { 336 c.descriptionCache = stmtcache.NewLRUCache(c.config.DescriptionCacheCapacity) 337 } 338 _, err := c.pgConn.Exec(ctx, "deallocate all").ReadAll() 339 return err 340 } 341 342 func (c *Conn) bufferNotifications(_ *pgconn.PgConn, n *pgconn.Notification) { 343 c.notifications = append(c.notifications, n) 344 } 345 346 // WaitForNotification waits for a PostgreSQL notification. It wraps the underlying pgconn notification system in a 347 // slightly more convenient form. 348 func (c *Conn) WaitForNotification(ctx context.Context) (*pgconn.Notification, error) { 349 var n *pgconn.Notification 350 351 // Return already received notification immediately 352 if len(c.notifications) > 0 { 353 n = c.notifications[0] 354 c.notifications = c.notifications[1:] 355 return n, nil 356 } 357 358 err := c.pgConn.WaitForNotification(ctx) 359 if len(c.notifications) > 0 { 360 n = c.notifications[0] 361 c.notifications = c.notifications[1:] 362 } 363 return n, err 364 } 365 366 // IsClosed reports if the connection has been closed. 367 func (c *Conn) IsClosed() bool { 368 return c.pgConn.IsClosed() 369 } 370 371 func (c *Conn) die(err error) { 372 if c.IsClosed() { 373 return 374 } 375 376 ctx, cancel := context.WithCancel(context.Background()) 377 cancel() // force immediate hard cancel 378 c.pgConn.Close(ctx) 379 } 380 381 func quoteIdentifier(s string) string { 382 return `"` + strings.ReplaceAll(s, `"`, `""`) + `"` 383 } 384 385 // Ping delegates to the underlying *pgconn.PgConn.Ping. 386 func (c *Conn) Ping(ctx context.Context) error { 387 return c.pgConn.Ping(ctx) 388 } 389 390 // PgConn returns the underlying *pgconn.PgConn. This is an escape hatch method that allows lower level access to the 391 // PostgreSQL connection than pgx exposes. 392 // 393 // It is strongly recommended that the connection be idle (no in-progress queries) before the underlying *pgconn.PgConn 394 // is used and the connection must be returned to the same state before any *pgx.Conn methods are again used. 395 func (c *Conn) PgConn() *pgconn.PgConn { return c.pgConn } 396 397 // TypeMap returns the connection info used for this connection. 398 func (c *Conn) TypeMap() *pgtype.Map { return c.typeMap } 399 400 // Config returns a copy of config that was used to establish this connection. 401 func (c *Conn) Config() *ConnConfig { return c.config.Copy() } 402 403 // Exec executes sql. sql can be either a prepared statement name or an SQL string. arguments should be referenced 404 // positionally from the sql string as $1, $2, etc. 405 func (c *Conn) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) { 406 if c.queryTracer != nil { 407 ctx = c.queryTracer.TraceQueryStart(ctx, c, TraceQueryStartData{SQL: sql, Args: arguments}) 408 } 409 410 if err := c.deallocateInvalidatedCachedStatements(ctx); err != nil { 411 return pgconn.CommandTag{}, err 412 } 413 414 commandTag, err := c.exec(ctx, sql, arguments...) 415 416 if c.queryTracer != nil { 417 c.queryTracer.TraceQueryEnd(ctx, c, TraceQueryEndData{CommandTag: commandTag, Err: err}) 418 } 419 420 return commandTag, err 421 } 422 423 func (c *Conn) exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) { 424 mode := c.config.DefaultQueryExecMode 425 var queryRewriter QueryRewriter 426 427 optionLoop: 428 for len(arguments) > 0 { 429 switch arg := arguments[0].(type) { 430 case QueryExecMode: 431 mode = arg 432 arguments = arguments[1:] 433 case QueryRewriter: 434 queryRewriter = arg 435 arguments = arguments[1:] 436 default: 437 break optionLoop 438 } 439 } 440 441 if queryRewriter != nil { 442 sql, arguments, err = queryRewriter.RewriteQuery(ctx, c, sql, arguments) 443 if err != nil { 444 return pgconn.CommandTag{}, fmt.Errorf("rewrite query failed: %v", err) 445 } 446 } 447 448 // Always use simple protocol when there are no arguments. 449 if len(arguments) == 0 { 450 mode = QueryExecModeSimpleProtocol 451 } 452 453 if sd, ok := c.preparedStatements[sql]; ok { 454 return c.execPrepared(ctx, sd, arguments) 455 } 456 457 switch mode { 458 case QueryExecModeCacheStatement: 459 if c.statementCache == nil { 460 return pgconn.CommandTag{}, errDisabledStatementCache 461 } 462 sd := c.statementCache.Get(sql) 463 if sd == nil { 464 sd, err = c.Prepare(ctx, stmtcache.NextStatementName(), sql) 465 if err != nil { 466 return pgconn.CommandTag{}, err 467 } 468 c.statementCache.Put(sd) 469 } 470 471 return c.execPrepared(ctx, sd, arguments) 472 case QueryExecModeCacheDescribe: 473 if c.descriptionCache == nil { 474 return pgconn.CommandTag{}, errDisabledDescriptionCache 475 } 476 sd := c.descriptionCache.Get(sql) 477 if sd == nil { 478 sd, err = c.Prepare(ctx, "", sql) 479 if err != nil { 480 return pgconn.CommandTag{}, err 481 } 482 } 483 484 return c.execParams(ctx, sd, arguments) 485 case QueryExecModeDescribeExec: 486 sd, err := c.Prepare(ctx, "", sql) 487 if err != nil { 488 return pgconn.CommandTag{}, err 489 } 490 return c.execPrepared(ctx, sd, arguments) 491 case QueryExecModeExec: 492 return c.execSQLParams(ctx, sql, arguments) 493 case QueryExecModeSimpleProtocol: 494 return c.execSimpleProtocol(ctx, sql, arguments) 495 default: 496 return pgconn.CommandTag{}, fmt.Errorf("unknown QueryExecMode: %v", mode) 497 } 498 } 499 500 func (c *Conn) execSimpleProtocol(ctx context.Context, sql string, arguments []any) (commandTag pgconn.CommandTag, err error) { 501 if len(arguments) > 0 { 502 sql, err = c.sanitizeForSimpleQuery(sql, arguments...) 503 if err != nil { 504 return pgconn.CommandTag{}, err 505 } 506 } 507 508 mrr := c.pgConn.Exec(ctx, sql) 509 for mrr.NextResult() { 510 commandTag, err = mrr.ResultReader().Close() 511 } 512 err = mrr.Close() 513 return commandTag, err 514 } 515 516 func (c *Conn) execParams(ctx context.Context, sd *pgconn.StatementDescription, arguments []any) (pgconn.CommandTag, error) { 517 err := c.eqb.Build(c.typeMap, sd, arguments) 518 if err != nil { 519 return pgconn.CommandTag{}, err 520 } 521 522 result := c.pgConn.ExecParams(ctx, sd.SQL, c.eqb.ParamValues, sd.ParamOIDs, c.eqb.ParamFormats, c.eqb.ResultFormats).Read() 523 c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible. 524 return result.CommandTag, result.Err 525 } 526 527 func (c *Conn) execPrepared(ctx context.Context, sd *pgconn.StatementDescription, arguments []any) (pgconn.CommandTag, error) { 528 err := c.eqb.Build(c.typeMap, sd, arguments) 529 if err != nil { 530 return pgconn.CommandTag{}, err 531 } 532 533 result := c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats).Read() 534 c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible. 535 return result.CommandTag, result.Err 536 } 537 538 type unknownArgumentTypeQueryExecModeExecError struct { 539 arg any 540 } 541 542 func (e *unknownArgumentTypeQueryExecModeExecError) Error() string { 543 return fmt.Sprintf("cannot use unregistered type %T as query argument in QueryExecModeExec", e.arg) 544 } 545 546 func (c *Conn) execSQLParams(ctx context.Context, sql string, args []any) (pgconn.CommandTag, error) { 547 err := c.eqb.Build(c.typeMap, nil, args) 548 if err != nil { 549 return pgconn.CommandTag{}, err 550 } 551 552 result := c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats).Read() 553 c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible. 554 return result.CommandTag, result.Err 555 } 556 557 func (c *Conn) getRows(ctx context.Context, sql string, args []any) *baseRows { 558 r := &baseRows{} 559 560 r.ctx = ctx 561 r.queryTracer = c.queryTracer 562 r.typeMap = c.typeMap 563 r.startTime = time.Now() 564 r.sql = sql 565 r.args = args 566 r.conn = c 567 568 return r 569 } 570 571 type QueryExecMode int32 572 573 const ( 574 _ QueryExecMode = iota 575 576 // Automatically prepare and cache statements. This uses the extended protocol. Queries are executed in a single 577 // round trip after the statement is cached. This is the default. 578 QueryExecModeCacheStatement 579 580 // Cache statement descriptions (i.e. argument and result types) and assume they do not change. This uses the 581 // extended protocol. Queries are executed in a single round trip after the description is cached. If the database 582 // schema is modified or the search_path is changed this may result in undetected result decoding errors. 583 QueryExecModeCacheDescribe 584 585 // Get the statement description on every execution. This uses the extended protocol. Queries require two round trips 586 // to execute. It does not use named prepared statements. But it does use the unnamed prepared statement to get the 587 // statement description on the first round trip and then uses it to execute the query on the second round trip. This 588 // may cause problems with connection poolers that switch the underlying connection between round trips. It is safe 589 // even when the the database schema is modified concurrently. 590 QueryExecModeDescribeExec 591 592 // Assume the PostgreSQL query parameter types based on the Go type of the arguments. This uses the extended protocol 593 // with text formatted parameters and results. Queries are executed in a single round trip. Type mappings can be 594 // registered with pgtype.Map.RegisterDefaultPgType. Queries will be rejected that have arguments that are 595 // unregistered or ambigious. e.g. A map[string]string may have the PostgreSQL type json or hstore. Modes that know 596 // the PostgreSQL type can use a map[string]string directly as an argument. This mode cannot. 597 QueryExecModeExec 598 599 // Use the simple protocol. Assume the PostgreSQL query parameter types based on the Go type of the arguments. 600 // Queries are executed in a single round trip. Type mappings can be registered with 601 // pgtype.Map.RegisterDefaultPgType. Queries will be rejected that have arguments that are unregistered or ambigious. 602 // e.g. A map[string]string may have the PostgreSQL type json or hstore. Modes that know the PostgreSQL type can use 603 // a map[string]string directly as an argument. This mode cannot. 604 // 605 // QueryExecModeSimpleProtocol should have the user application visible behavior as QueryExecModeExec with minor 606 // exceptions such as behavior when multiple result returning queries are erroneously sent in a single string. 607 // 608 // QueryExecModeSimpleProtocol uses client side parameter interpolation. All values are quoted and escaped. Prefer 609 // QueryExecModeExec over QueryExecModeSimpleProtocol whenever possible. In general QueryExecModeSimpleProtocol 610 // should only be used if connecting to a proxy server, connection pool server, or non-PostgreSQL server that does 611 // not support the extended protocol. 612 QueryExecModeSimpleProtocol 613 ) 614 615 func (m QueryExecMode) String() string { 616 switch m { 617 case QueryExecModeCacheStatement: 618 return "cache statement" 619 case QueryExecModeCacheDescribe: 620 return "cache describe" 621 case QueryExecModeDescribeExec: 622 return "describe exec" 623 case QueryExecModeExec: 624 return "exec" 625 case QueryExecModeSimpleProtocol: 626 return "simple protocol" 627 default: 628 return "invalid" 629 } 630 } 631 632 // QueryResultFormats controls the result format (text=0, binary=1) of a query by result column position. 633 type QueryResultFormats []int16 634 635 // QueryResultFormatsByOID controls the result format (text=0, binary=1) of a query by the result column OID. 636 type QueryResultFormatsByOID map[uint32]int16 637 638 // QueryRewriter rewrites a query when used as the first arguments to a query method. 639 type QueryRewriter interface { 640 RewriteQuery(ctx context.Context, conn *Conn, sql string, args []any) (newSQL string, newArgs []any, err error) 641 } 642 643 // Query sends a query to the server and returns a Rows to read the results. Only errors encountered sending the query 644 // and initializing Rows will be returned. Err() on the returned Rows must be checked after the Rows is closed to 645 // determine if the query executed successfully. 646 // 647 // The returned Rows must be closed before the connection can be used again. It is safe to attempt to read from the 648 // returned Rows even if an error is returned. The error will be the available in rows.Err() after rows are closed. It 649 // is allowed to ignore the error returned from Query and handle it in Rows. 650 // 651 // It is possible for a call of FieldDescriptions on the returned Rows to return nil even if the Query call did not 652 // return an error. 653 // 654 // It is possible for a query to return one or more rows before encountering an error. In most cases the rows should be 655 // collected before processing rather than processed while receiving each row. This avoids the possibility of the 656 // application processing rows from a query that the server rejected. The CollectRows function is useful here. 657 // 658 // An implementor of QueryRewriter may be passed as the first element of args. It can rewrite the sql and change or 659 // replace args. For example, NamedArgs is QueryRewriter that implements named arguments. 660 // 661 // For extra control over how the query is executed, the types QueryExecMode, QueryResultFormats, and 662 // QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely 663 // needed. See the documentation for those types for details. 664 func (c *Conn) Query(ctx context.Context, sql string, args ...any) (Rows, error) { 665 if c.queryTracer != nil { 666 ctx = c.queryTracer.TraceQueryStart(ctx, c, TraceQueryStartData{SQL: sql, Args: args}) 667 } 668 669 if err := c.deallocateInvalidatedCachedStatements(ctx); err != nil { 670 if c.queryTracer != nil { 671 c.queryTracer.TraceQueryEnd(ctx, c, TraceQueryEndData{Err: err}) 672 } 673 return &baseRows{err: err, closed: true}, err 674 } 675 676 var resultFormats QueryResultFormats 677 var resultFormatsByOID QueryResultFormatsByOID 678 mode := c.config.DefaultQueryExecMode 679 var queryRewriter QueryRewriter 680 681 optionLoop: 682 for len(args) > 0 { 683 switch arg := args[0].(type) { 684 case QueryResultFormats: 685 resultFormats = arg 686 args = args[1:] 687 case QueryResultFormatsByOID: 688 resultFormatsByOID = arg 689 args = args[1:] 690 case QueryExecMode: 691 mode = arg 692 args = args[1:] 693 case QueryRewriter: 694 queryRewriter = arg 695 args = args[1:] 696 default: 697 break optionLoop 698 } 699 } 700 701 if queryRewriter != nil { 702 var err error 703 originalSQL := sql 704 originalArgs := args 705 sql, args, err = queryRewriter.RewriteQuery(ctx, c, sql, args) 706 if err != nil { 707 rows := c.getRows(ctx, originalSQL, originalArgs) 708 err = fmt.Errorf("rewrite query failed: %v", err) 709 rows.fatal(err) 710 return rows, err 711 } 712 } 713 714 // Bypass any statement caching. 715 if sql == "" { 716 mode = QueryExecModeSimpleProtocol 717 } 718 719 c.eqb.reset() 720 anynil.NormalizeSlice(args) 721 rows := c.getRows(ctx, sql, args) 722 723 var err error 724 sd, explicitPreparedStatement := c.preparedStatements[sql] 725 if sd != nil || mode == QueryExecModeCacheStatement || mode == QueryExecModeCacheDescribe || mode == QueryExecModeDescribeExec { 726 if sd == nil { 727 sd, err = c.getStatementDescription(ctx, mode, sql) 728 if err != nil { 729 rows.fatal(err) 730 return rows, err 731 } 732 } 733 734 if len(sd.ParamOIDs) != len(args) { 735 rows.fatal(fmt.Errorf("expected %d arguments, got %d", len(sd.ParamOIDs), len(args))) 736 return rows, rows.err 737 } 738 739 rows.sql = sd.SQL 740 741 err = c.eqb.Build(c.typeMap, sd, args) 742 if err != nil { 743 rows.fatal(err) 744 return rows, rows.err 745 } 746 747 if resultFormatsByOID != nil { 748 resultFormats = make([]int16, len(sd.Fields)) 749 for i := range resultFormats { 750 resultFormats[i] = resultFormatsByOID[uint32(sd.Fields[i].DataTypeOID)] 751 } 752 } 753 754 if resultFormats == nil { 755 resultFormats = c.eqb.ResultFormats 756 } 757 758 if !explicitPreparedStatement && mode == QueryExecModeCacheDescribe { 759 rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, sd.ParamOIDs, c.eqb.ParamFormats, resultFormats) 760 } else { 761 rows.resultReader = c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, resultFormats) 762 } 763 } else if mode == QueryExecModeExec { 764 err := c.eqb.Build(c.typeMap, nil, args) 765 if err != nil { 766 rows.fatal(err) 767 return rows, rows.err 768 } 769 770 rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats) 771 } else if mode == QueryExecModeSimpleProtocol { 772 sql, err = c.sanitizeForSimpleQuery(sql, args...) 773 if err != nil { 774 rows.fatal(err) 775 return rows, err 776 } 777 778 mrr := c.pgConn.Exec(ctx, sql) 779 if mrr.NextResult() { 780 rows.resultReader = mrr.ResultReader() 781 rows.multiResultReader = mrr 782 } else { 783 err = mrr.Close() 784 rows.fatal(err) 785 return rows, err 786 } 787 788 return rows, nil 789 } else { 790 err = fmt.Errorf("unknown QueryExecMode: %v", mode) 791 rows.fatal(err) 792 return rows, rows.err 793 } 794 795 c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible. 796 797 return rows, rows.err 798 } 799 800 // getStatementDescription returns the statement description of the sql query 801 // according to the given mode. 802 // 803 // If the mode is one that doesn't require to know the param and result OIDs 804 // then nil is returned without error. 805 func (c *Conn) getStatementDescription( 806 ctx context.Context, 807 mode QueryExecMode, 808 sql string, 809 ) (sd *pgconn.StatementDescription, err error) { 810 811 switch mode { 812 case QueryExecModeCacheStatement: 813 if c.statementCache == nil { 814 return nil, errDisabledStatementCache 815 } 816 sd = c.statementCache.Get(sql) 817 if sd == nil { 818 sd, err = c.Prepare(ctx, stmtcache.NextStatementName(), sql) 819 if err != nil { 820 return nil, err 821 } 822 c.statementCache.Put(sd) 823 } 824 case QueryExecModeCacheDescribe: 825 if c.descriptionCache == nil { 826 return nil, errDisabledDescriptionCache 827 } 828 sd = c.descriptionCache.Get(sql) 829 if sd == nil { 830 sd, err = c.Prepare(ctx, "", sql) 831 if err != nil { 832 return nil, err 833 } 834 c.descriptionCache.Put(sd) 835 } 836 case QueryExecModeDescribeExec: 837 return c.Prepare(ctx, "", sql) 838 } 839 return sd, err 840 } 841 842 // QueryRow is a convenience wrapper over Query. Any error that occurs while 843 // querying is deferred until calling Scan on the returned Row. That Row will 844 // error with ErrNoRows if no rows are returned. 845 func (c *Conn) QueryRow(ctx context.Context, sql string, args ...any) Row { 846 rows, _ := c.Query(ctx, sql, args...) 847 return (*connRow)(rows.(*baseRows)) 848 } 849 850 // SendBatch sends all queued queries to the server at once. All queries are run in an implicit transaction unless 851 // explicit transaction control statements are executed. The returned BatchResults must be closed before the connection 852 // is used again. 853 func (c *Conn) SendBatch(ctx context.Context, b *Batch) (br BatchResults) { 854 if c.batchTracer != nil { 855 ctx = c.batchTracer.TraceBatchStart(ctx, c, TraceBatchStartData{Batch: b}) 856 defer func() { 857 err := br.(interface{ earlyError() error }).earlyError() 858 if err != nil { 859 c.batchTracer.TraceBatchEnd(ctx, c, TraceBatchEndData{Err: err}) 860 } 861 }() 862 } 863 864 if err := c.deallocateInvalidatedCachedStatements(ctx); err != nil { 865 return &batchResults{ctx: ctx, conn: c, err: err} 866 } 867 868 mode := c.config.DefaultQueryExecMode 869 870 for _, bi := range b.queuedQueries { 871 var queryRewriter QueryRewriter 872 sql := bi.query 873 arguments := bi.arguments 874 875 optionLoop: 876 for len(arguments) > 0 { 877 switch arg := arguments[0].(type) { 878 case QueryRewriter: 879 queryRewriter = arg 880 arguments = arguments[1:] 881 default: 882 break optionLoop 883 } 884 } 885 886 if queryRewriter != nil { 887 var err error 888 sql, arguments, err = queryRewriter.RewriteQuery(ctx, c, sql, arguments) 889 if err != nil { 890 return &batchResults{ctx: ctx, conn: c, err: fmt.Errorf("rewrite query failed: %v", err)} 891 } 892 } 893 894 bi.query = sql 895 bi.arguments = arguments 896 } 897 898 if mode == QueryExecModeSimpleProtocol { 899 return c.sendBatchQueryExecModeSimpleProtocol(ctx, b) 900 } 901 902 // All other modes use extended protocol and thus can use prepared statements. 903 for _, bi := range b.queuedQueries { 904 if sd, ok := c.preparedStatements[bi.query]; ok { 905 bi.sd = sd 906 } 907 } 908 909 switch mode { 910 case QueryExecModeExec: 911 return c.sendBatchQueryExecModeExec(ctx, b) 912 case QueryExecModeCacheStatement: 913 return c.sendBatchQueryExecModeCacheStatement(ctx, b) 914 case QueryExecModeCacheDescribe: 915 return c.sendBatchQueryExecModeCacheDescribe(ctx, b) 916 case QueryExecModeDescribeExec: 917 return c.sendBatchQueryExecModeDescribeExec(ctx, b) 918 default: 919 panic("unknown QueryExecMode") 920 } 921 } 922 923 func (c *Conn) sendBatchQueryExecModeSimpleProtocol(ctx context.Context, b *Batch) *batchResults { 924 var sb strings.Builder 925 for i, bi := range b.queuedQueries { 926 if i > 0 { 927 sb.WriteByte(';') 928 } 929 sql, err := c.sanitizeForSimpleQuery(bi.query, bi.arguments...) 930 if err != nil { 931 return &batchResults{ctx: ctx, conn: c, err: err} 932 } 933 sb.WriteString(sql) 934 } 935 mrr := c.pgConn.Exec(ctx, sb.String()) 936 return &batchResults{ 937 ctx: ctx, 938 conn: c, 939 mrr: mrr, 940 b: b, 941 qqIdx: 0, 942 } 943 } 944 945 func (c *Conn) sendBatchQueryExecModeExec(ctx context.Context, b *Batch) *batchResults { 946 batch := &pgconn.Batch{} 947 948 for _, bi := range b.queuedQueries { 949 sd := bi.sd 950 if sd != nil { 951 err := c.eqb.Build(c.typeMap, sd, bi.arguments) 952 if err != nil { 953 return &batchResults{ctx: ctx, conn: c, err: err} 954 } 955 956 batch.ExecPrepared(sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats) 957 } else { 958 err := c.eqb.Build(c.typeMap, nil, bi.arguments) 959 if err != nil { 960 return &batchResults{ctx: ctx, conn: c, err: err} 961 } 962 batch.ExecParams(bi.query, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats) 963 } 964 } 965 966 c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible. 967 968 mrr := c.pgConn.ExecBatch(ctx, batch) 969 970 return &batchResults{ 971 ctx: ctx, 972 conn: c, 973 mrr: mrr, 974 b: b, 975 qqIdx: 0, 976 } 977 } 978 979 func (c *Conn) sendBatchQueryExecModeCacheStatement(ctx context.Context, b *Batch) (pbr *pipelineBatchResults) { 980 if c.statementCache == nil { 981 return &pipelineBatchResults{ctx: ctx, conn: c, err: errDisabledStatementCache, closed: true} 982 } 983 984 distinctNewQueries := []*pgconn.StatementDescription{} 985 distinctNewQueriesIdxMap := make(map[string]int) 986 987 for _, bi := range b.queuedQueries { 988 if bi.sd == nil { 989 sd := c.statementCache.Get(bi.query) 990 if sd != nil { 991 bi.sd = sd 992 } else { 993 if idx, present := distinctNewQueriesIdxMap[bi.query]; present { 994 bi.sd = distinctNewQueries[idx] 995 } else { 996 sd = &pgconn.StatementDescription{ 997 Name: stmtcache.NextStatementName(), 998 SQL: bi.query, 999 } 1000 distinctNewQueriesIdxMap[sd.SQL] = len(distinctNewQueries) 1001 distinctNewQueries = append(distinctNewQueries, sd) 1002 bi.sd = sd 1003 } 1004 } 1005 } 1006 } 1007 1008 return c.sendBatchExtendedWithDescription(ctx, b, distinctNewQueries, c.statementCache) 1009 } 1010 1011 func (c *Conn) sendBatchQueryExecModeCacheDescribe(ctx context.Context, b *Batch) (pbr *pipelineBatchResults) { 1012 if c.descriptionCache == nil { 1013 return &pipelineBatchResults{ctx: ctx, conn: c, err: errDisabledDescriptionCache, closed: true} 1014 } 1015 1016 distinctNewQueries := []*pgconn.StatementDescription{} 1017 distinctNewQueriesIdxMap := make(map[string]int) 1018 1019 for _, bi := range b.queuedQueries { 1020 if bi.sd == nil { 1021 sd := c.descriptionCache.Get(bi.query) 1022 if sd != nil { 1023 bi.sd = sd 1024 } else { 1025 if idx, present := distinctNewQueriesIdxMap[bi.query]; present { 1026 bi.sd = distinctNewQueries[idx] 1027 } else { 1028 sd = &pgconn.StatementDescription{ 1029 SQL: bi.query, 1030 } 1031 distinctNewQueriesIdxMap[sd.SQL] = len(distinctNewQueries) 1032 distinctNewQueries = append(distinctNewQueries, sd) 1033 bi.sd = sd 1034 } 1035 } 1036 } 1037 } 1038 1039 return c.sendBatchExtendedWithDescription(ctx, b, distinctNewQueries, c.descriptionCache) 1040 } 1041 1042 func (c *Conn) sendBatchQueryExecModeDescribeExec(ctx context.Context, b *Batch) (pbr *pipelineBatchResults) { 1043 distinctNewQueries := []*pgconn.StatementDescription{} 1044 distinctNewQueriesIdxMap := make(map[string]int) 1045 1046 for _, bi := range b.queuedQueries { 1047 if bi.sd == nil { 1048 if idx, present := distinctNewQueriesIdxMap[bi.query]; present { 1049 bi.sd = distinctNewQueries[idx] 1050 } else { 1051 sd := &pgconn.StatementDescription{ 1052 SQL: bi.query, 1053 } 1054 distinctNewQueriesIdxMap[sd.SQL] = len(distinctNewQueries) 1055 distinctNewQueries = append(distinctNewQueries, sd) 1056 bi.sd = sd 1057 } 1058 } 1059 } 1060 1061 return c.sendBatchExtendedWithDescription(ctx, b, distinctNewQueries, nil) 1062 } 1063 1064 func (c *Conn) sendBatchExtendedWithDescription(ctx context.Context, b *Batch, distinctNewQueries []*pgconn.StatementDescription, sdCache stmtcache.Cache) (pbr *pipelineBatchResults) { 1065 pipeline := c.pgConn.StartPipeline(context.Background()) 1066 defer func() { 1067 if pbr.err != nil { 1068 pipeline.Close() 1069 } 1070 }() 1071 1072 // Prepare any needed queries 1073 if len(distinctNewQueries) > 0 { 1074 for _, sd := range distinctNewQueries { 1075 pipeline.SendPrepare(sd.Name, sd.SQL, nil) 1076 } 1077 1078 err := pipeline.Sync() 1079 if err != nil { 1080 return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true} 1081 } 1082 1083 for _, sd := range distinctNewQueries { 1084 results, err := pipeline.GetResults() 1085 if err != nil { 1086 return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true} 1087 } 1088 1089 resultSD, ok := results.(*pgconn.StatementDescription) 1090 if !ok { 1091 return &pipelineBatchResults{ctx: ctx, conn: c, err: fmt.Errorf("expected statement description, got %T", results), closed: true} 1092 } 1093 1094 // Fill in the previously empty / pending statement descriptions. 1095 sd.ParamOIDs = resultSD.ParamOIDs 1096 sd.Fields = resultSD.Fields 1097 } 1098 1099 results, err := pipeline.GetResults() 1100 if err != nil { 1101 return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true} 1102 } 1103 1104 _, ok := results.(*pgconn.PipelineSync) 1105 if !ok { 1106 return &pipelineBatchResults{ctx: ctx, conn: c, err: fmt.Errorf("expected sync, got %T", results), closed: true} 1107 } 1108 } 1109 1110 // Put all statements into the cache. It's fine if it overflows because HandleInvalidated will clean them up later. 1111 if sdCache != nil { 1112 for _, sd := range distinctNewQueries { 1113 sdCache.Put(sd) 1114 } 1115 } 1116 1117 // Queue the queries. 1118 for _, bi := range b.queuedQueries { 1119 err := c.eqb.Build(c.typeMap, bi.sd, bi.arguments) 1120 if err != nil { 1121 // we wrap the error so we the user can understand which query failed inside the batch 1122 err = fmt.Errorf("error building query %s: %w", bi.query, err) 1123 return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true} 1124 } 1125 1126 if bi.sd.Name == "" { 1127 pipeline.SendQueryParams(bi.sd.SQL, c.eqb.ParamValues, bi.sd.ParamOIDs, c.eqb.ParamFormats, c.eqb.ResultFormats) 1128 } else { 1129 pipeline.SendQueryPrepared(bi.sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats) 1130 } 1131 } 1132 1133 err := pipeline.Sync() 1134 if err != nil { 1135 return &pipelineBatchResults{ctx: ctx, conn: c, err: err, closed: true} 1136 } 1137 1138 return &pipelineBatchResults{ 1139 ctx: ctx, 1140 conn: c, 1141 pipeline: pipeline, 1142 b: b, 1143 } 1144 } 1145 1146 func (c *Conn) sanitizeForSimpleQuery(sql string, args ...any) (string, error) { 1147 if c.pgConn.ParameterStatus("standard_conforming_strings") != "on" { 1148 return "", errors.New("simple protocol queries must be run with standard_conforming_strings=on") 1149 } 1150 1151 if c.pgConn.ParameterStatus("client_encoding") != "UTF8" { 1152 return "", errors.New("simple protocol queries must be run with client_encoding=UTF8") 1153 } 1154 1155 var err error 1156 valueArgs := make([]any, len(args)) 1157 for i, a := range args { 1158 valueArgs[i], err = convertSimpleArgument(c.typeMap, a) 1159 if err != nil { 1160 return "", err 1161 } 1162 } 1163 1164 return sanitize.SanitizeSQL(sql, valueArgs...) 1165 } 1166 1167 // LoadType inspects the database for typeName and produces a pgtype.Type suitable for registration. 1168 func (c *Conn) LoadType(ctx context.Context, typeName string) (*pgtype.Type, error) { 1169 var oid uint32 1170 1171 err := c.QueryRow(ctx, "select $1::text::regtype::oid;", typeName).Scan(&oid) 1172 if err != nil { 1173 return nil, err 1174 } 1175 1176 var typtype string 1177 var typbasetype uint32 1178 1179 err = c.QueryRow(ctx, "select typtype::text, typbasetype from pg_type where oid=$1", oid).Scan(&typtype, &typbasetype) 1180 if err != nil { 1181 return nil, err 1182 } 1183 1184 switch typtype { 1185 case "b": // array 1186 elementOID, err := c.getArrayElementOID(ctx, oid) 1187 if err != nil { 1188 return nil, err 1189 } 1190 1191 dt, ok := c.TypeMap().TypeForOID(elementOID) 1192 if !ok { 1193 return nil, errors.New("array element OID not registered") 1194 } 1195 1196 return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.ArrayCodec{ElementType: dt}}, nil 1197 case "c": // composite 1198 fields, err := c.getCompositeFields(ctx, oid) 1199 if err != nil { 1200 return nil, err 1201 } 1202 1203 return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.CompositeCodec{Fields: fields}}, nil 1204 case "d": // domain 1205 dt, ok := c.TypeMap().TypeForOID(typbasetype) 1206 if !ok { 1207 return nil, errors.New("domain base type OID not registered") 1208 } 1209 1210 return &pgtype.Type{Name: typeName, OID: oid, Codec: dt.Codec}, nil 1211 case "e": // enum 1212 return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.EnumCodec{}}, nil 1213 case "r": // range 1214 elementOID, err := c.getRangeElementOID(ctx, oid) 1215 if err != nil { 1216 return nil, err 1217 } 1218 1219 dt, ok := c.TypeMap().TypeForOID(elementOID) 1220 if !ok { 1221 return nil, errors.New("range element OID not registered") 1222 } 1223 1224 return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.RangeCodec{ElementType: dt}}, nil 1225 case "m": // multirange 1226 elementOID, err := c.getMultiRangeElementOID(ctx, oid) 1227 if err != nil { 1228 return nil, err 1229 } 1230 1231 dt, ok := c.TypeMap().TypeForOID(elementOID) 1232 if !ok { 1233 return nil, errors.New("multirange element OID not registered") 1234 } 1235 1236 return &pgtype.Type{Name: typeName, OID: oid, Codec: &pgtype.MultirangeCodec{ElementType: dt}}, nil 1237 default: 1238 return &pgtype.Type{}, errors.New("unknown typtype") 1239 } 1240 } 1241 1242 func (c *Conn) getArrayElementOID(ctx context.Context, oid uint32) (uint32, error) { 1243 var typelem uint32 1244 1245 err := c.QueryRow(ctx, "select typelem from pg_type where oid=$1", oid).Scan(&typelem) 1246 if err != nil { 1247 return 0, err 1248 } 1249 1250 return typelem, nil 1251 } 1252 1253 func (c *Conn) getRangeElementOID(ctx context.Context, oid uint32) (uint32, error) { 1254 var typelem uint32 1255 1256 err := c.QueryRow(ctx, "select rngsubtype from pg_range where rngtypid=$1", oid).Scan(&typelem) 1257 if err != nil { 1258 return 0, err 1259 } 1260 1261 return typelem, nil 1262 } 1263 1264 func (c *Conn) getMultiRangeElementOID(ctx context.Context, oid uint32) (uint32, error) { 1265 var typelem uint32 1266 1267 err := c.QueryRow(ctx, "select rngtypid from pg_range where rngmultitypid=$1", oid).Scan(&typelem) 1268 if err != nil { 1269 return 0, err 1270 } 1271 1272 return typelem, nil 1273 } 1274 1275 func (c *Conn) getCompositeFields(ctx context.Context, oid uint32) ([]pgtype.CompositeCodecField, error) { 1276 var typrelid uint32 1277 1278 err := c.QueryRow(ctx, "select typrelid from pg_type where oid=$1", oid).Scan(&typrelid) 1279 if err != nil { 1280 return nil, err 1281 } 1282 1283 var fields []pgtype.CompositeCodecField 1284 var fieldName string 1285 var fieldOID uint32 1286 rows, _ := c.Query(ctx, `select attname, atttypid 1287 from pg_attribute 1288 where attrelid=$1 1289 and not attisdropped 1290 and attnum > 0 1291 order by attnum`, 1292 typrelid, 1293 ) 1294 _, err = ForEachRow(rows, []any{&fieldName, &fieldOID}, func() error { 1295 dt, ok := c.TypeMap().TypeForOID(fieldOID) 1296 if !ok { 1297 return fmt.Errorf("unknown composite type field OID: %v", fieldOID) 1298 } 1299 fields = append(fields, pgtype.CompositeCodecField{Name: fieldName, Type: dt}) 1300 return nil 1301 }) 1302 if err != nil { 1303 return nil, err 1304 } 1305 1306 return fields, nil 1307 } 1308 1309 func (c *Conn) deallocateInvalidatedCachedStatements(ctx context.Context) error { 1310 if c.pgConn.TxStatus() != 'I' { 1311 return nil 1312 } 1313 1314 if c.descriptionCache != nil { 1315 c.descriptionCache.HandleInvalidated() 1316 } 1317 1318 var invalidatedStatements []*pgconn.StatementDescription 1319 if c.statementCache != nil { 1320 invalidatedStatements = c.statementCache.HandleInvalidated() 1321 } 1322 1323 if len(invalidatedStatements) == 0 { 1324 return nil 1325 } 1326 1327 pipeline := c.pgConn.StartPipeline(ctx) 1328 defer pipeline.Close() 1329 1330 for _, sd := range invalidatedStatements { 1331 pipeline.SendDeallocate(sd.Name) 1332 delete(c.preparedStatements, sd.Name) 1333 } 1334 1335 err := pipeline.Sync() 1336 if err != nil { 1337 return fmt.Errorf("failed to deallocate cached statement(s): %w", err) 1338 } 1339 1340 err = pipeline.Close() 1341 if err != nil { 1342 return fmt.Errorf("failed to deallocate cached statement(s): %w", err) 1343 } 1344 1345 return nil 1346 }