5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-26 07:11:40 +00:00

have createSession fill the sessionInfo.cancel field, have Conn use Conn.session.cancel instead of storing its own cancellation, this should prevent any of these things from being both nil and reachable at the same time

This commit is contained in:
Arceliar 2019-08-05 19:11:28 -05:00
parent 8a85149817
commit 679866d5ff
4 changed files with 15 additions and 14 deletions

View File

@ -57,7 +57,6 @@ type Conn struct {
core *Core core *Core
readDeadline atomic.Value // time.Time // TODO timer readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer
cancel util.Cancellation
mutex sync.RWMutex // protects the below mutex sync.RWMutex // protects the below
nodeID *crypto.NodeID nodeID *crypto.NodeID
nodeMask *crypto.NodeID nodeMask *crypto.NodeID
@ -71,7 +70,6 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session
nodeID: nodeID, nodeID: nodeID,
nodeMask: nodeMask, nodeMask: nodeMask,
session: session, session: session,
cancel: util.NewCancellation(),
} }
return &conn return &conn
} }
@ -142,10 +140,10 @@ func (c *Conn) doSearch() {
func (c *Conn) getDeadlineCancellation(value *atomic.Value) util.Cancellation { func (c *Conn) getDeadlineCancellation(value *atomic.Value) util.Cancellation {
if deadline, ok := value.Load().(time.Time); ok { if deadline, ok := value.Load().(time.Time); ok {
// A deadline is set, so return a Cancellation that uses it // A deadline is set, so return a Cancellation that uses it
return util.CancellationWithDeadline(c.cancel, deadline) return util.CancellationWithDeadline(c.session.cancel, deadline)
} else { } else {
// No cancellation was set, so return a child cancellation with no timeout // No cancellation was set, so return a child cancellation with no timeout
return util.CancellationChild(c.cancel) return util.CancellationChild(c.session.cancel)
} }
} }
@ -241,10 +239,9 @@ func (c *Conn) Close() (err error) {
defer c.mutex.Unlock() defer c.mutex.Unlock()
if c.session != nil { if c.session != nil {
// Close the session, if it hasn't been closed already // Close the session, if it hasn't been closed already
c.core.router.doAdmin(c.session.close) if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil {
} err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
if e := c.cancel.Cancel(errors.New("connection closed")); e != nil { }
err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
} }
return return
} }

View File

@ -69,7 +69,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, er
defer t.Stop() defer t.Stop()
select { select {
case <-conn.session.init: case <-conn.session.init:
conn.session.startWorkers(conn.cancel) conn.session.startWorkers()
return conn, nil return conn, nil
case <-t.C: case <-t.C:
conn.Close() conn.Close()

View File

@ -161,8 +161,7 @@ func (r *router) handleTraffic(packet []byte) {
return return
} }
sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle) sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle)
if !isIn || sinfo.cancel == nil { if !isIn {
// FIXME make sure sinfo.cancel can never be nil
util.PutBytes(p.Payload) util.PutBytes(p.Payload)
return return
} }

View File

@ -208,6 +208,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.pingTime = now sinfo.pingTime = now
sinfo.pingSend = now sinfo.pingSend = now
sinfo.init = make(chan struct{}) sinfo.init = make(chan struct{})
sinfo.cancel = util.NewCancellation()
higher := false higher := false
for idx := range ss.core.boxPub { for idx := range ss.core.boxPub {
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {
@ -232,6 +233,11 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.send = make(chan []byte, 32) sinfo.send = make(chan []byte, 32)
ss.sinfos[sinfo.myHandle] = &sinfo ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
go func() {
// Run cleanup when the session is canceled
<-sinfo.cancel.Finished()
sinfo.core.router.doAdmin(sinfo.close)
}()
return &sinfo return &sinfo
} }
@ -362,7 +368,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
for i := range conn.nodeMask { for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF conn.nodeMask[i] = 0xFF
} }
conn.session.startWorkers(conn.cancel) conn.session.startWorkers()
ss.listener.conn <- conn ss.listener.conn <- conn
} }
ss.listenerMutex.Unlock() ss.listenerMutex.Unlock()
@ -431,8 +437,7 @@ func (ss *sessions) reset() {
//////////////////////////// Worker Functions Below //////////////////////////// //////////////////////////// Worker Functions Below ////////////////////////////
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
func (sinfo *sessionInfo) startWorkers(cancel util.Cancellation) { func (sinfo *sessionInfo) startWorkers() {
sinfo.cancel = cancel
go sinfo.recvWorker() go sinfo.recvWorker()
go sinfo.sendWorker() go sinfo.sendWorker()
} }