
Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

tx.go (13600B)

      1 package pgx
      3 import (
      4 	"context"
      5 	"errors"
      6 	"fmt"
      7 	"strconv"
      8 	"strings"
     10 	""
     11 )
// TxIsoLevel is the transaction isolation level (serializable, repeatable read, read committed or read uncommitted).
// A non-empty value is written verbatim after "isolation level" in the statement built by TxOptions.beginSQL.
type TxIsoLevel string

// Transaction isolation levels
const (
	Serializable    TxIsoLevel = "serializable"
	RepeatableRead  TxIsoLevel = "repeatable read"
	ReadCommitted   TxIsoLevel = "read committed"
	ReadUncommitted TxIsoLevel = "read uncommitted"
)
// TxAccessMode is the transaction access mode (read write or read only).
// A non-empty value is written verbatim into the statement built by TxOptions.beginSQL.
type TxAccessMode string

// Transaction access modes
const (
	ReadWrite TxAccessMode = "read write"
	ReadOnly  TxAccessMode = "read only"
)
// TxDeferrableMode is the transaction deferrable mode (deferrable or not deferrable).
// A non-empty value is written verbatim into the statement built by TxOptions.beginSQL.
type TxDeferrableMode string

// Transaction deferrable modes
const (
	Deferrable    TxDeferrableMode = "deferrable"
	NotDeferrable TxDeferrableMode = "not deferrable"
)
// TxOptions are transaction modes within a transaction block.
//
// The zero value produces a plain "begin" statement. Fields left empty are omitted from the generated statement.
type TxOptions struct {
	// IsoLevel, when non-empty, adds an "isolation level ..." clause.
	IsoLevel TxIsoLevel
	// AccessMode, when non-empty, is appended to the begin statement.
	AccessMode TxAccessMode
	// DeferrableMode, when non-empty, is appended to the begin statement.
	DeferrableMode TxDeferrableMode

	// BeginQuery is the SQL query that will be executed to begin the transaction. This allows using non-standard syntax
	// such as BEGIN PRIORITY HIGH with CockroachDB. If set this will override the other settings.
	BeginQuery string
}
// emptyTxOptions is the zero-value TxOptions. beginSQL compares against it to detect that no option was set.
var emptyTxOptions TxOptions
     55 func (txOptions TxOptions) beginSQL() string {
     56 	if txOptions == emptyTxOptions {
     57 		return "begin"
     58 	}
     60 	if txOptions.BeginQuery != "" {
     61 		return txOptions.BeginQuery
     62 	}
     64 	var buf strings.Builder
     65 	buf.Grow(64) // 64 - maximum length of string with available options
     66 	buf.WriteString("begin")
     68 	if txOptions.IsoLevel != "" {
     69 		buf.WriteString(" isolation level ")
     70 		buf.WriteString(string(txOptions.IsoLevel))
     71 	}
     72 	if txOptions.AccessMode != "" {
     73 		buf.WriteByte(' ')
     74 		buf.WriteString(string(txOptions.AccessMode))
     75 	}
     76 	if txOptions.DeferrableMode != "" {
     77 		buf.WriteByte(' ')
     78 		buf.WriteString(string(txOptions.DeferrableMode))
     79 	}
     81 	return buf.String()
     82 }
// ErrTxClosed is returned by transaction methods that are called after the transaction has been marked closed by
// Commit or Rollback.
var ErrTxClosed = errors.New("tx is closed")
// ErrTxCommitRollback occurs when an error has occurred in a transaction and
// Commit() is called. PostgreSQL accepts COMMIT on aborted transactions, but
// it is treated as ROLLBACK. Commit returns this error when the command tag
// of the commit statement is "ROLLBACK".
var ErrTxCommitRollback = errors.New("commit unexpectedly resulted in rollback")
     91 // Begin starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no
     92 // auto-rollback on context cancellation.
     93 func (c *Conn) Begin(ctx context.Context) (Tx, error) {
     94 	return c.BeginTx(ctx, TxOptions{})
     95 }
     97 // BeginTx starts a transaction with txOptions determining the transaction mode. Unlike database/sql, the context only
     98 // affects the begin command. i.e. there is no auto-rollback on context cancellation.
     99 func (c *Conn) BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error) {
    100 	_, err := c.Exec(ctx, txOptions.beginSQL())
    101 	if err != nil {
    102 		// begin should never fail unless there is an underlying connection issue or
    103 		// a context timeout. In either case, the connection is possibly broken.
    104 		c.die(errors.New("failed to begin transaction"))
    105 		return nil, err
    106 	}
    108 	return &dbTx{conn: c}, nil
    109 }
// Tx represents a database transaction.
//
// Tx is an interface instead of a struct to enable connection pools to be implemented without relying on internal pgx
// state, to support pseudo-nested transactions with savepoints, and to allow tests to mock transactions. However,
// adding a method to an interface is technically a breaking change. If new methods are added to Conn it may be
// desirable to add them to Tx as well. Because of this the Tx interface is partially excluded from semantic version
// requirements. Methods will not be removed or changed, but new methods may be added.
type Tx interface {
	// Begin starts a pseudo nested transaction.
	Begin(ctx context.Context) (Tx, error)

	// Commit commits the transaction if this is a real transaction or releases the savepoint if this is a pseudo nested
	// transaction. Commit will return an error where errors.Is(ErrTxClosed) is true if the Tx is already closed, but is
	// otherwise safe to call multiple times. If the commit fails with a rollback status (e.g. the transaction was already
	// in a broken state) then an error where errors.Is(ErrTxCommitRollback) is true will be returned.
	Commit(ctx context.Context) error

	// Rollback rolls back the transaction if this is a real transaction or rolls back to the savepoint if this is a
	// pseudo nested transaction. Rollback will return an error where errors.Is(ErrTxClosed) is true if the Tx is already
	// closed, but is otherwise safe to call multiple times. Hence, a defer tx.Rollback() is safe even if tx.Commit() will
	// be called first in a non-error condition. Any other failure of a real transaction will result in the connection
	// being closed.
	Rollback(ctx context.Context) error

	// CopyFrom, SendBatch, Prepare, Exec, Query and QueryRow are implemented in this file by delegating to the
	// underlying connection (or to the parent Tx for pseudo nested transactions) after checking that the transaction
	// is not closed. LargeObjects returns a LargeObjects value bound to this transaction.
	CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error)
	SendBatch(ctx context.Context, b *Batch) BatchResults
	LargeObjects() LargeObjects

	Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error)

	Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
	Query(ctx context.Context, sql string, args ...any) (Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) Row

	// Conn returns the underlying *Conn on which this transaction is executing.
	Conn() *Conn
}
// dbTx represents a database transaction.
//
// Once Commit or Rollback has been called, the dbTx methods that execute
// statements return ErrTxClosed. LargeObjects and Conn do not check the
// closed flag.
type dbTx struct {
	conn *Conn // connection the transaction runs on
	// err is not referenced by any method in this file.
	// NOTE(review): confirm whether it is still used elsewhere in the package.
	err          error
	savepointNum int64 // incremented by Begin to generate unique "sp_<n>" savepoint names
	closed       bool  // set to true by Commit and Rollback
}
    160 // Begin starts a pseudo nested transaction implemented with a savepoint.
    161 func (tx *dbTx) Begin(ctx context.Context) (Tx, error) {
    162 	if tx.closed {
    163 		return nil, ErrTxClosed
    164 	}
    166 	tx.savepointNum++
    167 	_, err := tx.conn.Exec(ctx, "savepoint sp_"+strconv.FormatInt(tx.savepointNum, 10))
    168 	if err != nil {
    169 		return nil, err
    170 	}
    172 	return &dbSimulatedNestedTx{tx: tx, savepointNum: tx.savepointNum}, nil
    173 }
    175 // Commit commits the transaction.
    176 func (tx *dbTx) Commit(ctx context.Context) error {
    177 	if tx.closed {
    178 		return ErrTxClosed
    179 	}
    181 	commandTag, err := tx.conn.Exec(ctx, "commit")
    182 	tx.closed = true
    183 	if err != nil {
    184 		if tx.conn.PgConn().TxStatus() != 'I' {
    185 			_ = tx.conn.Close(ctx) // already have error to return
    186 		}
    187 		return err
    188 	}
    189 	if commandTag.String() == "ROLLBACK" {
    190 		return ErrTxCommitRollback
    191 	}
    193 	return nil
    194 }
    196 // Rollback rolls back the transaction. Rollback will return ErrTxClosed if the
    197 // Tx is already closed, but is otherwise safe to call multiple times. Hence, a
    198 // defer tx.Rollback() is safe even if tx.Commit() will be called first in a
    199 // non-error condition.
    200 func (tx *dbTx) Rollback(ctx context.Context) error {
    201 	if tx.closed {
    202 		return ErrTxClosed
    203 	}
    205 	_, err := tx.conn.Exec(ctx, "rollback")
    206 	tx.closed = true
    207 	if err != nil {
    208 		// A rollback failure leaves the connection in an undefined state
    209 		tx.conn.die(fmt.Errorf("rollback failed: %w", err))
    210 		return err
    211 	}
    213 	return nil
    214 }
    216 // Exec delegates to the underlying *Conn
    217 func (tx *dbTx) Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) {
    218 	if tx.closed {
    219 		return pgconn.CommandTag{}, ErrTxClosed
    220 	}
    222 	return tx.conn.Exec(ctx, sql, arguments...)
    223 }
    225 // Prepare delegates to the underlying *Conn
    226 func (tx *dbTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) {
    227 	if tx.closed {
    228 		return nil, ErrTxClosed
    229 	}
    231 	return tx.conn.Prepare(ctx, name, sql)
    232 }
    234 // Query delegates to the underlying *Conn
    235 func (tx *dbTx) Query(ctx context.Context, sql string, args ...any) (Rows, error) {
    236 	if tx.closed {
    237 		// Because checking for errors can be deferred to the *Rows, build one with the error
    238 		err := ErrTxClosed
    239 		return &baseRows{closed: true, err: err}, err
    240 	}
    242 	return tx.conn.Query(ctx, sql, args...)
    243 }
    245 // QueryRow delegates to the underlying *Conn
    246 func (tx *dbTx) QueryRow(ctx context.Context, sql string, args ...any) Row {
    247 	rows, _ := tx.Query(ctx, sql, args...)
    248 	return (*connRow)(rows.(*baseRows))
    249 }
    251 // CopyFrom delegates to the underlying *Conn
    252 func (tx *dbTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
    253 	if tx.closed {
    254 		return 0, ErrTxClosed
    255 	}
    257 	return tx.conn.CopyFrom(ctx, tableName, columnNames, rowSrc)
    258 }
    260 // SendBatch delegates to the underlying *Conn
    261 func (tx *dbTx) SendBatch(ctx context.Context, b *Batch) BatchResults {
    262 	if tx.closed {
    263 		return &batchResults{err: ErrTxClosed}
    264 	}
    266 	return tx.conn.SendBatch(ctx, b)
    267 }
// LargeObjects returns a LargeObjects instance for the transaction.
// It does not check whether the transaction is closed.
func (tx *dbTx) LargeObjects() LargeObjects {
	return LargeObjects{tx: tx}
}
// Conn returns the *Conn this transaction was started on.
func (tx *dbTx) Conn() *Conn {
	return tx.conn
}
// dbSimulatedNestedTx represents a simulated nested transaction implemented by a savepoint.
type dbSimulatedNestedTx struct {
	tx           Tx    // parent transaction that statements are delegated to
	savepointNum int64 // number n of the "sp_<n>" savepoint backing this nested transaction
	closed       bool  // set to true by Commit and Rollback
}
    285 // Begin starts a pseudo nested transaction implemented with a savepoint.
    286 func (sp *dbSimulatedNestedTx) Begin(ctx context.Context) (Tx, error) {
    287 	if sp.closed {
    288 		return nil, ErrTxClosed
    289 	}
    291 	return sp.tx.Begin(ctx)
    292 }
    294 // Commit releases the savepoint essentially committing the pseudo nested transaction.
    295 func (sp *dbSimulatedNestedTx) Commit(ctx context.Context) error {
    296 	if sp.closed {
    297 		return ErrTxClosed
    298 	}
    300 	_, err := sp.Exec(ctx, "release savepoint sp_"+strconv.FormatInt(sp.savepointNum, 10))
    301 	sp.closed = true
    302 	return err
    303 }
    305 // Rollback rolls back to the savepoint essentially rolling back the pseudo nested transaction. Rollback will return
    306 // ErrTxClosed if the dbSavepoint is already closed, but is otherwise safe to call multiple times. Hence, a defer sp.Rollback()
    307 // is safe even if sp.Commit() will be called first in a non-error condition.
    308 func (sp *dbSimulatedNestedTx) Rollback(ctx context.Context) error {
    309 	if sp.closed {
    310 		return ErrTxClosed
    311 	}
    313 	_, err := sp.Exec(ctx, "rollback to savepoint sp_"+strconv.FormatInt(sp.savepointNum, 10))
    314 	sp.closed = true
    315 	return err
    316 }
    318 // Exec delegates to the underlying Tx
    319 func (sp *dbSimulatedNestedTx) Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) {
    320 	if sp.closed {
    321 		return pgconn.CommandTag{}, ErrTxClosed
    322 	}
    324 	return sp.tx.Exec(ctx, sql, arguments...)
    325 }
    327 // Prepare delegates to the underlying Tx
    328 func (sp *dbSimulatedNestedTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) {
    329 	if sp.closed {
    330 		return nil, ErrTxClosed
    331 	}
    333 	return sp.tx.Prepare(ctx, name, sql)
    334 }
    336 // Query delegates to the underlying Tx
    337 func (sp *dbSimulatedNestedTx) Query(ctx context.Context, sql string, args ...any) (Rows, error) {
    338 	if sp.closed {
    339 		// Because checking for errors can be deferred to the *Rows, build one with the error
    340 		err := ErrTxClosed
    341 		return &baseRows{closed: true, err: err}, err
    342 	}
    344 	return sp.tx.Query(ctx, sql, args...)
    345 }
    347 // QueryRow delegates to the underlying Tx
    348 func (sp *dbSimulatedNestedTx) QueryRow(ctx context.Context, sql string, args ...any) Row {
    349 	rows, _ := sp.Query(ctx, sql, args...)
    350 	return (*connRow)(rows.(*baseRows))
    351 }
    353 // CopyFrom delegates to the underlying *Conn
    354 func (sp *dbSimulatedNestedTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
    355 	if sp.closed {
    356 		return 0, ErrTxClosed
    357 	}
    359 	return sp.tx.CopyFrom(ctx, tableName, columnNames, rowSrc)
    360 }
    362 // SendBatch delegates to the underlying *Conn
    363 func (sp *dbSimulatedNestedTx) SendBatch(ctx context.Context, b *Batch) BatchResults {
    364 	if sp.closed {
    365 		return &batchResults{err: ErrTxClosed}
    366 	}
    368 	return sp.tx.SendBatch(ctx, b)
    369 }
// LargeObjects returns a LargeObjects instance bound to this nested transaction.
// It does not check whether the nested transaction is closed.
func (sp *dbSimulatedNestedTx) LargeObjects() LargeObjects {
	return LargeObjects{tx: sp}
}
// Conn returns the *Conn reported by the parent Tx.
func (sp *dbSimulatedNestedTx) Conn() *Conn {
	return sp.tx.Conn()
}
    379 // BeginFunc calls Begin on db and then calls fn. If fn does not return an error then it calls Commit on db. If fn
    380 // returns an error it calls Rollback on db. The context will be used when executing the transaction control statements
    381 // (BEGIN, ROLLBACK, and COMMIT) but does not otherwise affect the execution of fn.
    382 func BeginFunc(
    383 	ctx context.Context,
    384 	db interface {
    385 		Begin(ctx context.Context) (Tx, error)
    386 	},
    387 	fn func(Tx) error,
    388 ) (err error) {
    389 	var tx Tx
    390 	tx, err = db.Begin(ctx)
    391 	if err != nil {
    392 		return err
    393 	}
    395 	return beginFuncExec(ctx, tx, fn)
    396 }
    398 // BeginTxFunc calls BeginTx on db and then calls fn. If fn does not return an error then it calls Commit on db. If fn
    399 // returns an error it calls Rollback on db. The context will be used when executing the transaction control statements
    400 // (BEGIN, ROLLBACK, and COMMIT) but does not otherwise affect the execution of fn.
    401 func BeginTxFunc(
    402 	ctx context.Context,
    403 	db interface {
    404 		BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error)
    405 	},
    406 	txOptions TxOptions,
    407 	fn func(Tx) error,
    408 ) (err error) {
    409 	var tx Tx
    410 	tx, err = db.BeginTx(ctx, txOptions)
    411 	if err != nil {
    412 		return err
    413 	}
    415 	return beginFuncExec(ctx, tx, fn)
    416 }
    418 func beginFuncExec(ctx context.Context, tx Tx, fn func(Tx) error) (err error) {
    419 	defer func() {
    420 		rollbackErr := tx.Rollback(ctx)
    421 		if rollbackErr != nil && !errors.Is(rollbackErr, ErrTxClosed) {
    422 			err = rollbackErr
    423 		}
    424 	}()
    426 	fErr := fn(tx)
    427 	if fErr != nil {
    428 		_ = tx.Rollback(ctx) // ignore rollback error as there is already an error to return
    429 		return fErr
    430 	}
    432 	return tx.Commit(ctx)
    433 }