5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-29 01:41:37 +00:00

Merge pull request #276 from Arceliar/dht

Tune DHT a little better
This commit is contained in:
Neil Alexander 2018-12-22 08:53:44 +00:00 committed by GitHub
commit e428077a2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 13 deletions

View File

@ -12,7 +12,12 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
) )
const dht_lookup_size = 16 const (
dht_lookup_size = 16
dht_timeout = 6 * time.Minute
dht_max_delay = 5 * time.Minute
dht_max_delay_dirty = 30 * time.Second
)
// dhtInfo represents everything we know about a node in the DHT. // dhtInfo represents everything we know about a node in the DHT.
// This includes its key, a cache of it's NodeID, coords, and timing/ping related info for deciding who/when to ping nodes for maintenance. // This includes its key, a cache of it's NodeID, coords, and timing/ping related info for deciding who/when to ping nodes for maintenance.
@ -23,6 +28,7 @@ type dhtInfo struct {
recv time.Time // When we last received a message recv time.Time // When we last received a message
pings int // Time out if at least 3 consecutive maintenance pings drop pings int // Time out if at least 3 consecutive maintenance pings drop
throttle time.Duration throttle time.Duration
dirty bool // Set to true if we've used this node in ping responses (for queries about someone other than the person doing the asking, i.e. real searches) since the last time we heard from the node
} }
// Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times.
@ -134,6 +140,15 @@ func (t *dht) insert(info *dhtInfo) {
t.table[*info.getNodeID()] = info t.table[*info.getNodeID()] = info
} }
// Insert a peer into the table if it hasn't been pinged lately, to keep peers from dropping
func (t *dht) insertPeer(info *dhtInfo) {
oldInfo, isIn := t.table[*info.getNodeID()]
if !isIn || time.Since(oldInfo.recv) > dht_max_delay+30*time.Second {
// TODO? also check coords?
t.insert(info)
}
}
// Return true if first/second/third are (partially) ordered correctly. // Return true if first/second/third are (partially) ordered correctly.
func dht_ordered(first, second, third *crypto.NodeID) bool { func dht_ordered(first, second, third *crypto.NodeID) bool {
lessOrEqual := func(first, second *crypto.NodeID) bool { lessOrEqual := func(first, second *crypto.NodeID) bool {
@ -185,6 +200,14 @@ func (t *dht) handleReq(req *dhtReq) {
if _, isIn := t.table[*info.getNodeID()]; !isIn && t.isImportant(&info) { if _, isIn := t.table[*info.getNodeID()]; !isIn && t.isImportant(&info) {
t.ping(&info, nil) t.ping(&info, nil)
} }
// Maybe mark nodes from lookup as dirty
if req.Dest != *info.getNodeID() {
// This node asked about someone other than themself, so this wasn't just idle traffic.
for _, info := range res.Infos {
// Mark nodes dirty so we're sure to check up on them again later
info.dirty = true
}
}
} }
// Sends a lookup response to the specified node. // Sends a lookup response to the specified node.
@ -302,19 +325,32 @@ func (t *dht) doMaintenance() {
} }
t.callbacks = newCallbacks t.callbacks = newCallbacks
for infoID, info := range t.table { for infoID, info := range t.table {
if now.Sub(info.recv) > time.Minute || info.pings > 3 { switch {
case info.pings > 6:
// It failed to respond to too many pings
fallthrough
case now.Sub(info.recv) > dht_timeout:
// It's too old
fallthrough
case info.dirty && now.Sub(info.recv) > dht_max_delay_dirty && !t.isImportant(info):
// We won't ping it to refresh it, so just drop it
delete(t.table, infoID) delete(t.table, infoID)
t.imp = nil t.imp = nil
} }
} }
for _, info := range t.getImportant() { for _, info := range t.getImportant() {
if now.Sub(info.recv) > info.throttle { switch {
case now.Sub(info.recv) > info.throttle:
info.throttle *= 2
if info.throttle < time.Second {
info.throttle = time.Second
} else if info.throttle > dht_max_delay {
info.throttle = dht_max_delay
}
fallthrough
case info.dirty && now.Sub(info.recv) > dht_max_delay_dirty:
t.ping(info, nil) t.ping(info, nil)
info.pings++ info.pings++
info.throttle += time.Second
if info.throttle > 30*time.Second {
info.throttle = 30 * time.Second
}
} }
} }
} }

View File

@ -110,12 +110,7 @@ func (r *router) mainLoop() {
case p := <-r.send: case p := <-r.send:
r.sendPacket(p) r.sendPacket(p)
case info := <-r.core.dht.peers: case info := <-r.core.dht.peers:
now := time.Now() r.core.dht.insertPeer(info)
oldInfo, isIn := r.core.dht.table[*info.getNodeID()]
r.core.dht.insert(info)
if isIn && now.Sub(oldInfo.recv) < 45*time.Second {
info.recv = oldInfo.recv
}
case <-r.reset: case <-r.reset:
r.core.sessions.resetInits() r.core.sessions.resetInits()
r.core.dht.reset() r.core.dht.reset()