From cdedd304af772b9790d55f8746f5ccdf01d17e85 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 5 May 2018 17:14:03 -0500 Subject: [PATCH] make removePeers work for TCP connections and minor admin cleanup --- src/yggdrasil/admin.go | 86 +++++++++++++++++------------------------- src/yggdrasil/debug.go | 28 ++------------ src/yggdrasil/peer.go | 22 +++++++++++ src/yggdrasil/tcp.go | 11 +----- src/yggdrasil/udp.go | 12 +----- 5 files changed, 64 insertions(+), 95 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 385ff5c..fc95e23 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -5,6 +5,7 @@ import "os" import "bytes" import "errors" import "fmt" +import "net/url" import "sort" import "strings" import "strconv" @@ -57,18 +58,18 @@ func (a *admin) init(c *Core, listenaddr string) { a.addHandler("getSessions", nil, func(out *[]byte, _ ...string) { *out = []byte(a.printInfos(a.getData_getSessions())) }) - a.addHandler("addPeer", []string{""}, func(out *[]byte, saddr ...string) { + a.addHandler("addPeer", []string{""}, func(out *[]byte, saddr ...string) { if a.addPeer(saddr[0]) == nil { *out = []byte("Adding peer: " + saddr[0] + "\n") } else { *out = []byte("Failed to add peer: " + saddr[0] + "\n") } }) - a.addHandler("removePeer", []string{""}, func(out *[]byte, saddr ...string) { - if a.removePeer(saddr[0]) == nil { - *out = []byte("Removing peer: " + saddr[0] + "\n") + a.addHandler("removePeer", []string{""}, func(out *[]byte, sport ...string) { + if a.removePeer(sport[0]) == nil { + *out = []byte("Removing peer: " + sport[0] + "\n") } else { - *out = []byte("Failed to remove peer: " + saddr[0] + "\n") + *out = []byte("Failed to remove peer: " + sport[0] + "\n") } }) a.addHandler("setTunTap", []string{"", "[]", "[]"}, func(out *[]byte, ifparams ...string) { @@ -191,60 +192,43 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string { return strings.Join(out, "\n") } -func (a *admin) addPeer(p string) error { - pAddr := p - if p[:4] == "tcp:" || p[:4] == "udp:" { - pAddr = p[4:] - } - switch { - case len(p) >= 4 && p[:4] == "udp:": - // Connect to peer over UDP - udpAddr, err := net.ResolveUDPAddr("udp", pAddr) - if err != nil { - return err +func (a *admin) addPeer(addr string) error { + u, err := url.Parse(addr) + if err == nil { + switch strings.ToLower(u.Scheme) { + case "tcp": + a.core.DEBUG_addTCPConn(u.Host) + case "udp": + a.core.DEBUG_maybeSendUDPKeys(u.Host) + case "socks": + a.core.DEBUG_addSOCKSConn(u.Host, u.Path[1:]) + default: + return errors.New("invalid peer: " + addr) } - var addr connAddr - addr.fromUDPAddr(udpAddr) - a.core.udp.mutex.RLock() - _, isIn := a.core.udp.conns[addr] - a.core.udp.mutex.RUnlock() - if !isIn { - a.core.udp.sendKeys(addr) + } else { + // no url scheme provided + addr = strings.ToLower(addr) + if strings.HasPrefix(addr, "udp:") { + a.core.DEBUG_maybeSendUDPKeys(addr[4:]) + return nil + } else { + if strings.HasPrefix(addr, "tcp:") { + addr = addr[4:] + } + a.core.DEBUG_addTCPConn(addr) + return nil } - return nil - case len(p) >= 4 && p[:4] == "tcp:": - default: - // Connect to peer over TCP - _, err := net.ResolveTCPAddr("tcp", pAddr) - if err != nil { - return err - } - a.core.tcp.call(p) + return errors.New("invalid peer: " + addr) } return nil } func (a *admin) removePeer(p string) error { - pAddr := p - if p[:4] == "tcp:" || p[:4] == "udp:" { - pAddr = p[4:] - } - switch { - case len(p) >= 4 && p[:4] == "udp:": - // Connect to peer over UDP - udpAddr, err := net.ResolveUDPAddr("udp", pAddr) - if err != nil { - return err - } - var addr connAddr - addr.fromUDPAddr(udpAddr) - a.core.udp.sendClose(addr) - return nil - case len(p) >= 4 && p[:4] == "tcp:": - default: - // Connect to peer over TCP - return errors.New("Removing TCP peer not yet supported") + iport, err := strconv.Atoi(p) + if err != nil { + return err } + a.core.peers.removePeer(switchPort(iport)) return nil } diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index e70e8a6..f87bb75 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -12,9 +12,7 @@ import "golang.org/x/net/proxy" import "fmt" import "net" -import "net/url" import "log" -import "strings" import "regexp" // Core @@ -313,29 +311,9 @@ func (c *Core) DEBUG_maybeSendUDPKeys(saddr string) { //////////////////////////////////////////////////////////////////////////////// func (c *Core) DEBUG_addPeer(addr string) { - u, err := url.Parse(addr) - if err == nil { - switch strings.ToLower(u.Scheme) { - case "tcp": - c.DEBUG_addTCPConn(u.Host) - case "udp": - c.DEBUG_maybeSendUDPKeys(u.Host) - case "socks": - c.DEBUG_addSOCKSConn(u.Host, u.Path[1:]) - default: - panic("invalid peer: " + addr) - } - } else { - // no url scheme provided - addr = strings.ToLower(addr) - if strings.HasPrefix(addr, "udp:") { - c.DEBUG_maybeSendUDPKeys(addr[4:]) - } else { - if strings.HasPrefix(addr, "tcp:") { - addr = addr[4:] - } - c.DEBUG_addTCPConn(addr) - } + err := c.admin.addPeer(addr) + if err != nil { + panic(err) } } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 7f1de6b..ea05728 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -73,6 +73,8 @@ type peer struct { // Specifically, processing switch messages, signing, and verifying sigs // Resets at the start of each tick throttle uint8 + // Called when a peer is removed, to close the underlying connection, or via admin api + close func() } const peer_Throttle = 1 @@ -123,6 +125,26 @@ func (ps *peers) newPeer(box *boxPubKey, return &p } +func (ps *peers) removePeer(port switchPort) { + // TODO? store linkIn in the peer struct, close it here? (once) + if port == 0 { + return + } // Can't remove self peer + ps.mutex.Lock() + oldPorts := ps.getPorts() + p, isIn := oldPorts[port] + newPorts := make(map[switchPort]*peer) + for k, v := range oldPorts { + newPorts[k] = v + } + delete(newPorts, port) + ps.putPorts(newPorts) + ps.mutex.Unlock() + if isIn && p.close != nil { + p.close() + } +} + func (p *peer) linkLoop(in <-chan []byte) { ticker := time.NewTicker(time.Second) defer ticker.Stop() diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index d84dd34..3a4e9fb 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -238,19 +238,12 @@ func (iface *tcpInterface) handler(sock net.Conn) { util_putBytes(msg) } } + p.close = func() { sock.Close() } setNoDelay(sock, true) go p.linkLoop(linkIn) defer func() { // Put all of our cleanup here... - p.core.peers.mutex.Lock() - oldPorts := p.core.peers.getPorts() - newPorts := make(map[switchPort]*peer) - for k, v := range oldPorts { - newPorts[k] = v - } - delete(newPorts, p.port) - p.core.peers.putPorts(newPorts) - p.core.peers.mutex.Unlock() + p.core.peers.removePeer(p.port) close(linkIn) }() them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go index 35871c6..fffda93 100644 --- a/src/yggdrasil/udp.go +++ b/src/yggdrasil/udp.go @@ -115,20 +115,11 @@ func (iface *udpInterface) startConn(info *connInfo) { iface.mutex.Lock() delete(iface.conns, info.addr) iface.mutex.Unlock() - iface.core.peers.mutex.Lock() - oldPorts := iface.core.peers.getPorts() - newPorts := make(map[switchPort]*peer) - for k, v := range oldPorts { - newPorts[k] = v - } - delete(newPorts, info.peer.port) - iface.core.peers.putPorts(newPorts) - iface.core.peers.mutex.Unlock() + iface.core.peers.removePeer(info.peer.port) close(info.linkIn) close(info.keysIn) close(info.closeIn) close(info.out) - iface.sendClose(info.addr) iface.core.log.Println("Removing peer:", info.name) }() for { @@ -296,6 +287,7 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { } }() //*/ + conn.peer.close = func() { iface.sendClose(conn.addr) } iface.mutex.Lock() iface.conns[addr] = conn iface.mutex.Unlock()