5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-10 07:30:27 +00:00

explicitly notify the switch when a link appears to be blocked in a send instead of assuming this is the case for all idle links. how we decide when it's really blocked still needs testing/optimizing

This commit is contained in:
Arceliar 2019-09-24 18:01:35 -05:00
parent 691192ff5a
commit 8c64e6fa09
2 changed files with 42 additions and 9 deletions

View File

@ -64,6 +64,8 @@ type linkInterface struct {
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
inSwitch bool // True if the switch is tracking this link inSwitch bool // True if the switch is tracking this link
stalled bool // True if we haven't been receiving any response traffic stalled bool // True if we haven't been receiving any response traffic
sendSeqSent uint // Incremented each time we start sending
sendSeqRecv uint // Incremented each time we finish sending
} }
func (l *link) init(c *Core) error { func (l *link) init(c *Core) error {
@ -273,7 +275,21 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
} }
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
intf._cancelStallTimer() intf._cancelStallTimer()
intf.sendSeqSent++
seq := intf.sendSeqSent
intf.Act(nil, func() {
intf._checkSending(seq)
}) })
})
}
// If check if we're still sending
func (intf *linkInterface) _checkSending(seq uint) {
if intf.sendSeqRecv != seq {
intf.link.core.switchTable.Act(intf, func() {
intf.link.core.switchTable._sendingIn(intf.peer.port)
})
}
} }
// we just sent something, so cancel any pending timer to send keep-alive traffic // we just sent something, so cancel any pending timer to send keep-alive traffic
@ -305,6 +321,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
if size > 0 && intf.stallTimer == nil { if size > 0 && intf.stallTimer == nil {
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
} }
intf.sendSeqRecv++
}) })
} }

View File

@ -177,6 +177,7 @@ type switchTable struct {
phony.Inbox // Owns the below phony.Inbox // Owns the below
queues switch_buffers // Queues - not atomic so ONLY use through the actor queues switch_buffers // Queues - not atomic so ONLY use through the actor
idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor
sending map[switchPort]struct{} // peers known to be blocked in a send (somehow)
} }
// Minimum allowed total size of switch queues. // Minimum allowed total size of switch queues.
@ -203,6 +204,7 @@ func (t *switchTable) init(core *Core) {
core.config.Mutex.RUnlock() core.config.Mutex.RUnlock()
t.queues.bufs = make(map[string]switch_buffer) t.queues.bufs = make(map[string]switch_buffer)
t.idle = make(map[switchPort]struct{}) t.idle = make(map[switchPort]struct{})
t.sending = make(map[switchPort]struct{})
}) })
} }
@ -527,7 +529,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
t.parent = sender.port t.parent = sender.port
t.core.peers.sendSwitchMsgs(t) t.core.peers.sendSwitchMsgs(t)
} }
if doUpdate { if true || doUpdate {
t.updater.Store(&sync.Once{}) t.updater.Store(&sync.Once{})
} }
return return
@ -664,7 +666,7 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
// Handle an incoming packet // Handle an incoming packet
// Either send it to ourself, or to the first idle peer that's free // Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued // Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) bool { func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool {
coords := switch_getPacketCoords(packet) coords := switch_getPacketCoords(packet)
closer := t.getCloser(coords) closer := t.getCloser(coords)
if len(closer) == 0 { if len(closer) == 0 {
@ -677,12 +679,13 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo
ports := t.core.peers.getPorts() ports := t.core.peers.getPorts()
for _, cinfo := range closer { for _, cinfo := range closer {
to := ports[cinfo.elem.port] to := ports[cinfo.elem.port]
_, isIdle := idle[cinfo.elem.port] //_, isIdle := idle[cinfo.elem.port]
_, isSending := sending[cinfo.elem.port]
var update bool var update bool
switch { switch {
case to == nil: case to == nil:
// no port was found, ignore it // no port was found, ignore it
case !isIdle: case isSending:
// the port is busy, ignore it // the port is busy, ignore it
case best == nil: case best == nil:
// this is the first idle port we've found, so select it until we find a // this is the first idle port we've found, so select it until we find a
@ -702,6 +705,7 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo
// has a n older tstamp, so presumably a worse path // has a n older tstamp, so presumably a worse path
case cinfo.elem.time.Before(best.elem.time): case cinfo.elem.time.Before(best.elem.time):
// same tstamp, but got it earlier, so presumably a better path // same tstamp, but got it earlier, so presumably a better path
//t.core.log.Println("DEBUG new best:", best.elem.time, cinfo.elem.time)
update = true update = true
default: default:
// the search for a port has finished // the search for a port has finished
@ -712,13 +716,18 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo
} }
} }
if best != nil { if best != nil {
if _, isIdle := idle[best.elem.port]; isIdle {
delete(idle, best.elem.port) delete(idle, best.elem.port)
ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
return true
}
//delete(idle, best.elem.port)
// Tell ourselves to send to this node later // Tell ourselves to send to this node later
// If another (e.g. even better) hop becomes idle in the mean time, it'll take the packet instead // If another (e.g. even better) hop becomes idle in the mean time, it'll take the packet instead
// FIXME this is just a hack, but seems to help with stability... // FIXME this is just a hack, but seems to help with stability...
go t.Act(nil, func() { //go t.Act(nil, func() {
t._idleIn(best.elem.port) // t._idleIn(best.elem.port)
}) //})
//ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) //ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
//return true //return true
} }
@ -847,7 +856,7 @@ func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
func (t *switchTable) _packetIn(bytes []byte) { func (t *switchTable) _packetIn(bytes []byte) {
// Try to send it somewhere (or drop it if it's corrupt or at a dead end) // Try to send it somewhere (or drop it if it's corrupt or at a dead end)
if !t._handleIn(bytes, t.idle) { if !t._handleIn(bytes, t.idle, t.sending) {
// There's nobody free to take it right now, so queue it for later // There's nobody free to take it right now, so queue it for later
packet := switch_packetInfo{bytes, time.Now()} packet := switch_packetInfo{bytes, time.Now()}
streamID := switch_getPacketStreamID(packet.bytes) streamID := switch_getPacketStreamID(packet.bytes)
@ -874,8 +883,15 @@ func (t *switchTable) _packetIn(bytes []byte) {
func (t *switchTable) _idleIn(port switchPort) { func (t *switchTable) _idleIn(port switchPort) {
// Try to find something to send to this peer // Try to find something to send to this peer
delete(t.sending, port)
if !t._handleIdle(port) { if !t._handleIdle(port) {
// Didn't find anything ready to send yet, so stay idle // Didn't find anything ready to send yet, so stay idle
t.idle[port] = struct{}{} t.idle[port] = struct{}{}
} }
} }
func (t *switchTable) _sendingIn(port switchPort) {
if _, isIn := t.idle[port]; !isIn {
t.sending[port] = struct{}{}
}
}