summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTonis Tiigi <tonistiigi@gmail.com>2015-09-16 16:51:11 -0800
committerTonis Tiigi <tonistiigi@gmail.com>2015-09-23 17:12:54 -0700
commitc5b23337c37f0bcf01a7bf6a5129c326df136396 (patch)
tree999d36a8ce39777d35052fa6ae020c990b041dec
parent56b70bf84e253bfedef94248a0b6b1f8d730091c (diff)
downloaddocker-c5b23337c37f0bcf01a7bf6a5129c326df136396.tar.gz
Make bytesPipe use linear allocations
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
-rw-r--r--pkg/ioutils/bytespipe.go109
-rw-r--r--pkg/ioutils/bytespipe_test.go64
-rw-r--r--pkg/ioutils/readers.go4
3 files changed, 120 insertions, 57 deletions
diff --git a/pkg/ioutils/bytespipe.go b/pkg/ioutils/bytespipe.go
index ab06fa1ef1..932e1d1bcc 100644
--- a/pkg/ioutils/bytespipe.go
+++ b/pkg/ioutils/bytespipe.go
@@ -1,15 +1,16 @@
package ioutils
-const maxCap = 10 * 1e6
+const maxCap = 1e6
-// BytesPipe is io.ReadWriter which works similary to pipe(queue).
-// All written data could be read only once. Also BytesPipe trying to adjust
-// internal []byte slice to current needs, so there won't be overgrown buffer
-// after highload peak.
+// BytesPipe is io.ReadWriter which works similarly to pipe(queue).
+// All written data could be read only once. Also BytesPipe is allocating
+// and releasing new byte slices to adjust to current needs, so there won't be
+// overgrown buffer after high load peak.
// BytesPipe isn't goroutine-safe, caller must synchronize it if needed.
type BytesPipe struct {
- buf []byte
- lastRead int
+ buf [][]byte // slice of byte-slices of buffered data
+ lastRead int // index in the first slice to a read point
+ bufLen int // length of data buffered over the slices
}
// NewBytesPipe creates new BytesPipe, initialized by specified slice.
@@ -20,63 +21,69 @@ func NewBytesPipe(buf []byte) *BytesPipe {
buf = make([]byte, 0, 64)
}
return &BytesPipe{
- buf: buf[:0],
- }
-}
-
-func (bp *BytesPipe) grow(n int) {
- if len(bp.buf)+n > cap(bp.buf) {
- // not enough space
- var buf []byte
- remain := bp.len()
- if remain+n <= cap(bp.buf)/2 {
- // enough space in current buffer, just move data to head
- copy(bp.buf, bp.buf[bp.lastRead:])
- buf = bp.buf[:remain]
- } else {
- // reallocate buffer
- buf = make([]byte, remain, 2*cap(bp.buf)+n)
- copy(buf, bp.buf[bp.lastRead:])
- }
- bp.buf = buf
- bp.lastRead = 0
+ buf: [][]byte{buf[:0]},
}
}
// Write writes p to BytesPipe.
-// It can increase cap of internal []byte slice in a process of writing.
+// It can allocate new []byte slices in a process of writing.
func (bp *BytesPipe) Write(p []byte) (n int, err error) {
- bp.grow(len(p))
- bp.buf = append(bp.buf, p...)
+ for {
+ // write data to the last buffer
+ b := bp.buf[len(bp.buf)-1]
+ // copy data to the current empty allocated area
+ n := copy(b[len(b):cap(b)], p)
+ // increment buffered data length
+ bp.bufLen += n
+ // include written data in last buffer
+ bp.buf[len(bp.buf)-1] = b[:len(b)+n]
+
+ // if there was enough room to write all then break
+ if len(p) == n {
+ break
+ }
+
+ // more data: write to the next slice
+ p = p[n:]
+ // allocate slice that has twice the size of the last unless maximum reached
+ nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
+ if maxCap < nextCap {
+ nextCap = maxCap
+ }
+ // add new byte slice to the buffers slice and continue writing
+ bp.buf = append(bp.buf, make([]byte, 0, nextCap))
+ }
return
}
func (bp *BytesPipe) len() int {
- return len(bp.buf) - bp.lastRead
-}
-
-func (bp *BytesPipe) crop() {
- // shortcut for empty buffer
- if bp.lastRead == len(bp.buf) {
- bp.lastRead = 0
- bp.buf = bp.buf[:0]
- }
- r := bp.len()
- // if we have too large buffer for too small data
- if cap(bp.buf) > maxCap && r < cap(bp.buf)/10 {
- copy(bp.buf, bp.buf[bp.lastRead:])
- // will use same underlying slice until reach cap
- bp.buf = bp.buf[:r : cap(bp.buf)/2]
- bp.lastRead = 0
- }
+ return bp.bufLen - bp.lastRead
}
// Read reads bytes from BytesPipe.
// Data could be read only once.
-// Internal []byte slice could be shrinked.
func (bp *BytesPipe) Read(p []byte) (n int, err error) {
- n = copy(p, bp.buf[bp.lastRead:])
- bp.lastRead += n
- bp.crop()
+ for {
+ read := copy(p, bp.buf[0][bp.lastRead:])
+ n += read
+ bp.lastRead += read
+ if bp.len() == 0 {
+ // we have read everything. reset to the beginning.
+ bp.lastRead = 0
+ bp.bufLen -= len(bp.buf[0])
+ bp.buf[0] = bp.buf[0][:0]
+ break
+ }
+ // break if everything was read
+ if len(p) == read {
+ break
+ }
+ // more buffered data and more asked. read from next slice.
+ p = p[read:]
+ bp.lastRead = 0
+ bp.bufLen -= len(bp.buf[0])
+ bp.buf[0] = nil // throw away old slice
+ bp.buf = bp.buf[1:] // switch to next
+ }
return
}
diff --git a/pkg/ioutils/bytespipe_test.go b/pkg/ioutils/bytespipe_test.go
index c7cf795a4a..62b1a186fe 100644
--- a/pkg/ioutils/bytespipe_test.go
+++ b/pkg/ioutils/bytespipe_test.go
@@ -1,6 +1,10 @@
package ioutils
-import "testing"
+import (
+ "crypto/sha1"
+ "encoding/hex"
+ "testing"
+)
func TestBytesPipeRead(t *testing.T) {
buf := NewBytesPipe(nil)
@@ -49,11 +53,67 @@ func TestBytesPipeWrite(t *testing.T) {
buf.Write([]byte("56"))
buf.Write([]byte("78"))
buf.Write([]byte("90"))
- if string(buf.buf) != "1234567890" {
+ if string(buf.buf[0]) != "1234567890" {
t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890")
}
}
+// Write and read in different speeds/chunk sizes and check valid data is read.
+func TestBytesPipeWriteRandomChunks(t *testing.T) {
+ cases := []struct{ iterations, writesPerLoop, readsPerLoop int }{
+ {100, 10, 1},
+ {1000, 10, 5},
+ {1000, 100, 0},
+ {1000, 5, 6},
+ {10000, 50, 25},
+ }
+
+ testMessage := []byte("this is a random string for testing")
+ // random slice sizes to read and write
+ writeChunks := []int{25, 35, 15, 20}
+ readChunks := []int{5, 45, 20, 25}
+
+ for _, c := range cases {
+ // first pass: write directly to hash
+ hash := sha1.New()
+ for i := 0; i < c.iterations*c.writesPerLoop; i++ {
+ if _, err := hash.Write(testMessage[:writeChunks[i%len(writeChunks)]]); err != nil {
+ t.Fatal(err)
+ }
+ }
+ expected := hex.EncodeToString(hash.Sum(nil))
+
+ // write/read through buffer
+ buf := NewBytesPipe(nil)
+ hash.Reset()
+ for i := 0; i < c.iterations; i++ {
+ for w := 0; w < c.writesPerLoop; w++ {
+ buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
+ }
+ for r := 0; r < c.readsPerLoop; r++ {
+ p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)])
+ n, _ := buf.Read(p)
+ hash.Write(p[:n])
+ }
+ }
+ // read rest of the data from buffer
+ for i := 0; ; i++ {
+ p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
+ n, _ := buf.Read(p)
+ if n == 0 {
+ break
+ }
+ hash.Write(p[:n])
+ }
+ actual := hex.EncodeToString(hash.Sum(nil))
+
+ if expected != actual {
+ t.Fatalf("BytesPipe returned invalid data. Expected checksum %v, got %v", expected, actual)
+ }
+
+ }
+}
+
func BenchmarkBytesPipeWrite(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := NewBytesPipe(nil)
diff --git a/pkg/ioutils/readers.go b/pkg/ioutils/readers.go
index 3e68410571..6b9637ab82 100644
--- a/pkg/ioutils/readers.go
+++ b/pkg/ioutils/readers.go
@@ -5,12 +5,8 @@ import (
"encoding/hex"
"io"
"sync"
-
- "github.com/docker/docker/pkg/random"
)
-var rndSrc = random.NewSource()
-
type readCloserWrapper struct {
io.Reader
closer func() error