diff --git a/proxy/proxy.go b/proxy/proxy.go index bf7469ba7..f0c100481 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -18,11 +18,15 @@ type InboundHandlerMeta struct { Tag string Address v2net.Address Port v2net.Port + //Whether this proxy support KCP connections + KcpSupported bool } type OutboundHandlerMeta struct { Tag string Address v2net.Address + //Whether this proxy support KCP connections + KcpSupported bool } // An InboundHandler handles inbound network connections to V2Ray. diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 715c33730..e084da2ed 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -106,7 +106,7 @@ func (this *VMessInboundHandler) Start() error { return nil } - tcpListener, err := hub.ListenTCP(this.meta.Address, this.meta.Port, this.HandleConnection, nil) + tcpListener, err := hub.ListenTCP6(this.meta.Address, this.meta.Port, this.HandleConnection, this.meta, nil) if err != nil { log.Error("Unable to listen tcp ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -220,7 +220,9 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { readFinish.Lock() } - +func (this *VMessInboundHandler) setProxyCap() { + this.meta.KcpSupported = true +} func init() { internal.MustRegisterInboundHandlerCreator("vmess", func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { @@ -246,6 +248,8 @@ func init() { handler.inboundHandlerManager = space.GetApp(proxyman.APP_ID_INBOUND_MANAGER).(proxyman.InboundHandlerManager) } + handler.setProxyCap() + return handler, nil }) } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 7699996b2..517f4f633 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -34,7 +34,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al err := retry.Timed(5, 100).On(func() error { rec = this.receiverManager.PickReceiver() - rawConn, err := hub.Dial(this.meta.Address, rec.Destination) + rawConn, err := hub.Dial3(this.meta.Address, rec.Destination, this.meta) if err != nil { return err } @@ -154,14 +154,21 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con return } - +func (this *VMessOutboundHandler) setProxyCap() { + this.meta.KcpSupported = true +} func init() { internal.MustRegisterOutboundHandlerCreator("vmess", func(space app.Space, rawConfig interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { vOutConfig := rawConfig.(*Config) - return &VMessOutboundHandler{ + + handler := &VMessOutboundHandler{ receiverManager: NewReceiverManager(vOutConfig.Receivers), meta: meta, - }, nil + } + + handler.setProxyCap() + + return handler, nil }) } diff --git a/transport/config.go b/transport/config.go index f96021af8..7ea9beccf 100644 --- a/transport/config.go +++ b/transport/config.go @@ -1,8 +1,12 @@ package transport +import "github.com/v2ray/v2ray-core/transport/hub/kcpv" + // Config for V2Ray transport layer. type Config struct { ConnectionReuse bool + enableKcp bool + kcpConfig *kcpv.Config } // Apply applies this Config. @@ -10,5 +14,17 @@ func (this *Config) Apply() error { if this.ConnectionReuse { connectionReuse = true } + enableKcp = this.enableKcp + if enableKcp { + KcpConfig = this.kcpConfig + /* + KCP do not support connectionReuse, + it is mandatory to set connectionReuse to false + Since KCP have no handshake and + does not SlowStart, there isn't benefit to + use that anyway. + */ + connectionReuse = false + } return nil } diff --git a/transport/config_json.go b/transport/config_json.go index 95fd56e02..4328aaa0d 100644 --- a/transport/config_json.go +++ b/transport/config_json.go @@ -2,18 +2,38 @@ package transport -import "encoding/json" +import ( + "encoding/json" + + "github.com/v2ray/v2ray-core/common/log" + "github.com/v2ray/v2ray-core/transport/hub/kcpv" +) func (this *Config) UnmarshalJSON(data []byte) error { type JsonConfig struct { - ConnectionReuse bool `json:"connectionReuse"` + ConnectionReuse bool `json:"connectionReuse"` + EnableKcp bool `json:"EnableKCP,omitempty"` + KcpConfig *kcpv.Config `json:"KcpConfig,omitempty"` } jsonConfig := &JsonConfig{ ConnectionReuse: true, + EnableKcp: false, } if err := json.Unmarshal(data, jsonConfig); err != nil { return err } this.ConnectionReuse = jsonConfig.ConnectionReuse + this.enableKcp = jsonConfig.EnableKcp + if jsonConfig.KcpConfig != nil { + this.kcpConfig = jsonConfig.KcpConfig + if jsonConfig.KcpConfig.AdvancedConfigs == nil { + jsonConfig.KcpConfig.AdvancedConfigs = kcpv.DefaultAdvancedConfigs + } + } else { + if jsonConfig.EnableKcp { + log.Error("transport: You have enabled KCP but no configure is given") + } + } + return nil } diff --git a/transport/hub/dialer.go b/transport/hub/dialer.go index 86ffa032b..e4c73c06a 100644 --- a/transport/hub/dialer.go +++ b/transport/hub/dialer.go @@ -6,6 +6,7 @@ import ( "time" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/transport" ) @@ -62,3 +63,40 @@ func DialWithoutCache(src v2net.Address, dest v2net.Destination) (net.Conn, erro return dialer.Dial(dest.Network().String(), dest.NetAddr()) } + +func Dial3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (*Connection, error) { + if proxyMeta.KcpSupported && transport.IsKcpEnabled() { + return DialKCP3(src, dest, proxyMeta) + } + return Dial(src, dest) +} +func DialWithoutCache3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (net.Conn, error) { + if proxyMeta.KcpSupported && transport.IsKcpEnabled() { + return DialKCPWithoutCache(src, dest) + } + return DialWithoutCache(src, dest) +} + +func DialKCP3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (*Connection, error) { + if src == nil { + src = v2net.AnyIP + } + id := src.String() + "-" + dest.NetAddr() + conn, err := DialWithoutCache3(src, dest, proxyMeta) + if err != nil { + return nil, err + } + return &Connection{ + dest: id, + conn: conn, + listener: globalCache, + }, nil +} + +/*DialKCPWithoutCache Dial KCP connection +This Dialer will ignore src this is a restriction +due to github.com/xtaci/kcp-go.DialWithOptions +*/ +func DialKCPWithoutCache(src v2net.Address, dest v2net.Destination) (net.Conn, error) { + return DialKCP(dest) +} diff --git a/transport/hub/kcp.go b/transport/hub/kcp.go new file mode 100644 index 000000000..818dc7426 --- /dev/null +++ b/transport/hub/kcp.go @@ -0,0 +1,193 @@ +package hub + +import ( + "errors" + "net" + "time" + + "github.com/v2ray/v2ray-core/common/log" + v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport" + "github.com/v2ray/v2ray-core/transport/hub/kcpv" + "github.com/xtaci/kcp-go" +) + +type KCPVlistener struct { + lst *kcp.Listener + conf *kcpv.Config + previousSocketid map[int]uint32 + previousSocketid_mapid int +} + +/*Accept Accept a KCP connection +Since KCP is stateless, if package deliver after it was closed, +It could be reconized as a new connection and call accept. +If we can detect that the connection is of such a kind, +we will discard that conn. +*/ +func (kvl *KCPVlistener) Accept() (net.Conn, error) { + conn, err := kvl.lst.Accept() + if err != nil { + return nil, err + } + + if kvl.previousSocketid == nil { + kvl.previousSocketid = make(map[int]uint32) + } + + var badbit bool = false + + for _, key := range kvl.previousSocketid { + if key == conn.GetConv() { + badbit = true + } + } + if badbit { + conn.Close() + return nil, errors.New("KCP:ConnDup, Don't worry~") + } else { + kvl.previousSocketid_mapid++ + kvl.previousSocketid[kvl.previousSocketid_mapid] = conn.GetConv() + /* + Here we assume that count(connection) < 512 + This won't always true. + More work might be necessary to deal with this in a better way. + */ + if kvl.previousSocketid_mapid >= 512 { + delete(kvl.previousSocketid, kvl.previousSocketid_mapid-512) + } + } + + kcv := &KCPVconn{hc: conn} + kcv.conf = kvl.conf + err = kcv.ApplyConf() + if err != nil { + return nil, err + } + return kcv, nil +} + +func (kvl *KCPVlistener) Close() error { + return kvl.lst.Close() +} + +func (kvl *KCPVlistener) Addr() net.Addr { + return kvl.lst.Addr() +} + +type KCPVconn struct { + hc *kcp.UDPSession + conf *kcpv.Config + conntokeep time.Time +} + +//var counter int + +func (kcpvc *KCPVconn) Read(b []byte) (int, error) { + ifb := time.Now().Add(time.Duration(kcpvc.conf.AdvancedConfigs.ReadTimeout) * time.Second) + if ifb.After(kcpvc.conntokeep) { + kcpvc.conntokeep = ifb + } + kcpvc.hc.SetDeadline(kcpvc.conntokeep) + return kcpvc.hc.Read(b) +} + +func (kcpvc *KCPVconn) Write(b []byte) (int, error) { + ifb := time.Now().Add(time.Duration(kcpvc.conf.AdvancedConfigs.WriteTimeout) * time.Second) + if ifb.After(kcpvc.conntokeep) { + kcpvc.conntokeep = ifb + } + kcpvc.hc.SetDeadline(kcpvc.conntokeep) + return kcpvc.hc.Write(b) +} + +/*ApplyConf will apply kcpvc.conf to current Socket + +It is recommmanded to call this func once and only once +*/ +func (kcpvc *KCPVconn) ApplyConf() error { + nodelay, interval, resend, nc := 0, 40, 0, 0 + if kcpvc.conf.Mode != "manual" { + switch kcpvc.conf.Mode { + case "normal": + nodelay, interval, resend, nc = 0, 30, 2, 1 + case "fast": + nodelay, interval, resend, nc = 0, 20, 2, 1 + case "fast2": + nodelay, interval, resend, nc = 1, 20, 2, 1 + case "fast3": + nodelay, interval, resend, nc = 1, 10, 2, 1 + } + } else { + log.Error("kcp: Failed to Apply configure: Manual mode is not supported.(yet!)") + return errors.New("kcp: Manual Not Implemented") + } + + kcpvc.hc.SetNoDelay(nodelay, interval, resend, nc) + kcpvc.hc.SetWindowSize(kcpvc.conf.AdvancedConfigs.Sndwnd, kcpvc.conf.AdvancedConfigs.Rcvwnd) + kcpvc.hc.SetMtu(kcpvc.conf.AdvancedConfigs.Mtu) + kcpvc.hc.SetACKNoDelay(kcpvc.conf.AdvancedConfigs.Acknodelay) + kcpvc.hc.SetDSCP(kcpvc.conf.AdvancedConfigs.Dscp) + //counter++ + //log.Info(counter) + return nil +} + +/*Close Close the current conn +We have to delay the close of Socket for a few second +or the VMess EOF can be too late to send. +*/ +func (kcpvc *KCPVconn) Close() error { + go func() { + time.Sleep(2000 * time.Millisecond) + //counter-- + //log.Info(counter) + kcpvc.hc.Close() + }() + return nil +} + +func (kcpvc *KCPVconn) LocalAddr() net.Addr { + return kcpvc.hc.LocalAddr() +} + +func (kcpvc *KCPVconn) RemoteAddr() net.Addr { + return kcpvc.hc.RemoteAddr() +} + +func (kcpvc *KCPVconn) SetDeadline(t time.Time) error { + return kcpvc.hc.SetDeadline(t) +} + +func (kcpvc *KCPVconn) SetReadDeadline(t time.Time) error { + return kcpvc.hc.SetReadDeadline(t) +} + +func (kcpvc *KCPVconn) SetWriteDeadline(t time.Time) error { + return kcpvc.hc.SetWriteDeadline(t) +} + +func DialKCP(dest v2net.Destination) (*KCPVconn, error) { + kcpconf := transport.KcpConfig + cpip, _ := kcpv.GetChipher(kcpconf.Key) + kcv, err := kcp.DialWithOptions(kcpconf.AdvancedConfigs.Fec, dest.NetAddr(), cpip) + if err != nil { + return nil, err + } + kcvn := &KCPVconn{hc: kcv} + kcvn.conf = kcpconf + err = kcvn.ApplyConf() + if err != nil { + return nil, err + } + return kcvn, nil +} + +func ListenKCP(address v2net.Address, port v2net.Port) (*KCPVlistener, error) { + kcpconf := transport.KcpConfig + cpip, _ := kcpv.GetChipher(kcpconf.Key) + laddr := address.String() + ":" + port.String() + kcl, err := kcp.ListenWithOptions(kcpconf.AdvancedConfigs.Fec, laddr, cpip) + kcvl := &KCPVlistener{lst: kcl, conf: kcpconf} + return kcvl, err +} diff --git a/transport/hub/kcp_test.go b/transport/hub/kcp_test.go new file mode 100644 index 000000000..30feb80b2 --- /dev/null +++ b/transport/hub/kcp_test.go @@ -0,0 +1,31 @@ +package hub_test + +import "testing" + +import ( + v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/testing/assert" + "github.com/v2ray/v2ray-core/transport" + "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/hub/kcpv" +) + +func Test_Pair(t *testing.T) { + assert := assert.On(t) + transport.KcpConfig = &kcpv.Config{} + transport.KcpConfig.Mode = "fast2" + transport.KcpConfig.Key = "key" + transport.KcpConfig.AdvancedConfigs = kcpv.DefaultAdvancedConfigs + lst, _ := hub.ListenKCP(v2net.ParseAddress("127.0.0.1"), 17777) + go func() { + connx, err2 := lst.Accept() + assert.Error(err2).IsNil() + connx.Close() + }() + conn, _ := hub.DialKCP(v2net.TCPDestination(v2net.ParseAddress("127.0.0.1"), 17777)) + conn.LocalAddr() + conn.RemoteAddr() + conn.ApplyConf() + conn.Write([]byte("x")) + conn.Close() +} diff --git a/transport/hub/kcpv/config.go b/transport/hub/kcpv/config.go new file mode 100644 index 000000000..9b162d308 --- /dev/null +++ b/transport/hub/kcpv/config.go @@ -0,0 +1,65 @@ +package kcpv + +/*AdvancedConfig define behavior of KCP in detail + +MaximumTransmissionUnit: +Largest protocol data unit that the layer can pass onwards +can be discovered by running tracepath + +SendingWindowSize , ReceivingWindowSize: +the size of Tx/Rx window, by packet + +ForwardErrorCorrectionGroupSize: +The the size of packet to generate a Forward Error Correction +packet, this is used to counteract packet loss. + +AcknowledgeNoDelay: +Do not wait a certain of time before sending the ACK packet, +increase bandwich cost and might increase performance + +Dscp: +Differentiated services code point, +be used to reconized to discriminate packet. +It is recommanded to keep it 0 to avoid being detected. + +ReadTimeout,WriteTimeout: +Close the Socket if it have been silent for too long, +Small value can recycle zombie socket faster but +can cause v2ray to kill the proxy connection it is relaying, +Higher value can prevent server from closing zombie socket and +waste resources. +*/ +type AdvancedConfig struct { + Mtu int `json:"MaximumTransmissionUnit"` + Sndwnd int `json:"SendingWindowSize"` + Rcvwnd int `json:"ReceivingWindowSize"` + Fec int `json:"ForwardErrorCorrectionGroupSize"` + Acknodelay bool `json:"AcknowledgeNoDelay"` + Dscp int `json:"Dscp"` + ReadTimeout int `json:"ReadTimeout"` + WriteTimeout int `json:"WriteTimeout"` +} + +/*Config define basic behavior of KCP +Mode: +can be one of these values: +fast3,fast2,fast,normal +<<<<<<- less delay +->>>>>> less bandwich wasted + +EncryptionKey: +a string that will be the EncryptionKey of +All KCP connection we Listen-Accpet or +Dial, We are not very sure about how this +encryption hehave and DO use a unique randomly +generated key. +*/ +type Config struct { + Mode string `json:"Mode"` + Key string `json:"EncryptionKey"` + AdvancedConfigs *AdvancedConfig `json:"AdvancedConfig,omitempty"` +} + +var DefaultAdvancedConfigs = &AdvancedConfig{ + Mtu: 1350, Sndwnd: 1024, Rcvwnd: 1024, Fec: 4, Dscp: 0, ReadTimeout: 600, WriteTimeout: 500, Acknodelay: false, +} diff --git a/transport/hub/kcpv/config_json.go b/transport/hub/kcpv/config_json.go new file mode 100644 index 000000000..66974c206 --- /dev/null +++ b/transport/hub/kcpv/config_json.go @@ -0,0 +1,3 @@ +package kcpv + +//We can use the default version of json parser diff --git a/transport/hub/kcpv/crypto.go b/transport/hub/kcpv/crypto.go new file mode 100644 index 000000000..049c9f2d3 --- /dev/null +++ b/transport/hub/kcpv/crypto.go @@ -0,0 +1,21 @@ +package kcpv + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/sha256" +) + +func generateKeyFromConfigString(key string) []byte { + key += "consensus salt: Let's fight arcifical deceleration with our code. We shall prove our believes with action." + keyw := sha256.Sum256([]byte(key)) + return keyw[:] +} + +func generateBlockWithKey(key []byte) (cipher.Block, error) { + return aes.NewCipher(key) +} + +func GetChipher(key string) (cipher.Block, error) { + return generateBlockWithKey(generateKeyFromConfigString(key)) +} diff --git a/transport/hub/tcp.go b/transport/hub/tcp.go index 49b59d004..8f63d3ab5 100644 --- a/transport/hub/tcp.go +++ b/transport/hub/tcp.go @@ -8,6 +8,8 @@ import ( "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/proxy" + "github.com/v2ray/v2ray-core/transport" ) var ( @@ -47,6 +49,36 @@ func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandle go hub.start() return hub, nil } +func ListenKCPhub(address v2net.Address, port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) { + listener, err := ListenKCP(address, port) + if err != nil { + return nil, err + } + var hub *TCPHub + if tlsConfig != nil { + tlsListener := tls.NewListener(listener, tlsConfig) + hub = &TCPHub{ + listener: tlsListener, + connCallback: callback, + } + } else { + hub = &TCPHub{ + listener: listener, + connCallback: callback, + } + } + + go hub.start() + return hub, nil +} +func ListenTCP6(address v2net.Address, port v2net.Port, callback ConnectionHandler, proxyMeta *proxy.InboundHandlerMeta, tlsConfig *tls.Config) (*TCPHub, error) { + if proxyMeta.KcpSupported && transport.IsKcpEnabled() { + return ListenKCPhub(address, port, callback, tlsConfig) + } else { + return ListenTCP(address, port, callback, tlsConfig) + } + return nil, errors.New("ListenTCP6: Not Implemented") +} func (this *TCPHub) Close() { this.accepting = false diff --git a/transport/transport.go b/transport/transport.go index 9d5cfc8f2..dee265fc4 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -1,10 +1,18 @@ package transport +import "github.com/v2ray/v2ray-core/transport/hub/kcpv" + var ( connectionReuse = true + enableKcp = false + KcpConfig *kcpv.Config ) // IsConnectionReusable returns true if V2Ray is trying to reuse TCP connections. func IsConnectionReusable() bool { return connectionReuse } + +func IsKcpEnabled() bool { + return enableKcp +}