diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index a4a41e7..ece69ca 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -64,6 +64,8 @@ type linkInterface struct { closeTimer *time.Timer // Fires when the link has been idle so long we need to close it inSwitch bool // True if the switch is tracking this link stalled bool // True if we haven't been receiving any response traffic + sendSeqSent uint // Incremented each time we start sending + sendSeqRecv uint // Incremented each time we finish sending } func (l *link) init(c *Core) error { @@ -273,9 +275,23 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { } intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf._cancelStallTimer() + intf.sendSeqSent++ + seq := intf.sendSeqSent + intf.Act(nil, func() { + intf._checkSending(seq) + }) }) } +// If check if we're still sending +func (intf *linkInterface) _checkSending(seq uint) { + if intf.sendSeqRecv != seq { + intf.link.core.switchTable.Act(intf, func() { + intf.link.core.switchTable._sendingIn(intf.peer.port) + }) + } +} + // we just sent something, so cancel any pending timer to send keep-alive traffic func (intf *linkInterface) _cancelStallTimer() { if intf.stallTimer != nil { @@ -305,6 +321,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) } + intf.sendSeqRecv++ }) } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ece2fa2..0150e17 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -177,6 +177,7 @@ type switchTable struct { phony.Inbox // Owns the below queues switch_buffers // Queues - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor + sending map[switchPort]struct{} // peers known to be blocked in a send (somehow) } // Minimum allowed total size of switch queues. @@ -203,6 +204,7 @@ func (t *switchTable) init(core *Core) { core.config.Mutex.RUnlock() t.queues.bufs = make(map[string]switch_buffer) t.idle = make(map[switchPort]struct{}) + t.sending = make(map[switchPort]struct{}) }) } @@ -527,7 +529,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep t.parent = sender.port t.core.peers.sendSwitchMsgs(t) } - if doUpdate { + if true || doUpdate { t.updater.Store(&sync.Once{}) } return @@ -664,7 +666,7 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued -func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) bool { +func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool { coords := switch_getPacketCoords(packet) closer := t.getCloser(coords) if len(closer) == 0 { @@ -677,12 +679,13 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo ports := t.core.peers.getPorts() for _, cinfo := range closer { to := ports[cinfo.elem.port] - _, isIdle := idle[cinfo.elem.port] + //_, isIdle := idle[cinfo.elem.port] + _, isSending := sending[cinfo.elem.port] var update bool switch { case to == nil: // no port was found, ignore it - case !isIdle: + case isSending: // the port is busy, ignore it case best == nil: // this is the first idle port we've found, so select it until we find a @@ -702,6 +705,7 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo // has a n older tstamp, so presumably a worse path case cinfo.elem.time.Before(best.elem.time): // same tstamp, but got it earlier, so presumably a better path + //t.core.log.Println("DEBUG new best:", best.elem.time, cinfo.elem.time) update = true default: // the search for a port has finished @@ -712,13 +716,18 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo } } if best != nil { - delete(idle, best.elem.port) + if _, isIdle := idle[best.elem.port]; isIdle { + delete(idle, best.elem.port) + ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) + return true + } + //delete(idle, best.elem.port) // Tell ourselves to send to this node later // If another (e.g. even better) hop becomes idle in the mean time, it'll take the packet instead // FIXME this is just a hack, but seems to help with stability... - go t.Act(nil, func() { - t._idleIn(best.elem.port) - }) + //go t.Act(nil, func() { + // t._idleIn(best.elem.port) + //}) //ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) //return true } @@ -847,7 +856,7 @@ func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { func (t *switchTable) _packetIn(bytes []byte) { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) - if !t._handleIn(bytes, t.idle) { + if !t._handleIn(bytes, t.idle, t.sending) { // There's nobody free to take it right now, so queue it for later packet := switch_packetInfo{bytes, time.Now()} streamID := switch_getPacketStreamID(packet.bytes) @@ -874,8 +883,15 @@ func (t *switchTable) _packetIn(bytes []byte) { func (t *switchTable) _idleIn(port switchPort) { // Try to find something to send to this peer + delete(t.sending, port) if !t._handleIdle(port) { // Didn't find anything ready to send yet, so stay idle t.idle[port] = struct{}{} } } + +func (t *switchTable) _sendingIn(port switchPort) { + if _, isIn := t.idle[port]; !isIn { + t.sending[port] = struct{}{} + } +}