diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 7a843b6..5222644 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -64,8 +64,9 @@ type link struct { keepAliveTimer *time.Timer // Fires to send keep-alive traffic stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen closeTimer *time.Timer // Fires when the link has been idle so long we need to close it - isSending bool // True between a notifySending and a notifySent - blocked bool // True if we've blocked the peer in the switch + readUnblocked bool // True if we've sent a read message unblocking this peer in the switch + writeUnblocked bool // True if we've sent a write message unblocking this peer in the swithc + shutdown bool // True if we're shutting down, avoids sending some messages that could race with new peers being crated in the same port } type linkOptions struct { @@ -285,7 +286,10 @@ func (intf *link) handler() error { } defer func() { // More cleanup can go here - intf.peer.Act(nil, intf.peer._removeSelf) + intf.Act(nil, func() { + intf.shutdown = true + intf.peer.Act(intf, intf.peer._removeSelf) + }) }() themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() @@ -385,7 +389,6 @@ const ( // notify the intf that we're currently sending func (intf *link) notifySending(size int) { intf.Act(&intf.writer, func() { - intf.isSending = true intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) if intf.keepAliveTimer != nil { intf.keepAliveTimer.Stop() @@ -400,10 +403,14 @@ func (intf *link) notifySending(size int) { // through other links, if alternatives exist func (intf *link) notifyBlockedSend() { intf.Act(nil, func() { - if intf.sendTimer != nil && !intf.blocked { + if intf.sendTimer != nil { //As far as we know, we're still trying to send, and the timer fired. - intf.blocked = true - intf.links.core.switchTable.blockPeer(intf, intf.peer.port) + intf.sendTimer.Stop() + intf.sendTimer = nil + if !intf.shutdown && intf.writeUnblocked { + intf.writeUnblocked = false + intf.links.core.switchTable.blockPeer(intf, intf.peer.port, true) + } } }) } @@ -421,10 +428,13 @@ func (intf *link) notifySent(size int) { intf.keepAliveTimer = nil } intf._notifyIdle() - intf.isSending = false if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) } + if !intf.shutdown && !intf.writeUnblocked { + intf.writeUnblocked = true + intf.links.core.switchTable.unblockPeer(intf, intf.peer.port, true) + } }) } @@ -439,9 +449,9 @@ func (intf *link) notifyStalled() { if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil - if !intf.blocked { - intf.blocked = true - intf.links.core.switchTable.blockPeer(intf, intf.peer.port) + if !intf.shutdown && intf.readUnblocked { + intf.readUnblocked = false + intf.links.core.switchTable.blockPeer(intf, intf.peer.port, false) } } }) @@ -467,9 +477,9 @@ func (intf *link) notifyRead(size int) { if size > 0 && intf.keepAliveTimer == nil { intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) } - if intf.blocked { - intf.blocked = false - intf.links.core.switchTable.unblockPeer(intf, intf.peer.port) + if !intf.shutdown && !intf.readUnblocked { + intf.readUnblocked = true + intf.links.core.switchTable.unblockPeer(intf, intf.peer.port, false) } }) } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ed2edf2..9c7a91f 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -136,14 +136,19 @@ func (x *switchLocator) isAncestorOf(y *switchLocator) bool { // Information about a peer, used by the switch to build the tree and eventually make routing decisions. type peerInfo struct { - key crypto.SigPubKey // ID of this peer - locator switchLocator // Should be able to respond with signatures upon request - degree uint64 // Self-reported degree - time time.Time // Time this node was last seen - faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower - port switchPort // Interface number of this peer - msg switchMsg // The wire switchMsg used - blocked bool // True if the link is blocked, used to avoid parenting a blocked link + key crypto.SigPubKey // ID of this peer + locator switchLocator // Should be able to respond with signatures upon request + degree uint64 // Self-reported degree + time time.Time // Time this node was last seen + faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower + port switchPort // Interface number of this peer + msg switchMsg // The wire switchMsg used + readBlock bool // True if the link notified us of a read that blocked too long + writeBlock bool // True of the link notified us of a write that blocked too long +} + +func (pinfo *peerInfo) blocked() bool { + return pinfo.readBlock || pinfo.writeBlock } // This is just a uint64 with a named type for clarity reasons. @@ -250,15 +255,19 @@ func (t *switchTable) _cleanRoot() { } // Blocks and, if possible, unparents a peer -func (t *switchTable) blockPeer(from phony.Actor, port switchPort) { +func (t *switchTable) blockPeer(from phony.Actor, port switchPort, isWrite bool) { t.Act(from, func() { peer, isIn := t.data.peers[port] - if !isIn || peer.blocked { + switch { + case isIn && !isWrite && !peer.readBlock: + peer.readBlock = true + case isIn && isWrite && !peer.writeBlock: + peer.writeBlock = true + default: return } - peer.blocked = true t.data.peers[port] = peer - t._updateTable() + defer t._updateTable() if port != t.parent { return } @@ -273,13 +282,17 @@ func (t *switchTable) blockPeer(from phony.Actor, port switchPort) { }) } -func (t *switchTable) unblockPeer(from phony.Actor, port switchPort) { +func (t *switchTable) unblockPeer(from phony.Actor, port switchPort, isWrite bool) { t.Act(from, func() { peer, isIn := t.data.peers[port] - if !isIn || !peer.blocked { + switch { + case isIn && !isWrite && peer.readBlock: + peer.readBlock = false + case isIn && isWrite && peer.writeBlock: + peer.writeBlock = false + default: return } - peer.blocked = false t.data.peers[port] = peer t._updateTable() }) @@ -422,7 +435,8 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi if reprocessing { sender.faster = oldSender.faster sender.time = oldSender.time - sender.blocked = oldSender.blocked + sender.readBlock = oldSender.readBlock + sender.writeBlock = oldSender.writeBlock } else { sender.faster = make(map[switchPort]uint64, len(oldSender.faster)) for port, peer := range t.data.peers { @@ -445,7 +459,7 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi } } } - if sender.blocked != oldSender.blocked { + if sender.blocked() != oldSender.blocked() { doUpdate = true } // Update sender @@ -485,10 +499,10 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi case sender.faster[t.parent] >= switch_faster_threshold: // The is reliably faster than the current parent. updateRoot = true - case !sender.blocked && oldParent.blocked: + case !sender.blocked() && oldParent.blocked(): // Replace a blocked parent updateRoot = true - case reprocessing && sender.blocked && !oldParent.blocked: + case reprocessing && sender.blocked() && !oldParent.blocked(): // Don't replace an unblocked parent when reprocessing case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]: // The sender seems to be reliably faster than the current parent, so switch to them instead. @@ -546,7 +560,7 @@ func (t *switchTable) _updateTable() { } newTable._init() for _, pinfo := range t.data.peers { - if pinfo.blocked || pinfo.locator.root != newTable.self.root { + if pinfo.blocked() || pinfo.locator.root != newTable.self.root { continue } loc := pinfo.locator.clone()