From 47eb2fc47fef04b7cfde6d97dd5c0ae72ad144c8 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 22 Apr 2019 11:20:35 +0100 Subject: [PATCH] Break deadlock by creating session recv queue when session is created instead of repointing at search completion, also make expired atomic --- src/tuntap/tun.go | 16 ++++++++---- src/yggdrasil/conn.go | 56 ++++++++++++++-------------------------- src/yggdrasil/session.go | 4 +-- 3 files changed, 32 insertions(+), 44 deletions(-) diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 912998c..5f87c20 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -193,12 +193,13 @@ func (tun *TunAdapter) connReader(conn *yggdrasil.Conn) error { delete(tun.conns, remoteNodeID) tun.mutex.Unlock() }() + tun.log.Debugln("Start connection reader for", conn.String()) b := make([]byte, 65535) for { n, err := conn.Read(b) if err != nil { tun.log.Errorln("TUN/TAP conn read error:", err) - return err + continue } if n == 0 { continue @@ -209,7 +210,7 @@ func (tun *TunAdapter) connReader(conn *yggdrasil.Conn) error { continue } if w != n { - tun.log.Errorln("TUN/TAP iface write len didn't match conn read len") + tun.log.Errorln("TUN/TAP iface write mismatch:", w, "bytes written vs", n, "bytes given") continue } } @@ -220,7 +221,7 @@ func (tun *TunAdapter) ifaceReader() error { for { n, err := tun.iface.Read(bs) if err != nil { - tun.log.Errorln("TUN/TAP iface read error:", err) + continue } // Look up if the dstination address is somewhere we already have an // open connection to @@ -253,6 +254,10 @@ func (tun *TunAdapter) ifaceReader() error { // Unknown address length or protocol continue } + if !dstAddr.IsValid() && !dstSnet.IsValid() { + // For now don't deal with any non-Yggdrasil ranges + continue + } dstNodeID, dstNodeIDMask = dstAddr.GetNodeIDandMask() // Do we have an active connection for this node ID? tun.mutex.Lock() @@ -260,10 +265,11 @@ func (tun *TunAdapter) ifaceReader() error { tun.mutex.Unlock() w, err := conn.Write(bs) if err != nil { - tun.log.Println("TUN/TAP conn write error:", err) + tun.log.Errorln("TUN/TAP conn write error:", err) continue } if w != n { + tun.log.Errorln("TUN/TAP conn write mismatch:", w, "bytes written vs", n, "bytes given") continue } } else { @@ -273,7 +279,7 @@ func (tun *TunAdapter) ifaceReader() error { go tun.connReader(&conn) } else { tun.mutex.Unlock() - tun.log.Println("TUN/TAP dial error:", err) + tun.log.Errorln("TUN/TAP dial error:", err) } } diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 7898a12..9308d34 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" @@ -17,9 +18,9 @@ type Conn struct { recv chan *wire_trafficPacket // Eventually gets attached to session.recv mutex *sync.RWMutex session *sessionInfo - readDeadline time.Time // TODO timer - writeDeadline time.Time // TODO timer - expired bool + readDeadline atomic.Value // time.Time // TODO timer + writeDeadline atomic.Value // time.Time // TODO timer + expired atomic.Value // bool } func (c *Conn) String() string { @@ -32,14 +33,12 @@ func (c *Conn) startSearch() { if err != nil { c.core.log.Debugln("DHT search failed:", err) c.mutex.Lock() - c.expired = true - c.mutex.Unlock() + c.expired.Store(true) return } if sinfo != nil { c.mutex.Lock() c.session = sinfo - c.session.recv = c.recv c.nodeID, c.nodeMask = sinfo.theirAddr.GetNodeIDandMask() c.mutex.Unlock() } @@ -75,30 +74,20 @@ func (c *Conn) startSearch() { } func (c *Conn) Read(b []byte) (int, error) { - err := func() error { - c.mutex.RLock() - defer c.mutex.RUnlock() - if c.expired { - return errors.New("session is closed") - } - return nil - }() - if err != nil { - return 0, err + if e, ok := c.expired.Load().(bool); ok && e { + return 0, errors.New("session is closed") } + c.mutex.RLock() + sinfo := c.session + c.mutex.RUnlock() select { // TODO... case p, ok := <-c.recv: if !ok { - c.mutex.Lock() - c.expired = true - c.mutex.Unlock() + c.expired.Store(true) return 0, errors.New("session is closed") } defer util.PutBytes(p.Payload) - c.mutex.RLock() - sinfo := c.session - c.mutex.RUnlock() var err error sinfo.doWorker(func() { if !sinfo.nonceIsOK(&p.Nonce) { @@ -131,19 +120,12 @@ func (c *Conn) Read(b []byte) (int, error) { } func (c *Conn) Write(b []byte) (bytesWritten int, err error) { - var sinfo *sessionInfo - err = func() error { - c.mutex.RLock() - defer c.mutex.RUnlock() - if c.expired { - return errors.New("session is closed") - } - sinfo = c.session - return nil - }() - if err != nil { - return 0, err + if e, ok := c.expired.Load().(bool); ok && e { + return 0, errors.New("session is closed") } + c.mutex.RLock() + sinfo := c.session + c.mutex.RUnlock() if sinfo == nil { c.core.router.doAdmin(func() { c.startSearch() @@ -173,7 +155,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { } func (c *Conn) Close() error { - c.expired = true + c.expired.Store(true) c.session.close() return nil } @@ -195,11 +177,11 @@ func (c *Conn) SetDeadline(t time.Time) error { } func (c *Conn) SetReadDeadline(t time.Time) error { - c.readDeadline = t + c.readDeadline.Store(t) return nil } func (c *Conn) SetWriteDeadline(t time.Time) error { - c.writeDeadline = t + c.writeDeadline.Store(t) return nil } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 15d346a..55d4a0c 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -321,6 +321,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.worker = make(chan func(), 1) + sinfo.recv = make(chan *wire_trafficPacket, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle @@ -480,12 +481,11 @@ func (ss *sessions) handlePing(ping *sessionPing) { mutex: &sync.RWMutex{}, nodeID: crypto.GetNodeID(&sinfo.theirPermPub), nodeMask: &crypto.NodeID{}, - recv: make(chan *wire_trafficPacket, 32), + recv: sinfo.recv, } for i := range conn.nodeMask { conn.nodeMask[i] = 0xFF } - sinfo.recv = conn.recv ss.listener.conn <- conn } else { ss.core.log.Debugln("Received new session but there is no listener, ignoring")