From 48bbdac9b3596c513e2530a5bc20dad6bfc4faea Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 24 Aug 2019 16:27:12 -0500 Subject: [PATCH] add a helper actor to the link reader to make it play nicer with backpressure --- src/yggdrasil/link.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 4b7f766..98f44bc 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -16,6 +16,8 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" + + "github.com/Arceliar/phony" ) type link struct { @@ -388,14 +390,17 @@ func (intf *linkInterface) handler() error { } }() // Run reader loop - for { + var helper phony.Actor + done := make(chan struct{}) + var helperFunc func() + helperFunc = func() { + // The helper reads in a loop and sends to the peer + // It loops by sending itself a message, which can be delayed by backpressure + // So if the peer is busy, backpressure will pause reading until the peer catches up msg, err := intf.msgIO.readMsg() if len(msg) > 0 { // TODO rewrite this if the link becomes an actor - // FIXME this could theoretically send traffic faster than the peer can handle - // The alternative is to SyncExec, but that causes traffic to block while *other* links work - // Need to figure out why and find a workaround - intf.peer.handlePacketFrom(nil, msg) + intf.peer.handlePacketFrom(&helper, msg) } if err != nil { if err != io.EOF { @@ -404,13 +409,19 @@ func (intf *linkInterface) handler() error { default: } } - break + close(done) + return } select { case signalAlive <- len(msg) > 0: default: } + // Now try to read again + helper.EnqueueFrom(nil, helperFunc) } + // Start the read loop + helper.EnqueueFrom(nil, helperFunc) + <-done // Wait for the helper to exit //////////////////////////////////////////////////////////////////////////////// // Remember to set `err` to something useful before returning select {