diff --git a/common/net/transport.go b/common/net/transport.go index a25d4f6a7..fa02af7eb 100644 --- a/common/net/transport.go +++ b/common/net/transport.go @@ -4,6 +4,11 @@ import ( "io" ) +const ( + minBufferSizeKilo = 2 + maxBufferSizeKilo = 128 +) + func ReadFrom(reader io.Reader, sizeInKilo int) ([]byte, error) { buffer := make([]byte, sizeInKilo<<10) nBytes, err := reader.Read(buffer) @@ -13,9 +18,23 @@ func ReadFrom(reader io.Reader, sizeInKilo int) ([]byte, error) { return buffer[:nBytes], err } +func roundUp(size int) int { + if size <= minBufferSizeKilo { + return minBufferSizeKilo + } + if size >= maxBufferSizeKilo { + return maxBufferSizeKilo + } + size-- + size |= size >> 1 + size |= size >> 2 + size |= size >> 4 + return size + 1 +} + // ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF. func ReaderToChan(stream chan<- []byte, reader io.Reader) error { - bufferSizeKilo := 4 + bufferSizeKilo := 2 for { data, err := ReadFrom(reader, bufferSizeKilo) if len(data) > 0 { @@ -24,6 +43,15 @@ func ReaderToChan(stream chan<- []byte, reader io.Reader) error { if err != nil { return err } + if bufferSizeKilo == maxBufferSizeKilo { + continue + } + dataLenKilo := len(data) >> 10 + if dataLenKilo == bufferSizeKilo { + bufferSizeKilo <<= 1 + } else { + bufferSizeKilo = roundUp(dataLenKilo) + } } } diff --git a/common/net/transport_test.go b/common/net/transport_test.go index a76a4464d..df8ec0dab 100644 --- a/common/net/transport_test.go +++ b/common/net/transport_test.go @@ -44,6 +44,9 @@ func (reader *StaticReader) Read(b []byte) (size int, err error) { if size > reader.total-reader.current { size = reader.total - reader.current } + for i := 0; i < len(b); i++ { + b[i] = byte(i) + } //rand.Read(b[:size]) reader.current += size if reader.current == reader.total { @@ -110,8 +113,8 @@ func BenchmarkTransport10M(b *testing.B) { func runBenchmarkTransport(size int) { - transportChanA := make(chan []byte, 128) - transportChanB := make(chan []byte, 128) + transportChanA := make(chan []byte, 16) + transportChanB := make(chan []byte, 16) readerA := &StaticReader{size, 0} readerB := &StaticReader{size, 0}