1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2026-02-14 04:05:22 -05:00

massive refactoring for interoperability

This commit is contained in:
Darien Raymond
2018-01-10 12:22:37 +01:00
parent 5a3c7fdd20
commit 292d7cc353
66 changed files with 1515 additions and 1200 deletions

View File

@@ -4,10 +4,9 @@ package dokodemo
import (
"context"
"time"
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/app/policy"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
@@ -18,36 +17,29 @@ import (
)
type DokodemoDoor struct {
config *Config
address net.Address
port net.Port
policy policy.Policy
policyManager core.PolicyManager
config *Config
address net.Address
port net.Port
v *core.Instance
}
func New(ctx context.Context, config *Config) (*DokodemoDoor, error) {
space := app.SpaceFromContext(ctx)
if space == nil {
return nil, newError("no space in context")
}
if config.NetworkList == nil || config.NetworkList.Size() == 0 {
return nil, newError("no network specified")
}
d := &DokodemoDoor{
config: config,
address: config.GetPredefinedAddress(),
port: net.Port(config.Port),
v := core.FromContext(ctx)
if v == nil {
return nil, newError("V is not in context.")
}
space.On(app.SpaceInitializing, func(interface{}) error {
pm := policy.FromSpace(space)
if pm == nil {
return newError("Policy not found in space.")
}
d.policy = pm.GetPolicy(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 {
d.policy.Timeout.ConnectionIdle.Value = config.Timeout
}
return nil
})
d := &DokodemoDoor{
config: config,
address: config.GetPredefinedAddress(),
port: net.Port(config.Port),
policyManager: v.PolicyManager(),
}
return d, nil
}
@@ -55,7 +47,16 @@ func (d *DokodemoDoor) Network() net.NetworkList {
return *(d.config.NetworkList)
}
func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
func (d *DokodemoDoor) policy() core.Policy {
config := d.config
p := d.policyManager.ForLevel(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 {
p.Timeouts.ConnectionIdle = time.Duration(config.Timeout) * time.Second
}
return p
}
func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error {
newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WriteToLog()
dest := net.Destination{
Network: network,
@@ -72,7 +73,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
}
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, d.policy.Timeout.ConnectionIdle.Duration())
timer := signal.CancelAfterInactivity(ctx, cancel, d.policy().Timeouts.ConnectionIdle)
inboundRay, err := dispatcher.Dispatch(ctx, dest)
if err != nil {
@@ -88,7 +89,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
return newError("failed to transport request").Base(err)
}
timer.SetTimeout(d.policy.Timeout.DownlinkOnly.Duration())
timer.SetTimeout(d.policy().Timeouts.DownlinkOnly)
return nil
})
@@ -115,7 +116,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
return newError("failed to transport response").Base(err)
}
timer.SetTimeout(d.policy.Timeout.UplinkOnly.Duration())
timer.SetTimeout(d.policy().Timeouts.UplinkOnly)
return nil
})

View File

@@ -4,9 +4,9 @@ package freedom
import (
"context"
"time"
"v2ray.com/core/app"
"v2ray.com/core/app/policy"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/dice"
@@ -20,37 +20,35 @@ import (
// Handler handles Freedom connections.
type Handler struct {
domainStrategy Config_DomainStrategy
timeout uint32
destOverride *DestinationOverride
policy policy.Policy
policyManager core.PolicyManager
dns core.DNSClient
config Config
}
// New creates a new Freedom handler.
func New(ctx context.Context, config *Config) (*Handler, error) {
space := app.SpaceFromContext(ctx)
if space == nil {
return nil, newError("no space in context")
v := core.FromContext(ctx)
if v == nil {
return nil, newError("V is not found in context.")
}
f := &Handler{
domainStrategy: config.DomainStrategy,
timeout: config.Timeout,
destOverride: config.DestinationOverride,
config: *config,
policyManager: v.PolicyManager(),
dns: v.DNSClient(),
}
space.On(app.SpaceInitializing, func(interface{}) error {
pm := policy.FromSpace(space)
if pm == nil {
return newError("Policy not found in space.")
}
f.policy = pm.GetPolicy(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 {
f.policy.Timeout.ConnectionIdle.Value = config.Timeout
}
return nil
})
return f, nil
}
func (h *Handler) policy() core.Policy {
p := h.policyManager.ForLevel(h.config.UserLevel)
if h.config.Timeout > 0 && h.config.UserLevel == 0 {
p.Timeouts.ConnectionIdle = time.Duration(h.config.Timeout) * time.Second
}
return p
}
func (h *Handler) resolveIP(ctx context.Context, domain string) net.Address {
if resolver, ok := proxy.ResolvedIPsFromContext(ctx); ok {
ips := resolver.Resolve()
@@ -60,7 +58,7 @@ func (h *Handler) resolveIP(ctx context.Context, domain string) net.Address {
return ips[dice.Roll(len(ips))]
}
ips, err := net.LookupIP(domain)
ips, err := h.dns.LookupIP(domain)
if err != nil {
newError("failed to get IP address for domain ", domain).Base(err).WriteToLog()
}
@@ -73,8 +71,8 @@ func (h *Handler) resolveIP(ctx context.Context, domain string) net.Address {
// Process implements proxy.Outbound.
func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
destination, _ := proxy.TargetFromContext(ctx)
if h.destOverride != nil {
server := h.destOverride.Server
if h.config.DestinationOverride != nil {
server := h.config.DestinationOverride.Server
destination = net.Destination{
Network: destination.Network,
Address: server.Address.AsAddress(),
@@ -86,7 +84,7 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
input := outboundRay.OutboundInput()
output := outboundRay.OutboundOutput()
if h.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() {
if h.config.DomainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() {
ip := h.resolveIP(ctx, destination.Address.Domain())
if ip != nil {
destination = net.Destination{
@@ -113,7 +111,7 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
defer conn.Close()
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, h.policy.Timeout.ConnectionIdle.Duration())
timer := signal.CancelAfterInactivity(ctx, cancel, h.policy().Timeouts.ConnectionIdle)
requestDone := signal.ExecuteAsync(func() error {
var writer buf.Writer
@@ -125,7 +123,7 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
if err := buf.Copy(input, writer, buf.UpdateActivity(timer)); err != nil {
return newError("failed to process request").Base(err)
}
timer.SetTimeout(h.policy.Timeout.DownlinkOnly.Duration())
timer.SetTimeout(h.policy().Timeouts.DownlinkOnly)
return nil
})
@@ -136,7 +134,7 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
if err := buf.Copy(v2reader, output, buf.UpdateActivity(timer)); err != nil {
return newError("failed to process response").Base(err)
}
timer.SetTimeout(h.policy.Timeout.UplinkOnly.Duration())
timer.SetTimeout(h.policy().Timeouts.UplinkOnly)
return nil
})

View File

@@ -10,9 +10,7 @@ import (
"strings"
"time"
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/app/policy"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
@@ -26,32 +24,31 @@ import (
// Server is a HTTP proxy server.
type Server struct {
config *ServerConfig
policy policy.Policy
v *core.Instance
}
// NewServer creates a new HTTP inbound handler.
func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
space := app.SpaceFromContext(ctx)
if space == nil {
return nil, newError("no space in context.")
}
s := &Server{
config: config,
v: core.FromContext(ctx),
}
space.On(app.SpaceInitializing, func(interface{}) error {
pm := policy.FromSpace(space)
if pm == nil {
return newError("Policy not found in space.")
}
s.policy = pm.GetPolicy(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 {
s.policy.Timeout.ConnectionIdle.Value = config.Timeout
}
return nil
})
if s.v == nil {
return nil, newError("V is not in context.")
}
return s, nil
}
func (s *Server) policy() core.Policy {
config := s.config
p := s.v.PolicyManager().ForLevel(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 {
p.Timeouts.ConnectionIdle = time.Duration(config.Timeout) * time.Second
}
return p
}
func (*Server) Network() net.NetworkList {
return net.NetworkList{
Network: []net.Network{net.Network_TCP},
@@ -104,11 +101,11 @@ type readerOnly struct {
io.Reader
}
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error {
reader := bufio.NewReaderSize(readerOnly{conn}, buf.Size)
Start:
conn.SetReadDeadline(time.Now().Add(s.policy.Timeout.Handshake.Duration()))
conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake))
request, err := http.ReadRequest(reader)
if err != nil {
@@ -165,14 +162,14 @@ Start:
return err
}
func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader *bufio.Reader, conn internet.Connection, dest net.Destination, dispatcher dispatcher.Interface) error {
func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader *bufio.Reader, conn internet.Connection, dest net.Destination, dispatcher core.Dispatcher) error {
_, err := conn.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n"))
if err != nil {
return newError("failed to write back OK response").Base(err)
}
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, s.policy.Timeout.ConnectionIdle.Duration())
timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle)
ray, err := dispatcher.Dispatch(ctx, dest)
if err != nil {
return err
@@ -191,7 +188,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
requestDone := signal.ExecuteAsync(func() error {
defer ray.InboundInput().Close()
defer timer.SetTimeout(s.policy.Timeout.DownlinkOnly.Duration())
defer timer.SetTimeout(s.policy().Timeouts.DownlinkOnly)
v2reader := buf.NewReader(conn)
return buf.Copy(v2reader, ray.InboundInput(), buf.UpdateActivity(timer))
@@ -202,7 +199,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
if err := buf.Copy(ray.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil {
return err
}
timer.SetTimeout(s.policy.Timeout.UplinkOnly.Duration())
timer.SetTimeout(s.policy().Timeouts.UplinkOnly)
return nil
})
@@ -217,7 +214,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
var errWaitAnother = newError("keep alive")
func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, writer io.Writer, dest net.Destination, dispatcher dispatcher.Interface) error {
func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, writer io.Writer, dest net.Destination, dispatcher core.Dispatcher) error {
if !s.config.AllowTransparent && len(request.URL.Host) <= 0 {
// RFC 2068 (HTTP/1.1) requires URL to be absolute URL in HTTP proxy.
response := &http.Response{

View File

@@ -10,7 +10,7 @@ package proxy
import (
"context"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core"
"v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/ray"
@@ -22,7 +22,7 @@ type Inbound interface {
Network() net.NetworkList
// Process processes a connection of given network. If necessary, the Inbound can dispatch the connection to an Outbound.
Process(context.Context, net.Network, internet.Connection, dispatcher.Interface) error
Process(context.Context, net.Network, internet.Connection, core.Dispatcher) error
}
// An Outbound process outbound connections.

View File

@@ -3,8 +3,7 @@ package shadowsocks
import (
"context"
"v2ray.com/core/app"
"v2ray.com/core/app/policy"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
@@ -18,8 +17,8 @@ import (
// Client is a inbound handler for Shadowsocks protocol
type Client struct {
serverPicker protocol.ServerPicker
policyManager policy.Manager
serverPicker protocol.ServerPicker
v *core.Instance
}
// NewClient create a new Shadowsocks client.
@@ -33,19 +32,11 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
}
client := &Client{
serverPicker: protocol.NewRoundRobinServerPicker(serverList),
v: core.FromContext(ctx),
}
space := app.SpaceFromContext(ctx)
if space == nil {
return nil, newError("Space not found.")
if client.v == nil {
return nil, newError("V is not in context.")
}
space.On(app.SpaceInitializing, func(interface{}) error {
pm := policy.FromSpace(space)
if pm == nil {
return newError("Policy not found in space.")
}
client.policyManager = pm
return nil
})
return client, nil
}
@@ -103,9 +94,9 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
request.Option |= RequestOptionOneTimeAuth
}
sessionPolicy := v.policyManager.GetPolicy(user.Level)
sessionPolicy := v.v.PolicyManager().ForLevel(user.Level)
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeout.ConnectionIdle.Duration())
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
if request.Command == protocol.RequestCommandTCP {
bufferedWriter := buf.NewBufferedWriter(buf.NewWriter(conn))
@@ -119,13 +110,13 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
}
requestDone := signal.ExecuteAsync(func() error {
defer timer.SetTimeout(sessionPolicy.Timeout.DownlinkOnly.Duration())
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
return buf.Copy(outboundRay.OutboundInput(), bodyWriter, buf.UpdateActivity(timer))
})
responseDone := signal.ExecuteAsync(func() error {
defer outboundRay.OutboundOutput().Close()
defer timer.SetTimeout(sessionPolicy.Timeout.UplinkOnly.Duration())
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
responseReader, err := ReadTCPResponse(user, conn)
if err != nil {

View File

@@ -4,9 +4,7 @@ import (
"context"
"time"
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/app/policy"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/log"
@@ -19,18 +17,14 @@ import (
)
type Server struct {
config *ServerConfig
user *protocol.User
account *MemoryAccount
policyManager policy.Manager
config *ServerConfig
user *protocol.User
account *MemoryAccount
v *core.Instance
}
// NewServer create a new Shadowsocks server.
func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
space := app.SpaceFromContext(ctx)
if space == nil {
return nil, newError("no space in context")
}
if config.GetUser() == nil {
return nil, newError("user is not specified")
}
@@ -45,16 +39,12 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
config: config,
user: config.GetUser(),
account: account,
v: core.FromContext(ctx),
}
space.On(app.SpaceInitializing, func(interface{}) error {
pm := policy.FromSpace(space)
if pm == nil {
return newError("Policy not found in space.")
}
s.policyManager = pm
return nil
})
if s.v == nil {
return nil, newError("V is not in context.")
}
return s, nil
}
@@ -69,7 +59,7 @@ func (s *Server) Network() net.NetworkList {
return list
}
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error {
switch network {
case net.Network_TCP:
return s.handleConnection(ctx, conn, dispatcher)
@@ -80,7 +70,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
}
}
func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error {
func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error {
udpServer := udp.NewDispatcher(dispatcher)
reader := buf.NewReader(conn)
@@ -148,9 +138,9 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
return nil
}
func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error {
sessionPolicy := s.policyManager.GetPolicy(s.user.Level)
conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeout.Handshake.Duration()))
func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error {
sessionPolicy := s.v.PolicyManager().ForLevel(s.user.Level)
conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake))
bufferedReader := buf.NewBufferedReader(buf.NewReader(conn))
request, bodyReader, err := ReadTCPSession(s.user, bufferedReader)
if err != nil {
@@ -178,7 +168,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
ctx = protocol.ContextWithUser(ctx, request.User)
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeout.ConnectionIdle.Duration())
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
ray, err := dispatcher.Dispatch(ctx, dest)
if err != nil {
return err
@@ -208,7 +198,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
return newError("failed to transport all TCP response").Base(err)
}
timer.SetTimeout(sessionPolicy.Timeout.UplinkOnly.Duration())
timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
return nil
})
@@ -219,7 +209,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
if err := buf.Copy(bodyReader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil {
return newError("failed to transport all TCP request").Base(err)
}
timer.SetTimeout(sessionPolicy.Timeout.DownlinkOnly.Duration())
timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
return nil
})

View File

@@ -5,9 +5,7 @@ import (
"io"
"time"
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/app/policy"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/log"
@@ -22,32 +20,30 @@ import (
// Server is a SOCKS 5 proxy server
type Server struct {
config *ServerConfig
policy policy.Policy
v *core.Instance
}
// NewServer creates a new Server object.
func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
space := app.SpaceFromContext(ctx)
if space == nil {
return nil, newError("no space in context").AtWarning()
}
s := &Server{
config: config,
v: core.FromContext(ctx),
}
if s.v == nil {
return nil, newError("V is not in context.")
}
space.On(app.SpaceInitializing, func(interface{}) error {
pm := policy.FromSpace(space)
if pm == nil {
return newError("Policy not found in space.")
}
s.policy = pm.GetPolicy(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 {
s.policy.Timeout.ConnectionIdle.Value = config.Timeout
}
return nil
})
return s, nil
}
func (s *Server) policy() core.Policy {
config := s.config
p := s.v.PolicyManager().ForLevel(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 {
p.Timeouts.ConnectionIdle = time.Duration(config.Timeout) * time.Second
}
return p
}
func (s *Server) Network() net.NetworkList {
list := net.NetworkList{
Network: []net.Network{net.Network_TCP},
@@ -58,7 +54,7 @@ func (s *Server) Network() net.NetworkList {
return list
}
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error {
switch network {
case net.Network_TCP:
return s.processTCP(ctx, conn, dispatcher)
@@ -69,8 +65,8 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
}
}
func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error {
conn.SetReadDeadline(time.Now().Add(s.policy.Timeout.Handshake.Duration()))
func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error {
conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake))
reader := buf.NewBufferedReader(buf.NewReader(conn))
inboundDest, ok := proxy.InboundEntryPointFromContext(ctx)
@@ -125,9 +121,9 @@ func (*Server) handleUDP(c net.Conn) error {
return err
}
func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher dispatcher.Interface) error {
func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher core.Dispatcher) error {
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, v.policy.Timeout.ConnectionIdle.Duration())
timer := signal.CancelAfterInactivity(ctx, cancel, v.policy().Timeouts.ConnectionIdle)
ray, err := dispatcher.Dispatch(ctx, dest)
if err != nil {
@@ -144,7 +140,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
if err := buf.Copy(v2reader, input, buf.UpdateActivity(timer)); err != nil {
return newError("failed to transport all TCP request").Base(err)
}
timer.SetTimeout(v.policy.Timeout.DownlinkOnly.Duration())
timer.SetTimeout(v.policy().Timeouts.DownlinkOnly)
return nil
})
@@ -153,7 +149,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
if err := buf.Copy(output, v2writer, buf.UpdateActivity(timer)); err != nil {
return newError("failed to transport all TCP response").Base(err)
}
timer.SetTimeout(v.policy.Timeout.UplinkOnly.Duration())
timer.SetTimeout(v.policy().Timeouts.UplinkOnly)
return nil
})
@@ -166,7 +162,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
return nil
}
func (v *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error {
func (v *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error {
udpServer := udp.NewDispatcher(dispatcher)
if source, ok := proxy.SourceFromContext(ctx); ok {

View File

@@ -8,10 +8,7 @@ import (
"sync"
"time"
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/app/policy"
"v2ray.com/core/app/proxyman"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
@@ -74,21 +71,16 @@ func (v *userByEmail) Get(email string) (*protocol.User, bool) {
// Handler is an inbound connection handler that handles messages in VMess protocol.
type Handler struct {
inboundHandlerManager proxyman.InboundHandlerManager
policyManager core.PolicyManager
inboundHandlerManager core.InboundHandlerManager
clients protocol.UserValidator
usersByEmail *userByEmail
detours *DetourConfig
sessionHistory *encoding.SessionHistory
policyManager policy.Manager
}
// New creates a new VMess inbound handler.
func New(ctx context.Context, config *Config) (*Handler, error) {
space := app.SpaceFromContext(ctx)
if space == nil {
return nil, newError("no space in context")
}
allowedClients := vmess.NewTimedUserValidator(ctx, protocol.DefaultIDHash)
for _, user := range config.User {
if err := allowedClients.Add(user); err != nil {
@@ -96,24 +88,19 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
}
}
handler := &Handler{
clients: allowedClients,
detours: config.Detour,
usersByEmail: newUserByEmail(config.User, config.GetDefaultValue()),
sessionHistory: encoding.NewSessionHistory(ctx),
v := core.FromContext(ctx)
if v == nil {
return nil, newError("V is not in context.")
}
space.On(app.SpaceInitializing, func(interface{}) error {
handler.inboundHandlerManager = proxyman.InboundHandlerManagerFromSpace(space)
if handler.inboundHandlerManager == nil {
return newError("InboundHandlerManager is not found is space.")
}
handler.policyManager = policy.FromSpace(space)
if handler.policyManager == nil {
return newError("Policy is not found in space.")
}
return nil
})
handler := &Handler{
policyManager: v.PolicyManager(),
inboundHandlerManager: v.InboundHandlerManager(),
clients: allowedClients,
detours: config.Detour,
usersByEmail: newUserByEmail(config.User, config.GetDefaultValue()),
sessionHistory: encoding.NewSessionHistory(ctx),
}
return handler, nil
}
@@ -179,9 +166,9 @@ func transferResponse(timer signal.ActivityUpdater, session *encoding.ServerSess
}
// Process implements proxy.Inbound.Process().
func (h *Handler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher dispatcher.Interface) error {
sessionPolicy := h.policyManager.GetPolicy(0)
if err := connection.SetReadDeadline(time.Now().Add(sessionPolicy.Timeout.Handshake.Duration())); err != nil {
func (h *Handler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher core.Dispatcher) error {
sessionPolicy := h.policyManager.ForLevel(0)
if err := connection.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil {
return newError("unable to set read deadline").Base(err).AtWarning()
}
@@ -221,11 +208,11 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
newError("unable to set back read deadline").Base(err).WriteToLog()
}
sessionPolicy = h.policyManager.GetPolicy(request.User.Level)
sessionPolicy = h.policyManager.ForLevel(request.User.Level)
ctx = protocol.ContextWithUser(ctx, request.User)
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeout.ConnectionIdle.Duration())
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
ray, err := dispatcher.Dispatch(ctx, request.Destination())
if err != nil {
return newError("failed to dispatch request to ", request.Destination()).Base(err)
@@ -235,14 +222,14 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
output := ray.InboundOutput()
requestDone := signal.ExecuteAsync(func() error {
defer timer.SetTimeout(sessionPolicy.Timeout.DownlinkOnly.Duration())
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
return transferRequest(timer, session, request, reader, input)
})
responseDone := signal.ExecuteAsync(func() error {
writer := buf.NewBufferedWriter(buf.NewWriter(connection))
defer writer.Flush()
defer timer.SetTimeout(sessionPolicy.Timeout.UplinkOnly.Duration())
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
response := &protocol.ResponseHeader{
Command: h.generateCommand(ctx, request),

View File

@@ -6,8 +6,7 @@ import (
"context"
"time"
"v2ray.com/core/app"
"v2ray.com/core/app/policy"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
@@ -23,17 +22,12 @@ import (
// Handler is an outbound connection handler for VMess protocol.
type Handler struct {
serverList *protocol.ServerList
serverPicker protocol.ServerPicker
policyManager policy.Manager
serverList *protocol.ServerList
serverPicker protocol.ServerPicker
v *core.Instance
}
func New(ctx context.Context, config *Config) (*Handler, error) {
space := app.SpaceFromContext(ctx)
if space == nil {
return nil, newError("no space in context.")
}
serverList := protocol.NewServerList()
for _, rec := range config.Receiver {
serverList.AddServer(protocol.NewServerSpecFromPB(*rec))
@@ -41,16 +35,12 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
handler := &Handler{
serverList: serverList,
serverPicker: protocol.NewRoundRobinServerPicker(serverList),
v: core.FromContext(ctx),
}
space.On(app.SpaceInitializing, func(interface{}) error {
pm := policy.FromSpace(space)
if pm == nil {
return newError("Policy is not found in space.")
}
handler.policyManager = pm
return nil
})
if handler.v == nil {
return nil, newError("V is not in context.")
}
return handler, nil
}
@@ -112,10 +102,10 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
output := outboundRay.OutboundOutput()
session := encoding.NewClientSession(protocol.DefaultIDHash)
sessionPolicy := v.policyManager.GetPolicy(request.User.Level)
sessionPolicy := v.v.PolicyManager().ForLevel(request.User.Level)
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeout.ConnectionIdle.Duration())
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
requestDone := signal.ExecuteAsync(func() error {
writer := buf.NewBufferedWriter(buf.NewWriter(conn))
@@ -148,13 +138,13 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
return err
}
}
timer.SetTimeout(sessionPolicy.Timeout.DownlinkOnly.Duration())
timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
return nil
})
responseDone := signal.ExecuteAsync(func() error {
defer output.Close()
defer timer.SetTimeout(sessionPolicy.Timeout.UplinkOnly.Duration())
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
reader := buf.NewBufferedReader(buf.NewReader(conn))
header, err := session.DecodeResponseHeader(reader)