diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 013fd1e..8efc549 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -260,9 +260,7 @@ func (t *dht) handleRes(res *dhtRes) { key: res.Key, coords: res.Coords, } - if t.isImportant(&rinfo) { - t.insert(&rinfo) - } + t.insert(&rinfo) for _, info := range res.Infos { if *info.getNodeID() == t.nodeID { continue diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 584a056..91f0490 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -4,15 +4,13 @@ package yggdrasil // The basic idea is as follows: // We may know a NodeID (with a mask) and want to connect -// We begin a search by initializing a list of all nodes in our DHT, sorted by closest to the destination -// We then iteratively ping nodes from the search, marking each pinged node as visited -// We add any unvisited nodes from ping responses to the search, truncating to some maximum search size -// This stops when we either run out of nodes to ping (we hit a dead end where we can't make progress without going back), or we reach the destination -// A new search packet is sent immediately after receiving a response -// A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason) - -// TODO? -// Some kind of max search steps, in case the node is offline, so we don't crawl through too much of the network looking for a destination that isn't there? +// We begin a search by sending a dht lookup to ourself +// Each time a node responds, we sort the results and filter to only include useful nodes +// We then periodically send a packet to the first node from the list (after re-filtering) +// This happens in parallel for each node that replies +// Meanwhile, we keep a list of the (up to) 16 closest nodes to the destination that we've visited +// We only consider an unvisited node useful if either the list isn't full or the unvisited node is closer to the destination than the furthest node on the list +// That gives the search some chance to recover if it hits a dead end where a node doesn't know everyone it should import ( "errors" @@ -24,7 +22,8 @@ import ( // This defines the time after which we time out a search (so it can restart). const search_RETRY_TIME = 3 * time.Second -const search_STEP_TIME = 100 * time.Millisecond +const search_STEP_TIME = time.Second +const search_MAX_RESULTS = dht_lookup_size // Information about an ongoing search. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. @@ -33,7 +32,7 @@ type searchInfo struct { dest crypto.NodeID mask crypto.NodeID time time.Time - visited crypto.NodeID // Closest address visited so far + visited []*crypto.NodeID // Closest addresses visited so far callback func(*sessionInfo, error) // TODO context.Context for timeout and cancellation send uint64 // log number of requests sent @@ -75,6 +74,9 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba // If there is, it adds the response info to the search and triggers a new search step. // If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more. func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { + if nfo := sinfo.searches.searches[sinfo.dest]; nfo != sinfo { + return // already done + } if res != nil { sinfo.recv++ if sinfo.checkDHTRes(res) { @@ -105,16 +107,32 @@ func (sinfo *searchInfo) doSearchStep(infos []*dhtInfo) { // Get a list of search targets that are close enough to the destination to try // Requires an initial list as input func (sinfo *searchInfo) getAllowedInfos(infos []*dhtInfo) []*dhtInfo { + var temp []*dhtInfo + for _, info := range infos { + if false && len(sinfo.visited) < search_MAX_RESULTS { + // We're not full on results yet, so don't block anything yet + } else if !dht_ordered(&sinfo.dest, info.getNodeID(), sinfo.visited[len(sinfo.visited)-1]) { + // Too far away + continue + } + var known bool + for _, nfo := range sinfo.visited { + if *nfo == *info.getNodeID() { + known = true + break + } + } + if !known { + temp = append(temp, info) + } + } + infos = append(infos[:0], temp...) // restrict to only the allowed infos sort.SliceStable(infos, func(i, j int) bool { // Should return true if i is closer to the destination than j return dht_ordered(&sinfo.dest, infos[i].getNodeID(), infos[j].getNodeID()) - }) - // Remove anything too far away to be useful - for idx, info := range infos { - if !dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) { - infos = infos[:idx] - break - } + }) // Sort infos to start with the closest + if len(infos) > search_MAX_RESULTS { + infos = infos[:search_MAX_RESULTS] // Limit max number of infos } return infos } @@ -164,6 +182,7 @@ func (sinfo *searchInfo) startSearch() { if elapsed > search_RETRY_TIME { // cleanup delete(sinfo.searches.searches, sinfo.dest) + sinfo.searches.router.core.log.Debugln("search timeout:", &sinfo.dest, sinfo.send, sinfo.recv) sinfo.callback(nil, errors.New("search reached dead end")) return } @@ -176,7 +195,7 @@ func (sinfo *searchInfo) startSearch() { // Calls create search, and initializes the iterative search parts of the struct before returning it. func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { sinfo := s.createSearch(dest, mask, callback) - sinfo.visited = s.router.dht.nodeID + sinfo.visited = append(sinfo.visited, &s.router.dht.nodeID) return sinfo } @@ -185,13 +204,29 @@ func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callb // Otherwise return false. func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { from := dhtInfo{key: res.Key, coords: res.Coords} - if *from.getNodeID() != sinfo.visited && dht_ordered(&sinfo.dest, from.getNodeID(), &sinfo.visited) { - // Closer to the destination, so update visited - sinfo.searches.router.core.log.Debugln("Updating search:", &sinfo.dest, from.getNodeID(), sinfo.send, sinfo.recv) - sinfo.visited = *from.getNodeID() - sinfo.time = time.Now() - } them := from.getNodeID() + var known bool + for _, v := range sinfo.visited { + if *v == *them { + known = true + break + } + } + if !known { + if len(sinfo.visited) < search_MAX_RESULTS || dht_ordered(&sinfo.dest, them, sinfo.visited[len(sinfo.visited)-1]) { + // Closer to the destination than the threshold, so update visited + sinfo.searches.router.core.log.Debugln("Updating search:", &sinfo.dest, them, sinfo.send, sinfo.recv) + sinfo.visited = append(sinfo.visited, them) + sort.SliceStable(sinfo.visited, func(i, j int) bool { + // Should return true if i is closer to the destination than j + return dht_ordered(&sinfo.dest, sinfo.visited[i], sinfo.visited[j]) + }) // Sort infos to start with the closest + if len(sinfo.visited) > search_MAX_RESULTS { + sinfo.visited = sinfo.visited[:search_MAX_RESULTS] + } + sinfo.time = time.Now() + } + } var destMasked crypto.NodeID var themMasked crypto.NodeID for idx := 0; idx < crypto.NodeIDLen; idx++ {