From 8d9887294cfa5e9e9e35899944c74c54834d89af Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Thu, 17 May 2018 19:32:29 -0500
Subject: [PATCH 1/3] add dht time since last ping to admin socket, some DHT code cleanup, bugfix to insertIfNew

---
 src/yggdrasil/admin.go | 30 +++++++++++------------
 src/yggdrasil/dht.go   | 55 +++++++++++++++++++-----------------
 2 files changed, 39 insertions(+), 46 deletions(-)

diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go
index c6af34b..8682f75 100644
--- a/src/yggdrasil/admin.go
+++ b/src/yggdrasil/admin.go
@@ -10,6 +10,7 @@ import "net/url"
 import "sort"
 import "strings"
 import "strconv"
+import "time"
 
 // TODO? Make all of this JSON
 // TODO: Add authentication
@@ -339,27 +340,24 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo {
 
 func (a *admin) getData_getDHT() []admin_nodeInfo {
 	var infos []admin_nodeInfo
+	now := time.Now()
 	getDHT := func() {
 		for i := 0; i < a.core.dht.nBuckets(); i++ {
 			b := a.core.dht.getBucket(i)
-			for _, v := range b.other {
-				addr := *address_addrForNodeID(v.getNodeID())
-				info := admin_nodeInfo{
-					{"IP", net.IP(addr[:]).String()},
-					{"coords", fmt.Sprint(v.coords)},
-					{"bucket", fmt.Sprint(i)},
+			getInfo := func(vs []*dhtInfo) {
+				for _, v := range vs {
+					addr := *address_addrForNodeID(v.getNodeID())
+					info := admin_nodeInfo{
+						{"IP", net.IP(addr[:]).String()},
+						{"coords", fmt.Sprint(v.coords)},
+						{"bucket", fmt.Sprint(i)},
+						{"lastSeen", fmt.Sprint(now.Sub(v.recv))},
+					}
+					infos = append(infos, info)
 				}
-				infos = append(infos, info)
-			}
-			for _, v := range b.peers {
-				addr := *address_addrForNodeID(v.getNodeID())
-				info := admin_nodeInfo{
-					{"IP", net.IP(addr[:]).String()},
-					{"coords", fmt.Sprint(v.coords)},
-					{"bucket", fmt.Sprint(i)},
-				}
-				infos = append(infos, info)
 			}
+			getInfo(b.other)
+			getInfo(b.peers)
 		}
 	}
 	a.core.router.doAdmin(getDHT)
diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go
index 2f7c266..5c7be3c 100644
--- a/src/yggdrasil/dht.go
+++ b/src/yggdrasil/dht.go
@@ -198,20 +198,12 @@ func (t *dht) nBuckets() int {
 func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
 	//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
 	// Insert a peer if and only if the bucket doesn't already contain it
-	nodeID := info.getNodeID()
-	bidx, isOK := t.getBucketIndex(nodeID)
-	if !isOK {
+	if !t.shouldInsert(info) {
 		return
 	}
-	b := t.getBucket(bidx)
-	if !b.contains(info) {
-		// We've never heard this node before
-		// TODO is there a better time than "now" to set send/recv to?
-		// (Is there another "natural" choice that bootstraps faster?)
-		info.send = time.Now()
-		info.recv = info.send
-		t.insert(info, isPeer)
-	}
+	info.send = time.Now()
+	info.recv = info.send
+	t.insert(info, isPeer)
 }
 
 func (t *dht) insert(info *dhtInfo, isPeer bool) {
@@ -459,24 +451,7 @@ func (t *dht) doMaintenance() {
 			// Note that the above is a pointer comparison, and target can be nil
 			// This is only for adding new nodes (learned from other lookups)
 			// It only makes sense to ping if the node isn't already in the table
-			bidx, isOK := t.getBucketIndex(rumor.info.getNodeID())
-			if !isOK {
-				continue
-			}
-			b := t.getBucket(bidx)
-			if b.contains(rumor.info) {
-				// Already know about this node
-				continue
-			}
-			// This is a good spot to check if a node is worth pinging
-			doPing := len(b.other) < dht_bucket_size
-			for _, info := range b.other {
-				if dht_firstCloserThanThird(rumor.info.getNodeID(), &t.nodeID, info.getNodeID()) {
-					// Add the node if they are closer to us than someone in the same bucket
-					doPing = true
-				}
-			}
-			if !doPing {
+			if !t.shouldInsert(rumor.info) {
 				continue
 			}
 		}
@@ -485,6 +460,26 @@
 	}
 }
 
+func (t *dht) shouldInsert(info *dhtInfo) bool {
+	bidx, isOK := t.getBucketIndex(info.getNodeID())
+	if !isOK {
+		return false
+	}
+	b := t.getBucket(bidx)
+	if b.contains(info) {
+		return false
+	}
+	if len(b.other) < dht_bucket_size {
+		return true
+	}
+	for _, other := range b.other {
+		if dht_firstCloserThanThird(info.getNodeID(), &t.nodeID, other.getNodeID()) {
+			return true
+		}
+	}
+	return false
+}
+
 func dht_firstCloserThanThird(first *NodeID,
 	second *NodeID,
 	third *NodeID) bool {

From fe518f4e3fbdf6110e86c7ce48bd278098c913ff Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Thu, 17 May 2018 21:20:31 -0500
Subject: [PATCH 2/3] bugfixes related to peer timeouts in the DHT, significantly improve DHT bootstrap speed

---
 src/yggdrasil/dht.go    | 91 ++++++++++++++++++++++++-----------------
 src/yggdrasil/router.go |  3 +-
 src/yggdrasil/search.go |  2 +-
 3 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go
index 5c7be3c..90df62b 100644
--- a/src/yggdrasil/dht.go
+++ b/src/yggdrasil/dht.go
@@ -93,7 +93,7 @@ func (t *dht) handleReq(req *dhtReq) {
 		key:    t.core.boxPub,
 		coords: coords,
 		dest:   req.dest,
-		infos:  t.lookup(&req.dest),
+		infos:  t.lookup(&req.dest, false),
 	}
 	t.sendRes(&res, req)
 	// Also (possibly) add them to our DHT
@@ -152,7 +152,7 @@ func (t *dht) handleRes(res *dhtRes) {
 	}
 }
 
-func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
+func (t *dht) lookup(nodeID *NodeID, allowCloser bool) []*dhtInfo {
 	// FIXME this allocates a bunch, sorts, and keeps the part it likes
 	// It would be better to only track the part it likes to begin with
 	addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo {
@@ -160,7 +160,7 @@
 			if info == nil {
 				panic("Should never happen!")
 			}
-			if dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
+			if allowCloser || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
 				res = append(res, info)
 			}
 		}
@@ -197,13 +197,15 @@ func (t *dht) nBuckets() int {
 
 func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
 	//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
-	// Insert a peer if and only if the bucket doesn't already contain it
-	if !t.shouldInsert(info) {
-		return
+	// Always inserts peers, inserts other nodes if not already present
+	if isPeer || t.shouldInsert(info) {
+		// We've never heard this node before
+		// TODO is there a better time than "now" to set send/recv to?
+		// (Is there another "natural" choice that bootstraps faster?)
+		info.send = time.Now()
+		info.recv = info.send
+		t.insert(info, isPeer)
 	}
-	info.send = time.Now()
-	info.recv = info.send
-	t.insert(info, isPeer)
 }
 
 func (t *dht) insert(info *dhtInfo, isPeer bool) {
@@ -217,6 +219,11 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) {
 		return
 	}
 	b := t.getBucket(bidx)
+	if !isPeer && !b.containsOther(info) {
+		// This is a new entry, give it an old age so it's pinged sooner
+		// This speeds up bootstrapping
+		info.recv = info.recv.Add(-time.Minute)
+	}
 	// First drop any existing entry from the bucket
 	b.drop(&info.key)
 	// Now add to the *end* of the bucket
@@ -243,36 +250,45 @@ func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) {
 	return t.nBuckets(), false
 }
 
-func (b *bucket) contains(ninfo *dhtInfo) bool {
+func dht_bucket_check(newInfo *dhtInfo, infos []*dhtInfo) bool {
 	// Compares if key and coords match
-	var found bool
-	check := func(infos []*dhtInfo) {
-		for _, info := range infos {
-			if info == nil {
-				panic("Should never happen")
-			}
-			if info.key != info.key {
-				continue
-			}
-			if len(info.coords) != len(ninfo.coords) {
-				continue
-			}
-			match := true
-			for idx := 0; idx < len(info.coords); idx++ {
-				if info.coords[idx] != ninfo.coords[idx] {
-					match = false
-					break
-				}
-			}
-			if match {
-				found = true
+	if newInfo == nil {
+		panic("Should never happen")
+	}
+	for _, info := range infos {
+		if info == nil {
+			panic("Should never happen")
+		}
+		if info.key != newInfo.key {
+			continue
+		}
+		if len(info.coords) != len(newInfo.coords) {
+			continue
+		}
+		match := true
+		for idx := 0; idx < len(info.coords); idx++ {
+			if info.coords[idx] != newInfo.coords[idx] {
+				match = false
 				break
 			}
 		}
+		if match {
+			return true
+		}
 	}
-	check(b.peers)
-	check(b.other)
-	return found
+	return false
+}
+
+func (b *bucket) containsPeer(info *dhtInfo) bool {
+	return dht_bucket_check(info, b.peers)
+}
+
+func (b *bucket) containsOther(info *dhtInfo) bool {
+	return dht_bucket_check(info, b.other)
+}
+
+func (b *bucket) contains(info *dhtInfo) bool {
+	return b.containsPeer(info) || b.containsOther(info)
 }
 
 func (b *bucket) drop(key *boxPubKey) {
@@ -436,13 +452,14 @@ func (t *dht) doMaintenance() {
 		t.offset = 0
 	}
 	target := t.getTarget(t.offset)
-	for _, info := range t.lookup(target) {
+	for _, info := range t.lookup(target, true) {
 		if time.Since(info.recv) > time.Minute {
 			t.addToMill(info, target)
+			t.offset++
 			break
 		}
 	}
-	t.offset++
+	//t.offset++
 	for len(t.rumorMill) > 0 {
 		var rumor dht_rumor
 		rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:]
@@ -466,7 +483,7 @@
 		return false
 	}
 	b := t.getBucket(bidx)
-	if b.contains(info) {
+	if b.containsOther(info) {
 		return false
 	}
 	if len(b.other) < dht_bucket_size {
diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go
index c77c14d..78bfa36 100644
--- a/src/yggdrasil/router.go
+++ b/src/yggdrasil/router.go
@@ -77,7 +77,8 @@ func (r *router) mainLoop() {
 		case p := <-r.send:
 			r.sendPacket(p)
 		case info := <-r.core.dht.peers:
-			r.core.dht.insertIfNew(info, true)
+			r.core.dht.insertIfNew(info, false) // Insert as a normal node
+			r.core.dht.insertIfNew(info, true)  // Insert as a peer
 		case <-r.reset:
 			r.core.sessions.resetInits()
 			r.core.dht.reset()
diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index 3dcb8d5..d440661 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -84,7 +84,7 @@ func (s *searches) sendSearch(info *searchInfo) {
 }
 
 func (s *searches) handleSearchReq(req *searchReq) {
-	lookup := s.core.dht.lookup(&req.dest)
+	lookup := s.core.dht.lookup(&req.dest, false)
 	sent := false
 	//fmt.Println("DEBUG len:", len(lookup))
 	for _, info := range lookup {

From ec8fe338d5a423f4081c718a459f1acc952e15e6 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Thu, 17 May 2018 21:43:26 -0500
Subject: [PATCH 3/3] more insertIfNew bugfixes, and add peerOnly to getDHT output (true if a node is in the bucket.peers slice instead of bucket.others--it means they're not regularly pinged, they're only there to make sure DHT lookups include them as a result, for bootstrapping reasons)

---
 src/yggdrasil/admin.go |  7 ++++---
 src/yggdrasil/dht.go   | 12 +++++++++---
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go
index 8682f75..3270365 100644
--- a/src/yggdrasil/admin.go
+++ b/src/yggdrasil/admin.go
@@ -344,20 +344,21 @@ func (a *admin) getData_getDHT() []admin_nodeInfo {
 	getDHT := func() {
 		for i := 0; i < a.core.dht.nBuckets(); i++ {
 			b := a.core.dht.getBucket(i)
-			getInfo := func(vs []*dhtInfo) {
+			getInfo := func(vs []*dhtInfo, isPeer bool) {
 				for _, v := range vs {
 					addr := *address_addrForNodeID(v.getNodeID())
 					info := admin_nodeInfo{
 						{"IP", net.IP(addr[:]).String()},
 						{"coords", fmt.Sprint(v.coords)},
 						{"bucket", fmt.Sprint(i)},
+						{"peerOnly", fmt.Sprint(isPeer)},
 						{"lastSeen", fmt.Sprint(now.Sub(v.recv))},
 					}
 					infos = append(infos, info)
 				}
 			}
-			getInfo(b.other)
-			getInfo(b.peers)
+			getInfo(b.other, false)
+			getInfo(b.peers, true)
 		}
 	}
 	a.core.router.doAdmin(getDHT)
diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go
index 90df62b..1f03bee 100644
--- a/src/yggdrasil/dht.go
+++ b/src/yggdrasil/dht.go
@@ -197,8 +197,14 @@ func (t *dht) nBuckets() int {
 
 func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
 	//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
-	// Always inserts peers, inserts other nodes if not already present
-	if isPeer || t.shouldInsert(info) {
+	// Insert if no "other" entry already exists
+	nodeID := info.getNodeID()
+	bidx, isOK := t.getBucketIndex(nodeID)
+	if !isOK {
+		return
+	}
+	b := t.getBucket(bidx)
+	if (isPeer && !b.containsOther(info)) || t.shouldInsert(info) {
 		// We've never heard this node before
 		// TODO is there a better time than "now" to set send/recv to?
 		// (Is there another "natural" choice that bootstraps faster?)
@@ -222,7 +228,7 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) {
 	if !isPeer && !b.containsOther(info) {
 		// This is a new entry, give it an old age so it's pinged sooner
 		// This speeds up bootstrapping
-		info.recv = info.recv.Add(-time.Minute)
+		info.recv = info.recv.Add(-time.Hour)
	}
	// First drop any existing entry from the bucket
	b.drop(&info.key)
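
Side note, not part of the patches above: the bootstrap speed-up in PATCH 2/3 and 3/3 comes from back-dating a new entry's recv timestamp in insert(), so that the maintenance check time.Since(info.recv) > time.Minute in doMaintenance() treats the entry as stale and adds it to the rumor mill (and hence pings it) sooner. A minimal, self-contained Go sketch of that timing trick is below; the entry type and needsPing helper are hypothetical stand-ins for illustration, not the real dhtInfo or doMaintenance code.

package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical stand-in for dhtInfo; only the recv timestamp matters here.
type entry struct {
	recv time.Time
}

// needsPing mirrors the staleness threshold used by the maintenance loop in the patches.
func needsPing(e entry) bool {
	return time.Since(e.recv) > time.Minute
}

func main() {
	e := entry{recv: time.Now()}
	fmt.Println(needsPing(e)) // false: a brand-new entry would sit idle for a minute

	// Back-dating recv, as insert() does for new non-peer entries, makes the
	// very next maintenance pass consider it stale, which speeds up bootstrap.
	e.recv = e.recv.Add(-time.Hour)
	fmt.Println(needsPing(e)) // true
}

PATCH 2/3 back-dates by one minute, which only barely crosses the threshold; PATCH 3/3 widens this to an hour so a new entry stays comfortably past it until its first ping.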