diff --git a/proxy/http/chan_reader.go b/common/io/chan_reader.go similarity index 60% rename from proxy/http/chan_reader.go rename to common/io/chan_reader.go index def0344da..89ed1aa93 100644 --- a/proxy/http/chan_reader.go +++ b/common/io/chan_reader.go @@ -1,27 +1,29 @@ -package http +package io import ( "io" + "sync" "github.com/v2ray/v2ray-core/common/alloc" - v2io "github.com/v2ray/v2ray-core/common/io" ) type ChanReader struct { - stream v2io.Reader + sync.Mutex + stream Reader current *alloc.Buffer eof bool } -func NewChanReader(stream v2io.Reader) *ChanReader { +func NewChanReader(stream Reader) *ChanReader { this := &ChanReader{ stream: stream, } - this.fill() + this.Fill() return this } -func (this *ChanReader) fill() { +// @Private +func (this *ChanReader) Fill() { b, err := this.stream.Read() this.current = b if err != nil { @@ -31,8 +33,14 @@ func (this *ChanReader) fill() { } func (this *ChanReader) Read(b []byte) (int, error) { + if this.eof { + return 0, io.EOF + } + + this.Lock() + defer this.Unlock() if this.current == nil { - this.fill() + this.Fill() if this.eof { return 0, io.EOF } @@ -46,3 +54,13 @@ func (this *ChanReader) Read(b []byte) (int, error) { } return nBytes, nil } + +func (this *ChanReader) Release() { + this.Lock() + defer this.Unlock() + + this.eof = true + this.current.Release() + this.current = nil + this.stream = nil +} diff --git a/proxy/http/server.go b/proxy/http/server.go index 1310e22db..3e58dea24 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -248,7 +248,7 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D finish.Add(1) go func() { defer finish.Done() - responseReader := bufio.NewReader(NewChanReader(ray.InboundOutput())) + responseReader := bufio.NewReader(v2io.NewChanReader(ray.InboundOutput())) response, err := http.ReadResponse(responseReader, request) if err != nil { log.Warning("HTTP: Failed to read response: ", err)