diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index c6af34b..3270365 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -10,6 +10,7 @@ import "net/url" import "sort" import "strings" import "strconv" +import "time" // TODO? Make all of this JSON // TODO: Add authentication @@ -339,27 +340,25 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo { func (a *admin) getData_getDHT() []admin_nodeInfo { var infos []admin_nodeInfo + now := time.Now() getDHT := func() { for i := 0; i < a.core.dht.nBuckets(); i++ { b := a.core.dht.getBucket(i) - for _, v := range b.other { - addr := *address_addrForNodeID(v.getNodeID()) - info := admin_nodeInfo{ - {"IP", net.IP(addr[:]).String()}, - {"coords", fmt.Sprint(v.coords)}, - {"bucket", fmt.Sprint(i)}, + getInfo := func(vs []*dhtInfo, isPeer bool) { + for _, v := range vs { + addr := *address_addrForNodeID(v.getNodeID()) + info := admin_nodeInfo{ + {"IP", net.IP(addr[:]).String()}, + {"coords", fmt.Sprint(v.coords)}, + {"bucket", fmt.Sprint(i)}, + {"peerOnly", fmt.Sprint(isPeer)}, + {"lastSeen", fmt.Sprint(now.Sub(v.recv))}, + } + infos = append(infos, info) } - infos = append(infos, info) - } - for _, v := range b.peers { - addr := *address_addrForNodeID(v.getNodeID()) - info := admin_nodeInfo{ - {"IP", net.IP(addr[:]).String()}, - {"coords", fmt.Sprint(v.coords)}, - {"bucket", fmt.Sprint(i)}, - } - infos = append(infos, info) } + getInfo(b.other, false) + getInfo(b.peers, true) } } a.core.router.doAdmin(getDHT) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 2f7c266..1f03bee 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -93,7 +93,7 @@ func (t *dht) handleReq(req *dhtReq) { key: t.core.boxPub, coords: coords, dest: req.dest, - infos: t.lookup(&req.dest), + infos: t.lookup(&req.dest, false), } t.sendRes(&res, req) // Also (possibly) add them to our DHT @@ -152,7 +152,7 @@ func (t *dht) handleRes(res *dhtRes) { } } -func (t *dht) lookup(nodeID *NodeID) []*dhtInfo { +func (t *dht) lookup(nodeID *NodeID, allowCloser bool) []*dhtInfo { // FIXME this allocates a bunch, sorts, and keeps the part it likes // It would be better to only track the part it likes to begin with addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo { @@ -160,7 +160,7 @@ func (t *dht) lookup(nodeID *NodeID) []*dhtInfo { if info == nil { panic("Should never happen!") } - if dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) { + if allowCloser || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) { res = append(res, info) } } @@ -197,14 +197,14 @@ func (t *dht) nBuckets() int { func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) { //fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords) - // Insert a peer if and only if the bucket doesn't already contain it + // Insert if no "other" entry already exists nodeID := info.getNodeID() bidx, isOK := t.getBucketIndex(nodeID) if !isOK { return } b := t.getBucket(bidx) - if !b.contains(info) { + if (isPeer && !b.containsOther(info)) || t.shouldInsert(info) { // We've never heard this node before // TODO is there a better time than "now" to set send/recv to? // (Is there another "natural" choice that bootstraps faster?) @@ -225,6 +225,11 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) { return } b := t.getBucket(bidx) + if !isPeer && !b.containsOther(info) { + // This is a new entry, give it an old age so it's pinged sooner + // This speeds up bootstrapping + info.recv = info.recv.Add(-time.Hour) + } // First drop any existing entry from the bucket b.drop(&info.key) // Now add to the *end* of the bucket @@ -251,36 +256,45 @@ func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) { return t.nBuckets(), false } -func (b *bucket) contains(ninfo *dhtInfo) bool { +func dht_bucket_check(newInfo *dhtInfo, infos []*dhtInfo) bool { // Compares if key and coords match - var found bool - check := func(infos []*dhtInfo) { - for _, info := range infos { - if info == nil { - panic("Should never happen") - } - if info.key != info.key { - continue - } - if len(info.coords) != len(ninfo.coords) { - continue - } - match := true - for idx := 0; idx < len(info.coords); idx++ { - if info.coords[idx] != ninfo.coords[idx] { - match = false - break - } - } - if match { - found = true + if newInfo == nil { + panic("Should never happen") + } + for _, info := range infos { + if info == nil { + panic("Should never happen") + } + if info.key != newInfo.key { + continue + } + if len(info.coords) != len(newInfo.coords) { + continue + } + match := true + for idx := 0; idx < len(info.coords); idx++ { + if info.coords[idx] != newInfo.coords[idx] { + match = false break } } + if match { + return true + } } - check(b.peers) - check(b.other) - return found + return false +} + +func (b *bucket) containsPeer(info *dhtInfo) bool { + return dht_bucket_check(info, b.peers) +} + +func (b *bucket) containsOther(info *dhtInfo) bool { + return dht_bucket_check(info, b.other) +} + +func (b *bucket) contains(info *dhtInfo) bool { + return b.containsPeer(info) || b.containsOther(info) } func (b *bucket) drop(key *boxPubKey) { @@ -444,13 +458,14 @@ func (t *dht) doMaintenance() { t.offset = 0 } target := t.getTarget(t.offset) - for _, info := range t.lookup(target) { + for _, info := range t.lookup(target, true) { if time.Since(info.recv) > time.Minute { t.addToMill(info, target) + t.offset++ break } } - t.offset++ + //t.offset++ } for len(t.rumorMill) > 0 { var rumor dht_rumor @@ -459,24 +474,7 @@ func (t *dht) doMaintenance() { // Note that the above is a pointer comparison, and target can be nil // This is only for adding new nodes (learned from other lookups) // It only makes sense to ping if the node isn't already in the table - bidx, isOK := t.getBucketIndex(rumor.info.getNodeID()) - if !isOK { - continue - } - b := t.getBucket(bidx) - if b.contains(rumor.info) { - // Already know about this node - continue - } - // This is a good spot to check if a node is worth pinging - doPing := len(b.other) < dht_bucket_size - for _, info := range b.other { - if dht_firstCloserThanThird(rumor.info.getNodeID(), &t.nodeID, info.getNodeID()) { - // Add the node if they are closer to us than someone in the same bucket - doPing = true - } - } - if !doPing { + if !t.shouldInsert(rumor.info) { continue } } @@ -485,6 +483,26 @@ func (t *dht) doMaintenance() { } } +func (t *dht) shouldInsert(info *dhtInfo) bool { + bidx, isOK := t.getBucketIndex(info.getNodeID()) + if !isOK { + return false + } + b := t.getBucket(bidx) + if b.containsOther(info) { + return false + } + if len(b.other) < dht_bucket_size { + return true + } + for _, other := range b.other { + if dht_firstCloserThanThird(info.getNodeID(), &t.nodeID, other.getNodeID()) { + return true + } + } + return false +} + func dht_firstCloserThanThird(first *NodeID, second *NodeID, third *NodeID) bool { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index c77c14d..78bfa36 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -77,7 +77,8 @@ func (r *router) mainLoop() { case p := <-r.send: r.sendPacket(p) case info := <-r.core.dht.peers: - r.core.dht.insertIfNew(info, true) + r.core.dht.insertIfNew(info, false) // Insert as a normal node + r.core.dht.insertIfNew(info, true) // Insert as a peer case <-r.reset: r.core.sessions.resetInits() r.core.dht.reset() diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 3dcb8d5..d440661 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -84,7 +84,7 @@ func (s *searches) sendSearch(info *searchInfo) { } func (s *searches) handleSearchReq(req *searchReq) { - lookup := s.core.dht.lookup(&req.dest) + lookup := s.core.dht.lookup(&req.dest, false) sent := false //fmt.Println("DEBUG len:", len(lookup)) for _, info := range lookup {