5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-22 21:10:29 +00:00

better search cleanup, but needs more testing to make sure it really works

This commit is contained in:
Arceliar 2020-02-08 20:26:37 -06:00
parent d0e6846173
commit 8e05c6c6a7
2 changed files with 31 additions and 45 deletions

View File

@ -130,8 +130,8 @@ func (c *Conn) search() error {
close(done) close(done)
} }
} }
sinfo, infos := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) sinfo := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
sinfo.continueSearch(infos) sinfo.startSearch()
} else { } else {
err = errors.New("search already exists") err = errors.New("search already exists")
close(done) close(done)
@ -152,11 +152,10 @@ func (c *Conn) doSearch() {
if !isIn { if !isIn {
// Nothing was found, so create a new search // Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {} searchCompleted := func(sinfo *sessionInfo, e error) {}
var infos []*dhtInfo sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
sinfo, infos = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
// Start the search // Start the search
sinfo.continueSearch(infos) sinfo.startSearch()
} }
} }
c.core.router.Act(c.session, routerWork) c.core.router.Act(c.session, routerWork)

View File

@ -83,20 +83,15 @@ func (sinfo *searchInfo) handleDHTRes(res *dhtRes) {
// Use results to start an additional search thread // Use results to start an additional search thread
infos := append([]*dhtInfo(nil), res.Infos...) infos := append([]*dhtInfo(nil), res.Infos...)
infos = sinfo.getAllowedInfos(infos) infos = sinfo.getAllowedInfos(infos)
sinfo.continueSearch(infos) if len(infos) > 0 {
sinfo.continueSearch(infos)
}
} }
} }
// If there has been no response in too long, then this cleans up the search. // If there has been no response in too long, then this cleans up the search.
// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping.
func (sinfo *searchInfo) doSearchStep(infos []*dhtInfo) { func (sinfo *searchInfo) doSearchStep(infos []*dhtInfo) {
if time.Since(sinfo.time) > search_RETRY_TIME {
// Dead end and no response in too long, do cleanup
// FIXME we should really let all the parallel search threads exist when info is empty, and then delete when no threads are left, instead of keeping them all around until things time out or exit successfully
delete(sinfo.searches.searches, sinfo.dest)
sinfo.callback(nil, errors.New("search reached dead end"))
return
}
if len(infos) > 0 { if len(infos) > 0 {
// Send to the next search target // Send to the next search target
next := infos[0] next := infos[0]
@ -124,16 +119,12 @@ func (sinfo *searchInfo) getAllowedInfos(infos []*dhtInfo) []*dhtInfo {
return infos return infos
} }
// If we've recently sent a ping for this search, do nothing. // Run doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. // Must not be called with an empty list of infos
func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) { func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) {
sinfo.doSearchStep(infos) sinfo.doSearchStep(infos)
if len(infos) > 0 { infos = infos[1:] // Remove the node we just tried
infos = infos[1:] // Remove the node we just tried // In case there's no response, try the next node in infos later
}
// In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes
// Any that die aren't restarted, but a new one will start later
time.AfterFunc(search_STEP_TIME, func() { time.AfterFunc(search_STEP_TIME, func() {
sinfo.searches.router.Act(nil, func() { sinfo.searches.router.Act(nil, func() {
// FIXME this keeps the search alive forever if not for the searches map, fix that // FIXME this keeps the search alive forever if not for the searches map, fix that
@ -143,7 +134,9 @@ func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) {
} }
// Get good infos here instead of at the top, to make sure we can always start things off with a continueSearch call to ourself // Get good infos here instead of at the top, to make sure we can always start things off with a continueSearch call to ourself
infos = sinfo.getAllowedInfos(infos) infos = sinfo.getAllowedInfos(infos)
sinfo.continueSearch(infos) if len(infos) > 0 {
sinfo.continueSearch(infos)
}
}) })
}) })
} }
@ -156,40 +149,34 @@ func (sinfo *searchInfo) startSearch() {
key: sinfo.searches.router.core.boxPub, key: sinfo.searches.router.core.boxPub,
coords: loc.getCoords(), coords: loc.getCoords(),
}) })
// Start the search by asking ourself, useful if we're the destination // Start the search by asking ourself, useful if we're the destination
sinfo.continueSearch(infos) sinfo.continueSearch(infos)
// Start a timer to clean up the search if everything times out // Start a timer to clean up the search if everything times out
var cleanupFunc func() var cleanupFunc func()
cleanupFunc = func() { cleanupFunc = func() {
sinfo.searches.router.Act(nil, func() { sinfo.searches.router.Act(nil, func() {
// FIXME this keeps the search alive forever if not for the searches map, fix that // FIXME this keeps the search alive forever if not for the searches map, fix that
newSearchInfo := sinfo.searches.searches[sinfo.dest] newSearchInfo := sinfo.searches.searches[sinfo.dest]
if newSearchInfo != sinfo { if newSearchInfo != sinfo {
return return
} }
elapsed := time.Since(sinfo.time) elapsed := time.Since(sinfo.time)
if elapsed > search_RETRY_TIME { if elapsed > search_RETRY_TIME {
// cleanup // cleanup
delete(sinfo.searches.searches, sinfo.dest) delete(sinfo.searches.searches, sinfo.dest)
sinfo.callback(nil, errors.New("search reached dead end")) sinfo.callback(nil, errors.New("search reached dead end"))
return return
} }
time.AfterFunc(search_RETRY_TIME - elapsed, cleanupFunc) time.AfterFunc(search_RETRY_TIME-elapsed, cleanupFunc)
}) })
} }
} }
// Calls create search, and initializes the iterative search parts of the struct before returning it. // Calls create search, and initializes the iterative search parts of the struct before returning it.
func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) (*searchInfo, []*dhtInfo) { func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
sinfo := s.createSearch(dest, mask, callback) sinfo := s.createSearch(dest, mask, callback)
sinfo.visited = s.router.dht.nodeID sinfo.visited = s.router.dht.nodeID
loc := s.router.core.switchTable.getLocator() return sinfo
var infos []*dhtInfo
infos = append(infos, &dhtInfo{
key: s.router.core.boxPub,
coords: loc.getCoords(),
}) // Start the search by asking ourself, useful if we're the destination
return sinfo, infos
} }
// Checks if a dhtRes is good (called by handleDHTRes). // Checks if a dhtRes is good (called by handleDHTRes).