gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

batch.go (10839B)


      1 package pgx
      2 
      3 import (
      4 	"context"
      5 	"errors"
      6 	"fmt"
      7 
      8 	"github.com/jackc/pgx/v5/pgconn"
      9 )
     10 
     11 // QueuedQuery is a query that has been queued for execution via a Batch.
     12 type QueuedQuery struct {
     13 	query     string
     14 	arguments []any
     15 	fn        batchItemFunc
     16 	sd        *pgconn.StatementDescription
     17 }
     18 
     19 type batchItemFunc func(br BatchResults) error
     20 
     21 // Query sets fn to be called when the response to qq is received.
     22 func (qq *QueuedQuery) Query(fn func(rows Rows) error) {
     23 	qq.fn = func(br BatchResults) error {
     24 		rows, _ := br.Query()
     25 		defer rows.Close()
     26 
     27 		err := fn(rows)
     28 		if err != nil {
     29 			return err
     30 		}
     31 		rows.Close()
     32 
     33 		return rows.Err()
     34 	}
     35 }
     36 
     37 // Query sets fn to be called when the response to qq is received.
     38 func (qq *QueuedQuery) QueryRow(fn func(row Row) error) {
     39 	qq.fn = func(br BatchResults) error {
     40 		row := br.QueryRow()
     41 		return fn(row)
     42 	}
     43 }
     44 
     45 // Exec sets fn to be called when the response to qq is received.
     46 func (qq *QueuedQuery) Exec(fn func(ct pgconn.CommandTag) error) {
     47 	qq.fn = func(br BatchResults) error {
     48 		ct, err := br.Exec()
     49 		if err != nil {
     50 			return err
     51 		}
     52 
     53 		return fn(ct)
     54 	}
     55 }
     56 
     57 // Batch queries are a way of bundling multiple queries together to avoid
     58 // unnecessary network round trips. A Batch must only be sent once.
     59 type Batch struct {
     60 	queuedQueries []*QueuedQuery
     61 }
     62 
     63 // Queue queues a query to batch b. query can be an SQL query or the name of a prepared statement.
     64 func (b *Batch) Queue(query string, arguments ...any) *QueuedQuery {
     65 	qq := &QueuedQuery{
     66 		query:     query,
     67 		arguments: arguments,
     68 	}
     69 	b.queuedQueries = append(b.queuedQueries, qq)
     70 	return qq
     71 }
     72 
     73 // Len returns number of queries that have been queued so far.
     74 func (b *Batch) Len() int {
     75 	return len(b.queuedQueries)
     76 }
     77 
     78 type BatchResults interface {
     79 	// Exec reads the results from the next query in the batch as if the query has been sent with Conn.Exec. Prefer
     80 	// calling Exec on the QueuedQuery.
     81 	Exec() (pgconn.CommandTag, error)
     82 
     83 	// Query reads the results from the next query in the batch as if the query has been sent with Conn.Query. Prefer
     84 	// calling Query on the QueuedQuery.
     85 	Query() (Rows, error)
     86 
     87 	// QueryRow reads the results from the next query in the batch as if the query has been sent with Conn.QueryRow.
     88 	// Prefer calling QueryRow on the QueuedQuery.
     89 	QueryRow() Row
     90 
     91 	// Close closes the batch operation. All unread results are read and any callback functions registered with
     92 	// QueuedQuery.Query, QueuedQuery.QueryRow, or QueuedQuery.Exec will be called. If a callback function returns an
     93 	// error or the batch encounters an error subsequent callback functions will not be called.
     94 	//
     95 	// Close must be called before the underlying connection can be used again. Any error that occurred during a batch
     96 	// operation may have made it impossible to resyncronize the connection with the server. In this case the underlying
     97 	// connection will have been closed.
     98 	//
     99 	// Close is safe to call multiple times. If it returns an error subsequent calls will return the same error. Callback
    100 	// functions will not be rerun.
    101 	Close() error
    102 }
    103 
    104 type batchResults struct {
    105 	ctx       context.Context
    106 	conn      *Conn
    107 	mrr       *pgconn.MultiResultReader
    108 	err       error
    109 	b         *Batch
    110 	qqIdx     int
    111 	closed    bool
    112 	endTraced bool
    113 }
    114 
    115 // Exec reads the results from the next query in the batch as if the query has been sent with Exec.
    116 func (br *batchResults) Exec() (pgconn.CommandTag, error) {
    117 	if br.err != nil {
    118 		return pgconn.CommandTag{}, br.err
    119 	}
    120 	if br.closed {
    121 		return pgconn.CommandTag{}, fmt.Errorf("batch already closed")
    122 	}
    123 
    124 	query, arguments, _ := br.nextQueryAndArgs()
    125 
    126 	if !br.mrr.NextResult() {
    127 		err := br.mrr.Close()
    128 		if err == nil {
    129 			err = errors.New("no result")
    130 		}
    131 		if br.conn.batchTracer != nil {
    132 			br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
    133 				SQL:  query,
    134 				Args: arguments,
    135 				Err:  err,
    136 			})
    137 		}
    138 		return pgconn.CommandTag{}, err
    139 	}
    140 
    141 	commandTag, err := br.mrr.ResultReader().Close()
    142 	if err != nil {
    143 		br.err = err
    144 		br.mrr.Close()
    145 	}
    146 
    147 	if br.conn.batchTracer != nil {
    148 		br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
    149 			SQL:        query,
    150 			Args:       arguments,
    151 			CommandTag: commandTag,
    152 			Err:        br.err,
    153 		})
    154 	}
    155 
    156 	return commandTag, br.err
    157 }
    158 
    159 // Query reads the results from the next query in the batch as if the query has been sent with Query.
    160 func (br *batchResults) Query() (Rows, error) {
    161 	query, arguments, ok := br.nextQueryAndArgs()
    162 	if !ok {
    163 		query = "batch query"
    164 	}
    165 
    166 	if br.err != nil {
    167 		return &baseRows{err: br.err, closed: true}, br.err
    168 	}
    169 
    170 	if br.closed {
    171 		alreadyClosedErr := fmt.Errorf("batch already closed")
    172 		return &baseRows{err: alreadyClosedErr, closed: true}, alreadyClosedErr
    173 	}
    174 
    175 	rows := br.conn.getRows(br.ctx, query, arguments)
    176 	rows.batchTracer = br.conn.batchTracer
    177 
    178 	if !br.mrr.NextResult() {
    179 		rows.err = br.mrr.Close()
    180 		if rows.err == nil {
    181 			rows.err = errors.New("no result")
    182 		}
    183 		rows.closed = true
    184 
    185 		if br.conn.batchTracer != nil {
    186 			br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
    187 				SQL:  query,
    188 				Args: arguments,
    189 				Err:  rows.err,
    190 			})
    191 		}
    192 
    193 		return rows, rows.err
    194 	}
    195 
    196 	rows.resultReader = br.mrr.ResultReader()
    197 	return rows, nil
    198 }
    199 
    200 // QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow.
    201 func (br *batchResults) QueryRow() Row {
    202 	rows, _ := br.Query()
    203 	return (*connRow)(rows.(*baseRows))
    204 
    205 }
    206 
    207 // Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to
    208 // resyncronize the connection with the server. In this case the underlying connection will have been closed.
    209 func (br *batchResults) Close() error {
    210 	defer func() {
    211 		if !br.endTraced {
    212 			if br.conn != nil && br.conn.batchTracer != nil {
    213 				br.conn.batchTracer.TraceBatchEnd(br.ctx, br.conn, TraceBatchEndData{Err: br.err})
    214 			}
    215 			br.endTraced = true
    216 		}
    217 	}()
    218 
    219 	if br.err != nil {
    220 		return br.err
    221 	}
    222 
    223 	if br.closed {
    224 		return nil
    225 	}
    226 
    227 	// Read and run fn for all remaining items
    228 	for br.err == nil && !br.closed && br.b != nil && br.qqIdx < len(br.b.queuedQueries) {
    229 		if br.b.queuedQueries[br.qqIdx].fn != nil {
    230 			err := br.b.queuedQueries[br.qqIdx].fn(br)
    231 			if err != nil {
    232 				br.err = err
    233 			}
    234 		} else {
    235 			br.Exec()
    236 		}
    237 	}
    238 
    239 	br.closed = true
    240 
    241 	err := br.mrr.Close()
    242 	if br.err == nil {
    243 		br.err = err
    244 	}
    245 
    246 	return br.err
    247 }
    248 
    249 func (br *batchResults) earlyError() error {
    250 	return br.err
    251 }
    252 
    253 func (br *batchResults) nextQueryAndArgs() (query string, args []any, ok bool) {
    254 	if br.b != nil && br.qqIdx < len(br.b.queuedQueries) {
    255 		bi := br.b.queuedQueries[br.qqIdx]
    256 		query = bi.query
    257 		args = bi.arguments
    258 		ok = true
    259 		br.qqIdx++
    260 	}
    261 	return
    262 }
    263 
    264 type pipelineBatchResults struct {
    265 	ctx       context.Context
    266 	conn      *Conn
    267 	pipeline  *pgconn.Pipeline
    268 	lastRows  *baseRows
    269 	err       error
    270 	b         *Batch
    271 	qqIdx     int
    272 	closed    bool
    273 	endTraced bool
    274 }
    275 
    276 // Exec reads the results from the next query in the batch as if the query has been sent with Exec.
    277 func (br *pipelineBatchResults) Exec() (pgconn.CommandTag, error) {
    278 	if br.err != nil {
    279 		return pgconn.CommandTag{}, br.err
    280 	}
    281 	if br.closed {
    282 		return pgconn.CommandTag{}, fmt.Errorf("batch already closed")
    283 	}
    284 	if br.lastRows != nil && br.lastRows.err != nil {
    285 		return pgconn.CommandTag{}, br.err
    286 	}
    287 
    288 	query, arguments, _ := br.nextQueryAndArgs()
    289 
    290 	results, err := br.pipeline.GetResults()
    291 	if err != nil {
    292 		br.err = err
    293 		return pgconn.CommandTag{}, br.err
    294 	}
    295 	var commandTag pgconn.CommandTag
    296 	switch results := results.(type) {
    297 	case *pgconn.ResultReader:
    298 		commandTag, br.err = results.Close()
    299 	default:
    300 		return pgconn.CommandTag{}, fmt.Errorf("unexpected pipeline result: %T", results)
    301 	}
    302 
    303 	if br.conn.batchTracer != nil {
    304 		br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
    305 			SQL:        query,
    306 			Args:       arguments,
    307 			CommandTag: commandTag,
    308 			Err:        br.err,
    309 		})
    310 	}
    311 
    312 	return commandTag, br.err
    313 }
    314 
    315 // Query reads the results from the next query in the batch as if the query has been sent with Query.
    316 func (br *pipelineBatchResults) Query() (Rows, error) {
    317 	if br.err != nil {
    318 		return &baseRows{err: br.err, closed: true}, br.err
    319 	}
    320 
    321 	if br.closed {
    322 		alreadyClosedErr := fmt.Errorf("batch already closed")
    323 		return &baseRows{err: alreadyClosedErr, closed: true}, alreadyClosedErr
    324 	}
    325 
    326 	if br.lastRows != nil && br.lastRows.err != nil {
    327 		br.err = br.lastRows.err
    328 		return &baseRows{err: br.err, closed: true}, br.err
    329 	}
    330 
    331 	query, arguments, ok := br.nextQueryAndArgs()
    332 	if !ok {
    333 		query = "batch query"
    334 	}
    335 
    336 	rows := br.conn.getRows(br.ctx, query, arguments)
    337 	rows.batchTracer = br.conn.batchTracer
    338 	br.lastRows = rows
    339 
    340 	results, err := br.pipeline.GetResults()
    341 	if err != nil {
    342 		br.err = err
    343 		rows.err = err
    344 		rows.closed = true
    345 
    346 		if br.conn.batchTracer != nil {
    347 			br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
    348 				SQL:  query,
    349 				Args: arguments,
    350 				Err:  err,
    351 			})
    352 		}
    353 	} else {
    354 		switch results := results.(type) {
    355 		case *pgconn.ResultReader:
    356 			rows.resultReader = results
    357 		default:
    358 			err = fmt.Errorf("unexpected pipeline result: %T", results)
    359 			br.err = err
    360 			rows.err = err
    361 			rows.closed = true
    362 		}
    363 	}
    364 
    365 	return rows, rows.err
    366 }
    367 
    368 // QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow.
    369 func (br *pipelineBatchResults) QueryRow() Row {
    370 	rows, _ := br.Query()
    371 	return (*connRow)(rows.(*baseRows))
    372 
    373 }
    374 
    375 // Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to
    376 // resyncronize the connection with the server. In this case the underlying connection will have been closed.
    377 func (br *pipelineBatchResults) Close() error {
    378 	defer func() {
    379 		if !br.endTraced {
    380 			if br.conn.batchTracer != nil {
    381 				br.conn.batchTracer.TraceBatchEnd(br.ctx, br.conn, TraceBatchEndData{Err: br.err})
    382 			}
    383 			br.endTraced = true
    384 		}
    385 	}()
    386 
    387 	if br.err == nil && br.lastRows != nil && br.lastRows.err != nil {
    388 		br.err = br.lastRows.err
    389 		return br.err
    390 	}
    391 
    392 	if br.closed {
    393 		return br.err
    394 	}
    395 
    396 	// Read and run fn for all remaining items
    397 	for br.err == nil && !br.closed && br.b != nil && br.qqIdx < len(br.b.queuedQueries) {
    398 		if br.b.queuedQueries[br.qqIdx].fn != nil {
    399 			err := br.b.queuedQueries[br.qqIdx].fn(br)
    400 			if err != nil {
    401 				br.err = err
    402 			}
    403 		} else {
    404 			br.Exec()
    405 		}
    406 	}
    407 
    408 	br.closed = true
    409 
    410 	err := br.pipeline.Close()
    411 	if br.err == nil {
    412 		br.err = err
    413 	}
    414 
    415 	return br.err
    416 }
    417 
    418 func (br *pipelineBatchResults) earlyError() error {
    419 	return br.err
    420 }
    421 
    422 func (br *pipelineBatchResults) nextQueryAndArgs() (query string, args []any, ok bool) {
    423 	if br.b != nil && br.qqIdx < len(br.b.queuedQueries) {
    424 		bi := br.b.queuedQueries[br.qqIdx]
    425 		query = bi.query
    426 		args = bi.arguments
    427 		ok = true
    428 		br.qqIdx++
    429 	}
    430 	return
    431 }