From 3dab94be9f4cc500652fb0f9f97914da7be21794 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 10:58:24 -0500 Subject: [PATCH] keep dht peers alive --- src/yggdrasil/dht.go | 2 +- src/yggdrasil/peer.go | 33 +++++++++++++++++---------------- src/yggdrasil/router.go | 1 + src/yggdrasil/switch.go | 3 ++- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index e59017a..3c9f61c 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -81,7 +81,7 @@ type dht struct { func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() - t.peers = make(chan *dhtInfo, 1) + t.peers = make(chan *dhtInfo, 1024) t.reqs = make(map[boxPubKey]map[NodeID]time.Time) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ce5cd0c..00f8a9c 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -107,8 +107,8 @@ type peer struct { lastAnc time.Time // TODO? rename and use this // used for protocol traffic (to bypass queues) linkOut (chan []byte) - lastMsg []byte // last switchMsg accepted doSend (chan struct{}) // tell the linkLoop to send a switchMsg + dinfo *dhtInfo // used to keep the DHT working } const peer_Throttle = 1 @@ -186,21 +186,22 @@ func (ps *peers) sendSwitchMsgs() { } } -func (ps *peers) fixSwitchAfterPeerDisconnect() { - // TODO something better, this is very wasteful - ports := ps.getPorts() - for _, p := range ports { - if p.lastMsg == nil { - continue - } - p.handleSwitchMsg(p.lastMsg) - } -} - func (p *peer) linkLoop() { go func() { p.doSend <- struct{}{} }() - for range p.doSend { - p.sendSwitchMsg() + tick := time.NewTicker(time.Second) + defer tick.Stop() + for { + select { + case _, ok := <-p.doSend: + if !ok { + return + } + p.sendSwitchMsg() + case _ = <-tick.C: + if p.dinfo != nil { + p.core.dht.peers <- p.dinfo + } + } } } @@ -224,7 +225,7 @@ func (p *peer) handlePacket(packet []byte) { } func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - if p.port != 0 && p.lastMsg == nil { + if p.port != 0 && p.dinfo == nil { // Drop traffic until the peer manages to send us at least one good switchMsg return } @@ -356,7 +357,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { coords: l.getCoords(), } p.core.dht.peers <- &dinfo - p.lastMsg = packet + p.dinfo = &dinfo } func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 872a473..3246f63 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -93,6 +93,7 @@ func (r *router) mainLoop() { // Any periodic maintenance stuff goes here r.core.switchTable.doMaintenance() r.core.dht.doMaintenance() + //r.core.peers.fixSwitchAfterPeerDisconnect() // FIXME makes sure dht peers get added quickly util_getBytes() // To slowly drain things } case f := <-r.admin: diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index a9a0fcd..fec47af 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -230,7 +230,7 @@ func (t *switchTable) cleanRoot() { func (t *switchTable) removePeer(port switchPort) { delete(t.data.peers, port) t.updater.Store(&sync.Once{}) - t.core.peers.fixSwitchAfterPeerDisconnect() + // TODO if parent, find a new peer to use as parent instead } func (t *switchTable) cleanDropped() { @@ -287,6 +287,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig doUpdate := false if !equiv(&msg.locator, &oldSender.locator) { doUpdate = true + //sender.firstSeen = now // TODO? uncomment to prevent flapping? } t.data.peers[fromPort] = sender updateRoot := false