mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-25 13:51:35 +00:00
keep peers separate from other nodes in dht
This commit is contained in:
parent
e04ab7cfe6
commit
9ce0b7fbea
@ -290,7 +290,16 @@ func (a *admin) getData_getDHT() []admin_nodeInfo {
|
||||
getDHT := func() {
|
||||
for i := 0; i < a.core.dht.nBuckets(); i++ {
|
||||
b := a.core.dht.getBucket(i)
|
||||
for _, v := range b.infos {
|
||||
for _, v := range b.other {
|
||||
addr := *address_addrForNodeID(v.getNodeID())
|
||||
info := admin_nodeInfo{
|
||||
{"IP", net.IP(addr[:]).String()},
|
||||
{"coords", fmt.Sprint(v.coords)},
|
||||
{"bucket", fmt.Sprint(i)},
|
||||
}
|
||||
infos = append(infos, info)
|
||||
}
|
||||
for _, v := range b.peers {
|
||||
addr := *address_addrForNodeID(v.getNodeID())
|
||||
info := admin_nodeInfo{
|
||||
{"IP", net.IP(addr[:]).String()},
|
||||
|
@ -154,7 +154,8 @@ func (c *Core) DEBUG_getDHTSize() int {
|
||||
total := 0
|
||||
for bidx := 0; bidx < c.dht.nBuckets(); bidx++ {
|
||||
b := c.dht.getBucket(bidx)
|
||||
total += len(b.infos)
|
||||
total += len(b.peers)
|
||||
total += len(b.other)
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
@ -46,7 +46,8 @@ func (info *dhtInfo) getNodeID() *NodeID {
|
||||
}
|
||||
|
||||
type bucket struct {
|
||||
infos []*dhtInfo
|
||||
peers []*dhtInfo
|
||||
other []*dhtInfo
|
||||
}
|
||||
|
||||
type dhtReq struct {
|
||||
@ -100,7 +101,7 @@ func (t *dht) handleReq(req *dhtReq) {
|
||||
key: req.key,
|
||||
coords: req.coords,
|
||||
}
|
||||
t.insertIfNew(&info) // This seems DoSable (we just trust their coords...)
|
||||
t.insertIfNew(&info, false) // This seems DoSable (we just trust their coords...)
|
||||
//if req.dest != t.nodeID { t.ping(&info, info.getNodeID()) } // Or spam...
|
||||
}
|
||||
|
||||
@ -125,13 +126,18 @@ func (t *dht) handleRes(res *dhtRes) {
|
||||
return
|
||||
}
|
||||
b := t.getBucket(bidx)
|
||||
for _, oldinfo := range b.infos {
|
||||
for _, oldinfo := range b.peers {
|
||||
if oldinfo.key == rinfo.key {
|
||||
rinfo.send = oldinfo.send
|
||||
}
|
||||
}
|
||||
for _, oldinfo := range b.other {
|
||||
if oldinfo.key == rinfo.key {
|
||||
rinfo.send = oldinfo.send
|
||||
}
|
||||
}
|
||||
// Insert into table
|
||||
t.insert(&rinfo)
|
||||
t.insert(&rinfo, false)
|
||||
if res.dest == *rinfo.getNodeID() {
|
||||
return
|
||||
} // No infinite recursions
|
||||
@ -170,7 +176,8 @@ func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
|
||||
var res []*dhtInfo
|
||||
for bidx := 0; bidx < t.nBuckets(); bidx++ {
|
||||
b := t.getBucket(bidx)
|
||||
res = addInfos(res, b.infos)
|
||||
res = addInfos(res, b.peers)
|
||||
res = addInfos(res, b.other)
|
||||
}
|
||||
doSort := func(infos []*dhtInfo) {
|
||||
less := func(i, j int) bool {
|
||||
@ -195,7 +202,7 @@ func (t *dht) nBuckets() int {
|
||||
return len(t.buckets_hidden)
|
||||
}
|
||||
|
||||
func (t *dht) insertIfNew(info *dhtInfo) {
|
||||
func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
|
||||
//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
|
||||
// Insert a peer if and only if the bucket doesn't already contain it
|
||||
nodeID := info.getNodeID()
|
||||
@ -210,11 +217,11 @@ func (t *dht) insertIfNew(info *dhtInfo) {
|
||||
// (Is there another "natural" choice that bootstraps faster?)
|
||||
info.send = time.Now()
|
||||
info.recv = info.send
|
||||
t.insert(info)
|
||||
t.insert(info, isPeer)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *dht) insert(info *dhtInfo) {
|
||||
func (t *dht) insert(info *dhtInfo, isPeer bool) {
|
||||
//fmt.Println("DEBUG: dht insert:", info.getNodeID(), info.coords)
|
||||
// First update the time on this info
|
||||
info.recv = time.Now()
|
||||
@ -228,18 +235,23 @@ func (t *dht) insert(info *dhtInfo) {
|
||||
// First drop any existing entry from the bucket
|
||||
b.drop(&info.key)
|
||||
// Now add to the *end* of the bucket
|
||||
b.infos = append(b.infos, info)
|
||||
if isPeer {
|
||||
// TODO make sure we don't duplicate peers in b.other too
|
||||
b.peers = append(b.peers, info)
|
||||
return
|
||||
}
|
||||
b.other = append(b.other, info)
|
||||
// Check if the next bucket is non-full and return early if it is
|
||||
if bidx+1 == t.nBuckets() {
|
||||
return
|
||||
}
|
||||
bnext := t.getBucket(bidx + 1)
|
||||
if len(bnext.infos) < dht_bucket_size {
|
||||
if len(bnext.other) < dht_bucket_size {
|
||||
return
|
||||
}
|
||||
// Shrink from the *front* to requied size
|
||||
for len(b.infos) > dht_bucket_size {
|
||||
b.infos = b.infos[1:]
|
||||
for len(b.other) > dht_bucket_size {
|
||||
b.other = b.other[1:]
|
||||
}
|
||||
}
|
||||
|
||||
@ -256,23 +268,34 @@ func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) {
|
||||
|
||||
func (b *bucket) contains(ninfo *dhtInfo) bool {
|
||||
// Compares if key and coords match
|
||||
for _, info := range b.infos {
|
||||
var found bool
|
||||
check := func(infos []*dhtInfo) {
|
||||
for _, info := range infos {
|
||||
if info == nil {
|
||||
panic("Should never happen")
|
||||
}
|
||||
if info.key == ninfo.key {
|
||||
if len(info.coords) != len(ninfo.coords) {
|
||||
return false
|
||||
if info.key != info.key {
|
||||
continue
|
||||
}
|
||||
if len(info.coords) != len(ninfo.coords) {
|
||||
continue
|
||||
}
|
||||
match := true
|
||||
for idx := 0; idx < len(info.coords); idx++ {
|
||||
if info.coords[idx] != ninfo.coords[idx] {
|
||||
return false
|
||||
match = false
|
||||
break
|
||||
}
|
||||
}
|
||||
return true
|
||||
if match {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
check(b.peers)
|
||||
check(b.other)
|
||||
return found
|
||||
}
|
||||
|
||||
func (b *bucket) drop(key *boxPubKey) {
|
||||
@ -286,7 +309,8 @@ func (b *bucket) drop(key *boxPubKey) {
|
||||
}
|
||||
return cleaned
|
||||
}
|
||||
b.infos = clean(b.infos)
|
||||
b.peers = clean(b.peers)
|
||||
b.other = clean(b.other)
|
||||
}
|
||||
|
||||
func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
|
||||
@ -333,7 +357,7 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
|
||||
}
|
||||
|
||||
func (b *bucket) isEmpty() bool {
|
||||
return len(b.infos) == 0
|
||||
return len(b.peers)+len(b.other) == 0
|
||||
}
|
||||
|
||||
func (b *bucket) nextToPing() *dhtInfo {
|
||||
@ -343,7 +367,8 @@ func (b *bucket) nextToPing() *dhtInfo {
|
||||
// Gives them time to respond
|
||||
// And time between traffic loss from short term congestion in the network
|
||||
var toPing *dhtInfo
|
||||
for _, next := range b.infos {
|
||||
update := func(infos []*dhtInfo) {
|
||||
for _, next := range infos {
|
||||
if time.Since(next.send) < 6*time.Second {
|
||||
continue
|
||||
}
|
||||
@ -351,6 +376,9 @@ func (b *bucket) nextToPing() *dhtInfo {
|
||||
toPing = next
|
||||
}
|
||||
}
|
||||
}
|
||||
update(b.peers)
|
||||
update(b.other)
|
||||
return toPing
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ func (r *router) mainLoop() {
|
||||
case p := <-r.send:
|
||||
r.sendPacket(p)
|
||||
case info := <-r.core.dht.peers:
|
||||
r.core.dht.insert(info) //r.core.dht.insertIfNew(info)
|
||||
r.core.dht.insertIfNew(info, true)
|
||||
case <-r.reset:
|
||||
r.core.sessions.resetInits()
|
||||
case <-ticker.C:
|
||||
|
@ -113,7 +113,7 @@ func generateConfig(isAutoconf bool) *nodeConfig {
|
||||
cfg.Listen = "[::]:0"
|
||||
} else {
|
||||
r1 := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
cfg.Listen = fmt.Sprintf("[::]:%d", r1.Intn(65534 - 32768) + 32768)
|
||||
cfg.Listen = fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768)
|
||||
}
|
||||
cfg.AdminListen = "[::1]:9001"
|
||||
cfg.BoxPub = hex.EncodeToString(bpub[:])
|
||||
|
Loading…
Reference in New Issue
Block a user