From 9834f222db65efab838a1a2403b8e039109742f2 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 29 Mar 2020 19:01:50 -0500 Subject: [PATCH] more work in progress actorizing the remaining parts of the switch --- src/yggdrasil/api.go | 7 ++- src/yggdrasil/dht.go | 12 ++-- src/yggdrasil/link.go | 4 +- src/yggdrasil/nodeinfo.go | 3 +- src/yggdrasil/peer.go | 37 +++++++----- src/yggdrasil/router.go | 18 +++++- src/yggdrasil/search.go | 3 +- src/yggdrasil/session.go | 5 +- src/yggdrasil/switch.go | 123 +++++++++++++++++--------------------- 9 files changed, 111 insertions(+), 101 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 15e2acd..a722dc5 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -330,8 +330,11 @@ func (c *Core) EncryptionPublicKey() string { // connected to any other nodes (effectively making you the root of a // single-node network). func (c *Core) Coords() []uint64 { - loc := c.switchTable.getLocator() - return wire_coordsBytestoUint64s(loc.getCoords()) + var coords []byte + phony.Block(&c.router, func() { + coords = c.router.table.self.getCoords() + }) + return wire_coordsBytestoUint64s(coords) } // Address gets the IPv6 address of the Yggdrasil node. This is always a /128 diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 8efc549..56d03ed 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -186,11 +186,9 @@ func dht_ordered(first, second, third *crypto.NodeID) bool { // Update info about the node that sent the request. func (t *dht) handleReq(req *dhtReq) { // Send them what they asked for - loc := t.router.core.switchTable.getLocator() - coords := loc.getCoords() res := dhtRes{ Key: t.router.core.boxPub, - Coords: coords, + Coords: t.router.table.self.getCoords(), Dest: req.Dest, Infos: t.lookup(&req.Dest, false), } @@ -300,11 +298,9 @@ func (t *dht) ping(info *dhtInfo, target *crypto.NodeID) { if target == nil { target = &t.nodeID } - loc := t.router.core.switchTable.getLocator() - coords := loc.getCoords() req := dhtReq{ Key: t.router.core.boxPub, - Coords: coords, + Coords: t.router.table.self.getCoords(), Dest: *target, } t.sendReq(&req, info) @@ -378,7 +374,7 @@ func (t *dht) getImportant() []*dhtInfo { }) // Keep the ones that are no further than the closest seen so far minDist := ^uint64(0) - loc := t.router.core.switchTable.getLocator() + loc := t.router.table.self important := infos[:0] for _, info := range infos { dist := uint64(loc.dist(info.coords)) @@ -416,7 +412,7 @@ func (t *dht) isImportant(ninfo *dhtInfo) bool { } important := t.getImportant() // Check if ninfo is of equal or greater importance to what we already know - loc := t.router.core.switchTable.getLocator() + loc := t.router.table.self ndist := uint64(loc.dist(ninfo.coords)) minDist := ^uint64(0) for _, info := range important { diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index fa6563f..978e8ea 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -300,7 +300,7 @@ func (intf *linkInterface) notifyBlockedSend() { intf.Act(nil, func() { if intf.sendTimer != nil { //As far as we know, we're still trying to send, and the timer fired. - intf.link.core.switchTable.blockPeer(intf.peer.port) + intf.link.core.switchTable.blockPeer(intf, intf.peer.port) } }) } @@ -340,7 +340,7 @@ func (intf *linkInterface) notifyStalled() { intf.stallTimer.Stop() intf.stallTimer = nil intf.stalled = true - intf.link.core.switchTable.blockPeer(intf.peer.port) + intf.link.core.switchTable.blockPeer(intf, intf.peer.port) } }) } diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index 745756f..b179d20 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -18,6 +18,7 @@ type nodeinfo struct { myNodeInfo NodeInfoPayload callbacks map[crypto.BoxPubKey]nodeinfoCallback cache map[crypto.BoxPubKey]nodeinfoCached + table *lookupTable } type nodeinfoCached struct { @@ -187,7 +188,7 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse } func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { - loc := m.core.switchTable.getLocator() + loc := m.table.self nodeinfo := nodeinfoReqRes{ SendCoords: loc.getCoords(), IsResponse: isResponse, diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 7fa2b31..9acb932 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -162,7 +162,7 @@ func (ps *peers) _removePeer(p *peer) { if q := ps.ports[p.port]; p.port == 0 || q != p { return } // Can't remove self peer or nonexistant peer - ps.core.switchTable.forgetPeer(p.port) + ps.core.switchTable.forgetPeer(ps, p.port) oldPorts := ps.ports newPorts := make(map[switchPort]*peer) for k, v := range oldPorts { @@ -328,7 +328,7 @@ func (p *peer) _handleLinkTraffic(bs []byte) { // Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them. func (p *peer) _sendSwitchMsg() { - msg := p.core.switchTable.getMsg() + msg := p.table.getMsg() if msg == nil { return } @@ -367,19 +367,26 @@ func (p *peer) _handleSwitchMsg(packet []byte) { } prevKey = hop.Next } - p.core.switchTable.handleMsg(&msg, p.port) - if !p.core.switchTable.checkRoot(&msg) { - // Bad switch message - p.dinfo = nil - return - } - // Pass a message to the dht informing it that this peer (still) exists - loc.coords = loc.coords[:len(loc.coords)-1] - p.dinfo = &dhtInfo{ - key: p.box, - coords: loc.getCoords(), - } - p._updateDHT() + p.core.switchTable.Act(p, func() { + if !p.core.switchTable._checkRoot(&msg) { + // Bad switch message + p.Act(&p.core.switchTable, func() { + p.dinfo = nil + }) + } else { + // handle the message + p.core.switchTable._handleMsg(&msg, p.port, false) + p.Act(&p.core.switchTable, func() { + // Pass a message to the dht informing it that this peer (still) exists + loc.coords = loc.coords[:len(loc.coords)-1] + p.dinfo = &dhtInfo{ + key: p.box, + coords: loc.getCoords(), + } + p._updateDHT() + }) + } + }) } // This generates the bytes that we sign or check the signature of for a switchMsg. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index ac4d655..40b8303 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -46,6 +46,7 @@ type router struct { nodeinfo nodeinfo searches searches sessions sessions + table *lookupTable // has a copy of our locator } // Initializes the router struct, which includes setting up channels to/from the adapter. @@ -77,6 +78,21 @@ func (r *router) init(core *Core) { r.sessions.init(r) } +func (r *router) updateTable(from phony.Actor, table *lookupTable) { + r.Act(from, func() { + r.table = table + r.nodeinfo.Act(r, func() { + r.nodeinfo.table = table + }) + for _, ses := range r.sessions.sinfos { + sinfo := ses + sinfo.Act(r, func() { + sinfo.table = table + }) + } + }) +} + // Reconfigures the router and any child modules. This should only ever be run // by the router actor. func (r *router) reconfigure() { @@ -130,7 +146,7 @@ func (r *router) reset(from phony.Actor) { func (r *router) doMaintenance() { phony.Block(r, func() { // Any periodic maintenance stuff goes here - r.core.switchTable.doMaintenance() + r.core.switchTable.doMaintenance(r) r.dht.doMaintenance() r.sessions.cleanup() }) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 91f0490..febde3d 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -161,11 +161,10 @@ func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) { // Initially start a search func (sinfo *searchInfo) startSearch() { - loc := sinfo.searches.router.core.switchTable.getLocator() var infos []*dhtInfo infos = append(infos, &dhtInfo{ key: sinfo.searches.router.core.boxPub, - coords: loc.getCoords(), + coords: sinfo.searches.router.table.self.getCoords(), }) // Start the search by asking ourself, useful if we're the destination sinfo.continueSearch(infos) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index eaa67fd..01c2cdf 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -52,6 +52,7 @@ type sessionInfo struct { cancel util.Cancellation // Used to terminate workers conn *Conn // The associated Conn object callbacks []chan func() // Finished work from crypto workers + table *lookupTable // table.self is a locator where we get our coords } // Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -217,6 +218,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.myHandle = *crypto.NewHandle() sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) + sinfo.table = ss.router.table ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle return &sinfo @@ -266,8 +268,7 @@ func (ss *sessions) removeSession(sinfo *sessionInfo) { // Returns a session ping appropriate for the given session info. func (sinfo *sessionInfo) _getPing() sessionPing { - loc := sinfo.sessions.router.core.switchTable.getLocator() - coords := loc.getCoords() + coords := sinfo.table.self.getCoords() ping := sessionPing{ SendPermPub: sinfo.sessions.router.core.boxPub, Handle: sinfo.myHandle, diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ab2e119..2661b46 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -12,12 +12,9 @@ package yggdrasil // A little annoying to do with constant changes from backpressure import ( - //"math/rand" - "sync" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - //"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/Arceliar/phony" ) @@ -149,6 +146,7 @@ type tableElem struct { type lookupTable struct { self switchLocator elems map[switchPort]tableElem + _msg switchMsg } // This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically. @@ -168,7 +166,6 @@ type switchTable struct { key crypto.SigPubKey // Our own key time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root - mutex sync.RWMutex // Lock for reads/writes of switchData parent switchPort // Port of whatever peer is our parent, or self if we're root data switchData // phony.Inbox // Owns the below @@ -208,24 +205,17 @@ func (t *switchTable) reconfigure() { t.core.peers.reconfigure() } -// Safely gets a copy of this node's locator. -func (t *switchTable) getLocator() switchLocator { - t.mutex.RLock() - defer t.mutex.RUnlock() - return t.data.locator.clone() -} - // Regular maintenance to possibly timeout/reset the root and similar. -func (t *switchTable) doMaintenance() { - // Periodic maintenance work to keep things internally consistent - t.mutex.Lock() // Write lock - defer t.mutex.Unlock() // Release lock when we're done - t.cleanRoot() - t.cleanDropped() +func (t *switchTable) doMaintenance(from phony.Actor) { + t.Act(from, func() { + // Periodic maintenance work to keep things internally consistent + t._cleanRoot() + t._cleanDropped() + }) } // Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out. -func (t *switchTable) cleanRoot() { +func (t *switchTable) _cleanRoot() { // TODO rethink how this is done?... // Get rid of the root if it looks like its timed out now := time.Now() @@ -259,49 +249,49 @@ func (t *switchTable) cleanRoot() { } // Blocks and, if possible, unparents a peer -func (t *switchTable) blockPeer(port switchPort) { - t.mutex.Lock() - defer t.mutex.Unlock() - peer, isIn := t.data.peers[port] - if !isIn { - return - } - peer.blocked = true - t.data.peers[port] = peer - if port != t.parent { - return - } - t.parent = 0 - for _, info := range t.data.peers { - if info.port == port { - continue +func (t *switchTable) blockPeer(from phony.Actor, port switchPort) { + t.Act(from, func() { + peer, isIn := t.data.peers[port] + if !isIn { + return } - t.unlockedHandleMsg(&info.msg, info.port, true) - } - t.unlockedHandleMsg(&peer.msg, peer.port, true) + peer.blocked = true + t.data.peers[port] = peer + if port != t.parent { + return + } + t.parent = 0 + for _, info := range t.data.peers { + if info.port == port { + continue + } + t._handleMsg(&info.msg, info.port, true) + } + t._handleMsg(&peer.msg, peer.port, true) + }) } // Removes a peer. // Must be called by the router actor with a lambda that calls this. // If the removed peer was this node's parent, it immediately tries to find a new parent. -func (t *switchTable) forgetPeer(port switchPort) { - t.mutex.Lock() - defer t.mutex.Unlock() - delete(t.data.peers, port) - defer t._updateTable() - if port != t.parent { - return - } - t.parent = 0 - for _, info := range t.data.peers { - t.unlockedHandleMsg(&info.msg, info.port, true) - } +func (t *switchTable) forgetPeer(from phony.Actor, port switchPort) { + t.Act(from, func() { + delete(t.data.peers, port) + defer t._updateTable() + if port != t.parent { + return + } + t.parent = 0 + for _, info := range t.data.peers { + t._handleMsg(&info.msg, info.port, true) + } + }) } // Dropped is a list of roots that are better than the current root, but stopped sending new timestamps. // If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos. // This function is called periodically to do that cleanup. -func (t *switchTable) cleanDropped() { +func (t *switchTable) _cleanDropped() { // TODO? only call this after root changes, not periodically for root := range t.drop { if !firstIsBetter(&root, &t.data.locator.root) { @@ -327,9 +317,7 @@ type switchMsgHop struct { } // This returns a *switchMsg to a copy of this node's current switchMsg, which can safely have additional information appended to Hops and sent to a peer. -func (t *switchTable) getMsg() *switchMsg { - t.mutex.RLock() - defer t.mutex.RUnlock() +func (t *switchTable) _getMsg() *switchMsg { if t.parent == 0 { return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp} } else if parent, isIn := t.data.peers[t.parent]; isIn { @@ -341,14 +329,18 @@ func (t *switchTable) getMsg() *switchMsg { } } +func (t *lookupTable) getMsg() *switchMsg { + msg := t._msg + msg.Hops = append([]switchMsgHop(nil), t._msg.Hops...) + return &msg +} + // This function checks that the root information in a switchMsg is OK. // In particular, that the root is better, or else the same as the current root but with a good timestamp, and that this root+timestamp haven't been dropped due to timeout. -func (t *switchTable) checkRoot(msg *switchMsg) bool { +func (t *switchTable) _checkRoot(msg *switchMsg) bool { // returns false if it's a dropped root, not a better root, or has an older timestamp // returns true otherwise // used elsewhere to keep inserting peers into the dht only if root info is OK - t.mutex.RLock() - defer t.mutex.RUnlock() dropTstamp, isIn := t.drop[msg.Root] switch { case isIn && dropTstamp >= msg.TStamp: @@ -364,20 +356,13 @@ func (t *switchTable) checkRoot(msg *switchMsg) bool { } } -// This is a mutexed wrapper to unlockedHandleMsg, and is called by the peer structs in peers.go to pass a switchMsg for that peer into the switch. -func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) { - t.mutex.Lock() - defer t.mutex.Unlock() - t.unlockedHandleMsg(msg, fromPort, false) -} - // This updates the switch with information about a peer. // Then the tricky part, it decides if it should update our own locator as a result. // That happens if this node is already our parent, or is advertising a better root, or is advertising a better path to the same root, etc... // There are a lot of very delicate order sensitive checks here, so its' best to just read the code if you need to understand what it's doing. // It's very important to not change the order of the statements in the case function unless you're absolutely sure that it's safe, including safe if used alongside nodes that used the previous order. // Set the third arg to true if you're reprocessing an old message, e.g. to find a new parent after one disconnects, to avoid updating some timing related things. -func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) { +func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) { // TODO directly use a switchMsg instead of switchMessage + sigs now := time.Now() // Set up the sender peerInfo @@ -500,10 +485,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep if peer.port == sender.port { continue } - t.unlockedHandleMsg(&peer.msg, peer.port, true) + t._handleMsg(&peer.msg, peer.port, true) } // Process the sender last, to avoid keeping them as a parent if at all possible. - t.unlockedHandleMsg(&sender.msg, sender.port, true) + t._handleMsg(&sender.msg, sender.port, true) case now.Sub(t.time) < switch_throttle: // We've already gotten an update from this root recently, so ignore this one to avoid flooding. case sender.locator.tstamp > t.data.locator.tstamp: @@ -521,7 +506,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep } t.data.locator = sender.locator t.parent = sender.port - t.core.peers.sendSwitchMsgs(t) + defer t.core.peers.sendSwitchMsgs(t) } if true || doUpdate { defer t._updateTable() @@ -560,7 +545,9 @@ func (t *switchTable) _updateTable() { time: pinfo.time, } } - t.core.peers.updateTables(nil, &newTable) // TODO not be from nil + newTable._msg = *t._getMsg() + t.core.peers.updateTables(t, &newTable) + t.core.router.updateTable(t, &newTable) } // Starts the switch worker