From 679866d5ff8d300985018420e5bf027be9016a0f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 5 Aug 2019 19:11:28 -0500 Subject: [PATCH] have createSession fill the sessionInfo.cancel field, have Conn use Conn.session.cancel instead of storing its own cancellation, this should prevent any of these things from being both nil and reachable at the same time --- src/yggdrasil/conn.go | 13 +++++-------- src/yggdrasil/dialer.go | 2 +- src/yggdrasil/router.go | 3 +-- src/yggdrasil/session.go | 11 ++++++++--- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 1a05bd8..2452a3d 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -57,7 +57,6 @@ type Conn struct { core *Core readDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer - cancel util.Cancellation mutex sync.RWMutex // protects the below nodeID *crypto.NodeID nodeMask *crypto.NodeID @@ -71,7 +70,6 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session nodeID: nodeID, nodeMask: nodeMask, session: session, - cancel: util.NewCancellation(), } return &conn } @@ -142,10 +140,10 @@ func (c *Conn) doSearch() { func (c *Conn) getDeadlineCancellation(value *atomic.Value) util.Cancellation { if deadline, ok := value.Load().(time.Time); ok { // A deadline is set, so return a Cancellation that uses it - return util.CancellationWithDeadline(c.cancel, deadline) + return util.CancellationWithDeadline(c.session.cancel, deadline) } else { // No cancellation was set, so return a child cancellation with no timeout - return util.CancellationChild(c.cancel) + return util.CancellationChild(c.session.cancel) } } @@ -241,10 +239,9 @@ func (c *Conn) Close() (err error) { defer c.mutex.Unlock() if c.session != nil { // Close the session, if it hasn't been closed already - c.core.router.doAdmin(c.session.close) - } - if e := c.cancel.Cancel(errors.New("connection closed")); e != nil { - err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} + if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil { + err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} + } } return } diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 6ce2e8a..db5d5a4 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -69,7 +69,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, er defer t.Stop() select { case <-conn.session.init: - conn.session.startWorkers(conn.cancel) + conn.session.startWorkers() return conn, nil case <-t.C: conn.Close() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 161ae60..a11f6ae 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -161,8 +161,7 @@ func (r *router) handleTraffic(packet []byte) { return } sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle) - if !isIn || sinfo.cancel == nil { - // FIXME make sure sinfo.cancel can never be nil + if !isIn { util.PutBytes(p.Payload) return } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index c39f60d..f9c38fa 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -208,6 +208,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.pingTime = now sinfo.pingSend = now sinfo.init = make(chan struct{}) + sinfo.cancel = util.NewCancellation() higher := false for idx := range ss.core.boxPub { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { @@ -232,6 +233,11 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.send = make(chan []byte, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle + go func() { + // Run cleanup when the session is canceled + <-sinfo.cancel.Finished() + sinfo.core.router.doAdmin(sinfo.close) + }() return &sinfo } @@ -362,7 +368,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { for i := range conn.nodeMask { conn.nodeMask[i] = 0xFF } - conn.session.startWorkers(conn.cancel) + conn.session.startWorkers() ss.listener.conn <- conn } ss.listenerMutex.Unlock() @@ -431,8 +437,7 @@ func (ss *sessions) reset() { //////////////////////////// Worker Functions Below //////////////////////////// //////////////////////////////////////////////////////////////////////////////// -func (sinfo *sessionInfo) startWorkers(cancel util.Cancellation) { - sinfo.cancel = cancel +func (sinfo *sessionInfo) startWorkers() { go sinfo.recvWorker() go sinfo.sendWorker() }