transaction.go (3648B)
1 // Copyright 2014 Google Inc. All rights reserved. 2 // Use of this source code is governed by the Apache 2.0 3 // license that can be found in the LICENSE file. 4 5 package internal 6 7 // This file implements hooks for applying datastore transactions. 8 9 import ( 10 "errors" 11 "reflect" 12 13 "github.com/golang/protobuf/proto" 14 netcontext "golang.org/x/net/context" 15 16 basepb "google.golang.org/appengine/internal/base" 17 pb "google.golang.org/appengine/internal/datastore" 18 ) 19 20 var transactionSetters = make(map[reflect.Type]reflect.Value) 21 22 // RegisterTransactionSetter registers a function that sets transaction information 23 // in a protocol buffer message. f should be a function with two arguments, 24 // the first being a protocol buffer type, and the second being *datastore.Transaction. 25 func RegisterTransactionSetter(f interface{}) { 26 v := reflect.ValueOf(f) 27 transactionSetters[v.Type().In(0)] = v 28 } 29 30 // applyTransaction applies the transaction t to message pb 31 // by using the relevant setter passed to RegisterTransactionSetter. 32 func applyTransaction(pb proto.Message, t *pb.Transaction) { 33 v := reflect.ValueOf(pb) 34 if f, ok := transactionSetters[v.Type()]; ok { 35 f.Call([]reflect.Value{v, reflect.ValueOf(t)}) 36 } 37 } 38 39 var transactionKey = "used for *Transaction" 40 41 func transactionFromContext(ctx netcontext.Context) *transaction { 42 t, _ := ctx.Value(&transactionKey).(*transaction) 43 return t 44 } 45 46 func withTransaction(ctx netcontext.Context, t *transaction) netcontext.Context { 47 return netcontext.WithValue(ctx, &transactionKey, t) 48 } 49 50 type transaction struct { 51 transaction pb.Transaction 52 finished bool 53 } 54 55 var ErrConcurrentTransaction = errors.New("internal: concurrent transaction") 56 57 func RunTransactionOnce(c netcontext.Context, f func(netcontext.Context) error, xg bool, readOnly bool, previousTransaction *pb.Transaction) (*pb.Transaction, error) { 58 if transactionFromContext(c) != nil { 59 return nil, errors.New("nested transactions are not supported") 60 } 61 62 // Begin the transaction. 63 t := &transaction{} 64 req := &pb.BeginTransactionRequest{ 65 App: proto.String(FullyQualifiedAppID(c)), 66 } 67 if xg { 68 req.AllowMultipleEg = proto.Bool(true) 69 } 70 if previousTransaction != nil { 71 req.PreviousTransaction = previousTransaction 72 } 73 if readOnly { 74 req.Mode = pb.BeginTransactionRequest_READ_ONLY.Enum() 75 } else { 76 req.Mode = pb.BeginTransactionRequest_READ_WRITE.Enum() 77 } 78 if err := Call(c, "datastore_v3", "BeginTransaction", req, &t.transaction); err != nil { 79 return nil, err 80 } 81 82 // Call f, rolling back the transaction if f returns a non-nil error, or panics. 83 // The panic is not recovered. 84 defer func() { 85 if t.finished { 86 return 87 } 88 t.finished = true 89 // Ignore the error return value, since we are already returning a non-nil 90 // error (or we're panicking). 91 Call(c, "datastore_v3", "Rollback", &t.transaction, &basepb.VoidProto{}) 92 }() 93 if err := f(withTransaction(c, t)); err != nil { 94 return &t.transaction, err 95 } 96 t.finished = true 97 98 // Commit the transaction. 99 res := &pb.CommitResponse{} 100 err := Call(c, "datastore_v3", "Commit", &t.transaction, res) 101 if ae, ok := err.(*APIError); ok { 102 /* TODO: restore this conditional 103 if appengine.IsDevAppServer() { 104 */ 105 // The Python Dev AppServer raises an ApplicationError with error code 2 (which is 106 // Error.CONCURRENT_TRANSACTION) and message "Concurrency exception.". 107 if ae.Code == int32(pb.Error_BAD_REQUEST) && ae.Detail == "ApplicationError: 2 Concurrency exception." { 108 return &t.transaction, ErrConcurrentTransaction 109 } 110 if ae.Code == int32(pb.Error_CONCURRENT_TRANSACTION) { 111 return &t.transaction, ErrConcurrentTransaction 112 } 113 } 114 return &t.transaction, err 115 }