index.go (15437B)
1 // Copyright (c) 2022+ Klaus Post. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package s2 6 7 import ( 8 "bytes" 9 "encoding/binary" 10 "encoding/json" 11 "fmt" 12 "io" 13 "sort" 14 ) 15 16 const ( 17 S2IndexHeader = "s2idx\x00" 18 S2IndexTrailer = "\x00xdi2s" 19 maxIndexEntries = 1 << 16 20 ) 21 22 // Index represents an S2/Snappy index. 23 type Index struct { 24 TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown. 25 TotalCompressed int64 // Total Compressed size if known. Will be -1 if unknown. 26 info []struct { 27 compressedOffset int64 28 uncompressedOffset int64 29 } 30 estBlockUncomp int64 31 } 32 33 func (i *Index) reset(maxBlock int) { 34 i.estBlockUncomp = int64(maxBlock) 35 i.TotalCompressed = -1 36 i.TotalUncompressed = -1 37 if len(i.info) > 0 { 38 i.info = i.info[:0] 39 } 40 } 41 42 // allocInfos will allocate an empty slice of infos. 43 func (i *Index) allocInfos(n int) { 44 if n > maxIndexEntries { 45 panic("n > maxIndexEntries") 46 } 47 i.info = make([]struct { 48 compressedOffset int64 49 uncompressedOffset int64 50 }, 0, n) 51 } 52 53 // add an uncompressed and compressed pair. 54 // Entries must be sent in order. 55 func (i *Index) add(compressedOffset, uncompressedOffset int64) error { 56 if i == nil { 57 return nil 58 } 59 lastIdx := len(i.info) - 1 60 if lastIdx >= 0 { 61 latest := i.info[lastIdx] 62 if latest.uncompressedOffset == uncompressedOffset { 63 // Uncompressed didn't change, don't add entry, 64 // but update start index. 65 latest.compressedOffset = compressedOffset 66 i.info[lastIdx] = latest 67 return nil 68 } 69 if latest.uncompressedOffset > uncompressedOffset { 70 return fmt.Errorf("internal error: Earlier uncompressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset) 71 } 72 if latest.compressedOffset > compressedOffset { 73 return fmt.Errorf("internal error: Earlier compressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset) 74 } 75 } 76 i.info = append(i.info, struct { 77 compressedOffset int64 78 uncompressedOffset int64 79 }{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset}) 80 return nil 81 } 82 83 // Find the offset at or before the wanted (uncompressed) offset. 84 // If offset is 0 or positive it is the offset from the beginning of the file. 85 // If the uncompressed size is known, the offset must be within the file. 86 // If an offset outside the file is requested io.ErrUnexpectedEOF is returned. 87 // If the offset is negative, it is interpreted as the distance from the end of the file, 88 // where -1 represents the last byte. 89 // If offset from the end of the file is requested, but size is unknown, 90 // ErrUnsupported will be returned. 91 func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) { 92 if i.TotalUncompressed < 0 { 93 return 0, 0, ErrCorrupt 94 } 95 if offset < 0 { 96 offset = i.TotalUncompressed + offset 97 if offset < 0 { 98 return 0, 0, io.ErrUnexpectedEOF 99 } 100 } 101 if offset > i.TotalUncompressed { 102 return 0, 0, io.ErrUnexpectedEOF 103 } 104 if len(i.info) > 200 { 105 n := sort.Search(len(i.info), func(n int) bool { 106 return i.info[n].uncompressedOffset > offset 107 }) 108 if n == 0 { 109 n = 1 110 } 111 return i.info[n-1].compressedOffset, i.info[n-1].uncompressedOffset, nil 112 } 113 for _, info := range i.info { 114 if info.uncompressedOffset > offset { 115 break 116 } 117 compressedOff = info.compressedOffset 118 uncompressedOff = info.uncompressedOffset 119 } 120 return compressedOff, uncompressedOff, nil 121 } 122 123 // reduce to stay below maxIndexEntries 124 func (i *Index) reduce() { 125 if len(i.info) < maxIndexEntries && i.estBlockUncomp >= 1<<20 { 126 return 127 } 128 129 // Algorithm, keep 1, remove removeN entries... 130 removeN := (len(i.info) + 1) / maxIndexEntries 131 src := i.info 132 j := 0 133 134 // Each block should be at least 1MB, but don't reduce below 1000 entries. 135 for i.estBlockUncomp*(int64(removeN)+1) < 1<<20 && len(i.info)/(removeN+1) > 1000 { 136 removeN++ 137 } 138 for idx := 0; idx < len(src); idx++ { 139 i.info[j] = src[idx] 140 j++ 141 idx += removeN 142 } 143 i.info = i.info[:j] 144 // Update maxblock estimate. 145 i.estBlockUncomp += i.estBlockUncomp * int64(removeN) 146 } 147 148 func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte { 149 i.reduce() 150 var tmp [binary.MaxVarintLen64]byte 151 152 initSize := len(b) 153 // We make the start a skippable header+size. 154 b = append(b, ChunkTypeIndex, 0, 0, 0) 155 b = append(b, []byte(S2IndexHeader)...) 156 // Total Uncompressed size 157 n := binary.PutVarint(tmp[:], uncompTotal) 158 b = append(b, tmp[:n]...) 159 // Total Compressed size 160 n = binary.PutVarint(tmp[:], compTotal) 161 b = append(b, tmp[:n]...) 162 // Put EstBlockUncomp size 163 n = binary.PutVarint(tmp[:], i.estBlockUncomp) 164 b = append(b, tmp[:n]...) 165 // Put length 166 n = binary.PutVarint(tmp[:], int64(len(i.info))) 167 b = append(b, tmp[:n]...) 168 169 // Check if we should add uncompressed offsets 170 var hasUncompressed byte 171 for idx, info := range i.info { 172 if idx == 0 { 173 if info.uncompressedOffset != 0 { 174 hasUncompressed = 1 175 break 176 } 177 continue 178 } 179 if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp { 180 hasUncompressed = 1 181 break 182 } 183 } 184 b = append(b, hasUncompressed) 185 186 // Add each entry 187 if hasUncompressed == 1 { 188 for idx, info := range i.info { 189 uOff := info.uncompressedOffset 190 if idx > 0 { 191 prev := i.info[idx-1] 192 uOff -= prev.uncompressedOffset + (i.estBlockUncomp) 193 } 194 n = binary.PutVarint(tmp[:], uOff) 195 b = append(b, tmp[:n]...) 196 } 197 } 198 199 // Initial compressed size estimate. 200 cPredict := i.estBlockUncomp / 2 201 202 for idx, info := range i.info { 203 cOff := info.compressedOffset 204 if idx > 0 { 205 prev := i.info[idx-1] 206 cOff -= prev.compressedOffset + cPredict 207 // Update compressed size prediction, with half the error. 208 cPredict += cOff / 2 209 } 210 n = binary.PutVarint(tmp[:], cOff) 211 b = append(b, tmp[:n]...) 212 } 213 214 // Add Total Size. 215 // Stored as fixed size for easier reading. 216 binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer))) 217 b = append(b, tmp[:4]...) 218 // Trailer 219 b = append(b, []byte(S2IndexTrailer)...) 220 221 // Update size 222 chunkLen := len(b) - initSize - skippableFrameHeader 223 b[initSize+1] = uint8(chunkLen >> 0) 224 b[initSize+2] = uint8(chunkLen >> 8) 225 b[initSize+3] = uint8(chunkLen >> 16) 226 //fmt.Printf("chunklen: 0x%x Uncomp:%d, Comp:%d\n", chunkLen, uncompTotal, compTotal) 227 return b 228 } 229 230 // Load a binary index. 231 // A zero value Index can be used or a previous one can be reused. 232 func (i *Index) Load(b []byte) ([]byte, error) { 233 if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) { 234 return b, io.ErrUnexpectedEOF 235 } 236 if b[0] != ChunkTypeIndex { 237 return b, ErrCorrupt 238 } 239 chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16 240 b = b[4:] 241 242 // Validate we have enough... 243 if len(b) < chunkLen { 244 return b, io.ErrUnexpectedEOF 245 } 246 if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) { 247 return b, ErrUnsupported 248 } 249 b = b[len(S2IndexHeader):] 250 251 // Total Uncompressed 252 if v, n := binary.Varint(b); n <= 0 || v < 0 { 253 return b, ErrCorrupt 254 } else { 255 i.TotalUncompressed = v 256 b = b[n:] 257 } 258 259 // Total Compressed 260 if v, n := binary.Varint(b); n <= 0 { 261 return b, ErrCorrupt 262 } else { 263 i.TotalCompressed = v 264 b = b[n:] 265 } 266 267 // Read EstBlockUncomp 268 if v, n := binary.Varint(b); n <= 0 { 269 return b, ErrCorrupt 270 } else { 271 if v < 0 { 272 return b, ErrCorrupt 273 } 274 i.estBlockUncomp = v 275 b = b[n:] 276 } 277 278 var entries int 279 if v, n := binary.Varint(b); n <= 0 { 280 return b, ErrCorrupt 281 } else { 282 if v < 0 || v > maxIndexEntries { 283 return b, ErrCorrupt 284 } 285 entries = int(v) 286 b = b[n:] 287 } 288 if cap(i.info) < entries { 289 i.allocInfos(entries) 290 } 291 i.info = i.info[:entries] 292 293 if len(b) < 1 { 294 return b, io.ErrUnexpectedEOF 295 } 296 hasUncompressed := b[0] 297 b = b[1:] 298 if hasUncompressed&1 != hasUncompressed { 299 return b, ErrCorrupt 300 } 301 302 // Add each uncompressed entry 303 for idx := range i.info { 304 var uOff int64 305 if hasUncompressed != 0 { 306 // Load delta 307 if v, n := binary.Varint(b); n <= 0 { 308 return b, ErrCorrupt 309 } else { 310 uOff = v 311 b = b[n:] 312 } 313 } 314 315 if idx > 0 { 316 prev := i.info[idx-1].uncompressedOffset 317 uOff += prev + (i.estBlockUncomp) 318 if uOff <= prev { 319 return b, ErrCorrupt 320 } 321 } 322 if uOff < 0 { 323 return b, ErrCorrupt 324 } 325 i.info[idx].uncompressedOffset = uOff 326 } 327 328 // Initial compressed size estimate. 329 cPredict := i.estBlockUncomp / 2 330 331 // Add each compressed entry 332 for idx := range i.info { 333 var cOff int64 334 if v, n := binary.Varint(b); n <= 0 { 335 return b, ErrCorrupt 336 } else { 337 cOff = v 338 b = b[n:] 339 } 340 341 if idx > 0 { 342 // Update compressed size prediction, with half the error. 343 cPredictNew := cPredict + cOff/2 344 345 prev := i.info[idx-1].compressedOffset 346 cOff += prev + cPredict 347 if cOff <= prev { 348 return b, ErrCorrupt 349 } 350 cPredict = cPredictNew 351 } 352 if cOff < 0 { 353 return b, ErrCorrupt 354 } 355 i.info[idx].compressedOffset = cOff 356 } 357 if len(b) < 4+len(S2IndexTrailer) { 358 return b, io.ErrUnexpectedEOF 359 } 360 // Skip size... 361 b = b[4:] 362 363 // Check trailer... 364 if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) { 365 return b, ErrCorrupt 366 } 367 return b[len(S2IndexTrailer):], nil 368 } 369 370 // LoadStream will load an index from the end of the supplied stream. 371 // ErrUnsupported will be returned if the signature cannot be found. 372 // ErrCorrupt will be returned if unexpected values are found. 373 // io.ErrUnexpectedEOF is returned if there are too few bytes. 374 // IO errors are returned as-is. 375 func (i *Index) LoadStream(rs io.ReadSeeker) error { 376 // Go to end. 377 _, err := rs.Seek(-10, io.SeekEnd) 378 if err != nil { 379 return err 380 } 381 var tmp [10]byte 382 _, err = io.ReadFull(rs, tmp[:]) 383 if err != nil { 384 return err 385 } 386 // Check trailer... 387 if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) { 388 return ErrUnsupported 389 } 390 sz := binary.LittleEndian.Uint32(tmp[:4]) 391 if sz > maxChunkSize+skippableFrameHeader { 392 return ErrCorrupt 393 } 394 _, err = rs.Seek(-int64(sz), io.SeekEnd) 395 if err != nil { 396 return err 397 } 398 399 // Read index. 400 buf := make([]byte, sz) 401 _, err = io.ReadFull(rs, buf) 402 if err != nil { 403 return err 404 } 405 _, err = i.Load(buf) 406 return err 407 } 408 409 // IndexStream will return an index for a stream. 410 // The stream structure will be checked, but 411 // data within blocks is not verified. 412 // The returned index can either be appended to the end of the stream 413 // or stored separately. 414 func IndexStream(r io.Reader) ([]byte, error) { 415 var i Index 416 var buf [maxChunkSize]byte 417 var readHeader bool 418 for { 419 _, err := io.ReadFull(r, buf[:4]) 420 if err != nil { 421 if err == io.EOF { 422 return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil 423 } 424 return nil, err 425 } 426 // Start of this chunk. 427 startChunk := i.TotalCompressed 428 i.TotalCompressed += 4 429 430 chunkType := buf[0] 431 if !readHeader { 432 if chunkType != chunkTypeStreamIdentifier { 433 return nil, ErrCorrupt 434 } 435 readHeader = true 436 } 437 chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16 438 if chunkLen < checksumSize { 439 return nil, ErrCorrupt 440 } 441 442 i.TotalCompressed += int64(chunkLen) 443 _, err = io.ReadFull(r, buf[:chunkLen]) 444 if err != nil { 445 return nil, io.ErrUnexpectedEOF 446 } 447 // The chunk types are specified at 448 // https://github.com/google/snappy/blob/master/framing_format.txt 449 switch chunkType { 450 case chunkTypeCompressedData: 451 // Section 4.2. Compressed data (chunk type 0x00). 452 // Skip checksum. 453 dLen, err := DecodedLen(buf[checksumSize:]) 454 if err != nil { 455 return nil, err 456 } 457 if dLen > maxBlockSize { 458 return nil, ErrCorrupt 459 } 460 if i.estBlockUncomp == 0 { 461 // Use first block for estimate... 462 i.estBlockUncomp = int64(dLen) 463 } 464 err = i.add(startChunk, i.TotalUncompressed) 465 if err != nil { 466 return nil, err 467 } 468 i.TotalUncompressed += int64(dLen) 469 continue 470 case chunkTypeUncompressedData: 471 n2 := chunkLen - checksumSize 472 if n2 > maxBlockSize { 473 return nil, ErrCorrupt 474 } 475 if i.estBlockUncomp == 0 { 476 // Use first block for estimate... 477 i.estBlockUncomp = int64(n2) 478 } 479 err = i.add(startChunk, i.TotalUncompressed) 480 if err != nil { 481 return nil, err 482 } 483 i.TotalUncompressed += int64(n2) 484 continue 485 case chunkTypeStreamIdentifier: 486 // Section 4.1. Stream identifier (chunk type 0xff). 487 if chunkLen != len(magicBody) { 488 return nil, ErrCorrupt 489 } 490 491 if string(buf[:len(magicBody)]) != magicBody { 492 if string(buf[:len(magicBody)]) != magicBodySnappy { 493 return nil, ErrCorrupt 494 } 495 } 496 497 continue 498 } 499 500 if chunkType <= 0x7f { 501 // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). 502 return nil, ErrUnsupported 503 } 504 if chunkLen > maxChunkSize { 505 return nil, ErrUnsupported 506 } 507 // Section 4.4 Padding (chunk type 0xfe). 508 // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). 509 } 510 } 511 512 // JSON returns the index as JSON text. 513 func (i *Index) JSON() []byte { 514 x := struct { 515 TotalUncompressed int64 `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown. 516 TotalCompressed int64 `json:"total_compressed"` // Total Compressed size if known. Will be -1 if unknown. 517 Offsets []struct { 518 CompressedOffset int64 `json:"compressed"` 519 UncompressedOffset int64 `json:"uncompressed"` 520 } `json:"offsets"` 521 EstBlockUncomp int64 `json:"est_block_uncompressed"` 522 }{ 523 TotalUncompressed: i.TotalUncompressed, 524 TotalCompressed: i.TotalCompressed, 525 EstBlockUncomp: i.estBlockUncomp, 526 } 527 for _, v := range i.info { 528 x.Offsets = append(x.Offsets, struct { 529 CompressedOffset int64 `json:"compressed"` 530 UncompressedOffset int64 `json:"uncompressed"` 531 }{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset}) 532 } 533 b, _ := json.MarshalIndent(x, "", " ") 534 return b 535 } 536 537 // RemoveIndexHeaders will trim all headers and trailers from a given index. 538 // This is expected to save 20 bytes. 539 // These can be restored using RestoreIndexHeaders. 540 // This removes a layer of security, but is the most compact representation. 541 // Returns nil if headers contains errors. 542 // The returned slice references the provided slice. 543 func RemoveIndexHeaders(b []byte) []byte { 544 const save = 4 + len(S2IndexHeader) + len(S2IndexTrailer) + 4 545 if len(b) <= save { 546 return nil 547 } 548 if b[0] != ChunkTypeIndex { 549 return nil 550 } 551 chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16 552 b = b[4:] 553 554 // Validate we have enough... 555 if len(b) < chunkLen { 556 return nil 557 } 558 b = b[:chunkLen] 559 560 if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) { 561 return nil 562 } 563 b = b[len(S2IndexHeader):] 564 if !bytes.HasSuffix(b, []byte(S2IndexTrailer)) { 565 return nil 566 } 567 b = bytes.TrimSuffix(b, []byte(S2IndexTrailer)) 568 569 if len(b) < 4 { 570 return nil 571 } 572 return b[:len(b)-4] 573 } 574 575 // RestoreIndexHeaders will index restore headers removed by RemoveIndexHeaders. 576 // No error checking is performed on the input. 577 // If a 0 length slice is sent, it is returned without modification. 578 func RestoreIndexHeaders(in []byte) []byte { 579 if len(in) == 0 { 580 return in 581 } 582 b := make([]byte, 0, 4+len(S2IndexHeader)+len(in)+len(S2IndexTrailer)+4) 583 b = append(b, ChunkTypeIndex, 0, 0, 0) 584 b = append(b, []byte(S2IndexHeader)...) 585 b = append(b, in...) 586 587 var tmp [4]byte 588 binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)+4+len(S2IndexTrailer))) 589 b = append(b, tmp[:4]...) 590 // Trailer 591 b = append(b, []byte(S2IndexTrailer)...) 592 593 chunkLen := len(b) - skippableFrameHeader 594 b[1] = uint8(chunkLen >> 0) 595 b[2] = uint8(chunkLen >> 8) 596 b[3] = uint8(chunkLen >> 16) 597 return b 598 }