4
0
mirror of https://github.com/cwinfo/matterbridge.git synced 2025-06-27 09:59:24 +00:00

Bump github.com/SevereCloud/vksdk/v2 from 2.11.0 to 2.13.0 (#1698)

Bumps [github.com/SevereCloud/vksdk/v2](https://github.com/SevereCloud/vksdk) from 2.11.0 to 2.13.0.
- [Release notes](https://github.com/SevereCloud/vksdk/releases)
- [Commits](https://github.com/SevereCloud/vksdk/compare/v2.11.0...v2.13.0)

---
updated-dependencies:
- dependency-name: github.com/SevereCloud/vksdk/v2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
dependabot[bot]
2022-01-28 23:48:40 +01:00
committed by GitHub
parent ac06a26809
commit 5a1fd7dadd
111 changed files with 21525 additions and 264 deletions

View File

@ -20,6 +20,7 @@ This is important, so you don't have to worry about spending CPU cycles on alrea
* Concurrent stream compression
* Faster decompression, even for Snappy compatible content
* Ability to quickly skip forward in compressed stream
* Random seeking with indexes
* Compatible with reading Snappy compressed content
* Smaller block size overhead on incompressible blocks
* Block concatenation
@ -29,8 +30,8 @@ This is important, so you don't have to worry about spending CPU cycles on alrea
## Drawbacks over Snappy
* Not optimized for 32 bit systems.
* Streams use slightly more memory due to larger blocks and concurrency (configurable).
* Not optimized for 32 bit systems
* Streams use slightly more memory due to larger blocks and concurrency (configurable)
# Usage
@ -141,7 +142,7 @@ Binaries can be downloaded on the [Releases Page](https://github.com/klauspost/c
Installing then requires Go to be installed. To install them, use:
`go install github.com/klauspost/compress/s2/cmd/s2c && go install github.com/klauspost/compress/s2/cmd/s2d`
`go install github.com/klauspost/compress/s2/cmd/s2c@latest && go install github.com/klauspost/compress/s2/cmd/s2d@latest`
To build binaries to the current folder use:
@ -176,6 +177,8 @@ Options:
Compress faster, but with a minor compression loss
-help
Display help
-index
Add seek index (default true)
-o string
Write output to another file. Single input file only
-pad string
@ -217,11 +220,15 @@ Options:
Display help
-o string
Write output to another file. Single input file only
-q Don't write any output to terminal, except errors
-offset string
Start at offset. Examples: 92, 64K, 256K, 1M, 4M. Requires Index
-q Don't write any output to terminal, except errors
-rm
Delete source file(s) after successful decompression
Delete source file(s) after successful decompression
-safe
Do not overwrite output files
Do not overwrite output files
-tail string
Return last of compressed file. Examples: 92, 64K, 256K, 1M, 4M. Requires Index
-verify
Verify files, but do not write output
```
@ -633,12 +640,12 @@ Compression and speed is typically a bit better `MaxEncodedLen` is also smaller
Comparison of [`webdevdata.org-2015-01-07-subset`](https://files.klauspost.com/compress/webdevdata.org-2015-01-07-4GB-subset.7z),
53927 files, total input size: 4,014,735,833 bytes. amd64, single goroutine used:
| Encoder | Size | MB/s | Reduction |
|-----------------------|------------|--------|------------
| snappy.Encode | 1128706759 | 725.59 | 71.89% |
| s2.EncodeSnappy | 1093823291 | 899.16 | 72.75% |
| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% |
| s2.EncodeSnappyBest | 944507998 | 66.00 | 76.47% |
| Encoder | Size | MB/s | Reduction |
|-----------------------|------------|------------|------------
| snappy.Encode | 1128706759 | 725.59 | 71.89% |
| s2.EncodeSnappy | 1093823291 | **899.16** | 72.75% |
| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% |
| s2.EncodeSnappyBest | 944507998 | 66.00 | **76.47%**|
## Streams
@ -649,11 +656,11 @@ Comparison of different streams, AMD Ryzen 3950x, 16 cores. Size and throughput:
| File | snappy.NewWriter | S2 Snappy | S2 Snappy, Better | S2 Snappy, Best |
|-----------------------------|--------------------------|---------------------------|--------------------------|-------------------------|
| nyc-taxi-data-10M.csv | 1316042016 - 517.54MB/s | 1307003093 - 8406.29MB/s | 1174534014 - 4984.35MB/s | 1115904679 - 177.81MB/s |
| enwik10 | 5088294643 - 433.45MB/s | 5175840939 - 8454.52MB/s | 4560784526 - 4403.10MB/s | 4340299103 - 159.71MB/s |
| 10gb.tar | 6056946612 - 703.25MB/s | 6208571995 - 9035.75MB/s | 5741646126 - 2402.08MB/s | 5548973895 - 171.17MB/s |
| github-june-2days-2019.json | 1525176492 - 908.11MB/s | 1476519054 - 12625.93MB/s | 1400547532 - 6163.61MB/s | 1321887137 - 200.71MB/s |
| consensus.db.10gb | 5412897703 - 1054.38MB/s | 5354073487 - 12634.82MB/s | 5335069899 - 2472.23MB/s | 5201000954 - 166.32MB/s |
| nyc-taxi-data-10M.csv | 1316042016 - 539.47MB/s | 1307003093 - 10132.73MB/s | 1174534014 - 5002.44MB/s | 1115904679 - 177.97MB/s |
| enwik10 (xml) | 5088294643 - 451.13MB/s | 5175840939 - 9440.69MB/s | 4560784526 - 4487.21MB/s | 4340299103 - 158.92MB/s |
| 10gb.tar (mixed) | 6056946612 - 729.73MB/s | 6208571995 - 9978.05MB/s | 5741646126 - 4919.98MB/s | 5548973895 - 180.44MB/s |
| github-june-2days-2019.json | 1525176492 - 933.00MB/s | 1476519054 - 13150.12MB/s | 1400547532 - 5803.40MB/s | 1321887137 - 204.29MB/s |
| consensus.db.10gb (db) | 5412897703 - 1102.14MB/s | 5354073487 - 13562.91MB/s | 5335069899 - 5294.73MB/s | 5201000954 - 175.72MB/s |
# Decompression
@ -679,7 +686,220 @@ The 10 byte 'stream identifier' of the second stream can optionally be stripped,
Blocks can be concatenated using the `ConcatBlocks` function.
Snappy blocks/streams can safely be concatenated with S2 blocks and streams.
Snappy blocks/streams can safely be concatenated with S2 blocks and streams.
Streams with indexes (see below) will currently not work on concatenated streams.
# Stream Seek Index
S2 and Snappy streams can have indexes. These indexes will allow random seeking within the compressed data.
The index can either be appended to the stream as a skippable block or returned for separate storage.
When the index is appended to a stream it will be skipped by regular decoders,
so the output remains compatible with other decoders.
## Creating an Index
To automatically add an index to a stream, add `WriterAddIndex()` option to your writer.
Then the index will be added to the stream when `Close()` is called.
```
// Add Index to stream...
enc := s2.NewWriter(w, s2.WriterAddIndex())
io.Copy(enc, r)
enc.Close()
```
If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`.
This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`.
```
// Get index for separate storage...
enc := s2.NewWriter(w)
io.Copy(enc, r)
index, err := enc.CloseIndex()
```
The `index` can then be used needing to read from the stream.
This means the index can be used without needing to seek to the end of the stream
or for manually forwarding streams. See below.
Finally, an existing S2/Snappy stream can be indexed using the `s2.IndexStream(r io.Reader)` function.
## Using Indexes
To use indexes there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` function available.
Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeeker) compatible version of the reader.
If 'random' is specified the returned io.Seeker can be used for random seeking, otherwise only forward seeking is supported.
Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.
```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(false, nil)
rs.Seek(wantOffset, io.SeekStart)
```
Get a seeker to seek forward. Since no index is provided, the index is read from the stream.
This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.
A custom index can be specified which will be used if supplied.
When using a custom index, it will not be read from the input stream.
```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(false, index)
rs.Seek(wantOffset, io.SeekStart)
```
This will read the index from `index`. Since we specify non-random (forward only) seeking `r` does not have to be an io.Seeker
```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(true, index)
rs.Seek(wantOffset, io.SeekStart)
```
Finally, since we specify that we want to do random seeking `r` must be an io.Seeker.
The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader,
meaning changes performed to one is reflected in the other.
To check if a stream contains an index at the end, the `(*Index).LoadStream(rs io.ReadSeeker) error` can be used.
## Manually Forwarding Streams
Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type.
This can be used for parsing indexes, either separate or in streams.
In some cases it may not be possible to serve a seekable stream.
This can for instance be an HTTP stream, where the Range request
is sent at the start of the stream.
With a little bit of extra code it is still possible to use indexes
to forward to specific offset with a single forward skip.
It is possible to load the index manually like this:
```
var index s2.Index
_, err = index.Load(idxBytes)
```
This can be used to figure out how much to offset the compressed stream:
```
compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
```
The `compressedOffset` is the number of bytes that should be skipped
from the beginning of the compressed file.
The `uncompressedOffset` will then be offset of the uncompressed bytes returned
when decoding from that position. This will always be <= wantOffset.
When creating a decoder it must be specified that it should *not* expect a stream identifier
at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset`
we create the decoder like this:
```
dec := s2.NewReader(r, s2.ReaderIgnoreStreamIdentifier())
```
We are not completely done. We still need to forward the stream the uncompressed bytes we didn't want.
This is done using the regular "Skip" function:
```
err = dec.Skip(wantOffset - uncompressedOffset)
```
This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset.
## Index Format:
Each block is structured as a snappy skippable block, with the chunk ID 0x99.
The block can be read from the front, but contains information so it can be read from the back as well.
Numbers are stored as fixed size little endian values or [zigzag encoded](https://developers.google.com/protocol-buffers/docs/encoding#signed_integers) [base 128 varints](https://developers.google.com/protocol-buffers/docs/encoding),
with un-encoded value length of 64 bits, unless other limits are specified.
| Content | Format |
|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| ID, `[1]byte` | Always 0x99. |
| Data Length, `[3]byte` | 3 byte little-endian length of the chunk in bytes, following this. |
| Header `[6]byte` | Header, must be `[115, 50, 105, 100, 120, 0]` or in text: "s2idx\x00". |
| UncompressedSize, Varint | Total Uncompressed size. |
| CompressedSize, Varint | Total Compressed size if known. Should be -1 if unknown. |
| EstBlockSize, Varint | Block Size, used for guessing uncompressed offsets. Must be >= 0. |
| Entries, Varint | Number of Entries in index, must be < 65536 and >=0. |
| HasUncompressedOffsets `byte` | 0 if no uncompressed offsets are present, 1 if present. Other values are invalid. |
| UncompressedOffsets, [Entries]VarInt | Uncompressed offsets. See below how to decode. |
| CompressedOffsets, [Entries]VarInt | Compressed offsets. See below how to decode. |
| Block Size, `[4]byte` | Little Endian total encoded size (including header and trailer). Can be used for searching backwards to start of block. |
| Trailer `[6]byte` | Trailer, must be `[0, 120, 100, 105, 50, 115]` or in text: "\x00xdi2s". Can be used for identifying block from end of stream. |
For regular streams the uncompressed offsets are fully predictable,
so `HasUncompressedOffsets` allows to specify that compressed blocks all have
exactly `EstBlockSize` bytes of uncompressed content.
Entries *must* be in order, starting with the lowest offset,
and there *must* be no uncompressed offset duplicates.
Entries *may* point to the start of a skippable block,
but it is then not allowed to also have an entry for the next block since
that would give an uncompressed offset duplicate.
There is no requirement for all blocks to be represented in the index.
In fact there is a maximum of 65536 block entries in an index.
The writer can use any method to reduce the number of entries.
An implicit block start at 0,0 can be assumed.
### Decoding entries:
```
// Read Uncompressed entries.
// Each assumes EstBlockSize delta from previous.
for each entry {
uOff = 0
if HasUncompressedOffsets == 1 {
uOff = ReadVarInt // Read value from stream
}
// Except for the first entry, use previous values.
if entryNum == 0 {
entry[entryNum].UncompressedOffset = uOff
continue
}
// Uncompressed uses previous offset and adds EstBlockSize
entry[entryNum].UncompressedOffset = entry[entryNum-1].UncompressedOffset + EstBlockSize
}
// Guess that the first block will be 50% of uncompressed size.
// Integer truncating division must be used.
CompressGuess := EstBlockSize / 2
// Read Compressed entries.
// Each assumes CompressGuess delta from previous.
// CompressGuess is adjusted for each value.
for each entry {
cOff = ReadVarInt // Read value from stream
// Except for the first entry, use previous values.
if entryNum == 0 {
entry[entryNum].CompressedOffset = cOff
continue
}
// Compressed uses previous and our estimate.
entry[entryNum].CompressedOffset = entry[entryNum-1].CompressedOffset + CompressGuess
// Adjust compressed offset for next loop, integer truncating division must be used.
CompressGuess += cOff/2
}
```
# Format Extensions

View File

@ -8,7 +8,9 @@ package s2
import (
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
)
var (
@ -22,6 +24,16 @@ var (
ErrUnsupported = errors.New("s2: unsupported input")
)
// ErrCantSeek is returned if the stream cannot be seeked.
type ErrCantSeek struct {
Reason string
}
// Error returns the error as string.
func (e ErrCantSeek) Error() string {
return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
}
// DecodedLen returns the length of the decoded block.
func DecodedLen(src []byte) (int, error) {
v, _, err := decodedLen(src)
@ -88,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
} else {
nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
}
nr.readHeader = nr.ignoreStreamID
nr.paramsOK = true
return &nr
}
@ -131,12 +144,41 @@ func ReaderAllocBlock(blockSize int) ReaderOption {
}
}
// ReaderIgnoreStreamIdentifier will make the reader skip the expected
// stream identifier at the beginning of the stream.
// This can be used when serving a stream that has been forwarded to a specific point.
func ReaderIgnoreStreamIdentifier() ReaderOption {
return func(r *Reader) error {
r.ignoreStreamID = true
return nil
}
}
// ReaderSkippableCB will register a callback for chuncks with the specified ID.
// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
// For each chunk with the ID, the callback is called with the content.
// Any returned non-nil error will abort decompression.
// Only one callback per ID is supported, latest sent will be used.
func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
return func(r *Reader) error {
if id < 0x80 || id > 0xfd {
return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
}
r.skippableCB[id] = fn
return nil
}
}
// Reader is an io.Reader that can read Snappy-compressed bytes.
type Reader struct {
r io.Reader
err error
decoded []byte
buf []byte
r io.Reader
err error
decoded []byte
buf []byte
skippableCB [0x80]func(r io.Reader) error
blockStart int64 // Uncompressed offset at start of current.
index *Index
// decoded[i:j] contains decoded bytes that have not yet been passed on.
i, j int
// maximum block size allowed.
@ -144,10 +186,11 @@ type Reader struct {
// maximum expected buffer size.
maxBufSize int
// alloc a buffer this size if > 0.
lazyBuf int
readHeader bool
paramsOK bool
snappyFrame bool
lazyBuf int
readHeader bool
paramsOK bool
snappyFrame bool
ignoreStreamID bool
}
// ensureBufferSize will ensure that the buffer can take at least n bytes.
@ -172,11 +215,12 @@ func (r *Reader) Reset(reader io.Reader) {
if !r.paramsOK {
return
}
r.index = nil
r.r = reader
r.err = nil
r.i = 0
r.j = 0
r.readHeader = false
r.readHeader = r.ignoreStreamID
}
func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
@ -189,11 +233,24 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
return true
}
// skipN will skip n bytes.
// skippable will skip n bytes.
// If the supplied reader supports seeking that is used.
// tmp is used as a temporary buffer for reading.
// The supplied slice does not need to be the size of the read.
func (r *Reader) skipN(tmp []byte, n int, allowEOF bool) (ok bool) {
func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
if id < 0x80 {
r.err = fmt.Errorf("interbal error: skippable id < 0x80")
return false
}
if fn := r.skippableCB[id-0x80]; fn != nil {
rd := io.LimitReader(r.r, int64(n))
r.err = fn(rd)
if r.err != nil {
return false
}
_, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
return r.err == nil
}
if rs, ok := r.r.(io.ReadSeeker); ok {
_, err := rs.Seek(int64(n), io.SeekCurrent)
if err == nil {
@ -247,6 +304,7 @@ func (r *Reader) Read(p []byte) (int, error) {
// https://github.com/google/snappy/blob/master/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
r.blockStart += int64(r.j)
// Section 4.2. Compressed data (chunk type 0x00).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@ -294,6 +352,7 @@ func (r *Reader) Read(p []byte) (int, error) {
continue
case chunkTypeUncompressedData:
r.blockStart += int64(r.j)
// Section 4.3. Uncompressed data (chunk type 0x01).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@ -357,17 +416,20 @@ func (r *Reader) Read(p []byte) (int, error) {
if chunkType <= 0x7f {
// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
r.err = ErrUnsupported
return 0, r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
if chunkLen > maxBlockSize {
if chunkLen > maxChunkSize {
// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
r.err = ErrUnsupported
return 0, r.err
}
if !r.skipN(r.buf, chunkLen, false) {
// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
if !r.skippable(r.buf, chunkLen, false, chunkType) {
return 0, r.err
}
}
@ -396,7 +458,7 @@ func (r *Reader) Skip(n int64) error {
return nil
}
n -= int64(r.j - r.i)
r.i, r.j = 0, 0
r.i = r.j
}
// Buffer empty; read blocks until we have content.
@ -420,6 +482,7 @@ func (r *Reader) Skip(n int64) error {
// https://github.com/google/snappy/blob/master/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
r.blockStart += int64(r.j)
// Section 4.2. Compressed data (chunk type 0x00).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@ -468,6 +531,7 @@ func (r *Reader) Skip(n int64) error {
r.i, r.j = 0, dLen
continue
case chunkTypeUncompressedData:
r.blockStart += int64(r.j)
// Section 4.3. Uncompressed data (chunk type 0x01).
if chunkLen < checksumSize {
r.err = ErrCorrupt
@ -528,19 +592,138 @@ func (r *Reader) Skip(n int64) error {
r.err = ErrUnsupported
return r.err
}
if chunkLen > maxBlockSize {
if chunkLen > maxChunkSize {
r.err = ErrUnsupported
return r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
if !r.skipN(r.buf, chunkLen, false) {
if !r.skippable(r.buf, chunkLen, false, chunkType) {
return r.err
}
}
return nil
}
// ReadSeeker provides random or forward seeking in compressed content.
// See Reader.ReadSeeker
type ReadSeeker struct {
*Reader
}
// ReadSeeker will return an io.ReadSeeker compatible version of the reader.
// If 'random' is specified the returned io.Seeker can be used for
// random seeking, otherwise only forward seeking is supported.
// Enabling random seeking requires the original input to support
// the io.Seeker interface.
// A custom index can be specified which will be used if supplied.
// When using a custom index, it will not be read from the input stream.
// The returned ReadSeeker contains a shallow reference to the existing Reader,
// meaning changes performed to one is reflected in the other.
func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
// Read index if provided.
if len(index) != 0 {
if r.index == nil {
r.index = &Index{}
}
if _, err := r.index.Load(index); err != nil {
return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
}
}
// Check if input is seekable
rs, ok := r.r.(io.ReadSeeker)
if !ok {
if !random {
return &ReadSeeker{Reader: r}, nil
}
return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
}
if r.index != nil {
// Seekable and index, ok...
return &ReadSeeker{Reader: r}, nil
}
// Load from stream.
r.index = &Index{}
// Read current position.
pos, err := rs.Seek(0, io.SeekCurrent)
if err != nil {
return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
}
err = r.index.LoadStream(rs)
if err != nil {
if err == ErrUnsupported {
return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
}
return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
}
// reset position.
_, err = rs.Seek(pos, io.SeekStart)
if err != nil {
return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
}
return &ReadSeeker{Reader: r}, nil
}
// Seek allows seeking in compressed data.
func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
if r.err != nil {
return 0, r.err
}
if offset == 0 && whence == io.SeekCurrent {
return r.blockStart + int64(r.i), nil
}
if !r.readHeader {
// Make sure we read the header.
_, r.err = r.Read([]byte{})
}
rs, ok := r.r.(io.ReadSeeker)
if r.index == nil || !ok {
if whence == io.SeekCurrent && offset >= 0 {
err := r.Skip(offset)
return r.blockStart + int64(r.i), err
}
if whence == io.SeekStart && offset >= r.blockStart+int64(r.i) {
err := r.Skip(offset - r.blockStart - int64(r.i))
return r.blockStart + int64(r.i), err
}
return 0, ErrUnsupported
}
switch whence {
case io.SeekCurrent:
offset += r.blockStart + int64(r.i)
case io.SeekEnd:
offset = -offset
}
c, u, err := r.index.Find(offset)
if err != nil {
return r.blockStart + int64(r.i), err
}
// Seek to next block
_, err = rs.Seek(c, io.SeekStart)
if err != nil {
return 0, err
}
if offset < 0 {
offset = r.index.TotalUncompressed + offset
}
r.i = r.j // Remove rest of current block.
if u < offset {
// Forward inside block
return offset, r.Skip(offset - u)
}
return offset, nil
}
// ReadByte satisfies the io.ByteReader interface.
func (r *Reader) ReadByte() (byte, error) {
if r.err != nil {
@ -563,3 +746,17 @@ func (r *Reader) ReadByte() (byte, error) {
}
return 0, io.ErrNoProgress
}
// SkippableCB will register a callback for chunks with the specified ID.
// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive).
// For each chunk with the ID, the callback is called with the content.
// Any returned non-nil error will abort decompression.
// Only one callback per ID is supported, latest sent will be used.
// Sending a nil function will disable previous callbacks.
func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
if id < 0x80 || id > chunkTypePadding {
return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
}
r.skippableCB[id] = fn
return nil
}

View File

@ -395,23 +395,26 @@ type Writer struct {
// ibuf is a buffer for the incoming (uncompressed) bytes.
ibuf []byte
blockSize int
obufLen int
concurrency int
written int64
output chan chan result
buffers sync.Pool
pad int
blockSize int
obufLen int
concurrency int
written int64
uncompWritten int64 // Bytes sent to compression
output chan chan result
buffers sync.Pool
pad int
writer io.Writer
randSrc io.Reader
writerWg sync.WaitGroup
index Index
// wroteStreamHeader is whether we have written the stream header.
wroteStreamHeader bool
paramsOK bool
snappy bool
flushOnWrite bool
appendIndex bool
level uint8
}
@ -422,7 +425,11 @@ const (
levelBest
)
type result []byte
type result struct {
b []byte
// Uncompressed start offset
startOffset int64
}
// err returns the previously set error.
// If no error has been set it is set to err if not nil.
@ -454,6 +461,9 @@ func (w *Writer) Reset(writer io.Writer) {
w.wroteStreamHeader = false
w.written = 0
w.writer = writer
w.uncompWritten = 0
w.index.reset(w.blockSize)
// If we didn't get a writer, stop here.
if writer == nil {
return
@ -474,7 +484,8 @@ func (w *Writer) Reset(writer io.Writer) {
// Get a queued write.
for write := range toWrite {
// Wait for the data to be available.
in := <-write
input := <-write
in := input.b
if len(in) > 0 {
if w.err(nil) == nil {
// Don't expose data from previous buffers.
@ -485,11 +496,12 @@ func (w *Writer) Reset(writer io.Writer) {
err = io.ErrShortBuffer
}
_ = w.err(err)
w.err(w.index.add(w.written, input.startOffset))
w.written += int64(n)
}
}
if cap(in) >= w.obufLen {
w.buffers.Put([]byte(in))
w.buffers.Put(in)
}
// close the incoming write request.
// This can be used for synchronizing flushes.
@ -500,6 +512,9 @@ func (w *Writer) Reset(writer io.Writer) {
// Write satisfies the io.Writer interface.
func (w *Writer) Write(p []byte) (nRet int, errRet error) {
if err := w.err(nil); err != nil {
return 0, err
}
if w.flushOnWrite {
return w.write(p)
}
@ -535,6 +550,9 @@ func (w *Writer) Write(p []byte) (nRet int, errRet error) {
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
if err := w.err(nil); err != nil {
return 0, err
}
if len(w.ibuf) > 0 {
err := w.Flush()
if err != nil {
@ -577,6 +595,85 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
return n, w.err(nil)
}
// AddSkippableBlock will add a skippable block to the stream.
// The ID must be 0x80-0xfe (inclusive).
// Length of the skippable block must be <= 16777215 bytes.
func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
if err := w.err(nil); err != nil {
return err
}
if len(data) == 0 {
return nil
}
if id < 0x80 || id > chunkTypePadding {
return fmt.Errorf("invalid skippable block id %x", id)
}
if len(data) > maxChunkSize {
return fmt.Errorf("skippable block excessed maximum size")
}
var header [4]byte
chunkLen := 4 + len(data)
header[0] = id
header[1] = uint8(chunkLen >> 0)
header[2] = uint8(chunkLen >> 8)
header[3] = uint8(chunkLen >> 16)
if w.concurrency == 1 {
write := func(b []byte) error {
n, err := w.writer.Write(b)
if err = w.err(err); err != nil {
return err
}
if n != len(data) {
return w.err(io.ErrShortWrite)
}
w.written += int64(n)
return w.err(nil)
}
if !w.wroteStreamHeader {
w.wroteStreamHeader = true
if w.snappy {
if err := write([]byte(magicChunkSnappy)); err != nil {
return err
}
} else {
if err := write([]byte(magicChunk)); err != nil {
return err
}
}
}
if err := write(header[:]); err != nil {
return err
}
if err := write(data); err != nil {
return err
}
}
// Create output...
if !w.wroteStreamHeader {
w.wroteStreamHeader = true
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
// Copy input.
inbuf := w.buffers.Get().([]byte)[:4]
copy(inbuf, header[:])
inbuf = append(inbuf, data...)
output := make(chan result, 1)
// Queue output.
w.output <- output
output <- result{startOffset: w.uncompWritten, b: inbuf}
return nil
}
// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
@ -614,9 +711,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
hWriter <- []byte(magicChunkSnappy)
hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
hWriter <- []byte(magicChunk)
hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@ -632,6 +729,10 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
res := result{
startOffset: w.uncompWritten,
}
w.uncompWritten += int64(len(uncompressed))
go func() {
checksum := crc(uncompressed)
@ -664,7 +765,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
output <- obuf
res.b = obuf
output <- res
}()
}
return nil
@ -708,9 +810,9 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
hWriter <- []byte(magicChunkSnappy)
hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
hWriter <- []byte(magicChunk)
hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@ -731,6 +833,11 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
res := result{
startOffset: w.uncompWritten,
}
w.uncompWritten += int64(len(uncompressed))
go func() {
checksum := crc(uncompressed)
@ -763,7 +870,8 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
output <- obuf
res.b = obuf
output <- res
// Put unused buffer back in pool.
w.buffers.Put(inbuf)
@ -793,9 +901,9 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
hWriter := make(chan result)
w.output <- hWriter
if w.snappy {
hWriter <- []byte(magicChunkSnappy)
hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
} else {
hWriter <- []byte(magicChunk)
hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
}
}
@ -806,6 +914,11 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
res := result{
startOffset: w.uncompWritten,
}
w.uncompWritten += int64(len(uncompressed))
go func() {
checksum := crc(uncompressed)
@ -838,7 +951,8 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
obuf[7] = uint8(checksum >> 24)
// Queue final output.
output <- obuf
res.b = obuf
output <- res
// Put unused buffer back in pool.
w.buffers.Put(inbuf)
@ -912,7 +1026,10 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
if n != len(obuf) {
return 0, w.err(io.ErrShortWrite)
}
w.err(w.index.add(w.written, w.uncompWritten))
w.written += int64(n)
w.uncompWritten += int64(len(uncompressed))
if chunkType == chunkTypeUncompressedData {
// Write uncompressed data.
n, err := w.writer.Write(uncompressed)
@ -961,38 +1078,87 @@ func (w *Writer) Flush() error {
res := make(chan result)
w.output <- res
// Block until this has been picked up.
res <- nil
res <- result{b: nil, startOffset: w.uncompWritten}
// When it is closed, we have flushed.
<-res
return w.err(nil)
}
// Close calls Flush and then closes the Writer.
// Calling Close multiple times is ok.
// Calling Close multiple times is ok,
// but calling CloseIndex after this will make it not return the index.
func (w *Writer) Close() error {
_, err := w.closeIndex(w.appendIndex)
return err
}
// CloseIndex calls Close and returns an index on first call.
// This is not required if you are only adding index to a stream.
func (w *Writer) CloseIndex() ([]byte, error) {
return w.closeIndex(true)
}
func (w *Writer) closeIndex(idx bool) ([]byte, error) {
err := w.Flush()
if w.output != nil {
close(w.output)
w.writerWg.Wait()
w.output = nil
}
if w.err(nil) == nil && w.writer != nil && w.pad > 0 {
add := calcSkippableFrame(w.written, int64(w.pad))
frame, err := skippableFrame(w.ibuf[:0], add, w.randSrc)
if err = w.err(err); err != nil {
return err
}
_, err2 := w.writer.Write(frame)
_ = w.err(err2)
}
_ = w.err(errClosed)
if err == errClosed {
return nil
}
return err
}
const skippableFrameHeader = 4
var index []byte
if w.err(nil) == nil && w.writer != nil {
// Create index.
if idx {
compSize := int64(-1)
if w.pad <= 1 {
compSize = w.written
}
index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
// Count as written for padding.
if w.appendIndex {
w.written += int64(len(index))
}
if true {
_, err := w.index.Load(index)
if err != nil {
panic(err)
}
}
}
if w.pad > 1 {
tmp := w.ibuf[:0]
if len(index) > 0 {
// Allocate another buffer.
tmp = w.buffers.Get().([]byte)[:0]
defer w.buffers.Put(tmp)
}
add := calcSkippableFrame(w.written, int64(w.pad))
frame, err := skippableFrame(tmp, add, w.randSrc)
if err = w.err(err); err != nil {
return nil, err
}
n, err2 := w.writer.Write(frame)
if err2 == nil && n != len(frame) {
err2 = io.ErrShortWrite
}
_ = w.err(err2)
}
if len(index) > 0 && w.appendIndex {
n, err2 := w.writer.Write(index)
if err2 == nil && n != len(index) {
err2 = io.ErrShortWrite
}
_ = w.err(err2)
}
}
err = w.err(errClosed)
if err == errClosed {
return index, nil
}
return nil, err
}
// calcSkippableFrame will return a total size to be added for written
// to be divisible by multiple.
@ -1057,6 +1223,15 @@ func WriterConcurrency(n int) WriterOption {
}
}
// WriterAddIndex will append an index to the end of a stream
// when it is closed.
func WriterAddIndex() WriterOption {
return func(w *Writer) error {
w.appendIndex = true
return nil
}
}
// WriterBetterCompression will enable better compression.
// EncodeBetter compresses better than Encode but typically with a
// 10-40% speed decrease on both compression and decompression.

525
vendor/github.com/klauspost/compress/s2/index.go generated vendored Normal file
View File

@ -0,0 +1,525 @@
// Copyright (c) 2022+ Klaus Post. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package s2
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
)
const (
S2IndexHeader = "s2idx\x00"
S2IndexTrailer = "\x00xdi2s"
maxIndexEntries = 1 << 16
)
// Index represents an S2/Snappy index.
type Index struct {
TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown.
TotalCompressed int64 // Total Compressed size if known. Will be -1 if unknown.
info []struct {
compressedOffset int64
uncompressedOffset int64
}
estBlockUncomp int64
}
func (i *Index) reset(maxBlock int) {
i.estBlockUncomp = int64(maxBlock)
i.TotalCompressed = -1
i.TotalUncompressed = -1
if len(i.info) > 0 {
i.info = i.info[:0]
}
}
// allocInfos will allocate an empty slice of infos.
func (i *Index) allocInfos(n int) {
if n > maxIndexEntries {
panic("n > maxIndexEntries")
}
i.info = make([]struct {
compressedOffset int64
uncompressedOffset int64
}, 0, n)
}
// add an uncompressed and compressed pair.
// Entries must be sent in order.
func (i *Index) add(compressedOffset, uncompressedOffset int64) error {
if i == nil {
return nil
}
lastIdx := len(i.info) - 1
if lastIdx >= 0 {
latest := i.info[lastIdx]
if latest.uncompressedOffset == uncompressedOffset {
// Uncompressed didn't change, don't add entry,
// but update start index.
latest.compressedOffset = compressedOffset
i.info[lastIdx] = latest
return nil
}
if latest.uncompressedOffset > uncompressedOffset {
return fmt.Errorf("internal error: Earlier uncompressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
}
if latest.compressedOffset > compressedOffset {
return fmt.Errorf("internal error: Earlier compressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
}
}
i.info = append(i.info, struct {
compressedOffset int64
uncompressedOffset int64
}{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset})
return nil
}
// Find the offset at or before the wanted (uncompressed) offset.
// If offset is 0 or positive it is the offset from the beginning of the file.
// If the uncompressed size is known, the offset must be within the file.
// If an offset outside the file is requested io.ErrUnexpectedEOF is returned.
// If the offset is negative, it is interpreted as the distance from the end of the file,
// where -1 represents the last byte.
// If offset from the end of the file is requested, but size is unknown,
// ErrUnsupported will be returned.
func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) {
if i.TotalUncompressed < 0 {
return 0, 0, ErrCorrupt
}
if offset < 0 {
offset = i.TotalUncompressed + offset
if offset < 0 {
return 0, 0, io.ErrUnexpectedEOF
}
}
if offset > i.TotalUncompressed {
return 0, 0, io.ErrUnexpectedEOF
}
for _, info := range i.info {
if info.uncompressedOffset > offset {
break
}
compressedOff = info.compressedOffset
uncompressedOff = info.uncompressedOffset
}
return compressedOff, uncompressedOff, nil
}
// reduce to stay below maxIndexEntries
func (i *Index) reduce() {
if len(i.info) < maxIndexEntries && i.estBlockUncomp >= 1<<20 {
return
}
// Algorithm, keep 1, remove removeN entries...
removeN := (len(i.info) + 1) / maxIndexEntries
src := i.info
j := 0
// Each block should be at least 1MB, but don't reduce below 1000 entries.
for i.estBlockUncomp*(int64(removeN)+1) < 1<<20 && len(i.info)/(removeN+1) > 1000 {
removeN++
}
for idx := 0; idx < len(src); idx++ {
i.info[j] = src[idx]
j++
idx += removeN
}
i.info = i.info[:j]
// Update maxblock estimate.
i.estBlockUncomp += i.estBlockUncomp * int64(removeN)
}
func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte {
i.reduce()
var tmp [binary.MaxVarintLen64]byte
initSize := len(b)
// We make the start a skippable header+size.
b = append(b, ChunkTypeIndex, 0, 0, 0)
b = append(b, []byte(S2IndexHeader)...)
// Total Uncompressed size
n := binary.PutVarint(tmp[:], uncompTotal)
b = append(b, tmp[:n]...)
// Total Compressed size
n = binary.PutVarint(tmp[:], compTotal)
b = append(b, tmp[:n]...)
// Put EstBlockUncomp size
n = binary.PutVarint(tmp[:], i.estBlockUncomp)
b = append(b, tmp[:n]...)
// Put length
n = binary.PutVarint(tmp[:], int64(len(i.info)))
b = append(b, tmp[:n]...)
// Check if we should add uncompressed offsets
var hasUncompressed byte
for idx, info := range i.info {
if idx == 0 {
if info.uncompressedOffset != 0 {
hasUncompressed = 1
break
}
continue
}
if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp {
hasUncompressed = 1
break
}
}
b = append(b, hasUncompressed)
// Add each entry
if hasUncompressed == 1 {
for idx, info := range i.info {
uOff := info.uncompressedOffset
if idx > 0 {
prev := i.info[idx-1]
uOff -= prev.uncompressedOffset + (i.estBlockUncomp)
}
n = binary.PutVarint(tmp[:], uOff)
b = append(b, tmp[:n]...)
}
}
// Initial compressed size estimate.
cPredict := i.estBlockUncomp / 2
for idx, info := range i.info {
cOff := info.compressedOffset
if idx > 0 {
prev := i.info[idx-1]
cOff -= prev.compressedOffset + cPredict
// Update compressed size prediction, with half the error.
cPredict += cOff / 2
}
n = binary.PutVarint(tmp[:], cOff)
b = append(b, tmp[:n]...)
}
// Add Total Size.
// Stored as fixed size for easier reading.
binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer)))
b = append(b, tmp[:4]...)
// Trailer
b = append(b, []byte(S2IndexTrailer)...)
// Update size
chunkLen := len(b) - initSize - skippableFrameHeader
b[initSize+1] = uint8(chunkLen >> 0)
b[initSize+2] = uint8(chunkLen >> 8)
b[initSize+3] = uint8(chunkLen >> 16)
//fmt.Printf("chunklen: 0x%x Uncomp:%d, Comp:%d\n", chunkLen, uncompTotal, compTotal)
return b
}
// Load a binary index.
// A zero value Index can be used or a previous one can be reused.
func (i *Index) Load(b []byte) ([]byte, error) {
if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) {
return b, io.ErrUnexpectedEOF
}
if b[0] != ChunkTypeIndex {
return b, ErrCorrupt
}
chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
b = b[4:]
// Validate we have enough...
if len(b) < chunkLen {
return b, io.ErrUnexpectedEOF
}
if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
return b, ErrUnsupported
}
b = b[len(S2IndexHeader):]
// Total Uncompressed
if v, n := binary.Varint(b); n <= 0 || v < 0 {
return b, ErrCorrupt
} else {
i.TotalUncompressed = v
b = b[n:]
}
// Total Compressed
if v, n := binary.Varint(b); n <= 0 {
return b, ErrCorrupt
} else {
i.TotalCompressed = v
b = b[n:]
}
// Read EstBlockUncomp
if v, n := binary.Varint(b); n <= 0 {
return b, ErrCorrupt
} else {
if v < 0 {
return b, ErrCorrupt
}
i.estBlockUncomp = v
b = b[n:]
}
var entries int
if v, n := binary.Varint(b); n <= 0 {
return b, ErrCorrupt
} else {
if v < 0 || v > maxIndexEntries {
return b, ErrCorrupt
}
entries = int(v)
b = b[n:]
}
if cap(i.info) < entries {
i.allocInfos(entries)
}
i.info = i.info[:entries]
if len(b) < 1 {
return b, io.ErrUnexpectedEOF
}
hasUncompressed := b[0]
b = b[1:]
if hasUncompressed&1 != hasUncompressed {
return b, ErrCorrupt
}
// Add each uncompressed entry
for idx := range i.info {
var uOff int64
if hasUncompressed != 0 {
// Load delta
if v, n := binary.Varint(b); n <= 0 {
return b, ErrCorrupt
} else {
uOff = v
b = b[n:]
}
}
if idx > 0 {
prev := i.info[idx-1].uncompressedOffset
uOff += prev + (i.estBlockUncomp)
if uOff <= prev {
return b, ErrCorrupt
}
}
if uOff < 0 {
return b, ErrCorrupt
}
i.info[idx].uncompressedOffset = uOff
}
// Initial compressed size estimate.
cPredict := i.estBlockUncomp / 2
// Add each compressed entry
for idx := range i.info {
var cOff int64
if v, n := binary.Varint(b); n <= 0 {
return b, ErrCorrupt
} else {
cOff = v
b = b[n:]
}
if idx > 0 {
// Update compressed size prediction, with half the error.
cPredictNew := cPredict + cOff/2
prev := i.info[idx-1].compressedOffset
cOff += prev + cPredict
if cOff <= prev {
return b, ErrCorrupt
}
cPredict = cPredictNew
}
if cOff < 0 {
return b, ErrCorrupt
}
i.info[idx].compressedOffset = cOff
}
if len(b) < 4+len(S2IndexTrailer) {
return b, io.ErrUnexpectedEOF
}
// Skip size...
b = b[4:]
// Check trailer...
if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
return b, ErrCorrupt
}
return b[len(S2IndexTrailer):], nil
}
// LoadStream will load an index from the end of the supplied stream.
// ErrUnsupported will be returned if the signature cannot be found.
// ErrCorrupt will be returned if unexpected values are found.
// io.ErrUnexpectedEOF is returned if there are too few bytes.
// IO errors are returned as-is.
func (i *Index) LoadStream(rs io.ReadSeeker) error {
// Go to end.
_, err := rs.Seek(-10, io.SeekEnd)
if err != nil {
return err
}
var tmp [10]byte
_, err = io.ReadFull(rs, tmp[:])
if err != nil {
return err
}
// Check trailer...
if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
return ErrUnsupported
}
sz := binary.LittleEndian.Uint32(tmp[:4])
if sz > maxChunkSize+skippableFrameHeader {
return ErrCorrupt
}
_, err = rs.Seek(-int64(sz), io.SeekEnd)
if err != nil {
return err
}
// Read index.
buf := make([]byte, sz)
_, err = io.ReadFull(rs, buf)
if err != nil {
return err
}
_, err = i.Load(buf)
return err
}
// IndexStream will return an index for a stream.
// The stream structure will be checked, but
// data within blocks is not verified.
// The returned index can either be appended to the end of the stream
// or stored separately.
func IndexStream(r io.Reader) ([]byte, error) {
var i Index
var buf [maxChunkSize]byte
var readHeader bool
for {
_, err := io.ReadFull(r, buf[:4])
if err != nil {
if err == io.EOF {
return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil
}
return nil, err
}
// Start of this chunk.
startChunk := i.TotalCompressed
i.TotalCompressed += 4
chunkType := buf[0]
if !readHeader {
if chunkType != chunkTypeStreamIdentifier {
return nil, ErrCorrupt
}
readHeader = true
}
chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16
if chunkLen < checksumSize {
return nil, ErrCorrupt
}
i.TotalCompressed += int64(chunkLen)
_, err = io.ReadFull(r, buf[:chunkLen])
if err != nil {
return nil, io.ErrUnexpectedEOF
}
// The chunk types are specified at
// https://github.com/google/snappy/blob/master/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
// Section 4.2. Compressed data (chunk type 0x00).
// Skip checksum.
dLen, err := DecodedLen(buf[checksumSize:])
if err != nil {
return nil, err
}
if dLen > maxBlockSize {
return nil, ErrCorrupt
}
if i.estBlockUncomp == 0 {
// Use first block for estimate...
i.estBlockUncomp = int64(dLen)
}
err = i.add(startChunk, i.TotalUncompressed)
if err != nil {
return nil, err
}
i.TotalUncompressed += int64(dLen)
continue
case chunkTypeUncompressedData:
n2 := chunkLen - checksumSize
if n2 > maxBlockSize {
return nil, ErrCorrupt
}
if i.estBlockUncomp == 0 {
// Use first block for estimate...
i.estBlockUncomp = int64(n2)
}
err = i.add(startChunk, i.TotalUncompressed)
if err != nil {
return nil, err
}
i.TotalUncompressed += int64(n2)
continue
case chunkTypeStreamIdentifier:
// Section 4.1. Stream identifier (chunk type 0xff).
if chunkLen != len(magicBody) {
return nil, ErrCorrupt
}
if string(buf[:len(magicBody)]) != magicBody {
if string(buf[:len(magicBody)]) != magicBodySnappy {
return nil, ErrCorrupt
}
}
continue
}
if chunkType <= 0x7f {
// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
return nil, ErrUnsupported
}
if chunkLen > maxChunkSize {
return nil, ErrUnsupported
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
}
}
// JSON returns the index as JSON text.
func (i *Index) JSON() []byte {
x := struct {
TotalUncompressed int64 `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown.
TotalCompressed int64 `json:"total_compressed"` // Total Compressed size if known. Will be -1 if unknown.
Offsets []struct {
CompressedOffset int64 `json:"compressed"`
UncompressedOffset int64 `json:"uncompressed"`
} `json:"offsets"`
EstBlockUncomp int64 `json:"est_block_uncompressed"`
}{
TotalUncompressed: i.TotalUncompressed,
TotalCompressed: i.TotalCompressed,
EstBlockUncomp: i.estBlockUncomp,
}
for _, v := range i.info {
x.Offsets = append(x.Offsets, struct {
CompressedOffset int64 `json:"compressed"`
UncompressedOffset int64 `json:"uncompressed"`
}{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset})
}
b, _ := json.MarshalIndent(x, "", " ")
return b
}

View File

@ -87,6 +87,9 @@ const (
// minBlockSize is the minimum size of block setting when creating a writer.
minBlockSize = 4 << 10
skippableFrameHeader = 4
maxChunkSize = 1<<24 - 1 // 16777215
// Default block size
defaultBlockSize = 1 << 20
@ -99,6 +102,7 @@ const (
const (
chunkTypeCompressedData = 0x00
chunkTypeUncompressedData = 0x01
ChunkTypeIndex = 0x99
chunkTypePadding = 0xfe
chunkTypeStreamIdentifier = 0xff
)