tx.go (13600B)
1 package pgx 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "strconv" 8 "strings" 9 10 "github.com/jackc/pgx/v5/pgconn" 11 ) 12 13 // TxIsoLevel is the transaction isolation level (serializable, repeatable read, read committed or read uncommitted) 14 type TxIsoLevel string 15 16 // Transaction isolation levels 17 const ( 18 Serializable TxIsoLevel = "serializable" 19 RepeatableRead TxIsoLevel = "repeatable read" 20 ReadCommitted TxIsoLevel = "read committed" 21 ReadUncommitted TxIsoLevel = "read uncommitted" 22 ) 23 24 // TxAccessMode is the transaction access mode (read write or read only) 25 type TxAccessMode string 26 27 // Transaction access modes 28 const ( 29 ReadWrite TxAccessMode = "read write" 30 ReadOnly TxAccessMode = "read only" 31 ) 32 33 // TxDeferrableMode is the transaction deferrable mode (deferrable or not deferrable) 34 type TxDeferrableMode string 35 36 // Transaction deferrable modes 37 const ( 38 Deferrable TxDeferrableMode = "deferrable" 39 NotDeferrable TxDeferrableMode = "not deferrable" 40 ) 41 42 // TxOptions are transaction modes within a transaction block 43 type TxOptions struct { 44 IsoLevel TxIsoLevel 45 AccessMode TxAccessMode 46 DeferrableMode TxDeferrableMode 47 48 // BeginQuery is the SQL query that will be executed to begin the transaction. This allows using non-standard syntax 49 // such as BEGIN PRIORITY HIGH with CockroachDB. If set this will override the other settings. 50 BeginQuery string 51 } 52 53 var emptyTxOptions TxOptions 54 55 func (txOptions TxOptions) beginSQL() string { 56 if txOptions == emptyTxOptions { 57 return "begin" 58 } 59 60 if txOptions.BeginQuery != "" { 61 return txOptions.BeginQuery 62 } 63 64 var buf strings.Builder 65 buf.Grow(64) // 64 - maximum length of string with available options 66 buf.WriteString("begin") 67 68 if txOptions.IsoLevel != "" { 69 buf.WriteString(" isolation level ") 70 buf.WriteString(string(txOptions.IsoLevel)) 71 } 72 if txOptions.AccessMode != "" { 73 buf.WriteByte(' ') 74 buf.WriteString(string(txOptions.AccessMode)) 75 } 76 if txOptions.DeferrableMode != "" { 77 buf.WriteByte(' ') 78 buf.WriteString(string(txOptions.DeferrableMode)) 79 } 80 81 return buf.String() 82 } 83 84 var ErrTxClosed = errors.New("tx is closed") 85 86 // ErrTxCommitRollback occurs when an error has occurred in a transaction and 87 // Commit() is called. PostgreSQL accepts COMMIT on aborted transactions, but 88 // it is treated as ROLLBACK. 89 var ErrTxCommitRollback = errors.New("commit unexpectedly resulted in rollback") 90 91 // Begin starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no 92 // auto-rollback on context cancellation. 93 func (c *Conn) Begin(ctx context.Context) (Tx, error) { 94 return c.BeginTx(ctx, TxOptions{}) 95 } 96 97 // BeginTx starts a transaction with txOptions determining the transaction mode. Unlike database/sql, the context only 98 // affects the begin command. i.e. there is no auto-rollback on context cancellation. 99 func (c *Conn) BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error) { 100 _, err := c.Exec(ctx, txOptions.beginSQL()) 101 if err != nil { 102 // begin should never fail unless there is an underlying connection issue or 103 // a context timeout. In either case, the connection is possibly broken. 104 c.die(errors.New("failed to begin transaction")) 105 return nil, err 106 } 107 108 return &dbTx{conn: c}, nil 109 } 110 111 // Tx represents a database transaction. 112 // 113 // Tx is an interface instead of a struct to enable connection pools to be implemented without relying on internal pgx 114 // state, to support pseudo-nested transactions with savepoints, and to allow tests to mock transactions. However, 115 // adding a method to an interface is technically a breaking change. If new methods are added to Conn it may be 116 // desirable to add them to Tx as well. Because of this the Tx interface is partially excluded from semantic version 117 // requirements. Methods will not be removed or changed, but new methods may be added. 118 type Tx interface { 119 // Begin starts a pseudo nested transaction. 120 Begin(ctx context.Context) (Tx, error) 121 122 // Commit commits the transaction if this is a real transaction or releases the savepoint if this is a pseudo nested 123 // transaction. Commit will return an error where errors.Is(ErrTxClosed) is true if the Tx is already closed, but is 124 // otherwise safe to call multiple times. If the commit fails with a rollback status (e.g. the transaction was already 125 // in a broken state) then an error where errors.Is(ErrTxCommitRollback) is true will be returned. 126 Commit(ctx context.Context) error 127 128 // Rollback rolls back the transaction if this is a real transaction or rolls back to the savepoint if this is a 129 // pseudo nested transaction. Rollback will return an error where errors.Is(ErrTxClosed) is true if the Tx is already 130 // closed, but is otherwise safe to call multiple times. Hence, a defer tx.Rollback() is safe even if tx.Commit() will 131 // be called first in a non-error condition. Any other failure of a real transaction will result in the connection 132 // being closed. 133 Rollback(ctx context.Context) error 134 135 CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) 136 SendBatch(ctx context.Context, b *Batch) BatchResults 137 LargeObjects() LargeObjects 138 139 Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) 140 141 Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) 142 Query(ctx context.Context, sql string, args ...any) (Rows, error) 143 QueryRow(ctx context.Context, sql string, args ...any) Row 144 145 // Conn returns the underlying *Conn that on which this transaction is executing. 146 Conn() *Conn 147 } 148 149 // dbTx represents a database transaction. 150 // 151 // All dbTx methods return ErrTxClosed if Commit or Rollback has already been 152 // called on the dbTx. 153 type dbTx struct { 154 conn *Conn 155 err error 156 savepointNum int64 157 closed bool 158 } 159 160 // Begin starts a pseudo nested transaction implemented with a savepoint. 161 func (tx *dbTx) Begin(ctx context.Context) (Tx, error) { 162 if tx.closed { 163 return nil, ErrTxClosed 164 } 165 166 tx.savepointNum++ 167 _, err := tx.conn.Exec(ctx, "savepoint sp_"+strconv.FormatInt(tx.savepointNum, 10)) 168 if err != nil { 169 return nil, err 170 } 171 172 return &dbSimulatedNestedTx{tx: tx, savepointNum: tx.savepointNum}, nil 173 } 174 175 // Commit commits the transaction. 176 func (tx *dbTx) Commit(ctx context.Context) error { 177 if tx.closed { 178 return ErrTxClosed 179 } 180 181 commandTag, err := tx.conn.Exec(ctx, "commit") 182 tx.closed = true 183 if err != nil { 184 if tx.conn.PgConn().TxStatus() != 'I' { 185 _ = tx.conn.Close(ctx) // already have error to return 186 } 187 return err 188 } 189 if commandTag.String() == "ROLLBACK" { 190 return ErrTxCommitRollback 191 } 192 193 return nil 194 } 195 196 // Rollback rolls back the transaction. Rollback will return ErrTxClosed if the 197 // Tx is already closed, but is otherwise safe to call multiple times. Hence, a 198 // defer tx.Rollback() is safe even if tx.Commit() will be called first in a 199 // non-error condition. 200 func (tx *dbTx) Rollback(ctx context.Context) error { 201 if tx.closed { 202 return ErrTxClosed 203 } 204 205 _, err := tx.conn.Exec(ctx, "rollback") 206 tx.closed = true 207 if err != nil { 208 // A rollback failure leaves the connection in an undefined state 209 tx.conn.die(fmt.Errorf("rollback failed: %w", err)) 210 return err 211 } 212 213 return nil 214 } 215 216 // Exec delegates to the underlying *Conn 217 func (tx *dbTx) Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) { 218 if tx.closed { 219 return pgconn.CommandTag{}, ErrTxClosed 220 } 221 222 return tx.conn.Exec(ctx, sql, arguments...) 223 } 224 225 // Prepare delegates to the underlying *Conn 226 func (tx *dbTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) { 227 if tx.closed { 228 return nil, ErrTxClosed 229 } 230 231 return tx.conn.Prepare(ctx, name, sql) 232 } 233 234 // Query delegates to the underlying *Conn 235 func (tx *dbTx) Query(ctx context.Context, sql string, args ...any) (Rows, error) { 236 if tx.closed { 237 // Because checking for errors can be deferred to the *Rows, build one with the error 238 err := ErrTxClosed 239 return &baseRows{closed: true, err: err}, err 240 } 241 242 return tx.conn.Query(ctx, sql, args...) 243 } 244 245 // QueryRow delegates to the underlying *Conn 246 func (tx *dbTx) QueryRow(ctx context.Context, sql string, args ...any) Row { 247 rows, _ := tx.Query(ctx, sql, args...) 248 return (*connRow)(rows.(*baseRows)) 249 } 250 251 // CopyFrom delegates to the underlying *Conn 252 func (tx *dbTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) { 253 if tx.closed { 254 return 0, ErrTxClosed 255 } 256 257 return tx.conn.CopyFrom(ctx, tableName, columnNames, rowSrc) 258 } 259 260 // SendBatch delegates to the underlying *Conn 261 func (tx *dbTx) SendBatch(ctx context.Context, b *Batch) BatchResults { 262 if tx.closed { 263 return &batchResults{err: ErrTxClosed} 264 } 265 266 return tx.conn.SendBatch(ctx, b) 267 } 268 269 // LargeObjects returns a LargeObjects instance for the transaction. 270 func (tx *dbTx) LargeObjects() LargeObjects { 271 return LargeObjects{tx: tx} 272 } 273 274 func (tx *dbTx) Conn() *Conn { 275 return tx.conn 276 } 277 278 // dbSimulatedNestedTx represents a simulated nested transaction implemented by a savepoint. 279 type dbSimulatedNestedTx struct { 280 tx Tx 281 savepointNum int64 282 closed bool 283 } 284 285 // Begin starts a pseudo nested transaction implemented with a savepoint. 286 func (sp *dbSimulatedNestedTx) Begin(ctx context.Context) (Tx, error) { 287 if sp.closed { 288 return nil, ErrTxClosed 289 } 290 291 return sp.tx.Begin(ctx) 292 } 293 294 // Commit releases the savepoint essentially committing the pseudo nested transaction. 295 func (sp *dbSimulatedNestedTx) Commit(ctx context.Context) error { 296 if sp.closed { 297 return ErrTxClosed 298 } 299 300 _, err := sp.Exec(ctx, "release savepoint sp_"+strconv.FormatInt(sp.savepointNum, 10)) 301 sp.closed = true 302 return err 303 } 304 305 // Rollback rolls back to the savepoint essentially rolling back the pseudo nested transaction. Rollback will return 306 // ErrTxClosed if the dbSavepoint is already closed, but is otherwise safe to call multiple times. Hence, a defer sp.Rollback() 307 // is safe even if sp.Commit() will be called first in a non-error condition. 308 func (sp *dbSimulatedNestedTx) Rollback(ctx context.Context) error { 309 if sp.closed { 310 return ErrTxClosed 311 } 312 313 _, err := sp.Exec(ctx, "rollback to savepoint sp_"+strconv.FormatInt(sp.savepointNum, 10)) 314 sp.closed = true 315 return err 316 } 317 318 // Exec delegates to the underlying Tx 319 func (sp *dbSimulatedNestedTx) Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) { 320 if sp.closed { 321 return pgconn.CommandTag{}, ErrTxClosed 322 } 323 324 return sp.tx.Exec(ctx, sql, arguments...) 325 } 326 327 // Prepare delegates to the underlying Tx 328 func (sp *dbSimulatedNestedTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) { 329 if sp.closed { 330 return nil, ErrTxClosed 331 } 332 333 return sp.tx.Prepare(ctx, name, sql) 334 } 335 336 // Query delegates to the underlying Tx 337 func (sp *dbSimulatedNestedTx) Query(ctx context.Context, sql string, args ...any) (Rows, error) { 338 if sp.closed { 339 // Because checking for errors can be deferred to the *Rows, build one with the error 340 err := ErrTxClosed 341 return &baseRows{closed: true, err: err}, err 342 } 343 344 return sp.tx.Query(ctx, sql, args...) 345 } 346 347 // QueryRow delegates to the underlying Tx 348 func (sp *dbSimulatedNestedTx) QueryRow(ctx context.Context, sql string, args ...any) Row { 349 rows, _ := sp.Query(ctx, sql, args...) 350 return (*connRow)(rows.(*baseRows)) 351 } 352 353 // CopyFrom delegates to the underlying *Conn 354 func (sp *dbSimulatedNestedTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) { 355 if sp.closed { 356 return 0, ErrTxClosed 357 } 358 359 return sp.tx.CopyFrom(ctx, tableName, columnNames, rowSrc) 360 } 361 362 // SendBatch delegates to the underlying *Conn 363 func (sp *dbSimulatedNestedTx) SendBatch(ctx context.Context, b *Batch) BatchResults { 364 if sp.closed { 365 return &batchResults{err: ErrTxClosed} 366 } 367 368 return sp.tx.SendBatch(ctx, b) 369 } 370 371 func (sp *dbSimulatedNestedTx) LargeObjects() LargeObjects { 372 return LargeObjects{tx: sp} 373 } 374 375 func (sp *dbSimulatedNestedTx) Conn() *Conn { 376 return sp.tx.Conn() 377 } 378 379 // BeginFunc calls Begin on db and then calls fn. If fn does not return an error then it calls Commit on db. If fn 380 // returns an error it calls Rollback on db. The context will be used when executing the transaction control statements 381 // (BEGIN, ROLLBACK, and COMMIT) but does not otherwise affect the execution of fn. 382 func BeginFunc( 383 ctx context.Context, 384 db interface { 385 Begin(ctx context.Context) (Tx, error) 386 }, 387 fn func(Tx) error, 388 ) (err error) { 389 var tx Tx 390 tx, err = db.Begin(ctx) 391 if err != nil { 392 return err 393 } 394 395 return beginFuncExec(ctx, tx, fn) 396 } 397 398 // BeginTxFunc calls BeginTx on db and then calls fn. If fn does not return an error then it calls Commit on db. If fn 399 // returns an error it calls Rollback on db. The context will be used when executing the transaction control statements 400 // (BEGIN, ROLLBACK, and COMMIT) but does not otherwise affect the execution of fn. 401 func BeginTxFunc( 402 ctx context.Context, 403 db interface { 404 BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error) 405 }, 406 txOptions TxOptions, 407 fn func(Tx) error, 408 ) (err error) { 409 var tx Tx 410 tx, err = db.BeginTx(ctx, txOptions) 411 if err != nil { 412 return err 413 } 414 415 return beginFuncExec(ctx, tx, fn) 416 } 417 418 func beginFuncExec(ctx context.Context, tx Tx, fn func(Tx) error) (err error) { 419 defer func() { 420 rollbackErr := tx.Rollback(ctx) 421 if rollbackErr != nil && !errors.Is(rollbackErr, ErrTxClosed) { 422 err = rollbackErr 423 } 424 }() 425 426 fErr := fn(tx) 427 if fErr != nil { 428 _ = tx.Rollback(ctx) // ignore rollback error as there is already an error to return 429 return fErr 430 } 431 432 return tx.Commit(ctx) 433 }