diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index e2ae68fbc..b921dbd69 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -14,6 +14,7 @@ import ( "v2ray.com/core/common/errors" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/ray" ) @@ -74,8 +75,7 @@ func (m *ClientManager) onClientFinish() { type Client struct { sessionManager *SessionManager inboundRay ray.InboundRay - ctx context.Context - cancel context.CancelFunc + done *signal.Done manager *ClientManager concurrency uint32 } @@ -85,26 +85,26 @@ var muxCoolPort = net.Port(9527) // NewClient creates a new mux.Client. func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) { - ctx, cancel := context.WithCancel(context.Background()) - ctx = proxy.ContextWithTarget(ctx, net.TCPDestination(muxCoolAddress, muxCoolPort)) + ctx := proxy.ContextWithTarget(context.Background(), net.TCPDestination(muxCoolAddress, muxCoolPort)) + ctx, cancel := context.WithCancel(ctx) pipe := ray.NewRay(ctx) - go func() { - if err := p.Process(ctx, pipe, dialer); err != nil { - cancel() - - errors.New("failed to handler mux client connection").Base(err).WriteToLog() - } - }() - c := &Client{ sessionManager: NewSessionManager(), inboundRay: pipe, - ctx: ctx, - cancel: cancel, + done: signal.NewDone(), manager: m, concurrency: m.config.Concurrency, } + + go func() { + if err := p.Process(ctx, pipe, dialer); err != nil { + errors.New("failed to handler mux client connection").Base(err).WriteToLog() + } + c.done.Close() + cancel() + }() + go c.fetchOutput() go c.monitor() return c, nil @@ -112,12 +112,7 @@ func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client // Closed returns true if this Client is closed. func (m *Client) Closed() bool { - select { - case <-m.ctx.Done(): - return true - default: - return false - } + return m.done.Done() } func (m *Client) monitor() { @@ -128,7 +123,7 @@ func (m *Client) monitor() { for { select { - case <-m.ctx.Done(): + case <-m.done.C(): m.sessionManager.Close() m.inboundRay.InboundInput().Close() m.inboundRay.InboundOutput().CloseError() @@ -136,7 +131,8 @@ func (m *Client) monitor() { case <-timer.C: size := m.sessionManager.Size() if size == 0 && m.sessionManager.CloseIfNoSession() { - m.cancel() + m.done.Close() + return } } } @@ -170,10 +166,8 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool return false } - select { - case <-m.ctx.Done(): + if m.done.Done() { return false - default: } s := sm.Allocate() @@ -226,7 +220,7 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader } func (m *Client) fetchOutput() { - defer m.cancel() + defer m.done.Close() reader := buf.NewBufferedReader(m.inboundRay.InboundOutput()) diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index 9ef25c160..98a1d979b 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -93,8 +93,6 @@ func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) { } } -var zeroAddr net.Addr = &net.TCPAddr{IP: []byte{0, 0, 0, 0}, Port: 0} - // Dial implements proxy.Dialer.Dial(). func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) { if h.senderSettings != nil { @@ -124,14 +122,17 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn return internet.Dial(ctx, dest) } +// GetOutbound implements proxy.GetOutbound. func (h *Handler) GetOutbound() proxy.Outbound { return h.proxy } +// Start implements common.Runnable. func (h *Handler) Start() error { return nil } +// Close implements common.Runnable. func (h *Handler) Close() error { return nil }