diff --git a/app/dns/udpns.go b/app/dns/udpns.go index 4f021ea92..ffddde058 100644 --- a/app/dns/udpns.go +++ b/app/dns/udpns.go @@ -11,7 +11,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" - "v2ray.com/core/common/signal" + "v2ray.com/core/common/signal/pubsub" "v2ray.com/core/common/task" "v2ray.com/core/transport/internet/udp" ) @@ -33,7 +33,7 @@ type ClassicNameServer struct { sync.RWMutex address net.Destination ips map[string][]IPRecord - updated signal.Notifier + pub *pubsub.Service udpServer *udp.Dispatcher cleanup *task.Periodic reqID uint32 @@ -46,6 +46,7 @@ func NewClassicNameServer(address net.Destination, dispatcher core.Dispatcher, c ips: make(map[string][]IPRecord), udpServer: udp.NewDispatcher(dispatcher), clientIP: clientIP, + pub: pubsub.NewService(), } s.cleanup = &task.Periodic{ Interval: time.Minute, @@ -96,7 +97,10 @@ func (s *ClassicNameServer) HandleResponse(payload *buf.Buffer) { now := time.Now() for _, rr := range msg.Answer { var ip net.IP - domain = rr.Header().Name + name := rr.Header().Name + if len(name) > 0 { + domain = rr.Header().Name + } ttl := rr.Header().Ttl switch rr := rr.(type) { case *dns.A: @@ -105,7 +109,7 @@ func (s *ClassicNameServer) HandleResponse(payload *buf.Buffer) { ip = rr.AAAA } if ttl == 0 { - ttl = 300 + ttl = 600 } if len(ip) > 0 { ips = append(ips, IPRecord{ @@ -133,7 +137,7 @@ func (s *ClassicNameServer) updateIP(domain string, ips []IPRecord) { } } s.ips[domain] = ips - s.updated.Signal() + s.pub.Publish(domain, nil) } func (s *ClassicNameServer) getMsgOptions() *dns.OPT { @@ -255,6 +259,9 @@ func (s *ClassicNameServer) QueryIP(ctx context.Context, domain string) ([]net.I return ips, nil } + sub := s.pub.Subscribe(fqdn) + defer sub.Close() + s.sendQuery(ctx, fqdn) for { @@ -266,7 +273,7 @@ func (s *ClassicNameServer) QueryIP(ctx context.Context, domain string) ([]net.I select { case <-ctx.Done(): return nil, ctx.Err() - case <-s.updated.Wait(): + case <-sub.Wait(): } } } diff --git a/common/signal/notifier.go b/common/signal/notifier.go index ef1a26931..19836e54f 100755 --- a/common/signal/notifier.go +++ b/common/signal/notifier.go @@ -1,47 +1,26 @@ package signal -import "sync" - // Notifier is a utility for notifying changes. The change producer may notify changes multiple time, and the consumer may get notified asynchronously. type Notifier struct { - sync.Mutex - waiters []chan struct{} - notCosumed bool + c chan struct{} } // NewNotifier creates a new Notifier. func NewNotifier() *Notifier { - return &Notifier{} + return &Notifier{ + c: make(chan struct{}, 1), + } } // Signal signals a change, usually by producer. This method never blocks. func (n *Notifier) Signal() { - n.Lock() - defer n.Unlock() - - if len(n.waiters) == 0 { - n.notCosumed = true - return + select { + case n.c <- struct{}{}: + default: } - - for _, w := range n.waiters { - close(w) - } - n.waiters = make([]chan struct{}, 0, 8) } -// Wait returns a channel for waiting for changes. +// Wait returns a channel for waiting for changes. The returned channel never gets closed. func (n *Notifier) Wait() <-chan struct{} { - n.Lock() - defer n.Unlock() - - w := make(chan struct{}) - if n.notCosumed { - n.notCosumed = false - close(w) - return w - } - - n.waiters = append(n.waiters, w) - return w + return n.c } diff --git a/common/signal/notifier_test.go b/common/signal/notifier_test.go index ca6a77003..b9e0b1275 100644 --- a/common/signal/notifier_test.go +++ b/common/signal/notifier_test.go @@ -10,7 +10,7 @@ import ( func TestNotifierSignal(t *testing.T) { //assert := With(t) - var n Notifier + n := NewNotifier() w := n.Wait() n.Signal() diff --git a/common/signal/pubsub/pubsub.go b/common/signal/pubsub/pubsub.go new file mode 100644 index 000000000..d5ef8f184 --- /dev/null +++ b/common/signal/pubsub/pubsub.go @@ -0,0 +1,97 @@ +package pubsub + +import ( + "sync" + "time" + + "v2ray.com/core/common" + "v2ray.com/core/common/task" +) + +type Subscriber struct { + name string + buffer chan interface{} + removed chan struct{} +} + +func (s *Subscriber) push(msg interface{}) { + select { + case s.buffer <- msg: + default: + } +} + +func (s *Subscriber) Wait() <-chan interface{} { + return s.buffer +} + +func (s *Subscriber) Close() { + close(s.removed) +} + +func (s *Subscriber) IsClosed() bool { + select { + case <-s.removed: + return true + default: + return false + } +} + +type Service struct { + sync.RWMutex + subs []*Subscriber + ctask *task.Periodic +} + +func NewService() *Service { + s := &Service{} + s.ctask = &task.Periodic{ + Execute: s.cleanup, + Interval: time.Second * 30, + } + common.Must(s.ctask.Start()) + return s +} + +func (s *Service) cleanup() error { + s.Lock() + defer s.Unlock() + + if len(s.subs) < 16 { + return nil + } + + newSub := make([]*Subscriber, 0, len(s.subs)) + for _, sub := range s.subs { + if !sub.IsClosed() { + newSub = append(newSub, sub) + } + } + + s.subs = newSub + return nil +} + +func (s *Service) Subscribe(name string) *Subscriber { + sub := &Subscriber{ + name: name, + buffer: make(chan interface{}, 16), + removed: make(chan struct{}), + } + s.Lock() + s.subs = append(s.subs, sub) + s.Unlock() + return sub +} + +func (s *Service) Publish(name string, message interface{}) { + s.RLock() + defer s.RUnlock() + + for _, sub := range s.subs { + if sub.name == name && !sub.IsClosed() { + sub.push(message) + } + } +} diff --git a/common/signal/pubsub/pubsub_test.go b/common/signal/pubsub/pubsub_test.go new file mode 100644 index 000000000..88a884093 --- /dev/null +++ b/common/signal/pubsub/pubsub_test.go @@ -0,0 +1,33 @@ +package pubsub_test + +import ( + "testing" + + . "v2ray.com/core/common/signal/pubsub" + . "v2ray.com/ext/assert" +) + +func TestPubsub(t *testing.T) { + assert := With(t) + + service := NewService() + + sub := service.Subscribe("a") + service.Publish("a", 1) + + select { + case v := <-sub.Wait(): + assert(v.(int), Equals, 1) + default: + t.Fail() + } + + sub.Close() + service.Publish("a", 2) + + select { + case <-sub.Wait(): + t.Fail() + default: + } +} diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 5371b0601..1f58befcf 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -62,7 +62,7 @@ func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia if err != nil { return newError("failed to find an available destination").Base(err).AtWarning() } - defer conn.Close() + defer conn.Close() //nolint: errcheck target, ok := proxy.TargetFromContext(ctx) if !ok {