batch.go (10839B)
1 package pgx 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 8 "github.com/jackc/pgx/v5/pgconn" 9 ) 10 11 // QueuedQuery is a query that has been queued for execution via a Batch. 12 type QueuedQuery struct { 13 query string 14 arguments []any 15 fn batchItemFunc 16 sd *pgconn.StatementDescription 17 } 18 19 type batchItemFunc func(br BatchResults) error 20 21 // Query sets fn to be called when the response to qq is received. 22 func (qq *QueuedQuery) Query(fn func(rows Rows) error) { 23 qq.fn = func(br BatchResults) error { 24 rows, _ := br.Query() 25 defer rows.Close() 26 27 err := fn(rows) 28 if err != nil { 29 return err 30 } 31 rows.Close() 32 33 return rows.Err() 34 } 35 } 36 37 // Query sets fn to be called when the response to qq is received. 38 func (qq *QueuedQuery) QueryRow(fn func(row Row) error) { 39 qq.fn = func(br BatchResults) error { 40 row := br.QueryRow() 41 return fn(row) 42 } 43 } 44 45 // Exec sets fn to be called when the response to qq is received. 46 func (qq *QueuedQuery) Exec(fn func(ct pgconn.CommandTag) error) { 47 qq.fn = func(br BatchResults) error { 48 ct, err := br.Exec() 49 if err != nil { 50 return err 51 } 52 53 return fn(ct) 54 } 55 } 56 57 // Batch queries are a way of bundling multiple queries together to avoid 58 // unnecessary network round trips. A Batch must only be sent once. 59 type Batch struct { 60 queuedQueries []*QueuedQuery 61 } 62 63 // Queue queues a query to batch b. query can be an SQL query or the name of a prepared statement. 64 func (b *Batch) Queue(query string, arguments ...any) *QueuedQuery { 65 qq := &QueuedQuery{ 66 query: query, 67 arguments: arguments, 68 } 69 b.queuedQueries = append(b.queuedQueries, qq) 70 return qq 71 } 72 73 // Len returns number of queries that have been queued so far. 74 func (b *Batch) Len() int { 75 return len(b.queuedQueries) 76 } 77 78 type BatchResults interface { 79 // Exec reads the results from the next query in the batch as if the query has been sent with Conn.Exec. Prefer 80 // calling Exec on the QueuedQuery. 81 Exec() (pgconn.CommandTag, error) 82 83 // Query reads the results from the next query in the batch as if the query has been sent with Conn.Query. Prefer 84 // calling Query on the QueuedQuery. 85 Query() (Rows, error) 86 87 // QueryRow reads the results from the next query in the batch as if the query has been sent with Conn.QueryRow. 88 // Prefer calling QueryRow on the QueuedQuery. 89 QueryRow() Row 90 91 // Close closes the batch operation. All unread results are read and any callback functions registered with 92 // QueuedQuery.Query, QueuedQuery.QueryRow, or QueuedQuery.Exec will be called. If a callback function returns an 93 // error or the batch encounters an error subsequent callback functions will not be called. 94 // 95 // Close must be called before the underlying connection can be used again. Any error that occurred during a batch 96 // operation may have made it impossible to resyncronize the connection with the server. In this case the underlying 97 // connection will have been closed. 98 // 99 // Close is safe to call multiple times. If it returns an error subsequent calls will return the same error. Callback 100 // functions will not be rerun. 101 Close() error 102 } 103 104 type batchResults struct { 105 ctx context.Context 106 conn *Conn 107 mrr *pgconn.MultiResultReader 108 err error 109 b *Batch 110 qqIdx int 111 closed bool 112 endTraced bool 113 } 114 115 // Exec reads the results from the next query in the batch as if the query has been sent with Exec. 116 func (br *batchResults) Exec() (pgconn.CommandTag, error) { 117 if br.err != nil { 118 return pgconn.CommandTag{}, br.err 119 } 120 if br.closed { 121 return pgconn.CommandTag{}, fmt.Errorf("batch already closed") 122 } 123 124 query, arguments, _ := br.nextQueryAndArgs() 125 126 if !br.mrr.NextResult() { 127 err := br.mrr.Close() 128 if err == nil { 129 err = errors.New("no result") 130 } 131 if br.conn.batchTracer != nil { 132 br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{ 133 SQL: query, 134 Args: arguments, 135 Err: err, 136 }) 137 } 138 return pgconn.CommandTag{}, err 139 } 140 141 commandTag, err := br.mrr.ResultReader().Close() 142 if err != nil { 143 br.err = err 144 br.mrr.Close() 145 } 146 147 if br.conn.batchTracer != nil { 148 br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{ 149 SQL: query, 150 Args: arguments, 151 CommandTag: commandTag, 152 Err: br.err, 153 }) 154 } 155 156 return commandTag, br.err 157 } 158 159 // Query reads the results from the next query in the batch as if the query has been sent with Query. 160 func (br *batchResults) Query() (Rows, error) { 161 query, arguments, ok := br.nextQueryAndArgs() 162 if !ok { 163 query = "batch query" 164 } 165 166 if br.err != nil { 167 return &baseRows{err: br.err, closed: true}, br.err 168 } 169 170 if br.closed { 171 alreadyClosedErr := fmt.Errorf("batch already closed") 172 return &baseRows{err: alreadyClosedErr, closed: true}, alreadyClosedErr 173 } 174 175 rows := br.conn.getRows(br.ctx, query, arguments) 176 rows.batchTracer = br.conn.batchTracer 177 178 if !br.mrr.NextResult() { 179 rows.err = br.mrr.Close() 180 if rows.err == nil { 181 rows.err = errors.New("no result") 182 } 183 rows.closed = true 184 185 if br.conn.batchTracer != nil { 186 br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{ 187 SQL: query, 188 Args: arguments, 189 Err: rows.err, 190 }) 191 } 192 193 return rows, rows.err 194 } 195 196 rows.resultReader = br.mrr.ResultReader() 197 return rows, nil 198 } 199 200 // QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow. 201 func (br *batchResults) QueryRow() Row { 202 rows, _ := br.Query() 203 return (*connRow)(rows.(*baseRows)) 204 205 } 206 207 // Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to 208 // resyncronize the connection with the server. In this case the underlying connection will have been closed. 209 func (br *batchResults) Close() error { 210 defer func() { 211 if !br.endTraced { 212 if br.conn != nil && br.conn.batchTracer != nil { 213 br.conn.batchTracer.TraceBatchEnd(br.ctx, br.conn, TraceBatchEndData{Err: br.err}) 214 } 215 br.endTraced = true 216 } 217 }() 218 219 if br.err != nil { 220 return br.err 221 } 222 223 if br.closed { 224 return nil 225 } 226 227 // Read and run fn for all remaining items 228 for br.err == nil && !br.closed && br.b != nil && br.qqIdx < len(br.b.queuedQueries) { 229 if br.b.queuedQueries[br.qqIdx].fn != nil { 230 err := br.b.queuedQueries[br.qqIdx].fn(br) 231 if err != nil { 232 br.err = err 233 } 234 } else { 235 br.Exec() 236 } 237 } 238 239 br.closed = true 240 241 err := br.mrr.Close() 242 if br.err == nil { 243 br.err = err 244 } 245 246 return br.err 247 } 248 249 func (br *batchResults) earlyError() error { 250 return br.err 251 } 252 253 func (br *batchResults) nextQueryAndArgs() (query string, args []any, ok bool) { 254 if br.b != nil && br.qqIdx < len(br.b.queuedQueries) { 255 bi := br.b.queuedQueries[br.qqIdx] 256 query = bi.query 257 args = bi.arguments 258 ok = true 259 br.qqIdx++ 260 } 261 return 262 } 263 264 type pipelineBatchResults struct { 265 ctx context.Context 266 conn *Conn 267 pipeline *pgconn.Pipeline 268 lastRows *baseRows 269 err error 270 b *Batch 271 qqIdx int 272 closed bool 273 endTraced bool 274 } 275 276 // Exec reads the results from the next query in the batch as if the query has been sent with Exec. 277 func (br *pipelineBatchResults) Exec() (pgconn.CommandTag, error) { 278 if br.err != nil { 279 return pgconn.CommandTag{}, br.err 280 } 281 if br.closed { 282 return pgconn.CommandTag{}, fmt.Errorf("batch already closed") 283 } 284 if br.lastRows != nil && br.lastRows.err != nil { 285 return pgconn.CommandTag{}, br.err 286 } 287 288 query, arguments, _ := br.nextQueryAndArgs() 289 290 results, err := br.pipeline.GetResults() 291 if err != nil { 292 br.err = err 293 return pgconn.CommandTag{}, br.err 294 } 295 var commandTag pgconn.CommandTag 296 switch results := results.(type) { 297 case *pgconn.ResultReader: 298 commandTag, br.err = results.Close() 299 default: 300 return pgconn.CommandTag{}, fmt.Errorf("unexpected pipeline result: %T", results) 301 } 302 303 if br.conn.batchTracer != nil { 304 br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{ 305 SQL: query, 306 Args: arguments, 307 CommandTag: commandTag, 308 Err: br.err, 309 }) 310 } 311 312 return commandTag, br.err 313 } 314 315 // Query reads the results from the next query in the batch as if the query has been sent with Query. 316 func (br *pipelineBatchResults) Query() (Rows, error) { 317 if br.err != nil { 318 return &baseRows{err: br.err, closed: true}, br.err 319 } 320 321 if br.closed { 322 alreadyClosedErr := fmt.Errorf("batch already closed") 323 return &baseRows{err: alreadyClosedErr, closed: true}, alreadyClosedErr 324 } 325 326 if br.lastRows != nil && br.lastRows.err != nil { 327 br.err = br.lastRows.err 328 return &baseRows{err: br.err, closed: true}, br.err 329 } 330 331 query, arguments, ok := br.nextQueryAndArgs() 332 if !ok { 333 query = "batch query" 334 } 335 336 rows := br.conn.getRows(br.ctx, query, arguments) 337 rows.batchTracer = br.conn.batchTracer 338 br.lastRows = rows 339 340 results, err := br.pipeline.GetResults() 341 if err != nil { 342 br.err = err 343 rows.err = err 344 rows.closed = true 345 346 if br.conn.batchTracer != nil { 347 br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{ 348 SQL: query, 349 Args: arguments, 350 Err: err, 351 }) 352 } 353 } else { 354 switch results := results.(type) { 355 case *pgconn.ResultReader: 356 rows.resultReader = results 357 default: 358 err = fmt.Errorf("unexpected pipeline result: %T", results) 359 br.err = err 360 rows.err = err 361 rows.closed = true 362 } 363 } 364 365 return rows, rows.err 366 } 367 368 // QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow. 369 func (br *pipelineBatchResults) QueryRow() Row { 370 rows, _ := br.Query() 371 return (*connRow)(rows.(*baseRows)) 372 373 } 374 375 // Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to 376 // resyncronize the connection with the server. In this case the underlying connection will have been closed. 377 func (br *pipelineBatchResults) Close() error { 378 defer func() { 379 if !br.endTraced { 380 if br.conn.batchTracer != nil { 381 br.conn.batchTracer.TraceBatchEnd(br.ctx, br.conn, TraceBatchEndData{Err: br.err}) 382 } 383 br.endTraced = true 384 } 385 }() 386 387 if br.err == nil && br.lastRows != nil && br.lastRows.err != nil { 388 br.err = br.lastRows.err 389 return br.err 390 } 391 392 if br.closed { 393 return br.err 394 } 395 396 // Read and run fn for all remaining items 397 for br.err == nil && !br.closed && br.b != nil && br.qqIdx < len(br.b.queuedQueries) { 398 if br.b.queuedQueries[br.qqIdx].fn != nil { 399 err := br.b.queuedQueries[br.qqIdx].fn(br) 400 if err != nil { 401 br.err = err 402 } 403 } else { 404 br.Exec() 405 } 406 } 407 408 br.closed = true 409 410 err := br.pipeline.Close() 411 if br.err == nil { 412 br.err = err 413 } 414 415 return br.err 416 } 417 418 func (br *pipelineBatchResults) earlyError() error { 419 return br.err 420 } 421 422 func (br *pipelineBatchResults) nextQueryAndArgs() (query string, args []any, ok bool) { 423 if br.b != nil && br.qqIdx < len(br.b.queuedQueries) { 424 bi := br.b.queuedQueries[br.qqIdx] 425 query = bi.query 426 args = bi.arguments 427 ok = true 428 br.qqIdx++ 429 } 430 return 431 }