gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

pgconn.go (65556B)


      1 package pgconn
      2 
      3 import (
      4 	"context"
      5 	"crypto/md5"
      6 	"crypto/tls"
      7 	"encoding/binary"
      8 	"encoding/hex"
      9 	"errors"
     10 	"fmt"
     11 	"io"
     12 	"math"
     13 	"net"
     14 	"strconv"
     15 	"strings"
     16 	"sync"
     17 	"time"
     18 
     19 	"github.com/jackc/pgx/v5/internal/iobufpool"
     20 	"github.com/jackc/pgx/v5/internal/pgio"
     21 	"github.com/jackc/pgx/v5/pgconn/internal/bgreader"
     22 	"github.com/jackc/pgx/v5/pgconn/internal/ctxwatch"
     23 	"github.com/jackc/pgx/v5/pgproto3"
     24 )
     25 
     26 const (
     27 	connStatusUninitialized = iota
     28 	connStatusConnecting
     29 	connStatusClosed
     30 	connStatusIdle
     31 	connStatusBusy
     32 )
     33 
     34 // Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from
     35 // LISTEN/NOTIFY notification.
     36 type Notice PgError
     37 
     38 // Notification is a message received from the PostgreSQL LISTEN/NOTIFY system
     39 type Notification struct {
     40 	PID     uint32 // backend pid that sent the notification
     41 	Channel string // channel from which notification was received
     42 	Payload string
     43 }
     44 
     45 // DialFunc is a function that can be used to connect to a PostgreSQL server.
     46 type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
     47 
     48 // LookupFunc is a function that can be used to lookup IPs addrs from host. Optionally an ip:port combination can be
     49 // returned in order to override the connection string's port.
     50 type LookupFunc func(ctx context.Context, host string) (addrs []string, err error)
     51 
     52 // BuildFrontendFunc is a function that can be used to create Frontend implementation for connection.
     53 type BuildFrontendFunc func(r io.Reader, w io.Writer) *pgproto3.Frontend
     54 
     55 // NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at
     56 // any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin
     57 // of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY
     58 // notification.
     59 type NoticeHandler func(*PgConn, *Notice)
     60 
     61 // NotificationHandler is a function that can handle notifications received from the PostgreSQL server. Notifications
     62 // can be received at any time, usually during handling of a query response. The *PgConn is provided so the handler is
     63 // aware of the origin of the notice, but it must not invoke any query method. Be aware that this is distinct from a
     64 // notice event.
     65 type NotificationHandler func(*PgConn, *Notification)
     66 
     67 // PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage.
     68 type PgConn struct {
     69 	conn              net.Conn
     70 	pid               uint32            // backend pid
     71 	secretKey         uint32            // key to use to send a cancel query message to the server
     72 	parameterStatuses map[string]string // parameters that have been reported by the server
     73 	txStatus          byte
     74 	frontend          *pgproto3.Frontend
     75 	bgReader          *bgreader.BGReader
     76 	slowWriteTimer    *time.Timer
     77 
     78 	config *Config
     79 
     80 	status byte // One of connStatus* constants
     81 
     82 	bufferingReceive    bool
     83 	bufferingReceiveMux sync.Mutex
     84 	bufferingReceiveMsg pgproto3.BackendMessage
     85 	bufferingReceiveErr error
     86 
     87 	peekedMsg pgproto3.BackendMessage
     88 
     89 	// Reusable / preallocated resources
     90 	resultReader      ResultReader
     91 	multiResultReader MultiResultReader
     92 	pipeline          Pipeline
     93 	contextWatcher    *ctxwatch.ContextWatcher
     94 	fieldDescriptions [16]FieldDescription
     95 
     96 	cleanupDone chan struct{}
     97 }
     98 
     99 // Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format)
    100 // to provide configuration. See documentation for ParseConfig for details. ctx can be used to cancel a connect attempt.
    101 func Connect(ctx context.Context, connString string) (*PgConn, error) {
    102 	config, err := ParseConfig(connString)
    103 	if err != nil {
    104 		return nil, err
    105 	}
    106 
    107 	return ConnectConfig(ctx, config)
    108 }
    109 
    110 // Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format)
    111 // and ParseConfigOptions to provide additional configuration. See documentation for ParseConfig for details. ctx can be
    112 // used to cancel a connect attempt.
    113 func ConnectWithOptions(ctx context.Context, connString string, parseConfigOptions ParseConfigOptions) (*PgConn, error) {
    114 	config, err := ParseConfigWithOptions(connString, parseConfigOptions)
    115 	if err != nil {
    116 		return nil, err
    117 	}
    118 
    119 	return ConnectConfig(ctx, config)
    120 }
    121 
    122 // Connect establishes a connection to a PostgreSQL server using config. config must have been constructed with
    123 // ParseConfig. ctx can be used to cancel a connect attempt.
    124 //
    125 // If config.Fallbacks are present they will sequentially be tried in case of error establishing network connection. An
    126 // authentication error will terminate the chain of attempts (like libpq:
    127 // https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error. Otherwise,
    128 // if all attempts fail the last error is returned.
    129 func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err error) {
    130 	// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
    131 	// zero values.
    132 	if !config.createdByParseConfig {
    133 		panic("config must be created by ParseConfig")
    134 	}
    135 
    136 	// Simplify usage by treating primary config and fallbacks the same.
    137 	fallbackConfigs := []*FallbackConfig{
    138 		{
    139 			Host:      config.Host,
    140 			Port:      config.Port,
    141 			TLSConfig: config.TLSConfig,
    142 		},
    143 	}
    144 	fallbackConfigs = append(fallbackConfigs, config.Fallbacks...)
    145 	ctx := octx
    146 	fallbackConfigs, err = expandWithIPs(ctx, config.LookupFunc, fallbackConfigs)
    147 	if err != nil {
    148 		return nil, &connectError{config: config, msg: "hostname resolving error", err: err}
    149 	}
    150 
    151 	if len(fallbackConfigs) == 0 {
    152 		return nil, &connectError{config: config, msg: "hostname resolving error", err: errors.New("ip addr wasn't found")}
    153 	}
    154 
    155 	foundBestServer := false
    156 	var fallbackConfig *FallbackConfig
    157 	for _, fc := range fallbackConfigs {
    158 		// ConnectTimeout restricts the whole connection process.
    159 		if config.ConnectTimeout != 0 {
    160 			var cancel context.CancelFunc
    161 			ctx, cancel = context.WithTimeout(octx, config.ConnectTimeout)
    162 			defer cancel()
    163 		} else {
    164 			ctx = octx
    165 		}
    166 		pgConn, err = connect(ctx, config, fc, false)
    167 		if err == nil {
    168 			foundBestServer = true
    169 			break
    170 		} else if pgerr, ok := err.(*PgError); ok {
    171 			err = &connectError{config: config, msg: "server error", err: pgerr}
    172 			const ERRCODE_INVALID_PASSWORD = "28P01"                    // wrong password
    173 			const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
    174 			const ERRCODE_INVALID_CATALOG_NAME = "3D000"                // db does not exist
    175 			const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501"              // missing connect privilege
    176 			if pgerr.Code == ERRCODE_INVALID_PASSWORD ||
    177 				pgerr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION ||
    178 				pgerr.Code == ERRCODE_INVALID_CATALOG_NAME ||
    179 				pgerr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE {
    180 				break
    181 			}
    182 		} else if cerr, ok := err.(*connectError); ok {
    183 			if _, ok := cerr.err.(*NotPreferredError); ok {
    184 				fallbackConfig = fc
    185 			}
    186 		}
    187 	}
    188 
    189 	if !foundBestServer && fallbackConfig != nil {
    190 		pgConn, err = connect(ctx, config, fallbackConfig, true)
    191 		if pgerr, ok := err.(*PgError); ok {
    192 			err = &connectError{config: config, msg: "server error", err: pgerr}
    193 		}
    194 	}
    195 
    196 	if err != nil {
    197 		return nil, err // no need to wrap in connectError because it will already be wrapped in all cases except PgError
    198 	}
    199 
    200 	if config.AfterConnect != nil {
    201 		err := config.AfterConnect(ctx, pgConn)
    202 		if err != nil {
    203 			pgConn.conn.Close()
    204 			return nil, &connectError{config: config, msg: "AfterConnect error", err: err}
    205 		}
    206 	}
    207 
    208 	return pgConn, nil
    209 }
    210 
    211 func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*FallbackConfig) ([]*FallbackConfig, error) {
    212 	var configs []*FallbackConfig
    213 
    214 	var lookupErrors []error
    215 
    216 	for _, fb := range fallbacks {
    217 		// skip resolve for unix sockets
    218 		if isAbsolutePath(fb.Host) {
    219 			configs = append(configs, &FallbackConfig{
    220 				Host:      fb.Host,
    221 				Port:      fb.Port,
    222 				TLSConfig: fb.TLSConfig,
    223 			})
    224 
    225 			continue
    226 		}
    227 
    228 		ips, err := lookupFn(ctx, fb.Host)
    229 		if err != nil {
    230 			lookupErrors = append(lookupErrors, err)
    231 			continue
    232 		}
    233 
    234 		for _, ip := range ips {
    235 			splitIP, splitPort, err := net.SplitHostPort(ip)
    236 			if err == nil {
    237 				port, err := strconv.ParseUint(splitPort, 10, 16)
    238 				if err != nil {
    239 					return nil, fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err)
    240 				}
    241 				configs = append(configs, &FallbackConfig{
    242 					Host:      splitIP,
    243 					Port:      uint16(port),
    244 					TLSConfig: fb.TLSConfig,
    245 				})
    246 			} else {
    247 				configs = append(configs, &FallbackConfig{
    248 					Host:      ip,
    249 					Port:      fb.Port,
    250 					TLSConfig: fb.TLSConfig,
    251 				})
    252 			}
    253 		}
    254 	}
    255 
    256 	// See https://github.com/jackc/pgx/issues/1464. When Go 1.20 can be used in pgx consider using errors.Join so all
    257 	// errors are reported.
    258 	if len(configs) == 0 && len(lookupErrors) > 0 {
    259 		return nil, lookupErrors[0]
    260 	}
    261 
    262 	return configs, nil
    263 }
    264 
    265 func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig,
    266 	ignoreNotPreferredErr bool) (*PgConn, error) {
    267 	pgConn := new(PgConn)
    268 	pgConn.config = config
    269 	pgConn.cleanupDone = make(chan struct{})
    270 
    271 	var err error
    272 	network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port)
    273 	netConn, err := config.DialFunc(ctx, network, address)
    274 	if err != nil {
    275 		return nil, &connectError{config: config, msg: "dial error", err: normalizeTimeoutError(ctx, err)}
    276 	}
    277 
    278 	pgConn.conn = netConn
    279 	pgConn.contextWatcher = newContextWatcher(netConn)
    280 	pgConn.contextWatcher.Watch(ctx)
    281 
    282 	if fallbackConfig.TLSConfig != nil {
    283 		nbTLSConn, err := startTLS(netConn, fallbackConfig.TLSConfig)
    284 		pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS.
    285 		if err != nil {
    286 			netConn.Close()
    287 			return nil, &connectError{config: config, msg: "tls error", err: err}
    288 		}
    289 
    290 		pgConn.conn = nbTLSConn
    291 		pgConn.contextWatcher = newContextWatcher(nbTLSConn)
    292 		pgConn.contextWatcher.Watch(ctx)
    293 	}
    294 
    295 	defer pgConn.contextWatcher.Unwatch()
    296 
    297 	pgConn.parameterStatuses = make(map[string]string)
    298 	pgConn.status = connStatusConnecting
    299 	pgConn.bgReader = bgreader.New(pgConn.conn)
    300 	pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
    301 	pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn)
    302 
    303 	startupMsg := pgproto3.StartupMessage{
    304 		ProtocolVersion: pgproto3.ProtocolVersionNumber,
    305 		Parameters:      make(map[string]string),
    306 	}
    307 
    308 	// Copy default run-time params
    309 	for k, v := range config.RuntimeParams {
    310 		startupMsg.Parameters[k] = v
    311 	}
    312 
    313 	startupMsg.Parameters["user"] = config.User
    314 	if config.Database != "" {
    315 		startupMsg.Parameters["database"] = config.Database
    316 	}
    317 
    318 	pgConn.frontend.Send(&startupMsg)
    319 	if err := pgConn.flushWithPotentialWriteReadDeadlock(); err != nil {
    320 		pgConn.conn.Close()
    321 		return nil, &connectError{config: config, msg: "failed to write startup message", err: normalizeTimeoutError(ctx, err)}
    322 	}
    323 
    324 	for {
    325 		msg, err := pgConn.receiveMessage()
    326 		if err != nil {
    327 			pgConn.conn.Close()
    328 			if err, ok := err.(*PgError); ok {
    329 				return nil, err
    330 			}
    331 			return nil, &connectError{config: config, msg: "failed to receive message", err: normalizeTimeoutError(ctx, err)}
    332 		}
    333 
    334 		switch msg := msg.(type) {
    335 		case *pgproto3.BackendKeyData:
    336 			pgConn.pid = msg.ProcessID
    337 			pgConn.secretKey = msg.SecretKey
    338 
    339 		case *pgproto3.AuthenticationOk:
    340 		case *pgproto3.AuthenticationCleartextPassword:
    341 			err = pgConn.txPasswordMessage(pgConn.config.Password)
    342 			if err != nil {
    343 				pgConn.conn.Close()
    344 				return nil, &connectError{config: config, msg: "failed to write password message", err: err}
    345 			}
    346 		case *pgproto3.AuthenticationMD5Password:
    347 			digestedPassword := "md5" + hexMD5(hexMD5(pgConn.config.Password+pgConn.config.User)+string(msg.Salt[:]))
    348 			err = pgConn.txPasswordMessage(digestedPassword)
    349 			if err != nil {
    350 				pgConn.conn.Close()
    351 				return nil, &connectError{config: config, msg: "failed to write password message", err: err}
    352 			}
    353 		case *pgproto3.AuthenticationSASL:
    354 			err = pgConn.scramAuth(msg.AuthMechanisms)
    355 			if err != nil {
    356 				pgConn.conn.Close()
    357 				return nil, &connectError{config: config, msg: "failed SASL auth", err: err}
    358 			}
    359 		case *pgproto3.AuthenticationGSS:
    360 			err = pgConn.gssAuth()
    361 			if err != nil {
    362 				pgConn.conn.Close()
    363 				return nil, &connectError{config: config, msg: "failed GSS auth", err: err}
    364 			}
    365 		case *pgproto3.ReadyForQuery:
    366 			pgConn.status = connStatusIdle
    367 			if config.ValidateConnect != nil {
    368 				// ValidateConnect may execute commands that cause the context to be watched again. Unwatch first to avoid
    369 				// the watch already in progress panic. This is that last thing done by this method so there is no need to
    370 				// restart the watch after ValidateConnect returns.
    371 				//
    372 				// See https://github.com/jackc/pgconn/issues/40.
    373 				pgConn.contextWatcher.Unwatch()
    374 
    375 				err := config.ValidateConnect(ctx, pgConn)
    376 				if err != nil {
    377 					if _, ok := err.(*NotPreferredError); ignoreNotPreferredErr && ok {
    378 						return pgConn, nil
    379 					}
    380 					pgConn.conn.Close()
    381 					return nil, &connectError{config: config, msg: "ValidateConnect failed", err: err}
    382 				}
    383 			}
    384 			return pgConn, nil
    385 		case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse:
    386 			// handled by ReceiveMessage
    387 		case *pgproto3.ErrorResponse:
    388 			pgConn.conn.Close()
    389 			return nil, ErrorResponseToPgError(msg)
    390 		default:
    391 			pgConn.conn.Close()
    392 			return nil, &connectError{config: config, msg: "received unexpected message", err: err}
    393 		}
    394 	}
    395 }
    396 
    397 func newContextWatcher(conn net.Conn) *ctxwatch.ContextWatcher {
    398 	return ctxwatch.NewContextWatcher(
    399 		func() { conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
    400 		func() { conn.SetDeadline(time.Time{}) },
    401 	)
    402 }
    403 
    404 func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) {
    405 	err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103})
    406 	if err != nil {
    407 		return nil, err
    408 	}
    409 
    410 	response := make([]byte, 1)
    411 	if _, err = io.ReadFull(conn, response); err != nil {
    412 		return nil, err
    413 	}
    414 
    415 	if response[0] != 'S' {
    416 		return nil, errors.New("server refused TLS connection")
    417 	}
    418 
    419 	return tls.Client(conn, tlsConfig), nil
    420 }
    421 
    422 func (pgConn *PgConn) txPasswordMessage(password string) (err error) {
    423 	pgConn.frontend.Send(&pgproto3.PasswordMessage{Password: password})
    424 	return pgConn.flushWithPotentialWriteReadDeadlock()
    425 }
    426 
    427 func hexMD5(s string) string {
    428 	hash := md5.New()
    429 	io.WriteString(hash, s)
    430 	return hex.EncodeToString(hash.Sum(nil))
    431 }
    432 
    433 func (pgConn *PgConn) signalMessage() chan struct{} {
    434 	if pgConn.bufferingReceive {
    435 		panic("BUG: signalMessage when already in progress")
    436 	}
    437 
    438 	pgConn.bufferingReceive = true
    439 	pgConn.bufferingReceiveMux.Lock()
    440 
    441 	ch := make(chan struct{})
    442 	go func() {
    443 		pgConn.bufferingReceiveMsg, pgConn.bufferingReceiveErr = pgConn.frontend.Receive()
    444 		pgConn.bufferingReceiveMux.Unlock()
    445 		close(ch)
    446 	}()
    447 
    448 	return ch
    449 }
    450 
    451 // ReceiveMessage receives one wire protocol message from the PostgreSQL server. It must only be used when the
    452 // connection is not busy. e.g. It is an error to call ReceiveMessage while reading the result of a query. The messages
    453 // are still handled by the core pgconn message handling system so receiving a NotificationResponse will still trigger
    454 // the OnNotification callback.
    455 //
    456 // This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly.
    457 // See https://www.postgresql.org/docs/current/protocol.html.
    458 func (pgConn *PgConn) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error) {
    459 	if err := pgConn.lock(); err != nil {
    460 		return nil, err
    461 	}
    462 	defer pgConn.unlock()
    463 
    464 	if ctx != context.Background() {
    465 		select {
    466 		case <-ctx.Done():
    467 			return nil, newContextAlreadyDoneError(ctx)
    468 		default:
    469 		}
    470 		pgConn.contextWatcher.Watch(ctx)
    471 		defer pgConn.contextWatcher.Unwatch()
    472 	}
    473 
    474 	msg, err := pgConn.receiveMessage()
    475 	if err != nil {
    476 		err = &pgconnError{
    477 			msg:         "receive message failed",
    478 			err:         normalizeTimeoutError(ctx, err),
    479 			safeToRetry: true}
    480 	}
    481 	return msg, err
    482 }
    483 
    484 // peekMessage peeks at the next message without setting up context cancellation.
    485 func (pgConn *PgConn) peekMessage() (pgproto3.BackendMessage, error) {
    486 	if pgConn.peekedMsg != nil {
    487 		return pgConn.peekedMsg, nil
    488 	}
    489 
    490 	var msg pgproto3.BackendMessage
    491 	var err error
    492 	if pgConn.bufferingReceive {
    493 		pgConn.bufferingReceiveMux.Lock()
    494 		msg = pgConn.bufferingReceiveMsg
    495 		err = pgConn.bufferingReceiveErr
    496 		pgConn.bufferingReceiveMux.Unlock()
    497 		pgConn.bufferingReceive = false
    498 
    499 		// If a timeout error happened in the background try the read again.
    500 		var netErr net.Error
    501 		if errors.As(err, &netErr) && netErr.Timeout() {
    502 			msg, err = pgConn.frontend.Receive()
    503 		}
    504 	} else {
    505 		msg, err = pgConn.frontend.Receive()
    506 	}
    507 
    508 	if err != nil {
    509 		// Close on anything other than timeout error - everything else is fatal
    510 		var netErr net.Error
    511 		isNetErr := errors.As(err, &netErr)
    512 		if !(isNetErr && netErr.Timeout()) {
    513 			pgConn.asyncClose()
    514 		}
    515 
    516 		return nil, err
    517 	}
    518 
    519 	pgConn.peekedMsg = msg
    520 	return msg, nil
    521 }
    522 
    523 // receiveMessage receives a message without setting up context cancellation
    524 func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) {
    525 	msg, err := pgConn.peekMessage()
    526 	if err != nil {
    527 		return nil, err
    528 	}
    529 	pgConn.peekedMsg = nil
    530 
    531 	switch msg := msg.(type) {
    532 	case *pgproto3.ReadyForQuery:
    533 		pgConn.txStatus = msg.TxStatus
    534 	case *pgproto3.ParameterStatus:
    535 		pgConn.parameterStatuses[msg.Name] = msg.Value
    536 	case *pgproto3.ErrorResponse:
    537 		if msg.Severity == "FATAL" {
    538 			pgConn.status = connStatusClosed
    539 			pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
    540 			close(pgConn.cleanupDone)
    541 			return nil, ErrorResponseToPgError(msg)
    542 		}
    543 	case *pgproto3.NoticeResponse:
    544 		if pgConn.config.OnNotice != nil {
    545 			pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg))
    546 		}
    547 	case *pgproto3.NotificationResponse:
    548 		if pgConn.config.OnNotification != nil {
    549 			pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload})
    550 		}
    551 	}
    552 
    553 	return msg, nil
    554 }
    555 
    556 // Conn returns the underlying net.Conn. This rarely necessary.
    557 func (pgConn *PgConn) Conn() net.Conn {
    558 	return pgConn.conn
    559 }
    560 
    561 // PID returns the backend PID.
    562 func (pgConn *PgConn) PID() uint32 {
    563 	return pgConn.pid
    564 }
    565 
    566 // TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message.
    567 //
    568 // Possible return values:
    569 //
    570 //	'I' - idle / not in transaction
    571 //	'T' - in a transaction
    572 //	'E' - in a failed transaction
    573 //
    574 // See https://www.postgresql.org/docs/current/protocol-message-formats.html.
    575 func (pgConn *PgConn) TxStatus() byte {
    576 	return pgConn.txStatus
    577 }
    578 
    579 // SecretKey returns the backend secret key used to send a cancel query message to the server.
    580 func (pgConn *PgConn) SecretKey() uint32 {
    581 	return pgConn.secretKey
    582 }
    583 
    584 // Frontend returns the underlying *pgproto3.Frontend. This rarely necessary.
    585 func (pgConn *PgConn) Frontend() *pgproto3.Frontend {
    586 	return pgConn.frontend
    587 }
    588 
    589 // Close closes a connection. It is safe to call Close on a already closed connection. Close attempts a clean close by
    590 // sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The
    591 // underlying net.Conn.Close() will always be called regardless of any other errors.
    592 func (pgConn *PgConn) Close(ctx context.Context) error {
    593 	if pgConn.status == connStatusClosed {
    594 		return nil
    595 	}
    596 	pgConn.status = connStatusClosed
    597 
    598 	defer close(pgConn.cleanupDone)
    599 	defer pgConn.conn.Close()
    600 
    601 	if ctx != context.Background() {
    602 		// Close may be called while a cancellable query is in progress. This will most often be triggered by panic when
    603 		// a defer closes the connection (possibly indirectly via a transaction or a connection pool). Unwatch to end any
    604 		// previous watch. It is safe to Unwatch regardless of whether a watch is already is progress.
    605 		//
    606 		// See https://github.com/jackc/pgconn/issues/29
    607 		pgConn.contextWatcher.Unwatch()
    608 
    609 		pgConn.contextWatcher.Watch(ctx)
    610 		defer pgConn.contextWatcher.Unwatch()
    611 	}
    612 
    613 	// Ignore any errors sending Terminate message and waiting for server to close connection.
    614 	// This mimics the behavior of libpq PQfinish. It calls closePGconn which calls sendTerminateConn which purposefully
    615 	// ignores errors.
    616 	//
    617 	// See https://github.com/jackc/pgx/issues/637
    618 	pgConn.frontend.Send(&pgproto3.Terminate{})
    619 	pgConn.flushWithPotentialWriteReadDeadlock()
    620 
    621 	return pgConn.conn.Close()
    622 }
    623 
    624 // asyncClose marks the connection as closed and asynchronously sends a cancel query message and closes the underlying
    625 // connection.
    626 func (pgConn *PgConn) asyncClose() {
    627 	if pgConn.status == connStatusClosed {
    628 		return
    629 	}
    630 	pgConn.status = connStatusClosed
    631 
    632 	go func() {
    633 		defer close(pgConn.cleanupDone)
    634 		defer pgConn.conn.Close()
    635 
    636 		deadline := time.Now().Add(time.Second * 15)
    637 
    638 		ctx, cancel := context.WithDeadline(context.Background(), deadline)
    639 		defer cancel()
    640 
    641 		pgConn.CancelRequest(ctx)
    642 
    643 		pgConn.conn.SetDeadline(deadline)
    644 
    645 		pgConn.frontend.Send(&pgproto3.Terminate{})
    646 		pgConn.flushWithPotentialWriteReadDeadlock()
    647 	}()
    648 }
    649 
    650 // CleanupDone returns a channel that will be closed after all underlying resources have been cleaned up. A closed
    651 // connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing
    652 // yet. This is because certain errors such as a context cancellation require that the interrupted function call return
    653 // immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are
    654 // closed asynchronously.
    655 //
    656 // This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while
    657 // an old connection is still being cleaned up and thereby exceeding the maximum pool size.
    658 func (pgConn *PgConn) CleanupDone() chan (struct{}) {
    659 	return pgConn.cleanupDone
    660 }
    661 
    662 // IsClosed reports if the connection has been closed.
    663 //
    664 // CleanupDone() can be used to determine if all cleanup has been completed.
    665 func (pgConn *PgConn) IsClosed() bool {
    666 	return pgConn.status < connStatusIdle
    667 }
    668 
    669 // IsBusy reports if the connection is busy.
    670 func (pgConn *PgConn) IsBusy() bool {
    671 	return pgConn.status == connStatusBusy
    672 }
    673 
    674 // lock locks the connection.
    675 func (pgConn *PgConn) lock() error {
    676 	switch pgConn.status {
    677 	case connStatusBusy:
    678 		return &connLockError{status: "conn busy"} // This only should be possible in case of an application bug.
    679 	case connStatusClosed:
    680 		return &connLockError{status: "conn closed"}
    681 	case connStatusUninitialized:
    682 		return &connLockError{status: "conn uninitialized"}
    683 	}
    684 	pgConn.status = connStatusBusy
    685 	return nil
    686 }
    687 
    688 func (pgConn *PgConn) unlock() {
    689 	switch pgConn.status {
    690 	case connStatusBusy:
    691 		pgConn.status = connStatusIdle
    692 	case connStatusClosed:
    693 	default:
    694 		panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package.
    695 	}
    696 }
    697 
    698 // ParameterStatus returns the value of a parameter reported by the server (e.g.
    699 // server_version). Returns an empty string for unknown parameters.
    700 func (pgConn *PgConn) ParameterStatus(key string) string {
    701 	return pgConn.parameterStatuses[key]
    702 }
    703 
    704 // CommandTag is the status text returned by PostgreSQL for a query.
    705 type CommandTag struct {
    706 	s string
    707 }
    708 
    709 // NewCommandTag makes a CommandTag from s.
    710 func NewCommandTag(s string) CommandTag {
    711 	return CommandTag{s: s}
    712 }
    713 
    714 // RowsAffected returns the number of rows affected. If the CommandTag was not
    715 // for a row affecting command (e.g. "CREATE TABLE") then it returns 0.
    716 func (ct CommandTag) RowsAffected() int64 {
    717 	// Find last non-digit
    718 	idx := -1
    719 	for i := len(ct.s) - 1; i >= 0; i-- {
    720 		if ct.s[i] >= '0' && ct.s[i] <= '9' {
    721 			idx = i
    722 		} else {
    723 			break
    724 		}
    725 	}
    726 
    727 	if idx == -1 {
    728 		return 0
    729 	}
    730 
    731 	var n int64
    732 	for _, b := range ct.s[idx:] {
    733 		n = n*10 + int64(b-'0')
    734 	}
    735 
    736 	return n
    737 }
    738 
    739 func (ct CommandTag) String() string {
    740 	return ct.s
    741 }
    742 
    743 // Insert is true if the command tag starts with "INSERT".
    744 func (ct CommandTag) Insert() bool {
    745 	return strings.HasPrefix(ct.s, "INSERT")
    746 }
    747 
    748 // Update is true if the command tag starts with "UPDATE".
    749 func (ct CommandTag) Update() bool {
    750 	return strings.HasPrefix(ct.s, "UPDATE")
    751 }
    752 
    753 // Delete is true if the command tag starts with "DELETE".
    754 func (ct CommandTag) Delete() bool {
    755 	return strings.HasPrefix(ct.s, "DELETE")
    756 }
    757 
    758 // Select is true if the command tag starts with "SELECT".
    759 func (ct CommandTag) Select() bool {
    760 	return strings.HasPrefix(ct.s, "SELECT")
    761 }
    762 
    763 type FieldDescription struct {
    764 	Name                 string
    765 	TableOID             uint32
    766 	TableAttributeNumber uint16
    767 	DataTypeOID          uint32
    768 	DataTypeSize         int16
    769 	TypeModifier         int32
    770 	Format               int16
    771 }
    772 
    773 func (pgConn *PgConn) convertRowDescription(dst []FieldDescription, rd *pgproto3.RowDescription) []FieldDescription {
    774 	if cap(dst) >= len(rd.Fields) {
    775 		dst = dst[:len(rd.Fields):len(rd.Fields)]
    776 	} else {
    777 		dst = make([]FieldDescription, len(rd.Fields))
    778 	}
    779 
    780 	for i := range rd.Fields {
    781 		dst[i].Name = string(rd.Fields[i].Name)
    782 		dst[i].TableOID = rd.Fields[i].TableOID
    783 		dst[i].TableAttributeNumber = rd.Fields[i].TableAttributeNumber
    784 		dst[i].DataTypeOID = rd.Fields[i].DataTypeOID
    785 		dst[i].DataTypeSize = rd.Fields[i].DataTypeSize
    786 		dst[i].TypeModifier = rd.Fields[i].TypeModifier
    787 		dst[i].Format = rd.Fields[i].Format
    788 	}
    789 
    790 	return dst
    791 }
    792 
    793 type StatementDescription struct {
    794 	Name      string
    795 	SQL       string
    796 	ParamOIDs []uint32
    797 	Fields    []FieldDescription
    798 }
    799 
    800 // Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This
    801 // allows Prepare to also to describe statements without creating a server-side prepared statement.
    802 func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*StatementDescription, error) {
    803 	if err := pgConn.lock(); err != nil {
    804 		return nil, err
    805 	}
    806 	defer pgConn.unlock()
    807 
    808 	if ctx != context.Background() {
    809 		select {
    810 		case <-ctx.Done():
    811 			return nil, newContextAlreadyDoneError(ctx)
    812 		default:
    813 		}
    814 		pgConn.contextWatcher.Watch(ctx)
    815 		defer pgConn.contextWatcher.Unwatch()
    816 	}
    817 
    818 	pgConn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs})
    819 	pgConn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name})
    820 	pgConn.frontend.SendSync(&pgproto3.Sync{})
    821 	err := pgConn.flushWithPotentialWriteReadDeadlock()
    822 	if err != nil {
    823 		pgConn.asyncClose()
    824 		return nil, err
    825 	}
    826 
    827 	psd := &StatementDescription{Name: name, SQL: sql}
    828 
    829 	var parseErr error
    830 
    831 readloop:
    832 	for {
    833 		msg, err := pgConn.receiveMessage()
    834 		if err != nil {
    835 			pgConn.asyncClose()
    836 			return nil, normalizeTimeoutError(ctx, err)
    837 		}
    838 
    839 		switch msg := msg.(type) {
    840 		case *pgproto3.ParameterDescription:
    841 			psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs))
    842 			copy(psd.ParamOIDs, msg.ParameterOIDs)
    843 		case *pgproto3.RowDescription:
    844 			psd.Fields = pgConn.convertRowDescription(nil, msg)
    845 		case *pgproto3.ErrorResponse:
    846 			parseErr = ErrorResponseToPgError(msg)
    847 		case *pgproto3.ReadyForQuery:
    848 			break readloop
    849 		}
    850 	}
    851 
    852 	if parseErr != nil {
    853 		return nil, parseErr
    854 	}
    855 	return psd, nil
    856 }
    857 
    858 // ErrorResponseToPgError converts a wire protocol error message to a *PgError.
    859 func ErrorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError {
    860 	return &PgError{
    861 		Severity:         msg.Severity,
    862 		Code:             string(msg.Code),
    863 		Message:          string(msg.Message),
    864 		Detail:           string(msg.Detail),
    865 		Hint:             msg.Hint,
    866 		Position:         msg.Position,
    867 		InternalPosition: msg.InternalPosition,
    868 		InternalQuery:    string(msg.InternalQuery),
    869 		Where:            string(msg.Where),
    870 		SchemaName:       string(msg.SchemaName),
    871 		TableName:        string(msg.TableName),
    872 		ColumnName:       string(msg.ColumnName),
    873 		DataTypeName:     string(msg.DataTypeName),
    874 		ConstraintName:   msg.ConstraintName,
    875 		File:             string(msg.File),
    876 		Line:             msg.Line,
    877 		Routine:          string(msg.Routine),
    878 	}
    879 }
    880 
    881 func noticeResponseToNotice(msg *pgproto3.NoticeResponse) *Notice {
    882 	pgerr := ErrorResponseToPgError((*pgproto3.ErrorResponse)(msg))
    883 	return (*Notice)(pgerr)
    884 }
    885 
    886 // CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel
    887 // request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there
    888 // is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9
    889 func (pgConn *PgConn) CancelRequest(ctx context.Context) error {
    890 	// Open a cancellation request to the same server. The address is taken from the net.Conn directly instead of reusing
    891 	// the connection config. This is important in high availability configurations where fallback connections may be
    892 	// specified or DNS may be used to load balance.
    893 	serverAddr := pgConn.conn.RemoteAddr()
    894 	var serverNetwork string
    895 	var serverAddress string
    896 	if serverAddr.Network() == "unix" {
    897 		// for unix sockets, RemoteAddr() calls getpeername() which returns the name the
    898 		// server passed to bind(). For Postgres, this is always a relative path "./.s.PGSQL.5432"
    899 		// so connecting to it will fail. Fall back to the config's value
    900 		serverNetwork, serverAddress = NetworkAddress(pgConn.config.Host, pgConn.config.Port)
    901 	} else {
    902 		serverNetwork, serverAddress = serverAddr.Network(), serverAddr.String()
    903 	}
    904 	cancelConn, err := pgConn.config.DialFunc(ctx, serverNetwork, serverAddress)
    905 	if err != nil {
    906 		// In case of unix sockets, RemoteAddr() returns only the file part of the path. If the
    907 		// first connect failed, try the config.
    908 		if serverAddr.Network() != "unix" {
    909 			return err
    910 		}
    911 		serverNetwork, serverAddr := NetworkAddress(pgConn.config.Host, pgConn.config.Port)
    912 		cancelConn, err = pgConn.config.DialFunc(ctx, serverNetwork, serverAddr)
    913 		if err != nil {
    914 			return err
    915 		}
    916 	}
    917 	defer cancelConn.Close()
    918 
    919 	if ctx != context.Background() {
    920 		contextWatcher := ctxwatch.NewContextWatcher(
    921 			func() { cancelConn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
    922 			func() { cancelConn.SetDeadline(time.Time{}) },
    923 		)
    924 		contextWatcher.Watch(ctx)
    925 		defer contextWatcher.Unwatch()
    926 	}
    927 
    928 	buf := make([]byte, 16)
    929 	binary.BigEndian.PutUint32(buf[0:4], 16)
    930 	binary.BigEndian.PutUint32(buf[4:8], 80877102)
    931 	binary.BigEndian.PutUint32(buf[8:12], uint32(pgConn.pid))
    932 	binary.BigEndian.PutUint32(buf[12:16], uint32(pgConn.secretKey))
    933 	// Postgres will process the request and close the connection
    934 	// so when don't need to read the reply
    935 	// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.10
    936 	_, err = cancelConn.Write(buf)
    937 	return err
    938 }
    939 
    940 // WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not
    941 // received.
    942 func (pgConn *PgConn) WaitForNotification(ctx context.Context) error {
    943 	if err := pgConn.lock(); err != nil {
    944 		return err
    945 	}
    946 	defer pgConn.unlock()
    947 
    948 	if ctx != context.Background() {
    949 		select {
    950 		case <-ctx.Done():
    951 			return newContextAlreadyDoneError(ctx)
    952 		default:
    953 		}
    954 
    955 		pgConn.contextWatcher.Watch(ctx)
    956 		defer pgConn.contextWatcher.Unwatch()
    957 	}
    958 
    959 	for {
    960 		msg, err := pgConn.receiveMessage()
    961 		if err != nil {
    962 			return normalizeTimeoutError(ctx, err)
    963 		}
    964 
    965 		switch msg.(type) {
    966 		case *pgproto3.NotificationResponse:
    967 			return nil
    968 		}
    969 	}
    970 }
    971 
    972 // Exec executes SQL via the PostgreSQL simple query protocol. SQL may contain multiple queries. Execution is
    973 // implicitly wrapped in a transaction unless a transaction is already in progress or SQL contains transaction control
    974 // statements.
    975 //
    976 // Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries.
    977 func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader {
    978 	if err := pgConn.lock(); err != nil {
    979 		return &MultiResultReader{
    980 			closed: true,
    981 			err:    err,
    982 		}
    983 	}
    984 
    985 	pgConn.multiResultReader = MultiResultReader{
    986 		pgConn: pgConn,
    987 		ctx:    ctx,
    988 	}
    989 	multiResult := &pgConn.multiResultReader
    990 	if ctx != context.Background() {
    991 		select {
    992 		case <-ctx.Done():
    993 			multiResult.closed = true
    994 			multiResult.err = newContextAlreadyDoneError(ctx)
    995 			pgConn.unlock()
    996 			return multiResult
    997 		default:
    998 		}
    999 		pgConn.contextWatcher.Watch(ctx)
   1000 	}
   1001 
   1002 	pgConn.frontend.SendQuery(&pgproto3.Query{String: sql})
   1003 	err := pgConn.flushWithPotentialWriteReadDeadlock()
   1004 	if err != nil {
   1005 		pgConn.asyncClose()
   1006 		pgConn.contextWatcher.Unwatch()
   1007 		multiResult.closed = true
   1008 		multiResult.err = err
   1009 		pgConn.unlock()
   1010 		return multiResult
   1011 	}
   1012 
   1013 	return multiResult
   1014 }
   1015 
   1016 // ExecParams executes a command via the PostgreSQL extended query protocol.
   1017 //
   1018 // sql is a SQL command string. It may only contain one query. Parameter substitution is positional using $1, $2, $3,
   1019 // etc.
   1020 //
   1021 // paramValues are the parameter values. It must be encoded in the format given by paramFormats.
   1022 //
   1023 // paramOIDs is a slice of data type OIDs for paramValues. If paramOIDs is nil, the server will infer the data type for
   1024 // all parameters. Any paramOID element that is 0 that will cause the server to infer the data type for that parameter.
   1025 // ExecParams will panic if len(paramOIDs) is not 0, 1, or len(paramValues).
   1026 //
   1027 // paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or
   1028 // binary format. If paramFormats is nil all params are text format. ExecParams will panic if
   1029 // len(paramFormats) is not 0, 1, or len(paramValues).
   1030 //
   1031 // resultFormats is a slice of format codes determining for each result column whether it is encoded in text or
   1032 // binary format. If resultFormats is nil all results will be in text format.
   1033 //
   1034 // ResultReader must be closed before PgConn can be used again.
   1035 func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *ResultReader {
   1036 	result := pgConn.execExtendedPrefix(ctx, paramValues)
   1037 	if result.closed {
   1038 		return result
   1039 	}
   1040 
   1041 	pgConn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs})
   1042 	pgConn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
   1043 
   1044 	pgConn.execExtendedSuffix(result)
   1045 
   1046 	return result
   1047 }
   1048 
   1049 // ExecPrepared enqueues the execution of a prepared statement via the PostgreSQL extended query protocol.
   1050 //
   1051 // paramValues are the parameter values. It must be encoded in the format given by paramFormats.
   1052 //
   1053 // paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or
   1054 // binary format. If paramFormats is nil all params are text format. ExecPrepared will panic if
   1055 // len(paramFormats) is not 0, 1, or len(paramValues).
   1056 //
   1057 // resultFormats is a slice of format codes determining for each result column whether it is encoded in text or
   1058 // binary format. If resultFormats is nil all results will be in text format.
   1059 //
   1060 // ResultReader must be closed before PgConn can be used again.
   1061 func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *ResultReader {
   1062 	result := pgConn.execExtendedPrefix(ctx, paramValues)
   1063 	if result.closed {
   1064 		return result
   1065 	}
   1066 
   1067 	pgConn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
   1068 
   1069 	pgConn.execExtendedSuffix(result)
   1070 
   1071 	return result
   1072 }
   1073 
   1074 func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]byte) *ResultReader {
   1075 	pgConn.resultReader = ResultReader{
   1076 		pgConn: pgConn,
   1077 		ctx:    ctx,
   1078 	}
   1079 	result := &pgConn.resultReader
   1080 
   1081 	if err := pgConn.lock(); err != nil {
   1082 		result.concludeCommand(CommandTag{}, err)
   1083 		result.closed = true
   1084 		return result
   1085 	}
   1086 
   1087 	if len(paramValues) > math.MaxUint16 {
   1088 		result.concludeCommand(CommandTag{}, fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16))
   1089 		result.closed = true
   1090 		pgConn.unlock()
   1091 		return result
   1092 	}
   1093 
   1094 	if ctx != context.Background() {
   1095 		select {
   1096 		case <-ctx.Done():
   1097 			result.concludeCommand(CommandTag{}, newContextAlreadyDoneError(ctx))
   1098 			result.closed = true
   1099 			pgConn.unlock()
   1100 			return result
   1101 		default:
   1102 		}
   1103 		pgConn.contextWatcher.Watch(ctx)
   1104 	}
   1105 
   1106 	return result
   1107 }
   1108 
   1109 func (pgConn *PgConn) execExtendedSuffix(result *ResultReader) {
   1110 	pgConn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
   1111 	pgConn.frontend.SendExecute(&pgproto3.Execute{})
   1112 	pgConn.frontend.SendSync(&pgproto3.Sync{})
   1113 
   1114 	err := pgConn.flushWithPotentialWriteReadDeadlock()
   1115 	if err != nil {
   1116 		pgConn.asyncClose()
   1117 		result.concludeCommand(CommandTag{}, err)
   1118 		pgConn.contextWatcher.Unwatch()
   1119 		result.closed = true
   1120 		pgConn.unlock()
   1121 		return
   1122 	}
   1123 
   1124 	result.readUntilRowDescription()
   1125 }
   1126 
   1127 // CopyTo executes the copy command sql and copies the results to w.
   1128 func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (CommandTag, error) {
   1129 	if err := pgConn.lock(); err != nil {
   1130 		return CommandTag{}, err
   1131 	}
   1132 
   1133 	if ctx != context.Background() {
   1134 		select {
   1135 		case <-ctx.Done():
   1136 			pgConn.unlock()
   1137 			return CommandTag{}, newContextAlreadyDoneError(ctx)
   1138 		default:
   1139 		}
   1140 		pgConn.contextWatcher.Watch(ctx)
   1141 		defer pgConn.contextWatcher.Unwatch()
   1142 	}
   1143 
   1144 	// Send copy to command
   1145 	pgConn.frontend.SendQuery(&pgproto3.Query{String: sql})
   1146 
   1147 	err := pgConn.flushWithPotentialWriteReadDeadlock()
   1148 	if err != nil {
   1149 		pgConn.asyncClose()
   1150 		pgConn.unlock()
   1151 		return CommandTag{}, err
   1152 	}
   1153 
   1154 	// Read results
   1155 	var commandTag CommandTag
   1156 	var pgErr error
   1157 	for {
   1158 		msg, err := pgConn.receiveMessage()
   1159 		if err != nil {
   1160 			pgConn.asyncClose()
   1161 			return CommandTag{}, normalizeTimeoutError(ctx, err)
   1162 		}
   1163 
   1164 		switch msg := msg.(type) {
   1165 		case *pgproto3.CopyDone:
   1166 		case *pgproto3.CopyData:
   1167 			_, err := w.Write(msg.Data)
   1168 			if err != nil {
   1169 				pgConn.asyncClose()
   1170 				return CommandTag{}, err
   1171 			}
   1172 		case *pgproto3.ReadyForQuery:
   1173 			pgConn.unlock()
   1174 			return commandTag, pgErr
   1175 		case *pgproto3.CommandComplete:
   1176 			commandTag = pgConn.makeCommandTag(msg.CommandTag)
   1177 		case *pgproto3.ErrorResponse:
   1178 			pgErr = ErrorResponseToPgError(msg)
   1179 		}
   1180 	}
   1181 }
   1182 
   1183 // CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server.
   1184 //
   1185 // Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r
   1186 // could still block.
   1187 func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) {
   1188 	if err := pgConn.lock(); err != nil {
   1189 		return CommandTag{}, err
   1190 	}
   1191 	defer pgConn.unlock()
   1192 
   1193 	if ctx != context.Background() {
   1194 		select {
   1195 		case <-ctx.Done():
   1196 			return CommandTag{}, newContextAlreadyDoneError(ctx)
   1197 		default:
   1198 		}
   1199 		pgConn.contextWatcher.Watch(ctx)
   1200 		defer pgConn.contextWatcher.Unwatch()
   1201 	}
   1202 
   1203 	// Send copy from query
   1204 	pgConn.frontend.SendQuery(&pgproto3.Query{String: sql})
   1205 	err := pgConn.flushWithPotentialWriteReadDeadlock()
   1206 	if err != nil {
   1207 		pgConn.asyncClose()
   1208 		return CommandTag{}, err
   1209 	}
   1210 
   1211 	// Send copy data
   1212 	abortCopyChan := make(chan struct{})
   1213 	copyErrChan := make(chan error, 1)
   1214 	signalMessageChan := pgConn.signalMessage()
   1215 	var wg sync.WaitGroup
   1216 	wg.Add(1)
   1217 
   1218 	go func() {
   1219 		defer wg.Done()
   1220 		buf := iobufpool.Get(65536)
   1221 		defer iobufpool.Put(buf)
   1222 		(*buf)[0] = 'd'
   1223 
   1224 		for {
   1225 			n, readErr := r.Read((*buf)[5:cap(*buf)])
   1226 			if n > 0 {
   1227 				*buf = (*buf)[0 : n+5]
   1228 				pgio.SetInt32((*buf)[1:], int32(n+4))
   1229 
   1230 				writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(*buf)
   1231 				if writeErr != nil {
   1232 					// Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine. Not
   1233 					// setting pgConn.status or closing pgConn.cleanupDone for the same reason.
   1234 					pgConn.conn.Close()
   1235 
   1236 					copyErrChan <- writeErr
   1237 					return
   1238 				}
   1239 			}
   1240 			if readErr != nil {
   1241 				copyErrChan <- readErr
   1242 				return
   1243 			}
   1244 
   1245 			select {
   1246 			case <-abortCopyChan:
   1247 				return
   1248 			default:
   1249 			}
   1250 		}
   1251 	}()
   1252 
   1253 	var pgErr error
   1254 	var copyErr error
   1255 	for copyErr == nil && pgErr == nil {
   1256 		select {
   1257 		case copyErr = <-copyErrChan:
   1258 		case <-signalMessageChan:
   1259 			// If pgConn.receiveMessage encounters an error it will call pgConn.asyncClose. But that is a race condition with
   1260 			// the goroutine. So instead check pgConn.bufferingReceiveErr which will have been set by the signalMessage. If an
   1261 			// error is found then forcibly close the connection without sending the Terminate message.
   1262 			if err := pgConn.bufferingReceiveErr; err != nil {
   1263 				pgConn.status = connStatusClosed
   1264 				pgConn.conn.Close()
   1265 				close(pgConn.cleanupDone)
   1266 				return CommandTag{}, normalizeTimeoutError(ctx, err)
   1267 			}
   1268 			msg, _ := pgConn.receiveMessage()
   1269 
   1270 			switch msg := msg.(type) {
   1271 			case *pgproto3.ErrorResponse:
   1272 				pgErr = ErrorResponseToPgError(msg)
   1273 			default:
   1274 				signalMessageChan = pgConn.signalMessage()
   1275 			}
   1276 		}
   1277 	}
   1278 	close(abortCopyChan)
   1279 	// Make sure io goroutine finishes before writing.
   1280 	wg.Wait()
   1281 
   1282 	if copyErr == io.EOF || pgErr != nil {
   1283 		pgConn.frontend.Send(&pgproto3.CopyDone{})
   1284 	} else {
   1285 		pgConn.frontend.Send(&pgproto3.CopyFail{Message: copyErr.Error()})
   1286 	}
   1287 	err = pgConn.flushWithPotentialWriteReadDeadlock()
   1288 	if err != nil {
   1289 		pgConn.asyncClose()
   1290 		return CommandTag{}, err
   1291 	}
   1292 
   1293 	// Read results
   1294 	var commandTag CommandTag
   1295 	for {
   1296 		msg, err := pgConn.receiveMessage()
   1297 		if err != nil {
   1298 			pgConn.asyncClose()
   1299 			return CommandTag{}, normalizeTimeoutError(ctx, err)
   1300 		}
   1301 
   1302 		switch msg := msg.(type) {
   1303 		case *pgproto3.ReadyForQuery:
   1304 			return commandTag, pgErr
   1305 		case *pgproto3.CommandComplete:
   1306 			commandTag = pgConn.makeCommandTag(msg.CommandTag)
   1307 		case *pgproto3.ErrorResponse:
   1308 			pgErr = ErrorResponseToPgError(msg)
   1309 		}
   1310 	}
   1311 }
   1312 
   1313 // MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch.
   1314 type MultiResultReader struct {
   1315 	pgConn   *PgConn
   1316 	ctx      context.Context
   1317 	pipeline *Pipeline
   1318 
   1319 	rr *ResultReader
   1320 
   1321 	closed bool
   1322 	err    error
   1323 }
   1324 
   1325 // ReadAll reads all available results. Calling ReadAll is mutually exclusive with all other MultiResultReader methods.
   1326 func (mrr *MultiResultReader) ReadAll() ([]*Result, error) {
   1327 	var results []*Result
   1328 
   1329 	for mrr.NextResult() {
   1330 		results = append(results, mrr.ResultReader().Read())
   1331 	}
   1332 	err := mrr.Close()
   1333 
   1334 	return results, err
   1335 }
   1336 
   1337 func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) {
   1338 	msg, err := mrr.pgConn.receiveMessage()
   1339 
   1340 	if err != nil {
   1341 		mrr.pgConn.contextWatcher.Unwatch()
   1342 		mrr.err = normalizeTimeoutError(mrr.ctx, err)
   1343 		mrr.closed = true
   1344 		mrr.pgConn.asyncClose()
   1345 		return nil, mrr.err
   1346 	}
   1347 
   1348 	switch msg := msg.(type) {
   1349 	case *pgproto3.ReadyForQuery:
   1350 		mrr.closed = true
   1351 		if mrr.pipeline != nil {
   1352 			mrr.pipeline.expectedReadyForQueryCount--
   1353 		} else {
   1354 			mrr.pgConn.contextWatcher.Unwatch()
   1355 			mrr.pgConn.unlock()
   1356 		}
   1357 	case *pgproto3.ErrorResponse:
   1358 		mrr.err = ErrorResponseToPgError(msg)
   1359 	}
   1360 
   1361 	return msg, nil
   1362 }
   1363 
   1364 // NextResult returns advances the MultiResultReader to the next result and returns true if a result is available.
   1365 func (mrr *MultiResultReader) NextResult() bool {
   1366 	for !mrr.closed && mrr.err == nil {
   1367 		msg, err := mrr.receiveMessage()
   1368 		if err != nil {
   1369 			return false
   1370 		}
   1371 
   1372 		switch msg := msg.(type) {
   1373 		case *pgproto3.RowDescription:
   1374 			mrr.pgConn.resultReader = ResultReader{
   1375 				pgConn:            mrr.pgConn,
   1376 				multiResultReader: mrr,
   1377 				ctx:               mrr.ctx,
   1378 				fieldDescriptions: mrr.pgConn.convertRowDescription(mrr.pgConn.fieldDescriptions[:], msg),
   1379 			}
   1380 
   1381 			mrr.rr = &mrr.pgConn.resultReader
   1382 			return true
   1383 		case *pgproto3.CommandComplete:
   1384 			mrr.pgConn.resultReader = ResultReader{
   1385 				commandTag:       mrr.pgConn.makeCommandTag(msg.CommandTag),
   1386 				commandConcluded: true,
   1387 				closed:           true,
   1388 			}
   1389 			mrr.rr = &mrr.pgConn.resultReader
   1390 			return true
   1391 		case *pgproto3.EmptyQueryResponse:
   1392 			return false
   1393 		}
   1394 	}
   1395 
   1396 	return false
   1397 }
   1398 
   1399 // ResultReader returns the current ResultReader.
   1400 func (mrr *MultiResultReader) ResultReader() *ResultReader {
   1401 	return mrr.rr
   1402 }
   1403 
   1404 // Close closes the MultiResultReader and returns the first error that occurred during the MultiResultReader's use.
   1405 func (mrr *MultiResultReader) Close() error {
   1406 	for !mrr.closed {
   1407 		_, err := mrr.receiveMessage()
   1408 		if err != nil {
   1409 			return mrr.err
   1410 		}
   1411 	}
   1412 
   1413 	return mrr.err
   1414 }
   1415 
   1416 // ResultReader is a reader for the result of a single query.
   1417 type ResultReader struct {
   1418 	pgConn            *PgConn
   1419 	multiResultReader *MultiResultReader
   1420 	pipeline          *Pipeline
   1421 	ctx               context.Context
   1422 
   1423 	fieldDescriptions []FieldDescription
   1424 	rowValues         [][]byte
   1425 	commandTag        CommandTag
   1426 	commandConcluded  bool
   1427 	closed            bool
   1428 	err               error
   1429 }
   1430 
   1431 // Result is the saved query response that is returned by calling Read on a ResultReader.
   1432 type Result struct {
   1433 	FieldDescriptions []FieldDescription
   1434 	Rows              [][][]byte
   1435 	CommandTag        CommandTag
   1436 	Err               error
   1437 }
   1438 
   1439 // Read saves the query response to a Result.
   1440 func (rr *ResultReader) Read() *Result {
   1441 	br := &Result{}
   1442 
   1443 	for rr.NextRow() {
   1444 		if br.FieldDescriptions == nil {
   1445 			br.FieldDescriptions = make([]FieldDescription, len(rr.FieldDescriptions()))
   1446 			copy(br.FieldDescriptions, rr.FieldDescriptions())
   1447 		}
   1448 
   1449 		values := rr.Values()
   1450 		row := make([][]byte, len(values))
   1451 		for i := range row {
   1452 			row[i] = make([]byte, len(values[i]))
   1453 			copy(row[i], values[i])
   1454 		}
   1455 		br.Rows = append(br.Rows, row)
   1456 	}
   1457 
   1458 	br.CommandTag, br.Err = rr.Close()
   1459 
   1460 	return br
   1461 }
   1462 
   1463 // NextRow advances the ResultReader to the next row and returns true if a row is available.
   1464 func (rr *ResultReader) NextRow() bool {
   1465 	for !rr.commandConcluded {
   1466 		msg, err := rr.receiveMessage()
   1467 		if err != nil {
   1468 			return false
   1469 		}
   1470 
   1471 		switch msg := msg.(type) {
   1472 		case *pgproto3.DataRow:
   1473 			rr.rowValues = msg.Values
   1474 			return true
   1475 		}
   1476 	}
   1477 
   1478 	return false
   1479 }
   1480 
   1481 // FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until
   1482 // the ResultReader is closed. It may return nil (for example, if the query did not return a result set or an error was
   1483 // encountered.)
   1484 func (rr *ResultReader) FieldDescriptions() []FieldDescription {
   1485 	return rr.fieldDescriptions
   1486 }
   1487 
   1488 // Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only
   1489 // valid until the next NextRow call or the ResultReader is closed.
   1490 func (rr *ResultReader) Values() [][]byte {
   1491 	return rr.rowValues
   1492 }
   1493 
   1494 // Close consumes any remaining result data and returns the command tag or
   1495 // error.
   1496 func (rr *ResultReader) Close() (CommandTag, error) {
   1497 	if rr.closed {
   1498 		return rr.commandTag, rr.err
   1499 	}
   1500 	rr.closed = true
   1501 
   1502 	for !rr.commandConcluded {
   1503 		_, err := rr.receiveMessage()
   1504 		if err != nil {
   1505 			return CommandTag{}, rr.err
   1506 		}
   1507 	}
   1508 
   1509 	if rr.multiResultReader == nil && rr.pipeline == nil {
   1510 		for {
   1511 			msg, err := rr.receiveMessage()
   1512 			if err != nil {
   1513 				return CommandTag{}, rr.err
   1514 			}
   1515 
   1516 			switch msg := msg.(type) {
   1517 			// Detect a deferred constraint violation where the ErrorResponse is sent after CommandComplete.
   1518 			case *pgproto3.ErrorResponse:
   1519 				rr.err = ErrorResponseToPgError(msg)
   1520 			case *pgproto3.ReadyForQuery:
   1521 				rr.pgConn.contextWatcher.Unwatch()
   1522 				rr.pgConn.unlock()
   1523 				return rr.commandTag, rr.err
   1524 			}
   1525 		}
   1526 	}
   1527 
   1528 	return rr.commandTag, rr.err
   1529 }
   1530 
   1531 // readUntilRowDescription ensures the ResultReader's fieldDescriptions are loaded. It does not return an error as any
   1532 // error will be stored in the ResultReader.
   1533 func (rr *ResultReader) readUntilRowDescription() {
   1534 	for !rr.commandConcluded {
   1535 		// Peek before receive to avoid consuming a DataRow if the result set does not include a RowDescription method.
   1536 		// This should never happen under normal pgconn usage, but it is possible if SendBytes and ReceiveResults are
   1537 		// manually used to construct a query that does not issue a describe statement.
   1538 		msg, _ := rr.pgConn.peekMessage()
   1539 		if _, ok := msg.(*pgproto3.DataRow); ok {
   1540 			return
   1541 		}
   1542 
   1543 		// Consume the message
   1544 		msg, _ = rr.receiveMessage()
   1545 		if _, ok := msg.(*pgproto3.RowDescription); ok {
   1546 			return
   1547 		}
   1548 	}
   1549 }
   1550 
   1551 func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error) {
   1552 	if rr.multiResultReader == nil {
   1553 		msg, err = rr.pgConn.receiveMessage()
   1554 	} else {
   1555 		msg, err = rr.multiResultReader.receiveMessage()
   1556 	}
   1557 
   1558 	if err != nil {
   1559 		err = normalizeTimeoutError(rr.ctx, err)
   1560 		rr.concludeCommand(CommandTag{}, err)
   1561 		rr.pgConn.contextWatcher.Unwatch()
   1562 		rr.closed = true
   1563 		if rr.multiResultReader == nil {
   1564 			rr.pgConn.asyncClose()
   1565 		}
   1566 
   1567 		return nil, rr.err
   1568 	}
   1569 
   1570 	switch msg := msg.(type) {
   1571 	case *pgproto3.RowDescription:
   1572 		rr.fieldDescriptions = rr.pgConn.convertRowDescription(rr.pgConn.fieldDescriptions[:], msg)
   1573 	case *pgproto3.CommandComplete:
   1574 		rr.concludeCommand(rr.pgConn.makeCommandTag(msg.CommandTag), nil)
   1575 	case *pgproto3.EmptyQueryResponse:
   1576 		rr.concludeCommand(CommandTag{}, nil)
   1577 	case *pgproto3.ErrorResponse:
   1578 		rr.concludeCommand(CommandTag{}, ErrorResponseToPgError(msg))
   1579 	}
   1580 
   1581 	return msg, nil
   1582 }
   1583 
   1584 func (rr *ResultReader) concludeCommand(commandTag CommandTag, err error) {
   1585 	// Keep the first error that is recorded. Store the error before checking if the command is already concluded to
   1586 	// allow for receiving an error after CommandComplete but before ReadyForQuery.
   1587 	if err != nil && rr.err == nil {
   1588 		rr.err = err
   1589 	}
   1590 
   1591 	if rr.commandConcluded {
   1592 		return
   1593 	}
   1594 
   1595 	rr.commandTag = commandTag
   1596 	rr.rowValues = nil
   1597 	rr.commandConcluded = true
   1598 }
   1599 
   1600 // Batch is a collection of queries that can be sent to the PostgreSQL server in a single round-trip.
   1601 type Batch struct {
   1602 	buf []byte
   1603 }
   1604 
   1605 // ExecParams appends an ExecParams command to the batch. See PgConn.ExecParams for parameter descriptions.
   1606 func (batch *Batch) ExecParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) {
   1607 	batch.buf = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(batch.buf)
   1608 	batch.ExecPrepared("", paramValues, paramFormats, resultFormats)
   1609 }
   1610 
   1611 // ExecPrepared appends an ExecPrepared e command to the batch. See PgConn.ExecPrepared for parameter descriptions.
   1612 func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) {
   1613 	batch.buf = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(batch.buf)
   1614 	batch.buf = (&pgproto3.Describe{ObjectType: 'P'}).Encode(batch.buf)
   1615 	batch.buf = (&pgproto3.Execute{}).Encode(batch.buf)
   1616 }
   1617 
   1618 // ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a
   1619 // transaction is already in progress or SQL contains transaction control statements. This is a simpler way of executing
   1620 // multiple queries in a single round trip than using pipeline mode.
   1621 func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader {
   1622 	if err := pgConn.lock(); err != nil {
   1623 		return &MultiResultReader{
   1624 			closed: true,
   1625 			err:    err,
   1626 		}
   1627 	}
   1628 
   1629 	pgConn.multiResultReader = MultiResultReader{
   1630 		pgConn: pgConn,
   1631 		ctx:    ctx,
   1632 	}
   1633 	multiResult := &pgConn.multiResultReader
   1634 
   1635 	if ctx != context.Background() {
   1636 		select {
   1637 		case <-ctx.Done():
   1638 			multiResult.closed = true
   1639 			multiResult.err = newContextAlreadyDoneError(ctx)
   1640 			pgConn.unlock()
   1641 			return multiResult
   1642 		default:
   1643 		}
   1644 		pgConn.contextWatcher.Watch(ctx)
   1645 	}
   1646 
   1647 	batch.buf = (&pgproto3.Sync{}).Encode(batch.buf)
   1648 
   1649 	pgConn.enterPotentialWriteReadDeadlock()
   1650 	_, err := pgConn.conn.Write(batch.buf)
   1651 	pgConn.exitPotentialWriteReadDeadlock()
   1652 	if err != nil {
   1653 		multiResult.closed = true
   1654 		multiResult.err = err
   1655 		pgConn.unlock()
   1656 		return multiResult
   1657 	}
   1658 
   1659 	return multiResult
   1660 }
   1661 
   1662 // EscapeString escapes a string such that it can safely be interpolated into a SQL command string. It does not include
   1663 // the surrounding single quotes.
   1664 //
   1665 // The current implementation requires that standard_conforming_strings=on and client_encoding="UTF8". If these
   1666 // conditions are not met an error will be returned. It is possible these restrictions will be lifted in the future.
   1667 func (pgConn *PgConn) EscapeString(s string) (string, error) {
   1668 	if pgConn.ParameterStatus("standard_conforming_strings") != "on" {
   1669 		return "", errors.New("EscapeString must be run with standard_conforming_strings=on")
   1670 	}
   1671 
   1672 	if pgConn.ParameterStatus("client_encoding") != "UTF8" {
   1673 		return "", errors.New("EscapeString must be run with client_encoding=UTF8")
   1674 	}
   1675 
   1676 	return strings.Replace(s, "'", "''", -1), nil
   1677 }
   1678 
   1679 // CheckConn checks the underlying connection without writing any bytes. This is currently implemented by doing a read
   1680 // with a very short deadline. This can be useful because a TCP connection can be broken such that a write will appear
   1681 // to succeed even though it will never actually reach the server. Reading immediately before a write will detect this
   1682 // condition. If this is done immediately before sending a query it reduces the chances a query will be sent that fails
   1683 // without the client knowing whether the server received it or not.
   1684 //
   1685 // Deprecated: CheckConn is deprecated in favor of Ping. CheckConn cannot detect all types of broken connections where
   1686 // the write would still appear to succeed. Prefer Ping unless on a high latency connection.
   1687 func (pgConn *PgConn) CheckConn() error {
   1688 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
   1689 	defer cancel()
   1690 
   1691 	_, err := pgConn.ReceiveMessage(ctx)
   1692 	if err != nil {
   1693 		if !Timeout(err) {
   1694 			return err
   1695 		}
   1696 	}
   1697 
   1698 	return nil
   1699 }
   1700 
   1701 // Ping pings the server. This can be useful because a TCP connection can be broken such that a write will appear to
   1702 // succeed even though it will never actually reach the server. Pinging immediately before sending a query reduces the
   1703 // chances a query will be sent that fails without the client knowing whether the server received it or not.
   1704 func (pgConn *PgConn) Ping(ctx context.Context) error {
   1705 	return pgConn.Exec(ctx, "-- ping").Close()
   1706 }
   1707 
   1708 // makeCommandTag makes a CommandTag. It does not retain a reference to buf or buf's underlying memory.
   1709 func (pgConn *PgConn) makeCommandTag(buf []byte) CommandTag {
   1710 	return CommandTag{s: string(buf)}
   1711 }
   1712 
   1713 // enterPotentialWriteReadDeadlock must be called before a write that could deadlock if the server is simultaneously
   1714 // blocked writing to us.
   1715 func (pgConn *PgConn) enterPotentialWriteReadDeadlock() {
   1716 	// The time to wait is somewhat arbitrary. A Write should only take as long as the syscall and memcpy to the OS
   1717 	// outbound network buffer unless the buffer is full (which potentially is a block). It needs to be long enough for
   1718 	// the normal case, but short enough not to kill performance if a block occurs.
   1719 	//
   1720 	// In addition, on Windows the default timer resolution is 15.6ms. So setting the timer to less than that is
   1721 	// ineffective.
   1722 	pgConn.slowWriteTimer.Reset(15 * time.Millisecond)
   1723 }
   1724 
   1725 // exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock.
   1726 func (pgConn *PgConn) exitPotentialWriteReadDeadlock() {
   1727 	if !pgConn.slowWriteTimer.Reset(time.Duration(math.MaxInt64)) {
   1728 		pgConn.slowWriteTimer.Stop()
   1729 	}
   1730 }
   1731 
   1732 func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error {
   1733 	pgConn.enterPotentialWriteReadDeadlock()
   1734 	err := pgConn.frontend.Flush()
   1735 	pgConn.exitPotentialWriteReadDeadlock()
   1736 	return err
   1737 }
   1738 
   1739 // HijackedConn is the result of hijacking a connection.
   1740 //
   1741 // Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
   1742 // compatibility.
   1743 type HijackedConn struct {
   1744 	Conn              net.Conn
   1745 	PID               uint32            // backend pid
   1746 	SecretKey         uint32            // key to use to send a cancel query message to the server
   1747 	ParameterStatuses map[string]string // parameters that have been reported by the server
   1748 	TxStatus          byte
   1749 	Frontend          *pgproto3.Frontend
   1750 	Config            *Config
   1751 }
   1752 
   1753 // Hijack extracts the internal connection data. pgConn must be in an idle state. pgConn is unusable after hijacking.
   1754 // Hijacking is typically only useful when using pgconn to establish a connection, but taking complete control of the
   1755 // raw connection after that (e.g. a load balancer or proxy).
   1756 //
   1757 // Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
   1758 // compatibility.
   1759 func (pgConn *PgConn) Hijack() (*HijackedConn, error) {
   1760 	if err := pgConn.lock(); err != nil {
   1761 		return nil, err
   1762 	}
   1763 	pgConn.status = connStatusClosed
   1764 
   1765 	return &HijackedConn{
   1766 		Conn:              pgConn.conn,
   1767 		PID:               pgConn.pid,
   1768 		SecretKey:         pgConn.secretKey,
   1769 		ParameterStatuses: pgConn.parameterStatuses,
   1770 		TxStatus:          pgConn.txStatus,
   1771 		Frontend:          pgConn.frontend,
   1772 		Config:            pgConn.config,
   1773 	}, nil
   1774 }
   1775 
   1776 // Construct created a PgConn from an already established connection to a PostgreSQL server. This is the inverse of
   1777 // PgConn.Hijack. The connection must be in an idle state.
   1778 //
   1779 // Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
   1780 // compatibility.
   1781 func Construct(hc *HijackedConn) (*PgConn, error) {
   1782 	pgConn := &PgConn{
   1783 		conn:              hc.Conn,
   1784 		pid:               hc.PID,
   1785 		secretKey:         hc.SecretKey,
   1786 		parameterStatuses: hc.ParameterStatuses,
   1787 		txStatus:          hc.TxStatus,
   1788 		frontend:          hc.Frontend,
   1789 		config:            hc.Config,
   1790 
   1791 		status: connStatusIdle,
   1792 
   1793 		cleanupDone: make(chan struct{}),
   1794 	}
   1795 
   1796 	pgConn.contextWatcher = newContextWatcher(pgConn.conn)
   1797 	pgConn.bgReader = bgreader.New(pgConn.conn)
   1798 	pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
   1799 
   1800 	return pgConn, nil
   1801 }
   1802 
   1803 // Pipeline represents a connection in pipeline mode.
   1804 //
   1805 // SendPrepare, SendQueryParams, and SendQueryPrepared queue requests to the server. These requests are not written until
   1806 // pipeline is flushed by Flush or Sync. Sync must be called after the last request is queued. Requests between
   1807 // synchronization points are implicitly transactional unless explicit transaction control statements have been issued.
   1808 //
   1809 // The context the pipeline was started with is in effect for the entire life of the Pipeline.
   1810 //
   1811 // For a deeper understanding of pipeline mode see the PostgreSQL documentation for the extended query protocol
   1812 // (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) and the libpq pipeline mode
   1813 // (https://www.postgresql.org/docs/current/libpq-pipeline-mode.html).
   1814 type Pipeline struct {
   1815 	conn *PgConn
   1816 	ctx  context.Context
   1817 
   1818 	expectedReadyForQueryCount int
   1819 	pendingSync                bool
   1820 
   1821 	err    error
   1822 	closed bool
   1823 }
   1824 
   1825 // PipelineSync is returned by GetResults when a ReadyForQuery message is received.
   1826 type PipelineSync struct{}
   1827 
   1828 // CloseComplete is returned by GetResults when a CloseComplete message is received.
   1829 type CloseComplete struct{}
   1830 
   1831 // StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent
   1832 // to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection
   1833 // to normal mode. While in pipeline mode, no methods that communicate with the server may be called except
   1834 // CancelRequest and Close. ctx is in effect for entire life of the *Pipeline.
   1835 //
   1836 // Prefer ExecBatch when only sending one group of queries at once.
   1837 func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipeline {
   1838 	if err := pgConn.lock(); err != nil {
   1839 		return &Pipeline{
   1840 			closed: true,
   1841 			err:    err,
   1842 		}
   1843 	}
   1844 
   1845 	pgConn.pipeline = Pipeline{
   1846 		conn: pgConn,
   1847 		ctx:  ctx,
   1848 	}
   1849 	pipeline := &pgConn.pipeline
   1850 
   1851 	if ctx != context.Background() {
   1852 		select {
   1853 		case <-ctx.Done():
   1854 			pipeline.closed = true
   1855 			pipeline.err = newContextAlreadyDoneError(ctx)
   1856 			pgConn.unlock()
   1857 			return pipeline
   1858 		default:
   1859 		}
   1860 		pgConn.contextWatcher.Watch(ctx)
   1861 	}
   1862 
   1863 	return pipeline
   1864 }
   1865 
   1866 // SendPrepare is the pipeline version of *PgConn.Prepare.
   1867 func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) {
   1868 	if p.closed {
   1869 		return
   1870 	}
   1871 	p.pendingSync = true
   1872 
   1873 	p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs})
   1874 	p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name})
   1875 }
   1876 
   1877 // SendDeallocate deallocates a prepared statement.
   1878 func (p *Pipeline) SendDeallocate(name string) {
   1879 	if p.closed {
   1880 		return
   1881 	}
   1882 	p.pendingSync = true
   1883 
   1884 	p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name})
   1885 }
   1886 
   1887 // SendQueryParams is the pipeline version of *PgConn.QueryParams.
   1888 func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) {
   1889 	if p.closed {
   1890 		return
   1891 	}
   1892 	p.pendingSync = true
   1893 
   1894 	p.conn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs})
   1895 	p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
   1896 	p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
   1897 	p.conn.frontend.SendExecute(&pgproto3.Execute{})
   1898 }
   1899 
   1900 // SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared.
   1901 func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) {
   1902 	if p.closed {
   1903 		return
   1904 	}
   1905 	p.pendingSync = true
   1906 
   1907 	p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
   1908 	p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
   1909 	p.conn.frontend.SendExecute(&pgproto3.Execute{})
   1910 }
   1911 
   1912 // Flush flushes the queued requests without establishing a synchronization point.
   1913 func (p *Pipeline) Flush() error {
   1914 	if p.closed {
   1915 		if p.err != nil {
   1916 			return p.err
   1917 		}
   1918 		return errors.New("pipeline closed")
   1919 	}
   1920 
   1921 	err := p.conn.flushWithPotentialWriteReadDeadlock()
   1922 	if err != nil {
   1923 		err = normalizeTimeoutError(p.ctx, err)
   1924 
   1925 		p.conn.asyncClose()
   1926 
   1927 		p.conn.contextWatcher.Unwatch()
   1928 		p.conn.unlock()
   1929 		p.closed = true
   1930 		p.err = err
   1931 		return err
   1932 	}
   1933 
   1934 	return nil
   1935 }
   1936 
   1937 // Sync establishes a synchronization point and flushes the queued requests.
   1938 func (p *Pipeline) Sync() error {
   1939 	p.conn.frontend.SendSync(&pgproto3.Sync{})
   1940 	err := p.Flush()
   1941 	if err != nil {
   1942 		return err
   1943 	}
   1944 
   1945 	p.pendingSync = false
   1946 	p.expectedReadyForQueryCount++
   1947 
   1948 	return nil
   1949 }
   1950 
   1951 // GetResults gets the next results. If results are present, results may be a *ResultReader, *StatementDescription, or
   1952 // *PipelineSync. If an ErrorResponse is received from the server, results will be nil and err will be a *PgError. If no
   1953 // results are available, results and err will both be nil.
   1954 func (p *Pipeline) GetResults() (results any, err error) {
   1955 	if p.expectedReadyForQueryCount == 0 {
   1956 		return nil, nil
   1957 	}
   1958 
   1959 	for {
   1960 		msg, err := p.conn.receiveMessage()
   1961 		if err != nil {
   1962 			return nil, err
   1963 		}
   1964 
   1965 		switch msg := msg.(type) {
   1966 		case *pgproto3.RowDescription:
   1967 			p.conn.resultReader = ResultReader{
   1968 				pgConn:            p.conn,
   1969 				pipeline:          p,
   1970 				ctx:               p.ctx,
   1971 				fieldDescriptions: p.conn.convertRowDescription(p.conn.fieldDescriptions[:], msg),
   1972 			}
   1973 			return &p.conn.resultReader, nil
   1974 		case *pgproto3.CommandComplete:
   1975 			p.conn.resultReader = ResultReader{
   1976 				commandTag:       p.conn.makeCommandTag(msg.CommandTag),
   1977 				commandConcluded: true,
   1978 				closed:           true,
   1979 			}
   1980 			return &p.conn.resultReader, nil
   1981 		case *pgproto3.ParseComplete:
   1982 			peekedMsg, err := p.conn.peekMessage()
   1983 			if err != nil {
   1984 				return nil, err
   1985 			}
   1986 			if _, ok := peekedMsg.(*pgproto3.ParameterDescription); ok {
   1987 				return p.getResultsPrepare()
   1988 			}
   1989 		case *pgproto3.CloseComplete:
   1990 			return &CloseComplete{}, nil
   1991 		case *pgproto3.ReadyForQuery:
   1992 			p.expectedReadyForQueryCount--
   1993 			return &PipelineSync{}, nil
   1994 		case *pgproto3.ErrorResponse:
   1995 			pgErr := ErrorResponseToPgError(msg)
   1996 			return nil, pgErr
   1997 		}
   1998 
   1999 	}
   2000 
   2001 }
   2002 
   2003 func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) {
   2004 	psd := &StatementDescription{}
   2005 
   2006 	for {
   2007 		msg, err := p.conn.receiveMessage()
   2008 		if err != nil {
   2009 			p.conn.asyncClose()
   2010 			return nil, normalizeTimeoutError(p.ctx, err)
   2011 		}
   2012 
   2013 		switch msg := msg.(type) {
   2014 		case *pgproto3.ParameterDescription:
   2015 			psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs))
   2016 			copy(psd.ParamOIDs, msg.ParameterOIDs)
   2017 		case *pgproto3.RowDescription:
   2018 			psd.Fields = p.conn.convertRowDescription(nil, msg)
   2019 			return psd, nil
   2020 
   2021 		// NoData is returned instead of RowDescription when there is no expected result. e.g. An INSERT without a RETURNING
   2022 		// clause.
   2023 		case *pgproto3.NoData:
   2024 			return psd, nil
   2025 
   2026 		// These should never happen here. But don't take chances that could lead to a deadlock.
   2027 		case *pgproto3.ErrorResponse:
   2028 			pgErr := ErrorResponseToPgError(msg)
   2029 			return nil, pgErr
   2030 		case *pgproto3.CommandComplete:
   2031 			p.conn.asyncClose()
   2032 			return nil, errors.New("BUG: received CommandComplete while handling Describe")
   2033 		case *pgproto3.ReadyForQuery:
   2034 			p.conn.asyncClose()
   2035 			return nil, errors.New("BUG: received ReadyForQuery while handling Describe")
   2036 		}
   2037 	}
   2038 }
   2039 
   2040 // Close closes the pipeline and returns the connection to normal mode.
   2041 func (p *Pipeline) Close() error {
   2042 	if p.closed {
   2043 		return p.err
   2044 	}
   2045 	p.closed = true
   2046 
   2047 	if p.pendingSync {
   2048 		p.conn.asyncClose()
   2049 		p.err = errors.New("pipeline has unsynced requests")
   2050 		p.conn.contextWatcher.Unwatch()
   2051 		p.conn.unlock()
   2052 
   2053 		return p.err
   2054 	}
   2055 
   2056 	for p.expectedReadyForQueryCount > 0 {
   2057 		_, err := p.GetResults()
   2058 		if err != nil {
   2059 			p.err = err
   2060 			var pgErr *PgError
   2061 			if !errors.As(err, &pgErr) {
   2062 				p.conn.asyncClose()
   2063 				break
   2064 			}
   2065 		}
   2066 	}
   2067 
   2068 	p.conn.contextWatcher.Unwatch()
   2069 	p.conn.unlock()
   2070 
   2071 	return p.err
   2072 }