From 436c84ca3311cf9c57eb8eec6901a0edd408645f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 23 Aug 2019 20:53:00 -0500 Subject: [PATCH] refactor sessions to store a pointer to router instead of core --- src/yggdrasil/conn.go | 2 +- src/yggdrasil/router.go | 2 +- src/yggdrasil/session.go | 53 +++++++++++++++++++++------------------- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index b4ce71f..4ba0b2a 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -252,7 +252,7 @@ func (c *Conn) Close() (err error) { } func (c *Conn) LocalAddr() crypto.NodeID { - return *crypto.GetNodeID(&c.session.core.boxPub) + return *crypto.GetNodeID(&c.core.boxPub) } func (c *Conn) RemoteAddr() crypto.NodeID { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index ad9bc15..25f8e80 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -75,7 +75,7 @@ func (r *router) init(core *Core) { r.core.config.Mutex.RUnlock() r.dht.init(r) r.searches.init(r) - r.sessions.init(r.core) + r.sessions.init(r) } // Starts the mainLoop goroutine. diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 02d4dc8..13f6442 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -40,7 +40,7 @@ func (h nonceHeap) peek() *crypto.BoxNonce { return &h[len(h)-1] } // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { phony.Actor // Protects all of the below, use it any time you read/change the contents of a session - core *Core // + sessions *sessions // reconfigure chan chan error // theirAddr address.Address // theirSubnet address.Subnet // @@ -136,7 +136,7 @@ func (s *sessionInfo) _update(p *sessionPing) bool { // Sessions are indexed by handle. // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { - core *Core + router *router listener *Listener listenerMutex sync.Mutex reconfigure chan chan error @@ -149,8 +149,8 @@ type sessions struct { } // Initializes the session struct. -func (ss *sessions) init(core *Core) { - ss.core = core +func (ss *sessions) init(r *router) { + ss.router = r ss.reconfigure = make(chan chan error, 1) go func() { for { @@ -213,18 +213,18 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { return nil } sinfo := sessionInfo{} - sinfo.core = ss.core + sinfo.sessions = ss sinfo.reconfigure = make(chan chan error, 1) sinfo.theirPermPub = *theirPermKey - sinfo.sharedPermKey = *ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub) + sinfo.sharedPermKey = *ss.getSharedKey(&ss.router.core.boxPriv, &sinfo.theirPermPub) pub, priv := crypto.NewBoxKeys() sinfo.mySesPub = *pub sinfo.mySesPriv = *priv sinfo.myNonce = *crypto.NewBoxNonce() sinfo.theirMTU = 1280 - ss.core.config.Mutex.RLock() - sinfo.myMTU = uint16(ss.core.config.Current.IfMTU) - ss.core.config.Mutex.RUnlock() + ss.router.core.config.Mutex.RLock() + sinfo.myMTU = uint16(ss.router.core.config.Current.IfMTU) + ss.router.core.config.Mutex.RUnlock() now := time.Now() sinfo.timeOpened = now sinfo.time = now @@ -234,11 +234,11 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.init = make(chan struct{}) sinfo.cancel = util.NewCancellation() higher := false - for idx := range ss.core.boxPub { - if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { + for idx := range ss.router.core.boxPub { + if ss.router.core.boxPub[idx] > sinfo.theirPermPub[idx] { higher = true break - } else if ss.core.boxPub[idx] < sinfo.theirPermPub[idx] { + } else if ss.router.core.boxPub[idx] < sinfo.theirPermPub[idx] { break } } @@ -260,8 +260,8 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { go func() { // Run cleanup when the session is canceled <-sinfo.cancel.Finished() - sinfo.core.router.doAdmin(func() { - sinfo.core.router.sessions.removeSession(&sinfo) + sinfo.sessions.router.doAdmin(func() { + sinfo.sessions.removeSession(&sinfo) }) }() go sinfo.startWorkers() @@ -298,18 +298,18 @@ func (ss *sessions) cleanup() { // Closes a session, removing it from sessions maps. func (ss *sessions) removeSession(sinfo *sessionInfo) { - if s := sinfo.core.router.sessions.sinfos[sinfo.myHandle]; s == sinfo { - delete(sinfo.core.router.sessions.sinfos, sinfo.myHandle) - delete(sinfo.core.router.sessions.byTheirPerm, sinfo.theirPermPub) + if s := sinfo.sessions.sinfos[sinfo.myHandle]; s == sinfo { + delete(sinfo.sessions.sinfos, sinfo.myHandle) + delete(sinfo.sessions.byTheirPerm, sinfo.theirPermPub) } } // Returns a session ping appropriate for the given session info. func (sinfo *sessionInfo) _getPing() sessionPing { - loc := sinfo.core.switchTable.getLocator() + loc := sinfo.sessions.router.core.switchTable.getLocator() coords := loc.getCoords() ping := sessionPing{ - SendPermPub: sinfo.core.boxPub, + SendPermPub: sinfo.sessions.router.core.boxPub, Handle: sinfo.myHandle, SendSesPub: sinfo.mySesPub, Tstamp: time.Now().Unix(), @@ -360,13 +360,13 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { p := wire_protoTrafficPacket{ Coords: sinfo.coords, ToKey: sinfo.theirPermPub, - FromKey: sinfo.core.boxPub, + FromKey: sinfo.sessions.router.core.boxPub, Nonce: *nonce, Payload: payload, } packet := p.encode() // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first - sinfo.core.router.EnqueueFrom(sinfo, func() { sinfo.core.router.out(packet) }) + sinfo.sessions.router.EnqueueFrom(sinfo, func() { sinfo.sessions.router.out(packet) }) if sinfo.pingTime.Before(sinfo.time) { sinfo.pingTime = time.Now() } @@ -390,7 +390,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo { panic("This should not happen") } - conn := newConn(ss.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo) + conn := newConn(ss.router.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo) for i := range conn.nodeMask { conn.nodeMask[i] = 0xFF } @@ -400,7 +400,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { ss.listenerMutex.Unlock() } if sinfo != nil { - sinfo.EnqueueFrom(&ss.core.router, func() { + sinfo.EnqueueFrom(ss.router, func() { // Update the session if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ return @@ -468,7 +468,7 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. func (ss *sessions) reset() { for _, sinfo := range ss.sinfos { - sinfo.EnqueueFrom(&ss.core.router, func() { + sinfo.EnqueueFrom(ss.router, func() { sinfo.reset = true }) } @@ -639,7 +639,10 @@ func (sinfo *sessionInfo) sendWorker() { util.PutBytes(msg.Message) util.PutBytes(p.Payload) // Send the packet - sinfo.core.router.out(packet) + // TODO replace this with a send to the peer struct if that becomes an actor + sinfo.sessions.router.EnqueueFrom(sinfo, func() { + sinfo.sessions.router.out(packet) + }) } ch <- callback }