From ae79246a66dadfdd3a84599731597cf9a1c3ed8e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 17:09:48 +0000 Subject: [PATCH 01/19] Move TCP under link.go --- src/yggdrasil/admin.go | 4 ++-- src/yggdrasil/core.go | 8 +------- src/yggdrasil/link.go | 9 ++++++++- src/yggdrasil/multicast.go | 6 +++--- src/yggdrasil/tcp.go | 32 ++++++++++++++++---------------- 5 files changed, 30 insertions(+), 29 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 6280339..d0d4cc9 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -577,9 +577,9 @@ func (a *admin) addPeer(addr string, sintf string) error { if err == nil { switch strings.ToLower(u.Scheme) { case "tcp": - a.core.tcp.connect(u.Host, sintf) + a.core.link.tcp.connect(u.Host, sintf) case "socks": - a.core.tcp.connectSOCKS(u.Host, u.Path[1:]) + a.core.link.tcp.connectSOCKS(u.Host, u.Path[1:]) default: return errors.New("invalid peer: " + addr) } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 0443328..b2a85ec 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -44,7 +44,6 @@ type Core struct { admin admin searches searches multicast multicast - tcp tcpInterface link link log *log.Logger } @@ -144,7 +143,7 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { c.router.tun.reconfigure, c.router.cryptokey.reconfigure, c.switchTable.reconfigure, - c.tcp.reconfigure, + // c.link.reconfigure, c.multicast.reconfigure, } @@ -205,11 +204,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { c.init() - if err := c.tcp.init(c); err != nil { - c.log.Errorln("Failed to start TCP interface") - return err - } - if err := c.link.init(c); err != nil { c.log.Errorln("Failed to start link interfaces") return err diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 6fc7687..8c03e08 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -8,6 +8,7 @@ import ( "net" "strings" "sync" + //"sync/atomic" "time" @@ -20,7 +21,8 @@ type link struct { core *Core mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface - awdl awdl // AWDL interface support + awdl awdl // AWDL interface support + tcp tcpInterface // TCP interface support // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -58,6 +60,11 @@ func (l *link) init(c *Core) error { l.interfaces = make(map[linkInfo]*linkInterface) l.mutex.Unlock() + if err := l.tcp.init(l); err != nil { + c.log.Errorln("Failed to start TCP interface") + return err + } + if err := l.awdl.init(l); err != nil { l.core.log.Errorln("Failed to start AWDL interface") return err diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index b20ee94..f416ea2 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -27,7 +27,7 @@ func (m *multicast) init(core *Core) { for { e := <-m.reconfigure m.myAddrMutex.Lock() - m.myAddr = m.core.tcp.getAddr() + m.myAddr = m.core.link.tcp.getAddr() m.myAddrMutex.Unlock() e <- nil } @@ -109,7 +109,7 @@ func (m *multicast) interfaces() []net.Interface { func (m *multicast) announce() { var anAddr net.TCPAddr m.myAddrMutex.Lock() - m.myAddr = m.core.tcp.getAddr() + m.myAddr = m.core.link.tcp.getAddr() m.myAddrMutex.Unlock() groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) if err != nil { @@ -183,6 +183,6 @@ func (m *multicast) listen() { } addr.Zone = from.Zone saddr := addr.String() - m.core.tcp.connect(saddr, addr.Zone) + m.core.link.tcp.connect(saddr, addr.Zone) } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index c65e2e6..989480d 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -32,7 +32,7 @@ const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { - core *Core + link *link reconfigure chan chan error serv net.Listener stop chan bool @@ -77,16 +77,16 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { } // Initializes the struct. -func (iface *tcpInterface) init(core *Core) (err error) { - iface.core = core +func (iface *tcpInterface) init(l *link) (err error) { + iface.link = l iface.stop = make(chan bool, 1) iface.reconfigure = make(chan chan error, 1) go func() { for { e := <-iface.reconfigure - iface.core.configMutex.RLock() - updated := iface.core.config.Listen != iface.core.configOld.Listen - iface.core.configMutex.RUnlock() + iface.link.core.configMutex.RLock() + updated := iface.link.core.config.Listen != iface.link.core.configOld.Listen + iface.link.core.configMutex.RUnlock() if updated { iface.stop <- true iface.serv.Close() @@ -103,9 +103,9 @@ func (iface *tcpInterface) init(core *Core) (err error) { func (iface *tcpInterface) listen() error { var err error - iface.core.configMutex.RLock() - iface.addr = iface.core.config.Listen - iface.core.configMutex.RUnlock() + iface.link.core.configMutex.RLock() + iface.addr = iface.link.core.config.Listen + iface.link.core.configMutex.RUnlock() ctx := context.Background() lc := net.ListenConfig{ @@ -127,16 +127,16 @@ func (iface *tcpInterface) listen() error { // Runs the listener, which spawns off goroutines for incoming connections. func (iface *tcpInterface) listener() { defer iface.serv.Close() - iface.core.log.Infoln("Listening for TCP on:", iface.serv.Addr().String()) + iface.link.core.log.Infoln("Listening for TCP on:", iface.serv.Addr().String()) for { sock, err := iface.serv.Accept() if err != nil { - iface.core.log.Errorln("Failed to accept connection:", err) + iface.link.core.log.Errorln("Failed to accept connection:", err) return } select { case <-iface.stop: - iface.core.log.Errorln("Stopping listener") + iface.link.core.log.Errorln("Stopping listener") return default: if err != nil { @@ -280,12 +280,12 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast() name := "tcp://" + sock.RemoteAddr().String() - link, err := iface.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) + link, err := iface.link.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) if err != nil { - iface.core.log.Println(err) + iface.link.core.log.Println(err) panic(err) } - iface.core.log.Debugln("DEBUG: starting handler for", name) + iface.link.core.log.Debugln("DEBUG: starting handler for", name) err = link.handler() - iface.core.log.Debugln("DEBUG: stopped handler for", name, err) + iface.link.core.log.Debugln("DEBUG: stopped handler for", name, err) } From be8db0c120b48c233e29318138cb62cda94a2296 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 17:52:57 +0000 Subject: [PATCH 02/19] Support multiple TCP listeners --- cmd/yggdrasil/main.go | 10 ++- src/config/config.go | 6 +- src/yggdrasil/link.go | 9 ++- src/yggdrasil/tcp.go | 155 +++++++++++++++++++++--------------- src/yggdrasil/tcp_darwin.go | 2 +- src/yggdrasil/tcp_other.go | 2 +- 6 files changed, 110 insertions(+), 74 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index aa5a749..e0c764e 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -134,12 +134,20 @@ func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeCo } } } + // Do a quick check for old-format Listen statement so that mapstructure + // doesn't fail and crash + if listen, ok := dat["Listen"].(string); ok { + if strings.HasPrefix(listen, "tcp://") { + dat["Listen"] = []string{listen} + } else { + dat["Listen"] = []string{"tcp://" + listen} + } + } // Overlay our newly mapped configuration onto the autoconf node config that // we generated above. if err = mapstructure.Decode(dat, &cfg); err != nil { panic(err) } - return cfg } diff --git a/src/config/config.go b/src/config/config.go index 14b1649..807ce25 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -12,7 +12,7 @@ import ( // NodeConfig defines all configuration values needed to run a signle yggdrasil node type NodeConfig struct { - Listen string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` + Listen []string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."` Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."` InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."` @@ -79,10 +79,10 @@ func GenerateConfig(isAutoconf bool) *NodeConfig { // Create a node configuration and populate it. cfg := NodeConfig{} if isAutoconf { - cfg.Listen = "[::]:0" + cfg.Listen = []string{"tcp://[::]:0"} } else { r1 := rand.New(rand.NewSource(time.Now().UnixNano())) - cfg.Listen = fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768) + cfg.Listen = []string{fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768)} } cfg.AdminListen = defaults.GetDefaults().DefaultAdminListen cfg.EncryptionPublicKey = hex.EncodeToString(bpub[:]) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 8c03e08..277f24c 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -21,8 +21,9 @@ type link struct { core *Core mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface - awdl awdl // AWDL interface support - tcp tcpInterface // TCP interface support + handlers map[string]linkListener + awdl awdl // AWDL interface support + tcp tcp // TCP interface support // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -34,6 +35,10 @@ type linkInfo struct { remote string // Remote name or address } +type linkListener interface { + init(*link) error +} + type linkInterfaceMsgIO interface { readMsg() ([]byte, error) writeMsg([]byte) (int, error) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 989480d..45b15f9 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -31,13 +31,12 @@ const default_timeout = 6 * time.Second const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. -type tcpInterface struct { +type tcp struct { link *link reconfigure chan chan error - serv net.Listener stop chan bool - addr string mutex sync.Mutex // Protecting the below + listeners map[string]net.Listener calls map[string]struct{} conns map[tcpInfo](chan struct{}) } @@ -52,7 +51,7 @@ type tcpInfo struct { } // Wrapper function to set additional options for specific connection types. -func (iface *tcpInterface) setExtraOptions(c net.Conn) { +func (t *tcp) setExtraOptions(c net.Conn) { switch sock := c.(type) { case *net.TCPConn: sock.SetNoDelay(true) @@ -62,62 +61,81 @@ func (iface *tcpInterface) setExtraOptions(c net.Conn) { } // Returns the address of the listener. -func (iface *tcpInterface) getAddr() *net.TCPAddr { - return iface.serv.Addr().(*net.TCPAddr) +func (t *tcp) getAddr() *net.TCPAddr { + for _, listener := range t.listeners { + return listener.Addr().(*net.TCPAddr) + } + return nil } // Attempts to initiate a connection to the provided address. -func (iface *tcpInterface) connect(addr string, intf string) { - iface.call(addr, nil, intf) +func (t *tcp) connect(addr string, intf string) { + t.call(addr, nil, intf) } // Attempst to initiate a connection to the provided address, viathe provided socks proxy address. -func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { - iface.call(peeraddr, &socksaddr, "") +func (t *tcp) connectSOCKS(socksaddr, peeraddr string) { + t.call(peeraddr, &socksaddr, "") } // Initializes the struct. -func (iface *tcpInterface) init(l *link) (err error) { - iface.link = l - iface.stop = make(chan bool, 1) - iface.reconfigure = make(chan chan error, 1) +func (t *tcp) init(l *link) error { + t.link = l + t.stop = make(chan bool, 1) + t.reconfigure = make(chan chan error, 1) + go func() { for { - e := <-iface.reconfigure - iface.link.core.configMutex.RLock() - updated := iface.link.core.config.Listen != iface.link.core.configOld.Listen - iface.link.core.configMutex.RUnlock() + e := <-t.reconfigure + t.link.core.configMutex.RLock() + //updated := t.link.core.config.Listen != t.link.core.configOld.Listen + updated := false + t.link.core.configMutex.RUnlock() if updated { - iface.stop <- true - iface.serv.Close() - e <- iface.listen() + /* t.stop <- true + for _, listener := range t.listeners { + listener.Close() + } + e <- t.listen() */ } else { e <- nil } } }() - return iface.listen() + t.mutex.Lock() + t.calls = make(map[string]struct{}) + t.conns = make(map[tcpInfo](chan struct{})) + t.listeners = make(map[string]net.Listener) + t.mutex.Unlock() + + t.link.core.configMutex.RLock() + defer t.link.core.configMutex.RUnlock() + for _, listenaddr := range t.link.core.config.Listen { + if listenaddr[:6] != "tcp://" { + continue + } + if err := t.listen(listenaddr[6:]); err != nil { + return err + } + } + + return nil } -func (iface *tcpInterface) listen() error { +func (t *tcp) listen(listenaddr string) error { var err error - iface.link.core.configMutex.RLock() - iface.addr = iface.link.core.config.Listen - iface.link.core.configMutex.RUnlock() - ctx := context.Background() lc := net.ListenConfig{ - Control: iface.tcpContext, + Control: t.tcpContext, } - iface.serv, err = lc.Listen(ctx, "tcp", iface.addr) + listener, err := lc.Listen(ctx, "tcp", listenaddr) if err == nil { - iface.mutex.Lock() - iface.calls = make(map[string]struct{}) - iface.conns = make(map[tcpInfo](chan struct{})) - iface.mutex.Unlock() - go iface.listener() + t.mutex.Lock() + t.listeners[listenaddr] = listener + t.mutex.Unlock() + go t.listener(listenaddr) return nil } @@ -125,41 +143,46 @@ func (iface *tcpInterface) listen() error { } // Runs the listener, which spawns off goroutines for incoming connections. -func (iface *tcpInterface) listener() { - defer iface.serv.Close() - iface.link.core.log.Infoln("Listening for TCP on:", iface.serv.Addr().String()) +func (t *tcp) listener(listenaddr string) { + listener, ok := t.listeners[listenaddr] + if !ok { + t.link.core.log.Errorln("Tried to start TCP listener for", listenaddr, "which doesn't exist") + return + } + defer listener.Close() + t.link.core.log.Infoln("Listening for TCP on:", listener.Addr().String()) for { - sock, err := iface.serv.Accept() + sock, err := listener.Accept() if err != nil { - iface.link.core.log.Errorln("Failed to accept connection:", err) + t.link.core.log.Errorln("Failed to accept connection:", err) return } select { - case <-iface.stop: - iface.link.core.log.Errorln("Stopping listener") + case <-t.stop: + t.link.core.log.Errorln("Stopping listener") return default: if err != nil { panic(err) } - go iface.handler(sock, true) + go t.handler(sock, true) } } } // Checks if we already have a connection to this node -func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool { - iface.mutex.Lock() - defer iface.mutex.Unlock() - _, isIn := iface.conns[info] +func (t *tcp) isAlreadyConnected(info tcpInfo) bool { + t.mutex.Lock() + defer t.mutex.Unlock() + _, isIn := t.conns[info] return isIn } // Checks if we already are calling this address -func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { - iface.mutex.Lock() - defer iface.mutex.Unlock() - _, isIn := iface.calls[saddr] +func (t *tcp) isAlreadyCalling(saddr string) bool { + t.mutex.Lock() + defer t.mutex.Unlock() + _, isIn := t.calls[saddr] return isIn } @@ -168,25 +191,25 @@ func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { // If the dial is successful, it launches the handler. // When finished, it removes the outgoing call, so reconnection attempts can be made later. // This all happens in a separate goroutine that it spawns. -func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { +func (t *tcp) call(saddr string, socksaddr *string, sintf string) { go func() { callname := saddr if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - if iface.isAlreadyCalling(callname) { + if t.isAlreadyCalling(callname) { return } - iface.mutex.Lock() - iface.calls[callname] = struct{}{} - iface.mutex.Unlock() + t.mutex.Lock() + t.calls[callname] = struct{}{} + t.mutex.Unlock() defer func() { // Block new calls for a little while, to mitigate livelock scenarios time.Sleep(default_timeout) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - iface.mutex.Lock() - delete(iface.calls, callname) - iface.mutex.Unlock() + t.mutex.Lock() + delete(t.calls, callname) + t.mutex.Unlock() }() var conn net.Conn var err error @@ -212,7 +235,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { } } else { dialer := net.Dialer{ - Control: iface.tcpContext, + Control: t.tcpContext, } if sintf != "" { ief, err := net.InterfaceByName(sintf) @@ -267,25 +290,25 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { return } } - iface.handler(conn, false) + t.handler(conn, false) }() } -func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { +func (t *tcp) handler(sock net.Conn, incoming bool) { defer sock.Close() - iface.setExtraOptions(sock) + t.setExtraOptions(sock) stream := stream{} stream.init(sock) local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast() name := "tcp://" + sock.RemoteAddr().String() - link, err := iface.link.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) + link, err := t.link.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) if err != nil { - iface.link.core.log.Println(err) + t.link.core.log.Println(err) panic(err) } - iface.link.core.log.Debugln("DEBUG: starting handler for", name) + t.link.core.log.Debugln("DEBUG: starting handler for", name) err = link.handler() - iface.link.core.log.Debugln("DEBUG: stopped handler for", name, err) + t.link.core.log.Debugln("DEBUG: stopped handler for", name, err) } diff --git a/src/yggdrasil/tcp_darwin.go b/src/yggdrasil/tcp_darwin.go index 6483ef8..9d55a1d 100644 --- a/src/yggdrasil/tcp_darwin.go +++ b/src/yggdrasil/tcp_darwin.go @@ -10,7 +10,7 @@ import ( // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go -func (iface *tcpInterface) tcpContext(network, address string, c syscall.RawConn) error { +func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { var control error var recvanyif error diff --git a/src/yggdrasil/tcp_other.go b/src/yggdrasil/tcp_other.go index 5d62b53..47bd772 100644 --- a/src/yggdrasil/tcp_other.go +++ b/src/yggdrasil/tcp_other.go @@ -8,6 +8,6 @@ import ( // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go -func (iface *tcpInterface) tcpContext(network, address string, c syscall.RawConn) error { +func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { return nil } From 82bb95b77f2e64264f67c627df56812637705c4f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 18:41:32 +0000 Subject: [PATCH 03/19] Some more (inelegant) multiple listener code plus some reconfigure support --- src/util/util.go | 17 +++++++++++ src/yggdrasil/awdl.go | 15 ++++++++-- src/yggdrasil/core.go | 2 +- src/yggdrasil/link.go | 31 +++++++++++++++---- src/yggdrasil/tcp.go | 69 ++++++++++++++++++++++++++++++------------- 5 files changed, 103 insertions(+), 31 deletions(-) diff --git a/src/util/util.go b/src/util/util.go index df15ff2..d669fa5 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -76,3 +76,20 @@ func FuncTimeout(f func(), timeout time.Duration) bool { return false } } + +// This calculates the difference between two arrays and returns items +// that appear in A but not in B - useful somewhat when reconfiguring +// and working out what configuration items changed +func Difference(a, b []string) []string { + ab := []string{} + mb := map[string]bool{} + for _, x := range b { + mb[x] = true + } + for _, x := range a { + if _, ok := mb[x]; !ok { + ab = append(ab, x) + } + } + return ab +} diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 4236688..fe64e8b 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -7,9 +7,10 @@ import ( ) type awdl struct { - link *link - mutex sync.RWMutex // protects interfaces below - interfaces map[string]*awdlInterface + link *link + reconfigure chan chan error + mutex sync.RWMutex // protects interfaces below + interfaces map[string]*awdlInterface } type awdlInterface struct { @@ -49,8 +50,16 @@ func (a *awdl) init(l *link) error { a.link = l a.mutex.Lock() a.interfaces = make(map[string]*awdlInterface) + a.reconfigure = make(chan chan error, 1) a.mutex.Unlock() + go func() { + for { + e := <-a.reconfigure + e <- nil + } + }() + return nil } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index b2a85ec..12ff14f 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -143,7 +143,7 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { c.router.tun.reconfigure, c.router.cryptokey.reconfigure, c.switchTable.reconfigure, - // c.link.reconfigure, + c.link.reconfigure, c.multicast.reconfigure, } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 277f24c..d040aac 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -18,12 +18,13 @@ import ( ) type link struct { - core *Core - mutex sync.RWMutex // protects interfaces below - interfaces map[linkInfo]*linkInterface - handlers map[string]linkListener - awdl awdl // AWDL interface support - tcp tcp // TCP interface support + core *Core + reconfigure chan chan error + mutex sync.RWMutex // protects interfaces below + interfaces map[linkInfo]*linkInterface + handlers map[string]linkListener + awdl awdl // AWDL interface support + tcp tcp // TCP interface support // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -63,6 +64,7 @@ func (l *link) init(c *Core) error { l.core = c l.mutex.Lock() l.interfaces = make(map[linkInfo]*linkInterface) + l.reconfigure = make(chan chan error) l.mutex.Unlock() if err := l.tcp.init(l); err != nil { @@ -75,6 +77,23 @@ func (l *link) init(c *Core) error { return err } + go func() { + for { + e := <-l.reconfigure + tcpresponse := make(chan error) + awdlresponse := make(chan error) + l.tcp.reconfigure <- tcpresponse + l.awdl.reconfigure <- awdlresponse + if err := <-tcpresponse; err != nil { + e <- err + } + if err := <-awdlresponse; err != nil { + e <- err + } + e <- nil + } + }() + return nil } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 45b15f9..bacb346 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -25,6 +25,7 @@ import ( "golang.org/x/net/proxy" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/util" ) const default_timeout = 6 * time.Second @@ -32,13 +33,13 @@ const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcp struct { - link *link - reconfigure chan chan error - stop chan bool - mutex sync.Mutex // Protecting the below - listeners map[string]net.Listener - calls map[string]struct{} - conns map[tcpInfo](chan struct{}) + link *link + reconfigure chan chan error + mutex sync.Mutex // Protecting the below + listeners map[string]net.Listener + listenerstops map[string]chan bool + calls map[string]struct{} + conns map[tcpInfo](chan struct{}) } // This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. @@ -81,22 +82,38 @@ func (t *tcp) connectSOCKS(socksaddr, peeraddr string) { // Initializes the struct. func (t *tcp) init(l *link) error { t.link = l - t.stop = make(chan bool, 1) t.reconfigure = make(chan chan error, 1) go func() { for { e := <-t.reconfigure t.link.core.configMutex.RLock() - //updated := t.link.core.config.Listen != t.link.core.configOld.Listen - updated := false + added := util.Difference(t.link.core.config.Listen, t.link.core.configOld.Listen) + deleted := util.Difference(t.link.core.configOld.Listen, t.link.core.config.Listen) + updated := len(added) > 0 || len(deleted) > 0 t.link.core.configMutex.RUnlock() if updated { - /* t.stop <- true - for _, listener := range t.listeners { + for _, add := range added { + if add[:6] != "tcp://" { + continue + } + if err := t.listen(add[6:]); err != nil { + e <- err + continue + } + } + for _, delete := range deleted { + t.link.core.log.Warnln("Removing listener", delete, "not currently implemented") + /*t.mutex.Lock() + if listener, ok := t.listeners[delete]; ok { listener.Close() } - e <- t.listen() */ + if listener, ok := t.listenerstops[delete]; ok { + listener <- true + } + t.mutex.Unlock()*/ + } + e <- nil } else { e <- nil } @@ -107,6 +124,7 @@ func (t *tcp) init(l *link) error { t.calls = make(map[string]struct{}) t.conns = make(map[tcpInfo](chan struct{})) t.listeners = make(map[string]net.Listener) + t.listenerstops = make(map[string]chan bool) t.mutex.Unlock() t.link.core.configMutex.RLock() @@ -134,6 +152,7 @@ func (t *tcp) listen(listenaddr string) error { if err == nil { t.mutex.Lock() t.listeners[listenaddr] = listener + t.listenerstops[listenaddr] = make(chan bool, 1) t.mutex.Unlock() go t.listener(listenaddr) return nil @@ -149,17 +168,25 @@ func (t *tcp) listener(listenaddr string) { t.link.core.log.Errorln("Tried to start TCP listener for", listenaddr, "which doesn't exist") return } + reallistenaddr := listener.Addr().String() defer listener.Close() - t.link.core.log.Infoln("Listening for TCP on:", listener.Addr().String()) + t.link.core.log.Infoln("Listening for TCP on:", reallistenaddr) for { - sock, err := listener.Accept() - if err != nil { - t.link.core.log.Errorln("Failed to accept connection:", err) - return - } + var sock net.Conn + var err error + accepted := make(chan bool) + go func() { + sock, err = listener.Accept() + accepted <- true + }() select { - case <-t.stop: - t.link.core.log.Errorln("Stopping listener") + case <-accepted: + if err != nil { + t.link.core.log.Errorln("Failed to accept connection:", err) + return + } + case <-t.listenerstops[listenaddr]: + t.link.core.log.Errorln("Stopping TCP listener on:", reallistenaddr) return default: if err != nil { From eeede4e6d0c0d221a25f8e5816569f3400d89cfc Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 18:47:40 +0000 Subject: [PATCH 04/19] Fix some obvious concurrency bugs --- src/yggdrasil/tcp.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index bacb346..8dcd4c0 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -63,9 +63,14 @@ func (t *tcp) setExtraOptions(c net.Conn) { // Returns the address of the listener. func (t *tcp) getAddr() *net.TCPAddr { + // TODO: Fix this, because this will currently only give a single address + // to multicast.go, which obviously is not great, but right now multicast.go + // doesn't have the ability to send more than one address in a packet either + t.mutex.Lock() for _, listener := range t.listeners { return listener.Addr().(*net.TCPAddr) } + t.mutex.Unlock() return nil } @@ -83,6 +88,12 @@ func (t *tcp) connectSOCKS(socksaddr, peeraddr string) { func (t *tcp) init(l *link) error { t.link = l t.reconfigure = make(chan chan error, 1) + t.mutex.Lock() + t.calls = make(map[string]struct{}) + t.conns = make(map[tcpInfo](chan struct{})) + t.listeners = make(map[string]net.Listener) + t.listenerstops = make(map[string]chan bool) + t.mutex.Unlock() go func() { for { @@ -90,9 +101,8 @@ func (t *tcp) init(l *link) error { t.link.core.configMutex.RLock() added := util.Difference(t.link.core.config.Listen, t.link.core.configOld.Listen) deleted := util.Difference(t.link.core.configOld.Listen, t.link.core.config.Listen) - updated := len(added) > 0 || len(deleted) > 0 t.link.core.configMutex.RUnlock() - if updated { + if len(added) > 0 || len(deleted) > 0 { for _, add := range added { if add[:6] != "tcp://" { continue @@ -120,13 +130,6 @@ func (t *tcp) init(l *link) error { } }() - t.mutex.Lock() - t.calls = make(map[string]struct{}) - t.conns = make(map[tcpInfo](chan struct{})) - t.listeners = make(map[string]net.Listener) - t.listenerstops = make(map[string]chan bool) - t.mutex.Unlock() - t.link.core.configMutex.RLock() defer t.link.core.configMutex.RUnlock() for _, listenaddr := range t.link.core.config.Listen { @@ -163,8 +166,11 @@ func (t *tcp) listen(listenaddr string) error { // Runs the listener, which spawns off goroutines for incoming connections. func (t *tcp) listener(listenaddr string) { + t.mutex.Lock() listener, ok := t.listeners[listenaddr] - if !ok { + listenerstop, ok2 := t.listenerstops[listenaddr] + t.mutex.Unlock() + if !ok || !ok2 { t.link.core.log.Errorln("Tried to start TCP listener for", listenaddr, "which doesn't exist") return } @@ -185,7 +191,7 @@ func (t *tcp) listener(listenaddr string) { t.link.core.log.Errorln("Failed to accept connection:", err) return } - case <-t.listenerstops[listenaddr]: + case <-listenerstop: t.link.core.log.Errorln("Stopping TCP listener on:", reallistenaddr) return default: From 0be0b078cbe08b0752b5eece2b2aeef5c919a79a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 19:00:06 +0000 Subject: [PATCH 05/19] Remove unused types in link.go --- src/yggdrasil/link.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index d040aac..ac86938 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -22,7 +22,6 @@ type link struct { reconfigure chan chan error mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface - handlers map[string]linkListener awdl awdl // AWDL interface support tcp tcp // TCP interface support // TODO timeout (to remove from switch), read from config.ReadTimeout @@ -36,10 +35,6 @@ type linkInfo struct { remote string // Remote name or address } -type linkListener interface { - init(*link) error -} - type linkInterfaceMsgIO interface { readMsg() ([]byte, error) writeMsg([]byte) (int, error) From 2b8648e2b346daef301cd2d968c1315d40617e38 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 19:04:09 +0000 Subject: [PATCH 06/19] Fix debug builds --- src/yggdrasil/debug.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 94faba4..8405714 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -449,19 +449,19 @@ func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) { //* func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) { - c.config.Listen = addrport - if err := c.tcp.init(c /*, addrport, 0*/); err != nil { - c.log.Println("Failed to start TCP interface:", err) + c.config.Listen = []string{addrport} + if err := c.link.init(c); err != nil { + c.log.Println("Failed to start interfaces:", err) panic(err) } } func (c *Core) DEBUG_getGlobalTCPAddr() *net.TCPAddr { - return c.tcp.serv.Addr().(*net.TCPAddr) + return c.link.tcp.getAddr() } func (c *Core) DEBUG_addTCPConn(saddr string) { - c.tcp.call(saddr, nil, "") + c.link.tcp.call(saddr, nil, "") } //*/ From 61774aed3bfe286422bdb4a92e5bd3e4f84848e0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 20:33:08 +0000 Subject: [PATCH 07/19] Show proto in admin socket, link linkInfo from peer, other fixes --- src/yggdrasil/admin.go | 6 ++++-- src/yggdrasil/debug.go | 10 +++++++++- src/yggdrasil/link.go | 2 +- src/yggdrasil/peer.go | 8 +++++--- src/yggdrasil/router.go | 10 +++++++++- src/yggdrasil/tcp.go | 34 ++++++---------------------------- 6 files changed, 34 insertions(+), 36 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index d0d4cc9..ae5b02a 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -666,7 +666,8 @@ func (a *admin) getData_getPeers() []admin_nodeInfo { {"uptime", int(time.Since(p.firstSeen).Seconds())}, {"bytes_sent", atomic.LoadUint64(&p.bytesSent)}, {"bytes_recvd", atomic.LoadUint64(&p.bytesRecvd)}, - {"endpoint", p.endpoint}, + {"proto", p.intf.info.linkType}, + {"endpoint", p.intf.info.remote}, {"box_pub_key", hex.EncodeToString(p.box[:])}, } peerInfos = append(peerInfos, info) @@ -692,7 +693,8 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo { {"port", elem.port}, {"bytes_sent", atomic.LoadUint64(&peer.bytesSent)}, {"bytes_recvd", atomic.LoadUint64(&peer.bytesRecvd)}, - {"endpoint", peer.endpoint}, + {"proto", peer.intf.info.linkType}, + {"endpoint", peer.intf.info.remote}, {"box_pub_key", hex.EncodeToString(peer.box[:])}, } peerInfos = append(peerInfos, info) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 8405714..e575b72 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -97,7 +97,15 @@ func (c *Core) DEBUG_getPeers() *peers { } func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer { - return ps.newPeer(&box, &sig, &link, "(simulator)", nil) + sim := linkInterface{ + name: "(simulator)", + info: linkInfo{ + local: "(simulator)", + remote: "(simulator)", + linkType: "sim", + }, + } + return ps.newPeer(&box, &sig, &link, &sim, nil) } /* diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index ac86938..4f7c6e1 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -173,7 +173,7 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name, func() { intf.msgIO.close() }) + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) if intf.peer == nil { return errors.New("failed to create peer") } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 237d6f6..ce35936 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -98,6 +98,7 @@ type peer struct { bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element core *Core + intf *linkInterface port switchPort box crypto.BoxPubKey sig crypto.SigPubKey @@ -113,18 +114,19 @@ type peer struct { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string, closer func()) *peer { +func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { now := time.Now() p := peer{box: *box, sig: *sig, shared: *crypto.GetSharedKey(&ps.core.boxPriv, box), linkShared: *linkShared, - endpoint: endpoint, firstSeen: now, doSend: make(chan struct{}, 1), dinfo: make(chan *dhtInfo, 1), close: closer, - core: ps.core} + core: ps.core, + intf: intf, + } ps.mutex.Lock() defer ps.mutex.Unlock() oldPorts := ps.getPorts() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 6acd473..1d4c077 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -67,7 +67,15 @@ func (r *router) init(core *Core) { r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 1) // TODO something better than this... - p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) + self := linkInterface{ + name: "(self)", + info: linkInfo{ + local: "(self)", + remote: "(self)", + linkType: "self", + }, + } + p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) p.out = func(packet []byte) { in <- packet } r.in = in out := make(chan []byte, 32) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 8dcd4c0..c0d568a 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -24,7 +24,6 @@ import ( "golang.org/x/net/proxy" - "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" ) @@ -39,16 +38,7 @@ type tcp struct { listeners map[string]net.Listener listenerstops map[string]chan bool calls map[string]struct{} - conns map[tcpInfo](chan struct{}) -} - -// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. -// Different address combinations are allowed, so multi-homing is still technically possible (but not necessarily advisable). -type tcpInfo struct { - box crypto.BoxPubKey - sig crypto.SigPubKey - localAddr string - remoteAddr string + conns map[linkInfo](chan struct{}) } // Wrapper function to set additional options for specific connection types. @@ -90,7 +80,7 @@ func (t *tcp) init(l *link) error { t.reconfigure = make(chan chan error, 1) t.mutex.Lock() t.calls = make(map[string]struct{}) - t.conns = make(map[tcpInfo](chan struct{})) + t.conns = make(map[linkInfo](chan struct{})) t.listeners = make(map[string]net.Listener) t.listenerstops = make(map[string]chan bool) t.mutex.Unlock() @@ -167,20 +157,20 @@ func (t *tcp) listen(listenaddr string) error { // Runs the listener, which spawns off goroutines for incoming connections. func (t *tcp) listener(listenaddr string) { t.mutex.Lock() - listener, ok := t.listeners[listenaddr] + listener, ok1 := t.listeners[listenaddr] listenerstop, ok2 := t.listenerstops[listenaddr] t.mutex.Unlock() - if !ok || !ok2 { + if !ok1 || !ok2 { t.link.core.log.Errorln("Tried to start TCP listener for", listenaddr, "which doesn't exist") return } reallistenaddr := listener.Addr().String() defer listener.Close() t.link.core.log.Infoln("Listening for TCP on:", reallistenaddr) + accepted := make(chan bool) for { var sock net.Conn var err error - accepted := make(chan bool) go func() { sock, err = listener.Accept() accepted <- true @@ -191,26 +181,14 @@ func (t *tcp) listener(listenaddr string) { t.link.core.log.Errorln("Failed to accept connection:", err) return } + go t.handler(sock, true) case <-listenerstop: t.link.core.log.Errorln("Stopping TCP listener on:", reallistenaddr) return - default: - if err != nil { - panic(err) - } - go t.handler(sock, true) } } } -// Checks if we already have a connection to this node -func (t *tcp) isAlreadyConnected(info tcpInfo) bool { - t.mutex.Lock() - defer t.mutex.Unlock() - _, isIn := t.conns[info] - return isIn -} - // Checks if we already are calling this address func (t *tcp) isAlreadyCalling(saddr string) bool { t.mutex.Lock() From 88925d3e06921107a603eb96def354cc938bea52 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 22:45:35 +0000 Subject: [PATCH 08/19] Centralise call/listen functions in link.go --- src/yggdrasil/admin.go | 15 +++---------- src/yggdrasil/link.go | 44 ++++++++++++++++++++++++++++++++------ src/yggdrasil/multicast.go | 2 +- src/yggdrasil/tcp.go | 25 ++++------------------ 4 files changed, 45 insertions(+), 41 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index ae5b02a..228c43c 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -573,18 +573,9 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string { // addPeer triggers a connection attempt to a node. func (a *admin) addPeer(addr string, sintf string) error { - u, err := url.Parse(addr) - if err == nil { - switch strings.ToLower(u.Scheme) { - case "tcp": - a.core.link.tcp.connect(u.Host, sintf) - case "socks": - a.core.link.tcp.connectSOCKS(u.Host, u.Path[1:]) - default: - return errors.New("invalid peer: " + addr) - } - } else { - return errors.New("invalid peer: " + addr) + err := a.core.link.call(addr, sintf) + if err != nil { + return err } return nil } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 4f7c6e1..cbbdb5e 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "net/url" "strings" "sync" @@ -68,21 +69,20 @@ func (l *link) init(c *Core) error { } if err := l.awdl.init(l); err != nil { - l.core.log.Errorln("Failed to start AWDL interface") + c.log.Errorln("Failed to start AWDL interface") return err } go func() { for { e := <-l.reconfigure - tcpresponse := make(chan error) - awdlresponse := make(chan error) - l.tcp.reconfigure <- tcpresponse - l.awdl.reconfigure <- awdlresponse - if err := <-tcpresponse; err != nil { + response := make(chan error) + l.tcp.reconfigure <- response + if err := <-response; err != nil { e <- err } - if err := <-awdlresponse; err != nil { + l.awdl.reconfigure <- response + if err := <-response; err != nil { e <- err } e <- nil @@ -92,6 +92,36 @@ func (l *link) init(c *Core) error { return nil } +func (l *link) call(uri string, sintf string) error { + u, err := url.Parse(uri) + if err != nil { + return err + } + pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/") + switch u.Scheme { + case "tcp": + l.tcp.call(u.Host, nil, sintf) + case "socks": + l.tcp.call(pathtokens[0], &u.Host, sintf) + default: + return errors.New("unknown call scheme: " + u.Scheme) + } + return nil +} + +func (l *link) listen(uri string) error { + u, err := url.Parse(uri) + if err != nil { + return err + } + switch u.Scheme { + case "tcp": + return l.tcp.listen(u.Host) + default: + return errors.New("unknown listen scheme: " + u.Scheme) + } +} + func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { // Technically anything unique would work for names, but lets pick something human readable, just for debugging intf := linkInterface{ diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index f416ea2..59f0eea 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -183,6 +183,6 @@ func (m *multicast) listen() { } addr.Zone = from.Zone saddr := addr.String() - m.core.link.tcp.connect(saddr, addr.Zone) + m.core.link.call("tcp://"+saddr, addr.Zone) } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index c0d568a..74f14d8 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -64,16 +64,6 @@ func (t *tcp) getAddr() *net.TCPAddr { return nil } -// Attempts to initiate a connection to the provided address. -func (t *tcp) connect(addr string, intf string) { - t.call(addr, nil, intf) -} - -// Attempst to initiate a connection to the provided address, viathe provided socks proxy address. -func (t *tcp) connectSOCKS(socksaddr, peeraddr string) { - t.call(peeraddr, &socksaddr, "") -} - // Initializes the struct. func (t *tcp) init(l *link) error { t.link = l @@ -104,14 +94,6 @@ func (t *tcp) init(l *link) error { } for _, delete := range deleted { t.link.core.log.Warnln("Removing listener", delete, "not currently implemented") - /*t.mutex.Lock() - if listener, ok := t.listeners[delete]; ok { - listener.Close() - } - if listener, ok := t.listenerstops[delete]; ok { - listener <- true - } - t.mutex.Unlock()*/ } e <- nil } else { @@ -202,7 +184,7 @@ func (t *tcp) isAlreadyCalling(saddr string) bool { // If the dial is successful, it launches the handler. // When finished, it removes the outgoing call, so reconnection attempts can be made later. // This all happens in a separate goroutine that it spawns. -func (t *tcp) call(saddr string, socksaddr *string, sintf string) { +func (t *tcp) call(saddr string, options interface{}, sintf string) { go func() { callname := saddr if sintf != "" { @@ -224,12 +206,13 @@ func (t *tcp) call(saddr string, socksaddr *string, sintf string) { }() var conn net.Conn var err error - if socksaddr != nil { + socksaddr, issocks := options.(string) + if issocks { if sintf != "" { return } var dialer proxy.Dialer - dialer, err = proxy.SOCKS5("tcp", *socksaddr, nil, proxy.Direct) + dialer, err = proxy.SOCKS5("tcp", socksaddr, nil, proxy.Direct) if err != nil { return } From 2ef823e69c7fd25cb6f1b18e3d9ca7ac965e4a9d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 23:16:46 +0000 Subject: [PATCH 09/19] Fix deadlock when reconfiguring multicast --- src/yggdrasil/link.go | 13 ++++++++----- src/yggdrasil/tcp.go | 4 +++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index cbbdb5e..c4f52a3 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -76,14 +76,17 @@ func (l *link) init(c *Core) error { go func() { for { e := <-l.reconfigure - response := make(chan error) - l.tcp.reconfigure <- response - if err := <-response; err != nil { + tcpresponse := make(chan error) + awdlresponse := make(chan error) + l.tcp.reconfigure <- tcpresponse + if err := <-tcpresponse; err != nil { e <- err + continue } - l.awdl.reconfigure <- response - if err := <-response; err != nil { + l.awdl.reconfigure <- awdlresponse + if err := <-awdlresponse; err != nil { e <- err + continue } e <- nil } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 74f14d8..80d9ccd 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -16,6 +16,7 @@ package yggdrasil import ( "context" + "errors" "fmt" "math/rand" "net" @@ -57,10 +58,10 @@ func (t *tcp) getAddr() *net.TCPAddr { // to multicast.go, which obviously is not great, but right now multicast.go // doesn't have the ability to send more than one address in a packet either t.mutex.Lock() + defer t.mutex.Unlock() for _, listener := range t.listeners { return listener.Addr().(*net.TCPAddr) } - t.mutex.Unlock() return nil } @@ -85,6 +86,7 @@ func (t *tcp) init(l *link) error { if len(added) > 0 || len(deleted) > 0 { for _, add := range added { if add[:6] != "tcp://" { + e <- errors.New("unknown scheme: " + add) continue } if err := t.listen(add[6:]); err != nil { From e71108dd26537ba3fffe6e228f160808d1c3785d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 5 Mar 2019 09:16:44 +0000 Subject: [PATCH 10/19] Fix date in changelog.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c290c8..3aa0b81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - in case of vulnerabilities. --> -## [0.3.3] - 2018-02-18 +## [0.3.3] - 2019-02-18 ### Added - Dynamic reconfiguration, which allows reloading the configuration file to make changes during runtime by sending a `SIGHUP` signal (note: this only works with `-useconffile` and not `-useconf` and currently reconfiguring TUN/TAP is not supported) - Support for building Yggdrasil as an iOS or Android framework if the appropriate tools (e.g. `gomobile`/`gobind` + SDKs) are available From de2aff27583c51cdc1a0bd428235f3c391519ffd Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Mar 2019 11:06:13 +0000 Subject: [PATCH 11/19] Refactor multicast so that it creates a new TCP listener for each interface with LL addresses (so that it will not break if Listen is not set with a wildcard address) --- src/yggdrasil/link.go | 3 +- src/yggdrasil/multicast.go | 54 ++++++++++++++++++++------------- src/yggdrasil/tcp.go | 61 ++++++++++++++++++++------------------ 3 files changed, 68 insertions(+), 50 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index c4f52a3..07adbe8 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -119,7 +119,8 @@ func (l *link) listen(uri string) error { } switch u.Scheme { case "tcp": - return l.tcp.listen(u.Host) + _, err := l.tcp.listen(u.Host) + return err default: return errors.New("unknown listen scheme: " + u.Scheme) } diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 59f0eea..401f678 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -5,7 +5,6 @@ import ( "fmt" "net" "regexp" - "sync" "time" "golang.org/x/net/ipv6" @@ -16,19 +15,16 @@ type multicast struct { reconfigure chan chan error sock *ipv6.PacketConn groupAddr string - myAddr *net.TCPAddr - myAddrMutex sync.RWMutex + listeners map[string]*tcpListener } func (m *multicast) init(core *Core) { m.core = core m.reconfigure = make(chan chan error, 1) + m.listeners = make(map[string]*tcpListener) go func() { for { e := <-m.reconfigure - m.myAddrMutex.Lock() - m.myAddr = m.core.link.tcp.getAddr() - m.myAddrMutex.Unlock() e <- nil } }() @@ -94,10 +90,12 @@ func (m *multicast) interfaces() []net.Interface { continue } for _, expr := range exprs { + // Compile each regular expression e, err := regexp.Compile(expr) if err != nil { panic(err) } + // Does the interface match the regular expression? Store it if so if e.MatchString(iface.Name) { interfaces = append(interfaces, iface) } @@ -107,10 +105,6 @@ func (m *multicast) interfaces() []net.Interface { } func (m *multicast) announce() { - var anAddr net.TCPAddr - m.myAddrMutex.Lock() - m.myAddr = m.core.link.tcp.getAddr() - m.myAddrMutex.Unlock() groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) if err != nil { panic(err) @@ -121,27 +115,47 @@ func (m *multicast) announce() { } for { for _, iface := range m.interfaces() { - m.sock.JoinGroup(&iface, groupAddr) + // Find interface addresses addrs, err := iface.Addrs() if err != nil { panic(err) } - m.myAddrMutex.RLock() - anAddr.Port = m.myAddr.Port - m.myAddrMutex.RUnlock() for _, addr := range addrs { addrIP, _, _ := net.ParseCIDR(addr.String()) + // Ignore IPv4 addresses if addrIP.To4() != nil { continue - } // IPv6 only + } + // Ignore non-link-local addresses if !addrIP.IsLinkLocalUnicast() { continue } - anAddr.IP = addrIP - anAddr.Zone = iface.Name - destAddr.Zone = iface.Name - msg := []byte(anAddr.String()) - m.sock.WriteTo(msg, nil, destAddr) + // Join the multicast group + m.sock.JoinGroup(&iface, groupAddr) + // Try and see if we already have a TCP listener for this interface + var listener *tcpListener + if _, ok := m.listeners[iface.Name]; !ok { + // No listener was found - let's create one + listenaddr := fmt.Sprintf("[%s%%%s]:0", addrIP, iface.Name) + if l, err := m.core.link.tcp.listen(listenaddr); err == nil { + // Store the listener so that we can stop it later if needed + listener = &tcpListener{ + listener: l, + stop: make(chan bool), + } + m.listeners[iface.Name] = listener + } + } else { + // An existing listener was found + listener = m.listeners[iface.Name] + } + // Get the listener details and construct the multicast beacon + lladdr := (*listener.listener).Addr().String() + if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { + destAddr.Zone = iface.Name + msg := []byte(a.String()) + m.sock.WriteTo(msg, nil, destAddr) + } break } time.Sleep(time.Second) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 80d9ccd..652f5ab 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -33,13 +33,17 @@ const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcp struct { - link *link - reconfigure chan chan error - mutex sync.Mutex // Protecting the below - listeners map[string]net.Listener - listenerstops map[string]chan bool - calls map[string]struct{} - conns map[linkInfo](chan struct{}) + link *link + reconfigure chan chan error + mutex sync.Mutex // Protecting the below + listeners map[string]*tcpListener + calls map[string]struct{} + conns map[linkInfo](chan struct{}) +} + +type tcpListener struct { + listener *net.Listener + stop chan bool } // Wrapper function to set additional options for specific connection types. @@ -60,7 +64,7 @@ func (t *tcp) getAddr() *net.TCPAddr { t.mutex.Lock() defer t.mutex.Unlock() for _, listener := range t.listeners { - return listener.Addr().(*net.TCPAddr) + return (*listener.listener).Addr().(*net.TCPAddr) } return nil } @@ -72,8 +76,7 @@ func (t *tcp) init(l *link) error { t.mutex.Lock() t.calls = make(map[string]struct{}) t.conns = make(map[linkInfo](chan struct{})) - t.listeners = make(map[string]net.Listener) - t.listenerstops = make(map[string]chan bool) + t.listeners = make(map[string]*tcpListener) t.mutex.Unlock() go func() { @@ -89,7 +92,7 @@ func (t *tcp) init(l *link) error { e <- errors.New("unknown scheme: " + add) continue } - if err := t.listen(add[6:]); err != nil { + if _, err := t.listen(add[6:]); err != nil { e <- err continue } @@ -110,7 +113,7 @@ func (t *tcp) init(l *link) error { if listenaddr[:6] != "tcp://" { continue } - if err := t.listen(listenaddr[6:]); err != nil { + if _, err := t.listen(listenaddr[6:]); err != nil { return err } } @@ -118,7 +121,7 @@ func (t *tcp) init(l *link) error { return nil } -func (t *tcp) listen(listenaddr string) error { +func (t *tcp) listen(listenaddr string) (*net.Listener, error) { var err error ctx := context.Background() @@ -127,36 +130,36 @@ func (t *tcp) listen(listenaddr string) error { } listener, err := lc.Listen(ctx, "tcp", listenaddr) if err == nil { + l := tcpListener{ + listener: &listener, + stop: make(chan bool, 1), + } t.mutex.Lock() - t.listeners[listenaddr] = listener - t.listenerstops[listenaddr] = make(chan bool, 1) + t.listeners[listenaddr[6:]] = &l t.mutex.Unlock() - go t.listener(listenaddr) - return nil + go t.listener(&l) + return &listener, nil } - return err + return nil, err } // Runs the listener, which spawns off goroutines for incoming connections. -func (t *tcp) listener(listenaddr string) { - t.mutex.Lock() - listener, ok1 := t.listeners[listenaddr] - listenerstop, ok2 := t.listenerstops[listenaddr] - t.mutex.Unlock() - if !ok1 || !ok2 { - t.link.core.log.Errorln("Tried to start TCP listener for", listenaddr, "which doesn't exist") +func (t *tcp) listener(listener *tcpListener) { + if listener == nil { return } - reallistenaddr := listener.Addr().String() - defer listener.Close() + reallistener := *listener.listener + reallistenaddr := reallistener.Addr().String() + stop := listener.stop + defer reallistener.Close() t.link.core.log.Infoln("Listening for TCP on:", reallistenaddr) accepted := make(chan bool) for { var sock net.Conn var err error go func() { - sock, err = listener.Accept() + sock, err = reallistener.Accept() accepted <- true }() select { @@ -166,7 +169,7 @@ func (t *tcp) listener(listenaddr string) { return } go t.handler(sock, true) - case <-listenerstop: + case <-stop: t.link.core.log.Errorln("Stopping TCP listener on:", reallistenaddr) return } From f4e17b9a9f10bab9292cddc4bd20f146d2582fa2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Mar 2019 12:07:33 +0000 Subject: [PATCH 12/19] Properly handle multicast interfaces going up and down --- src/yggdrasil/link.go | 8 +++---- src/yggdrasil/multicast.go | 37 ++++++++++++++++++---------- src/yggdrasil/tcp.go | 49 ++++++++++++++++++++++---------------- 3 files changed, 57 insertions(+), 37 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 07adbe8..a81b50d 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -261,11 +261,11 @@ func (intf *linkInterface) handler() error { // Now block until something is ready or the timer triggers keepalive traffic select { case <-tcpTimer.C: - intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", + intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) send(nil) case <-sendAck: - intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", + intf.link.core.log.Tracef("Sending ack to %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) send(nil) case msg := <-intf.peer.linkOut: @@ -280,7 +280,7 @@ func (intf *linkInterface) handler() error { case signalReady <- struct{}{}: default: } - //intf.link.core.log.Debugf("Sending packet to %s: %s, source %s", + //intf.link.core.log.Tracef("Sending packet to %s: %s, source %s", // strings.ToUpper(intf.info.linkType), themString, intf.info.local) } } @@ -331,7 +331,7 @@ func (intf *linkInterface) handler() error { sendTimerRunning = true } if !gotMsg { - intf.link.core.log.Debugf("Received ack from %s: %s, source %s", + intf.link.core.log.Tracef("Received ack from %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } case sentMsg, ok := <-signalSent: diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 401f678..d4a03ff 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -64,13 +64,13 @@ func (m *multicast) start() error { return nil } -func (m *multicast) interfaces() []net.Interface { +func (m *multicast) interfaces() map[string]net.Interface { // Get interface expressions from config m.core.configMutex.RLock() exprs := m.core.config.MulticastInterfaces m.core.configMutex.RUnlock() // Ask the system for network interfaces - var interfaces []net.Interface + interfaces := make(map[string]net.Interface) allifaces, err := net.Interfaces() if err != nil { panic(err) @@ -97,7 +97,7 @@ func (m *multicast) interfaces() []net.Interface { } // Does the interface match the regular expression? Store it if so if e.MatchString(iface.Name) { - interfaces = append(interfaces, iface) + interfaces[iface.Name] = iface } } } @@ -114,7 +114,10 @@ func (m *multicast) announce() { panic(err) } for { - for _, iface := range m.interfaces() { + interfaces := m.interfaces() + // Now that we have a list of valid interfaces from the operating system, + // we can start checking if we can send multicasts on them + for _, iface := range interfaces { // Find interface addresses addrs, err := iface.Addrs() if err != nil { @@ -134,23 +137,24 @@ func (m *multicast) announce() { m.sock.JoinGroup(&iface, groupAddr) // Try and see if we already have a TCP listener for this interface var listener *tcpListener - if _, ok := m.listeners[iface.Name]; !ok { + if l, ok := m.listeners[iface.Name]; !ok || l.listener == nil { // No listener was found - let's create one listenaddr := fmt.Sprintf("[%s%%%s]:0", addrIP, iface.Name) if l, err := m.core.link.tcp.listen(listenaddr); err == nil { + m.core.log.Debugln("Started multicasting on", iface.Name) // Store the listener so that we can stop it later if needed - listener = &tcpListener{ - listener: l, - stop: make(chan bool), - } - m.listeners[iface.Name] = listener + m.listeners[iface.Name] = l } } else { // An existing listener was found listener = m.listeners[iface.Name] } + // Make sure nothing above failed for some reason + if listener == nil { + continue + } // Get the listener details and construct the multicast beacon - lladdr := (*listener.listener).Addr().String() + lladdr := listener.listener.Addr().String() if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { destAddr.Zone = iface.Name msg := []byte(a.String()) @@ -160,7 +164,16 @@ func (m *multicast) announce() { } time.Sleep(time.Second) } - time.Sleep(time.Second) + // There might be interfaces that we configured listeners for but are no + // longer up - if that's the case then we should stop the listeners + for name, listener := range m.listeners { + if _, ok := interfaces[name]; !ok { + listener.stop <- true + delete(m.listeners, name) + m.core.log.Debugln("No longer multicasting on", name) + } + } + time.Sleep(time.Second * 5) } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 652f5ab..f46dc56 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -42,7 +42,7 @@ type tcp struct { } type tcpListener struct { - listener *net.Listener + listener net.Listener stop chan bool } @@ -63,8 +63,8 @@ func (t *tcp) getAddr() *net.TCPAddr { // doesn't have the ability to send more than one address in a packet either t.mutex.Lock() defer t.mutex.Unlock() - for _, listener := range t.listeners { - return (*listener.listener).Addr().(*net.TCPAddr) + for _, l := range t.listeners { + return l.listener.Addr().(*net.TCPAddr) } return nil } @@ -121,7 +121,7 @@ func (t *tcp) init(l *link) error { return nil } -func (t *tcp) listen(listenaddr string) (*net.Listener, error) { +func (t *tcp) listen(listenaddr string) (*tcpListener, error) { var err error ctx := context.Background() @@ -131,37 +131,40 @@ func (t *tcp) listen(listenaddr string) (*net.Listener, error) { listener, err := lc.Listen(ctx, "tcp", listenaddr) if err == nil { l := tcpListener{ - listener: &listener, - stop: make(chan bool, 1), + listener: listener, + stop: make(chan bool), } - t.mutex.Lock() - t.listeners[listenaddr[6:]] = &l - t.mutex.Unlock() - go t.listener(&l) - return &listener, nil + go t.listener(&l, listenaddr[6:]) + return &l, nil } return nil, err } // Runs the listener, which spawns off goroutines for incoming connections. -func (t *tcp) listener(listener *tcpListener) { - if listener == nil { +func (t *tcp) listener(l *tcpListener, listenaddr string) { + if l == nil { return } - reallistener := *listener.listener - reallistenaddr := reallistener.Addr().String() - stop := listener.stop - defer reallistener.Close() - t.link.core.log.Infoln("Listening for TCP on:", reallistenaddr) + // Track the listener so that we can find it again in future + t.mutex.Lock() + t.listeners[listenaddr] = l + t.mutex.Unlock() + // And here we go! accepted := make(chan bool) + defer l.listener.Close() + t.link.core.log.Infoln("Listening for TCP on:", l.listener.Addr().String()) for { var sock net.Conn var err error + // Listen in a separate goroutine, as that way it does not block us from + // receiving "stop" events go func() { - sock, err = reallistener.Accept() + sock, err = l.listener.Accept() accepted <- true }() + // Wait for either an accepted connection, or a message telling us to stop + // the TCP listener select { case <-accepted: if err != nil { @@ -169,8 +172,12 @@ func (t *tcp) listener(listener *tcpListener) { return } go t.handler(sock, true) - case <-stop: - t.link.core.log.Errorln("Stopping TCP listener on:", reallistenaddr) + case <-l.stop: + t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String()) + l.listener.Close() + t.mutex.Lock() + delete(t.listeners, listenaddr) + t.mutex.Unlock() return } } From c0d5a8c0bd8e9de113edc01063f98b2665ca56a3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Mar 2019 12:09:57 +0000 Subject: [PATCH 13/19] Clean up old listeners first --- src/yggdrasil/multicast.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index d4a03ff..0b913ed 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -115,6 +115,15 @@ func (m *multicast) announce() { } for { interfaces := m.interfaces() + // There might be interfaces that we configured listeners for but are no + // longer up - if that's the case then we should stop the listeners + for name, listener := range m.listeners { + if _, ok := interfaces[name]; !ok { + listener.stop <- true + delete(m.listeners, name) + m.core.log.Debugln("No longer multicasting on", name) + } + } // Now that we have a list of valid interfaces from the operating system, // we can start checking if we can send multicasts on them for _, iface := range interfaces { @@ -164,15 +173,6 @@ func (m *multicast) announce() { } time.Sleep(time.Second) } - // There might be interfaces that we configured listeners for but are no - // longer up - if that's the case then we should stop the listeners - for name, listener := range m.listeners { - if _, ok := interfaces[name]; !ok { - listener.stop <- true - delete(m.listeners, name) - m.core.log.Debugln("No longer multicasting on", name) - } - } time.Sleep(time.Second * 5) } } From 531d9f39ca03fae52a397b8e1a87ba436574b132 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Mar 2019 12:15:40 +0000 Subject: [PATCH 14/19] Fix multicast bug, set static multicast interval 15 seconds --- src/yggdrasil/multicast.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 0b913ed..50891bc 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -153,6 +153,7 @@ func (m *multicast) announce() { m.core.log.Debugln("Started multicasting on", iface.Name) // Store the listener so that we can stop it later if needed m.listeners[iface.Name] = l + listener = l } } else { // An existing listener was found @@ -171,9 +172,8 @@ func (m *multicast) announce() { } break } - time.Sleep(time.Second) } - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 15) } } From 18ef28a4772e402785c48e5df59c0fb020563da4 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Mar 2019 13:00:45 +0000 Subject: [PATCH 15/19] Fix default Listen config --- src/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config/config.go b/src/config/config.go index 807ce25..28756f1 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -82,7 +82,7 @@ func GenerateConfig(isAutoconf bool) *NodeConfig { cfg.Listen = []string{"tcp://[::]:0"} } else { r1 := rand.New(rand.NewSource(time.Now().UnixNano())) - cfg.Listen = []string{fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768)} + cfg.Listen = []string{fmt.Sprintf("tcp://[::]:%d", r1.Intn(65534-32768)+32768)} } cfg.AdminListen = defaults.GetDefaults().DefaultAdminListen cfg.EncryptionPublicKey = hex.EncodeToString(bpub[:]) From b8cabf321276911bca9238726c67f7fe30560ee0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Mar 2019 16:40:48 +0000 Subject: [PATCH 16/19] Support removing Listen interfaces at runtime properly --- src/yggdrasil/multicast.go | 6 +++--- src/yggdrasil/tcp.go | 23 +++++++++++++++-------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 50891bc..1c67044 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -149,11 +149,11 @@ func (m *multicast) announce() { if l, ok := m.listeners[iface.Name]; !ok || l.listener == nil { // No listener was found - let's create one listenaddr := fmt.Sprintf("[%s%%%s]:0", addrIP, iface.Name) - if l, err := m.core.link.tcp.listen(listenaddr); err == nil { + if li, err := m.core.link.tcp.listen(listenaddr); err == nil { m.core.log.Debugln("Started multicasting on", iface.Name) // Store the listener so that we can stop it later if needed - m.listeners[iface.Name] = l - listener = l + m.listeners[iface.Name] = li + listener = li } } else { // An existing listener was found diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index f46dc56..0179c20 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -16,7 +16,6 @@ package yggdrasil import ( "context" - "errors" "fmt" "math/rand" "net" @@ -87,18 +86,26 @@ func (t *tcp) init(l *link) error { deleted := util.Difference(t.link.core.configOld.Listen, t.link.core.config.Listen) t.link.core.configMutex.RUnlock() if len(added) > 0 || len(deleted) > 0 { - for _, add := range added { - if add[:6] != "tcp://" { - e <- errors.New("unknown scheme: " + add) + for _, a := range added { + if a[:6] != "tcp://" { continue } - if _, err := t.listen(add[6:]); err != nil { + if _, err := t.listen(a[6:]); err != nil { e <- err continue } } - for _, delete := range deleted { - t.link.core.log.Warnln("Removing listener", delete, "not currently implemented") + for _, d := range deleted { + if d[:6] != "tcp://" { + continue + } + t.mutex.Lock() + if listener, ok := t.listeners[d[6:]]; ok { + t.mutex.Unlock() + listener.stop <- true + } else { + t.mutex.Unlock() + } } e <- nil } else { @@ -134,7 +141,7 @@ func (t *tcp) listen(listenaddr string) (*tcpListener, error) { listener: listener, stop: make(chan bool), } - go t.listener(&l, listenaddr[6:]) + go t.listener(&l, listenaddr) return &l, nil } From 57eb6eaeb0c095d66ff4fa19912c81f820c2303a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Mar 2019 17:45:47 +0000 Subject: [PATCH 17/19] Clean up config package --- src/config/config.go | 11 +---------- src/config/i2p.go | 8 -------- src/config/tor.go | 8 -------- 3 files changed, 1 insertion(+), 26 deletions(-) delete mode 100644 src/config/i2p.go delete mode 100644 src/config/tor.go diff --git a/src/config/config.go b/src/config/config.go index 28756f1..3c8bbcc 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -12,7 +12,7 @@ import ( // NodeConfig defines all configuration values needed to run a signle yggdrasil node type NodeConfig struct { - Listen []string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` + Listen []string `comment:"Listen addresses for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."` Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."` InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."` @@ -30,13 +30,6 @@ type NodeConfig struct { SwitchOptions SwitchOptions `comment:"Advanced options for tuning the switch. Normally you will not need\nto edit these options."` NodeInfoPrivacy bool `comment:"By default, nodeinfo contains some defaults including the platform,\narchitecture and Yggdrasil version. These can help when surveying\nthe network and diagnosing network routing problems. Enabling\nnodeinfo privacy prevents this, so that only items specified in\n\"NodeInfo\" are sent back if specified."` NodeInfo map[string]interface{} `comment:"Optional node info. This must be a { \"key\": \"value\", ... } map\nor set as null. This is entirely optional but, if set, is visible\nto the whole network on request."` - //Net NetConfig `comment:"Extended options for connecting to peers over other networks."` -} - -// NetConfig defines network/proxy related configuration values -type NetConfig struct { - Tor TorConfig `comment:"Experimental options for configuring peerings over Tor."` - I2P I2PConfig `comment:"Experimental options for configuring peerings over I2P."` } // SessionFirewall controls the session firewall configuration @@ -71,8 +64,6 @@ type SwitchOptions struct { // isAutoconf is that the TCP and UDP ports will likely end up with different // port numbers. func GenerateConfig(isAutoconf bool) *NodeConfig { - // Create a new core. - //core := Core{} // Generate encryption keys. bpub, bpriv := crypto.NewBoxKeys() spub, spriv := crypto.NewSigKeys() diff --git a/src/config/i2p.go b/src/config/i2p.go deleted file mode 100644 index 0ee4a2b..0000000 --- a/src/config/i2p.go +++ /dev/null @@ -1,8 +0,0 @@ -package config - -// I2PConfig is the configuration structure for i2p related configuration -type I2PConfig struct { - Keyfile string // private key file or empty string for ephemeral keys - Addr string // address of i2p api connector - Enabled bool -} diff --git a/src/config/tor.go b/src/config/tor.go deleted file mode 100644 index c169cbb..0000000 --- a/src/config/tor.go +++ /dev/null @@ -1,8 +0,0 @@ -package config - -// TorConfig is the configuration structure for Tor Proxy related values -type TorConfig struct { - OnionKeyfile string // hidden service private key for ADD_ONION (currently unimplemented) - ControlAddr string // tor control port address - Enabled bool -} From 917ca6c1c58696a13c58b4dc44a66644d1165d53 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 8 Mar 2019 10:26:46 +0000 Subject: [PATCH 18/19] Make changes based on review comments --- src/util/util.go | 2 +- src/yggdrasil/admin.go | 2 +- src/yggdrasil/awdl.go | 3 +-- src/yggdrasil/link.go | 2 +- src/yggdrasil/tcp.go | 38 ++++++++++++++++++++++++++------------ 5 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/util/util.go b/src/util/util.go index d669fa5..49e0207 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -87,7 +87,7 @@ func Difference(a, b []string) []string { mb[x] = true } for _, x := range a { - if _, ok := mb[x]; !ok { + if !mb[x] { ab = append(ab, x) } } diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 8cb195e..a0854f2 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -676,7 +676,7 @@ func (a *admin) getData_getPeers() []admin_nodeInfo { {"bytes_sent", atomic.LoadUint64(&p.bytesSent)}, {"bytes_recvd", atomic.LoadUint64(&p.bytesRecvd)}, {"proto", p.intf.info.linkType}, - {"endpoint", p.intf.info.remote}, + {"endpoint", p.intf.name}, {"box_pub_key", hex.EncodeToString(p.box[:])}, } peerInfos = append(peerInfos, info) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index fe64e8b..5e8cce1 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -54,8 +54,7 @@ func (a *awdl) init(l *link) error { a.mutex.Unlock() go func() { - for { - e := <-a.reconfigure + for e := range a.reconfigure { e <- nil } }() diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index a81b50d..67ce5c1 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -105,7 +105,7 @@ func (l *link) call(uri string, sintf string) error { case "tcp": l.tcp.call(u.Host, nil, sintf) case "socks": - l.tcp.call(pathtokens[0], &u.Host, sintf) + l.tcp.call(pathtokens[0], u.Host, sintf) default: return errors.New("unknown call scheme: " + u.Scheme) } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 0179c20..2a177c0 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -159,7 +159,13 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) { t.mutex.Unlock() // And here we go! accepted := make(chan bool) - defer l.listener.Close() + defer func() { + t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String()) + l.listener.Close() + t.mutex.Lock() + delete(t.listeners, listenaddr) + t.mutex.Unlock() + }() t.link.core.log.Infoln("Listening for TCP on:", l.listener.Addr().String()) for { var sock net.Conn @@ -178,13 +184,8 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) { t.link.core.log.Errorln("Failed to accept connection:", err) return } - go t.handler(sock, true) + go t.handler(sock, true, nil) case <-l.stop: - t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String()) - l.listener.Close() - t.mutex.Lock() - delete(t.listeners, listenaddr) - t.mutex.Unlock() return } } @@ -230,8 +231,12 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { if sintf != "" { return } + dialerdst, er := net.ResolveTCPAddr("tcp", socksaddr) + if er != nil { + return + } var dialer proxy.Dialer - dialer, err = proxy.SOCKS5("tcp", socksaddr, nil, proxy.Direct) + dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), nil, proxy.Direct) if err != nil { return } @@ -246,6 +251,7 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { addr: saddr, }, } + t.handler(conn, false, dialerdst.String()) } else { dialer := net.Dialer{ Control: t.tcpContext, @@ -302,12 +308,12 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { if err != nil { return } + t.handler(conn, false, nil) } - t.handler(conn, false) }() } -func (t *tcp) handler(sock net.Conn, incoming bool) { +func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}) { defer sock.Close() t.setExtraOptions(sock) stream := stream{} @@ -315,8 +321,16 @@ func (t *tcp) handler(sock net.Conn, incoming bool) { local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast() - name := "tcp://" + sock.RemoteAddr().String() - link, err := t.link.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) + var name string + var proto string + if socksaddr, issocks := options.(string); issocks { + name = "socks://" + socksaddr + "/" + sock.RemoteAddr().String() + proto = "socks" + } else { + name = "tcp://" + sock.RemoteAddr().String() + proto = "tcp" + } + link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, remotelinklocal) if err != nil { t.link.core.log.Println(err) panic(err) From 426d1570259dedec8d527b8c832b24b4829d4765 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 8 Mar 2019 18:51:07 -0600 Subject: [PATCH 19/19] make sure we don't replace an existing listener --- src/yggdrasil/tcp.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 2a177c0..43ea443 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -155,8 +155,14 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) { } // Track the listener so that we can find it again in future t.mutex.Lock() - t.listeners[listenaddr] = l - t.mutex.Unlock() + if _, isIn := t.listeners[listenaddr]; isIn { + t.mutex.Unlock() + l.listener.Close() + return + } else { + t.listeners[listenaddr] = l + t.mutex.Unlock() + } // And here we go! accepted := make(chan bool) defer func() {