diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index a4a41e7..98c080c 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -276,6 +276,13 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { }) } +// called by an AfterFunc if we seem to be blocked in a send syscall for a long time +func (intf *linkInterface) _notifySyscall() { + intf.link.core.switchTable.Act(intf, func() { + intf.link.core.switchTable._sendingIn(intf.peer.port) + }) +} + // we just sent something, so cancel any pending timer to send keep-alive traffic func (intf *linkInterface) _cancelStallTimer() { if intf.stallTimer != nil { @@ -284,9 +291,11 @@ func (intf *linkInterface) _cancelStallTimer() { } } -// called by an AfterFunc if we appear to have timed out +// This gets called from a time.AfterFunc, and notifies the switch that we appear +// to have gotten blocked on a write, so the switch should start routing traffic +// through other links, if alternatives exist func (intf *linkInterface) notifyBlockedSend() { - intf.Act(nil, func() { // Sent from a time.AfterFunc + intf.Act(nil, func() { if intf.sendTimer != nil { //As far as we know, we're still trying to send, and the timer fired. intf.link.core.switchTable.blockPeer(intf.peer.port) @@ -380,7 +389,19 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool size += len(bs) } w.intf.notifySending(size, isLinkTraffic) + // start a timer that will fire if we get stuck in writeMsgs for an oddly long time + var once sync.Once + timer := time.AfterFunc(time.Millisecond, func() { + // 1 ms is kind of arbitrary + // the rationale is that this should be very long compared to a syscall + // but it's still short compared to end-to-end latency or human perception + once.Do(func() { + w.intf.Act(nil, w.intf._notifySyscall) + }) + }) w.intf.msgIO.writeMsgs(bss) + // Make sure we either stop the timer from doing anything or wait until it's done + once.Do(func() { timer.Stop() }) w.intf.notifySent(size, isLinkTraffic) // Cleanup for _, bs := range bss { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f4df60d..ba30758 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -177,6 +177,7 @@ type switchTable struct { phony.Inbox // Owns the below queues switch_buffers // Queues - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor + sending map[switchPort]struct{} // peers known to be blocked in a send (somehow) } // Minimum allowed total size of switch queues. @@ -203,6 +204,7 @@ func (t *switchTable) init(core *Core) { core.config.Mutex.RUnlock() t.queues.bufs = make(map[string]switch_buffer) t.idle = make(map[switchPort]struct{}) + t.sending = make(map[switchPort]struct{}) }) } @@ -527,7 +529,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep t.parent = sender.port t.core.peers.sendSwitchMsgs(t) } - if doUpdate { + if true || doUpdate { t.updater.Store(&sync.Once{}) } return @@ -664,7 +666,7 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued -func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) bool { +func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool { coords := switch_getPacketCoords(packet) closer := t.getCloser(coords) if len(closer) == 0 { @@ -677,12 +679,13 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo ports := t.core.peers.getPorts() for _, cinfo := range closer { to := ports[cinfo.elem.port] - _, isIdle := idle[cinfo.elem.port] + //_, isIdle := idle[cinfo.elem.port] + _, isSending := sending[cinfo.elem.port] var update bool switch { case to == nil: // no port was found, ignore it - case !isIdle: + case isSending: // the port is busy, ignore it case best == nil: // this is the first idle port we've found, so select it until we find a @@ -702,6 +705,7 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo // has a n older tstamp, so presumably a worse path case cinfo.elem.time.Before(best.elem.time): // same tstamp, but got it earlier, so presumably a better path + //t.core.log.Println("DEBUG new best:", best.elem.time, cinfo.elem.time) update = true default: // the search for a port has finished @@ -712,10 +716,11 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo } } if best != nil { - // Send to the best idle next hop - delete(idle, best.elem.port) - ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) - return true + if _, isIdle := idle[best.elem.port]; isIdle { + delete(idle, best.elem.port) + ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) + return true + } } // Didn't find anyone idle to send it to return false @@ -785,6 +790,7 @@ func (b *switch_buffers) _cleanup(t *switchTable) { // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list func (t *switchTable) _handleIdle(port switchPort) bool { + // TODO? only send packets for which this is the best next hop that isn't currently blocked sending to := t.core.peers.getPorts()[port] if to == nil { return true @@ -842,7 +848,7 @@ func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { func (t *switchTable) _packetIn(bytes []byte) { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) - if !t._handleIn(bytes, t.idle) { + if !t._handleIn(bytes, t.idle, t.sending) { // There's nobody free to take it right now, so queue it for later packet := switch_packetInfo{bytes, time.Now()} streamID := switch_getPacketStreamID(packet.bytes) @@ -869,8 +875,15 @@ func (t *switchTable) _packetIn(bytes []byte) { func (t *switchTable) _idleIn(port switchPort) { // Try to find something to send to this peer + delete(t.sending, port) if !t._handleIdle(port) { // Didn't find anything ready to send yet, so stay idle t.idle[port] = struct{}{} } } + +func (t *switchTable) _sendingIn(port switchPort) { + if _, isIn := t.idle[port]; !isIn { + t.sending[port] = struct{}{} + } +} diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index ed8f7b9..66f708c 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -233,8 +233,9 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { } defer func() { // Block new calls for a little while, to mitigate livelock scenarios - time.Sleep(default_timeout) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + rand.Seed(time.Now().UnixNano()) + delay := default_timeout + time.Duration(rand.Intn(10000))*time.Millisecond + time.Sleep(delay) t.mutex.Lock() delete(t.calls, callname) t.mutex.Unlock()