gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

bounceback.go (6001B)


      1 package rifs
      2 
      3 import (
      4 	"fmt"
      5 	"io"
      6 
      7 	"github.com/dsoprea/go-logging"
      8 )
      9 
     10 // BouncebackStats describes operation counts.
     11 type BouncebackStats struct {
     12 	reads  int
     13 	writes int
     14 	seeks  int
     15 	syncs  int
     16 }
     17 
     18 func (bbs BouncebackStats) String() string {
     19 	return fmt.Sprintf(
     20 		"BouncebackStats<READS=(%d) WRITES=(%d) SEEKS=(%d) SYNCS=(%d)>",
     21 		bbs.reads, bbs.writes, bbs.seeks, bbs.syncs)
     22 }
     23 
     24 type bouncebackBase struct {
     25 	currentPosition int64
     26 
     27 	stats BouncebackStats
     28 }
     29 
     30 // Position returns the position that we're supposed to be at.
     31 func (bb *bouncebackBase) Position() int64 {
     32 
     33 	// TODO(dustin): Add test
     34 
     35 	return bb.currentPosition
     36 }
     37 
     38 // StatsReads returns the number of reads that have been attempted.
     39 func (bb *bouncebackBase) StatsReads() int {
     40 
     41 	// TODO(dustin): Add test
     42 
     43 	return bb.stats.reads
     44 }
     45 
     46 // StatsWrites returns the number of write operations.
     47 func (bb *bouncebackBase) StatsWrites() int {
     48 
     49 	// TODO(dustin): Add test
     50 
     51 	return bb.stats.writes
     52 }
     53 
     54 // StatsSeeks returns the number of seeks.
     55 func (bb *bouncebackBase) StatsSeeks() int {
     56 
     57 	// TODO(dustin): Add test
     58 
     59 	return bb.stats.seeks
     60 }
     61 
     62 // StatsSyncs returns the number of corrective seeks ("bounce-backs").
     63 func (bb *bouncebackBase) StatsSyncs() int {
     64 
     65 	// TODO(dustin): Add test
     66 
     67 	return bb.stats.syncs
     68 }
     69 
     70 // Seek does a seek to an arbitrary place in the `io.ReadSeeker`.
     71 func (bb *bouncebackBase) seek(s io.Seeker, offset int64, whence int) (newPosition int64, err error) {
     72 	defer func() {
     73 		if state := recover(); state != nil {
     74 			err = log.Wrap(state.(error))
     75 		}
     76 	}()
     77 
     78 	// If the seek is relative, make sure we're where we're supposed to be *first*.
     79 	if whence != io.SeekStart {
     80 		err = bb.checkPosition(s)
     81 		log.PanicIf(err)
     82 	}
     83 
     84 	bb.stats.seeks++
     85 
     86 	newPosition, err = s.Seek(offset, whence)
     87 	log.PanicIf(err)
     88 
     89 	// Update our internal tracking.
     90 	bb.currentPosition = newPosition
     91 
     92 	return newPosition, nil
     93 }
     94 
     95 func (bb *bouncebackBase) checkPosition(s io.Seeker) (err error) {
     96 	defer func() {
     97 		if state := recover(); state != nil {
     98 			err = log.Wrap(state.(error))
     99 		}
    100 	}()
    101 
    102 	// Make sure we're where we're supposed to be.
    103 
    104 	// This should have no overhead, and enables us to collect stats.
    105 	realCurrentPosition, err := s.Seek(0, io.SeekCurrent)
    106 	log.PanicIf(err)
    107 
    108 	if realCurrentPosition != bb.currentPosition {
    109 		bb.stats.syncs++
    110 
    111 		_, err = s.Seek(bb.currentPosition, io.SeekStart)
    112 		log.PanicIf(err)
    113 	}
    114 
    115 	return nil
    116 }
    117 
    118 // BouncebackReader wraps a ReadSeeker, keeps track of our position, and
    119 // seeks back to it before writing. This allows an underlying ReadWriteSeeker
    120 // with an unstable position can still be used for a prolonged series of writes.
    121 type BouncebackReader struct {
    122 	rs io.ReadSeeker
    123 
    124 	bouncebackBase
    125 }
    126 
    127 // NewBouncebackReader returns a `*BouncebackReader` struct.
    128 func NewBouncebackReader(rs io.ReadSeeker) (br *BouncebackReader, err error) {
    129 	defer func() {
    130 		if state := recover(); state != nil {
    131 			err = log.Wrap(state.(error))
    132 		}
    133 	}()
    134 
    135 	initialPosition, err := rs.Seek(0, io.SeekCurrent)
    136 	log.PanicIf(err)
    137 
    138 	bb := bouncebackBase{
    139 		currentPosition: initialPosition,
    140 	}
    141 
    142 	br = &BouncebackReader{
    143 		rs:             rs,
    144 		bouncebackBase: bb,
    145 	}
    146 
    147 	return br, nil
    148 }
    149 
    150 // Seek does a seek to an arbitrary place in the `io.ReadSeeker`.
    151 func (br *BouncebackReader) Seek(offset int64, whence int) (newPosition int64, err error) {
    152 	defer func() {
    153 		if state := recover(); state != nil {
    154 			err = log.Wrap(state.(error))
    155 		}
    156 	}()
    157 
    158 	newPosition, err = br.bouncebackBase.seek(br.rs, offset, whence)
    159 	log.PanicIf(err)
    160 
    161 	return newPosition, nil
    162 }
    163 
    164 // Seek does a standard read.
    165 func (br *BouncebackReader) Read(p []byte) (n int, err error) {
    166 	defer func() {
    167 		if state := recover(); state != nil {
    168 			err = log.Wrap(state.(error))
    169 		}
    170 	}()
    171 
    172 	br.bouncebackBase.stats.reads++
    173 
    174 	err = br.bouncebackBase.checkPosition(br.rs)
    175 	log.PanicIf(err)
    176 
    177 	// Do read.
    178 
    179 	n, err = br.rs.Read(p)
    180 	if err != nil {
    181 		if err == io.EOF {
    182 			return 0, io.EOF
    183 		}
    184 
    185 		log.Panic(err)
    186 	}
    187 
    188 	// Update our internal tracking.
    189 	br.bouncebackBase.currentPosition += int64(n)
    190 
    191 	return n, nil
    192 }
    193 
    194 // BouncebackWriter wraps a WriteSeeker, keeps track of our position, and
    195 // seeks back to it before writing. This allows an underlying ReadWriteSeeker
    196 // with an unstable position can still be used for a prolonged series of writes.
    197 type BouncebackWriter struct {
    198 	ws io.WriteSeeker
    199 
    200 	bouncebackBase
    201 }
    202 
    203 // NewBouncebackWriter returns a new `BouncebackWriter` struct.
    204 func NewBouncebackWriter(ws io.WriteSeeker) (bw *BouncebackWriter, err error) {
    205 	defer func() {
    206 		if state := recover(); state != nil {
    207 			err = log.Wrap(state.(error))
    208 		}
    209 	}()
    210 
    211 	initialPosition, err := ws.Seek(0, io.SeekCurrent)
    212 	log.PanicIf(err)
    213 
    214 	bb := bouncebackBase{
    215 		currentPosition: initialPosition,
    216 	}
    217 
    218 	bw = &BouncebackWriter{
    219 		ws:             ws,
    220 		bouncebackBase: bb,
    221 	}
    222 
    223 	return bw, nil
    224 }
    225 
    226 // Seek puts us at a specific position in the internal writer for the next
    227 // write/seek.
    228 func (bw *BouncebackWriter) Seek(offset int64, whence int) (newPosition int64, err error) {
    229 	defer func() {
    230 		if state := recover(); state != nil {
    231 			err = log.Wrap(state.(error))
    232 		}
    233 	}()
    234 
    235 	newPosition, err = bw.bouncebackBase.seek(bw.ws, offset, whence)
    236 	log.PanicIf(err)
    237 
    238 	return newPosition, nil
    239 }
    240 
    241 // Write performs a write against the internal `WriteSeeker` starting at the
    242 // position that we're supposed to be at.
    243 func (bw *BouncebackWriter) Write(p []byte) (n int, err error) {
    244 	defer func() {
    245 		if state := recover(); state != nil {
    246 			err = log.Wrap(state.(error))
    247 		}
    248 	}()
    249 
    250 	bw.bouncebackBase.stats.writes++
    251 
    252 	// Make sure we're where we're supposed to be.
    253 
    254 	realCurrentPosition, err := bw.ws.Seek(0, io.SeekCurrent)
    255 	log.PanicIf(err)
    256 
    257 	if realCurrentPosition != bw.bouncebackBase.currentPosition {
    258 		bw.bouncebackBase.stats.seeks++
    259 
    260 		_, err = bw.ws.Seek(bw.bouncebackBase.currentPosition, io.SeekStart)
    261 		log.PanicIf(err)
    262 	}
    263 
    264 	// Do write.
    265 
    266 	n, err = bw.ws.Write(p)
    267 	log.PanicIf(err)
    268 
    269 	// Update our internal tracking.
    270 	bw.bouncebackBase.currentPosition += int64(n)
    271 
    272 	return n, nil
    273 }