diff --git a/common/buf/io.go b/common/buf/io.go index 889fdc34e..e51db9502 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -2,7 +2,11 @@ package buf import ( "io" + "runtime" + "syscall" "time" + + "v2ray.com/core/common/platform" ) // Reader extends io.Reader with MultiBuffer. @@ -46,6 +50,22 @@ func ReadAtLeastFrom(reader io.Reader, size int) Supplier { } } +var useReadv = false + +func init() { + const defaultFlagValue = "NOT_DEFINED_AT_ALL" + value := platform.NewEnvFlag("v2ray.buf.readv.disable").GetValue(func() string { return defaultFlagValue }) + if value != defaultFlagValue { + useReadv = false + return + } + + if runtime.GOOS == "linux" || runtime.GOOS == "darwin" { + newError("ReadV enabled").WriteToLog() + useReadv = true + } +} + // NewReader creates a new Reader. // The Reader instance doesn't take the ownership of reader. func NewReader(reader io.Reader) Reader { @@ -53,6 +73,17 @@ func NewReader(reader io.Reader) Reader { return mr } + if useReadv { + if sc, ok := reader.(syscall.Conn); ok { + rawConn, err := sc.SyscallConn() + if err != nil { + newError("failed to get sysconn").Base(err).WriteToLog() + } else { + return NewReadVReader(reader, rawConn) + } + } + } + return NewBytesToBufferReader(reader) } diff --git a/common/buf/readv_reader.go b/common/buf/readv_reader.go new file mode 100644 index 000000000..d17192b77 --- /dev/null +++ b/common/buf/readv_reader.go @@ -0,0 +1,91 @@ +package buf + +import ( + "io" + "syscall" + "unsafe" +) + +type ReadVReader struct { + io.Reader + rawConn syscall.RawConn + nBuf int32 +} + +func NewReadVReader(reader io.Reader, rawConn syscall.RawConn) *ReadVReader { + return &ReadVReader{ + Reader: reader, + rawConn: rawConn, + nBuf: 1, + } +} + +func allocN(n int32) []*Buffer { + bs := make([]*Buffer, 0, n) + for i := int32(0); i < n; i++ { + bs = append(bs, New()) + } + return bs +} + +func (r *ReadVReader) ReadMultiBuffer() (MultiBuffer, error) { + bs := allocN(r.nBuf) + + iovecs := make([]syscall.Iovec, r.nBuf) + for idx, b := range bs { + iovecs[idx] = syscall.Iovec{ + Base: &(b.v[0]), + } + iovecs[idx].SetLen(int(Size)) + } + + var nBytes int + + err := r.rawConn.Read(func(fd uintptr) bool { + n, _, e := syscall.Syscall(syscall.SYS_READV, fd, uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs))) + if e != 0 { + return false + } + nBytes = int(n) + return true + }) + + if err != nil { + mb := MultiBuffer(bs) + mb.Release() + return nil, err + } + + if nBytes == 0 { + mb := MultiBuffer(bs) + mb.Release() + return nil, io.EOF + } + + nBuf := 0 + for nBuf < len(bs) { + if nBytes <= 0 { + break + } + end := int32(nBytes) + if end > Size { + end = Size + } + bs[nBuf].end = end + nBytes -= int(end) + nBuf++ + } + + for i := nBuf; i < len(bs); i++ { + bs[i].Release() + bs[i] = nil + } + + if int32(nBuf) == r.nBuf && nBuf < 128 { + r.nBuf *= 4 + } else { + r.nBuf = int32(nBuf) + } + + return MultiBuffer(bs[:nBuf]), nil +} diff --git a/testing/scenarios/vmess_test.go b/testing/scenarios/vmess_test.go index 8a73cec84..a1a8e2c16 100644 --- a/testing/scenarios/vmess_test.go +++ b/testing/scenarios/vmess_test.go @@ -9,6 +9,7 @@ import ( "v2ray.com/core" "v2ray.com/core/app/log" "v2ray.com/core/app/proxyman" + "v2ray.com/core/common/compare" clog "v2ray.com/core/common/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" @@ -272,6 +273,7 @@ func TestVMessGCM(t *testing.T) { Port: int(clientPort), }) assert(err, IsNil) + defer conn.Close() // nolint: errcheck payload := make([]byte, 10240*1024) rand.Read(payload) @@ -280,9 +282,10 @@ func TestVMessGCM(t *testing.T) { assert(err, IsNil) assert(nBytes, Equals, len(payload)) - response := readFrom(conn, time.Second*20, 10240*1024) - assert(response, Equals, xor([]byte(payload))) - assert(conn.Close(), IsNil) + response := readFrom(conn, time.Second*40, 10240*1024) + if err := compare.BytesEqualWithDetail(response, xor([]byte(payload))); err != nil { + t.Error(err) + } wg.Done() }() }