diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 8fbd031..6b7f446 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -46,23 +46,23 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string - link *link - peer *peer - msgIO linkInterfaceMsgIO - info linkInfo - incoming bool - force bool - closed chan struct{} - reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch - writer linkWriter // Writes packets, notifies this linkInterface - phony.Inbox // Protects the below - sendTimer *time.Timer // Fires to signal that sending is blocked - stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen - recvTimer *time.Timer // Fires to send keep-alive traffic - closeTimer *time.Timer // Fires when the link has been idle so long we need to close it - inSwitch bool // True if the switch is tracking this link - stalled bool // True if we haven't been receiving any response traffic + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO + info linkInfo + incoming bool + force bool + closed chan struct{} + reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch + writer linkWriter // Writes packets, notifies this linkInterface + phony.Inbox // Protects the below + sendTimer *time.Timer // Fires to signal that sending is blocked + keepAliveTimer *time.Timer // Fires to send keep-alive traffic + stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen + closeTimer *time.Timer // Fires when the link has been idle so long we need to close it + inSwitch bool // True if the switch is tracking this link + stalled bool // True if we haven't been receiving any response traffic } func (l *link) init(c *Core) error { @@ -243,36 +243,34 @@ func (intf *linkInterface) handler() error { //////////////////////////////////////////////////////////////////////////////// const ( - sendBlockedTime = time.Second // How long to wait before deciding a send is blocked - keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send - stallTime = 6 * time.Second // How long to wait for response traffic before deciding the connection has stalled - closeTime = 2 * switch_timeout // How long to wait before closing the link + sendTime = 1 * time.Second // How long to wait before deciding a send is blocked + keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send + stallTime = 6 * time.Second // How long to wait for response traffic before deciding the connection has stalled + closeTime = 2 * switch_timeout // How long to wait before closing the link ) // notify the intf that we're currently sending func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { - intf.RecvFrom(nil, func() { + intf.RecvFrom(&intf.writer, func() { if !isLinkTraffic { intf.inSwitch = false } - intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend) - intf._cancelRecvTimer() + intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) + intf._cancelStallTimer() }) } // we just sent something, so cancel any pending timer to send keep-alive traffic -func (intf *linkInterface) _cancelRecvTimer() { - intf.RecvFrom(nil, func() { - if intf.recvTimer != nil { - intf.recvTimer.Stop() - intf.recvTimer = nil - } - }) +func (intf *linkInterface) _cancelStallTimer() { + if intf.stallTimer != nil { + intf.stallTimer.Stop() + intf.stallTimer = nil + } } // called by an AfterFunc if we appear to have timed out func (intf *linkInterface) notifyBlockedSend() { - intf.RecvFrom(nil, func() { + intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc if intf.sendTimer != nil { //As far as we know, we're still trying to send, and the timer fired. intf.link.core.switchTable.blockPeer(intf.peer.port) @@ -282,7 +280,7 @@ func (intf *linkInterface) notifyBlockedSend() { // notify the intf that we've finished sending, returning the peer to the switch func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { - intf.RecvFrom(nil, func() { + intf.RecvFrom(&intf.writer, func() { intf.sendTimer.Stop() intf.sendTimer = nil if !isLinkTraffic { @@ -306,8 +304,9 @@ func (intf *linkInterface) _notifySwitch() { // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds func (intf *linkInterface) notifyStalled() { - intf.RecvFrom(nil, func() { + intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc if intf.stallTimer != nil { + intf.stallTimer.Stop() intf.stallTimer = nil intf.stalled = true intf.link.core.switchTable.blockPeer(intf.peer.port) @@ -316,8 +315,8 @@ func (intf *linkInterface) notifyStalled() { } // reset the close timer -func (intf *linkInterface) notifyReading(from phony.Actor) { - intf.RecvFrom(from, func() { +func (intf *linkInterface) notifyReading() { + intf.RecvFrom(&intf.reader, func() { if intf.closeTimer != nil { intf.closeTimer.Stop() } @@ -326,26 +325,26 @@ func (intf *linkInterface) notifyReading(from phony.Actor) { } // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic -func (intf *linkInterface) notifyReadFrom(from phony.Actor, size int) { - intf.RecvFrom(from, func() { +func (intf *linkInterface) notifyRead(size int) { + intf.RecvFrom(&intf.reader, func() { if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil } intf.stalled = false intf._notifySwitch() - if size > 0 && intf.recvTimer == nil { - intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) + if size > 0 && intf.stallTimer == nil { + intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) } }) } // We need to send keep-alive traffic now func (intf *linkInterface) notifyDoKeepAlive() { - intf.RecvFrom(nil, func() { - if intf.recvTimer != nil { - intf.recvTimer.Stop() - intf.recvTimer = nil + intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc + if intf.stallTimer != nil { + intf.stallTimer.Stop() + intf.stallTimer = nil intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic } }) @@ -383,9 +382,9 @@ type linkReader struct { } func (r *linkReader) _read() { - r.intf.notifyReading(r) + r.intf.notifyReading() msg, err := r.intf.msgIO.readMsg() - r.intf.notifyReadFrom(r, len(msg)) + r.intf.notifyRead(len(msg)) if len(msg) > 0 { r.intf.peer.handlePacketFrom(r, msg) }