5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-22 20:00:27 +00:00

make the switch react to peer coord changes immediately, and send out updates immediately

This commit is contained in:
Arceliar 2018-06-07 00:16:47 -05:00
parent 85afe187ff
commit ecf37cae8a
2 changed files with 46 additions and 15 deletions

View File

@ -108,6 +108,8 @@ type peer struct {
// used for protocol traffic (to bypass queues) // used for protocol traffic (to bypass queues)
linkIn (chan []byte) // handlePacket sends, linkLoop recvs linkIn (chan []byte) // handlePacket sends, linkLoop recvs
linkOut (chan []byte) linkOut (chan []byte)
lastMsg []byte // last switchMsg accepted
doSend (chan struct{}) // tell the linkLoop to send a switchMsg
} }
const peer_Throttle = 1 const peer_Throttle = 1
@ -127,6 +129,7 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer {
shared: *getSharedKey(&ps.core.boxPriv, box), shared: *getSharedKey(&ps.core.boxPriv, box),
lastAnc: now, lastAnc: now,
firstSeen: now, firstSeen: now,
doSend: make(chan struct{}, 1),
core: ps.core} core: ps.core}
ps.mutex.Lock() ps.mutex.Lock()
defer ps.mutex.Unlock() defer ps.mutex.Unlock()
@ -171,8 +174,33 @@ func (ps *peers) removePeer(port switchPort) {
} }
} }
func (ps *peers) sendSwitchMsgs() {
ports := ps.getPorts()
for _, p := range ports {
if p.port == 0 {
continue
}
select {
case p.doSend <- struct{}{}:
default:
}
}
}
func (ps *peers) fixSwitchAfterPeerDisconnect() {
// TODO something better, this is very wasteful
ports := ps.getPorts()
for _, p := range ports {
if p.lastMsg == nil {
continue
}
p.handleSwitchMsg(p.lastMsg)
}
}
func (p *peer) linkLoop() { func (p *peer) linkLoop() {
ticker := time.NewTicker(time.Second) go func() { p.doSend <- struct{}{} }()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
@ -182,16 +210,15 @@ func (p *peer) linkLoop() {
} }
p.handleLinkTraffic(packet) p.handleLinkTraffic(packet)
case <-ticker.C: case <-ticker.C:
if time.Since(p.lastAnc) > 16*time.Second && p.close != nil {
// Seems to have timed out, try to trigger a close
// FIXME this depends on lastAnc or something equivalent being updated
p.close()
}
p.throttle = 0 p.throttle = 0
if p.port == 0 { if p.lastMsg != nil {
continue // TODO? remove ticker completely
} // Don't send announces on selfInterface // p.throttle isn't useful anymore (if they send a wrong message, remove peer instead)
// TODO change update logic, the new switchMsg works differently, we only need to send if something changes // the handleMessage below is just for debugging, but it *shouldn't* be needed now that things react to state changes instantly
// The one case where it's maybe useful is if you get messages faster than the switch throttle, but that should fix itself after the next periodic update or timeout
p.handleSwitchMsg(p.lastMsg)
}
case <-p.doSend:
p.sendSwitchMsg() p.sendSwitchMsg()
} }
} }
@ -217,11 +244,10 @@ func (p *peer) handlePacket(packet []byte) {
} }
func (p *peer) handleTraffic(packet []byte, pTypeLen int) { func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
//if p.port != 0 && p.msgAnc == nil { if p.port != 0 && p.lastMsg == nil {
// // Drop traffic until the peer manages to send us at least one anc // Drop traffic until the peer manages to send us at least one good switchMsg
// // TODO equivalent for new switch format, maybe add some bool flag? return
// return }
//}
ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:]) ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:])
ttlBegin := pTypeLen ttlBegin := pTypeLen
ttlEnd := pTypeLen + ttlLen ttlEnd := pTypeLen + ttlLen
@ -350,6 +376,7 @@ func (p *peer) handleSwitchMsg(packet []byte) {
coords: l.getCoords(), coords: l.getCoords(),
} }
p.core.dht.peers <- &dinfo p.core.dht.peers <- &dinfo
p.lastMsg = packet
} }
func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte {

View File

@ -223,12 +223,14 @@ func (t *switchTable) cleanRoot() {
} }
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
t.data.sigs = nil t.data.sigs = nil
t.core.peers.sendSwitchMsgs()
} }
} }
func (t *switchTable) removePeer(port switchPort) { func (t *switchTable) removePeer(port switchPort) {
delete(t.data.peers, port) delete(t.data.peers, port)
t.updater.Store(&sync.Once{}) t.updater.Store(&sync.Once{})
t.core.peers.fixSwitchAfterPeerDisconnect()
} }
func (t *switchTable) cleanDropped() { func (t *switchTable) cleanDropped() {
@ -250,6 +252,7 @@ func (t *switchTable) createMessage(port switchPort) (*switchMessage, []sigInfo)
} }
func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sigs []sigInfo) { func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sigs []sigInfo) {
// TODO directly use a switchMsg instead of switchMessage + sigs
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
now := time.Now() now := time.Now()
@ -344,6 +347,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
t.parent = sender.port t.parent = sender.port
t.data.sigs = sigs t.data.sigs = sigs
//t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) //t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
t.core.peers.sendSwitchMsgs()
} }
if doUpdate { if doUpdate {
t.updater.Store(&sync.Once{}) t.updater.Store(&sync.Once{})