5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-10 05:10:26 +00:00

Merge pull request #81 from Arceliar/dht

More DHT updates
This commit is contained in:
Neil Alexander 2018-05-18 08:00:22 +01:00 committed by GitHub
commit 546c5f1412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 68 deletions

View File

@ -10,6 +10,7 @@ import "net/url"
import "sort" import "sort"
import "strings" import "strings"
import "strconv" import "strconv"
import "time"
// TODO? Make all of this JSON // TODO? Make all of this JSON
// TODO: Add authentication // TODO: Add authentication
@ -339,27 +340,25 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo {
func (a *admin) getData_getDHT() []admin_nodeInfo { func (a *admin) getData_getDHT() []admin_nodeInfo {
var infos []admin_nodeInfo var infos []admin_nodeInfo
now := time.Now()
getDHT := func() { getDHT := func() {
for i := 0; i < a.core.dht.nBuckets(); i++ { for i := 0; i < a.core.dht.nBuckets(); i++ {
b := a.core.dht.getBucket(i) b := a.core.dht.getBucket(i)
for _, v := range b.other { getInfo := func(vs []*dhtInfo, isPeer bool) {
addr := *address_addrForNodeID(v.getNodeID()) for _, v := range vs {
info := admin_nodeInfo{ addr := *address_addrForNodeID(v.getNodeID())
{"IP", net.IP(addr[:]).String()}, info := admin_nodeInfo{
{"coords", fmt.Sprint(v.coords)}, {"IP", net.IP(addr[:]).String()},
{"bucket", fmt.Sprint(i)}, {"coords", fmt.Sprint(v.coords)},
{"bucket", fmt.Sprint(i)},
{"peerOnly", fmt.Sprint(isPeer)},
{"lastSeen", fmt.Sprint(now.Sub(v.recv))},
}
infos = append(infos, info)
} }
infos = append(infos, info)
}
for _, v := range b.peers {
addr := *address_addrForNodeID(v.getNodeID())
info := admin_nodeInfo{
{"IP", net.IP(addr[:]).String()},
{"coords", fmt.Sprint(v.coords)},
{"bucket", fmt.Sprint(i)},
}
infos = append(infos, info)
} }
getInfo(b.other, false)
getInfo(b.peers, true)
} }
} }
a.core.router.doAdmin(getDHT) a.core.router.doAdmin(getDHT)

View File

@ -93,7 +93,7 @@ func (t *dht) handleReq(req *dhtReq) {
key: t.core.boxPub, key: t.core.boxPub,
coords: coords, coords: coords,
dest: req.dest, dest: req.dest,
infos: t.lookup(&req.dest), infos: t.lookup(&req.dest, false),
} }
t.sendRes(&res, req) t.sendRes(&res, req)
// Also (possibly) add them to our DHT // Also (possibly) add them to our DHT
@ -152,7 +152,7 @@ func (t *dht) handleRes(res *dhtRes) {
} }
} }
func (t *dht) lookup(nodeID *NodeID) []*dhtInfo { func (t *dht) lookup(nodeID *NodeID, allowCloser bool) []*dhtInfo {
// FIXME this allocates a bunch, sorts, and keeps the part it likes // FIXME this allocates a bunch, sorts, and keeps the part it likes
// It would be better to only track the part it likes to begin with // It would be better to only track the part it likes to begin with
addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo { addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo {
@ -160,7 +160,7 @@ func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
if info == nil { if info == nil {
panic("Should never happen!") panic("Should never happen!")
} }
if dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) { if allowCloser || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
res = append(res, info) res = append(res, info)
} }
} }
@ -197,14 +197,14 @@ func (t *dht) nBuckets() int {
func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) { func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords) //fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
// Insert a peer if and only if the bucket doesn't already contain it // Insert if no "other" entry already exists
nodeID := info.getNodeID() nodeID := info.getNodeID()
bidx, isOK := t.getBucketIndex(nodeID) bidx, isOK := t.getBucketIndex(nodeID)
if !isOK { if !isOK {
return return
} }
b := t.getBucket(bidx) b := t.getBucket(bidx)
if !b.contains(info) { if (isPeer && !b.containsOther(info)) || t.shouldInsert(info) {
// We've never heard this node before // We've never heard this node before
// TODO is there a better time than "now" to set send/recv to? // TODO is there a better time than "now" to set send/recv to?
// (Is there another "natural" choice that bootstraps faster?) // (Is there another "natural" choice that bootstraps faster?)
@ -225,6 +225,11 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) {
return return
} }
b := t.getBucket(bidx) b := t.getBucket(bidx)
if !isPeer && !b.containsOther(info) {
// This is a new entry, give it an old age so it's pinged sooner
// This speeds up bootstrapping
info.recv = info.recv.Add(-time.Hour)
}
// First drop any existing entry from the bucket // First drop any existing entry from the bucket
b.drop(&info.key) b.drop(&info.key)
// Now add to the *end* of the bucket // Now add to the *end* of the bucket
@ -251,36 +256,45 @@ func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) {
return t.nBuckets(), false return t.nBuckets(), false
} }
func (b *bucket) contains(ninfo *dhtInfo) bool { func dht_bucket_check(newInfo *dhtInfo, infos []*dhtInfo) bool {
// Compares if key and coords match // Compares if key and coords match
var found bool if newInfo == nil {
check := func(infos []*dhtInfo) { panic("Should never happen")
for _, info := range infos { }
if info == nil { for _, info := range infos {
panic("Should never happen") if info == nil {
} panic("Should never happen")
if info.key != info.key { }
continue if info.key != newInfo.key {
} continue
if len(info.coords) != len(ninfo.coords) { }
continue if len(info.coords) != len(newInfo.coords) {
} continue
match := true }
for idx := 0; idx < len(info.coords); idx++ { match := true
if info.coords[idx] != ninfo.coords[idx] { for idx := 0; idx < len(info.coords); idx++ {
match = false if info.coords[idx] != newInfo.coords[idx] {
break match = false
}
}
if match {
found = true
break break
} }
} }
if match {
return true
}
} }
check(b.peers) return false
check(b.other) }
return found
func (b *bucket) containsPeer(info *dhtInfo) bool {
return dht_bucket_check(info, b.peers)
}
func (b *bucket) containsOther(info *dhtInfo) bool {
return dht_bucket_check(info, b.other)
}
func (b *bucket) contains(info *dhtInfo) bool {
return b.containsPeer(info) || b.containsOther(info)
} }
func (b *bucket) drop(key *boxPubKey) { func (b *bucket) drop(key *boxPubKey) {
@ -444,13 +458,14 @@ func (t *dht) doMaintenance() {
t.offset = 0 t.offset = 0
} }
target := t.getTarget(t.offset) target := t.getTarget(t.offset)
for _, info := range t.lookup(target) { for _, info := range t.lookup(target, true) {
if time.Since(info.recv) > time.Minute { if time.Since(info.recv) > time.Minute {
t.addToMill(info, target) t.addToMill(info, target)
t.offset++
break break
} }
} }
t.offset++ //t.offset++
} }
for len(t.rumorMill) > 0 { for len(t.rumorMill) > 0 {
var rumor dht_rumor var rumor dht_rumor
@ -459,24 +474,7 @@ func (t *dht) doMaintenance() {
// Note that the above is a pointer comparison, and target can be nil // Note that the above is a pointer comparison, and target can be nil
// This is only for adding new nodes (learned from other lookups) // This is only for adding new nodes (learned from other lookups)
// It only makes sense to ping if the node isn't already in the table // It only makes sense to ping if the node isn't already in the table
bidx, isOK := t.getBucketIndex(rumor.info.getNodeID()) if !t.shouldInsert(rumor.info) {
if !isOK {
continue
}
b := t.getBucket(bidx)
if b.contains(rumor.info) {
// Already know about this node
continue
}
// This is a good spot to check if a node is worth pinging
doPing := len(b.other) < dht_bucket_size
for _, info := range b.other {
if dht_firstCloserThanThird(rumor.info.getNodeID(), &t.nodeID, info.getNodeID()) {
// Add the node if they are closer to us than someone in the same bucket
doPing = true
}
}
if !doPing {
continue continue
} }
} }
@ -485,6 +483,26 @@ func (t *dht) doMaintenance() {
} }
} }
func (t *dht) shouldInsert(info *dhtInfo) bool {
bidx, isOK := t.getBucketIndex(info.getNodeID())
if !isOK {
return false
}
b := t.getBucket(bidx)
if b.containsOther(info) {
return false
}
if len(b.other) < dht_bucket_size {
return true
}
for _, other := range b.other {
if dht_firstCloserThanThird(info.getNodeID(), &t.nodeID, other.getNodeID()) {
return true
}
}
return false
}
func dht_firstCloserThanThird(first *NodeID, func dht_firstCloserThanThird(first *NodeID,
second *NodeID, second *NodeID,
third *NodeID) bool { third *NodeID) bool {

View File

@ -77,7 +77,8 @@ func (r *router) mainLoop() {
case p := <-r.send: case p := <-r.send:
r.sendPacket(p) r.sendPacket(p)
case info := <-r.core.dht.peers: case info := <-r.core.dht.peers:
r.core.dht.insertIfNew(info, true) r.core.dht.insertIfNew(info, false) // Insert as a normal node
r.core.dht.insertIfNew(info, true) // Insert as a peer
case <-r.reset: case <-r.reset:
r.core.sessions.resetInits() r.core.sessions.resetInits()
r.core.dht.reset() r.core.dht.reset()

View File

@ -84,7 +84,7 @@ func (s *searches) sendSearch(info *searchInfo) {
} }
func (s *searches) handleSearchReq(req *searchReq) { func (s *searches) handleSearchReq(req *searchReq) {
lookup := s.core.dht.lookup(&req.dest) lookup := s.core.dht.lookup(&req.dest, false)
sent := false sent := false
//fmt.Println("DEBUG len:", len(lookup)) //fmt.Println("DEBUG len:", len(lookup))
for _, info := range lookup { for _, info := range lookup {