5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-23 05:21:35 +00:00

cleanup and some bugfixes, cache important dht nodes until something gets added/removed

This commit is contained in:
Arceliar 2018-10-29 22:24:18 -05:00
parent 671c7f2a47
commit a008b42f99
2 changed files with 47 additions and 64 deletions

View File

@ -16,7 +16,6 @@ package yggdrasil
// TODO reoptimize search stuff (size, timeouts, etc) to play nicer with DHT churn // TODO reoptimize search stuff (size, timeouts, etc) to play nicer with DHT churn
import ( import (
"fmt"
"sort" "sort"
"time" "time"
) )
@ -64,9 +63,11 @@ type dhtRes struct {
type dht struct { type dht struct {
core *Core core *Core
nodeID NodeID nodeID NodeID
table map[NodeID]*dhtInfo
peers chan *dhtInfo // other goroutines put incoming dht updates here peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[boxPubKey]map[NodeID]time.Time reqs map[boxPubKey]map[NodeID]time.Time
// These next two could be replaced by a single linked list or similar...
table map[NodeID]*dhtInfo
imp []*dhtInfo
} }
// Initializes the DHT // Initializes the DHT
@ -82,6 +83,7 @@ func (t *dht) init(c *Core) {
func (t *dht) reset() { func (t *dht) reset() {
t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
t.table = make(map[NodeID]*dhtInfo) t.table = make(map[NodeID]*dhtInfo)
t.imp = nil
} }
// Does a DHT lookup and returns up to dht_lookup_size results // Does a DHT lookup and returns up to dht_lookup_size results
@ -127,6 +129,7 @@ func (t *dht) insert(info *dhtInfo) {
info.throttle = oldInfo.throttle info.throttle = oldInfo.throttle
} }
} }
t.imp = nil // It needs to update to get a pointer to the new info
t.table[*info.getNodeID()] = info t.table[*info.getNodeID()] = info
} }
@ -180,7 +183,7 @@ func (t *dht) handleReq(req *dhtReq) {
coords: req.Coords, coords: req.Coords,
} }
imp := t.getImportant() imp := t.getImportant()
if _, isIn := t.table[*info.getNodeID()]; !isIn || t.isImportant(&info, imp) { if _, isIn := t.table[*info.getNodeID()]; !isIn && t.isImportant(&info, imp) {
t.insert(&info) t.insert(&info)
} }
} }
@ -223,10 +226,6 @@ func (t *dht) handleRes(res *dhtRes) {
if t.isImportant(&rinfo, imp) { if t.isImportant(&rinfo, imp) {
t.insert(&rinfo) t.insert(&rinfo)
} }
//t.insert(&rinfo) // Or at the end, after checking successor/predecessor?
if len(res.Infos) > dht_lookup_size {
//res.Infos = res.Infos[:dht_lookup_size] //FIXME debug
}
for _, info := range res.Infos { for _, info := range res.Infos {
if *info.getNodeID() == t.nodeID { if *info.getNodeID() == t.nodeID {
continue continue
@ -284,21 +283,14 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
} }
func (t *dht) doMaintenance() { func (t *dht) doMaintenance() {
toPing := make(map[NodeID]*dhtInfo)
now := time.Now() now := time.Now()
imp := t.getImportant()
good := make(map[NodeID]*dhtInfo)
for _, info := range imp {
good[*info.getNodeID()] = info
}
for infoID, info := range t.table { for infoID, info := range t.table {
if now.Sub(info.recv) > time.Minute || info.pings > 3 { if now.Sub(info.recv) > time.Minute || info.pings > 3 {
delete(t.table, infoID) delete(t.table, infoID)
} else if t.isImportant(info, imp) { t.imp = nil
toPing[infoID] = info
} }
} }
for _, info := range toPing { for _, info := range t.getImportant() {
if now.Sub(info.recv) > info.throttle { if now.Sub(info.recv) > info.throttle {
t.ping(info, info.getNodeID()) t.ping(info, info.getNodeID())
info.pings++ info.pings++
@ -306,56 +298,49 @@ func (t *dht) doMaintenance() {
if info.throttle > 30*time.Second { if info.throttle > 30*time.Second {
info.throttle = 30 * time.Second info.throttle = 30 * time.Second
} }
continue
fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords)
} }
} }
return // Skip printing debug info
var out []interface{}
out = append(out, "DEBUG important:")
out = append(out, t.nodeID[:8])
for _, info := range imp {
out = append(out, info.getNodeID()[:8])
}
fmt.Println(out...)
} }
func (t *dht) getImportant() []*dhtInfo { func (t *dht) getImportant() []*dhtInfo {
// Get a list of all known nodes if t.imp == nil {
infos := make([]*dhtInfo, 0, len(t.table)) // Get a list of all known nodes
for _, info := range t.table { infos := make([]*dhtInfo, 0, len(t.table))
infos = append(infos, info) for _, info := range t.table {
} infos = append(infos, info)
// Sort them by increasing order in distance along the ring
sort.SliceStable(infos, func(i, j int) bool {
// Sort in order of predecessors (!), reverse from chord normal, because it plays nicer with zero bits for unknown parts of target addresses
return dht_ordered(infos[j].getNodeID(), infos[i].getNodeID(), &t.nodeID)
})
// Keep the ones that are no further than the closest seen so far
minDist := ^uint64(0)
loc := t.core.switchTable.getLocator()
important := infos[:0]
for _, info := range infos {
dist := uint64(loc.dist(info.coords))
if dist < minDist {
minDist = dist
important = append(important, info)
} }
} // Sort them by increasing order in distance along the ring
var temp []*dhtInfo sort.SliceStable(infos, func(i, j int) bool {
minDist = ^uint64(0) // Sort in order of predecessors (!), reverse from chord normal, because it plays nicer with zero bits for unknown parts of target addresses
for idx := len(infos) - 1; idx >= 0; idx-- { return dht_ordered(infos[j].getNodeID(), infos[i].getNodeID(), &t.nodeID)
info := infos[idx] })
dist := uint64(loc.dist(info.coords)) // Keep the ones that are no further than the closest seen so far
if dist < minDist { minDist := ^uint64(0)
minDist = dist loc := t.core.switchTable.getLocator()
temp = append(temp, info) important := infos[:0]
for _, info := range infos {
dist := uint64(loc.dist(info.coords))
if dist < minDist {
minDist = dist
important = append(important, info)
}
} }
var temp []*dhtInfo
minDist = ^uint64(0)
for idx := len(infos) - 1; idx >= 0; idx-- {
info := infos[idx]
dist := uint64(loc.dist(info.coords))
if dist < minDist {
minDist = dist
temp = append(temp, info)
}
}
for idx := len(temp) - 1; idx >= 0; idx-- {
important = append(important, temp[idx])
}
t.imp = important
} }
for idx := len(temp) - 1; idx >= 0; idx-- { return t.imp
important = append(important, temp[idx])
}
return important
} }
func (t *dht) isImportant(ninfo *dhtInfo, important []*dhtInfo) bool { func (t *dht) isImportant(ninfo *dhtInfo, important []*dhtInfo) bool {

View File

@ -11,8 +11,10 @@ package yggdrasil
// A new search packet is sent immediately after receiving a response // A new search packet is sent immediately after receiving a response
// A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason) // A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason)
// TODO?
// Some kind of max search steps, in case the node is offline, so we don't crawl through too much of the network looking for a destination that isn't there?
import ( import (
"fmt"
"sort" "sort"
"time" "time"
) )
@ -74,9 +76,6 @@ func (s *searches) handleDHTRes(res *dhtRes) {
sinfo, isIn := s.searches[res.Dest] sinfo, isIn := s.searches[res.Dest]
if !isIn || s.checkDHTRes(sinfo, res) { if !isIn || s.checkDHTRes(sinfo, res) {
// Either we don't recognize this search, or we just finished it // Either we don't recognize this search, or we just finished it
if isIn {
fmt.Println("DEBUG: search finished, length:", len(sinfo.visited))
}
return return
} else { } else {
// Add to the search and continue // Add to the search and continue
@ -92,6 +91,7 @@ func (s *searches) handleDHTRes(res *dhtRes) {
func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
// Add responses to toVisit if closer to dest than the res node // Add responses to toVisit if closer to dest than the res node
from := dhtInfo{key: res.Key, coords: res.Coords} from := dhtInfo{key: res.Key, coords: res.Coords}
sinfo.visited[*from.getNodeID()] = true
for _, info := range res.Infos { for _, info := range res.Infos {
if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] {
continue continue
@ -129,14 +129,12 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
if len(sinfo.toVisit) == 0 { if len(sinfo.toVisit) == 0 {
// Dead end, do cleanup // Dead end, do cleanup
delete(s.searches, sinfo.dest) delete(s.searches, sinfo.dest)
fmt.Println("DEBUG: search abandoned, length:", len(sinfo.visited))
return return
} else { } else {
// Send to the next search target // Send to the next search target
var next *dhtInfo var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
s.core.dht.ping(next, &sinfo.dest) s.core.dht.ping(next, &sinfo.dest)
sinfo.visited[*next.getNodeID()] = true
} }
} }