From dc128121e57bd659ece83217b58a50d74a330ae0 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 16 May 2020 09:25:57 -0500 Subject: [PATCH] update switch blockPeer/unblockPeer logic and dht reset when coords change --- src/yggdrasil/dht.go | 13 +++++++------ src/yggdrasil/link.go | 1 + src/yggdrasil/peer.go | 27 +++++++++++++-------------- src/yggdrasil/switch.go | 20 +++++++++++++++++--- 4 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 56d03ed..f40ac3c 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -89,6 +89,11 @@ func (t *dht) reconfigure() { // Resets the DHT in response to coord changes. // This empties all info from the DHT and drops outstanding requests. func (t *dht) reset() { + for _, info := range t.table { + if t.isImportant(info) { + t.ping(info, nil) + } + } t.reqs = make(map[dhtReqKey]time.Time) t.table = make(map[crypto.NodeID]*dhtInfo) t.imp = nil @@ -144,12 +149,8 @@ func (t *dht) insert(info *dhtInfo) { // Insert a peer into the table if it hasn't been pinged lately, to keep peers from dropping func (t *dht) insertPeer(info *dhtInfo) { - oldInfo, isIn := t.table[*info.getNodeID()] - if !isIn || time.Since(oldInfo.recv) > dht_max_delay+30*time.Second { - // TODO? also check coords? - newInfo := *info // Insert a copy - t.insert(&newInfo) - } + t.insert(info) // FIXME this resets timers / ping counts / etc, so it seems kind of dangerous + t.ping(info, nil) // This is a quick fix to the above, ping them immediately... } // Return true if first/second/third are (partially) ordered correctly. diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 7f6b9b5..539d048 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -376,6 +376,7 @@ func (intf *linkInterface) notifyRead(size int) { if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) } + intf.link.core.switchTable.unblockPeer(intf, intf.peer.port) }) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 801691a..31bba66 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -194,21 +194,20 @@ func (ps *peers) sendSwitchMsgs(from phony.Actor) { }) } -// This must be launched in a separate goroutine by whatever sets up the peer struct. -// It handles link protocol traffic. -func (p *peer) start() { - var updateDHT func() - updateDHT = func() { - phony.Block(p, func() { - select { - case <-p.done: - default: - p._updateDHT() - time.AfterFunc(time.Second, updateDHT) +func (ps *peers) updateDHT(from phony.Actor) { + ps.Act(from, func() { + for _, peer := range ps.ports { + p := peer + if p.port == 0 { + continue } - }) - } - updateDHT() + p.Act(ps, p._updateDHT) + } + }) +} + +// This must be launched in a separate goroutine by whatever sets up the peer struct. +func (p *peer) start() { // Just for good measure, immediately send a switch message to this peer when we start p.Act(nil, p._sendSwitchMsg) } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 4f9044c..6ab9a02 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -239,11 +239,12 @@ func (t *switchTable) _cleanRoot() { func (t *switchTable) blockPeer(from phony.Actor, port switchPort) { t.Act(from, func() { peer, isIn := t.data.peers[port] - if !isIn { + if !isIn || peer.blocked { return } peer.blocked = true t.data.peers[port] = peer + t._updateTable() if port != t.parent { return } @@ -258,6 +259,18 @@ func (t *switchTable) blockPeer(from phony.Actor, port switchPort) { }) } +func (t *switchTable) unblockPeer(from phony.Actor, port switchPort) { + t.Act(from, func() { + peer, isIn := t.data.peers[port] + if !isIn || !peer.blocked { + return + } + peer.blocked = false + t.data.peers[port] = peer + t._updateTable() + }) +} + // Removes a peer. // Must be called by the router actor with a lambda that calls this. // If the removed peer was this node's parent, it immediately tries to find a new parent. @@ -482,11 +495,12 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi // The timestamp was updated, so we need to update locally and send to our peers. updateRoot = true } + // Note that we depend on the LIFO order of the stack of defers here... if updateRoot { if !equiv(&sender.locator, &t.data.locator) { doUpdate = true t.data.seq++ - t.core.router.reset(nil) + defer t.core.router.reset(t) } if t.data.locator.tstamp != sender.locator.tstamp { t.time = now @@ -495,7 +509,7 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi t.parent = sender.port defer t.core.peers.sendSwitchMsgs(t) } - if true || doUpdate { + if doUpdate { defer t._updateTable() } return