5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-26 07:11:40 +00:00

some peer/link cleanup

This commit is contained in:
Arceliar 2020-05-23 10:08:23 -05:00
parent cf2edc99d1
commit 59c5644a52
2 changed files with 21 additions and 23 deletions

View File

@ -62,7 +62,6 @@ type linkInterface struct {
keepAliveTimer *time.Timer // Fires to send keep-alive traffic keepAliveTimer *time.Timer // Fires to send keep-alive traffic
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
isIdle bool // True if the peer actor knows the link is idle
isSending bool // True between a notifySending and a notifySent isSending bool // True between a notifySending and a notifySent
blocked bool // True if we've blocked the peer in the switch blocked bool // True if we've blocked the peer in the switch
} }
@ -279,7 +278,7 @@ func (intf *linkInterface) out(bss [][]byte) {
// nil to prevent it from blocking if the link is somehow frozen // nil to prevent it from blocking if the link is somehow frozen
// this is safe because another packet won't be sent until the link notifies // this is safe because another packet won't be sent until the link notifies
// the peer that it's ready for one // the peer that it's ready for one
intf.writer.sendFrom(nil, bss, false) intf.writer.sendFrom(nil, bss)
}) })
} }
@ -290,7 +289,7 @@ func (intf *linkInterface) linkOut(bs []byte) {
// additional packets until this one finishes, otherwise this could leak // additional packets until this one finishes, otherwise this could leak
// memory if writing happens slower than link packets are generated... // memory if writing happens slower than link packets are generated...
// that seems unlikely, so it's a lesser evil than deadlocking for now // that seems unlikely, so it's a lesser evil than deadlocking for now
intf.writer.sendFrom(nil, [][]byte{bs}, true) intf.writer.sendFrom(nil, [][]byte{bs})
}) })
} }
@ -332,11 +331,8 @@ const (
) )
// notify the intf that we're currently sending // notify the intf that we're currently sending
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { func (intf *linkInterface) notifySending(size int) {
intf.Act(&intf.writer, func() { intf.Act(&intf.writer, func() {
if !isLinkTraffic {
intf.isIdle = false
}
intf.isSending = true intf.isSending = true
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
intf._cancelStallTimer() intf._cancelStallTimer()
@ -365,13 +361,18 @@ func (intf *linkInterface) notifyBlockedSend() {
} }
// notify the intf that we've finished sending, returning the peer to the switch // notify the intf that we've finished sending, returning the peer to the switch
func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { func (intf *linkInterface) notifySent(size int) {
intf.Act(&intf.writer, func() { intf.Act(&intf.writer, func() {
if intf.sendTimer != nil {
intf.sendTimer.Stop() intf.sendTimer.Stop()
intf.sendTimer = nil intf.sendTimer = nil
if !isLinkTraffic {
intf._notifyIdle()
} }
if intf.keepAliveTimer != nil {
// TODO? unset this when we start sending, not when we finish...
intf.keepAliveTimer.Stop()
intf.keepAliveTimer = nil
}
intf._notifyIdle()
intf.isSending = false intf.isSending = false
if size > 0 && intf.stallTimer == nil { if size > 0 && intf.stallTimer == nil {
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
@ -381,11 +382,8 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
// Notify the peer that we're ready for more traffic // Notify the peer that we're ready for more traffic
func (intf *linkInterface) _notifyIdle() { func (intf *linkInterface) _notifyIdle() {
if !intf.isIdle {
intf.isIdle = true
intf.peer.Act(intf, intf.peer._handleIdle) intf.peer.Act(intf, intf.peer._handleIdle)
} }
}
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
func (intf *linkInterface) notifyStalled() { func (intf *linkInterface) notifyStalled() {
@ -416,8 +414,8 @@ func (intf *linkInterface) notifyRead(size int) {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
} }
if size > 0 && intf.stallTimer == nil { if size > 0 && intf.keepAliveTimer == nil {
intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
} }
if intf.blocked { if intf.blocked {
intf.blocked = false intf.blocked = false
@ -432,7 +430,7 @@ func (intf *linkInterface) notifyDoKeepAlive() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic intf.writer.sendFrom(nil, [][]byte{nil}) // Empty keep-alive traffic
} }
}) })
} }
@ -446,7 +444,7 @@ type linkWriter struct {
closed bool closed bool
} }
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte) {
w.Act(from, func() { w.Act(from, func() {
if w.closed { if w.closed {
return return
@ -455,9 +453,9 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
for _, bs := range bss { for _, bs := range bss {
size += len(bs) size += len(bs)
} }
w.intf.notifySending(size, isLinkTraffic) w.intf.notifySending(size)
w.worker <- bss w.worker <- bss
w.intf.notifySent(size, isLinkTraffic) w.intf.notifySent(size)
}) })
} }

View File

@ -317,7 +317,7 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) {
p.Act(from, func() { p.Act(from, func() {
if seq == p.seq { if seq == p.seq {
p.drop = true p.drop = true
p.max = p.queue.size p.max = p.queue.size + streamMsgSize
} }
}) })
} }