pgconn.go (65556B)
1 package pgconn 2 3 import ( 4 "context" 5 "crypto/md5" 6 "crypto/tls" 7 "encoding/binary" 8 "encoding/hex" 9 "errors" 10 "fmt" 11 "io" 12 "math" 13 "net" 14 "strconv" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/jackc/pgx/v5/internal/iobufpool" 20 "github.com/jackc/pgx/v5/internal/pgio" 21 "github.com/jackc/pgx/v5/pgconn/internal/bgreader" 22 "github.com/jackc/pgx/v5/pgconn/internal/ctxwatch" 23 "github.com/jackc/pgx/v5/pgproto3" 24 ) 25 26 const ( 27 connStatusUninitialized = iota 28 connStatusConnecting 29 connStatusClosed 30 connStatusIdle 31 connStatusBusy 32 ) 33 34 // Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from 35 // LISTEN/NOTIFY notification. 36 type Notice PgError 37 38 // Notification is a message received from the PostgreSQL LISTEN/NOTIFY system 39 type Notification struct { 40 PID uint32 // backend pid that sent the notification 41 Channel string // channel from which notification was received 42 Payload string 43 } 44 45 // DialFunc is a function that can be used to connect to a PostgreSQL server. 46 type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) 47 48 // LookupFunc is a function that can be used to lookup IPs addrs from host. Optionally an ip:port combination can be 49 // returned in order to override the connection string's port. 50 type LookupFunc func(ctx context.Context, host string) (addrs []string, err error) 51 52 // BuildFrontendFunc is a function that can be used to create Frontend implementation for connection. 53 type BuildFrontendFunc func(r io.Reader, w io.Writer) *pgproto3.Frontend 54 55 // NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at 56 // any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin 57 // of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY 58 // notification. 59 type NoticeHandler func(*PgConn, *Notice) 60 61 // NotificationHandler is a function that can handle notifications received from the PostgreSQL server. Notifications 62 // can be received at any time, usually during handling of a query response. The *PgConn is provided so the handler is 63 // aware of the origin of the notice, but it must not invoke any query method. Be aware that this is distinct from a 64 // notice event. 65 type NotificationHandler func(*PgConn, *Notification) 66 67 // PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage. 68 type PgConn struct { 69 conn net.Conn 70 pid uint32 // backend pid 71 secretKey uint32 // key to use to send a cancel query message to the server 72 parameterStatuses map[string]string // parameters that have been reported by the server 73 txStatus byte 74 frontend *pgproto3.Frontend 75 bgReader *bgreader.BGReader 76 slowWriteTimer *time.Timer 77 78 config *Config 79 80 status byte // One of connStatus* constants 81 82 bufferingReceive bool 83 bufferingReceiveMux sync.Mutex 84 bufferingReceiveMsg pgproto3.BackendMessage 85 bufferingReceiveErr error 86 87 peekedMsg pgproto3.BackendMessage 88 89 // Reusable / preallocated resources 90 resultReader ResultReader 91 multiResultReader MultiResultReader 92 pipeline Pipeline 93 contextWatcher *ctxwatch.ContextWatcher 94 fieldDescriptions [16]FieldDescription 95 96 cleanupDone chan struct{} 97 } 98 99 // Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format) 100 // to provide configuration. See documentation for ParseConfig for details. ctx can be used to cancel a connect attempt. 101 func Connect(ctx context.Context, connString string) (*PgConn, error) { 102 config, err := ParseConfig(connString) 103 if err != nil { 104 return nil, err 105 } 106 107 return ConnectConfig(ctx, config) 108 } 109 110 // Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format) 111 // and ParseConfigOptions to provide additional configuration. See documentation for ParseConfig for details. ctx can be 112 // used to cancel a connect attempt. 113 func ConnectWithOptions(ctx context.Context, connString string, parseConfigOptions ParseConfigOptions) (*PgConn, error) { 114 config, err := ParseConfigWithOptions(connString, parseConfigOptions) 115 if err != nil { 116 return nil, err 117 } 118 119 return ConnectConfig(ctx, config) 120 } 121 122 // Connect establishes a connection to a PostgreSQL server using config. config must have been constructed with 123 // ParseConfig. ctx can be used to cancel a connect attempt. 124 // 125 // If config.Fallbacks are present they will sequentially be tried in case of error establishing network connection. An 126 // authentication error will terminate the chain of attempts (like libpq: 127 // https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error. Otherwise, 128 // if all attempts fail the last error is returned. 129 func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err error) { 130 // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from 131 // zero values. 132 if !config.createdByParseConfig { 133 panic("config must be created by ParseConfig") 134 } 135 136 // Simplify usage by treating primary config and fallbacks the same. 137 fallbackConfigs := []*FallbackConfig{ 138 { 139 Host: config.Host, 140 Port: config.Port, 141 TLSConfig: config.TLSConfig, 142 }, 143 } 144 fallbackConfigs = append(fallbackConfigs, config.Fallbacks...) 145 ctx := octx 146 fallbackConfigs, err = expandWithIPs(ctx, config.LookupFunc, fallbackConfigs) 147 if err != nil { 148 return nil, &connectError{config: config, msg: "hostname resolving error", err: err} 149 } 150 151 if len(fallbackConfigs) == 0 { 152 return nil, &connectError{config: config, msg: "hostname resolving error", err: errors.New("ip addr wasn't found")} 153 } 154 155 foundBestServer := false 156 var fallbackConfig *FallbackConfig 157 for _, fc := range fallbackConfigs { 158 // ConnectTimeout restricts the whole connection process. 159 if config.ConnectTimeout != 0 { 160 var cancel context.CancelFunc 161 ctx, cancel = context.WithTimeout(octx, config.ConnectTimeout) 162 defer cancel() 163 } else { 164 ctx = octx 165 } 166 pgConn, err = connect(ctx, config, fc, false) 167 if err == nil { 168 foundBestServer = true 169 break 170 } else if pgerr, ok := err.(*PgError); ok { 171 err = &connectError{config: config, msg: "server error", err: pgerr} 172 const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password 173 const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings 174 const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist 175 const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege 176 if pgerr.Code == ERRCODE_INVALID_PASSWORD || 177 pgerr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION || 178 pgerr.Code == ERRCODE_INVALID_CATALOG_NAME || 179 pgerr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE { 180 break 181 } 182 } else if cerr, ok := err.(*connectError); ok { 183 if _, ok := cerr.err.(*NotPreferredError); ok { 184 fallbackConfig = fc 185 } 186 } 187 } 188 189 if !foundBestServer && fallbackConfig != nil { 190 pgConn, err = connect(ctx, config, fallbackConfig, true) 191 if pgerr, ok := err.(*PgError); ok { 192 err = &connectError{config: config, msg: "server error", err: pgerr} 193 } 194 } 195 196 if err != nil { 197 return nil, err // no need to wrap in connectError because it will already be wrapped in all cases except PgError 198 } 199 200 if config.AfterConnect != nil { 201 err := config.AfterConnect(ctx, pgConn) 202 if err != nil { 203 pgConn.conn.Close() 204 return nil, &connectError{config: config, msg: "AfterConnect error", err: err} 205 } 206 } 207 208 return pgConn, nil 209 } 210 211 func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*FallbackConfig) ([]*FallbackConfig, error) { 212 var configs []*FallbackConfig 213 214 var lookupErrors []error 215 216 for _, fb := range fallbacks { 217 // skip resolve for unix sockets 218 if isAbsolutePath(fb.Host) { 219 configs = append(configs, &FallbackConfig{ 220 Host: fb.Host, 221 Port: fb.Port, 222 TLSConfig: fb.TLSConfig, 223 }) 224 225 continue 226 } 227 228 ips, err := lookupFn(ctx, fb.Host) 229 if err != nil { 230 lookupErrors = append(lookupErrors, err) 231 continue 232 } 233 234 for _, ip := range ips { 235 splitIP, splitPort, err := net.SplitHostPort(ip) 236 if err == nil { 237 port, err := strconv.ParseUint(splitPort, 10, 16) 238 if err != nil { 239 return nil, fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err) 240 } 241 configs = append(configs, &FallbackConfig{ 242 Host: splitIP, 243 Port: uint16(port), 244 TLSConfig: fb.TLSConfig, 245 }) 246 } else { 247 configs = append(configs, &FallbackConfig{ 248 Host: ip, 249 Port: fb.Port, 250 TLSConfig: fb.TLSConfig, 251 }) 252 } 253 } 254 } 255 256 // See https://github.com/jackc/pgx/issues/1464. When Go 1.20 can be used in pgx consider using errors.Join so all 257 // errors are reported. 258 if len(configs) == 0 && len(lookupErrors) > 0 { 259 return nil, lookupErrors[0] 260 } 261 262 return configs, nil 263 } 264 265 func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig, 266 ignoreNotPreferredErr bool) (*PgConn, error) { 267 pgConn := new(PgConn) 268 pgConn.config = config 269 pgConn.cleanupDone = make(chan struct{}) 270 271 var err error 272 network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port) 273 netConn, err := config.DialFunc(ctx, network, address) 274 if err != nil { 275 return nil, &connectError{config: config, msg: "dial error", err: normalizeTimeoutError(ctx, err)} 276 } 277 278 pgConn.conn = netConn 279 pgConn.contextWatcher = newContextWatcher(netConn) 280 pgConn.contextWatcher.Watch(ctx) 281 282 if fallbackConfig.TLSConfig != nil { 283 nbTLSConn, err := startTLS(netConn, fallbackConfig.TLSConfig) 284 pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS. 285 if err != nil { 286 netConn.Close() 287 return nil, &connectError{config: config, msg: "tls error", err: err} 288 } 289 290 pgConn.conn = nbTLSConn 291 pgConn.contextWatcher = newContextWatcher(nbTLSConn) 292 pgConn.contextWatcher.Watch(ctx) 293 } 294 295 defer pgConn.contextWatcher.Unwatch() 296 297 pgConn.parameterStatuses = make(map[string]string) 298 pgConn.status = connStatusConnecting 299 pgConn.bgReader = bgreader.New(pgConn.conn) 300 pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start) 301 pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn) 302 303 startupMsg := pgproto3.StartupMessage{ 304 ProtocolVersion: pgproto3.ProtocolVersionNumber, 305 Parameters: make(map[string]string), 306 } 307 308 // Copy default run-time params 309 for k, v := range config.RuntimeParams { 310 startupMsg.Parameters[k] = v 311 } 312 313 startupMsg.Parameters["user"] = config.User 314 if config.Database != "" { 315 startupMsg.Parameters["database"] = config.Database 316 } 317 318 pgConn.frontend.Send(&startupMsg) 319 if err := pgConn.flushWithPotentialWriteReadDeadlock(); err != nil { 320 pgConn.conn.Close() 321 return nil, &connectError{config: config, msg: "failed to write startup message", err: normalizeTimeoutError(ctx, err)} 322 } 323 324 for { 325 msg, err := pgConn.receiveMessage() 326 if err != nil { 327 pgConn.conn.Close() 328 if err, ok := err.(*PgError); ok { 329 return nil, err 330 } 331 return nil, &connectError{config: config, msg: "failed to receive message", err: normalizeTimeoutError(ctx, err)} 332 } 333 334 switch msg := msg.(type) { 335 case *pgproto3.BackendKeyData: 336 pgConn.pid = msg.ProcessID 337 pgConn.secretKey = msg.SecretKey 338 339 case *pgproto3.AuthenticationOk: 340 case *pgproto3.AuthenticationCleartextPassword: 341 err = pgConn.txPasswordMessage(pgConn.config.Password) 342 if err != nil { 343 pgConn.conn.Close() 344 return nil, &connectError{config: config, msg: "failed to write password message", err: err} 345 } 346 case *pgproto3.AuthenticationMD5Password: 347 digestedPassword := "md5" + hexMD5(hexMD5(pgConn.config.Password+pgConn.config.User)+string(msg.Salt[:])) 348 err = pgConn.txPasswordMessage(digestedPassword) 349 if err != nil { 350 pgConn.conn.Close() 351 return nil, &connectError{config: config, msg: "failed to write password message", err: err} 352 } 353 case *pgproto3.AuthenticationSASL: 354 err = pgConn.scramAuth(msg.AuthMechanisms) 355 if err != nil { 356 pgConn.conn.Close() 357 return nil, &connectError{config: config, msg: "failed SASL auth", err: err} 358 } 359 case *pgproto3.AuthenticationGSS: 360 err = pgConn.gssAuth() 361 if err != nil { 362 pgConn.conn.Close() 363 return nil, &connectError{config: config, msg: "failed GSS auth", err: err} 364 } 365 case *pgproto3.ReadyForQuery: 366 pgConn.status = connStatusIdle 367 if config.ValidateConnect != nil { 368 // ValidateConnect may execute commands that cause the context to be watched again. Unwatch first to avoid 369 // the watch already in progress panic. This is that last thing done by this method so there is no need to 370 // restart the watch after ValidateConnect returns. 371 // 372 // See https://github.com/jackc/pgconn/issues/40. 373 pgConn.contextWatcher.Unwatch() 374 375 err := config.ValidateConnect(ctx, pgConn) 376 if err != nil { 377 if _, ok := err.(*NotPreferredError); ignoreNotPreferredErr && ok { 378 return pgConn, nil 379 } 380 pgConn.conn.Close() 381 return nil, &connectError{config: config, msg: "ValidateConnect failed", err: err} 382 } 383 } 384 return pgConn, nil 385 case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse: 386 // handled by ReceiveMessage 387 case *pgproto3.ErrorResponse: 388 pgConn.conn.Close() 389 return nil, ErrorResponseToPgError(msg) 390 default: 391 pgConn.conn.Close() 392 return nil, &connectError{config: config, msg: "received unexpected message", err: err} 393 } 394 } 395 } 396 397 func newContextWatcher(conn net.Conn) *ctxwatch.ContextWatcher { 398 return ctxwatch.NewContextWatcher( 399 func() { conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) }, 400 func() { conn.SetDeadline(time.Time{}) }, 401 ) 402 } 403 404 func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) { 405 err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103}) 406 if err != nil { 407 return nil, err 408 } 409 410 response := make([]byte, 1) 411 if _, err = io.ReadFull(conn, response); err != nil { 412 return nil, err 413 } 414 415 if response[0] != 'S' { 416 return nil, errors.New("server refused TLS connection") 417 } 418 419 return tls.Client(conn, tlsConfig), nil 420 } 421 422 func (pgConn *PgConn) txPasswordMessage(password string) (err error) { 423 pgConn.frontend.Send(&pgproto3.PasswordMessage{Password: password}) 424 return pgConn.flushWithPotentialWriteReadDeadlock() 425 } 426 427 func hexMD5(s string) string { 428 hash := md5.New() 429 io.WriteString(hash, s) 430 return hex.EncodeToString(hash.Sum(nil)) 431 } 432 433 func (pgConn *PgConn) signalMessage() chan struct{} { 434 if pgConn.bufferingReceive { 435 panic("BUG: signalMessage when already in progress") 436 } 437 438 pgConn.bufferingReceive = true 439 pgConn.bufferingReceiveMux.Lock() 440 441 ch := make(chan struct{}) 442 go func() { 443 pgConn.bufferingReceiveMsg, pgConn.bufferingReceiveErr = pgConn.frontend.Receive() 444 pgConn.bufferingReceiveMux.Unlock() 445 close(ch) 446 }() 447 448 return ch 449 } 450 451 // ReceiveMessage receives one wire protocol message from the PostgreSQL server. It must only be used when the 452 // connection is not busy. e.g. It is an error to call ReceiveMessage while reading the result of a query. The messages 453 // are still handled by the core pgconn message handling system so receiving a NotificationResponse will still trigger 454 // the OnNotification callback. 455 // 456 // This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. 457 // See https://www.postgresql.org/docs/current/protocol.html. 458 func (pgConn *PgConn) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error) { 459 if err := pgConn.lock(); err != nil { 460 return nil, err 461 } 462 defer pgConn.unlock() 463 464 if ctx != context.Background() { 465 select { 466 case <-ctx.Done(): 467 return nil, newContextAlreadyDoneError(ctx) 468 default: 469 } 470 pgConn.contextWatcher.Watch(ctx) 471 defer pgConn.contextWatcher.Unwatch() 472 } 473 474 msg, err := pgConn.receiveMessage() 475 if err != nil { 476 err = &pgconnError{ 477 msg: "receive message failed", 478 err: normalizeTimeoutError(ctx, err), 479 safeToRetry: true} 480 } 481 return msg, err 482 } 483 484 // peekMessage peeks at the next message without setting up context cancellation. 485 func (pgConn *PgConn) peekMessage() (pgproto3.BackendMessage, error) { 486 if pgConn.peekedMsg != nil { 487 return pgConn.peekedMsg, nil 488 } 489 490 var msg pgproto3.BackendMessage 491 var err error 492 if pgConn.bufferingReceive { 493 pgConn.bufferingReceiveMux.Lock() 494 msg = pgConn.bufferingReceiveMsg 495 err = pgConn.bufferingReceiveErr 496 pgConn.bufferingReceiveMux.Unlock() 497 pgConn.bufferingReceive = false 498 499 // If a timeout error happened in the background try the read again. 500 var netErr net.Error 501 if errors.As(err, &netErr) && netErr.Timeout() { 502 msg, err = pgConn.frontend.Receive() 503 } 504 } else { 505 msg, err = pgConn.frontend.Receive() 506 } 507 508 if err != nil { 509 // Close on anything other than timeout error - everything else is fatal 510 var netErr net.Error 511 isNetErr := errors.As(err, &netErr) 512 if !(isNetErr && netErr.Timeout()) { 513 pgConn.asyncClose() 514 } 515 516 return nil, err 517 } 518 519 pgConn.peekedMsg = msg 520 return msg, nil 521 } 522 523 // receiveMessage receives a message without setting up context cancellation 524 func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) { 525 msg, err := pgConn.peekMessage() 526 if err != nil { 527 return nil, err 528 } 529 pgConn.peekedMsg = nil 530 531 switch msg := msg.(type) { 532 case *pgproto3.ReadyForQuery: 533 pgConn.txStatus = msg.TxStatus 534 case *pgproto3.ParameterStatus: 535 pgConn.parameterStatuses[msg.Name] = msg.Value 536 case *pgproto3.ErrorResponse: 537 if msg.Severity == "FATAL" { 538 pgConn.status = connStatusClosed 539 pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return. 540 close(pgConn.cleanupDone) 541 return nil, ErrorResponseToPgError(msg) 542 } 543 case *pgproto3.NoticeResponse: 544 if pgConn.config.OnNotice != nil { 545 pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg)) 546 } 547 case *pgproto3.NotificationResponse: 548 if pgConn.config.OnNotification != nil { 549 pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload}) 550 } 551 } 552 553 return msg, nil 554 } 555 556 // Conn returns the underlying net.Conn. This rarely necessary. 557 func (pgConn *PgConn) Conn() net.Conn { 558 return pgConn.conn 559 } 560 561 // PID returns the backend PID. 562 func (pgConn *PgConn) PID() uint32 { 563 return pgConn.pid 564 } 565 566 // TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message. 567 // 568 // Possible return values: 569 // 570 // 'I' - idle / not in transaction 571 // 'T' - in a transaction 572 // 'E' - in a failed transaction 573 // 574 // See https://www.postgresql.org/docs/current/protocol-message-formats.html. 575 func (pgConn *PgConn) TxStatus() byte { 576 return pgConn.txStatus 577 } 578 579 // SecretKey returns the backend secret key used to send a cancel query message to the server. 580 func (pgConn *PgConn) SecretKey() uint32 { 581 return pgConn.secretKey 582 } 583 584 // Frontend returns the underlying *pgproto3.Frontend. This rarely necessary. 585 func (pgConn *PgConn) Frontend() *pgproto3.Frontend { 586 return pgConn.frontend 587 } 588 589 // Close closes a connection. It is safe to call Close on a already closed connection. Close attempts a clean close by 590 // sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The 591 // underlying net.Conn.Close() will always be called regardless of any other errors. 592 func (pgConn *PgConn) Close(ctx context.Context) error { 593 if pgConn.status == connStatusClosed { 594 return nil 595 } 596 pgConn.status = connStatusClosed 597 598 defer close(pgConn.cleanupDone) 599 defer pgConn.conn.Close() 600 601 if ctx != context.Background() { 602 // Close may be called while a cancellable query is in progress. This will most often be triggered by panic when 603 // a defer closes the connection (possibly indirectly via a transaction or a connection pool). Unwatch to end any 604 // previous watch. It is safe to Unwatch regardless of whether a watch is already is progress. 605 // 606 // See https://github.com/jackc/pgconn/issues/29 607 pgConn.contextWatcher.Unwatch() 608 609 pgConn.contextWatcher.Watch(ctx) 610 defer pgConn.contextWatcher.Unwatch() 611 } 612 613 // Ignore any errors sending Terminate message and waiting for server to close connection. 614 // This mimics the behavior of libpq PQfinish. It calls closePGconn which calls sendTerminateConn which purposefully 615 // ignores errors. 616 // 617 // See https://github.com/jackc/pgx/issues/637 618 pgConn.frontend.Send(&pgproto3.Terminate{}) 619 pgConn.flushWithPotentialWriteReadDeadlock() 620 621 return pgConn.conn.Close() 622 } 623 624 // asyncClose marks the connection as closed and asynchronously sends a cancel query message and closes the underlying 625 // connection. 626 func (pgConn *PgConn) asyncClose() { 627 if pgConn.status == connStatusClosed { 628 return 629 } 630 pgConn.status = connStatusClosed 631 632 go func() { 633 defer close(pgConn.cleanupDone) 634 defer pgConn.conn.Close() 635 636 deadline := time.Now().Add(time.Second * 15) 637 638 ctx, cancel := context.WithDeadline(context.Background(), deadline) 639 defer cancel() 640 641 pgConn.CancelRequest(ctx) 642 643 pgConn.conn.SetDeadline(deadline) 644 645 pgConn.frontend.Send(&pgproto3.Terminate{}) 646 pgConn.flushWithPotentialWriteReadDeadlock() 647 }() 648 } 649 650 // CleanupDone returns a channel that will be closed after all underlying resources have been cleaned up. A closed 651 // connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing 652 // yet. This is because certain errors such as a context cancellation require that the interrupted function call return 653 // immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are 654 // closed asynchronously. 655 // 656 // This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while 657 // an old connection is still being cleaned up and thereby exceeding the maximum pool size. 658 func (pgConn *PgConn) CleanupDone() chan (struct{}) { 659 return pgConn.cleanupDone 660 } 661 662 // IsClosed reports if the connection has been closed. 663 // 664 // CleanupDone() can be used to determine if all cleanup has been completed. 665 func (pgConn *PgConn) IsClosed() bool { 666 return pgConn.status < connStatusIdle 667 } 668 669 // IsBusy reports if the connection is busy. 670 func (pgConn *PgConn) IsBusy() bool { 671 return pgConn.status == connStatusBusy 672 } 673 674 // lock locks the connection. 675 func (pgConn *PgConn) lock() error { 676 switch pgConn.status { 677 case connStatusBusy: 678 return &connLockError{status: "conn busy"} // This only should be possible in case of an application bug. 679 case connStatusClosed: 680 return &connLockError{status: "conn closed"} 681 case connStatusUninitialized: 682 return &connLockError{status: "conn uninitialized"} 683 } 684 pgConn.status = connStatusBusy 685 return nil 686 } 687 688 func (pgConn *PgConn) unlock() { 689 switch pgConn.status { 690 case connStatusBusy: 691 pgConn.status = connStatusIdle 692 case connStatusClosed: 693 default: 694 panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package. 695 } 696 } 697 698 // ParameterStatus returns the value of a parameter reported by the server (e.g. 699 // server_version). Returns an empty string for unknown parameters. 700 func (pgConn *PgConn) ParameterStatus(key string) string { 701 return pgConn.parameterStatuses[key] 702 } 703 704 // CommandTag is the status text returned by PostgreSQL for a query. 705 type CommandTag struct { 706 s string 707 } 708 709 // NewCommandTag makes a CommandTag from s. 710 func NewCommandTag(s string) CommandTag { 711 return CommandTag{s: s} 712 } 713 714 // RowsAffected returns the number of rows affected. If the CommandTag was not 715 // for a row affecting command (e.g. "CREATE TABLE") then it returns 0. 716 func (ct CommandTag) RowsAffected() int64 { 717 // Find last non-digit 718 idx := -1 719 for i := len(ct.s) - 1; i >= 0; i-- { 720 if ct.s[i] >= '0' && ct.s[i] <= '9' { 721 idx = i 722 } else { 723 break 724 } 725 } 726 727 if idx == -1 { 728 return 0 729 } 730 731 var n int64 732 for _, b := range ct.s[idx:] { 733 n = n*10 + int64(b-'0') 734 } 735 736 return n 737 } 738 739 func (ct CommandTag) String() string { 740 return ct.s 741 } 742 743 // Insert is true if the command tag starts with "INSERT". 744 func (ct CommandTag) Insert() bool { 745 return strings.HasPrefix(ct.s, "INSERT") 746 } 747 748 // Update is true if the command tag starts with "UPDATE". 749 func (ct CommandTag) Update() bool { 750 return strings.HasPrefix(ct.s, "UPDATE") 751 } 752 753 // Delete is true if the command tag starts with "DELETE". 754 func (ct CommandTag) Delete() bool { 755 return strings.HasPrefix(ct.s, "DELETE") 756 } 757 758 // Select is true if the command tag starts with "SELECT". 759 func (ct CommandTag) Select() bool { 760 return strings.HasPrefix(ct.s, "SELECT") 761 } 762 763 type FieldDescription struct { 764 Name string 765 TableOID uint32 766 TableAttributeNumber uint16 767 DataTypeOID uint32 768 DataTypeSize int16 769 TypeModifier int32 770 Format int16 771 } 772 773 func (pgConn *PgConn) convertRowDescription(dst []FieldDescription, rd *pgproto3.RowDescription) []FieldDescription { 774 if cap(dst) >= len(rd.Fields) { 775 dst = dst[:len(rd.Fields):len(rd.Fields)] 776 } else { 777 dst = make([]FieldDescription, len(rd.Fields)) 778 } 779 780 for i := range rd.Fields { 781 dst[i].Name = string(rd.Fields[i].Name) 782 dst[i].TableOID = rd.Fields[i].TableOID 783 dst[i].TableAttributeNumber = rd.Fields[i].TableAttributeNumber 784 dst[i].DataTypeOID = rd.Fields[i].DataTypeOID 785 dst[i].DataTypeSize = rd.Fields[i].DataTypeSize 786 dst[i].TypeModifier = rd.Fields[i].TypeModifier 787 dst[i].Format = rd.Fields[i].Format 788 } 789 790 return dst 791 } 792 793 type StatementDescription struct { 794 Name string 795 SQL string 796 ParamOIDs []uint32 797 Fields []FieldDescription 798 } 799 800 // Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This 801 // allows Prepare to also to describe statements without creating a server-side prepared statement. 802 func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*StatementDescription, error) { 803 if err := pgConn.lock(); err != nil { 804 return nil, err 805 } 806 defer pgConn.unlock() 807 808 if ctx != context.Background() { 809 select { 810 case <-ctx.Done(): 811 return nil, newContextAlreadyDoneError(ctx) 812 default: 813 } 814 pgConn.contextWatcher.Watch(ctx) 815 defer pgConn.contextWatcher.Unwatch() 816 } 817 818 pgConn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}) 819 pgConn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) 820 pgConn.frontend.SendSync(&pgproto3.Sync{}) 821 err := pgConn.flushWithPotentialWriteReadDeadlock() 822 if err != nil { 823 pgConn.asyncClose() 824 return nil, err 825 } 826 827 psd := &StatementDescription{Name: name, SQL: sql} 828 829 var parseErr error 830 831 readloop: 832 for { 833 msg, err := pgConn.receiveMessage() 834 if err != nil { 835 pgConn.asyncClose() 836 return nil, normalizeTimeoutError(ctx, err) 837 } 838 839 switch msg := msg.(type) { 840 case *pgproto3.ParameterDescription: 841 psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) 842 copy(psd.ParamOIDs, msg.ParameterOIDs) 843 case *pgproto3.RowDescription: 844 psd.Fields = pgConn.convertRowDescription(nil, msg) 845 case *pgproto3.ErrorResponse: 846 parseErr = ErrorResponseToPgError(msg) 847 case *pgproto3.ReadyForQuery: 848 break readloop 849 } 850 } 851 852 if parseErr != nil { 853 return nil, parseErr 854 } 855 return psd, nil 856 } 857 858 // ErrorResponseToPgError converts a wire protocol error message to a *PgError. 859 func ErrorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError { 860 return &PgError{ 861 Severity: msg.Severity, 862 Code: string(msg.Code), 863 Message: string(msg.Message), 864 Detail: string(msg.Detail), 865 Hint: msg.Hint, 866 Position: msg.Position, 867 InternalPosition: msg.InternalPosition, 868 InternalQuery: string(msg.InternalQuery), 869 Where: string(msg.Where), 870 SchemaName: string(msg.SchemaName), 871 TableName: string(msg.TableName), 872 ColumnName: string(msg.ColumnName), 873 DataTypeName: string(msg.DataTypeName), 874 ConstraintName: msg.ConstraintName, 875 File: string(msg.File), 876 Line: msg.Line, 877 Routine: string(msg.Routine), 878 } 879 } 880 881 func noticeResponseToNotice(msg *pgproto3.NoticeResponse) *Notice { 882 pgerr := ErrorResponseToPgError((*pgproto3.ErrorResponse)(msg)) 883 return (*Notice)(pgerr) 884 } 885 886 // CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel 887 // request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there 888 // is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9 889 func (pgConn *PgConn) CancelRequest(ctx context.Context) error { 890 // Open a cancellation request to the same server. The address is taken from the net.Conn directly instead of reusing 891 // the connection config. This is important in high availability configurations where fallback connections may be 892 // specified or DNS may be used to load balance. 893 serverAddr := pgConn.conn.RemoteAddr() 894 var serverNetwork string 895 var serverAddress string 896 if serverAddr.Network() == "unix" { 897 // for unix sockets, RemoteAddr() calls getpeername() which returns the name the 898 // server passed to bind(). For Postgres, this is always a relative path "./.s.PGSQL.5432" 899 // so connecting to it will fail. Fall back to the config's value 900 serverNetwork, serverAddress = NetworkAddress(pgConn.config.Host, pgConn.config.Port) 901 } else { 902 serverNetwork, serverAddress = serverAddr.Network(), serverAddr.String() 903 } 904 cancelConn, err := pgConn.config.DialFunc(ctx, serverNetwork, serverAddress) 905 if err != nil { 906 // In case of unix sockets, RemoteAddr() returns only the file part of the path. If the 907 // first connect failed, try the config. 908 if serverAddr.Network() != "unix" { 909 return err 910 } 911 serverNetwork, serverAddr := NetworkAddress(pgConn.config.Host, pgConn.config.Port) 912 cancelConn, err = pgConn.config.DialFunc(ctx, serverNetwork, serverAddr) 913 if err != nil { 914 return err 915 } 916 } 917 defer cancelConn.Close() 918 919 if ctx != context.Background() { 920 contextWatcher := ctxwatch.NewContextWatcher( 921 func() { cancelConn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) }, 922 func() { cancelConn.SetDeadline(time.Time{}) }, 923 ) 924 contextWatcher.Watch(ctx) 925 defer contextWatcher.Unwatch() 926 } 927 928 buf := make([]byte, 16) 929 binary.BigEndian.PutUint32(buf[0:4], 16) 930 binary.BigEndian.PutUint32(buf[4:8], 80877102) 931 binary.BigEndian.PutUint32(buf[8:12], uint32(pgConn.pid)) 932 binary.BigEndian.PutUint32(buf[12:16], uint32(pgConn.secretKey)) 933 // Postgres will process the request and close the connection 934 // so when don't need to read the reply 935 // https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.10 936 _, err = cancelConn.Write(buf) 937 return err 938 } 939 940 // WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not 941 // received. 942 func (pgConn *PgConn) WaitForNotification(ctx context.Context) error { 943 if err := pgConn.lock(); err != nil { 944 return err 945 } 946 defer pgConn.unlock() 947 948 if ctx != context.Background() { 949 select { 950 case <-ctx.Done(): 951 return newContextAlreadyDoneError(ctx) 952 default: 953 } 954 955 pgConn.contextWatcher.Watch(ctx) 956 defer pgConn.contextWatcher.Unwatch() 957 } 958 959 for { 960 msg, err := pgConn.receiveMessage() 961 if err != nil { 962 return normalizeTimeoutError(ctx, err) 963 } 964 965 switch msg.(type) { 966 case *pgproto3.NotificationResponse: 967 return nil 968 } 969 } 970 } 971 972 // Exec executes SQL via the PostgreSQL simple query protocol. SQL may contain multiple queries. Execution is 973 // implicitly wrapped in a transaction unless a transaction is already in progress or SQL contains transaction control 974 // statements. 975 // 976 // Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries. 977 func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader { 978 if err := pgConn.lock(); err != nil { 979 return &MultiResultReader{ 980 closed: true, 981 err: err, 982 } 983 } 984 985 pgConn.multiResultReader = MultiResultReader{ 986 pgConn: pgConn, 987 ctx: ctx, 988 } 989 multiResult := &pgConn.multiResultReader 990 if ctx != context.Background() { 991 select { 992 case <-ctx.Done(): 993 multiResult.closed = true 994 multiResult.err = newContextAlreadyDoneError(ctx) 995 pgConn.unlock() 996 return multiResult 997 default: 998 } 999 pgConn.contextWatcher.Watch(ctx) 1000 } 1001 1002 pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) 1003 err := pgConn.flushWithPotentialWriteReadDeadlock() 1004 if err != nil { 1005 pgConn.asyncClose() 1006 pgConn.contextWatcher.Unwatch() 1007 multiResult.closed = true 1008 multiResult.err = err 1009 pgConn.unlock() 1010 return multiResult 1011 } 1012 1013 return multiResult 1014 } 1015 1016 // ExecParams executes a command via the PostgreSQL extended query protocol. 1017 // 1018 // sql is a SQL command string. It may only contain one query. Parameter substitution is positional using $1, $2, $3, 1019 // etc. 1020 // 1021 // paramValues are the parameter values. It must be encoded in the format given by paramFormats. 1022 // 1023 // paramOIDs is a slice of data type OIDs for paramValues. If paramOIDs is nil, the server will infer the data type for 1024 // all parameters. Any paramOID element that is 0 that will cause the server to infer the data type for that parameter. 1025 // ExecParams will panic if len(paramOIDs) is not 0, 1, or len(paramValues). 1026 // 1027 // paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or 1028 // binary format. If paramFormats is nil all params are text format. ExecParams will panic if 1029 // len(paramFormats) is not 0, 1, or len(paramValues). 1030 // 1031 // resultFormats is a slice of format codes determining for each result column whether it is encoded in text or 1032 // binary format. If resultFormats is nil all results will be in text format. 1033 // 1034 // ResultReader must be closed before PgConn can be used again. 1035 func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *ResultReader { 1036 result := pgConn.execExtendedPrefix(ctx, paramValues) 1037 if result.closed { 1038 return result 1039 } 1040 1041 pgConn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}) 1042 pgConn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) 1043 1044 pgConn.execExtendedSuffix(result) 1045 1046 return result 1047 } 1048 1049 // ExecPrepared enqueues the execution of a prepared statement via the PostgreSQL extended query protocol. 1050 // 1051 // paramValues are the parameter values. It must be encoded in the format given by paramFormats. 1052 // 1053 // paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or 1054 // binary format. If paramFormats is nil all params are text format. ExecPrepared will panic if 1055 // len(paramFormats) is not 0, 1, or len(paramValues). 1056 // 1057 // resultFormats is a slice of format codes determining for each result column whether it is encoded in text or 1058 // binary format. If resultFormats is nil all results will be in text format. 1059 // 1060 // ResultReader must be closed before PgConn can be used again. 1061 func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *ResultReader { 1062 result := pgConn.execExtendedPrefix(ctx, paramValues) 1063 if result.closed { 1064 return result 1065 } 1066 1067 pgConn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) 1068 1069 pgConn.execExtendedSuffix(result) 1070 1071 return result 1072 } 1073 1074 func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]byte) *ResultReader { 1075 pgConn.resultReader = ResultReader{ 1076 pgConn: pgConn, 1077 ctx: ctx, 1078 } 1079 result := &pgConn.resultReader 1080 1081 if err := pgConn.lock(); err != nil { 1082 result.concludeCommand(CommandTag{}, err) 1083 result.closed = true 1084 return result 1085 } 1086 1087 if len(paramValues) > math.MaxUint16 { 1088 result.concludeCommand(CommandTag{}, fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16)) 1089 result.closed = true 1090 pgConn.unlock() 1091 return result 1092 } 1093 1094 if ctx != context.Background() { 1095 select { 1096 case <-ctx.Done(): 1097 result.concludeCommand(CommandTag{}, newContextAlreadyDoneError(ctx)) 1098 result.closed = true 1099 pgConn.unlock() 1100 return result 1101 default: 1102 } 1103 pgConn.contextWatcher.Watch(ctx) 1104 } 1105 1106 return result 1107 } 1108 1109 func (pgConn *PgConn) execExtendedSuffix(result *ResultReader) { 1110 pgConn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) 1111 pgConn.frontend.SendExecute(&pgproto3.Execute{}) 1112 pgConn.frontend.SendSync(&pgproto3.Sync{}) 1113 1114 err := pgConn.flushWithPotentialWriteReadDeadlock() 1115 if err != nil { 1116 pgConn.asyncClose() 1117 result.concludeCommand(CommandTag{}, err) 1118 pgConn.contextWatcher.Unwatch() 1119 result.closed = true 1120 pgConn.unlock() 1121 return 1122 } 1123 1124 result.readUntilRowDescription() 1125 } 1126 1127 // CopyTo executes the copy command sql and copies the results to w. 1128 func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (CommandTag, error) { 1129 if err := pgConn.lock(); err != nil { 1130 return CommandTag{}, err 1131 } 1132 1133 if ctx != context.Background() { 1134 select { 1135 case <-ctx.Done(): 1136 pgConn.unlock() 1137 return CommandTag{}, newContextAlreadyDoneError(ctx) 1138 default: 1139 } 1140 pgConn.contextWatcher.Watch(ctx) 1141 defer pgConn.contextWatcher.Unwatch() 1142 } 1143 1144 // Send copy to command 1145 pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) 1146 1147 err := pgConn.flushWithPotentialWriteReadDeadlock() 1148 if err != nil { 1149 pgConn.asyncClose() 1150 pgConn.unlock() 1151 return CommandTag{}, err 1152 } 1153 1154 // Read results 1155 var commandTag CommandTag 1156 var pgErr error 1157 for { 1158 msg, err := pgConn.receiveMessage() 1159 if err != nil { 1160 pgConn.asyncClose() 1161 return CommandTag{}, normalizeTimeoutError(ctx, err) 1162 } 1163 1164 switch msg := msg.(type) { 1165 case *pgproto3.CopyDone: 1166 case *pgproto3.CopyData: 1167 _, err := w.Write(msg.Data) 1168 if err != nil { 1169 pgConn.asyncClose() 1170 return CommandTag{}, err 1171 } 1172 case *pgproto3.ReadyForQuery: 1173 pgConn.unlock() 1174 return commandTag, pgErr 1175 case *pgproto3.CommandComplete: 1176 commandTag = pgConn.makeCommandTag(msg.CommandTag) 1177 case *pgproto3.ErrorResponse: 1178 pgErr = ErrorResponseToPgError(msg) 1179 } 1180 } 1181 } 1182 1183 // CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server. 1184 // 1185 // Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r 1186 // could still block. 1187 func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) { 1188 if err := pgConn.lock(); err != nil { 1189 return CommandTag{}, err 1190 } 1191 defer pgConn.unlock() 1192 1193 if ctx != context.Background() { 1194 select { 1195 case <-ctx.Done(): 1196 return CommandTag{}, newContextAlreadyDoneError(ctx) 1197 default: 1198 } 1199 pgConn.contextWatcher.Watch(ctx) 1200 defer pgConn.contextWatcher.Unwatch() 1201 } 1202 1203 // Send copy from query 1204 pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) 1205 err := pgConn.flushWithPotentialWriteReadDeadlock() 1206 if err != nil { 1207 pgConn.asyncClose() 1208 return CommandTag{}, err 1209 } 1210 1211 // Send copy data 1212 abortCopyChan := make(chan struct{}) 1213 copyErrChan := make(chan error, 1) 1214 signalMessageChan := pgConn.signalMessage() 1215 var wg sync.WaitGroup 1216 wg.Add(1) 1217 1218 go func() { 1219 defer wg.Done() 1220 buf := iobufpool.Get(65536) 1221 defer iobufpool.Put(buf) 1222 (*buf)[0] = 'd' 1223 1224 for { 1225 n, readErr := r.Read((*buf)[5:cap(*buf)]) 1226 if n > 0 { 1227 *buf = (*buf)[0 : n+5] 1228 pgio.SetInt32((*buf)[1:], int32(n+4)) 1229 1230 writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(*buf) 1231 if writeErr != nil { 1232 // Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine. Not 1233 // setting pgConn.status or closing pgConn.cleanupDone for the same reason. 1234 pgConn.conn.Close() 1235 1236 copyErrChan <- writeErr 1237 return 1238 } 1239 } 1240 if readErr != nil { 1241 copyErrChan <- readErr 1242 return 1243 } 1244 1245 select { 1246 case <-abortCopyChan: 1247 return 1248 default: 1249 } 1250 } 1251 }() 1252 1253 var pgErr error 1254 var copyErr error 1255 for copyErr == nil && pgErr == nil { 1256 select { 1257 case copyErr = <-copyErrChan: 1258 case <-signalMessageChan: 1259 // If pgConn.receiveMessage encounters an error it will call pgConn.asyncClose. But that is a race condition with 1260 // the goroutine. So instead check pgConn.bufferingReceiveErr which will have been set by the signalMessage. If an 1261 // error is found then forcibly close the connection without sending the Terminate message. 1262 if err := pgConn.bufferingReceiveErr; err != nil { 1263 pgConn.status = connStatusClosed 1264 pgConn.conn.Close() 1265 close(pgConn.cleanupDone) 1266 return CommandTag{}, normalizeTimeoutError(ctx, err) 1267 } 1268 msg, _ := pgConn.receiveMessage() 1269 1270 switch msg := msg.(type) { 1271 case *pgproto3.ErrorResponse: 1272 pgErr = ErrorResponseToPgError(msg) 1273 default: 1274 signalMessageChan = pgConn.signalMessage() 1275 } 1276 } 1277 } 1278 close(abortCopyChan) 1279 // Make sure io goroutine finishes before writing. 1280 wg.Wait() 1281 1282 if copyErr == io.EOF || pgErr != nil { 1283 pgConn.frontend.Send(&pgproto3.CopyDone{}) 1284 } else { 1285 pgConn.frontend.Send(&pgproto3.CopyFail{Message: copyErr.Error()}) 1286 } 1287 err = pgConn.flushWithPotentialWriteReadDeadlock() 1288 if err != nil { 1289 pgConn.asyncClose() 1290 return CommandTag{}, err 1291 } 1292 1293 // Read results 1294 var commandTag CommandTag 1295 for { 1296 msg, err := pgConn.receiveMessage() 1297 if err != nil { 1298 pgConn.asyncClose() 1299 return CommandTag{}, normalizeTimeoutError(ctx, err) 1300 } 1301 1302 switch msg := msg.(type) { 1303 case *pgproto3.ReadyForQuery: 1304 return commandTag, pgErr 1305 case *pgproto3.CommandComplete: 1306 commandTag = pgConn.makeCommandTag(msg.CommandTag) 1307 case *pgproto3.ErrorResponse: 1308 pgErr = ErrorResponseToPgError(msg) 1309 } 1310 } 1311 } 1312 1313 // MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch. 1314 type MultiResultReader struct { 1315 pgConn *PgConn 1316 ctx context.Context 1317 pipeline *Pipeline 1318 1319 rr *ResultReader 1320 1321 closed bool 1322 err error 1323 } 1324 1325 // ReadAll reads all available results. Calling ReadAll is mutually exclusive with all other MultiResultReader methods. 1326 func (mrr *MultiResultReader) ReadAll() ([]*Result, error) { 1327 var results []*Result 1328 1329 for mrr.NextResult() { 1330 results = append(results, mrr.ResultReader().Read()) 1331 } 1332 err := mrr.Close() 1333 1334 return results, err 1335 } 1336 1337 func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) { 1338 msg, err := mrr.pgConn.receiveMessage() 1339 1340 if err != nil { 1341 mrr.pgConn.contextWatcher.Unwatch() 1342 mrr.err = normalizeTimeoutError(mrr.ctx, err) 1343 mrr.closed = true 1344 mrr.pgConn.asyncClose() 1345 return nil, mrr.err 1346 } 1347 1348 switch msg := msg.(type) { 1349 case *pgproto3.ReadyForQuery: 1350 mrr.closed = true 1351 if mrr.pipeline != nil { 1352 mrr.pipeline.expectedReadyForQueryCount-- 1353 } else { 1354 mrr.pgConn.contextWatcher.Unwatch() 1355 mrr.pgConn.unlock() 1356 } 1357 case *pgproto3.ErrorResponse: 1358 mrr.err = ErrorResponseToPgError(msg) 1359 } 1360 1361 return msg, nil 1362 } 1363 1364 // NextResult returns advances the MultiResultReader to the next result and returns true if a result is available. 1365 func (mrr *MultiResultReader) NextResult() bool { 1366 for !mrr.closed && mrr.err == nil { 1367 msg, err := mrr.receiveMessage() 1368 if err != nil { 1369 return false 1370 } 1371 1372 switch msg := msg.(type) { 1373 case *pgproto3.RowDescription: 1374 mrr.pgConn.resultReader = ResultReader{ 1375 pgConn: mrr.pgConn, 1376 multiResultReader: mrr, 1377 ctx: mrr.ctx, 1378 fieldDescriptions: mrr.pgConn.convertRowDescription(mrr.pgConn.fieldDescriptions[:], msg), 1379 } 1380 1381 mrr.rr = &mrr.pgConn.resultReader 1382 return true 1383 case *pgproto3.CommandComplete: 1384 mrr.pgConn.resultReader = ResultReader{ 1385 commandTag: mrr.pgConn.makeCommandTag(msg.CommandTag), 1386 commandConcluded: true, 1387 closed: true, 1388 } 1389 mrr.rr = &mrr.pgConn.resultReader 1390 return true 1391 case *pgproto3.EmptyQueryResponse: 1392 return false 1393 } 1394 } 1395 1396 return false 1397 } 1398 1399 // ResultReader returns the current ResultReader. 1400 func (mrr *MultiResultReader) ResultReader() *ResultReader { 1401 return mrr.rr 1402 } 1403 1404 // Close closes the MultiResultReader and returns the first error that occurred during the MultiResultReader's use. 1405 func (mrr *MultiResultReader) Close() error { 1406 for !mrr.closed { 1407 _, err := mrr.receiveMessage() 1408 if err != nil { 1409 return mrr.err 1410 } 1411 } 1412 1413 return mrr.err 1414 } 1415 1416 // ResultReader is a reader for the result of a single query. 1417 type ResultReader struct { 1418 pgConn *PgConn 1419 multiResultReader *MultiResultReader 1420 pipeline *Pipeline 1421 ctx context.Context 1422 1423 fieldDescriptions []FieldDescription 1424 rowValues [][]byte 1425 commandTag CommandTag 1426 commandConcluded bool 1427 closed bool 1428 err error 1429 } 1430 1431 // Result is the saved query response that is returned by calling Read on a ResultReader. 1432 type Result struct { 1433 FieldDescriptions []FieldDescription 1434 Rows [][][]byte 1435 CommandTag CommandTag 1436 Err error 1437 } 1438 1439 // Read saves the query response to a Result. 1440 func (rr *ResultReader) Read() *Result { 1441 br := &Result{} 1442 1443 for rr.NextRow() { 1444 if br.FieldDescriptions == nil { 1445 br.FieldDescriptions = make([]FieldDescription, len(rr.FieldDescriptions())) 1446 copy(br.FieldDescriptions, rr.FieldDescriptions()) 1447 } 1448 1449 values := rr.Values() 1450 row := make([][]byte, len(values)) 1451 for i := range row { 1452 row[i] = make([]byte, len(values[i])) 1453 copy(row[i], values[i]) 1454 } 1455 br.Rows = append(br.Rows, row) 1456 } 1457 1458 br.CommandTag, br.Err = rr.Close() 1459 1460 return br 1461 } 1462 1463 // NextRow advances the ResultReader to the next row and returns true if a row is available. 1464 func (rr *ResultReader) NextRow() bool { 1465 for !rr.commandConcluded { 1466 msg, err := rr.receiveMessage() 1467 if err != nil { 1468 return false 1469 } 1470 1471 switch msg := msg.(type) { 1472 case *pgproto3.DataRow: 1473 rr.rowValues = msg.Values 1474 return true 1475 } 1476 } 1477 1478 return false 1479 } 1480 1481 // FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until 1482 // the ResultReader is closed. It may return nil (for example, if the query did not return a result set or an error was 1483 // encountered.) 1484 func (rr *ResultReader) FieldDescriptions() []FieldDescription { 1485 return rr.fieldDescriptions 1486 } 1487 1488 // Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only 1489 // valid until the next NextRow call or the ResultReader is closed. 1490 func (rr *ResultReader) Values() [][]byte { 1491 return rr.rowValues 1492 } 1493 1494 // Close consumes any remaining result data and returns the command tag or 1495 // error. 1496 func (rr *ResultReader) Close() (CommandTag, error) { 1497 if rr.closed { 1498 return rr.commandTag, rr.err 1499 } 1500 rr.closed = true 1501 1502 for !rr.commandConcluded { 1503 _, err := rr.receiveMessage() 1504 if err != nil { 1505 return CommandTag{}, rr.err 1506 } 1507 } 1508 1509 if rr.multiResultReader == nil && rr.pipeline == nil { 1510 for { 1511 msg, err := rr.receiveMessage() 1512 if err != nil { 1513 return CommandTag{}, rr.err 1514 } 1515 1516 switch msg := msg.(type) { 1517 // Detect a deferred constraint violation where the ErrorResponse is sent after CommandComplete. 1518 case *pgproto3.ErrorResponse: 1519 rr.err = ErrorResponseToPgError(msg) 1520 case *pgproto3.ReadyForQuery: 1521 rr.pgConn.contextWatcher.Unwatch() 1522 rr.pgConn.unlock() 1523 return rr.commandTag, rr.err 1524 } 1525 } 1526 } 1527 1528 return rr.commandTag, rr.err 1529 } 1530 1531 // readUntilRowDescription ensures the ResultReader's fieldDescriptions are loaded. It does not return an error as any 1532 // error will be stored in the ResultReader. 1533 func (rr *ResultReader) readUntilRowDescription() { 1534 for !rr.commandConcluded { 1535 // Peek before receive to avoid consuming a DataRow if the result set does not include a RowDescription method. 1536 // This should never happen under normal pgconn usage, but it is possible if SendBytes and ReceiveResults are 1537 // manually used to construct a query that does not issue a describe statement. 1538 msg, _ := rr.pgConn.peekMessage() 1539 if _, ok := msg.(*pgproto3.DataRow); ok { 1540 return 1541 } 1542 1543 // Consume the message 1544 msg, _ = rr.receiveMessage() 1545 if _, ok := msg.(*pgproto3.RowDescription); ok { 1546 return 1547 } 1548 } 1549 } 1550 1551 func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error) { 1552 if rr.multiResultReader == nil { 1553 msg, err = rr.pgConn.receiveMessage() 1554 } else { 1555 msg, err = rr.multiResultReader.receiveMessage() 1556 } 1557 1558 if err != nil { 1559 err = normalizeTimeoutError(rr.ctx, err) 1560 rr.concludeCommand(CommandTag{}, err) 1561 rr.pgConn.contextWatcher.Unwatch() 1562 rr.closed = true 1563 if rr.multiResultReader == nil { 1564 rr.pgConn.asyncClose() 1565 } 1566 1567 return nil, rr.err 1568 } 1569 1570 switch msg := msg.(type) { 1571 case *pgproto3.RowDescription: 1572 rr.fieldDescriptions = rr.pgConn.convertRowDescription(rr.pgConn.fieldDescriptions[:], msg) 1573 case *pgproto3.CommandComplete: 1574 rr.concludeCommand(rr.pgConn.makeCommandTag(msg.CommandTag), nil) 1575 case *pgproto3.EmptyQueryResponse: 1576 rr.concludeCommand(CommandTag{}, nil) 1577 case *pgproto3.ErrorResponse: 1578 rr.concludeCommand(CommandTag{}, ErrorResponseToPgError(msg)) 1579 } 1580 1581 return msg, nil 1582 } 1583 1584 func (rr *ResultReader) concludeCommand(commandTag CommandTag, err error) { 1585 // Keep the first error that is recorded. Store the error before checking if the command is already concluded to 1586 // allow for receiving an error after CommandComplete but before ReadyForQuery. 1587 if err != nil && rr.err == nil { 1588 rr.err = err 1589 } 1590 1591 if rr.commandConcluded { 1592 return 1593 } 1594 1595 rr.commandTag = commandTag 1596 rr.rowValues = nil 1597 rr.commandConcluded = true 1598 } 1599 1600 // Batch is a collection of queries that can be sent to the PostgreSQL server in a single round-trip. 1601 type Batch struct { 1602 buf []byte 1603 } 1604 1605 // ExecParams appends an ExecParams command to the batch. See PgConn.ExecParams for parameter descriptions. 1606 func (batch *Batch) ExecParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) { 1607 batch.buf = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(batch.buf) 1608 batch.ExecPrepared("", paramValues, paramFormats, resultFormats) 1609 } 1610 1611 // ExecPrepared appends an ExecPrepared e command to the batch. See PgConn.ExecPrepared for parameter descriptions. 1612 func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) { 1613 batch.buf = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(batch.buf) 1614 batch.buf = (&pgproto3.Describe{ObjectType: 'P'}).Encode(batch.buf) 1615 batch.buf = (&pgproto3.Execute{}).Encode(batch.buf) 1616 } 1617 1618 // ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a 1619 // transaction is already in progress or SQL contains transaction control statements. This is a simpler way of executing 1620 // multiple queries in a single round trip than using pipeline mode. 1621 func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader { 1622 if err := pgConn.lock(); err != nil { 1623 return &MultiResultReader{ 1624 closed: true, 1625 err: err, 1626 } 1627 } 1628 1629 pgConn.multiResultReader = MultiResultReader{ 1630 pgConn: pgConn, 1631 ctx: ctx, 1632 } 1633 multiResult := &pgConn.multiResultReader 1634 1635 if ctx != context.Background() { 1636 select { 1637 case <-ctx.Done(): 1638 multiResult.closed = true 1639 multiResult.err = newContextAlreadyDoneError(ctx) 1640 pgConn.unlock() 1641 return multiResult 1642 default: 1643 } 1644 pgConn.contextWatcher.Watch(ctx) 1645 } 1646 1647 batch.buf = (&pgproto3.Sync{}).Encode(batch.buf) 1648 1649 pgConn.enterPotentialWriteReadDeadlock() 1650 _, err := pgConn.conn.Write(batch.buf) 1651 pgConn.exitPotentialWriteReadDeadlock() 1652 if err != nil { 1653 multiResult.closed = true 1654 multiResult.err = err 1655 pgConn.unlock() 1656 return multiResult 1657 } 1658 1659 return multiResult 1660 } 1661 1662 // EscapeString escapes a string such that it can safely be interpolated into a SQL command string. It does not include 1663 // the surrounding single quotes. 1664 // 1665 // The current implementation requires that standard_conforming_strings=on and client_encoding="UTF8". If these 1666 // conditions are not met an error will be returned. It is possible these restrictions will be lifted in the future. 1667 func (pgConn *PgConn) EscapeString(s string) (string, error) { 1668 if pgConn.ParameterStatus("standard_conforming_strings") != "on" { 1669 return "", errors.New("EscapeString must be run with standard_conforming_strings=on") 1670 } 1671 1672 if pgConn.ParameterStatus("client_encoding") != "UTF8" { 1673 return "", errors.New("EscapeString must be run with client_encoding=UTF8") 1674 } 1675 1676 return strings.Replace(s, "'", "''", -1), nil 1677 } 1678 1679 // CheckConn checks the underlying connection without writing any bytes. This is currently implemented by doing a read 1680 // with a very short deadline. This can be useful because a TCP connection can be broken such that a write will appear 1681 // to succeed even though it will never actually reach the server. Reading immediately before a write will detect this 1682 // condition. If this is done immediately before sending a query it reduces the chances a query will be sent that fails 1683 // without the client knowing whether the server received it or not. 1684 // 1685 // Deprecated: CheckConn is deprecated in favor of Ping. CheckConn cannot detect all types of broken connections where 1686 // the write would still appear to succeed. Prefer Ping unless on a high latency connection. 1687 func (pgConn *PgConn) CheckConn() error { 1688 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) 1689 defer cancel() 1690 1691 _, err := pgConn.ReceiveMessage(ctx) 1692 if err != nil { 1693 if !Timeout(err) { 1694 return err 1695 } 1696 } 1697 1698 return nil 1699 } 1700 1701 // Ping pings the server. This can be useful because a TCP connection can be broken such that a write will appear to 1702 // succeed even though it will never actually reach the server. Pinging immediately before sending a query reduces the 1703 // chances a query will be sent that fails without the client knowing whether the server received it or not. 1704 func (pgConn *PgConn) Ping(ctx context.Context) error { 1705 return pgConn.Exec(ctx, "-- ping").Close() 1706 } 1707 1708 // makeCommandTag makes a CommandTag. It does not retain a reference to buf or buf's underlying memory. 1709 func (pgConn *PgConn) makeCommandTag(buf []byte) CommandTag { 1710 return CommandTag{s: string(buf)} 1711 } 1712 1713 // enterPotentialWriteReadDeadlock must be called before a write that could deadlock if the server is simultaneously 1714 // blocked writing to us. 1715 func (pgConn *PgConn) enterPotentialWriteReadDeadlock() { 1716 // The time to wait is somewhat arbitrary. A Write should only take as long as the syscall and memcpy to the OS 1717 // outbound network buffer unless the buffer is full (which potentially is a block). It needs to be long enough for 1718 // the normal case, but short enough not to kill performance if a block occurs. 1719 // 1720 // In addition, on Windows the default timer resolution is 15.6ms. So setting the timer to less than that is 1721 // ineffective. 1722 pgConn.slowWriteTimer.Reset(15 * time.Millisecond) 1723 } 1724 1725 // exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock. 1726 func (pgConn *PgConn) exitPotentialWriteReadDeadlock() { 1727 if !pgConn.slowWriteTimer.Reset(time.Duration(math.MaxInt64)) { 1728 pgConn.slowWriteTimer.Stop() 1729 } 1730 } 1731 1732 func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error { 1733 pgConn.enterPotentialWriteReadDeadlock() 1734 err := pgConn.frontend.Flush() 1735 pgConn.exitPotentialWriteReadDeadlock() 1736 return err 1737 } 1738 1739 // HijackedConn is the result of hijacking a connection. 1740 // 1741 // Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning 1742 // compatibility. 1743 type HijackedConn struct { 1744 Conn net.Conn 1745 PID uint32 // backend pid 1746 SecretKey uint32 // key to use to send a cancel query message to the server 1747 ParameterStatuses map[string]string // parameters that have been reported by the server 1748 TxStatus byte 1749 Frontend *pgproto3.Frontend 1750 Config *Config 1751 } 1752 1753 // Hijack extracts the internal connection data. pgConn must be in an idle state. pgConn is unusable after hijacking. 1754 // Hijacking is typically only useful when using pgconn to establish a connection, but taking complete control of the 1755 // raw connection after that (e.g. a load balancer or proxy). 1756 // 1757 // Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning 1758 // compatibility. 1759 func (pgConn *PgConn) Hijack() (*HijackedConn, error) { 1760 if err := pgConn.lock(); err != nil { 1761 return nil, err 1762 } 1763 pgConn.status = connStatusClosed 1764 1765 return &HijackedConn{ 1766 Conn: pgConn.conn, 1767 PID: pgConn.pid, 1768 SecretKey: pgConn.secretKey, 1769 ParameterStatuses: pgConn.parameterStatuses, 1770 TxStatus: pgConn.txStatus, 1771 Frontend: pgConn.frontend, 1772 Config: pgConn.config, 1773 }, nil 1774 } 1775 1776 // Construct created a PgConn from an already established connection to a PostgreSQL server. This is the inverse of 1777 // PgConn.Hijack. The connection must be in an idle state. 1778 // 1779 // Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning 1780 // compatibility. 1781 func Construct(hc *HijackedConn) (*PgConn, error) { 1782 pgConn := &PgConn{ 1783 conn: hc.Conn, 1784 pid: hc.PID, 1785 secretKey: hc.SecretKey, 1786 parameterStatuses: hc.ParameterStatuses, 1787 txStatus: hc.TxStatus, 1788 frontend: hc.Frontend, 1789 config: hc.Config, 1790 1791 status: connStatusIdle, 1792 1793 cleanupDone: make(chan struct{}), 1794 } 1795 1796 pgConn.contextWatcher = newContextWatcher(pgConn.conn) 1797 pgConn.bgReader = bgreader.New(pgConn.conn) 1798 pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start) 1799 1800 return pgConn, nil 1801 } 1802 1803 // Pipeline represents a connection in pipeline mode. 1804 // 1805 // SendPrepare, SendQueryParams, and SendQueryPrepared queue requests to the server. These requests are not written until 1806 // pipeline is flushed by Flush or Sync. Sync must be called after the last request is queued. Requests between 1807 // synchronization points are implicitly transactional unless explicit transaction control statements have been issued. 1808 // 1809 // The context the pipeline was started with is in effect for the entire life of the Pipeline. 1810 // 1811 // For a deeper understanding of pipeline mode see the PostgreSQL documentation for the extended query protocol 1812 // (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) and the libpq pipeline mode 1813 // (https://www.postgresql.org/docs/current/libpq-pipeline-mode.html). 1814 type Pipeline struct { 1815 conn *PgConn 1816 ctx context.Context 1817 1818 expectedReadyForQueryCount int 1819 pendingSync bool 1820 1821 err error 1822 closed bool 1823 } 1824 1825 // PipelineSync is returned by GetResults when a ReadyForQuery message is received. 1826 type PipelineSync struct{} 1827 1828 // CloseComplete is returned by GetResults when a CloseComplete message is received. 1829 type CloseComplete struct{} 1830 1831 // StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent 1832 // to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection 1833 // to normal mode. While in pipeline mode, no methods that communicate with the server may be called except 1834 // CancelRequest and Close. ctx is in effect for entire life of the *Pipeline. 1835 // 1836 // Prefer ExecBatch when only sending one group of queries at once. 1837 func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipeline { 1838 if err := pgConn.lock(); err != nil { 1839 return &Pipeline{ 1840 closed: true, 1841 err: err, 1842 } 1843 } 1844 1845 pgConn.pipeline = Pipeline{ 1846 conn: pgConn, 1847 ctx: ctx, 1848 } 1849 pipeline := &pgConn.pipeline 1850 1851 if ctx != context.Background() { 1852 select { 1853 case <-ctx.Done(): 1854 pipeline.closed = true 1855 pipeline.err = newContextAlreadyDoneError(ctx) 1856 pgConn.unlock() 1857 return pipeline 1858 default: 1859 } 1860 pgConn.contextWatcher.Watch(ctx) 1861 } 1862 1863 return pipeline 1864 } 1865 1866 // SendPrepare is the pipeline version of *PgConn.Prepare. 1867 func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) { 1868 if p.closed { 1869 return 1870 } 1871 p.pendingSync = true 1872 1873 p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}) 1874 p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) 1875 } 1876 1877 // SendDeallocate deallocates a prepared statement. 1878 func (p *Pipeline) SendDeallocate(name string) { 1879 if p.closed { 1880 return 1881 } 1882 p.pendingSync = true 1883 1884 p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name}) 1885 } 1886 1887 // SendQueryParams is the pipeline version of *PgConn.QueryParams. 1888 func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) { 1889 if p.closed { 1890 return 1891 } 1892 p.pendingSync = true 1893 1894 p.conn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}) 1895 p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) 1896 p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) 1897 p.conn.frontend.SendExecute(&pgproto3.Execute{}) 1898 } 1899 1900 // SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared. 1901 func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) { 1902 if p.closed { 1903 return 1904 } 1905 p.pendingSync = true 1906 1907 p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) 1908 p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) 1909 p.conn.frontend.SendExecute(&pgproto3.Execute{}) 1910 } 1911 1912 // Flush flushes the queued requests without establishing a synchronization point. 1913 func (p *Pipeline) Flush() error { 1914 if p.closed { 1915 if p.err != nil { 1916 return p.err 1917 } 1918 return errors.New("pipeline closed") 1919 } 1920 1921 err := p.conn.flushWithPotentialWriteReadDeadlock() 1922 if err != nil { 1923 err = normalizeTimeoutError(p.ctx, err) 1924 1925 p.conn.asyncClose() 1926 1927 p.conn.contextWatcher.Unwatch() 1928 p.conn.unlock() 1929 p.closed = true 1930 p.err = err 1931 return err 1932 } 1933 1934 return nil 1935 } 1936 1937 // Sync establishes a synchronization point and flushes the queued requests. 1938 func (p *Pipeline) Sync() error { 1939 p.conn.frontend.SendSync(&pgproto3.Sync{}) 1940 err := p.Flush() 1941 if err != nil { 1942 return err 1943 } 1944 1945 p.pendingSync = false 1946 p.expectedReadyForQueryCount++ 1947 1948 return nil 1949 } 1950 1951 // GetResults gets the next results. If results are present, results may be a *ResultReader, *StatementDescription, or 1952 // *PipelineSync. If an ErrorResponse is received from the server, results will be nil and err will be a *PgError. If no 1953 // results are available, results and err will both be nil. 1954 func (p *Pipeline) GetResults() (results any, err error) { 1955 if p.expectedReadyForQueryCount == 0 { 1956 return nil, nil 1957 } 1958 1959 for { 1960 msg, err := p.conn.receiveMessage() 1961 if err != nil { 1962 return nil, err 1963 } 1964 1965 switch msg := msg.(type) { 1966 case *pgproto3.RowDescription: 1967 p.conn.resultReader = ResultReader{ 1968 pgConn: p.conn, 1969 pipeline: p, 1970 ctx: p.ctx, 1971 fieldDescriptions: p.conn.convertRowDescription(p.conn.fieldDescriptions[:], msg), 1972 } 1973 return &p.conn.resultReader, nil 1974 case *pgproto3.CommandComplete: 1975 p.conn.resultReader = ResultReader{ 1976 commandTag: p.conn.makeCommandTag(msg.CommandTag), 1977 commandConcluded: true, 1978 closed: true, 1979 } 1980 return &p.conn.resultReader, nil 1981 case *pgproto3.ParseComplete: 1982 peekedMsg, err := p.conn.peekMessage() 1983 if err != nil { 1984 return nil, err 1985 } 1986 if _, ok := peekedMsg.(*pgproto3.ParameterDescription); ok { 1987 return p.getResultsPrepare() 1988 } 1989 case *pgproto3.CloseComplete: 1990 return &CloseComplete{}, nil 1991 case *pgproto3.ReadyForQuery: 1992 p.expectedReadyForQueryCount-- 1993 return &PipelineSync{}, nil 1994 case *pgproto3.ErrorResponse: 1995 pgErr := ErrorResponseToPgError(msg) 1996 return nil, pgErr 1997 } 1998 1999 } 2000 2001 } 2002 2003 func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) { 2004 psd := &StatementDescription{} 2005 2006 for { 2007 msg, err := p.conn.receiveMessage() 2008 if err != nil { 2009 p.conn.asyncClose() 2010 return nil, normalizeTimeoutError(p.ctx, err) 2011 } 2012 2013 switch msg := msg.(type) { 2014 case *pgproto3.ParameterDescription: 2015 psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) 2016 copy(psd.ParamOIDs, msg.ParameterOIDs) 2017 case *pgproto3.RowDescription: 2018 psd.Fields = p.conn.convertRowDescription(nil, msg) 2019 return psd, nil 2020 2021 // NoData is returned instead of RowDescription when there is no expected result. e.g. An INSERT without a RETURNING 2022 // clause. 2023 case *pgproto3.NoData: 2024 return psd, nil 2025 2026 // These should never happen here. But don't take chances that could lead to a deadlock. 2027 case *pgproto3.ErrorResponse: 2028 pgErr := ErrorResponseToPgError(msg) 2029 return nil, pgErr 2030 case *pgproto3.CommandComplete: 2031 p.conn.asyncClose() 2032 return nil, errors.New("BUG: received CommandComplete while handling Describe") 2033 case *pgproto3.ReadyForQuery: 2034 p.conn.asyncClose() 2035 return nil, errors.New("BUG: received ReadyForQuery while handling Describe") 2036 } 2037 } 2038 } 2039 2040 // Close closes the pipeline and returns the connection to normal mode. 2041 func (p *Pipeline) Close() error { 2042 if p.closed { 2043 return p.err 2044 } 2045 p.closed = true 2046 2047 if p.pendingSync { 2048 p.conn.asyncClose() 2049 p.err = errors.New("pipeline has unsynced requests") 2050 p.conn.contextWatcher.Unwatch() 2051 p.conn.unlock() 2052 2053 return p.err 2054 } 2055 2056 for p.expectedReadyForQueryCount > 0 { 2057 _, err := p.GetResults() 2058 if err != nil { 2059 p.err = err 2060 var pgErr *PgError 2061 if !errors.As(err, &pgErr) { 2062 p.conn.asyncClose() 2063 break 2064 } 2065 } 2066 } 2067 2068 p.conn.contextWatcher.Unwatch() 2069 p.conn.unlock() 2070 2071 return p.err 2072 }