bounceback.go (6001B)
1 package rifs 2 3 import ( 4 "fmt" 5 "io" 6 7 "github.com/dsoprea/go-logging" 8 ) 9 10 // BouncebackStats describes operation counts. 11 type BouncebackStats struct { 12 reads int 13 writes int 14 seeks int 15 syncs int 16 } 17 18 func (bbs BouncebackStats) String() string { 19 return fmt.Sprintf( 20 "BouncebackStats<READS=(%d) WRITES=(%d) SEEKS=(%d) SYNCS=(%d)>", 21 bbs.reads, bbs.writes, bbs.seeks, bbs.syncs) 22 } 23 24 type bouncebackBase struct { 25 currentPosition int64 26 27 stats BouncebackStats 28 } 29 30 // Position returns the position that we're supposed to be at. 31 func (bb *bouncebackBase) Position() int64 { 32 33 // TODO(dustin): Add test 34 35 return bb.currentPosition 36 } 37 38 // StatsReads returns the number of reads that have been attempted. 39 func (bb *bouncebackBase) StatsReads() int { 40 41 // TODO(dustin): Add test 42 43 return bb.stats.reads 44 } 45 46 // StatsWrites returns the number of write operations. 47 func (bb *bouncebackBase) StatsWrites() int { 48 49 // TODO(dustin): Add test 50 51 return bb.stats.writes 52 } 53 54 // StatsSeeks returns the number of seeks. 55 func (bb *bouncebackBase) StatsSeeks() int { 56 57 // TODO(dustin): Add test 58 59 return bb.stats.seeks 60 } 61 62 // StatsSyncs returns the number of corrective seeks ("bounce-backs"). 63 func (bb *bouncebackBase) StatsSyncs() int { 64 65 // TODO(dustin): Add test 66 67 return bb.stats.syncs 68 } 69 70 // Seek does a seek to an arbitrary place in the `io.ReadSeeker`. 71 func (bb *bouncebackBase) seek(s io.Seeker, offset int64, whence int) (newPosition int64, err error) { 72 defer func() { 73 if state := recover(); state != nil { 74 err = log.Wrap(state.(error)) 75 } 76 }() 77 78 // If the seek is relative, make sure we're where we're supposed to be *first*. 79 if whence != io.SeekStart { 80 err = bb.checkPosition(s) 81 log.PanicIf(err) 82 } 83 84 bb.stats.seeks++ 85 86 newPosition, err = s.Seek(offset, whence) 87 log.PanicIf(err) 88 89 // Update our internal tracking. 90 bb.currentPosition = newPosition 91 92 return newPosition, nil 93 } 94 95 func (bb *bouncebackBase) checkPosition(s io.Seeker) (err error) { 96 defer func() { 97 if state := recover(); state != nil { 98 err = log.Wrap(state.(error)) 99 } 100 }() 101 102 // Make sure we're where we're supposed to be. 103 104 // This should have no overhead, and enables us to collect stats. 105 realCurrentPosition, err := s.Seek(0, io.SeekCurrent) 106 log.PanicIf(err) 107 108 if realCurrentPosition != bb.currentPosition { 109 bb.stats.syncs++ 110 111 _, err = s.Seek(bb.currentPosition, io.SeekStart) 112 log.PanicIf(err) 113 } 114 115 return nil 116 } 117 118 // BouncebackReader wraps a ReadSeeker, keeps track of our position, and 119 // seeks back to it before writing. This allows an underlying ReadWriteSeeker 120 // with an unstable position can still be used for a prolonged series of writes. 121 type BouncebackReader struct { 122 rs io.ReadSeeker 123 124 bouncebackBase 125 } 126 127 // NewBouncebackReader returns a `*BouncebackReader` struct. 128 func NewBouncebackReader(rs io.ReadSeeker) (br *BouncebackReader, err error) { 129 defer func() { 130 if state := recover(); state != nil { 131 err = log.Wrap(state.(error)) 132 } 133 }() 134 135 initialPosition, err := rs.Seek(0, io.SeekCurrent) 136 log.PanicIf(err) 137 138 bb := bouncebackBase{ 139 currentPosition: initialPosition, 140 } 141 142 br = &BouncebackReader{ 143 rs: rs, 144 bouncebackBase: bb, 145 } 146 147 return br, nil 148 } 149 150 // Seek does a seek to an arbitrary place in the `io.ReadSeeker`. 151 func (br *BouncebackReader) Seek(offset int64, whence int) (newPosition int64, err error) { 152 defer func() { 153 if state := recover(); state != nil { 154 err = log.Wrap(state.(error)) 155 } 156 }() 157 158 newPosition, err = br.bouncebackBase.seek(br.rs, offset, whence) 159 log.PanicIf(err) 160 161 return newPosition, nil 162 } 163 164 // Seek does a standard read. 165 func (br *BouncebackReader) Read(p []byte) (n int, err error) { 166 defer func() { 167 if state := recover(); state != nil { 168 err = log.Wrap(state.(error)) 169 } 170 }() 171 172 br.bouncebackBase.stats.reads++ 173 174 err = br.bouncebackBase.checkPosition(br.rs) 175 log.PanicIf(err) 176 177 // Do read. 178 179 n, err = br.rs.Read(p) 180 if err != nil { 181 if err == io.EOF { 182 return 0, io.EOF 183 } 184 185 log.Panic(err) 186 } 187 188 // Update our internal tracking. 189 br.bouncebackBase.currentPosition += int64(n) 190 191 return n, nil 192 } 193 194 // BouncebackWriter wraps a WriteSeeker, keeps track of our position, and 195 // seeks back to it before writing. This allows an underlying ReadWriteSeeker 196 // with an unstable position can still be used for a prolonged series of writes. 197 type BouncebackWriter struct { 198 ws io.WriteSeeker 199 200 bouncebackBase 201 } 202 203 // NewBouncebackWriter returns a new `BouncebackWriter` struct. 204 func NewBouncebackWriter(ws io.WriteSeeker) (bw *BouncebackWriter, err error) { 205 defer func() { 206 if state := recover(); state != nil { 207 err = log.Wrap(state.(error)) 208 } 209 }() 210 211 initialPosition, err := ws.Seek(0, io.SeekCurrent) 212 log.PanicIf(err) 213 214 bb := bouncebackBase{ 215 currentPosition: initialPosition, 216 } 217 218 bw = &BouncebackWriter{ 219 ws: ws, 220 bouncebackBase: bb, 221 } 222 223 return bw, nil 224 } 225 226 // Seek puts us at a specific position in the internal writer for the next 227 // write/seek. 228 func (bw *BouncebackWriter) Seek(offset int64, whence int) (newPosition int64, err error) { 229 defer func() { 230 if state := recover(); state != nil { 231 err = log.Wrap(state.(error)) 232 } 233 }() 234 235 newPosition, err = bw.bouncebackBase.seek(bw.ws, offset, whence) 236 log.PanicIf(err) 237 238 return newPosition, nil 239 } 240 241 // Write performs a write against the internal `WriteSeeker` starting at the 242 // position that we're supposed to be at. 243 func (bw *BouncebackWriter) Write(p []byte) (n int, err error) { 244 defer func() { 245 if state := recover(); state != nil { 246 err = log.Wrap(state.(error)) 247 } 248 }() 249 250 bw.bouncebackBase.stats.writes++ 251 252 // Make sure we're where we're supposed to be. 253 254 realCurrentPosition, err := bw.ws.Seek(0, io.SeekCurrent) 255 log.PanicIf(err) 256 257 if realCurrentPosition != bw.bouncebackBase.currentPosition { 258 bw.bouncebackBase.stats.seeks++ 259 260 _, err = bw.ws.Seek(bw.bouncebackBase.currentPosition, io.SeekStart) 261 log.PanicIf(err) 262 } 263 264 // Do write. 265 266 n, err = bw.ws.Write(p) 267 log.PanicIf(err) 268 269 // Update our internal tracking. 270 bw.bouncebackBase.currentPosition += int64(n) 271 272 return n, nil 273 }