From 3faa0b28545bdf5fbc384a18ad23e79230288695 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 6 Feb 2020 20:47:53 -0600 Subject: [PATCH 1/6] deduplicate the list of nodes to visit in a search (keeping newest rumors) --- src/yggdrasil/search.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index ede4547..7df3bdd 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -97,8 +97,19 @@ func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { // Adds the information from a dhtRes to an ongoing search. // Info about a node that has already been visited is not re-added to the search. func (sinfo *searchInfo) addToSearch(res *dhtRes) { - // Add to search + // Get a (deduplicated) list of known nodes to check + temp := make(map[crypto.NodeID]*dhtInfo, len(sinfo.toVisit)+len(res.Infos)) + for _, info := range sinfo.toVisit { + temp[*info.getNodeID()] = info + } + // Add new results to the list for _, info := range res.Infos { + temp[*info.getNodeID()] = info + } + // Move list to toVisit + delete(temp, sinfo.visited) + sinfo.toVisit = sinfo.toVisit[:0] + for _, info := range temp { sinfo.toVisit = append(sinfo.toVisit, info) } // Sort @@ -106,9 +117,9 @@ func (sinfo *searchInfo) addToSearch(res *dhtRes) { // Should return true if i is closer to the destination than j return dht_ordered(&sinfo.dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID()) }) - // Remove anything too far away + // Remove anything too far away to be useful for idx, info := range sinfo.toVisit { - if *info.getNodeID() == sinfo.visited || !dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) { + if !dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) { sinfo.toVisit = sinfo.toVisit[:idx] break } From cd9613fddc141a0d31df4975d0fb097418a15d3a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 7 Feb 2020 22:34:54 -0600 Subject: [PATCH 2/6] add some additional debug timing info and logging to dials, and fix an unnecessary delay in search startup --- src/yggdrasil/dialer.go | 6 ++++++ src/yggdrasil/search.go | 16 +++++++++------- src/yggdrasil/session.go | 2 -- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index e9da97a..490502b 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -65,12 +65,15 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net. // DialByNodeIDandMask opens a session to the given node based on raw // NodeID parameters. If ctx is nil or has no timeout, then a default timeout of 6 seconds will apply, beginning *after* the search finishes. func (d *Dialer) DialByNodeIDandMask(ctx context.Context, nodeID, nodeMask *crypto.NodeID) (net.Conn, error) { + startDial := time.Now() conn := newConn(d.core, nodeID, nodeMask, nil) if err := conn.search(); err != nil { // TODO: make searches take a context, so they can be cancelled early conn.Close() return nil, err } + endSearch := time.Now() + d.core.log.Debugln("Dial searched for:", nodeID, "in time:", endSearch.Sub(startDial)) conn.session.setConn(nil, conn) var cancel context.CancelFunc if ctx == nil { @@ -80,6 +83,9 @@ func (d *Dialer) DialByNodeIDandMask(ctx context.Context, nodeID, nodeMask *cryp defer cancel() select { case <-conn.session.init: + endInit := time.Now() + d.core.log.Debugln("Dial initialized session for:", nodeID, "in time:", endInit.Sub(endSearch)) + d.core.log.Debugln("Finished dial for:", nodeID, "in time:", endInit.Sub(startDial)) return conn, nil case <-ctx.Done(): conn.Close() diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 7df3bdd..5e1967c 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -22,9 +22,6 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/crypto" ) -// This defines the maximum number of dhtInfo that we keep track of for nodes to query in an ongoing search. -const search_MAX_SEARCH_SIZE = 16 - // This defines the time after which we time out a search (so it can restart). const search_RETRY_TIME = 3 * time.Second const search_STEP_TIME = 100 * time.Millisecond @@ -79,7 +76,7 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba // If there is, it adds the response info to the search and triggers a new search step. // If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more. func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { - old := sinfo.visited + var doStep bool if res != nil { sinfo.recv++ if sinfo.checkDHTRes(res) { @@ -87,8 +84,13 @@ func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { } // Add results to the search sinfo.addToSearch(res) + // FIXME check this elsewhere so we don't need to create a from struct + from := dhtInfo{key: res.Key, coords: res.Coords} + doStep = sinfo.visited == *from.getNodeID() + } else { + doStep = true } - if res == nil || sinfo.visited != old { + if doStep { // Continue the search sinfo.doSearchStep() } @@ -186,7 +188,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { from := dhtInfo{key: res.Key, coords: res.Coords} if *from.getNodeID() != sinfo.visited && dht_ordered(&sinfo.dest, from.getNodeID(), &sinfo.visited) { // Closer to the destination, so update visited - sinfo.searches.router.core.log.Debugln("Updating search:", sinfo.dest, *from.getNodeID(), sinfo.send, sinfo.recv) + sinfo.searches.router.core.log.Debugln("Updating search:", &sinfo.dest, from.getNodeID(), sinfo.send, sinfo.recv) sinfo.visited = *from.getNodeID() sinfo.time = time.Now() } @@ -213,7 +215,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { } // Cleanup if _, isIn := sinfo.searches.searches[sinfo.dest]; isIn { - sinfo.searches.router.core.log.Debugln("Finished search:", sinfo.dest, sinfo.send, sinfo.recv) + sinfo.searches.router.core.log.Debugln("Finished search:", &sinfo.dest, sinfo.send, sinfo.recv) delete(sinfo.searches.searches, res.Dest) } } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 91c530d..eaa67fd 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -43,7 +43,6 @@ type sessionInfo struct { time time.Time // Time we last received a packet mtuTime time.Time // time myMTU was last changed pingTime time.Time // time the first ping was sent since the last received packet - pingSend time.Time // time the last ping was sent coords []byte // coords of destination reset bool // reset if coords change tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation @@ -197,7 +196,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.time = now sinfo.mtuTime = now sinfo.pingTime = now - sinfo.pingSend = now sinfo.init = make(chan struct{}) sinfo.cancel = util.NewCancellation() higher := false From d7d0c2629c1e9f800d5e1e587ec73d118610a0fb Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 8 Feb 2020 17:04:00 -0600 Subject: [PATCH 3/6] don't deduplicate search responses, but limit the max number of nodes handled per response --- src/yggdrasil/search.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 5e1967c..8d534d3 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -25,6 +25,7 @@ import ( // This defines the time after which we time out a search (so it can restart). const search_RETRY_TIME = 3 * time.Second const search_STEP_TIME = 100 * time.Millisecond +const search_MAX_RES_SIZE = 16 // Information about an ongoing search. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. @@ -99,26 +100,22 @@ func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { // Adds the information from a dhtRes to an ongoing search. // Info about a node that has already been visited is not re-added to the search. func (sinfo *searchInfo) addToSearch(res *dhtRes) { - // Get a (deduplicated) list of known nodes to check - temp := make(map[crypto.NodeID]*dhtInfo, len(sinfo.toVisit)+len(res.Infos)) - for _, info := range sinfo.toVisit { - temp[*info.getNodeID()] = info + // Used in sortng below + sortFunc := func(i, j int) bool { + // Should return true if i is closer to the destination than j + return dht_ordered(&sinfo.dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID()) } - // Add new results to the list + // Limit maximum number of results (mitigates DoS where nodes return a large number of bad results) + if len(res.Infos) > search_MAX_RES_SIZE { + sort.SliceStable(res.Infos, sortFunc) + res.Infos = res.Infos[:search_MAX_RES_SIZE] + } + // Append (without deduplication) to list of nodes to try for _, info := range res.Infos { - temp[*info.getNodeID()] = info - } - // Move list to toVisit - delete(temp, sinfo.visited) - sinfo.toVisit = sinfo.toVisit[:0] - for _, info := range temp { sinfo.toVisit = append(sinfo.toVisit, info) } // Sort - sort.SliceStable(sinfo.toVisit, func(i, j int) bool { - // Should return true if i is closer to the destination than j - return dht_ordered(&sinfo.dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID()) - }) + sort.SliceStable(sinfo.toVisit, sortFunc) // Remove anything too far away to be useful for idx, info := range sinfo.toVisit { if !dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) { @@ -147,6 +144,7 @@ func (sinfo *searchInfo) doSearchStep() { sinfo.searches.router.dht.addCallback(&rq, sinfo.handleDHTRes) sinfo.searches.router.dht.ping(next, &sinfo.dest) sinfo.send++ + sinfo.time = time.Now() } } @@ -190,7 +188,6 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { // Closer to the destination, so update visited sinfo.searches.router.core.log.Debugln("Updating search:", &sinfo.dest, from.getNodeID(), sinfo.send, sinfo.recv) sinfo.visited = *from.getNodeID() - sinfo.time = time.Now() } them := from.getNodeID() var destMasked crypto.NodeID From d0e6846173ad255460e8521a8dc97beeb7d13e0b Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 8 Feb 2020 20:15:48 -0600 Subject: [PATCH 4/6] work in progress to make searches use parallel threads per response, so one malicious node doesn't block progress from honest ones --- src/yggdrasil/conn.go | 9 +-- src/yggdrasil/search.go | 136 ++++++++++++++++++++++------------------ 2 files changed, 80 insertions(+), 65 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 25605cd..7091cf7 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -130,8 +130,8 @@ func (c *Conn) search() error { close(done) } } - sinfo := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) - sinfo.continueSearch() + sinfo, infos := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + sinfo.continueSearch(infos) } else { err = errors.New("search already exists") close(done) @@ -152,10 +152,11 @@ func (c *Conn) doSearch() { if !isIn { // Nothing was found, so create a new search searchCompleted := func(sinfo *sessionInfo, e error) {} - sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + var infos []*dhtInfo + sinfo, infos = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) // Start the search - sinfo.continueSearch() + sinfo.continueSearch(infos) } } c.core.router.Act(c.session, routerWork) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 8d534d3..ce78fd1 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -25,7 +25,6 @@ import ( // This defines the time after which we time out a search (so it can restart). const search_RETRY_TIME = 3 * time.Second const search_STEP_TIME = 100 * time.Millisecond -const search_MAX_RES_SIZE = 16 // Information about an ongoing search. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. @@ -34,7 +33,6 @@ type searchInfo struct { dest crypto.NodeID mask crypto.NodeID time time.Time - toVisit []*dhtInfo visited crypto.NodeID // Closest address visited so far callback func(*sessionInfo, error) // TODO context.Context for timeout and cancellation @@ -77,81 +75,62 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba // If there is, it adds the response info to the search and triggers a new search step. // If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more. func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { - var doStep bool if res != nil { sinfo.recv++ if sinfo.checkDHTRes(res) { return // Search finished successfully } - // Add results to the search - sinfo.addToSearch(res) - // FIXME check this elsewhere so we don't need to create a from struct - from := dhtInfo{key: res.Key, coords: res.Coords} - doStep = sinfo.visited == *from.getNodeID() - } else { - doStep = true - } - if doStep { - // Continue the search - sinfo.doSearchStep() + // Use results to start an additional search thread + infos := append([]*dhtInfo(nil), res.Infos...) + infos = sinfo.getAllowedInfos(infos) + sinfo.continueSearch(infos) } } -// Adds the information from a dhtRes to an ongoing search. -// Info about a node that has already been visited is not re-added to the search. -func (sinfo *searchInfo) addToSearch(res *dhtRes) { - // Used in sortng below - sortFunc := func(i, j int) bool { - // Should return true if i is closer to the destination than j - return dht_ordered(&sinfo.dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID()) - } - // Limit maximum number of results (mitigates DoS where nodes return a large number of bad results) - if len(res.Infos) > search_MAX_RES_SIZE { - sort.SliceStable(res.Infos, sortFunc) - res.Infos = res.Infos[:search_MAX_RES_SIZE] - } - // Append (without deduplication) to list of nodes to try - for _, info := range res.Infos { - sinfo.toVisit = append(sinfo.toVisit, info) - } - // Sort - sort.SliceStable(sinfo.toVisit, sortFunc) - // Remove anything too far away to be useful - for idx, info := range sinfo.toVisit { - if !dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) { - sinfo.toVisit = sinfo.toVisit[:idx] - break - } - } -} - -// If there are no nodes left toVisit, then this cleans up the search. +// If there has been no response in too long, then this cleans up the search. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. -func (sinfo *searchInfo) doSearchStep() { - if len(sinfo.toVisit) == 0 { - if time.Since(sinfo.time) > search_RETRY_TIME { - // Dead end and no response in too long, do cleanup - delete(sinfo.searches.searches, sinfo.dest) - sinfo.callback(nil, errors.New("search reached dead end")) - } +func (sinfo *searchInfo) doSearchStep(infos []*dhtInfo) { + if time.Since(sinfo.time) > search_RETRY_TIME { + // Dead end and no response in too long, do cleanup + // FIXME we should really let all the parallel search threads exist when info is empty, and then delete when no threads are left, instead of keeping them all around until things time out or exit successfully + delete(sinfo.searches.searches, sinfo.dest) + sinfo.callback(nil, errors.New("search reached dead end")) return } - // Send to the next search target - if len(sinfo.toVisit) > 0 { - next := sinfo.toVisit[0] - sinfo.toVisit = sinfo.toVisit[1:] + if len(infos) > 0 { + // Send to the next search target + next := infos[0] rq := dhtReqKey{next.key, sinfo.dest} sinfo.searches.router.dht.addCallback(&rq, sinfo.handleDHTRes) sinfo.searches.router.dht.ping(next, &sinfo.dest) sinfo.send++ - sinfo.time = time.Now() } } +// Get a list of search targets that are close enough to the destination to try +// Requires an initial list as input +func (sinfo *searchInfo) getAllowedInfos(infos []*dhtInfo) []*dhtInfo { + sort.SliceStable(infos, func(i, j int) bool { + // Should return true if i is closer to the destination than j + return dht_ordered(&sinfo.dest, infos[i].getNodeID(), infos[j].getNodeID()) + }) + // Remove anything too far away to be useful + for idx, info := range infos { + if !dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) { + infos = infos[:idx] + break + } + } + return infos +} + // If we've recently sent a ping for this search, do nothing. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. -func (sinfo *searchInfo) continueSearch() { - sinfo.doSearchStep() +func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) { + sinfo.doSearchStep(infos) + if len(infos) > 0 { + infos = infos[1:] // Remove the node we just tried + } // In case the search dies, try to spawn another thread later // Note that this will spawn multiple parallel searches as time passes // Any that die aren't restarted, but a new one will start later @@ -162,21 +141,55 @@ func (sinfo *searchInfo) continueSearch() { if newSearchInfo != sinfo { return } - sinfo.continueSearch() + // Get good infos here instead of at the top, to make sure we can always start things off with a continueSearch call to ourself + infos = sinfo.getAllowedInfos(infos) + sinfo.continueSearch(infos) }) }) } +// Initially start a search +func (sinfo *searchInfo) startSearch() { + loc := sinfo.searches.router.core.switchTable.getLocator() + var infos []*dhtInfo + infos = append(infos, &dhtInfo{ + key: sinfo.searches.router.core.boxPub, + coords: loc.getCoords(), + }) + // Start the search by asking ourself, useful if we're the destination + sinfo.continueSearch(infos) + // Start a timer to clean up the search if everything times out + var cleanupFunc func() + cleanupFunc = func() { + sinfo.searches.router.Act(nil, func() { + // FIXME this keeps the search alive forever if not for the searches map, fix that + newSearchInfo := sinfo.searches.searches[sinfo.dest] + if newSearchInfo != sinfo { + return + } + elapsed := time.Since(sinfo.time) + if elapsed > search_RETRY_TIME { + // cleanup + delete(sinfo.searches.searches, sinfo.dest) + sinfo.callback(nil, errors.New("search reached dead end")) + return + } + time.AfterFunc(search_RETRY_TIME - elapsed, cleanupFunc) + }) + } +} + // Calls create search, and initializes the iterative search parts of the struct before returning it. -func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { +func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) (*searchInfo, []*dhtInfo) { sinfo := s.createSearch(dest, mask, callback) sinfo.visited = s.router.dht.nodeID loc := s.router.core.switchTable.getLocator() - sinfo.toVisit = append(sinfo.toVisit, &dhtInfo{ + var infos []*dhtInfo + infos = append(infos, &dhtInfo{ key: s.router.core.boxPub, coords: loc.getCoords(), }) // Start the search by asking ourself, useful if we're the destination - return sinfo + return sinfo, infos } // Checks if a dhtRes is good (called by handleDHTRes). @@ -188,6 +201,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { // Closer to the destination, so update visited sinfo.searches.router.core.log.Debugln("Updating search:", &sinfo.dest, from.getNodeID(), sinfo.send, sinfo.recv) sinfo.visited = *from.getNodeID() + sinfo.time = time.Now() } them := from.getNodeID() var destMasked crypto.NodeID From 8e05c6c6a740c45a0a4c55dd7f815745dd5b47ba Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 8 Feb 2020 20:26:37 -0600 Subject: [PATCH 5/6] better search cleanup, but needs more testing to make sure it really works --- src/yggdrasil/conn.go | 9 +++--- src/yggdrasil/search.go | 67 +++++++++++++++++------------------------ 2 files changed, 31 insertions(+), 45 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 7091cf7..f622903 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -130,8 +130,8 @@ func (c *Conn) search() error { close(done) } } - sinfo, infos := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) - sinfo.continueSearch(infos) + sinfo := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + sinfo.startSearch() } else { err = errors.New("search already exists") close(done) @@ -152,11 +152,10 @@ func (c *Conn) doSearch() { if !isIn { // Nothing was found, so create a new search searchCompleted := func(sinfo *sessionInfo, e error) {} - var infos []*dhtInfo - sinfo, infos = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) // Start the search - sinfo.continueSearch(infos) + sinfo.startSearch() } } c.core.router.Act(c.session, routerWork) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index ce78fd1..bf8c781 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -83,20 +83,15 @@ func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { // Use results to start an additional search thread infos := append([]*dhtInfo(nil), res.Infos...) infos = sinfo.getAllowedInfos(infos) - sinfo.continueSearch(infos) + if len(infos) > 0 { + sinfo.continueSearch(infos) + } } } // If there has been no response in too long, then this cleans up the search. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. func (sinfo *searchInfo) doSearchStep(infos []*dhtInfo) { - if time.Since(sinfo.time) > search_RETRY_TIME { - // Dead end and no response in too long, do cleanup - // FIXME we should really let all the parallel search threads exist when info is empty, and then delete when no threads are left, instead of keeping them all around until things time out or exit successfully - delete(sinfo.searches.searches, sinfo.dest) - sinfo.callback(nil, errors.New("search reached dead end")) - return - } if len(infos) > 0 { // Send to the next search target next := infos[0] @@ -124,16 +119,12 @@ func (sinfo *searchInfo) getAllowedInfos(infos []*dhtInfo) []*dhtInfo { return infos } -// If we've recently sent a ping for this search, do nothing. -// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. +// Run doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. +// Must not be called with an empty list of infos func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) { sinfo.doSearchStep(infos) - if len(infos) > 0 { - infos = infos[1:] // Remove the node we just tried - } - // In case the search dies, try to spawn another thread later - // Note that this will spawn multiple parallel searches as time passes - // Any that die aren't restarted, but a new one will start later + infos = infos[1:] // Remove the node we just tried + // In case there's no response, try the next node in infos later time.AfterFunc(search_STEP_TIME, func() { sinfo.searches.router.Act(nil, func() { // FIXME this keeps the search alive forever if not for the searches map, fix that @@ -143,7 +134,9 @@ func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) { } // Get good infos here instead of at the top, to make sure we can always start things off with a continueSearch call to ourself infos = sinfo.getAllowedInfos(infos) - sinfo.continueSearch(infos) + if len(infos) > 0 { + sinfo.continueSearch(infos) + } }) }) } @@ -156,40 +149,34 @@ func (sinfo *searchInfo) startSearch() { key: sinfo.searches.router.core.boxPub, coords: loc.getCoords(), }) - // Start the search by asking ourself, useful if we're the destination - sinfo.continueSearch(infos) - // Start a timer to clean up the search if everything times out - var cleanupFunc func() - cleanupFunc = func() { + // Start the search by asking ourself, useful if we're the destination + sinfo.continueSearch(infos) + // Start a timer to clean up the search if everything times out + var cleanupFunc func() + cleanupFunc = func() { sinfo.searches.router.Act(nil, func() { // FIXME this keeps the search alive forever if not for the searches map, fix that newSearchInfo := sinfo.searches.searches[sinfo.dest] if newSearchInfo != sinfo { return } - elapsed := time.Since(sinfo.time) - if elapsed > search_RETRY_TIME { - // cleanup - delete(sinfo.searches.searches, sinfo.dest) - sinfo.callback(nil, errors.New("search reached dead end")) - return - } - time.AfterFunc(search_RETRY_TIME - elapsed, cleanupFunc) - }) - } + elapsed := time.Since(sinfo.time) + if elapsed > search_RETRY_TIME { + // cleanup + delete(sinfo.searches.searches, sinfo.dest) + sinfo.callback(nil, errors.New("search reached dead end")) + return + } + time.AfterFunc(search_RETRY_TIME-elapsed, cleanupFunc) + }) + } } // Calls create search, and initializes the iterative search parts of the struct before returning it. -func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) (*searchInfo, []*dhtInfo) { +func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { sinfo := s.createSearch(dest, mask, callback) sinfo.visited = s.router.dht.nodeID - loc := s.router.core.switchTable.getLocator() - var infos []*dhtInfo - infos = append(infos, &dhtInfo{ - key: s.router.core.boxPub, - coords: loc.getCoords(), - }) // Start the search by asking ourself, useful if we're the destination - return sinfo, infos + return sinfo } // Checks if a dhtRes is good (called by handleDHTRes). From 657777881be5d8db66b8db61acd0d256ef1c312a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 8 Feb 2020 20:33:35 -0600 Subject: [PATCH 6/6] actually schedule the search cleanup code to run --- src/yggdrasil/search.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index bf8c781..584a056 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -170,6 +170,7 @@ func (sinfo *searchInfo) startSearch() { time.AfterFunc(search_RETRY_TIME-elapsed, cleanupFunc) }) } + time.AfterFunc(search_RETRY_TIME, cleanupFunc) } // Calls create search, and initializes the iterative search parts of the struct before returning it.