5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-09-20 04:52:33 +00:00

retry failed iterative searches, possibly becoming parallel if things are just slow, and keep track of / skip nodes that were already visited in the search

This commit is contained in:
Arceliar 2018-06-02 14:57:06 -05:00
parent ed6c9c2a54
commit 09baad48e3

View File

@ -21,12 +21,16 @@ import "time"
//import "fmt" //import "fmt"
const search_MAX_SEARCH_SIZE = 16
const search_RETRY_TIME = time.Second
type searchInfo struct { type searchInfo struct {
dest *NodeID dest *NodeID
mask *NodeID mask *NodeID
time time.Time time time.Time
packet []byte packet []byte
toVisit []*dhtInfo toVisit []*dhtInfo
visited map[NodeID]bool
} }
type searches struct { type searches struct {
@ -73,6 +77,9 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
// Add responses to toVisit if closer to dest than the res node // Add responses to toVisit if closer to dest than the res node
from := dhtInfo{key: res.key, coords: res.coords} from := dhtInfo{key: res.key, coords: res.coords}
for _, info := range res.infos { for _, info := range res.infos {
if sinfo.visited[*info.getNodeID()] {
continue
}
if dht_firstCloserThanThird(info.getNodeID(), &res.dest, from.getNodeID()) { if dht_firstCloserThanThird(info.getNodeID(), &res.dest, from.getNodeID()) {
sinfo.toVisit = append(sinfo.toVisit, info) sinfo.toVisit = append(sinfo.toVisit, info)
} }
@ -91,14 +98,14 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.dest, sinfo.toVisit[j].getNodeID()) return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.dest, sinfo.toVisit[j].getNodeID())
}) })
// Truncate to some maximum size // Truncate to some maximum size
if len(sinfo.toVisit) > 16 { if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE {
sinfo.toVisit = sinfo.toVisit[:16] sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE]
} }
} }
func (s *searches) doSearchStep(sinfo *searchInfo) { func (s *searches) doSearchStep(sinfo *searchInfo) {
if len(sinfo.toVisit) == 0 || time.Since(sinfo.time) > 6*time.Second { if len(sinfo.toVisit) == 0 {
// Dead end or timeout, do cleanup // Dead end, do cleanup
delete(s.searches, *sinfo.dest) delete(s.searches, *sinfo.dest)
return return
} else { } else {
@ -106,20 +113,36 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
var next *dhtInfo var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
s.core.dht.ping(next, sinfo.dest) s.core.dht.ping(next, sinfo.dest)
sinfo.visited[*next.getNodeID()] = true
} }
} }
func (s *searches) continueSearch(sinfo *searchInfo) { func (s *searches) continueSearch(sinfo *searchInfo) {
if time.Since(sinfo.time) < time.Second { if time.Since(sinfo.time) < search_RETRY_TIME {
return return
} }
sinfo.time = time.Now() sinfo.time = time.Now()
s.doSearchStep(sinfo) s.doSearchStep(sinfo)
// In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes
// Any that die aren't restarted, but a new one will start later
retryLater := func() {
newSearchInfo := s.searches[*sinfo.dest]
if newSearchInfo != sinfo {
return
}
s.continueSearch(sinfo)
}
go func() {
time.Sleep(search_RETRY_TIME)
s.core.router.admin <- retryLater
}()
} }
func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo { func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo {
sinfo := s.createSearch(dest, mask) sinfo := s.createSearch(dest, mask)
sinfo.toVisit = s.core.dht.lookup(dest, false) sinfo.toVisit = s.core.dht.lookup(dest, false)
sinfo.visited = make(map[NodeID]bool)
return sinfo return sinfo
} }