mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-22 18:50:27 +00:00
misc dht and tree changes to stabilize coords and bootstrap the dht faster
This commit is contained in:
parent
96c55da987
commit
8228242eed
@ -6,6 +6,7 @@ import "os"
|
|||||||
import "strings"
|
import "strings"
|
||||||
import "strconv"
|
import "strconv"
|
||||||
import "time"
|
import "time"
|
||||||
|
import "log"
|
||||||
|
|
||||||
import "runtime/pprof"
|
import "runtime/pprof"
|
||||||
import "flag"
|
import "flag"
|
||||||
@ -418,12 +419,12 @@ func main() {
|
|||||||
idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt")
|
idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt")
|
||||||
//idxstore := loadGraph("skitter")
|
//idxstore := loadGraph("skitter")
|
||||||
kstore := getKeyedStore(idxstore)
|
kstore := getKeyedStore(idxstore)
|
||||||
/*
|
//*
|
||||||
|
logger := log.New(os.Stderr, "", log.Flags())
|
||||||
for _, n := range kstore {
|
for _, n := range kstore {
|
||||||
log := n.core.DEBUG_getLogger()
|
n.core.DEBUG_setLogger(logger)
|
||||||
log.SetOutput(os.Stderr)
|
|
||||||
}
|
}
|
||||||
*/
|
//*/
|
||||||
startNetwork(kstore)
|
startNetwork(kstore)
|
||||||
//time.Sleep(10*time.Second)
|
//time.Sleep(10*time.Second)
|
||||||
// Note that testPaths only works if pressure is turend off
|
// Note that testPaths only works if pressure is turend off
|
||||||
|
@ -25,9 +25,9 @@ import "time"
|
|||||||
|
|
||||||
// Maximum size for buckets and lookups
|
// Maximum size for buckets and lookups
|
||||||
// Exception for buckets if the next one is non-full
|
// Exception for buckets if the next one is non-full
|
||||||
const dht_bucket_size = 2 // This should be at least 2
|
|
||||||
const dht_lookup_size = 2 // This should be at least 1, below 2 is impractical
|
|
||||||
const dht_bucket_number = 8 * NodeIDLen // This shouldn't be changed
|
const dht_bucket_number = 8 * NodeIDLen // This shouldn't be changed
|
||||||
|
const dht_bucket_size = 2 // This should be at least 2
|
||||||
|
const dht_lookup_size = 16 // This should be at least 1, below 2 is impractical
|
||||||
|
|
||||||
type dhtInfo struct {
|
type dhtInfo struct {
|
||||||
nodeID_hidden *NodeID
|
nodeID_hidden *NodeID
|
||||||
@ -141,23 +141,16 @@ func (t *dht) handleRes(res *dhtRes) {
|
|||||||
if res.dest == *rinfo.getNodeID() {
|
if res.dest == *rinfo.getNodeID() {
|
||||||
return
|
return
|
||||||
} // No infinite recursions
|
} // No infinite recursions
|
||||||
// ping the nodes we were told about
|
|
||||||
if len(res.infos) > dht_lookup_size {
|
if len(res.infos) > dht_lookup_size {
|
||||||
// Ignore any "extra" lookup results
|
// Ignore any "extra" lookup results
|
||||||
res.infos = res.infos[:dht_lookup_size]
|
res.infos = res.infos[:dht_lookup_size]
|
||||||
}
|
}
|
||||||
for _, info := range res.infos {
|
for _, info := range res.infos {
|
||||||
bidx, isOK := t.getBucketIndex(info.getNodeID())
|
if dht_firstCloserThanThird(info.getNodeID(), &res.dest, rinfo.getNodeID()) {
|
||||||
if !isOK {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
b := t.getBucket(bidx)
|
|
||||||
if b.contains(info) {
|
|
||||||
continue
|
|
||||||
} // wait for maintenance cycle to get them
|
|
||||||
t.addToMill(info, info.getNodeID())
|
t.addToMill(info, info.getNodeID())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
|
func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
|
||||||
// FIXME this allocates a bunch, sorts, and keeps the part it likes
|
// FIXME this allocates a bunch, sorts, and keeps the part it likes
|
||||||
@ -167,7 +160,7 @@ func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
|
|||||||
if info == nil {
|
if info == nil {
|
||||||
panic("Should never happen!")
|
panic("Should never happen!")
|
||||||
}
|
}
|
||||||
if true || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
|
if dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
|
||||||
res = append(res, info)
|
res = append(res, info)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -467,10 +460,43 @@ func (t *dht) doMaintenance() {
|
|||||||
}
|
}
|
||||||
t.offset++
|
t.offset++
|
||||||
}
|
}
|
||||||
if len(t.rumorMill) > 0 {
|
for len(t.rumorMill) > 0 {
|
||||||
var rumor dht_rumor
|
var rumor dht_rumor
|
||||||
rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:]
|
rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:]
|
||||||
|
if rumor.target == rumor.info.getNodeID() {
|
||||||
|
// Note that the above is a pointer comparison, and target can be nil
|
||||||
|
// This is only for adding new nodes (learned from other lookups)
|
||||||
|
// It only makes sense to ping if the node isn't already in the table
|
||||||
|
bidx, isOK := t.getBucketIndex(rumor.info.getNodeID())
|
||||||
|
if !isOK {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
b := t.getBucket(bidx)
|
||||||
|
if b.contains(rumor.info) {
|
||||||
|
// Already know about this node
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// This is a good spot to check if a node is worth pinging
|
||||||
|
add := len(b.other) < dht_bucket_size
|
||||||
|
if bidx+1 == t.nBuckets() {
|
||||||
|
add = true
|
||||||
|
}
|
||||||
|
bnext := t.getBucket(bidx + 1)
|
||||||
|
if len(bnext.other) < dht_bucket_size {
|
||||||
|
add = true
|
||||||
|
}
|
||||||
|
for _, info := range b.other {
|
||||||
|
if dht_firstCloserThanThird(rumor.info.getNodeID(), &t.nodeID, info.getNodeID()) {
|
||||||
|
// Add the node if they are closer to us than someone in the same bucket
|
||||||
|
add = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !add {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
t.ping(rumor.info, rumor.target)
|
t.ping(rumor.info, rumor.target)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -488,8 +514,11 @@ func dht_firstCloserThanThird(first *NodeID,
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *dht) resetPeers() {
|
func (t *dht) reset() {
|
||||||
|
// This is mostly so bootstrapping will reset to resend coords into the network
|
||||||
for _, b := range t.buckets_hidden {
|
for _, b := range t.buckets_hidden {
|
||||||
b.peers = b.peers[:0]
|
b.peers = b.peers[:0]
|
||||||
|
b.other = b.other[:0]
|
||||||
}
|
}
|
||||||
|
t.offset = 0
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,7 @@ func (r *router) mainLoop() {
|
|||||||
r.core.dht.insertIfNew(info, true)
|
r.core.dht.insertIfNew(info, true)
|
||||||
case <-r.reset:
|
case <-r.reset:
|
||||||
r.core.sessions.resetInits()
|
r.core.sessions.resetInits()
|
||||||
r.core.dht.resetPeers()
|
r.core.dht.reset()
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
{
|
{
|
||||||
// Any periodic maintenance stuff goes here
|
// Any periodic maintenance stuff goes here
|
||||||
|
@ -128,7 +128,7 @@ type switchMessage struct {
|
|||||||
type switchPort uint64
|
type switchPort uint64
|
||||||
type tableElem struct {
|
type tableElem struct {
|
||||||
port switchPort
|
port switchPort
|
||||||
firstSeen time.Time
|
//firstSeen time.Time
|
||||||
locator switchLocator
|
locator switchLocator
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -304,7 +304,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
|
|||||||
doUpdate := false
|
doUpdate := false
|
||||||
if !equiv(&msg.locator, &oldSender.locator) {
|
if !equiv(&msg.locator, &oldSender.locator) {
|
||||||
doUpdate = true
|
doUpdate = true
|
||||||
sender.firstSeen = now
|
//sender.firstSeen = now
|
||||||
}
|
}
|
||||||
t.data.peers[fromPort] = sender
|
t.data.peers[fromPort] = sender
|
||||||
updateRoot := false
|
updateRoot := false
|
||||||
@ -355,7 +355,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
|
|||||||
case t.core.router.reset <- struct{}{}:
|
case t.core.router.reset <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
//t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
|
//t.core.log.Println("Switch update:", msg.locator.root, msg.locator.tstamp, msg.locator.coords)
|
||||||
//fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
|
//fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
|
||||||
}
|
}
|
||||||
if t.data.locator.tstamp != msg.locator.tstamp {
|
if t.data.locator.tstamp != msg.locator.tstamp {
|
||||||
@ -397,7 +397,7 @@ func (t *switchTable) updateTable() {
|
|||||||
newTable.elems = append(newTable.elems, tableElem{
|
newTable.elems = append(newTable.elems, tableElem{
|
||||||
locator: loc,
|
locator: loc,
|
||||||
//degree: pinfo.degree,
|
//degree: pinfo.degree,
|
||||||
firstSeen: pinfo.firstSeen,
|
//firstSeen: pinfo.firstSeen,
|
||||||
//forward: pinfo.forward,
|
//forward: pinfo.forward,
|
||||||
port: pinfo.port,
|
port: pinfo.port,
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user