mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-10 02:50:27 +00:00
work in progress to make searches use parallel threads per response, so one malicious node doesn't block progress from honest ones
This commit is contained in:
parent
d7d0c2629c
commit
d0e6846173
@ -130,8 +130,8 @@ func (c *Conn) search() error {
|
||||
close(done)
|
||||
}
|
||||
}
|
||||
sinfo := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
|
||||
sinfo.continueSearch()
|
||||
sinfo, infos := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
|
||||
sinfo.continueSearch(infos)
|
||||
} else {
|
||||
err = errors.New("search already exists")
|
||||
close(done)
|
||||
@ -152,10 +152,11 @@ func (c *Conn) doSearch() {
|
||||
if !isIn {
|
||||
// Nothing was found, so create a new search
|
||||
searchCompleted := func(sinfo *sessionInfo, e error) {}
|
||||
sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
|
||||
var infos []*dhtInfo
|
||||
sinfo, infos = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
|
||||
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
|
||||
// Start the search
|
||||
sinfo.continueSearch()
|
||||
sinfo.continueSearch(infos)
|
||||
}
|
||||
}
|
||||
c.core.router.Act(c.session, routerWork)
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
// This defines the time after which we time out a search (so it can restart).
|
||||
const search_RETRY_TIME = 3 * time.Second
|
||||
const search_STEP_TIME = 100 * time.Millisecond
|
||||
const search_MAX_RES_SIZE = 16
|
||||
|
||||
// Information about an ongoing search.
|
||||
// Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited.
|
||||
@ -34,7 +33,6 @@ type searchInfo struct {
|
||||
dest crypto.NodeID
|
||||
mask crypto.NodeID
|
||||
time time.Time
|
||||
toVisit []*dhtInfo
|
||||
visited crypto.NodeID // Closest address visited so far
|
||||
callback func(*sessionInfo, error)
|
||||
// TODO context.Context for timeout and cancellation
|
||||
@ -77,81 +75,62 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba
|
||||
// If there is, it adds the response info to the search and triggers a new search step.
|
||||
// If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more.
|
||||
func (sinfo *searchInfo) handleDHTRes(res *dhtRes) {
|
||||
var doStep bool
|
||||
if res != nil {
|
||||
sinfo.recv++
|
||||
if sinfo.checkDHTRes(res) {
|
||||
return // Search finished successfully
|
||||
}
|
||||
// Add results to the search
|
||||
sinfo.addToSearch(res)
|
||||
// FIXME check this elsewhere so we don't need to create a from struct
|
||||
from := dhtInfo{key: res.Key, coords: res.Coords}
|
||||
doStep = sinfo.visited == *from.getNodeID()
|
||||
} else {
|
||||
doStep = true
|
||||
}
|
||||
if doStep {
|
||||
// Continue the search
|
||||
sinfo.doSearchStep()
|
||||
// Use results to start an additional search thread
|
||||
infos := append([]*dhtInfo(nil), res.Infos...)
|
||||
infos = sinfo.getAllowedInfos(infos)
|
||||
sinfo.continueSearch(infos)
|
||||
}
|
||||
}
|
||||
|
||||
// Adds the information from a dhtRes to an ongoing search.
|
||||
// Info about a node that has already been visited is not re-added to the search.
|
||||
func (sinfo *searchInfo) addToSearch(res *dhtRes) {
|
||||
// Used in sortng below
|
||||
sortFunc := func(i, j int) bool {
|
||||
// Should return true if i is closer to the destination than j
|
||||
return dht_ordered(&sinfo.dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID())
|
||||
}
|
||||
// Limit maximum number of results (mitigates DoS where nodes return a large number of bad results)
|
||||
if len(res.Infos) > search_MAX_RES_SIZE {
|
||||
sort.SliceStable(res.Infos, sortFunc)
|
||||
res.Infos = res.Infos[:search_MAX_RES_SIZE]
|
||||
}
|
||||
// Append (without deduplication) to list of nodes to try
|
||||
for _, info := range res.Infos {
|
||||
sinfo.toVisit = append(sinfo.toVisit, info)
|
||||
}
|
||||
// Sort
|
||||
sort.SliceStable(sinfo.toVisit, sortFunc)
|
||||
// Remove anything too far away to be useful
|
||||
for idx, info := range sinfo.toVisit {
|
||||
if !dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) {
|
||||
sinfo.toVisit = sinfo.toVisit[:idx]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If there are no nodes left toVisit, then this cleans up the search.
|
||||
// If there has been no response in too long, then this cleans up the search.
|
||||
// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping.
|
||||
func (sinfo *searchInfo) doSearchStep() {
|
||||
if len(sinfo.toVisit) == 0 {
|
||||
if time.Since(sinfo.time) > search_RETRY_TIME {
|
||||
// Dead end and no response in too long, do cleanup
|
||||
delete(sinfo.searches.searches, sinfo.dest)
|
||||
sinfo.callback(nil, errors.New("search reached dead end"))
|
||||
}
|
||||
func (sinfo *searchInfo) doSearchStep(infos []*dhtInfo) {
|
||||
if time.Since(sinfo.time) > search_RETRY_TIME {
|
||||
// Dead end and no response in too long, do cleanup
|
||||
// FIXME we should really let all the parallel search threads exist when info is empty, and then delete when no threads are left, instead of keeping them all around until things time out or exit successfully
|
||||
delete(sinfo.searches.searches, sinfo.dest)
|
||||
sinfo.callback(nil, errors.New("search reached dead end"))
|
||||
return
|
||||
}
|
||||
// Send to the next search target
|
||||
if len(sinfo.toVisit) > 0 {
|
||||
next := sinfo.toVisit[0]
|
||||
sinfo.toVisit = sinfo.toVisit[1:]
|
||||
if len(infos) > 0 {
|
||||
// Send to the next search target
|
||||
next := infos[0]
|
||||
rq := dhtReqKey{next.key, sinfo.dest}
|
||||
sinfo.searches.router.dht.addCallback(&rq, sinfo.handleDHTRes)
|
||||
sinfo.searches.router.dht.ping(next, &sinfo.dest)
|
||||
sinfo.send++
|
||||
sinfo.time = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// Get a list of search targets that are close enough to the destination to try
|
||||
// Requires an initial list as input
|
||||
func (sinfo *searchInfo) getAllowedInfos(infos []*dhtInfo) []*dhtInfo {
|
||||
sort.SliceStable(infos, func(i, j int) bool {
|
||||
// Should return true if i is closer to the destination than j
|
||||
return dht_ordered(&sinfo.dest, infos[i].getNodeID(), infos[j].getNodeID())
|
||||
})
|
||||
// Remove anything too far away to be useful
|
||||
for idx, info := range infos {
|
||||
if !dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) {
|
||||
infos = infos[:idx]
|
||||
break
|
||||
}
|
||||
}
|
||||
return infos
|
||||
}
|
||||
|
||||
// If we've recently sent a ping for this search, do nothing.
|
||||
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
|
||||
func (sinfo *searchInfo) continueSearch() {
|
||||
sinfo.doSearchStep()
|
||||
func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) {
|
||||
sinfo.doSearchStep(infos)
|
||||
if len(infos) > 0 {
|
||||
infos = infos[1:] // Remove the node we just tried
|
||||
}
|
||||
// In case the search dies, try to spawn another thread later
|
||||
// Note that this will spawn multiple parallel searches as time passes
|
||||
// Any that die aren't restarted, but a new one will start later
|
||||
@ -162,21 +141,55 @@ func (sinfo *searchInfo) continueSearch() {
|
||||
if newSearchInfo != sinfo {
|
||||
return
|
||||
}
|
||||
sinfo.continueSearch()
|
||||
// Get good infos here instead of at the top, to make sure we can always start things off with a continueSearch call to ourself
|
||||
infos = sinfo.getAllowedInfos(infos)
|
||||
sinfo.continueSearch(infos)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Initially start a search
|
||||
func (sinfo *searchInfo) startSearch() {
|
||||
loc := sinfo.searches.router.core.switchTable.getLocator()
|
||||
var infos []*dhtInfo
|
||||
infos = append(infos, &dhtInfo{
|
||||
key: sinfo.searches.router.core.boxPub,
|
||||
coords: loc.getCoords(),
|
||||
})
|
||||
// Start the search by asking ourself, useful if we're the destination
|
||||
sinfo.continueSearch(infos)
|
||||
// Start a timer to clean up the search if everything times out
|
||||
var cleanupFunc func()
|
||||
cleanupFunc = func() {
|
||||
sinfo.searches.router.Act(nil, func() {
|
||||
// FIXME this keeps the search alive forever if not for the searches map, fix that
|
||||
newSearchInfo := sinfo.searches.searches[sinfo.dest]
|
||||
if newSearchInfo != sinfo {
|
||||
return
|
||||
}
|
||||
elapsed := time.Since(sinfo.time)
|
||||
if elapsed > search_RETRY_TIME {
|
||||
// cleanup
|
||||
delete(sinfo.searches.searches, sinfo.dest)
|
||||
sinfo.callback(nil, errors.New("search reached dead end"))
|
||||
return
|
||||
}
|
||||
time.AfterFunc(search_RETRY_TIME - elapsed, cleanupFunc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Calls create search, and initializes the iterative search parts of the struct before returning it.
|
||||
func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
|
||||
func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) (*searchInfo, []*dhtInfo) {
|
||||
sinfo := s.createSearch(dest, mask, callback)
|
||||
sinfo.visited = s.router.dht.nodeID
|
||||
loc := s.router.core.switchTable.getLocator()
|
||||
sinfo.toVisit = append(sinfo.toVisit, &dhtInfo{
|
||||
var infos []*dhtInfo
|
||||
infos = append(infos, &dhtInfo{
|
||||
key: s.router.core.boxPub,
|
||||
coords: loc.getCoords(),
|
||||
}) // Start the search by asking ourself, useful if we're the destination
|
||||
return sinfo
|
||||
return sinfo, infos
|
||||
}
|
||||
|
||||
// Checks if a dhtRes is good (called by handleDHTRes).
|
||||
@ -188,6 +201,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
|
||||
// Closer to the destination, so update visited
|
||||
sinfo.searches.router.core.log.Debugln("Updating search:", &sinfo.dest, from.getNodeID(), sinfo.send, sinfo.recv)
|
||||
sinfo.visited = *from.getNodeID()
|
||||
sinfo.time = time.Now()
|
||||
}
|
||||
them := from.getNodeID()
|
||||
var destMasked crypto.NodeID
|
||||
|
Loading…
Reference in New Issue
Block a user