diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 8c38c06..c94cf26 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -110,9 +110,9 @@ type peer struct { endpoint string firstSeen time.Time // To track uptime for getPeers linkOut (chan []byte) // used for protocol traffic (to bypass queues) - doSend (chan struct{}) // tell the linkLoop to send a switchMsg - dinfo (chan *dhtInfo) // used to keep the DHT working + dinfo *dhtInfo // used to keep the DHT working out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes + done (chan struct{}) // closed to exit the linkLoop close func() // Called when a peer is removed, to close the underlying connection, or via admin api } @@ -124,8 +124,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare shared: *crypto.GetSharedKey(&ps.core.boxPriv, box), linkShared: *linkShared, firstSeen: now, - doSend: make(chan struct{}, 1), - dinfo: make(chan *dhtInfo, 1), + done: make(chan struct{}), close: closer, core: ps.core, intf: intf, @@ -170,29 +169,19 @@ func (ps *peers) removePeer(port switchPort) { if p.close != nil { p.close() } - close(p.doSend) + close(p.done) } } // If called, sends a notification to each peer that they should send a new switch message. // Mainly called by the switch after an update. -func (ps *peers) sendSwitchMsgs() { +func (ps *peers) sendSwitchMsgs(from phony.IActor) { ports := ps.getPorts() for _, p := range ports { if p.port == 0 { continue } - p.doSendSwitchMsgs() - } -} - -// If called, sends a notification to the peer's linkLoop to trigger a switchMsg send. -// Mainly called by sendSwitchMsgs or during linkLoop startup. -func (p *peer) doSendSwitchMsgs() { - defer func() { recover() }() // In case there's a race with close(p.doSend) - select { - case p.doSend <- struct{}{}: - default: + p.EnqueueFrom(from, p._sendSwitchMsg) } } @@ -201,24 +190,23 @@ func (p *peer) doSendSwitchMsgs() { func (p *peer) linkLoop() { tick := time.NewTicker(time.Second) defer tick.Stop() - p.doSendSwitchMsgs() - var dinfo *dhtInfo + <-p.SyncExec(p._sendSwitchMsg) // Startup message for { select { - case _, ok := <-p.doSend: - if !ok { - return - } - <-p.SyncExec(p._sendSwitchMsg) - case dinfo = <-p.dinfo: + case <-p.done: + return case _ = <-tick.C: - if dinfo != nil { - <-p.SyncExec(func() { p.core.router.insertPeer(p, dinfo) }) - } + <-p.SyncExec(p._updateDHT) } } } +func (p *peer) _updateDHT() { + if p.dinfo != nil { + p.core.router.insertPeer(p, p.dinfo) + } +} + func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) { p.EnqueueFrom(from, func() { p._handlePacket(packet) @@ -366,16 +354,16 @@ func (p *peer) _handleSwitchMsg(packet []byte) { p.core.switchTable.handleMsg(&msg, p.port) if !p.core.switchTable.checkRoot(&msg) { // Bad switch message - p.dinfo <- nil + p.dinfo = nil return } // Pass a mesage to the dht informing it that this peer (still) exists loc.coords = loc.coords[:len(loc.coords)-1] - dinfo := dhtInfo{ + p.dinfo = &dhtInfo{ key: p.box, coords: loc.getCoords(), } - p.dinfo <- &dinfo + p._updateDHT() } // This generates the bytes that we sign or check the signature of for a switchMsg. diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d87b788..0be0124 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -248,7 +248,7 @@ func (t *switchTable) cleanRoot() { t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} - t.core.peers.sendSwitchMsgs() + t.core.peers.sendSwitchMsgs(nil) // TODO update if/when the switch becomes an actor } } @@ -515,7 +515,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep } t.data.locator = sender.locator t.parent = sender.port - t.core.peers.sendSwitchMsgs() + t.core.peers.sendSwitchMsgs(nil) // TODO update if/when the switch becomes an actor } if doUpdate { t.updater.Store(&sync.Once{})