From 74ac535d550d30577dcefa8bc6d397356e6f65e2 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 8 Feb 2019 19:46:11 -0600 Subject: [PATCH 01/11] slightly faster switch logic, should be easier to have a useful tie-breaker for peers that are equally close to the destination via the tree metric --- misc/sim/treesim.go | 4 +++- src/yggdrasil/switch.go | 45 +++++++++++++++++++++-------------------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index f4cd75f..a62f9ff 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -6,13 +6,15 @@ import "os" import "strings" import "strconv" import "time" -import "log" import "runtime" import "runtime/pprof" import "flag" +import "github.com/gologme/log" + import . "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" +import . "github.com/yggdrasil-network/yggdrasil-go/src/crypto" //////////////////////////////////////////////////////////////////////////////// diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index db39d01..b777d89 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -569,23 +569,23 @@ func (t *switchTable) start() error { return nil } -// Check if a packet should go to the self node -// This means there's no node closer to the destination than us -// This is mainly used to identify packets addressed to us, or that hit a blackhole -func (t *switchTable) selfIsClosest(dest []byte) bool { +// Return a map of ports onto distance, keeping only ports closer to the destination than this node +// If the map is empty (or nil), then no peer is closer +func (t *switchTable) getCloser(dest []byte) map[switchPort]int { table := t.getTable() myDist := table.self.dist(dest) if myDist == 0 { // Skip the iteration step if it's impossible to be closer - return true + return nil } + closer := make(map[switchPort]int, len(table.elems)) for _, info := range table.elems { dist := info.locator.dist(dest) if dist < myDist { - return false + closer[info.port] = dist } } - return true + return closer } // Returns true if the peer is closer to the destination than ourself @@ -639,25 +639,26 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { coords := switch_getPacketCoords(packet) ports := t.core.peers.getPorts() - if t.selfIsClosest(coords) { + closer := t.getCloser(coords) + if len(closer) == 0 { // TODO? call the router directly, and remove the whole concept of a self peer? ports[0].sendPacket(packet) return true } - table := t.getTable() - myDist := table.self.dist(coords) var best *peer - bestDist := myDist - for port := range idle { - if to := ports[port]; to != nil { - if info, isIn := table.elems[to.port]; isIn { - dist := info.locator.dist(coords) - if !(dist < bestDist) { - continue - } - best = to - bestDist = dist - } + var bestDist int + for port, dist := range closer { + to := ports[port] + _, isIdle := idle[port] + switch { + case to == nil: // skip + case !isIdle: // skip + case best == nil: // keep + fallthrough + case dist < bestDist: // keep + best = to + bestDist = dist + default: // skip } } if best != nil { @@ -696,7 +697,7 @@ func (b *switch_buffers) cleanup(t *switchTable) { // Remove queues for which we have no next hop packet := buf.packets[0] coords := switch_getPacketCoords(packet.bytes) - if t.selfIsClosest(coords) { + if len(t.getCloser(coords)) == 0 { for _, packet := range buf.packets { util.PutBytes(packet.bytes) } From 21cecf4630bf365a400ada8aa8fdcfa696029830 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 9 Feb 2019 17:44:25 -0600 Subject: [PATCH 02/11] consistently prioritize which peer to forward to instead of letting it be partly random --- src/yggdrasil/switch.go | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index b777d89..cc989d9 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -645,20 +645,39 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool ports[0].sendPacket(packet) return true } + table := t.getTable() var best *peer var bestDist int + var bestCoordLen int for port, dist := range closer { to := ports[port] _, isIdle := idle[port] + coordLen := len(table.elems[port].locator.coords) + var update bool switch { - case to == nil: // skip - case !isIdle: // skip - case best == nil: // keep - fallthrough - case dist < bestDist: // keep + case to == nil: + //nothing + case !isIdle: + //nothing + case best == nil: + update = true + case dist < bestDist: + update = true + case dist > bestDist: + //nothing + case coordLen < bestCoordLen: + update = true + case coordLen > bestCoordLen: + //nothing + case port < best.port: + update = true + default: + //nothing + } + if update { best = to bestDist = dist - default: // skip + bestCoordLen = coordLen } } if best != nil { From 042adb0516e4516f0d316b786ff611bbb5da5ad8 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Feb 2019 00:07:00 -0600 Subject: [PATCH 03/11] make sure the only place traffic is ever dropped is in the switch. this currently disables the dedicated crypto workers --- src/yggdrasil/link.go | 10 +++++----- src/yggdrasil/router.go | 41 ++++++++++++++++++++++++++++++---------- src/yggdrasil/session.go | 4 +++- src/yggdrasil/switch.go | 12 +++++++----- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index ad4b1fa..3cd344d 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -216,8 +216,8 @@ func (intf *linkInterface) handler() error { case signalReady <- struct{}{}: default: } - intf.link.core.log.Debugf("Sending packet to %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) + //intf.link.core.log.Debugf("Sending packet to %s: %s, source %s", + // strings.ToUpper(intf.info.linkType), themString, intf.info.local) } } }() @@ -237,9 +237,9 @@ func (intf *linkInterface) handler() error { recvTimer := time.NewTimer(recvTime) defer util.TimerStop(recvTimer) for { - intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", - strings.ToUpper(intf.info.linkType), themString, intf.info.local, - isAlive, isReady, sendTimerRunning, recvTimerRunning) + //intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", + // strings.ToUpper(intf.info.linkType), themString, intf.info.local, + // isAlive, isReady, sendTimerRunning, recvTimerRunning) select { case gotMsg, ok := <-signalAlive: if !ok { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 99e6982..d84b935 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -68,17 +68,34 @@ func (r *router) init(core *Core) { r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) - p.out = func(packet []byte) { - // This is to make very sure it never blocks - select { - case in <- packet: - return - default: - util.PutBytes(packet) - } - } + p.out = func(packet []byte) { in <- packet } r.in = in - r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block + out := make(chan []byte, 32) + go func() { + for packet := range out { + p.handlePacket(packet) + } + }() + out2 := make(chan []byte, 32) + go func() { + // This worker makes sure r.out never blocks + // It will buffer traffic long enough for the switch worker to take it + // If (somehow) you can send faster than the switch can receive, then this would use unbounded memory + // But crypto slows sends enough that the switch should always be able to take the packets... + var buf [][]byte + for { + buf = append(buf, <-out2) + for len(buf) > 0 { + select { + case bs := <-out2: + buf = append(buf, bs) + case out <- buf[0]: + buf = buf[1:] + } + } + } + }() + r.out = func(packet []byte) { out2 <- packet } r.toRecv = make(chan router_recvPacket, 32) recv := make(chan []byte, 32) send := make(chan []byte, 32) @@ -306,6 +323,8 @@ func (r *router) sendPacket(bs []byte) { return } + sinfo.doSend(bs) + return sinfo.send <- bs } } @@ -385,6 +404,8 @@ func (r *router) handleTraffic(packet []byte) { if !isIn { return } + sinfo.doRecv(&p) + return sinfo.recv <- &p } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index cdabaf2..bffc149 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -304,7 +304,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.send = make(chan []byte, 32) sinfo.recv = make(chan *wire_trafficPacket, 32) - go sinfo.doWorker() + //go sinfo.doWorker() ss.sinfos[sinfo.myHandle] = &sinfo ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle @@ -625,6 +625,8 @@ func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) { sinfo.updateNonce(&p.Nonce) sinfo.time = time.Now() sinfo.bytesRecvd += uint64(len(bs)) + sinfo.core.router.recvPacket(bs, sinfo) + return select { case sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}: default: // avoid deadlocks, maybe do this somewhere else?... diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 9b8c839..d45b885 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -668,10 +668,12 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool //nothing case coordLen < bestCoordLen: update = true - case coordLen > bestCoordLen: - //nothing - case port < best.port: - update = true + /* + case coordLen > bestCoordLen: + //nothing + case port < best.port: + update = true + */ default: //nothing } @@ -800,7 +802,7 @@ func (t *switchTable) doWorker() { t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things for { - t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs)) + //t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs)) select { case bytes := <-t.packetIn: // Try to send it somewhere (or drop it if it's corrupt or at a dead end) From bb3edd5e556fdb09f83eb8c2d5708f2de7f0dd56 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Feb 2019 12:59:30 -0600 Subject: [PATCH 04/11] add the relevant error to the default logging when a connection is closed --- src/yggdrasil/link.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 3cd344d..27c3aa2 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -162,8 +162,6 @@ func (intf *linkInterface) handler() error { themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) intf.link.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) - defer intf.link.core.log.Infof("Disconnected %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start the link loop go intf.peer.linkLoop() // Start the writer @@ -304,12 +302,13 @@ func (intf *linkInterface) handler() error { }() // Run reader loop for { - msg, err := intf.msgIO.readMsg() + var msg []byte + msg, err = intf.msgIO.readMsg() if len(msg) > 0 { intf.peer.handlePacket(msg) } if err != nil { - return err + break } select { case signalAlive <- len(msg) > 0: @@ -317,5 +316,8 @@ func (intf *linkInterface) handler() error { } } //////////////////////////////////////////////////////////////////////////////// - return nil + // Remember to set `err` to something useful before returning + intf.link.core.log.Infof("Disconnected %s: %s, source %s, reason: %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) + return err } From 654407dc6d1dc5724420d7e08f2064d0e5b32a68 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Feb 2019 13:24:55 -0600 Subject: [PATCH 05/11] close long-dead connections in link.go instead of in switch.go, this is important in case a connection opens but never bothers to send even one switch message --- src/yggdrasil/link.go | 12 +++++++++++- src/yggdrasil/switch.go | 23 ----------------------- 2 files changed, 11 insertions(+), 24 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 27c3aa2..df9625d 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -228,12 +228,15 @@ func (intf *linkInterface) handler() error { var isReady bool var sendTimerRunning bool var recvTimerRunning bool - recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?... sendTime := time.Second sendTimer := time.NewTimer(sendTime) defer util.TimerStop(sendTimer) recvTimer := time.NewTimer(recvTime) defer util.TimerStop(recvTimer) + closeTimer := time.NewTimer(closeTime) + defer util.TimerStop(closeTimer) for { //intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", // strings.ToUpper(intf.info.linkType), themString, intf.info.local, @@ -243,6 +246,7 @@ func (intf *linkInterface) handler() error { if !ok { return } + util.TimerStop(closeTimer) util.TimerStop(recvTimer) recvTimerRunning = false isAlive = true @@ -274,6 +278,8 @@ func (intf *linkInterface) handler() error { // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem util.TimerStop(recvTimer) recvTimer.Reset(recvTime) + util.TimerStop(closeTimer) + closeTimer.Reset(closeTime) recvTimerRunning = true } case _, ok := <-signalReady: @@ -297,6 +303,10 @@ func (intf *linkInterface) handler() error { case <-recvTimer.C: // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding isAlive = false + case <-closeTimer.C: + // We haven't received anything in a really long time, so things have died at the switch level and then some... + // Just close the connection at this point... + intf.msgIO.close() } } }() diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d45b885..1b611af 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -215,7 +215,6 @@ func (t *switchTable) doMaintenance() { defer t.mutex.Unlock() // Release lock when we're done t.cleanRoot() t.cleanDropped() - t.cleanPeers() } // Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out. @@ -272,28 +271,6 @@ func (t *switchTable) forgetPeer(port switchPort) { } } -// Clean all unresponsive peers from the table, needed in case a peer stops updating. -// Needed in case a non-parent peer keeps the connection open but stops sending updates. -// Also reclaims space from deleted peers by copying the map. -func (t *switchTable) cleanPeers() { - now := time.Now() - for port, peer := range t.data.peers { - if now.Sub(peer.time) > switch_timeout+switch_throttle { - // Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding. - delete(t.data.peers, port) - go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe - } - } - if _, isIn := t.data.peers[t.parent]; !isIn { - // The root timestamp would probably time out before this happens, but better safe than sorry. - // We removed the current parent, so find a new one. - t.parent = 0 - for _, peer := range t.data.peers { - t.unlockedHandleMsg(&peer.msg, peer.port, true) - } - } -} - // Dropped is a list of roots that are better than the current root, but stopped sending new timestamps. // If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos. // This function is called periodically to do that cleanup. From def4fb358787a73f9e2f44180b5363e560ceb29a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Feb 2019 14:48:16 -0600 Subject: [PATCH 06/11] fix timeout and improve logging on connection close --- src/yggdrasil/link.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index df9625d..443b594 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "errors" "fmt" + "io" "net" "strings" "sync" @@ -223,6 +224,7 @@ func (intf *linkInterface) handler() error { // Used to enable/disable activity in the switch signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive defer close(signalAlive) + ret := make(chan error, 1) // How we signal the return value when multiple goroutines are involved go func() { var isAlive bool var isReady bool @@ -247,6 +249,7 @@ func (intf *linkInterface) handler() error { return } util.TimerStop(closeTimer) + closeTimer.Reset(closeTime) util.TimerStop(recvTimer) recvTimerRunning = false isAlive = true @@ -278,8 +281,6 @@ func (intf *linkInterface) handler() error { // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem util.TimerStop(recvTimer) recvTimer.Reset(recvTime) - util.TimerStop(closeTimer) - closeTimer.Reset(closeTime) recvTimerRunning = true } case _, ok := <-signalReady: @@ -306,18 +307,27 @@ func (intf *linkInterface) handler() error { case <-closeTimer.C: // We haven't received anything in a really long time, so things have died at the switch level and then some... // Just close the connection at this point... + select { + case ret <- errors.New("timeout"): + default: + } intf.msgIO.close() } } }() // Run reader loop for { - var msg []byte - msg, err = intf.msgIO.readMsg() + msg, err := intf.msgIO.readMsg() if len(msg) > 0 { intf.peer.handlePacket(msg) } if err != nil { + if err != io.EOF { + select { + case ret <- err: + default: + } + } break } select { @@ -327,7 +337,14 @@ func (intf *linkInterface) handler() error { } //////////////////////////////////////////////////////////////////////////////// // Remember to set `err` to something useful before returning - intf.link.core.log.Infof("Disconnected %s: %s, source %s, reason: %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) + select { + case err = <-ret: + intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) + default: + err = nil + intf.link.core.log.Infof("Disconnected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + } return err } From 25692420501e7252c2615382b8115d7119008eb2 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 26 Feb 2019 21:07:56 -0600 Subject: [PATCH 07/11] fixes to linkInterface.handler() --- src/util/util.go | 20 ++++++++++++++++++++ src/yggdrasil/link.go | 11 ++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/util/util.go b/src/util/util.go index 45be3b1..df15ff2 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -56,3 +56,23 @@ func TimerStop(t *time.Timer) bool { } return true } + +// Run a blocking function with a timeout. +// Returns true if the function returns. +// Returns false if the timer fires. +// The blocked function remains blocked--the caller is responsible for somehow killing it. +func FuncTimeout(f func(), timeout time.Duration) bool { + success := make(chan struct{}) + go func() { + defer close(success) + f() + }() + timer := time.NewTimer(timeout) + defer TimerStop(timer) + select { + case <-success: + return true + case <-timer.C: + return false + } +} diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 443b594..06020cd 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -92,11 +92,16 @@ func (intf *linkInterface) handler() error { meta.link = *myLinkPub metaBytes := meta.encode() // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) - err := intf.msgIO._sendMetaBytes(metaBytes) + var err error + if !util.FuncTimeout(func() { err = intf.msgIO._sendMetaBytes(metaBytes) }, 30*time.Second) { + return errors.New("timeout on metadata send") + } if err != nil { return err } - metaBytes, err = intf.msgIO._recvMetaBytes() + if !util.FuncTimeout(func() { metaBytes, err = intf.msgIO._recvMetaBytes() }, 30*time.Second) { + return errors.New("timeout on metadata recv") + } if err != nil { return err } @@ -110,7 +115,7 @@ func (intf *linkInterface) handler() error { return errors.New("failed to connect: wrong version") } // Check if we're authorized to connect to this key / IP - if !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { + if !intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) intf.msgIO.close() From 371b5ca6a2115d88fa5fe5f21f8d9624c65a766e Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 28 Feb 2019 18:49:34 -0600 Subject: [PATCH 08/11] Change log message about AllowedEncryptionPublicKeys from Debug to Warn --- src/yggdrasil/link.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 06020cd..6fc7687 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -116,7 +116,7 @@ func (intf *linkInterface) handler() error { } // Check if we're authorized to connect to this key / IP if !intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { - intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", + intf.link.core.log.Warnf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) intf.msgIO.close() return nil From 06df791efc20ceec653c81dd624ff789eecd5fb6 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 28 Feb 2019 19:08:56 -0600 Subject: [PATCH 09/11] buffer packets moving from the switch to the router, allow them front drop if there's too many --- src/yggdrasil/switch.go | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 1b611af..bf6b919 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -176,6 +176,7 @@ type switchTable struct { admin chan func() // Pass a lambda for the admin socket to query stuff queues switch_buffers // Queues - not atomic so ONLY use through admin chan queueTotalMaxSize uint64 // Maximum combined size of queues + toRouter chan []byte // Packets to be sent to the router } // Minimum allowed total size of switch queues. @@ -199,6 +200,7 @@ func (t *switchTable) init(core *Core) { t.idleIn = make(chan switchPort, 1024) t.admin = make(chan func()) t.queueTotalMaxSize = SwitchQueueTotalMinSize + t.toRouter = make(chan []byte, 1) } // Safely gets a copy of this node's locator. @@ -616,17 +618,17 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { // Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { coords := switch_getPacketCoords(packet) - ports := t.core.peers.getPorts() closer := t.getCloser(coords) if len(closer) == 0 { // TODO? call the router directly, and remove the whole concept of a self peer? - ports[0].sendPacket(packet) + t.toRouter <- packet return true } table := t.getTable() var best *peer var bestDist int var bestCoordLen int + ports := t.core.peers.getPorts() for port, dist := range closer { to := ports[port] _, isIdle := idle[port] @@ -775,6 +777,33 @@ func (t *switchTable) handleIdle(port switchPort) bool { // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { + sendingToRouter := make(chan []byte, 1) + go func() { + // Keep sending packets to the router + self := t.core.peers.getPorts()[0] + for bs := range sendingToRouter { + self.sendPacket(bs) + } + }() + go func() { + // Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer + var buf [][]byte + for { + buf = append(buf, <-t.toRouter) + for len(buf) > 0 { + select { + case bs := <-t.toRouter: + buf = append(buf, bs) + for len(buf) > 32 { + util.PutBytes(buf[0]) + buf = buf[1:] + } + case sendingToRouter <- buf[0]: + buf = buf[1:] + } + } + } + }() t.queues.switchTable = t t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things From 304f22dc1d23807d7f612f3d32433121b4876c5c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 28 Feb 2019 20:05:21 -0600 Subject: [PATCH 10/11] re-enable session workers in a way that doesn't block and drops packets before decrypting if necessary --- src/yggdrasil/router.go | 7 +------ src/yggdrasil/session.go | 32 +++++++++++++++++++++++--------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d84b935..6acd473 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -66,7 +66,7 @@ func (r *router) init(core *Core) { r.reconfigure = make(chan chan error, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) - in := make(chan []byte, 32) // TODO something better than this... + in := make(chan []byte, 1) // TODO something better than this... p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) p.out = func(packet []byte) { in <- packet } r.in = in @@ -322,9 +322,6 @@ func (r *router) sendPacket(bs []byte) { // Don't continue - drop the packet return } - - sinfo.doSend(bs) - return sinfo.send <- bs } } @@ -404,8 +401,6 @@ func (r *router) handleTraffic(packet []byte) { if !isIn { return } - sinfo.doRecv(&p) - return sinfo.recv <- &p } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index bffc149..012af57 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -304,7 +304,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.send = make(chan []byte, 32) sinfo.recv = make(chan *wire_trafficPacket, 32) - //go sinfo.doWorker() + go sinfo.doWorker() ss.sinfos[sinfo.myHandle] = &sinfo ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle @@ -525,17 +525,36 @@ func (ss *sessions) resetInits() { // It handles calling the relatively expensive crypto operations. // It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK. func (sinfo *sessionInfo) doWorker() { + send := make(chan []byte, 32) + defer close(send) + go func() { + for bs := range send { + sinfo.doSend(bs) + } + }() + recv := make(chan *wire_trafficPacket, 32) + defer close(recv) + go func() { + for p := range recv { + sinfo.doRecv(p) + } + }() for { select { case p, ok := <-sinfo.recv: if ok { - sinfo.doRecv(p) + select { + case recv <- p: + default: + // We need something to not block, and it's best to drop it before we decrypt + util.PutBytes(p.Payload) + } } else { return } case bs, ok := <-sinfo.send: if ok { - sinfo.doSend(bs) + send <- bs } else { return } @@ -625,10 +644,5 @@ func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) { sinfo.updateNonce(&p.Nonce) sinfo.time = time.Now() sinfo.bytesRecvd += uint64(len(bs)) - sinfo.core.router.recvPacket(bs, sinfo) - return - select { - case sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}: - default: // avoid deadlocks, maybe do this somewhere else?... - } + sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo} } From a6ae159329938f8252f2495fcc79abafcc1eaf86 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 1 Mar 2019 18:26:52 +0000 Subject: [PATCH 11/11] Give some more feedback that a configuration reload actually happens --- src/yggdrasil/core.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 2e23dd1..0443328 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -125,11 +125,15 @@ func (c *Core) addPeerLoop() { // UpdateConfig updates the configuration in Core and then signals the // various module goroutines to reconfigure themselves if needed func (c *Core) UpdateConfig(config *config.NodeConfig) { + c.log.Infoln("Reloading configuration...") + c.configMutex.Lock() c.configOld = c.config c.config = *config c.configMutex.Unlock() + errors := 0 + components := []chan chan error{ c.admin.reconfigure, c.searches.reconfigure, @@ -148,9 +152,16 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { response := make(chan error) component <- response if err := <-response; err != nil { - c.log.Println(err) + c.log.Errorln(err) + errors++ } } + + if errors > 0 { + c.log.Warnln(errors, "modules reported errors during configuration reload") + } else { + c.log.Infoln("Configuration reloaded successfully") + } } // GetBuildName gets the current build name. This is usually injected if built