gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

tx.go (13600B)


      1 package pgx
      2 
      3 import (
      4 	"context"
      5 	"errors"
      6 	"fmt"
      7 	"strconv"
      8 	"strings"
      9 
     10 	"github.com/jackc/pgx/v5/pgconn"
     11 )
     12 
     13 // TxIsoLevel is the transaction isolation level (serializable, repeatable read, read committed or read uncommitted)
     14 type TxIsoLevel string
     15 
     16 // Transaction isolation levels
     17 const (
     18 	Serializable    TxIsoLevel = "serializable"
     19 	RepeatableRead  TxIsoLevel = "repeatable read"
     20 	ReadCommitted   TxIsoLevel = "read committed"
     21 	ReadUncommitted TxIsoLevel = "read uncommitted"
     22 )
     23 
     24 // TxAccessMode is the transaction access mode (read write or read only)
     25 type TxAccessMode string
     26 
     27 // Transaction access modes
     28 const (
     29 	ReadWrite TxAccessMode = "read write"
     30 	ReadOnly  TxAccessMode = "read only"
     31 )
     32 
     33 // TxDeferrableMode is the transaction deferrable mode (deferrable or not deferrable)
     34 type TxDeferrableMode string
     35 
     36 // Transaction deferrable modes
     37 const (
     38 	Deferrable    TxDeferrableMode = "deferrable"
     39 	NotDeferrable TxDeferrableMode = "not deferrable"
     40 )
     41 
     42 // TxOptions are transaction modes within a transaction block
     43 type TxOptions struct {
     44 	IsoLevel       TxIsoLevel
     45 	AccessMode     TxAccessMode
     46 	DeferrableMode TxDeferrableMode
     47 
     48 	// BeginQuery is the SQL query that will be executed to begin the transaction. This allows using non-standard syntax
     49 	// such as BEGIN PRIORITY HIGH with CockroachDB. If set this will override the other settings.
     50 	BeginQuery string
     51 }
     52 
     53 var emptyTxOptions TxOptions
     54 
     55 func (txOptions TxOptions) beginSQL() string {
     56 	if txOptions == emptyTxOptions {
     57 		return "begin"
     58 	}
     59 
     60 	if txOptions.BeginQuery != "" {
     61 		return txOptions.BeginQuery
     62 	}
     63 
     64 	var buf strings.Builder
     65 	buf.Grow(64) // 64 - maximum length of string with available options
     66 	buf.WriteString("begin")
     67 
     68 	if txOptions.IsoLevel != "" {
     69 		buf.WriteString(" isolation level ")
     70 		buf.WriteString(string(txOptions.IsoLevel))
     71 	}
     72 	if txOptions.AccessMode != "" {
     73 		buf.WriteByte(' ')
     74 		buf.WriteString(string(txOptions.AccessMode))
     75 	}
     76 	if txOptions.DeferrableMode != "" {
     77 		buf.WriteByte(' ')
     78 		buf.WriteString(string(txOptions.DeferrableMode))
     79 	}
     80 
     81 	return buf.String()
     82 }
     83 
     84 var ErrTxClosed = errors.New("tx is closed")
     85 
     86 // ErrTxCommitRollback occurs when an error has occurred in a transaction and
     87 // Commit() is called. PostgreSQL accepts COMMIT on aborted transactions, but
     88 // it is treated as ROLLBACK.
     89 var ErrTxCommitRollback = errors.New("commit unexpectedly resulted in rollback")
     90 
     91 // Begin starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no
     92 // auto-rollback on context cancellation.
     93 func (c *Conn) Begin(ctx context.Context) (Tx, error) {
     94 	return c.BeginTx(ctx, TxOptions{})
     95 }
     96 
     97 // BeginTx starts a transaction with txOptions determining the transaction mode. Unlike database/sql, the context only
     98 // affects the begin command. i.e. there is no auto-rollback on context cancellation.
     99 func (c *Conn) BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error) {
    100 	_, err := c.Exec(ctx, txOptions.beginSQL())
    101 	if err != nil {
    102 		// begin should never fail unless there is an underlying connection issue or
    103 		// a context timeout. In either case, the connection is possibly broken.
    104 		c.die(errors.New("failed to begin transaction"))
    105 		return nil, err
    106 	}
    107 
    108 	return &dbTx{conn: c}, nil
    109 }
    110 
    111 // Tx represents a database transaction.
    112 //
    113 // Tx is an interface instead of a struct to enable connection pools to be implemented without relying on internal pgx
    114 // state, to support pseudo-nested transactions with savepoints, and to allow tests to mock transactions. However,
    115 // adding a method to an interface is technically a breaking change. If new methods are added to Conn it may be
    116 // desirable to add them to Tx as well. Because of this the Tx interface is partially excluded from semantic version
    117 // requirements. Methods will not be removed or changed, but new methods may be added.
    118 type Tx interface {
    119 	// Begin starts a pseudo nested transaction.
    120 	Begin(ctx context.Context) (Tx, error)
    121 
    122 	// Commit commits the transaction if this is a real transaction or releases the savepoint if this is a pseudo nested
    123 	// transaction. Commit will return an error where errors.Is(ErrTxClosed) is true if the Tx is already closed, but is
    124 	// otherwise safe to call multiple times. If the commit fails with a rollback status (e.g. the transaction was already
    125 	// in a broken state) then an error where errors.Is(ErrTxCommitRollback) is true will be returned.
    126 	Commit(ctx context.Context) error
    127 
    128 	// Rollback rolls back the transaction if this is a real transaction or rolls back to the savepoint if this is a
    129 	// pseudo nested transaction. Rollback will return an error where errors.Is(ErrTxClosed) is true if the Tx is already
    130 	// closed, but is otherwise safe to call multiple times. Hence, a defer tx.Rollback() is safe even if tx.Commit() will
    131 	// be called first in a non-error condition. Any other failure of a real transaction will result in the connection
    132 	// being closed.
    133 	Rollback(ctx context.Context) error
    134 
    135 	CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error)
    136 	SendBatch(ctx context.Context, b *Batch) BatchResults
    137 	LargeObjects() LargeObjects
    138 
    139 	Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error)
    140 
    141 	Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
    142 	Query(ctx context.Context, sql string, args ...any) (Rows, error)
    143 	QueryRow(ctx context.Context, sql string, args ...any) Row
    144 
    145 	// Conn returns the underlying *Conn that on which this transaction is executing.
    146 	Conn() *Conn
    147 }
    148 
    149 // dbTx represents a database transaction.
    150 //
    151 // All dbTx methods return ErrTxClosed if Commit or Rollback has already been
    152 // called on the dbTx.
    153 type dbTx struct {
    154 	conn         *Conn
    155 	err          error
    156 	savepointNum int64
    157 	closed       bool
    158 }
    159 
    160 // Begin starts a pseudo nested transaction implemented with a savepoint.
    161 func (tx *dbTx) Begin(ctx context.Context) (Tx, error) {
    162 	if tx.closed {
    163 		return nil, ErrTxClosed
    164 	}
    165 
    166 	tx.savepointNum++
    167 	_, err := tx.conn.Exec(ctx, "savepoint sp_"+strconv.FormatInt(tx.savepointNum, 10))
    168 	if err != nil {
    169 		return nil, err
    170 	}
    171 
    172 	return &dbSimulatedNestedTx{tx: tx, savepointNum: tx.savepointNum}, nil
    173 }
    174 
    175 // Commit commits the transaction.
    176 func (tx *dbTx) Commit(ctx context.Context) error {
    177 	if tx.closed {
    178 		return ErrTxClosed
    179 	}
    180 
    181 	commandTag, err := tx.conn.Exec(ctx, "commit")
    182 	tx.closed = true
    183 	if err != nil {
    184 		if tx.conn.PgConn().TxStatus() != 'I' {
    185 			_ = tx.conn.Close(ctx) // already have error to return
    186 		}
    187 		return err
    188 	}
    189 	if commandTag.String() == "ROLLBACK" {
    190 		return ErrTxCommitRollback
    191 	}
    192 
    193 	return nil
    194 }
    195 
    196 // Rollback rolls back the transaction. Rollback will return ErrTxClosed if the
    197 // Tx is already closed, but is otherwise safe to call multiple times. Hence, a
    198 // defer tx.Rollback() is safe even if tx.Commit() will be called first in a
    199 // non-error condition.
    200 func (tx *dbTx) Rollback(ctx context.Context) error {
    201 	if tx.closed {
    202 		return ErrTxClosed
    203 	}
    204 
    205 	_, err := tx.conn.Exec(ctx, "rollback")
    206 	tx.closed = true
    207 	if err != nil {
    208 		// A rollback failure leaves the connection in an undefined state
    209 		tx.conn.die(fmt.Errorf("rollback failed: %w", err))
    210 		return err
    211 	}
    212 
    213 	return nil
    214 }
    215 
    216 // Exec delegates to the underlying *Conn
    217 func (tx *dbTx) Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) {
    218 	if tx.closed {
    219 		return pgconn.CommandTag{}, ErrTxClosed
    220 	}
    221 
    222 	return tx.conn.Exec(ctx, sql, arguments...)
    223 }
    224 
    225 // Prepare delegates to the underlying *Conn
    226 func (tx *dbTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) {
    227 	if tx.closed {
    228 		return nil, ErrTxClosed
    229 	}
    230 
    231 	return tx.conn.Prepare(ctx, name, sql)
    232 }
    233 
    234 // Query delegates to the underlying *Conn
    235 func (tx *dbTx) Query(ctx context.Context, sql string, args ...any) (Rows, error) {
    236 	if tx.closed {
    237 		// Because checking for errors can be deferred to the *Rows, build one with the error
    238 		err := ErrTxClosed
    239 		return &baseRows{closed: true, err: err}, err
    240 	}
    241 
    242 	return tx.conn.Query(ctx, sql, args...)
    243 }
    244 
    245 // QueryRow delegates to the underlying *Conn
    246 func (tx *dbTx) QueryRow(ctx context.Context, sql string, args ...any) Row {
    247 	rows, _ := tx.Query(ctx, sql, args...)
    248 	return (*connRow)(rows.(*baseRows))
    249 }
    250 
    251 // CopyFrom delegates to the underlying *Conn
    252 func (tx *dbTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
    253 	if tx.closed {
    254 		return 0, ErrTxClosed
    255 	}
    256 
    257 	return tx.conn.CopyFrom(ctx, tableName, columnNames, rowSrc)
    258 }
    259 
    260 // SendBatch delegates to the underlying *Conn
    261 func (tx *dbTx) SendBatch(ctx context.Context, b *Batch) BatchResults {
    262 	if tx.closed {
    263 		return &batchResults{err: ErrTxClosed}
    264 	}
    265 
    266 	return tx.conn.SendBatch(ctx, b)
    267 }
    268 
    269 // LargeObjects returns a LargeObjects instance for the transaction.
    270 func (tx *dbTx) LargeObjects() LargeObjects {
    271 	return LargeObjects{tx: tx}
    272 }
    273 
    274 func (tx *dbTx) Conn() *Conn {
    275 	return tx.conn
    276 }
    277 
    278 // dbSimulatedNestedTx represents a simulated nested transaction implemented by a savepoint.
    279 type dbSimulatedNestedTx struct {
    280 	tx           Tx
    281 	savepointNum int64
    282 	closed       bool
    283 }
    284 
    285 // Begin starts a pseudo nested transaction implemented with a savepoint.
    286 func (sp *dbSimulatedNestedTx) Begin(ctx context.Context) (Tx, error) {
    287 	if sp.closed {
    288 		return nil, ErrTxClosed
    289 	}
    290 
    291 	return sp.tx.Begin(ctx)
    292 }
    293 
    294 // Commit releases the savepoint essentially committing the pseudo nested transaction.
    295 func (sp *dbSimulatedNestedTx) Commit(ctx context.Context) error {
    296 	if sp.closed {
    297 		return ErrTxClosed
    298 	}
    299 
    300 	_, err := sp.Exec(ctx, "release savepoint sp_"+strconv.FormatInt(sp.savepointNum, 10))
    301 	sp.closed = true
    302 	return err
    303 }
    304 
    305 // Rollback rolls back to the savepoint essentially rolling back the pseudo nested transaction. Rollback will return
    306 // ErrTxClosed if the dbSavepoint is already closed, but is otherwise safe to call multiple times. Hence, a defer sp.Rollback()
    307 // is safe even if sp.Commit() will be called first in a non-error condition.
    308 func (sp *dbSimulatedNestedTx) Rollback(ctx context.Context) error {
    309 	if sp.closed {
    310 		return ErrTxClosed
    311 	}
    312 
    313 	_, err := sp.Exec(ctx, "rollback to savepoint sp_"+strconv.FormatInt(sp.savepointNum, 10))
    314 	sp.closed = true
    315 	return err
    316 }
    317 
    318 // Exec delegates to the underlying Tx
    319 func (sp *dbSimulatedNestedTx) Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) {
    320 	if sp.closed {
    321 		return pgconn.CommandTag{}, ErrTxClosed
    322 	}
    323 
    324 	return sp.tx.Exec(ctx, sql, arguments...)
    325 }
    326 
    327 // Prepare delegates to the underlying Tx
    328 func (sp *dbSimulatedNestedTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) {
    329 	if sp.closed {
    330 		return nil, ErrTxClosed
    331 	}
    332 
    333 	return sp.tx.Prepare(ctx, name, sql)
    334 }
    335 
    336 // Query delegates to the underlying Tx
    337 func (sp *dbSimulatedNestedTx) Query(ctx context.Context, sql string, args ...any) (Rows, error) {
    338 	if sp.closed {
    339 		// Because checking for errors can be deferred to the *Rows, build one with the error
    340 		err := ErrTxClosed
    341 		return &baseRows{closed: true, err: err}, err
    342 	}
    343 
    344 	return sp.tx.Query(ctx, sql, args...)
    345 }
    346 
    347 // QueryRow delegates to the underlying Tx
    348 func (sp *dbSimulatedNestedTx) QueryRow(ctx context.Context, sql string, args ...any) Row {
    349 	rows, _ := sp.Query(ctx, sql, args...)
    350 	return (*connRow)(rows.(*baseRows))
    351 }
    352 
    353 // CopyFrom delegates to the underlying *Conn
    354 func (sp *dbSimulatedNestedTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
    355 	if sp.closed {
    356 		return 0, ErrTxClosed
    357 	}
    358 
    359 	return sp.tx.CopyFrom(ctx, tableName, columnNames, rowSrc)
    360 }
    361 
    362 // SendBatch delegates to the underlying *Conn
    363 func (sp *dbSimulatedNestedTx) SendBatch(ctx context.Context, b *Batch) BatchResults {
    364 	if sp.closed {
    365 		return &batchResults{err: ErrTxClosed}
    366 	}
    367 
    368 	return sp.tx.SendBatch(ctx, b)
    369 }
    370 
    371 func (sp *dbSimulatedNestedTx) LargeObjects() LargeObjects {
    372 	return LargeObjects{tx: sp}
    373 }
    374 
    375 func (sp *dbSimulatedNestedTx) Conn() *Conn {
    376 	return sp.tx.Conn()
    377 }
    378 
    379 // BeginFunc calls Begin on db and then calls fn. If fn does not return an error then it calls Commit on db. If fn
    380 // returns an error it calls Rollback on db. The context will be used when executing the transaction control statements
    381 // (BEGIN, ROLLBACK, and COMMIT) but does not otherwise affect the execution of fn.
    382 func BeginFunc(
    383 	ctx context.Context,
    384 	db interface {
    385 		Begin(ctx context.Context) (Tx, error)
    386 	},
    387 	fn func(Tx) error,
    388 ) (err error) {
    389 	var tx Tx
    390 	tx, err = db.Begin(ctx)
    391 	if err != nil {
    392 		return err
    393 	}
    394 
    395 	return beginFuncExec(ctx, tx, fn)
    396 }
    397 
    398 // BeginTxFunc calls BeginTx on db and then calls fn. If fn does not return an error then it calls Commit on db. If fn
    399 // returns an error it calls Rollback on db. The context will be used when executing the transaction control statements
    400 // (BEGIN, ROLLBACK, and COMMIT) but does not otherwise affect the execution of fn.
    401 func BeginTxFunc(
    402 	ctx context.Context,
    403 	db interface {
    404 		BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error)
    405 	},
    406 	txOptions TxOptions,
    407 	fn func(Tx) error,
    408 ) (err error) {
    409 	var tx Tx
    410 	tx, err = db.BeginTx(ctx, txOptions)
    411 	if err != nil {
    412 		return err
    413 	}
    414 
    415 	return beginFuncExec(ctx, tx, fn)
    416 }
    417 
    418 func beginFuncExec(ctx context.Context, tx Tx, fn func(Tx) error) (err error) {
    419 	defer func() {
    420 		rollbackErr := tx.Rollback(ctx)
    421 		if rollbackErr != nil && !errors.Is(rollbackErr, ErrTxClosed) {
    422 			err = rollbackErr
    423 		}
    424 	}()
    425 
    426 	fErr := fn(tx)
    427 	if fErr != nil {
    428 		_ = tx.Rollback(ctx) // ignore rollback error as there is already an error to return
    429 		return fErr
    430 	}
    431 
    432 	return tx.Commit(ctx)
    433 }