From 307b24d8cb505143cb56372a3ff81118d6f1d158 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 17 Jul 2019 21:42:17 +0100 Subject: [PATCH] Fix Conn.Read/Conn.Write behavior after Conn.Close, get rid of second TUN/TAP conn reader goroutine, no longer use deadlines --- src/tuntap/conn.go | 66 +++++++++++++----------------------------- src/yggdrasil/conn.go | 42 +++++++++++++++++++-------- src/yggdrasil/debug.go | 7 +++-- 3 files changed, 54 insertions(+), 61 deletions(-) diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go index c5e6e81..24d862e 100644 --- a/src/tuntap/conn.go +++ b/src/tuntap/conn.go @@ -43,7 +43,7 @@ func (s *tunConn) _close_nomutex() { }() } -func (s *tunConn) reader() error { +func (s *tunConn) reader() (err error) { select { case _, ok := <-s.stop: if !ok { @@ -51,55 +51,29 @@ func (s *tunConn) reader() error { } default: } - s.tun.log.Debugln("Starting conn reader for", s) - defer s.tun.log.Debugln("Stopping conn reader for", s) + s.tun.log.Debugln("Starting conn reader for", s.conn.String()) + defer s.tun.log.Debugln("Stopping conn reader for", s.conn.String()) var n int - var err error - read := make(chan bool) b := make([]byte, 65535) - go func() { - s.tun.log.Debugln("Starting conn reader helper for", s) - defer s.tun.log.Debugln("Stopping conn reader helper for", s) - for { - s.conn.SetReadDeadline(time.Now().Add(tunConnTimeout)) - if n, err = s.conn.Read(b); err != nil { - s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err) - if e, eok := err.(yggdrasil.ConnError); eok { - s.tun.log.Debugln("Conn reader helper", s, "error:", e) - switch { - case e.Temporary(): - fallthrough - case e.Timeout(): - read <- false - continue - case e.Closed(): - fallthrough - default: - s.close() - return - } - } - read <- false - } - read <- true - } - }() for { select { - case r, ok := <-read: - if r && n > 0 { - bs := append(util.GetBytes(), b[:n]...) - select { - case s.tun.send <- bs: - default: - util.PutBytes(bs) - } - } - if ok { - s.stillAlive() // TODO? Only stay alive if we read >0 bytes? - } case <-s.stop: return nil + default: + } + if n, err = s.conn.Read(b); err != nil { + if e, eok := err.(yggdrasil.ConnError); eok && !e.Temporary() { + s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err) + return e + } + } else if n > 0 { + bs := append(util.GetBytes(), b[:n]...) + select { + case s.tun.send <- bs: + default: + util.PutBytes(bs) + } + s.stillAlive() } } } @@ -112,8 +86,8 @@ func (s *tunConn) writer() error { } default: } - s.tun.log.Debugln("Starting conn writer for", s) - defer s.tun.log.Debugln("Stopping conn writer for", s) + s.tun.log.Debugln("Starting conn writer for", s.conn.String()) + defer s.tun.log.Debugln("Stopping conn writer for", s.conn.String()) for { select { case <-s.stop: diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 2a286b0..0e18078 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -49,7 +49,7 @@ type Conn struct { nodeID *crypto.NodeID nodeMask *crypto.NodeID mutex sync.RWMutex - closed bool + close chan bool session *sessionInfo readDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer @@ -62,6 +62,7 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session nodeID: nodeID, nodeMask: nodeMask, session: session, + close: make(chan bool), } return &conn } @@ -127,12 +128,14 @@ func (c *Conn) Read(b []byte) (int, error) { for { // Wait for some traffic to come through from the session select { + case <-c.close: + return 0, ConnError{errors.New("session closed"), false, false, true, 0} case <-timer.C: - return 0, ConnError{errors.New("timeout"), true, false, false, 0} + return 0, ConnError{errors.New("read timeout"), true, false, false, 0} case p, ok := <-sinfo.recv: // If the session is closed then do nothing if !ok { - return 0, ConnError{errors.New("session is closed"), false, false, true, 0} + return 0, ConnError{errors.New("session closed"), false, false, true, 0} } defer util.PutBytes(p.Payload) var err error @@ -167,16 +170,26 @@ func (c *Conn) Read(b []byte) (int, error) { // Hand over to the session worker defer func() { if recover() != nil { - err = errors.New("read failed, session already closed") + err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0} close(done) } }() // In case we're racing with a close - select { // Send to worker + // Send to worker + select { case sinfo.worker <- workerFunc: + case <-c.close: + return 0, ConnError{errors.New("session closed"), false, false, true, 0} case <-timer.C: - return 0, ConnError{errors.New("timeout"), true, false, false, 0} + return 0, ConnError{errors.New("read timeout"), true, false, false, 0} + } + // Wait for the worker to finish + select { + case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff) + case <-c.close: + return 0, ConnError{errors.New("session closed"), false, false, true, 0} + case <-timer.C: + return 0, ConnError{errors.New("read timeout"), true, false, false, 0} } - <-done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff) // Something went wrong in the session worker so abort if err != nil { if ce, ok := err.(*ConnError); ok && ce.Temporary() { @@ -257,7 +270,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { select { // Send to worker case sinfo.worker <- workerFunc: case <-timer.C: - return 0, ConnError{errors.New("timeout"), true, false, false, 0} + return 0, ConnError{errors.New("write timeout"), true, false, false, 0} } // Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff) <-done @@ -269,16 +282,21 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { return written, err } -func (c *Conn) Close() error { +func (c *Conn) Close() (err error) { c.mutex.Lock() defer c.mutex.Unlock() if c.session != nil { // Close the session, if it hasn't been closed already c.core.router.doAdmin(c.session.close) } - // This can't fail yet - TODO? - c.closed = true - return nil + func() { + defer func() { + recover() + err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} + }() + close(c.close) // Closes reader/writer goroutines + }() + return } func (c *Conn) LocalAddr() crypto.NodeID { diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index c4eed63..5cb7c46 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -439,14 +439,14 @@ func (c *Core) DEBUG_maybeSendUDPKeys(saddr string) { */ //////////////////////////////////////////////////////////////////////////////// - +/* func (c *Core) DEBUG_addPeer(addr string) { err := c.admin.addPeer(addr, "") if err != nil { panic(err) } } - +*/ /* func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) { go func() { @@ -541,13 +541,14 @@ func (c *Core) DEBUG_setIfceExpr(expr *regexp.Regexp) { c.log.Println("DEBUG_setIfceExpr no longer implemented") } +/* func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { err := c.admin.addAllowedEncryptionPublicKey(boxStr) if err != nil { panic(err) } } - +*/ //////////////////////////////////////////////////////////////////////////////// func DEBUG_simLinkPeers(p, q *peer) {