5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-29 14:21:37 +00:00

more link updates

This commit is contained in:
Arceliar 2019-08-26 18:37:38 -05:00
parent f432875d87
commit 4d9c6342a7

View File

@ -46,23 +46,23 @@ type linkInterfaceMsgIO interface {
} }
type linkInterface struct { type linkInterface struct {
name string name string
link *link link *link
peer *peer peer *peer
msgIO linkInterfaceMsgIO msgIO linkInterfaceMsgIO
info linkInfo info linkInfo
incoming bool incoming bool
force bool force bool
closed chan struct{} closed chan struct{}
reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch
writer linkWriter // Writes packets, notifies this linkInterface writer linkWriter // Writes packets, notifies this linkInterface
phony.Inbox // Protects the below phony.Inbox // Protects the below
sendTimer *time.Timer // Fires to signal that sending is blocked sendTimer *time.Timer // Fires to signal that sending is blocked
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen keepAliveTimer *time.Timer // Fires to send keep-alive traffic
recvTimer *time.Timer // Fires to send keep-alive traffic stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
inSwitch bool // True if the switch is tracking this link inSwitch bool // True if the switch is tracking this link
stalled bool // True if we haven't been receiving any response traffic stalled bool // True if we haven't been receiving any response traffic
} }
func (l *link) init(c *Core) error { func (l *link) init(c *Core) error {
@ -243,36 +243,34 @@ func (intf *linkInterface) handler() error {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
const ( const (
sendBlockedTime = time.Second // How long to wait before deciding a send is blocked sendTime = 1 * time.Second // How long to wait before deciding a send is blocked
keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send
stallTime = 6 * time.Second // How long to wait for response traffic before deciding the connection has stalled stallTime = 6 * time.Second // How long to wait for response traffic before deciding the connection has stalled
closeTime = 2 * switch_timeout // How long to wait before closing the link closeTime = 2 * switch_timeout // How long to wait before closing the link
) )
// notify the intf that we're currently sending // notify the intf that we're currently sending
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
intf.RecvFrom(nil, func() { intf.RecvFrom(&intf.writer, func() {
if !isLinkTraffic { if !isLinkTraffic {
intf.inSwitch = false intf.inSwitch = false
} }
intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend) intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
intf._cancelRecvTimer() intf._cancelStallTimer()
}) })
} }
// we just sent something, so cancel any pending timer to send keep-alive traffic // we just sent something, so cancel any pending timer to send keep-alive traffic
func (intf *linkInterface) _cancelRecvTimer() { func (intf *linkInterface) _cancelStallTimer() {
intf.RecvFrom(nil, func() { if intf.stallTimer != nil {
if intf.recvTimer != nil { intf.stallTimer.Stop()
intf.recvTimer.Stop() intf.stallTimer = nil
intf.recvTimer = nil }
}
})
} }
// called by an AfterFunc if we appear to have timed out // called by an AfterFunc if we appear to have timed out
func (intf *linkInterface) notifyBlockedSend() { func (intf *linkInterface) notifyBlockedSend() {
intf.RecvFrom(nil, func() { intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc
if intf.sendTimer != nil { if intf.sendTimer != nil {
//As far as we know, we're still trying to send, and the timer fired. //As far as we know, we're still trying to send, and the timer fired.
intf.link.core.switchTable.blockPeer(intf.peer.port) intf.link.core.switchTable.blockPeer(intf.peer.port)
@ -282,7 +280,7 @@ func (intf *linkInterface) notifyBlockedSend() {
// notify the intf that we've finished sending, returning the peer to the switch // notify the intf that we've finished sending, returning the peer to the switch
func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
intf.RecvFrom(nil, func() { intf.RecvFrom(&intf.writer, func() {
intf.sendTimer.Stop() intf.sendTimer.Stop()
intf.sendTimer = nil intf.sendTimer = nil
if !isLinkTraffic { if !isLinkTraffic {
@ -306,8 +304,9 @@ func (intf *linkInterface) _notifySwitch() {
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
func (intf *linkInterface) notifyStalled() { func (intf *linkInterface) notifyStalled() {
intf.RecvFrom(nil, func() { intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
intf.stalled = true intf.stalled = true
intf.link.core.switchTable.blockPeer(intf.peer.port) intf.link.core.switchTable.blockPeer(intf.peer.port)
@ -316,8 +315,8 @@ func (intf *linkInterface) notifyStalled() {
} }
// reset the close timer // reset the close timer
func (intf *linkInterface) notifyReading(from phony.Actor) { func (intf *linkInterface) notifyReading() {
intf.RecvFrom(from, func() { intf.RecvFrom(&intf.reader, func() {
if intf.closeTimer != nil { if intf.closeTimer != nil {
intf.closeTimer.Stop() intf.closeTimer.Stop()
} }
@ -326,26 +325,26 @@ func (intf *linkInterface) notifyReading(from phony.Actor) {
} }
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
func (intf *linkInterface) notifyReadFrom(from phony.Actor, size int) { func (intf *linkInterface) notifyRead(size int) {
intf.RecvFrom(from, func() { intf.RecvFrom(&intf.reader, func() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
} }
intf.stalled = false intf.stalled = false
intf._notifySwitch() intf._notifySwitch()
if size > 0 && intf.recvTimer == nil { if size > 0 && intf.stallTimer == nil {
intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
} }
}) })
} }
// We need to send keep-alive traffic now // We need to send keep-alive traffic now
func (intf *linkInterface) notifyDoKeepAlive() { func (intf *linkInterface) notifyDoKeepAlive() {
intf.RecvFrom(nil, func() { intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc
if intf.recvTimer != nil { if intf.stallTimer != nil {
intf.recvTimer.Stop() intf.stallTimer.Stop()
intf.recvTimer = nil intf.stallTimer = nil
intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic
} }
}) })
@ -383,9 +382,9 @@ type linkReader struct {
} }
func (r *linkReader) _read() { func (r *linkReader) _read() {
r.intf.notifyReading(r) r.intf.notifyReading()
msg, err := r.intf.msgIO.readMsg() msg, err := r.intf.msgIO.readMsg()
r.intf.notifyReadFrom(r, len(msg)) r.intf.notifyRead(len(msg))
if len(msg) > 0 { if len(msg) > 0 {
r.intf.peer.handlePacketFrom(r, msg) r.intf.peer.handlePacketFrom(r, msg)
} }