diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 15371e3..0efcbe3 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -108,6 +108,8 @@ type peer struct { // used for protocol traffic (to bypass queues) linkIn (chan []byte) // handlePacket sends, linkLoop recvs linkOut (chan []byte) + lastMsg []byte // last switchMsg accepted + doSend (chan struct{}) // tell the linkLoop to send a switchMsg } const peer_Throttle = 1 @@ -127,6 +129,7 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer { shared: *getSharedKey(&ps.core.boxPriv, box), lastAnc: now, firstSeen: now, + doSend: make(chan struct{}, 1), core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() @@ -171,8 +174,33 @@ func (ps *peers) removePeer(port switchPort) { } } +func (ps *peers) sendSwitchMsgs() { + ports := ps.getPorts() + for _, p := range ports { + if p.port == 0 { + continue + } + select { + case p.doSend <- struct{}{}: + default: + } + } +} + +func (ps *peers) fixSwitchAfterPeerDisconnect() { + // TODO something better, this is very wasteful + ports := ps.getPorts() + for _, p := range ports { + if p.lastMsg == nil { + continue + } + p.handleSwitchMsg(p.lastMsg) + } +} + func (p *peer) linkLoop() { - ticker := time.NewTicker(time.Second) + go func() { p.doSend <- struct{}{} }() + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { @@ -182,16 +210,15 @@ func (p *peer) linkLoop() { } p.handleLinkTraffic(packet) case <-ticker.C: - if time.Since(p.lastAnc) > 16*time.Second && p.close != nil { - // Seems to have timed out, try to trigger a close - // FIXME this depends on lastAnc or something equivalent being updated - p.close() - } p.throttle = 0 - if p.port == 0 { - continue - } // Don't send announces on selfInterface - // TODO change update logic, the new switchMsg works differently, we only need to send if something changes + if p.lastMsg != nil { + // TODO? remove ticker completely + // p.throttle isn't useful anymore (if they send a wrong message, remove peer instead) + // the handleMessage below is just for debugging, but it *shouldn't* be needed now that things react to state changes instantly + // The one case where it's maybe useful is if you get messages faster than the switch throttle, but that should fix itself after the next periodic update or timeout + p.handleSwitchMsg(p.lastMsg) + } + case <-p.doSend: p.sendSwitchMsg() } } @@ -217,11 +244,10 @@ func (p *peer) handlePacket(packet []byte) { } func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - //if p.port != 0 && p.msgAnc == nil { - // // Drop traffic until the peer manages to send us at least one anc - // // TODO equivalent for new switch format, maybe add some bool flag? - // return - //} + if p.port != 0 && p.lastMsg == nil { + // Drop traffic until the peer manages to send us at least one good switchMsg + return + } ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:]) ttlBegin := pTypeLen ttlEnd := pTypeLen + ttlLen @@ -350,6 +376,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { coords: l.getCoords(), } p.core.dht.peers <- &dinfo + p.lastMsg = packet } func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d765f59..a9a0fcd 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -223,12 +223,14 @@ func (t *switchTable) cleanRoot() { } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} t.data.sigs = nil + t.core.peers.sendSwitchMsgs() } } func (t *switchTable) removePeer(port switchPort) { delete(t.data.peers, port) t.updater.Store(&sync.Once{}) + t.core.peers.fixSwitchAfterPeerDisconnect() } func (t *switchTable) cleanDropped() { @@ -250,6 +252,7 @@ func (t *switchTable) createMessage(port switchPort) (*switchMessage, []sigInfo) } func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sigs []sigInfo) { + // TODO directly use a switchMsg instead of switchMessage + sigs t.mutex.Lock() defer t.mutex.Unlock() now := time.Now() @@ -344,6 +347,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig t.parent = sender.port t.data.sigs = sigs //t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) + t.core.peers.sendSwitchMsgs() } if doUpdate { t.updater.Store(&sync.Once{})